Skip to content

Commit e4ff13c

Browse files
committed
Issue #520
- Fixing Ack Deserialisation logic for scenarios where remaininglength = 2 bytes - Allowing QoS 2 message flows to stack return codes. Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent a2e5cef commit e4ff13c

7 files changed

Lines changed: 156 additions & 5 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,6 +1048,9 @@ protected void notifyReceivedAck(MqttAck ack) throws MqttException {
10481048
// @TRACE 662=no message found for ack id={0}
10491049
log.fine(CLASS_NAME, methodName, "662", new Object[] { Integer.valueOf(ack.getMessageId()) });
10501050
} else if (ack instanceof MqttPubRec) {
1051+
1052+
// Update the token with the reason codes
1053+
updateResult(ack, token, mex);
10511054

10521055
// Complete the QoS 2 flow. Unlike all other
10531056
// flows, QoS is a 2 phase flow. The second phase sends a
@@ -1263,6 +1266,16 @@ protected void notifyComplete(MqttToken token) throws MqttException {
12631266
checkQuiesceLock();
12641267
}
12651268
}
1269+
1270+
/**
1271+
* Updates a token with the latest reason codes, currently only used for PubRec messages.
1272+
* @param msg - The message that we are using for the update
1273+
* @param token - The Token we are updating
1274+
* @param ex - if there was a problem store the exception in the token.
1275+
*/
1276+
protected void updateResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
1277+
token.internalTok.update(ack, ex);
1278+
}
12661279

12671280
protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
12681281
final String methodName = "notifyResult";

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/Token.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,45 @@ protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
181181
log.fine(CLASS_NAME, methodName, "402", new Object[] { getKey(), this.response });
182182
return this.response;
183183
}
184+
185+
/**
186+
* Update the token with any new reason codes, currently only used for PubRec
187+
*
188+
* @param msg
189+
* response message.
190+
* @param ex
191+
* if there was a problem store the exception in the token.
192+
*/
193+
protected void update(MqttWireMessage msg, MqttException ex) {
194+
final String methodName = "markComplete";
195+
// @TRACE 411=>key={0} response={1} excep={2}
196+
log.fine(CLASS_NAME, methodName, "411", new Object[] { getKey(), msg, ex });
197+
198+
synchronized (responseLock){
199+
if(msg instanceof MqttPubRec) {
200+
if(msg.getReasonCodes() != null) {
201+
updateReasonCodes(msg.getReasonCodes());
202+
}
203+
}
204+
}
205+
206+
}
207+
208+
/**
209+
* Updates the reasonCodes Array and extends it with any new reason codes.
210+
* This allows message flows like Qos 2 to combine reason codes together across multiple messages e.g. PubRec and PubComp
211+
* @param newReasonCodes - The additional Reason Codes.
212+
*/
213+
protected void updateReasonCodes(int[] newReasonCodes) {
214+
if(this.reasonCodes == null) {
215+
this.reasonCodes = newReasonCodes;
216+
} else {
217+
int[] updatedReasonCodes = new int[this.reasonCodes.length + newReasonCodes.length];
218+
System.arraycopy(this.reasonCodes, 0, updatedReasonCodes, 0, this.reasonCodes.length);
219+
System.arraycopy(newReasonCodes, 0, updatedReasonCodes, this.reasonCodes.length, newReasonCodes.length);
220+
this.reasonCodes = updatedReasonCodes;
221+
}
222+
}
184223

185224
/**
186225
* Mark the token as complete and ready for users to be notified.
@@ -202,7 +241,7 @@ protected void markComplete(MqttWireMessage msg, MqttException ex) {
202241
if (msg instanceof MqttPubAck || msg instanceof MqttPubComp || msg instanceof MqttPubRec
203242
|| msg instanceof MqttPubRel || msg instanceof MqttSubAck || msg instanceof MqttUnsubAck) {
204243
if (msg.getReasonCodes() != null) {
205-
this.reasonCodes = msg.getReasonCodes();
244+
updateReasonCodes(msg.getReasonCodes());
206245
}
207246
}
208247

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.eclipse.paho.mqttv5.client.test;
2+
3+
import java.net.URI;
4+
import java.util.Arrays;
5+
import java.util.UUID;
6+
import java.util.logging.Level;
7+
import java.util.logging.Logger;
8+
9+
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
10+
import org.eclipse.paho.mqttv5.client.IMqttToken;
11+
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
12+
import org.eclipse.paho.mqttv5.client.test.client.MqttClientFactoryPaho;
13+
import org.eclipse.paho.mqttv5.client.test.logging.LoggingUtilities;
14+
import org.eclipse.paho.mqttv5.client.test.properties.TestProperties;
15+
import org.eclipse.paho.mqttv5.client.test.utilities.Utility;
16+
import org.eclipse.paho.mqttv5.common.MqttMessage;
17+
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
18+
import org.junit.AfterClass;
19+
import org.junit.Assert;
20+
import org.junit.BeforeClass;
21+
import org.junit.Test;
22+
23+
public class PublishTests {
24+
25+
static final Class<?> cclass = PublishTests.class;
26+
private static final String className = cclass.getName();
27+
private static final Logger log = Logger.getLogger(className);
28+
29+
private static URI serverURI;
30+
private static MqttClientFactoryPaho clientFactory;
31+
private static String topicPrefix;
32+
33+
@BeforeClass
34+
public static void setUpBeforeClass() throws Exception {
35+
try {
36+
String methodName = Utility.getMethodName();
37+
LoggingUtilities.banner(log, cclass, methodName);
38+
39+
serverURI = TestProperties.getServerURI();
40+
clientFactory = new MqttClientFactoryPaho();
41+
clientFactory.open();
42+
topicPrefix = "Mqttv5PublishTests-" + UUID.randomUUID().toString() + "-";
43+
44+
} catch (Exception exception) {
45+
log.log(Level.SEVERE, "caught exception:", exception);
46+
throw exception;
47+
}
48+
}
49+
50+
@AfterClass
51+
public static void tearDownAfterClass() throws Exception {
52+
String methodName = Utility.getMethodName();
53+
LoggingUtilities.banner(log, cclass, methodName);
54+
55+
try {
56+
if (clientFactory != null) {
57+
clientFactory.close();
58+
clientFactory.disconnect();
59+
}
60+
} catch (Exception exception) {
61+
log.log(Level.SEVERE, "caught exception:", exception);
62+
}
63+
}
64+
65+
@Test
66+
public void testPublishRC() throws Exception {
67+
String methodName = Utility.getMethodName();
68+
LoggingUtilities.banner(log, cclass, methodName);
69+
String clientId = methodName;
70+
MqttAsyncClient asyncClient = new MqttAsyncClient(serverURI.toString(), clientId);
71+
72+
// Connect to the server
73+
log.info("Connecting: [serverURI: " + serverURI + ", ClientId: " + clientId + "]");
74+
IMqttToken connectToken = asyncClient.connect();
75+
connectToken.waitForCompletion(5000);
76+
String clientId2 = asyncClient.getClientId();
77+
log.info("Client ID = " + clientId2);
78+
boolean isConnected = asyncClient.isConnected();
79+
log.info("isConnected: " + isConnected);
80+
81+
// Publish a message to a random topic
82+
MqttMessage testMessage = new MqttMessage("Test Payload".getBytes(), 2, false, new MqttProperties());
83+
log.info("Publishing Message to: " + topicPrefix + methodName);
84+
IMqttDeliveryToken deliveryToken = asyncClient.publish(topicPrefix + methodName, testMessage);
85+
deliveryToken.waitForCompletion(5000);
86+
log.info(deliveryToken.getResponse().toString());
87+
log.info("Return codes: " + Arrays.toString(deliveryToken.getReasonCodes()));
88+
int[] expectedRC = new int[] {16, 0};
89+
Assert.assertArrayEquals(expectedRC, deliveryToken.getReasonCodes());
90+
91+
log.info("Disconnecting...");
92+
IMqttToken disconnectToken = asyncClient.disconnect();
93+
disconnectToken.waitForCompletion(5000);
94+
Assert.assertFalse(asyncClient.isConnected());
95+
asyncClient.close();
96+
97+
}
98+
99+
}

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubAck.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public MqttPubAck( byte[] data) throws IOException, MqttException {
4646
DataInputStream dis = new DataInputStream(counter);
4747
msgId = dis.readUnsignedShort();
4848
long remainder = (long)data.length - counter.getCounter();
49-
if (remainder > 2) {
49+
if (remainder >= 2) {
5050
reasonCode = dis.readUnsignedByte();
5151
validateReturnCode(reasonCode, validReturnCodes);
5252
} else {

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubComp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public MqttPubComp(byte[] data) throws IOException, MqttException {
4444
DataInputStream dis = new DataInputStream(counter);
4545
msgId = dis.readUnsignedShort();
4646
long remainder = (long) data.length - counter.getCounter();
47-
if (remainder > 2) {
47+
if (remainder >= 2) {
4848
reasonCode = dis.readUnsignedByte();
4949
validateReturnCode(reasonCode, validReturnCodes);
5050
} else {

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubRec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public MqttPubRec(byte[] data) throws IOException, MqttException {
4747
DataInputStream dis = new DataInputStream(counter);
4848
msgId = dis.readUnsignedShort();
4949
long remainder = (long) data.length - counter.getCounter();
50-
if (remainder > 2) {
50+
if (remainder >= 2) {
5151
reasonCode = dis.readUnsignedByte();
5252
validateReturnCode(reasonCode, validReturnCodes);
5353
} else {

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubRel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public MqttPubRel(byte[] data) throws IOException, MqttException {
4444
DataInputStream dis = new DataInputStream(counter);
4545
msgId = dis.readUnsignedShort();
4646
long remainder = (long) data.length - counter.getCounter();
47-
if (remainder > 2) {
47+
if (remainder >= 2) {
4848
reasonCode = dis.readUnsignedByte();
4949
validateReturnCode(reasonCode, validReturnCodes);
5050
} else {

0 commit comments

Comments
 (0)