|
22 | 22 | import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; |
23 | 23 | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
24 | 24 | import org.eclipse.paho.client.mqttv3.IMqttToken; |
| 25 | +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; |
25 | 26 | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
26 | 27 | import org.eclipse.paho.client.mqttv3.MqttException; |
| 28 | +import org.eclipse.paho.client.mqttv3.MqttMessage; |
27 | 29 | import org.eclipse.paho.client.mqttv3.test.client.MqttClientFactoryPaho; |
28 | 30 | import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities; |
29 | 31 | import org.eclipse.paho.client.mqttv3.test.properties.TestProperties; |
@@ -732,7 +734,50 @@ public void testQoS0Tokens() { |
732 | 734 | log.log(Level.SEVERE, "caught exception:", exception); |
733 | 735 | } |
734 | 736 | } |
735 | | - |
736 | 737 | log.exiting(className, methodName); |
737 | | - } |
| 738 | + } |
| 739 | + |
| 740 | + @Test |
| 741 | + public void testPublishManyQoS0Messages() throws Exception { |
| 742 | + String methodName = Utility.getMethodName(); |
| 743 | + LoggingUtilities.banner(log, cclass, methodName); |
| 744 | + String clientId = methodName; |
| 745 | + IMqttAsyncClient asyncClient = new MqttAsyncClient(serverURI.toString(), clientId); |
| 746 | + |
| 747 | + // Connect to the server |
| 748 | + log.info("Connecting: [serverURI: " + serverURI + ", ClientId: " + clientId + "]"); |
| 749 | + IMqttToken connectToken = asyncClient.connect(); |
| 750 | + connectToken.waitForCompletion(5000); |
| 751 | + String clientId2 = asyncClient.getClientId(); |
| 752 | + log.info("Client ID = " + clientId2); |
| 753 | + boolean isConnected = asyncClient.isConnected(); |
| 754 | + log.info("isConnected: " + isConnected); |
| 755 | + |
| 756 | + MqttMessage testMessage = new MqttMessage("Test Payload".getBytes()); |
| 757 | + testMessage.setQos(0); |
| 758 | + testMessage.setRetained(false); |
| 759 | + long lStartTime = System.nanoTime(); |
| 760 | + int no_of_messages = 70000; |
| 761 | + for(int i = 0; i < no_of_messages; i++) { |
| 762 | + IMqttDeliveryToken deliveryToken = asyncClient.publish(topicPrefix + methodName, testMessage); |
| 763 | + try |
| 764 | + { |
| 765 | + deliveryToken.waitForCompletion(5000); |
| 766 | + } catch (Exception e) { |
| 767 | + System.out.println("wait failed "+i); |
| 768 | + } |
| 769 | + } |
| 770 | + |
| 771 | + long lEndTime = System.nanoTime(); |
| 772 | + long output = lEndTime - lStartTime; //time elapsed |
| 773 | + log.info("Sending "+no_of_messages+" of messages took : " + output / 1000000 + " milliseconds."); |
| 774 | + |
| 775 | + log.info("Disconnecting..."); |
| 776 | + IMqttToken disconnectToken = asyncClient.disconnect(); |
| 777 | + disconnectToken.waitForCompletion(5000); |
| 778 | + Assert.assertFalse(asyncClient.isConnected()); |
| 779 | + asyncClient.close(); |
| 780 | + |
| 781 | + } |
| 782 | + |
738 | 783 | } |
0 commit comments