Skip to content

Commit b82a843

Browse files
Fixed MQTT Con thread leak on connection failure #850
1 parent 240f82f commit b82a843

3 files changed

Lines changed: 21 additions & 12 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ public class CommsCallback implements Runnable {
5757
private final Vector<MqttWireMessage> messageQueue;
5858
private final Vector<MqttToken> completeQueue;
5959

60-
private enum State {STOPPED, RUNNING, QUIESCING}
60+
private enum State {READY, RUNNING, QUIESCING, STOPPED}
6161

62-
private State current_state = State.STOPPED;
63-
private State target_state = State.STOPPED;
62+
private State current_state = State.READY;
63+
private State target_state = State.READY;
6464
private final Object lifecycle = new Object();
6565
private Thread callbackThread;
6666
private String threadName;
@@ -92,7 +92,7 @@ public void start(String threadName, ExecutorService executorService) {
9292
this.threadName = threadName;
9393

9494
synchronized (lifecycle) {
95-
if (current_state == State.STOPPED) {
95+
if (current_state == State.READY) {
9696
// Preparatory work before starting the background thread.
9797
// For safety ensure any old events are cleared.
9898
messageQueue.clear();
@@ -108,6 +108,9 @@ public void start(String threadName, ExecutorService executorService) {
108108
}
109109
while (!isRunning()) {
110110
try { Thread.sleep(100); } catch (Exception e) { }
111+
if (current_state == State.STOPPED) {
112+
break;
113+
}
111114
}
112115
}
113116

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ public class CommsReceiver implements Runnable {
3838
private static final String CLASS_NAME = CommsReceiver.class.getName();
3939
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
4040

41-
private enum State {STOPPED, RUNNING, STARTING, RECEIVING}
41+
private enum State {READY, RUNNING, STARTING, RECEIVING, STOPPED}
4242

43-
private State current_state = State.STOPPED;
44-
private State target_state = State.STOPPED;
43+
private State current_state = State.READY;
44+
private State target_state = State.READY;
4545
private final Object lifecycle = new Object();
4646
private String threadName;
4747
private Future<?> receiverFuture;
@@ -71,7 +71,7 @@ public void start(String threadName, ExecutorService executorService) {
7171
//@TRACE 855=starting
7272
log.fine(CLASS_NAME,methodName, "855");
7373
synchronized (lifecycle) {
74-
if (current_state == State.STOPPED && target_state == State.STOPPED) {
74+
if (current_state == State.READY && target_state == State.READY) {
7575
target_state = State.RUNNING;
7676
if (executorService == null) {
7777
new Thread(this).start();
@@ -82,6 +82,9 @@ public void start(String threadName, ExecutorService executorService) {
8282
}
8383
while (!isRunning()) {
8484
try { Thread.sleep(100); } catch (Exception e) { }
85+
if (current_state == State.STOPPED) {
86+
break;
87+
}
8588
}
8689
}
8790

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ public class CommsSender implements Runnable {
3535
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3636

3737
//Sends MQTT packets to the server on its own thread
38-
private enum State {STOPPED, RUNNING, STARTING}
38+
private enum State {READY, RUNNING, STARTING, STOPPED}
3939

40-
private State current_state = State.STOPPED;
41-
private State target_state = State.STOPPED;
40+
private State current_state = State.READY;
41+
private State target_state = State.READY;
4242
private final Object lifecycle = new Object();
4343
private Thread sendThread = null;
4444
private String threadName;
@@ -66,7 +66,7 @@ public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenS
6666
public void start(String threadName, ExecutorService executorService) {
6767
this.threadName = threadName;
6868
synchronized (lifecycle) {
69-
if (current_state == State.STOPPED && target_state == State.STOPPED) {
69+
if (current_state == State.READY && target_state == State.READY) {
7070
target_state = State.RUNNING;
7171
if (executorService == null) {
7272
new Thread(this).start();
@@ -77,6 +77,9 @@ public void start(String threadName, ExecutorService executorService) {
7777
}
7878
while (!isRunning()) {
7979
try { Thread.sleep(100); } catch (Exception e) { }
80+
if (current_state == State.STOPPED) {
81+
break;
82+
}
8083
}
8184
}
8285

0 commit comments

Comments
 (0)