Skip to content

Commit 2068983

Browse files
author
bzhangxmt
committed
Fix Bug 431268 - DUP flag incorrectly set by paho client
This issue is cause by caching the encoded message header, and didn't set DUP flag when retransmission in case of reconnect. Bug: 431268 Signed-off-by: bzhangxmt <zhbinbj@cn.ibm.com>
1 parent d0258e4 commit 2068983

2 files changed

Lines changed: 21 additions & 21 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,11 @@ protected void restoreState() throws MqttException {
315315
if (persistence.containsKey(getSendConfirmPersistenceKey(sendMessage))) {
316316
MqttPersistable persistedConfirm = persistence.get(getSendConfirmPersistenceKey(sendMessage));
317317
// QoS 2, and CONFIRM has already been sent...
318+
// NO DUP flag is allowed for 3.1.1 spec while it's not clear for 3.1 spec
319+
// So we just remove DUP
318320
MqttPubRel confirmMessage = (MqttPubRel) restoreMessage(key, persistedConfirm);
319321
if (confirmMessage != null) {
320-
confirmMessage.setDuplicate(true);
322+
// confirmMessage.setDuplicate(true); // REMOVED
321323
//@TRACE 605=outbound QoS 2 pubrel key={0} message={1}
322324
log.fine(CLASS_NAME,methodName, "605", new Object[]{key,message});
323325

@@ -375,11 +377,12 @@ private void restoreInflightMessages() {
375377
Enumeration keys = outboundQoS2.keys();
376378
while (keys.hasMoreElements()) {
377379
Object key = keys.nextElement();
378-
Object msg = outboundQoS2.get(key);
380+
MqttWireMessage msg = (MqttWireMessage) outboundQoS2.get(key);
379381
if (msg instanceof MqttPublish) {
380382
//@TRACE 610=QoS 2 publish key={0}
381383
log.fine(CLASS_NAME,methodName, "610", new Object[]{key});
382-
384+
// set DUP flag only for PUBLISH, but NOT for PUBREL (spec 3.1.1)
385+
msg.setDuplicate(true);
383386
insertInOrder(pendingMessages, (MqttPublish)msg);
384387
} else if (msg instanceof MqttPubRel) {
385388
//@TRACE 611=QoS 2 pubrel key={0}
@@ -392,6 +395,7 @@ private void restoreInflightMessages() {
392395
while (keys.hasMoreElements()) {
393396
Object key = keys.nextElement();
394397
MqttPublish msg = (MqttPublish)outboundQoS1.get(key);
398+
msg.setDuplicate(true);
395399
//@TRACE 612=QoS 1 publish key={0}
396400
log.fine(CLASS_NAME,methodName, "612", new Object[]{key});
397401

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttWireMessage.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public abstract class MqttWireMessage {
6060

6161
protected boolean duplicate = false;
6262

63-
private byte[] encodedHeader = null;
6463

6564
public MqttWireMessage(byte type) {
6665
this.type = type;
@@ -114,24 +113,21 @@ public String getKey() {
114113
}
115114

116115
public byte[] getHeader() throws MqttException {
117-
if (encodedHeader == null) {
118-
try {
119-
int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
120-
byte[] varHeader = getVariableHeader();
121-
int remLen = varHeader.length + getPayload().length;
116+
try {
117+
int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
118+
byte[] varHeader = getVariableHeader();
119+
int remLen = varHeader.length + getPayload().length;
122120

123-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
124-
DataOutputStream dos = new DataOutputStream(baos);
125-
dos.writeByte(first);
126-
dos.write(encodeMBI(remLen));
127-
dos.write(varHeader);
128-
dos.flush();
129-
encodedHeader = baos.toByteArray();
130-
} catch(IOException ioe) {
131-
throw new MqttException(ioe);
132-
}
133-
}
134-
return encodedHeader;
121+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
122+
DataOutputStream dos = new DataOutputStream(baos);
123+
dos.writeByte(first);
124+
dos.write(encodeMBI(remLen));
125+
dos.write(varHeader);
126+
dos.flush();
127+
return baos.toByteArray();
128+
} catch(IOException ioe) {
129+
throw new MqttException(ioe);
130+
}
135131
}
136132

137133
protected abstract byte[] getVariableHeader() throws MqttException;

0 commit comments

Comments
 (0)