2222import java .util .Enumeration ;
2323import java .util .Properties ;
2424import java .util .Vector ;
25+ import java .util .concurrent .ExecutorService ;
26+ import java .util .concurrent .TimeUnit ;
2527
2628import org .eclipse .paho .client .mqttv3 .BufferedMessage ;
2729import org .eclipse .paho .client .mqttv3 .IMqttActionListener ;
@@ -80,7 +82,9 @@ public class ClientComms {
8082 private boolean closePending = false ;
8183 private boolean resting = false ;
8284 private DisconnectedMessageBuffer disconnectedMessageBuffer ;
83-
85+
86+ private ExecutorService executorService ;
87+
8488 /**
8589 * Creates a new ClientComms object, using the specified module to handle
8690 * the network calls.
@@ -89,13 +93,14 @@ public class ClientComms {
8993 * @param pingSender the {@link MqttPingSender}
9094 * @throws MqttException if an exception occurs whilst communicating with the server
9195 */
92- public ClientComms (IMqttAsyncClient client , MqttClientPersistence persistence , MqttPingSender pingSender ) throws MqttException {
96+ public ClientComms (IMqttAsyncClient client , MqttClientPersistence persistence , MqttPingSender pingSender , ExecutorService executorService ) throws MqttException {
9397 this .conState = DISCONNECTED ;
9498 this .client = client ;
9599 this .persistence = persistence ;
96100 this .pingSender = pingSender ;
97101 this .pingSender .init (this );
98-
102+ this .executorService = executorService ;
103+
99104 this .tokenStore = new CommsTokenStore (getClient ().getClientId ());
100105 this .callback = new CommsCallback (this );
101106 this .clientState = new ClientState (persistence , tokenStore , this .callback , this , pingSender );
@@ -108,6 +113,22 @@ CommsReceiver getReceiver() {
108113 return receiver ;
109114 }
110115
116+ private void shutdownExecutorService () {
117+ String methodName = "shutdownExecutorService" ;
118+ executorService .shutdown ();
119+ try {
120+ if (!executorService .awaitTermination (1 , TimeUnit .SECONDS )) {
121+ executorService .shutdownNow ();
122+ if (!executorService .awaitTermination (1 , TimeUnit .SECONDS )) {
123+ log .fine (CLASS_NAME , methodName , "executorService did not terminate" );
124+ }
125+ }
126+ } catch (InterruptedException ie ) {
127+ executorService .shutdownNow ();
128+ Thread .currentThread ().interrupt ();
129+ }
130+ }
131+
111132 /**
112133 * Sends a message to the server. Does not check if connected this validation must be done
113134 * by invoking routines.
@@ -205,7 +226,7 @@ public void close() throws MqttException {
205226 }
206227
207228 conState = CLOSED ;
208-
229+ shutdownExecutorService ();
209230 // ShutdownConnection has already cleaned most things
210231 clientState .close ();
211232 clientState = null ;
@@ -254,7 +275,7 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce
254275 this .clientState .setMaxInflight (conOptions .getMaxInflight ());
255276
256277 tokenStore .open ();
257- ConnectBG conbg = new ConnectBG (this , token , connect );
278+ ConnectBG conbg = new ConnectBG (this , token , connect , executorService );
258279 conbg .start ();
259280 }
260281 else {
@@ -479,7 +500,7 @@ public void disconnect(MqttDisconnect disconnect, long quiesceTimeout, MqttToken
479500 //@TRACE 218=state=DISCONNECTING
480501 log .fine (CLASS_NAME ,methodName ,"218" );
481502 conState = DISCONNECTING ;
482- DisconnectBG discbg = new DisconnectBG (disconnect ,quiesceTimeout ,token );
503+ DisconnectBG discbg = new DisconnectBG (disconnect ,quiesceTimeout ,token , executorService );
483504 discbg .start ();
484505 }
485506 }
@@ -636,22 +657,23 @@ public Properties getDebug() {
636657 // the socket could take time to create.
637658 private class ConnectBG implements Runnable {
638659 ClientComms clientComms = null ;
639- Thread cBg = null ;
640660 MqttToken conToken ;
641661 MqttConnect conPacket ;
662+ private String threadName ;
642663
643- ConnectBG (ClientComms cc , MqttToken cToken , MqttConnect cPacket ) {
664+ ConnectBG (ClientComms cc , MqttToken cToken , MqttConnect cPacket , ExecutorService executorService ) {
644665 clientComms = cc ;
645666 conToken = cToken ;
646667 conPacket = cPacket ;
647- cBg = new Thread ( this , "MQTT Con: " +getClient ().getClientId () );
668+ threadName = "MQTT Con: " +getClient ().getClientId ();
648669 }
649670
650671 void start () {
651- cBg . start ( );
672+ executorService . execute ( this );
652673 }
653674
654675 public void run () {
676+ Thread .currentThread ().setName (threadName );
655677 final String methodName = "connectBG:run" ;
656678 MqttException mqttEx = null ;
657679 //@TRACE 220=>
@@ -675,10 +697,10 @@ public void run() {
675697 NetworkModule networkModule = networkModules [networkModuleIndex ];
676698 networkModule .start ();
677699 receiver = new CommsReceiver (clientComms , clientState , tokenStore , networkModule .getInputStream ());
678- receiver .start ("MQTT Rec: " +getClient ().getClientId ());
700+ receiver .start ("MQTT Rec: " +getClient ().getClientId (), executorService );
679701 sender = new CommsSender (clientComms , clientState , tokenStore , networkModule .getOutputStream ());
680- sender .start ("MQTT Snd: " +getClient ().getClientId ());
681- callback .start ("MQTT Call: " +getClient ().getClientId ());
702+ sender .start ("MQTT Snd: " +getClient ().getClientId (), executorService );
703+ callback .start ("MQTT Call: " +getClient ().getClientId (), executorService );
682704 internalSend (conPacket , conToken );
683705 } catch (MqttException ex ) {
684706 //@TRACE 212=connect failed: unexpected exception
@@ -699,23 +721,24 @@ public void run() {
699721 // Kick off the disconnect processing in the background so that it does not block. For instance
700722 // the quiesce
701723 private class DisconnectBG implements Runnable {
702- Thread dBg = null ;
703724 MqttDisconnect disconnect ;
704725 long quiesceTimeout ;
705726 MqttToken token ;
727+ private String threadName ;
706728
707- DisconnectBG (MqttDisconnect disconnect , long quiesceTimeout , MqttToken token ) {
729+ DisconnectBG (MqttDisconnect disconnect , long quiesceTimeout , MqttToken token , ExecutorService executorService ) {
708730 this .disconnect = disconnect ;
709731 this .quiesceTimeout = quiesceTimeout ;
710732 this .token = token ;
711733 }
712734
713735 void start () {
714- dBg = new Thread ( this , "MQTT Disc: " +getClient ().getClientId () );
715- dBg . start ( );
736+ threadName = "MQTT Disc: " +getClient ().getClientId ();
737+ executorService . execute ( this );
716738 }
717-
739+
718740 public void run () {
741+ Thread .currentThread ().setName (threadName );
719742 final String methodName = "disconnectBG:run" ;
720743 //@TRACE 221=>
721744 log .fine (CLASS_NAME , methodName , "221" );
@@ -835,7 +858,7 @@ public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttE
835858 }
836859 }
837860 });
838- new Thread ( disconnectedMessageBuffer ). start ( );
861+ executorService . execute ( disconnectedMessageBuffer );
839862 }
840863 }
841864
0 commit comments