Skip to content

Commit cd00fa3

Browse files
committed
Fixes #421, #477, #509: QoS 0 tokens get lost.
QoS 0 messages have no message id, and the tokens of these messages are thus all stored in the token store under id 0. When multiple messages are posted in quick succession, the token of the previous message is overwritten in the token store. As a result, clients waiting on this token hang, and the inFlight message count is never decreased. This commit attaches the token to the message itself. CommsSender and ClientState first look in the message for a token, and then, if there is no token there, in the token store. Tokens of QoS 0 messages are never stored in the token store. Signed-off-by: Hylke van der Schaaf <hylke.vds@gmail.com>
1 parent 3324b9c commit cd00fa3

3 files changed

Lines changed: 40 additions & 5 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,8 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
502502
message.setMessageId(getNextMessageId());
503503
}
504504
}
505-
if (token != null ) {
505+
if (token != null) {
506+
message.setToken(token);
506507
try {
507508
token.internalTok.setMessageID(message.getMessageId());
508509
} catch (Exception e) {
@@ -526,13 +527,14 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
526527
case 2:
527528
outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
528529
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
530+
tokenStore.saveToken(token, message);
529531
break;
530532
case 1:
531533
outboundQoS1.put( Integer.valueOf(message.getMessageId()), message);
532534
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
535+
tokenStore.saveToken(token, message);
533536
break;
534537
}
535-
tokenStore.saveToken(token, message);
536538
pendingMessages.addElement(message);
537539
queueLock.notifyAll();
538540
}
@@ -898,8 +900,11 @@ protected void notifySent(MqttWireMessage message) {
898900
//@TRACE 625=key={0}
899901
log.fine(CLASS_NAME,methodName,"625",new Object[]{message.getKey()});
900902

901-
MqttToken token = tokenStore.getToken(message);
902-
if (token == null) return;
903+
MqttToken token = message.getToken();
904+
if (token == null) {
905+
token = tokenStore.getToken(message);
906+
if (token == null) return;
907+
}
903908
token.internalTok.notifySent();
904909
if (message instanceof MqttPingReq) {
905910
synchronized (pingOutstandingLock) {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,10 @@ public void run() {
131131
out.write(message);
132132
out.flush();
133133
} else {
134-
MqttToken token = tokenStore.getToken(message);
134+
MqttToken token = message.getToken();
135+
if (token == null) {
136+
tokenStore.getToken(message);
137+
}
135138
// While quiescing the tokenstore can be cleared so need
136139
// to check for null for the case where clear occurs
137140
// while trying to send a message.

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttWireMessage.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.eclipse.paho.client.mqttv3.MqttException;
2929
import org.eclipse.paho.client.mqttv3.MqttPersistable;
30+
import org.eclipse.paho.client.mqttv3.MqttToken;
3031
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
3132

3233
/**
@@ -65,6 +66,14 @@ public abstract class MqttWireMessage {
6566

6667
protected boolean duplicate = false;
6768

69+
/**
70+
* The token associated with the message. It needs to be stored here,
71+
* because QoS 0 messages do not have an ID, and tokens for these messages
72+
* can thus not be stored in the Token Store.
73+
*/
74+
private MqttToken token;
75+
76+
6877
public MqttWireMessage(byte type) {
6978
this.type = type;
7079
// Use zero as the default message ID. Can't use -1, as that is serialized
@@ -386,6 +395,24 @@ public static void validateVariableByteInt(int value) throws IllegalArgumentExce
386395

387396
}
388397

398+
/**
399+
* Get the token associated with the message.
400+
*
401+
* @return The token associated with the message.
402+
*/
403+
public MqttToken getToken() {
404+
return token;
405+
}
406+
407+
/**
408+
* Set the token associated with the message.
409+
*
410+
* @param token the token associated with the message.
411+
*/
412+
public void setToken(MqttToken token) {
413+
this.token = token;
414+
}
415+
389416
public String toString() {
390417
return PACKET_NAMES[type];
391418
}

0 commit comments

Comments
 (0)