@@ -361,6 +361,7 @@ public void testPersistBufferedMessages() throws Exception {
361361 persistence );
362362 DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions ();
363363 disconnectedOpts .setBufferEnabled (true );
364+ disconnectedOpts .setPersistBuffer (true );
364365 client .setBufferOpts (disconnectedOpts );
365366
366367 // Enable Proxy & Connect to server
@@ -406,19 +407,22 @@ public void testPersistBufferedMessages() throws Exception {
406407 public void testUnPersistBufferedMessagesOnNewClient () throws Exception {
407408 String methodName = Utility .getMethodName ();
408409 LoggingUtilities .banner (log , cclass , methodName );
410+ int qos = 2 ;
409411
410412 // Mock up an Mqtt Message to be stored in Persistence
411413 MqttMessage mqttMessage = new MqttMessage (methodName .getBytes ());
412- mqttMessage .setQos (0 );
414+ mqttMessage .setQos (qos );
413415 MqttPublish pubMessage = new MqttPublish (topicPrefix + methodName , mqttMessage );
416+ // If ID is not set, then the persisted message may be invalid for QoS 1 & 2
417+ pubMessage .setMessageId (1 );
414418 final TestMemoryPersistence persistence = new TestMemoryPersistence ();
415419 persistence .open (null , null );
416- persistence .put ("sb-0 " , (MqttPublish ) pubMessage );
420+ persistence .put ("sb-1 " , (MqttPublish ) pubMessage );
417421 @ SuppressWarnings ("unchecked" )
418422 List <String > persistedKeys = Collections .list (persistence .keys ());
419423 log .info ("There are now: " + persistedKeys .size () + " keys in persistence" );
420424 Assert .assertEquals (1 , persistedKeys .size ());
421-
425+
422426 // Create Subscription client to watch for the message being published
423427 // as soon as the main client connects
424428 log .info ("Creating subscription client" );
@@ -428,22 +432,21 @@ public void testUnPersistBufferedMessagesOnNewClient() throws Exception {
428432 IMqttToken subConnectToken = subClient .connect ();
429433 subConnectToken .waitForCompletion ();
430434 Assert .assertTrue (subClient .isConnected ());
431- IMqttToken subToken = subClient .subscribe (topicPrefix + methodName , 0 );
435+ IMqttToken subToken = subClient .subscribe (topicPrefix + methodName , qos );
432436 subToken .waitForCompletion ();
433437
434438 // Create Real client
435439 log .info ("Creating new client that uses existing persistence layer" );
436440 MqttConnectOptions optionsNew = new MqttConnectOptions ();
437441 optionsNew .setCleanSession (false );
438442 MqttAsyncClient newClient = new MqttAsyncClient (serverURIString , methodName + "new-client11" , persistence );
439-
440443 // Connect Client with existing persistence layer
441444 IMqttToken newClientConnectToken = newClient .connect (optionsNew );
442445 newClientConnectToken .waitForCompletion ();
443446 Assert .assertTrue (newClient .isConnected ());
444447
445448 // Check that message is published / delivered
446- boolean recieved = mqttV3Receiver .validateReceipt (topicPrefix + methodName , 0 , methodName .getBytes ());
449+ boolean recieved = mqttV3Receiver .validateReceipt (topicPrefix + methodName , qos , methodName .getBytes ());
447450 Assert .assertTrue (recieved );
448451 log .info ("Message was successfully delivered after connect" );
449452
0 commit comments