2020import java .util .concurrent .ExecutorService ;
2121import java .util .concurrent .Future ;
2222import java .util .concurrent .Semaphore ;
23+ import java .util .concurrent .TimeUnit ;
2324
2425import org .eclipse .paho .client .mqttv3 .MqttException ;
2526import org .eclipse .paho .client .mqttv3 .MqttToken ;
@@ -39,7 +40,7 @@ public class CommsReceiver implements Runnable {
3940 private static final String CLASS_NAME = CommsReceiver .class .getName ();
4041 private static final Logger log = LoggerFactory .getLogger (LoggerFactory .MQTT_CLIENT_MSG_CAT , CLASS_NAME );
4142
42- private boolean running = false ;
43+ private volatile boolean running = false ;
4344 private Object lifecycle = new Object ();
4445 private ClientState clientState = null ;
4546 private ClientComms clientComms = null ;
@@ -95,7 +96,22 @@ public void stop() {
9596 if (!Thread .currentThread ().equals (recThread )) {
9697 try {
9798 // Wait for the thread to finish.
98- runningSemaphore .acquire ();
99+ // We need to set a certain timeout since there is no universal way
100+ // to wake up the worker thread from blocking read.
101+ // If network module is websocket, the worker thread returns from read with InterruptedIOException
102+ // in response to receiverFuture.cancel(true) above.
103+ //
104+ // If network module is tcp or ssl, however, InterruptedIOException is not thrown on read.
105+ // In normal cases, the worker thread returns from read with EOFException when
106+ // the connection is terminated from the broker side in response to DISCONNECT packet.
107+ // In this case, the SSL session is preserved and available for reuse in later connections.
108+ //
109+ // If stop is called because the broker connection is unexpectedly lost,
110+ // the worker thread will keep waiting on read, and tryAcquire below will return on timeout expiration.
111+ // The worker thread wakes up with SocketException when the socket is closed later.
112+ // In this case, the SSL session is invalidated by SSLSocket implementation.
113+ //
114+ runningSemaphore .tryAcquire (3000 , TimeUnit .MILLISECONDS );
99115 }
100116 catch (InterruptedException ex ) {
101117 } finally {
@@ -124,67 +140,70 @@ public void run() {
124140 running = false ;
125141 return ;
126142 }
127-
128- while (running && (in != null )) {
129- try {
130- //@TRACE 852=network read message
131- log .fine (CLASS_NAME ,methodName ,"852" );
132- receiving = in .available () > 0 ;
133- MqttWireMessage message = in .readMqttWireMessage ();
134- receiving = false ;
135-
136- // instanceof checks if message is null
137- if (message instanceof MqttAck ) {
138- token = tokenStore .getToken (message );
139- if (token !=null ) {
140- synchronized (token ) {
141- // Ensure the notify processing is done under a lock on the token
142- // This ensures that the send processing can complete before the
143- // receive processing starts! ( request and ack and ack processing
144- // can occur before request processing is complete if not!
145- clientState .notifyReceivedAck ((MqttAck )message );
143+
144+ try {
145+ while (running && (in != null )) {
146+ try {
147+ //@TRACE 852=network read message
148+ log .fine (CLASS_NAME ,methodName ,"852" );
149+ receiving = in .available () > 0 ;
150+ MqttWireMessage message = in .readMqttWireMessage ();
151+ receiving = false ;
152+
153+ // instanceof checks if message is null
154+ if (message instanceof MqttAck ) {
155+ token = tokenStore .getToken (message );
156+ if (token !=null ) {
157+ synchronized (token ) {
158+ // Ensure the notify processing is done under a lock on the token
159+ // This ensures that the send processing can complete before the
160+ // receive processing starts! ( request and ack and ack processing
161+ // can occur before request processing is complete if not!
162+ clientState .notifyReceivedAck ((MqttAck )message );
163+ }
164+ } else if (message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck ) {
165+ //This is an ack for a message we no longer have a ticket for.
166+ //This probably means we already received this message and it's being send again
167+ //because of timeouts, crashes, disconnects, restarts etc.
168+ //It should be safe to ignore these unexpected messages.
169+ log .fine (CLASS_NAME , methodName , "857" );
170+ } else {
171+ // It its an ack and there is no token then something is not right.
172+ // An ack should always have a token assoicated with it.
173+ throw new MqttException (MqttException .REASON_CODE_UNEXPECTED_ERROR );
146174 }
147- } else if (message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck ) {
148- //This is an ack for a message we no longer have a ticket for.
149- //This probably means we already received this message and it's being send again
150- //because of timeouts, crashes, disconnects, restarts etc.
151- //It should be safe to ignore these unexpected messages.
152- log .fine (CLASS_NAME , methodName , "857" );
153175 } else {
154- // It its an ack and there is no token then something is not right.
155- // An ack should always have a token assoicated with it.
156- throw new MqttException (MqttException .REASON_CODE_UNEXPECTED_ERROR );
176+ if (message != null ) {
177+ // A new message has arrived
178+ clientState .notifyReceivedMsg (message );
179+ }
157180 }
158- } else {
159- if (message != null ) {
160- // A new message has arrived
161- clientState .notifyReceivedMsg (message );
181+ }
182+ catch (MqttException ex ) {
183+ //@TRACE 856=Stopping, MQttException
184+ log .fine (CLASS_NAME ,methodName ,"856" ,null ,ex );
185+ running = false ;
186+ // Token maybe null but that is handled in shutdown
187+ clientComms .shutdownConnection (token , ex );
188+ }
189+ catch (IOException ioe ) {
190+ //@TRACE 853=Stopping due to IOException
191+ log .fine (CLASS_NAME ,methodName ,"853" );
192+
193+ running = false ;
194+ // An EOFException could be raised if the broker processes the
195+ // DISCONNECT and ends the socket before we complete. As such,
196+ // only shutdown the connection if we're not already shutting down.
197+ if (!clientComms .isDisconnecting ()) {
198+ clientComms .shutdownConnection (token , new MqttException (MqttException .REASON_CODE_CONNECTION_LOST , ioe ));
162199 }
163200 }
164- }
165- catch (MqttException ex ) {
166- //@TRACE 856=Stopping, MQttException
167- log .fine (CLASS_NAME ,methodName ,"856" ,null ,ex );
168- running = false ;
169- // Token maybe null but that is handled in shutdown
170- clientComms .shutdownConnection (token , ex );
171- }
172- catch (IOException ioe ) {
173- //@TRACE 853=Stopping due to IOException
174- log .fine (CLASS_NAME ,methodName ,"853" );
175-
176- running = false ;
177- // An EOFException could be raised if the broker processes the
178- // DISCONNECT and ends the socket before we complete. As such,
179- // only shutdown the connection if we're not already shutting down.
180- if (!clientComms .isDisconnecting ()) {
181- clientComms .shutdownConnection (token , new MqttException (MqttException .REASON_CODE_CONNECTION_LOST , ioe ));
201+ finally {
202+ receiving = false ;
182203 }
183204 }
184- finally {
185- receiving = false ;
186- runningSemaphore .release ();
187- }
205+ } finally {
206+ runningSemaphore .release ();
188207 }
189208
190209 //@TRACE 854=<
0 commit comments