Skip to content

Commit 7e3acf3

Browse files
committed
MQTTv5 - Fixing Encoding and Decoding of small and very small Pub Ack packets
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent ba2abaf commit 7e3acf3

8 files changed

Lines changed: 148 additions & 35 deletions

File tree

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubAck.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,29 +32,29 @@ public class MqttPubAck extends MqttAck {
3232
MqttReturnCode.RETURN_CODE_IMPLEMENTATION_SPECIFIC_ERROR, MqttReturnCode.RETURN_CODE_NOT_AUTHORIZED,
3333
MqttReturnCode.RETURN_CODE_TOPIC_NAME_INVALID, MqttReturnCode.RETURN_CODE_QUOTA_EXCEEDED,
3434
MqttReturnCode.RETURN_CODE_PAYLOAD_FORMAT_INVALID };
35-
36-
private static final Byte[] validProperties = {MqttProperties.REASON_STRING_IDENTIFIER, MqttProperties.USER_DEFINED_PAIR_IDENTIFIER};
37-
35+
36+
private static final Byte[] validProperties = { MqttProperties.REASON_STRING_IDENTIFIER,
37+
MqttProperties.USER_DEFINED_PAIR_IDENTIFIER };
38+
3839
private MqttProperties properties;
39-
4040

41-
public MqttPubAck( byte[] data) throws IOException, MqttException {
41+
public MqttPubAck(byte[] data) throws IOException, MqttException {
4242
super(MqttWireMessage.MESSAGE_TYPE_PUBACK);
4343
properties = new MqttProperties(validProperties);
4444
ByteArrayInputStream bais = new ByteArrayInputStream(data);
4545
CountingInputStream counter = new CountingInputStream(bais);
4646
DataInputStream dis = new DataInputStream(counter);
4747
msgId = dis.readUnsignedShort();
48-
long remainder = (long)data.length - counter.getCounter();
49-
if (remainder >= 2) {
48+
long remainder = (long) data.length - counter.getCounter();
49+
if (remainder >= 1) {
5050
reasonCode = dis.readUnsignedByte();
5151
validateReturnCode(reasonCode, validReturnCodes);
5252
} else {
5353
reasonCode = 0;
5454
}
55-
if( remainder >= 4) {
56-
this.properties.decodeProperties(dis);
57-
}
55+
if (remainder >= 4) {
56+
this.properties.decodeProperties(dis);
57+
}
5858
dis.close();
5959
}
6060

@@ -82,11 +82,12 @@ protected byte[] getVariableHeader() throws MqttException {
8282

8383
byte[] identifierValueFieldsByteArray = this.properties.encodeProperties();
8484

85-
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length != 0) {
86-
85+
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS && identifierValueFieldsByteArray.length == 1) {
86+
// Encode the Return Code
87+
outputStream.write((byte) reasonCode);
88+
} else if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length > 1) {
8789
// Encode the Return Code
8890
outputStream.write((byte) reasonCode);
89-
9091
// Write Identifier / Value Fields
9192
outputStream.write(identifierValueFieldsByteArray);
9293
}
@@ -97,11 +98,11 @@ protected byte[] getVariableHeader() throws MqttException {
9798
throw new MqttException(ioe);
9899
}
99100
}
100-
101+
101102
public int getReturnCode() {
102103
return reasonCode;
103104
}
104-
105+
105106
@Override
106107
public MqttProperties getProperties() {
107108
return this.properties;

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubComp.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public MqttPubComp(byte[] data) throws IOException, MqttException {
4444
DataInputStream dis = new DataInputStream(counter);
4545
msgId = dis.readUnsignedShort();
4646
long remainder = (long) data.length - counter.getCounter();
47-
if (remainder >= 2) {
47+
if (remainder >= 1) {
4848
reasonCode = dis.readUnsignedByte();
4949
validateReturnCode(reasonCode, validReturnCodes);
5050
} else {
@@ -53,7 +53,6 @@ public MqttPubComp(byte[] data) throws IOException, MqttException {
5353
if (remainder >= 4) {
5454
this.properties.decodeProperties(dis);
5555
}
56-
5756
dis.close();
5857
}
5958

@@ -81,10 +80,12 @@ protected byte[] getVariableHeader() throws MqttException {
8180

8281
byte[] identifierValueFieldsByteArray = this.properties.encodeProperties();
8382

84-
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length != 0) {
83+
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS && identifierValueFieldsByteArray.length == 1) {
84+
// Encode the Return Code
85+
outputStream.write((byte) reasonCode);
86+
} else if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length > 1) {
8587
// Encode the Return Code
8688
outputStream.write((byte) reasonCode);
87-
8889
// Write Identifier / Value Fields
8990
outputStream.write(identifierValueFieldsByteArray);
9091
}

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubRec.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public MqttPubRec(byte[] data) throws IOException, MqttException {
4747
DataInputStream dis = new DataInputStream(counter);
4848
msgId = dis.readUnsignedShort();
4949
long remainder = (long) data.length - counter.getCounter();
50-
if (remainder >= 2) {
50+
if (remainder >= 1) {
5151
reasonCode = dis.readUnsignedByte();
5252
validateReturnCode(reasonCode, validReturnCodes);
5353
} else {
@@ -83,10 +83,12 @@ protected byte[] getVariableHeader() throws MqttException {
8383

8484
byte[] identifierValueFieldsByteArray = this.properties.encodeProperties();
8585

86-
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length != 0) {
86+
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS && identifierValueFieldsByteArray.length == 1) {
87+
// Encode the Return Code
88+
outputStream.write((byte) reasonCode);
89+
} else if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length > 1) {
8790
// Encode the Return Code
8891
outputStream.write((byte) reasonCode);
89-
9092
// Write Identifier / Value Fields
9193
outputStream.write(identifierValueFieldsByteArray);
9294
}

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPubRel.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public MqttPubRel(byte[] data) throws IOException, MqttException {
4444
DataInputStream dis = new DataInputStream(counter);
4545
msgId = dis.readUnsignedShort();
4646
long remainder = (long) data.length - counter.getCounter();
47-
if (remainder >= 2) {
47+
if (remainder >= 1) {
4848
reasonCode = dis.readUnsignedByte();
4949
validateReturnCode(reasonCode, validReturnCodes);
5050
} else {
@@ -80,13 +80,16 @@ protected byte[] getVariableHeader() throws MqttException {
8080

8181
byte[] identifierValueFieldsByteArray = this.properties.encodeProperties();
8282

83-
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length != 0) {
83+
if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS && identifierValueFieldsByteArray.length == 1) {
84+
// Encode the Return Code
85+
outputStream.write((byte) reasonCode);
86+
} else if (reasonCode != MqttReturnCode.RETURN_CODE_SUCCESS || identifierValueFieldsByteArray.length > 1) {
8487
// Encode the Return Code
8588
outputStream.write((byte) reasonCode);
86-
8789
// Write Identifier / Value Fields
8890
outputStream.write(identifierValueFieldsByteArray);
8991
}
92+
9093
outputStream.flush();
9194
return baos.toByteArray();
9295
} catch (IOException ioe) {

org.eclipse.paho.mqttv5.common/src/test/java/org/eclipse/paho/mqttv5/common/packet/MqttPubAckTest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,35 @@ public void testDecodingMqttPuback() throws MqttException, IOException {
5959
Assert.assertTrue(new UserProperty(userKey1, userValue1).equals(decodedProperties.getUserProperties().get(0)));
6060
Assert.assertTrue(new UserProperty(userKey2, userValue2).equals(decodedProperties.getUserProperties().get(1)));
6161
Assert.assertTrue(new UserProperty(userKey3, userValue3).equals(decodedProperties.getUserProperties().get(2)));
62-
63-
64-
62+
}
63+
64+
@Test
65+
public void testDecodingSmallMqttPuback() throws MqttException, IOException {
66+
MqttPubAck mqttPubackPacket = new MqttPubAck(MqttReturnCode.RETURN_CODE_TOPIC_NAME_INVALID, 1, new MqttProperties());
67+
byte[] header = mqttPubackPacket.getHeader();
68+
byte[] payload = mqttPubackPacket.getPayload();
69+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
70+
outputStream.write(header);
71+
outputStream.write(payload);
72+
// Total Packet should be 5 bytes long
73+
Assert.assertEquals(5, outputStream.size());
74+
MqttPubAck decodedPubackPacket = (MqttPubAck) MqttWireMessage.createWireMessage(outputStream.toByteArray());
75+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_TOPIC_NAME_INVALID, decodedPubackPacket.getReturnCode());
76+
77+
}
78+
79+
@Test
80+
public void testDecodingVerySmallMqttPuback() throws MqttException, IOException {
81+
MqttPubAck mqttPubackPacket = new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, 1, new MqttProperties());
82+
byte[] header = mqttPubackPacket.getHeader();
83+
byte[] payload = mqttPubackPacket.getPayload();
84+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
85+
outputStream.write(header);
86+
outputStream.write(payload);
87+
// Total Packet should be 4 bytes long
88+
Assert.assertEquals(4, outputStream.size());
89+
MqttPubAck decodedPubackPacket = (MqttPubAck) MqttWireMessage.createWireMessage(outputStream.toByteArray());
90+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, decodedPubackPacket.getReturnCode());
6591
}
6692

6793
public MqttPubAck generateMqttPubackPacket() throws MqttException{

org.eclipse.paho.mqttv5.common/src/test/java/org/eclipse/paho/mqttv5/common/packet/MqttPubCompTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,35 @@ public void testDecodingMqttPubComp() throws MqttException, IOException {
6060
Assert.assertTrue(new UserProperty(userKey1, userValue1).equals(properties.getUserProperties().get(0)));
6161
Assert.assertTrue(new UserProperty(userKey2, userValue2).equals(properties.getUserProperties().get(1)));
6262
Assert.assertTrue(new UserProperty(userKey3, userValue3).equals(properties.getUserProperties().get(2)));
63-
64-
63+
}
64+
65+
@Test
66+
public void testDecodingSmallMqttPubcomp() throws MqttException, IOException {
67+
MqttPubComp mqttPubackPacket = new MqttPubComp(MqttReturnCode.RETURN_CODE_PACKET_ID_NOT_FOUND, 1, new MqttProperties());
68+
byte[] header = mqttPubackPacket.getHeader();
69+
byte[] payload = mqttPubackPacket.getPayload();
70+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
71+
outputStream.write(header);
72+
outputStream.write(payload);
73+
// Total Packet should be 5 bytes long
74+
Assert.assertEquals(5, outputStream.size());
75+
MqttPubComp decodedPubackPacket = (MqttPubComp) MqttWireMessage.createWireMessage(outputStream.toByteArray());
76+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_PACKET_ID_NOT_FOUND, decodedPubackPacket.getReturnCode());
77+
78+
}
79+
80+
@Test
81+
public void testDecodingVerySmallMqttPubcomp() throws MqttException, IOException {
82+
MqttPubComp mqttPubackPacket = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, 1, new MqttProperties());
83+
byte[] header = mqttPubackPacket.getHeader();
84+
byte[] payload = mqttPubackPacket.getPayload();
85+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
86+
outputStream.write(header);
87+
outputStream.write(payload);
88+
// Total Packet should be 4 bytes long
89+
Assert.assertEquals(4, outputStream.size());
90+
MqttPubComp decodedPubackPacket = (MqttPubComp) MqttWireMessage.createWireMessage(outputStream.toByteArray());
91+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, decodedPubackPacket.getReturnCode());
6592
}
6693

6794
public MqttPubComp generateMqttPubCompPacket() throws MqttException{

org.eclipse.paho.mqttv5.common/src/test/java/org/eclipse/paho/mqttv5/common/packet/MqttPubRecTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,35 @@ public void testDecodingMqttPubRec() throws MqttException, IOException {
5959
Assert.assertTrue(new UserProperty(userKey1, userValue1).equals(properties.getUserProperties().get(0)));
6060
Assert.assertTrue(new UserProperty(userKey2, userValue2).equals(properties.getUserProperties().get(1)));
6161
Assert.assertTrue(new UserProperty(userKey3, userValue3).equals(properties.getUserProperties().get(2)));
62-
63-
62+
}
63+
64+
@Test
65+
public void testDecodingSmallMqttPubrec() throws MqttException, IOException {
66+
MqttPubRec mqttPubackPacket = new MqttPubRec(MqttReturnCode.RETURN_CODE_TOPIC_NAME_INVALID, 1, new MqttProperties());
67+
byte[] header = mqttPubackPacket.getHeader();
68+
byte[] payload = mqttPubackPacket.getPayload();
69+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
70+
outputStream.write(header);
71+
outputStream.write(payload);
72+
// Total Packet should be 5 bytes long
73+
Assert.assertEquals(5, outputStream.size());
74+
MqttPubRec decodedPubackPacket = (MqttPubRec) MqttWireMessage.createWireMessage(outputStream.toByteArray());
75+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_TOPIC_NAME_INVALID, decodedPubackPacket.getReturnCode());
76+
77+
}
78+
79+
@Test
80+
public void testDecodingVerySmallMqttPubrec() throws MqttException, IOException {
81+
MqttPubRec mqttPubackPacket = new MqttPubRec(MqttReturnCode.RETURN_CODE_SUCCESS, 1, new MqttProperties());
82+
byte[] header = mqttPubackPacket.getHeader();
83+
byte[] payload = mqttPubackPacket.getPayload();
84+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
85+
outputStream.write(header);
86+
outputStream.write(payload);
87+
// Total Packet should be 4 bytes long
88+
Assert.assertEquals(4, outputStream.size());
89+
MqttPubRec decodedPubackPacket = (MqttPubRec) MqttWireMessage.createWireMessage(outputStream.toByteArray());
90+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, decodedPubackPacket.getReturnCode());
6491
}
6592

6693
public MqttPubRec generateMqttPubRecPacket() throws MqttException{

org.eclipse.paho.mqttv5.common/src/test/java/org/eclipse/paho/mqttv5/common/packet/MqttPubRelTest.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,35 @@ public void testDecodingMqttPubRel() throws MqttException, IOException {
6060
Assert.assertTrue(new UserProperty(userKey1, userValue1).equals(properties.getUserProperties().get(0)));
6161
Assert.assertTrue(new UserProperty(userKey2, userValue2).equals(properties.getUserProperties().get(1)));
6262
Assert.assertTrue(new UserProperty(userKey3, userValue3).equals(properties.getUserProperties().get(2)));
63-
64-
65-
63+
}
64+
65+
@Test
66+
public void testDecodingSmallMqttPubrel() throws MqttException, IOException {
67+
MqttPubRel mqttPubackPacket = new MqttPubRel(MqttReturnCode.RETURN_CODE_PACKET_ID_NOT_FOUND, 1, new MqttProperties());
68+
byte[] header = mqttPubackPacket.getHeader();
69+
byte[] payload = mqttPubackPacket.getPayload();
70+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
71+
outputStream.write(header);
72+
outputStream.write(payload);
73+
// Total Packet should be 5 bytes long
74+
Assert.assertEquals(5, outputStream.size());
75+
MqttPubRel decodedPubackPacket = (MqttPubRel) MqttWireMessage.createWireMessage(outputStream.toByteArray());
76+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_PACKET_ID_NOT_FOUND, decodedPubackPacket.getReturnCode());
77+
78+
}
79+
80+
@Test
81+
public void testDecodingVerySmallMqttPubrel() throws MqttException, IOException {
82+
MqttPubRel mqttPubackPacket = new MqttPubRel(MqttReturnCode.RETURN_CODE_SUCCESS, 1, new MqttProperties());
83+
byte[] header = mqttPubackPacket.getHeader();
84+
byte[] payload = mqttPubackPacket.getPayload();
85+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
86+
outputStream.write(header);
87+
outputStream.write(payload);
88+
// Total Packet should be 4 bytes long
89+
Assert.assertEquals(4, outputStream.size());
90+
MqttPubRel decodedPubackPacket = (MqttPubRel) MqttWireMessage.createWireMessage(outputStream.toByteArray());
91+
Assert.assertEquals(MqttReturnCode.RETURN_CODE_SUCCESS, decodedPubackPacket.getReturnCode());
6692
}
6793

6894
public MqttPubRel generateMqttPubRelPacket() throws MqttException{

0 commit comments

Comments
 (0)