Skip to content

Commit 4e53347

Browse files
author
Ian Craggs
committed
A test to go along with PR #563
1 parent 1af0487 commit 4e53347

1 file changed

Lines changed: 79 additions & 0 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncTest.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,4 +656,83 @@ public void testConnectTimeout() throws Exception {
656656

657657
log.exiting(className, methodName);
658658
}
659+
660+
/**
661+
* Test tokens for QoS 0 being 'lost'
662+
*/
663+
@Test
664+
public void testQoS0Tokens() {
665+
final String methodName = Utility.getMethodName();
666+
LoggingUtilities.banner(log, cclass, methodName);
667+
log.entering(className, methodName);
668+
669+
int tokenCount = 1000; // how many QoS 0 tokens shall we track?
670+
671+
IMqttAsyncClient mqttClient = null;
672+
try {
673+
mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName);
674+
IMqttToken connectToken;
675+
IMqttToken subToken;
676+
IMqttDeliveryToken[] pubTokens = new IMqttDeliveryToken[tokenCount];
677+
678+
MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
679+
log.info("Assigning callback...");
680+
mqttClient.setCallback(mqttV3Receiver);
681+
682+
MqttConnectOptions opts = new MqttConnectOptions();
683+
opts.setMaxInflight(tokenCount);
684+
connectToken = mqttClient.connect(opts);
685+
log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName);
686+
connectToken.waitForCompletion();
687+
688+
String[] topicNames = new String[]{topicPrefix + methodName + "/Topic"};
689+
690+
subToken = mqttClient.subscribe(topicNames[0], 2);
691+
log.info("Subscribing to..." + topicNames[0]);
692+
subToken.waitForCompletion();
693+
694+
for (int i = 0; i < tokenCount; ++i) {
695+
try {
696+
pubTokens[i] = mqttClient.publish(topicNames[0], "message".getBytes(), 0, false);
697+
} catch (Exception e) {
698+
e.printStackTrace();
699+
}
700+
}
701+
log.info(tokenCount + " messages sent");
702+
int errors = 0;
703+
for (int i = 0; i < tokenCount; ++i) {
704+
try {
705+
pubTokens[i].waitForCompletion(10);
706+
} catch (Exception e) {
707+
errors += 1;
708+
}
709+
}
710+
log.info("Number of waits incomplete "+errors);
711+
712+
while (mqttV3Receiver.receivedMessageCount() < tokenCount) {
713+
Thread.sleep(10);
714+
}
715+
}
716+
catch (Exception exception) {
717+
log.log(Level.SEVERE, "caught exception:", exception);
718+
Assert.fail("Exception:" + methodName + " exception="+ exception);
719+
}
720+
finally {
721+
try {
722+
if (mqttClient != null) {
723+
IMqttToken disconnectToken;
724+
disconnectToken = mqttClient.disconnect(null, null);
725+
log.info("Disconnecting...");
726+
disconnectToken.waitForCompletion();
727+
log.info("Close...");
728+
mqttClient.close();
729+
}
730+
}
731+
catch (Exception exception) {
732+
log.log(Level.SEVERE, "caught exception:", exception);
733+
}
734+
}
735+
736+
log.exiting(className, methodName);
737+
}
659738
}

0 commit comments

Comments
 (0)