Skip to content

Commit bea7787

Browse files
committed
Issue #531 - Reverting misguided change that didn't assign Message IDs to QoS 0 messages
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 7e3acf3 commit bea7787

2 files changed

Lines changed: 69 additions & 19 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public class ClientState implements MqttState {
125125
private long keepAlive;
126126
private boolean cleanSession;
127127
private MqttClientPersistence persistence;
128-
128+
129129
private int actualInFlight = 0;
130130
private int inFlightPubRels = 0;
131131

@@ -238,14 +238,14 @@ protected void clearState() throws MqttException {
238238
outgoingTopicAliases.clear();
239239
incomingTopicAliases.clear();
240240
}
241-
241+
242242
protected void clearConnectionState() throws MqttException {
243243
final String methodName = "clearConnectionState";
244244
// @TRACE=665=Clearing Connection State (Topic Aliases)
245245
log.fine(CLASS_NAME, methodName, "665");
246246
outgoingTopicAliases.clear();
247247
incomingTopicAliases.clear();
248-
248+
249249
}
250250

251251
private MqttWireMessage restoreMessage(String key, MqttPersistable persistable) throws MqttException {
@@ -520,14 +520,7 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
520520
final String methodName = "send";
521521
// Set Message ID if required
522522
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
523-
if (message instanceof MqttPublish && (((MqttPublish) message).getMessage().getQos() != 0)) {
524-
message.setMessageId(getNextMessageId());
525-
} else if (message instanceof MqttPubAck || message instanceof MqttPubRec || message instanceof MqttPubRel
526-
|| message instanceof MqttPubComp || message instanceof MqttSubscribe
527-
|| message instanceof MqttSubAck || message instanceof MqttUnsubscribe
528-
|| message instanceof MqttUnsubAck) {
529-
message.setMessageId(getNextMessageId());
530-
}
523+
message.setMessageId(getNextMessageId());
531524
}
532525
// Set Topic Alias if required
533526
if (message instanceof MqttPublish && this.mqttSession.getOutgoingTopicAliasMaximum() > 0) {
@@ -545,7 +538,7 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
545538
}
546539
}
547540
}
548-
541+
549542
if (token != null) {
550543
try {
551544
token.internalTok.setMessageID(message.getMessageId());
@@ -1043,7 +1036,7 @@ protected void notifyReceivedAck(MqttAck ack) throws MqttException {
10431036
// @TRACE 662=no message found for ack id={0}
10441037
log.fine(CLASS_NAME, methodName, "662", new Object[] { Integer.valueOf(ack.getMessageId()) });
10451038
} else if (ack instanceof MqttPubRec) {
1046-
1039+
10471040
// Update the token with the reason codes
10481041
updateResult(ack, token, mex);
10491042

@@ -1261,12 +1254,17 @@ protected void notifyComplete(MqttToken token) throws MqttException {
12611254
checkQuiesceLock();
12621255
}
12631256
}
1264-
1257+
12651258
/**
1266-
* Updates a token with the latest reason codes, currently only used for PubRec messages.
1267-
* @param msg - The message that we are using for the update
1268-
* @param token - The Token we are updating
1269-
* @param ex - if there was a problem store the exception in the token.
1259+
* Updates a token with the latest reason codes, currently only used for PubRec
1260+
* messages.
1261+
*
1262+
* @param msg
1263+
* - The message that we are using for the update
1264+
* @param token
1265+
* - The Token we are updating
1266+
* @param ex
1267+
* - if there was a problem store the exception in the token.
12701268
*/
12711269
protected void updateResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
12721270
token.internalTok.update(ack, ex);
@@ -1369,7 +1367,7 @@ public void disconnected(MqttException reason) {
13691367
if (cleanSession) {
13701368
clearState();
13711369
}
1372-
1370+
13731371
clearConnectionState();
13741372

13751373
pendingMessages.clear();

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/PublishTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,58 @@ public void testPublishRC() throws Exception {
8888
int[] expectedRC = new int[] {16, 0};
8989
Assert.assertArrayEquals(expectedRC, deliveryToken.getReasonCodes());
9090

91+
log.info("Disconnecting...");
92+
IMqttToken disconnectToken = asyncClient.disconnect();
93+
disconnectToken.waitForCompletion(5000);
94+
Assert.assertFalse(asyncClient.isConnected());
95+
asyncClient.close();
96+
97+
}
98+
99+
@Test
100+
public void testPublishManyQoS0Messages() throws Exception {
101+
String methodName = Utility.getMethodName();
102+
LoggingUtilities.banner(log, cclass, methodName);
103+
String clientId = methodName;
104+
MqttAsyncClient asyncClient = new MqttAsyncClient(serverURI.toString(), clientId);
105+
106+
// Connect to the server
107+
log.info("Connecting: [serverURI: " + serverURI + ", ClientId: " + clientId + "]");
108+
IMqttToken connectToken = asyncClient.connect();
109+
connectToken.waitForCompletion(5000);
110+
String clientId2 = asyncClient.getClientId();
111+
log.info("Client ID = " + clientId2);
112+
boolean isConnected = asyncClient.isConnected();
113+
log.info("isConnected: " + isConnected);
114+
115+
MqttMessage testMessage = new MqttMessage("Test Payload".getBytes(), 0, false, new MqttProperties());
116+
long lStartTime = System.nanoTime();
117+
int messagesSentThisSecond = 0;
118+
long lastExTime = lStartTime;
119+
for(int i = 0; i < 70000; i++) {
120+
IMqttDeliveryToken deliveryToken = asyncClient.publish(topicPrefix + methodName, testMessage);
121+
deliveryToken.waitForCompletion(1000);
122+
messagesSentThisSecond++;
123+
long now = System.nanoTime();
124+
if(((now - lastExTime) /1000000) > 1000) {
125+
lastExTime = now;
126+
127+
System.out.println("Have sent " + messagesSentThisSecond + " messages in the last second.");
128+
messagesSentThisSecond = 0;
129+
}
130+
131+
}
132+
133+
//end
134+
long lEndTime = System.nanoTime();
135+
136+
//time elapsed
137+
long output = lEndTime - lStartTime;
138+
139+
System.out.println("Sending lots of messages took : " + output / 1000000 + " milliseconds.");
140+
141+
142+
91143
log.info("Disconnecting...");
92144
IMqttToken disconnectToken = asyncClient.disconnect();
93145
disconnectToken.waitForCompletion(5000);

0 commit comments

Comments
 (0)