2323import java .util .Vector ;
2424import java .util .concurrent .ExecutorService ;
2525import java .util .concurrent .Future ;
26- import java .util .concurrent .Semaphore ;
2726
2827import org .eclipse .paho .client .mqttv3 .IMqttActionListener ;
2928import org .eclipse .paho .client .mqttv3 .IMqttMessageListener ;
3736import org .eclipse .paho .client .mqttv3 .internal .wire .MqttPubAck ;
3837import org .eclipse .paho .client .mqttv3 .internal .wire .MqttPubComp ;
3938import org .eclipse .paho .client .mqttv3 .internal .wire .MqttPublish ;
39+ import org .eclipse .paho .client .mqttv3 .internal .wire .MqttWireMessage ;
4040import org .eclipse .paho .client .mqttv3 .logging .Logger ;
4141import org .eclipse .paho .client .mqttv3 .logging .LoggerFactory ;
4242
@@ -52,27 +52,29 @@ public class CommsCallback implements Runnable {
5252 private static final int INBOUND_QUEUE_SIZE = 10 ;
5353 private MqttCallback mqttCallback ;
5454 private MqttCallbackExtended reconnectInternalCallback ;
55- private Hashtable callbacks ; // topicFilter -> messageHandler
55+ private Hashtable < String , IMqttMessageListener > callbacks ; // topicFilter -> messageHandler
5656 private ClientComms clientComms ;
57- private Vector messageQueue ;
58- private Vector completeQueue ;
59- public boolean running = false ;
60- private boolean quiescing = false ;
57+ private Vector <MqttWireMessage > messageQueue ;
58+ private Vector <MqttToken > completeQueue ;
59+
60+ private enum State {STOPPED , RUNNING , QUIESCING };
61+ private State current_state = State .STOPPED ;
62+ private State target_state = State .STOPPED ;
6163 private Object lifecycle = new Object ();
6264 private Thread callbackThread ;
65+ private String threadName ;
66+ private Future <?> callbackFuture ;
67+
6368 private Object workAvailable = new Object ();
6469 private Object spaceAvailable = new Object ();
6570 private ClientState clientState ;
6671 private boolean manualAcks = false ;
67- private String threadName ;
68- private final Semaphore runningSemaphore = new Semaphore (1 );
69- private Future callbackFuture ;
7072
7173 CommsCallback (ClientComms clientComms ) {
7274 this .clientComms = clientComms ;
73- this .messageQueue = new Vector (INBOUND_QUEUE_SIZE );
74- this .completeQueue = new Vector (INBOUND_QUEUE_SIZE );
75- this .callbacks = new Hashtable ();
75+ this .messageQueue = new Vector < MqttWireMessage > (INBOUND_QUEUE_SIZE );
76+ this .completeQueue = new Vector < MqttToken > (INBOUND_QUEUE_SIZE );
77+ this .callbacks = new Hashtable < String , IMqttMessageListener > ();
7678 log .setResourceName (clientComms .getClient ().getClientId ());
7779 }
7880
@@ -87,55 +89,59 @@ public void setClientState(ClientState clientState) {
8789 */
8890 public void start (String threadName , ExecutorService executorService ) {
8991 this .threadName = threadName ;
92+
9093 synchronized (lifecycle ) {
91- if (! running ) {
94+ if (current_state == State . STOPPED ) {
9295 // Preparatory work before starting the background thread.
9396 // For safety ensure any old events are cleared.
9497 messageQueue .clear ();
9598 completeQueue .clear ();
96-
97- running = true ;
98- quiescing = false ;
99+
100+ target_state = State .RUNNING ;
99101 if (executorService == null ) {
100102 new Thread (this ).start ();
101103 } else {
102104 callbackFuture = executorService .submit (this );
103105 }
104106 }
105107 }
108+ while (!isRunning ()) {
109+ try { Thread .sleep (100 ); } catch (Exception e ) { }
110+ }
106111 }
112+
107113
108114 /**
109115 * Stops the callback thread.
110116 * This call will block until stop has completed.
111117 */
112118 public void stop () {
113119 final String methodName = "stop" ;
120+
114121 synchronized (lifecycle ) {
115122 if (callbackFuture != null ) {
116123 callbackFuture .cancel (true );
117124 }
118- if ( running ) {
119- // @TRACE 700=stopping
120- log . fine ( CLASS_NAME , methodName , " 700" );
121- running = false ;
122- if (! Thread . currentThread (). equals ( callbackThread ) ) {
123- try {
124- synchronized ( workAvailable ) {
125- // @TRACE 701=notify workAvailable and wait for run
126- // to finish
127- log . fine ( CLASS_NAME , methodName , " 701" );
128- workAvailable . notifyAll ();
129- }
130- // Wait for the thread to finish.
131- runningSemaphore . acquire ();
132- } catch ( InterruptedException ex ) {
133- } finally {
134- runningSemaphore . release ();
135- }
125+ }
126+ if ( isRunning ()) {
127+ // @TRACE 700=stopping
128+ log . fine ( CLASS_NAME , methodName , "700" ) ;
129+ synchronized ( lifecycle ) {
130+ target_state = State . STOPPED ;
131+ }
132+ if (! Thread . currentThread (). equals ( callbackThread )) {
133+ synchronized ( workAvailable ) {
134+ // @TRACE 701=notify workAvailable and wait for run
135+ // to finish
136+ log . fine ( CLASS_NAME , methodName , "701" );
137+ workAvailable . notifyAll ();
138+ }
139+ // Wait for the thread to finish.
140+ while ( isRunning ()) {
141+ try { Thread . sleep ( 100 ); } catch ( Exception e ) { }
142+ clientState . notifyQueueLock ();
136143 }
137144 }
138- callbackThread = null ;
139145 // @TRACE 703=stopped
140146 log .fine (CLASS_NAME , methodName , "703" );
141147 }
@@ -157,20 +163,17 @@ public void run() {
157163 final String methodName = "run" ;
158164 callbackThread = Thread .currentThread ();
159165 callbackThread .setName (threadName );
160-
161- try {
162- runningSemaphore .acquire ();
163- } catch (InterruptedException e ) {
164- running = false ;
165- return ;
166+
167+ synchronized (lifecycle ) {
168+ current_state = State .RUNNING ;
166169 }
167170
168- while (running ) {
171+ while (isRunning () ) {
169172 try {
170173 // If no work is currently available, then wait until there is some...
171174 try {
172175 synchronized (workAvailable ) {
173- if (running && messageQueue .isEmpty ()
176+ if (isRunning () && messageQueue .isEmpty ()
174177 && completeQueue .isEmpty ()) {
175178 // @TRACE 704=wait for workAvailable
176179 log .fine (CLASS_NAME , methodName , "704" );
@@ -180,7 +183,7 @@ public void run() {
180183 } catch (InterruptedException e ) {
181184 }
182185
183- if (running ) {
186+ if (isRunning () ) {
184187 // Check for deliveryComplete callbacks...
185188 MqttToken token = null ;
186189 synchronized (completeQueue ) {
@@ -211,7 +214,7 @@ public void run() {
211214 }
212215 }
213216
214- if (quiescing ) {
217+ if (isQuiescing () ) {
215218 clientState .checkQuiesceLock ();
216219 }
217220
@@ -220,10 +223,10 @@ public void run() {
220223 // of class NoClassDefFoundError
221224 // @TRACE 714=callback threw exception
222225 log .fine (CLASS_NAME , methodName , "714" , null , ex );
223- running = false ;
226+
224227 clientComms .shutdownConnection (null , new MqttException (ex ));
225228 } finally {
226- runningSemaphore . release ();
229+
227230 synchronized (spaceAvailable ) {
228231 // Notify the spaceAvailable lock, to say that there's now
229232 // some space on the queue...
@@ -234,6 +237,10 @@ public void run() {
234237 }
235238 }
236239 }
240+ synchronized (lifecycle ) {
241+ current_state = State .STOPPED ;
242+ }
243+ callbackThread = null ;
237244 }
238245
239246 private void handleActionComplete (MqttToken token )
@@ -349,7 +356,7 @@ public void messageArrived(MqttPublish sendMessage) {
349356 // the client protect itself from getting flooded by messages
350357 // from the server.
351358 synchronized (spaceAvailable ) {
352- while (running && !quiescing && messageQueue .size () >= INBOUND_QUEUE_SIZE ) {
359+ while (isRunning () && !isQuiescing () && messageQueue .size () >= INBOUND_QUEUE_SIZE ) {
353360 try {
354361 // @TRACE 709=wait for spaceAvailable
355362 log .fine (CLASS_NAME , methodName , "709" );
@@ -358,7 +365,7 @@ public void messageArrived(MqttPublish sendMessage) {
358365 }
359366 }
360367 }
361- if (!quiescing ) {
368+ if (!isQuiescing () ) {
362369 messageQueue .addElement (sendMessage );
363370 // Notify the CommsCallback thread that there's work to do...
364371 synchronized (workAvailable ) {
@@ -377,7 +384,10 @@ public void messageArrived(MqttPublish sendMessage) {
377384 */
378385 public void quiesce () {
379386 final String methodName = "quiesce" ;
380- this .quiescing = true ;
387+ synchronized (lifecycle ) {
388+ if (current_state == State .RUNNING )
389+ current_state = State .QUIESCING ;
390+ }
381391 synchronized (spaceAvailable ) {
382392 // @TRACE 711=quiesce notify spaceAvailable
383393 log .fine (CLASS_NAME , methodName , "711" );
@@ -387,7 +397,7 @@ public void quiesce() {
387397 }
388398
389399 public boolean isQuiesced () {
390- if (quiescing && completeQueue .size () == 0 && messageQueue .size () == 0 ) {
400+ if (isQuiescing () && completeQueue .size () == 0 && messageQueue .size () == 0 ) {
391401 return true ;
392402 }
393403 return false ;
@@ -435,7 +445,7 @@ public void messageArrivedComplete(int messageId, int qos)
435445 public void asyncOperationComplete (MqttToken token ) {
436446 final String methodName = "asyncOperationComplete" ;
437447
438- if (running ) {
448+ if (isRunning () ) {
439449 // invoke callbacks on callback thread
440450 completeQueue .addElement (token );
441451 synchronized (workAvailable ) {
@@ -487,7 +497,7 @@ protected boolean deliverMessage(String topicName, int messageId, MqttMessage aM
487497 {
488498 boolean delivered = false ;
489499
490- Enumeration keys = callbacks .keys ();
500+ Enumeration < String > keys = callbacks .keys ();
491501 while (keys .hasMoreElements ()) {
492502 String topicFilter = (String )keys .nextElement ();
493503 if (MqttTopic .isMatched (topicFilter , topicName )) {
@@ -506,5 +516,22 @@ protected boolean deliverMessage(String topicName, int messageId, MqttMessage aM
506516
507517 return delivered ;
508518 }
519+
520+ public boolean isRunning () {
521+ boolean result ;
522+ synchronized (lifecycle ) {
523+ result = ((current_state == State .RUNNING || current_state == State .QUIESCING )
524+ && target_state == State .RUNNING );
525+ }
526+ return result ;
527+ }
528+
529+ public boolean isQuiescing () {
530+ boolean result ;
531+ synchronized (lifecycle ) {
532+ result = (current_state == State .QUIESCING );
533+ }
534+ return result ;
535+ }
509536
510537}
0 commit comments