Skip to content

Commit 0941287

Browse files
author
Ranjan Dasgupta
authored
Merge pull request #920 from PavelAnikeichyk/develop
Fixed issue #850 - MQTT Con thread leak on connection failure
2 parents 3daa4dd + a356442 commit 0941287

3 files changed

Lines changed: 45 additions & 0 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ExecutionException;
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.concurrent.Future;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728

2829
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
2930
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
@@ -41,6 +42,8 @@
4142
import org.eclipse.paho.client.mqttv3.logging.Logger;
4243
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
4344

45+
import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD;
46+
4447
/**
4548
* Bridge between Receiver and the external API. This class gets called by
4649
* Receiver, and then converts the comms-centric MQTT message objects into ones
@@ -113,6 +116,18 @@ public void start(String threadName, ExecutorService executorService) {
113116
}
114117
}
115118
}
119+
120+
AtomicInteger stoppedStateCounter = new AtomicInteger(0);
121+
while (!isRunning()) {
122+
try { Thread.sleep(100); } catch (Exception e) { }
123+
if (current_state == State.STOPPED) {
124+
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
125+
break;
126+
}
127+
} else {
128+
stoppedStateCounter.set(0);
129+
}
130+
}
116131
}
117132

118133
/**

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.ExecutionException;
2121
import java.util.concurrent.ExecutorService;
2222
import java.util.concurrent.Future;
23+
import java.util.concurrent.atomic.AtomicInteger;
2324

2425
import org.eclipse.paho.client.mqttv3.MqttException;
2526
import org.eclipse.paho.client.mqttv3.MqttToken;
@@ -32,6 +33,8 @@
3233
import org.eclipse.paho.client.mqttv3.logging.Logger;
3334
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
3435

36+
import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD;
37+
3538
/**
3639
* Receives MQTT packets from the server.
3740
*/
@@ -85,6 +88,18 @@ public void start(String threadName, ExecutorService executorService) {
8588
}
8689
}
8790
}
91+
92+
AtomicInteger stoppedStateCounter = new AtomicInteger(0);
93+
while (!isRunning()) {
94+
try { Thread.sleep(100); } catch (Exception e) { }
95+
if (current_state == State.STOPPED) {
96+
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
97+
break;
98+
}
99+
} else {
100+
stoppedStateCounter.set(0);
101+
}
102+
}
88103
}
89104

90105
/**

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.ExecutionException;
2121
import java.util.concurrent.ExecutorService;
2222
import java.util.concurrent.Future;
23+
import java.util.concurrent.atomic.AtomicInteger;
2324

2425
import org.eclipse.paho.client.mqttv3.MqttException;
2526
import org.eclipse.paho.client.mqttv3.MqttToken;
@@ -35,6 +36,8 @@ public class CommsSender implements Runnable {
3536
private static final String CLASS_NAME = CommsSender.class.getName();
3637
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3738

39+
public static final int MAX_STOPPED_STATE_TO_STOP_THREAD = 300; // 30 seconds
40+
3841
//Sends MQTT packets to the server on its own thread
3942
private enum State {STOPPED, RUNNING, STARTING}
4043

@@ -80,6 +83,18 @@ public void start(String threadName, ExecutorService executorService) {
8083
}
8184
}
8285
}
86+
87+
AtomicInteger stoppedStateCounter = new AtomicInteger(0);
88+
while (!isRunning()) {
89+
try { Thread.sleep(100); } catch (Exception e) { }
90+
if (current_state == State.STOPPED) {
91+
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
92+
break;
93+
}
94+
} else {
95+
stoppedStateCounter.set(0);
96+
}
97+
}
8398
}
8499

85100
/**

0 commit comments

Comments
 (0)