1414 * Dave Locke - initial API and implementation and/or initial documentation
1515 * Ian Craggs - per subscription message handlers (bug 466579)
1616 * Ian Craggs - ack control (bug 472172)
17- * James Sutton - Automatic Reconnect & Offline Buffering
17+ * James Sutton - Automatic Reconnect & Offline Buffering
1818 */
1919package org .eclipse .paho .mqttv5 .client .internal ;
2020
@@ -105,8 +105,10 @@ public void start(String threadName, ExecutorService executorService) {
105105 if (!running ) {
106106 // Preparatory work before starting the background thread.
107107 // For safety ensure any old events are cleared.
108- messageQueue .clear ();
109- completeQueue .clear ();
108+ synchronized (workAvailable ) {
109+ messageQueue .clear ();
110+ completeQueue .clear ();
111+ }
110112
111113 running = true ;
112114 quiescing = false ;
@@ -191,7 +193,7 @@ public void run() {
191193 if (running ) {
192194 // Check for deliveryComplete callbacks...
193195 MqttToken token = null ;
194- synchronized (completeQueue ) {
196+ synchronized (workAvailable ) {
195197 if (!completeQueue .isEmpty ()) {
196198 // First call the delivery arrived callback if needed
197199 token = completeQueue .get (0 );
@@ -204,7 +206,7 @@ public void run() {
204206
205207 // Check for messageArrived callbacks...
206208 MqttPublish message = null ;
207- synchronized (messageQueue ) {
209+ synchronized (workAvailable ) {
208210 if (!messageQueue .isEmpty ()) {
209211 // Note, there is a window on connect where a publish
210212 // could arrive before we've
@@ -373,9 +375,9 @@ public void messageArrived(MqttPublish sendMessage) {
373375 }
374376 }
375377 if (!quiescing ) {
376- messageQueue .add (sendMessage );
377378 // Notify the CommsCallback thread that there's work to do...
378379 synchronized (workAvailable ) {
380+ messageQueue .add (sendMessage );
379381 // @TRACE 710=new msg avail, notify workAvailable
380382 log .fine (CLASS_NAME , methodName , "710" );
381383 workAvailable .notifyAll ();
@@ -425,9 +427,15 @@ public void quiesce() {
425427 spaceAvailable .notifyAll ();
426428 }
427429 }
430+
431+ boolean areQueuesEmpty () {
432+ synchronized (workAvailable ) {
433+ return completeQueue .isEmpty () && messageQueue .isEmpty ();
434+ }
435+ }
428436
429437 public boolean isQuiesced () {
430- return (quiescing && completeQueue . isEmpty () && messageQueue . isEmpty ());
438+ return (quiescing && areQueuesEmpty ());
431439 }
432440
433441 private void handleMessage (MqttPublish publishMessage ) throws Exception {
@@ -470,8 +478,8 @@ public void asyncOperationComplete(MqttToken token) {
470478
471479 if (running ) {
472480 // invoke callbacks on callback thread
473- completeQueue .add (token );
474481 synchronized (workAvailable ) {
482+ completeQueue .add (token );
475483 // @TRACE 715=new workAvailable. key={0}
476484 log .fine (CLASS_NAME , methodName , "715" , new Object [] { token .internalTok .getKey () });
477485 workAvailable .notifyAll ();
0 commit comments