Skip to content

Commit 01967fb

Browse files
committed
Issue #548 - Fixing duplicate QoS 2 issue
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 44bdf6a commit 01967fb

5 files changed

Lines changed: 67 additions & 6 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,8 @@ protected void handleInboundPubRel(MqttPubRel pubRel) throws MqttException {
11671167
// Currently this client has no need of the properties, so this is left empty.
11681168
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, pubRel.getMessageId(),
11691169
new MqttProperties());
1170+
// TODO - Fix Trace
1171+
log.info(CLASS_NAME, methodName, "Creating MqttPubComp due to pubRel: " + pubComp.toString());
11701172
this.send(pubComp, null);
11711173
}
11721174
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,12 +453,16 @@ private void handleMessage(MqttPublish publishMessage) throws Exception {
453453
this.clientComms.internalSend(
454454
new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, publishMessage.getMessageId(), new MqttProperties()),
455455
new MqttToken(clientComms.getClient().getClientId()));
456-
} else if (publishMessage.getMessage().getQos() == 2) {
456+
}
457+
458+
/*else if (publishMessage.getMessage().getQos() == 2) {
457459
this.clientComms.deliveryComplete(publishMessage);
458460
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS,
459461
publishMessage.getMessageId(), new MqttProperties());
462+
log.info(CLASS_NAME, methodName, "Creating MqttPubComp due to handleMessage: " + pubComp.toString());
463+
460464
this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));
461-
}
465+
}*/
462466
}
463467
}
464468

@@ -469,6 +473,8 @@ public void messageArrivedComplete(int messageId, int qos) throws MqttException
469473
} else if (qos == 2) {
470474
this.clientComms.deliveryComplete(messageId);
471475
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, messageId, new MqttProperties());
476+
log.info(CLASS_NAME, "messageArrivedComplete", "Creating MqttPubComp due to messageArrivedComplete: " + pubComp.toString());
477+
472478
this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));
473479
}
474480
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/wire/MqttInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ public MqttWireMessage readMqttWireMessage() throws IOException, MqttException {
121121
byte[] header = bais.toByteArray();
122122
System.arraycopy(header,0,packet,0, header.length);
123123
message = MqttWireMessage.createWireMessage(packet);
124-
// @TRACE 501= received {0}
125-
log.fine(CLASS_NAME, methodName, "501",new Object[] {message});
124+
// @TRACE 530= Received {0}
125+
log.fine(CLASS_NAME, methodName, "530",new Object[] {message});
126126
}
127127
} catch (SocketTimeoutException e) {
128128
// ignore socket read timeout

org.eclipse.paho.mqttv5.client/src/main/resources/org/eclipse/paho/mqttv5/client/internal/nls/logcat.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
517=Un-Persisting Buffered message key={0}
8282
518=Failed to Un-Persist Buffered message key={0}
8383
529=Sent {0}
84+
530=Received {0}
8485
600=>
8586
601=key={0} message={1}
8687
602=key={0} exception

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/BasicTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ public static void setUpBeforeClass() throws Exception {
5252
}
5353
}
5454

55-
56-
5755
/**
5856
* Very simple test case that validates that the client can connect and then
5957
* disconnect cleanly.
@@ -118,4 +116,58 @@ public void testPublishAndReceive() throws MqttException, InterruptedException {
118116
TestClientUtilities.disconnectAndCloseClient(asyncClient, timeout);
119117
}
120118

119+
/**
120+
* @throws MqttException,
121+
* InterruptedException
122+
*
123+
*/
124+
@Test
125+
public void testQoS2DuplicateIssue() throws MqttException, InterruptedException {
126+
String methodName = Utility.getMethodName();
127+
LoggingUtilities.banner(log, SubscribeTests.class, methodName);
128+
String subTopic = "a/+/c";
129+
130+
int timeout = 5000;
131+
132+
MqttV5Receiver mqttV5Receiver = new MqttV5Receiver(methodName, LoggingUtilities.getPrintStream());
133+
MqttAsyncClient asyncClient = TestClientUtilities.connectAndGetClient(serverURI.toString(), methodName,
134+
mqttV5Receiver, null, timeout);
135+
136+
int qos = 2;
137+
log.info("Testing Publish and Receive at QoS: " + qos);
138+
// Subscribe to a topic
139+
log.info(String.format("Subscribing to: %s at QoS %d", subTopic, qos));
140+
MqttSubscription subscription = new MqttSubscription(subTopic, qos);
141+
IMqttToken subscribeToken = asyncClient.subscribe(subscription);
142+
subscribeToken.waitForCompletion(timeout);
143+
144+
String samplePayload = "Hello World";
145+
146+
log.info("Publishing messages on topics.");
147+
//publishMessage(samplePayload, 0, "a/b/c", asyncClient, timeout);
148+
//publishMessage(samplePayload, 0, "a/0/c", asyncClient, timeout);
149+
//publishMessage(samplePayload, 1, "a/1/c", asyncClient, timeout);
150+
publishMessage(samplePayload, 2, "a/2/c", asyncClient, timeout);
151+
152+
153+
log.info("Waiting for delivery and validating message.");
154+
//Assert.assertTrue(mqttV5Receiver.validateReceipt("a/b/c", 0, samplePayload.getBytes()));
155+
//Assert.assertTrue(mqttV5Receiver.validateReceipt("a/0/c", 0, samplePayload.getBytes()));
156+
//Assert.assertTrue(mqttV5Receiver.validateReceipt("a/1/c", 1, samplePayload.getBytes()));
157+
Assert.assertTrue(mqttV5Receiver.validateReceipt("a/2/c", 2, samplePayload.getBytes()));
158+
159+
Thread.sleep(10000);
160+
161+
162+
TestClientUtilities.disconnectAndCloseClient(asyncClient, timeout);
163+
}
164+
165+
private void publishMessage(String payload, int qos, String topic, MqttAsyncClient client, int timeout) throws MqttException {
166+
MqttMessage testMessage = new MqttMessage(payload.getBytes(), qos, false, null);
167+
log.info(String.format("Publishing Message %s to: %s at QoS: %d", testMessage.toDebugString(), topic,
168+
qos));
169+
IMqttDeliveryToken deliveryToken = client.publish(topic, testMessage);
170+
deliveryToken.waitForCompletion(timeout);
171+
}
172+
121173
}

0 commit comments

Comments
 (0)