Skip to content

Commit 1dbafa6

Browse files
author
Ian Craggs
committed
Fix max_inflight condition for disconnected buffer #582
1 parent 56f83de commit 1dbafa6

5 files changed

Lines changed: 39 additions & 20 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/automaticReconnect/OfflineBufferingTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public void testManyMessageBufferAndDeliver() throws Exception {
173173
options.setAutomaticReconnect(true);
174174

175175
// Workaround for Issue #582 - Remove once fixed.
176-
options.setMaxInflight(100);
176+
//options.setMaxInflight(100);
177177

178178
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:" + proxy.getLocalPort(), methodName, DATA_STORE);
179179
DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
@@ -203,9 +203,11 @@ public void testManyMessageBufferAndDeliver() throws Exception {
203203
isConnected = client.isConnected();
204204
log.info("Proxy Disconnect isConnected: " + isConnected);
205205
Assert.assertFalse(isConnected);
206+
207+
int msg_count = 100;
206208

207-
// Publish 100 messages
208-
for (int x = 0; x < 100; x++) {
209+
// Publish msg_count messages
210+
for (int x = 0; x < msg_count; x++) {
209211
client.publish(topicPrefix + methodName, new MqttMessage(Integer.toString(x).getBytes()));
210212
}
211213
// Enable Proxy
@@ -234,7 +236,7 @@ public void testManyMessageBufferAndDeliver() throws Exception {
234236
Thread.sleep(5000);
235237

236238
// Check that all messages have been delivered
237-
for (int x = 0; x < 100; x++) {
239+
for (int x = 0; x < msg_count; x++) {
238240
boolean recieved = mqttV3Receiver.validateReceipt(topicPrefix + methodName, 1,
239241
Integer.toString(x).getBytes());
240242
Assert.assertTrue(recieved);

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ void internalSend(MqttWireMessage message, MqttToken token) throws MqttException
160160
// Persist if needed and send the message
161161
this.clientState.send(message, token);
162162
} catch(MqttException e) {
163+
token.internalTok.setClient(null); // undo client setting on error
163164
if (message instanceof MqttPublish) {
164165
this.clientState.undo((MqttPublish)message);
165166
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/DisconnectedMessageBuffer.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ public int getMessageCount() {
102102
}
103103
}
104104

105+
private int mycount = 0;
105106
/**
106107
* Flushes the buffer of messages into an open connection
107108
*/
@@ -116,12 +117,17 @@ public void run() {
116117
// Publish was successful, remove message from buffer.
117118
deleteMessage(0);
118119
} catch (MqttException ex) {
119-
// Error occurred attempting to publish buffered message likely because the
120-
// client is not connected
121-
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}:{1}.
122-
log.severe(CLASS_NAME, methodName, "519", new Object[] { ex.getReasonCode(), ex.getMessage() });
123-
break;
124-
120+
if (ex.getReasonCode() == MqttException.REASON_CODE_MAX_INFLIGHT) {
121+
// If we get the max_inflight condition, try again after a short
122+
// interval to allow more messages to be completely sent.
123+
try { Thread.sleep(100); } catch (Exception e) {}
124+
} else {
125+
// Error occurred attempting to publish buffered message likely because the
126+
// client is not connected
127+
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}:{1}.
128+
log.severe(CLASS_NAME, methodName, "519", new Object[] { ex.getReasonCode(), ex.getMessage() });
129+
break;
130+
}
125131
}
126132

127133
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/DisconnectedMessageBuffer.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,16 @@ public void run() {
110110
// Publish was successful, remove message from buffer.
111111
deleteMessage(0);
112112
} catch (MqttException ex) {
113-
// Error occurred attempting to publish buffered message likely because the client is not connected
114-
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}.
115-
//log.warning(CLASS_NAME, methodName, "519", new Object[]{ex.getMessage()});
116-
Thread.yield();
113+
if (ex.getReasonCode() == MqttClientException.REASON_CODE_MAX_INFLIGHT) {
114+
// If we get the max_inflight condition, try again after a short
115+
// interval to allow more messages to be completely sent.
116+
try { Thread.sleep(100); } catch (Exception e) {}
117+
} else {
118+
// Error occurred attempting to publish buffered message likely because the client is not connected
119+
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}.
120+
log.warning(CLASS_NAME, methodName, "519", new Object[]{ex.getMessage()});
121+
break;
122+
}
117123
}
118124
}
119125
}

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/automaticReconnect/OfflineBufferingTest.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,16 @@ public void testManyMessageBufferAndDeliver() throws Exception {
157157
// Tokens
158158
IMqttToken connectToken;
159159

160+
int msg_count = 1000;
161+
160162
// Client Options
161163
MqttConnectionOptions options = new MqttConnectionOptions();
162164
options.setCleanStart(true);
163165
options.setAutomaticReconnect(true);
166+
164167
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:" + proxy.getLocalPort(), methodName, DATA_STORE);
165168
DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
169+
disconnectedOpts.setBufferSize(msg_count);
166170
disconnectedOpts.setBufferEnabled(true);
167171
client.setBufferOpts(disconnectedOpts);
168172

@@ -189,8 +193,8 @@ public void testManyMessageBufferAndDeliver() throws Exception {
189193
log.info("Proxy Disconnect isConnected: " + isConnected);
190194
Assert.assertFalse(isConnected);
191195

192-
// Publish 100 messages
193-
for (int x = 0; x < 100; x++) {
196+
// Publish some messages
197+
for (int x = 0; x < msg_count; x++) {
194198
client.publish(topicPrefix + methodName, new MqttMessage(Integer.toString(x).getBytes()));
195199
}
196200
// Enable Proxy
@@ -219,11 +223,11 @@ public void testManyMessageBufferAndDeliver() throws Exception {
219223
Thread.sleep(5000);
220224

221225
// Check that all messages have been delivered
222-
for (int x = 0; x < 100; x++) {
223-
boolean recieved = mqttV3Receiver.validateReceipt(topicPrefix + methodName, 0, Integer.toString(x).getBytes());
224-
Assert.assertTrue(recieved);
226+
for (int x = 0; x < msg_count; x++) {
227+
boolean received = mqttV3Receiver.validateReceipt(topicPrefix + methodName, 0, Integer.toString(x).getBytes());
228+
Assert.assertTrue(received);
225229
}
226-
log.info("All messages sent and Recieved correctly.");
230+
log.info("All messages sent and received correctly.");
227231
IMqttToken disconnectToken = client.disconnect();
228232
disconnectToken.waitForCompletion(5000);
229233
client.close();

0 commit comments

Comments
 (0)