Skip to content

Commit a6faf77

Browse files
david-katzjpwsutton
authored andcommitted
Fixing Client Shutdown Bug(#379)
* Do not quiesce if clientState is null Fixes NPE experienced in some shutdown scenarios Change-Id: I41b0bfcc64a8e5b60d5a04cc116ad8e2f5cf3fff Signed-off-by: David Katz <David.Katz@bmw-carit.de> * Shutdown tasks submitted to the executor in stop methods Change-Id: I4e1de3bfa8b330b831fff981eeecea3c501b42f5 Signed-off-by: David Katz <David.Katz@bmw-carit.de>
1 parent ec97a95 commit a6faf77

4 files changed

Lines changed: 21 additions & 4 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,9 @@ public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) thro
519519
*/
520520
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout, boolean sendDisconnectPacket) throws MqttException {
521521
// Allow current inbound and outbound work to complete
522-
clientState.quiesce(quiesceTimeout);
522+
if (clientState != null) {
523+
clientState.quiesce(quiesceTimeout);
524+
}
523525
MqttToken token = new MqttToken(client.getClientId());
524526
try {
525527
// Send disconnect packet

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Hashtable;
2323
import java.util.Vector;
2424
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Future;
2526
import java.util.concurrent.Semaphore;
2627

2728
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
@@ -65,6 +66,7 @@ public class CommsCallback implements Runnable {
6566
private boolean manualAcks = false;
6667
private String threadName;
6768
private final Semaphore runningSemaphore = new Semaphore(1);
69+
private Future callbackFuture;
6870

6971
CommsCallback(ClientComms clientComms) {
7072
this.clientComms = clientComms;
@@ -94,7 +96,7 @@ public void start(String threadName, ExecutorService executorService) {
9496

9597
running = true;
9698
quiescing = false;
97-
executorService.execute(this);
99+
callbackFuture = executorService.submit(this);
98100
}
99101
}
100102
}
@@ -106,6 +108,9 @@ public void start(String threadName, ExecutorService executorService) {
106108
public void stop() {
107109
final String methodName = "stop";
108110
synchronized (lifecycle) {
111+
if (callbackFuture != null) {
112+
callbackFuture.cancel(true);
113+
}
109114
if (running) {
110115
// @TRACE 700=stopping
111116
log.fine(CLASS_NAME, methodName, "700");

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Future;
2122
import java.util.concurrent.Semaphore;
2223

2324
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -48,6 +49,7 @@ public class CommsReceiver implements Runnable {
4849
private volatile boolean receiving;
4950
private final Semaphore runningSemaphore = new Semaphore(1);
5051
private String threadName;
52+
private Future receiverFuture;
5153

5254

5355
public CommsReceiver(ClientComms clientComms, ClientState clientState,CommsTokenStore tokenStore, InputStream in) {
@@ -71,7 +73,7 @@ public void start(String threadName, ExecutorService executorService) {
7173
synchronized (lifecycle) {
7274
if (!running) {
7375
running = true;
74-
executorService.execute(this);
76+
receiverFuture = executorService.submit(this);
7577
}
7678
}
7779
}
@@ -82,6 +84,9 @@ public void start(String threadName, ExecutorService executorService) {
8284
public void stop() {
8385
final String methodName = "stop";
8486
synchronized (lifecycle) {
87+
if (receiverFuture != null) {
88+
receiverFuture.cancel(true);
89+
}
8590
//@TRACE 850=stopping
8691
log.fine(CLASS_NAME,methodName, "850");
8792
if (running) {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.io.OutputStream;
2020
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Future;
2122
import java.util.concurrent.Semaphore;
2223
import java.util.concurrent.TimeUnit;
2324

@@ -46,6 +47,7 @@ public class CommsSender implements Runnable {
4647

4748
private String threadName;
4849
private final Semaphore runningSemaphore = new Semaphore(1);
50+
private Future senderFuture;
4951

5052
public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, OutputStream out) {
5153
this.out = new MqttOutputStream(clientState, out);
@@ -65,7 +67,7 @@ public void start(String threadName, ExecutorService executorService) {
6567
synchronized (lifecycle) {
6668
if (!running) {
6769
running = true;
68-
executorService.execute(this);
70+
senderFuture = executorService.submit(this);
6971
}
7072
}
7173
}
@@ -77,6 +79,9 @@ public void stop() {
7779
final String methodName = "stop";
7880

7981
synchronized (lifecycle) {
82+
if (senderFuture != null) {
83+
senderFuture.cancel(true);
84+
}
8085
//@TRACE 800=stopping sender
8186
log.fine(CLASS_NAME,methodName,"800");
8287
if (running) {

0 commit comments

Comments
 (0)