1010import org .eclipse .paho .mqttv5 .client .MqttConnectionOptionsBuilder ;
1111import org .eclipse .paho .mqttv5 .client .MqttDisconnectResponse ;
1212import org .eclipse .paho .mqttv5 .client .MqttToken ;
13- import org .eclipse .paho .mqttv5 .client .persist .MemoryPersistence ;
13+ import org .eclipse .paho .mqttv5 .client .persist .MqttDefaultFilePersistence ;
1414import org .eclipse .paho .mqttv5 .common .MqttException ;
1515import org .eclipse .paho .mqttv5 .common .MqttMessage ;
1616import org .eclipse .paho .mqttv5 .common .packet .MqttProperties ;
1717
1818public class V5Client implements MqttCallback {
1919
2020 private String broker = "tcp://localhost:1883" ;
21- private String clientId = "" ; // Use Empty ID to get a server assigned client ID
21+ private String clientId = "test-client-id " ;
2222 private String topic = "MQTT Examples" ;
2323 private String content = "Message from MqttPublishSample" ;
2424 private String willContent = "I've Disconnected, sorry!" ;
25- private int qos = 1 ;
25+ private int qos = 0 ;
2626 Object runningLock ;
2727 boolean running = true ;
2828 int x = 0 ;
2929 public V5Client () throws InterruptedException {
3030
3131 try {
32- MemoryPersistence persistence = new MemoryPersistence ();
32+ MqttDefaultFilePersistence persistence = new MqttDefaultFilePersistence ();
3333 MqttAsyncClient asyncClient = new MqttAsyncClient (broker , clientId , persistence );
3434
3535 // Lets build our Connection Options:
3636 MqttConnectionOptionsBuilder conOptsBuilder = new MqttConnectionOptionsBuilder ();
3737 MqttConnectionOptions conOpts = conOptsBuilder .serverURI (broker ).cleanSession (true )
3838 .sessionExpiryInterval (120L ).automaticReconnect (true )
39- .topicAliasMaximum (0 )
39+ .topicAliasMaximum (10 )
4040 .will (topic , new MqttMessage (willContent .getBytes (), qos , false , null )).build ();
4141 asyncClient .setCallback (this );
4242
4343
4444 System .out .println ("Connecting to broker: " + broker );
4545
46- asyncClient .connect (conOpts , null , new MqttActionListener () {
46+ IMqttToken connectToken = asyncClient .connect (conOpts , null , new MqttActionListener () {
4747
4848 @ Override
4949 public void onSuccess (IMqttToken asyncActionToken ) {
50- System .out .println ("Connected" );
50+ System .out .println ("Connected, RC: " + asyncActionToken .getResponse ());
51+
5152
5253 printConnectDetails ((MqttToken ) asyncActionToken );
5354 try {
5455 IMqttToken subToken = asyncClient .subscribe (topic , qos );
5556 subToken .waitForCompletion ();
5657 printSubscriptionDetails ((MqttToken ) subToken );
58+ System .out .println ("Sub Return Code: " );
59+ System .out .println (subToken .getReasonCodes ()[0 ]);
60+
5761 MqttMessage msg = new MqttMessage (content .getBytes ());
5862 msg .setQos (qos );
59- asyncClient .publish (topic , msg );
63+ IMqttDeliveryToken pubDelToken = asyncClient .publish (topic , msg );
64+
6065 } catch (MqttException e ) {
6166 System .err .println ("Exception Occured whilst Subscribing:" );
6267 e .printStackTrace ();
@@ -71,6 +76,8 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
7176 }
7277 });
7378
79+ connectToken .waitForCompletion ();
80+
7481
7582
7683 while (running ) {
@@ -83,10 +90,19 @@ public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
8390 message = "FINISH" ;
8491 running = false ;
8592 }
86- asyncClient .publish (topic , new MqttMessage (message .getBytes (), 2 , false , null ));
93+ IMqttDeliveryToken deliveryToken = asyncClient .publish (topic , new MqttMessage (message .getBytes (), qos , false , null ));
94+ deliveryToken .waitForCompletion ();
95+ //System.out.println("Delivery Reason Code: " + deliveryToken.getReasonCodes()[0]);
8796 x ++;
8897
8998 }
99+
100+ // Unsubscribe from the topic
101+ IMqttToken unsubscribeToken = asyncClient .unsubscribe (topic );
102+ unsubscribeToken .waitForCompletion ();
103+ System .out .println ("Unsubscribed: " + unsubscribeToken .getResponse ().getReasonCodes ()[0 ]);
104+
105+
90106 asyncClient .disconnect (5000 );
91107 System .out .println ("Disconnected" );
92108 asyncClient .close ();
0 commit comments