Skip to content

Commit 20d95a4

Browse files
committed
Refactored thread stopping and starting, to not introduce 100ms delays by waiting for threads to start & stop with Thread.sleep(). Threads will enter the running state immediately when start() is called. If the ExecutorService is used, gracefully terminate threads by waiting for their completion, rather than interupting.
Related commits: a6faf77, 4b23f12 Signed-off-by: Liu Woon Yung <pirorin187@gmail.com>
1 parent e92138e commit 20d95a4

3 files changed

Lines changed: 70 additions & 58 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Enumeration;
2222
import java.util.Hashtable;
2323
import java.util.Vector;
24+
import java.util.concurrent.ExecutionException;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Future;
2627

@@ -98,37 +99,33 @@ public void start(String threadName, ExecutorService executorService) {
9899
completeQueue.clear();
99100

100101
target_state = State.RUNNING;
102+
current_state = State.RUNNING;
101103
if (executorService == null) {
102-
new Thread(this).start();
104+
callbackFuture = null;
105+
callbackThread = new Thread(this);
106+
callbackThread.start();
103107
} else {
108+
callbackThread = null;
104109
callbackFuture = executorService.submit(this);
105110
}
106111
}
107112
}
108-
while (!isRunning()) {
109-
try { Thread.sleep(100); } catch (Exception e) { }
110-
}
111113
}
112-
113114

114115
/**
115116
* Stops the callback thread.
116117
* This call will block until stop has completed.
117118
*/
118119
public void stop() {
119120
final String methodName = "stop";
120-
121-
synchronized (lifecycle) {
122-
if (callbackFuture != null) {
123-
callbackFuture.cancel(true);
124-
}
125-
}
121+
126122
if (isRunning()) {
127123
// @TRACE 700=stopping
128124
log.fine(CLASS_NAME, methodName, "700");
129125
synchronized (lifecycle) {
130126
target_state = State.STOPPED;
131127
}
128+
// Do not allow a thread to wait for itself.
132129
if (!Thread.currentThread().equals(callbackThread)) {
133130
synchronized (workAvailable) {
134131
// @TRACE 701=notify workAvailable and wait for run
@@ -137,9 +134,16 @@ public void stop() {
137134
workAvailable.notifyAll();
138135
}
139136
// Wait for the thread to finish.
140-
while (isRunning()) {
141-
try { Thread.sleep(100); } catch (Exception e) { }
142-
clientState.notifyQueueLock();
137+
if (callbackFuture != null) {
138+
try {
139+
callbackFuture.get();
140+
} catch (ExecutionException | InterruptedException e) {
141+
}
142+
} else {
143+
try {
144+
callbackThread.join();
145+
} catch (InterruptedException e) {
146+
}
143147
}
144148
}
145149
// @TRACE 703=stopped
@@ -163,10 +167,6 @@ public void run() {
163167
final String methodName = "run";
164168
callbackThread = Thread.currentThread();
165169
callbackThread.setName(threadName);
166-
167-
synchronized (lifecycle) {
168-
current_state = State.RUNNING;
169-
}
170170

171171
while (isRunning()) {
172172
try {
@@ -240,7 +240,6 @@ public void run() {
240240
synchronized (lifecycle) {
241241
current_state = State.STOPPED;
242242
}
243-
callbackThread = null;
244243
}
245244

246245
private void handleActionComplete(MqttToken token)

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.IOException;
1919
import java.io.InputStream;
20+
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Future;
2223

@@ -44,7 +45,7 @@ private enum State {STOPPED, RUNNING, STARTING, RECEIVING};
4445
private Object lifecycle = new Object();
4546
private String threadName;
4647
private Future<?> receiverFuture;
47-
48+
4849
private ClientState clientState = null;
4950
private ClientComms clientComms = null;
5051
private MqttInputStream in;
@@ -72,35 +73,47 @@ public void start(String threadName, ExecutorService executorService) {
7273
synchronized (lifecycle) {
7374
if (current_state == State.STOPPED && target_state == State.STOPPED) {
7475
target_state = State.RUNNING;
76+
current_state = State.RUNNING;
7577
if (executorService == null) {
76-
new Thread(this).start();
78+
receiverFuture = null;
79+
recThread = new Thread(this);
80+
recThread.start();
7781
} else {
82+
recThread = null;
7883
receiverFuture = executorService.submit(this);
7984
}
8085
}
8186
}
82-
while (!isRunning()) {
83-
try { Thread.sleep(100); } catch (Exception e) { }
84-
}
8587
}
8688

8789
/**
8890
* Stops the Receiver's thread. This call will block.
8991
*/
9092
public void stop() {
9193
final String methodName = "stop";
94+
boolean isRunning;
95+
9296
synchronized (lifecycle) {
93-
if (receiverFuture != null) {
94-
receiverFuture.cancel(true);
95-
}
9697
//@TRACE 850=stopping
9798
log.fine(CLASS_NAME,methodName, "850");
98-
if (isRunning()) {
99+
isRunning = isRunning();
100+
if (isRunning) {
99101
target_state = State.STOPPED;
100102
}
101103
}
102-
while (isRunning()) {
103-
try { Thread.sleep(100); } catch (Exception e) { }
104+
// This and the clause above will prevent a thread from waiting for itself.
105+
if (isRunning) {
106+
if (receiverFuture != null) {
107+
try {
108+
receiverFuture.get();
109+
} catch (ExecutionException | InterruptedException e) {
110+
}
111+
} else {
112+
try {
113+
recThread.join();
114+
} catch (InterruptedException e) {
115+
}
116+
}
104117
}
105118
//@TRACE 851=stopped
106119
log.fine(CLASS_NAME,methodName,"851");
@@ -110,15 +123,10 @@ public void stop() {
110123
* Run loop to receive messages from the server.
111124
*/
112125
public void run() {
113-
recThread = Thread.currentThread();
114-
recThread.setName(threadName);
126+
Thread.currentThread().setName(threadName);
115127
final String methodName = "run";
116128
MqttToken token = null;
117129

118-
synchronized (lifecycle) {
119-
current_state = State.RUNNING;
120-
}
121-
122130
try {
123131
State my_target;
124132
synchronized (lifecycle) {
@@ -204,7 +212,6 @@ public void run() {
204212
}
205213
} // end try
206214

207-
recThread = null;
208215
//@TRACE 854=<
209216
log.fine(CLASS_NAME,methodName,"854");
210217
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.IOException;
1919
import java.io.OutputStream;
20+
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Future;
2223

@@ -42,7 +43,7 @@ private enum State {STOPPED, RUNNING, STARTING};
4243
private Thread sendThread = null;
4344
private String threadName;
4445
private Future<?> senderFuture;
45-
46+
4647
private ClientState clientState = null;
4748
private MqttOutputStream out;
4849
private ClientComms clientComms = null;
@@ -67,56 +68,62 @@ public void start(String threadName, ExecutorService executorService) {
6768
synchronized (lifecycle) {
6869
if (current_state == State.STOPPED && target_state == State.STOPPED) {
6970
target_state = State.RUNNING;
71+
current_state = State.RUNNING;
7072
if (executorService == null) {
71-
new Thread(this).start();
73+
senderFuture = null;
74+
sendThread = new Thread(this);
75+
sendThread.start();
7276
} else {
77+
sendThread = null;
7378
senderFuture = executorService.submit(this);
7479
}
7580
}
7681
}
77-
while (!isRunning()) {
78-
try { Thread.sleep(100); } catch (Exception e) { }
79-
}
8082
}
8183

8284
/**
8385
* Stops the Sender's thread. This call will block.
8486
*/
8587
public void stop() {
8688
final String methodName = "stop";
87-
89+
boolean isRunning;
90+
8891
if (!isRunning()) {
8992
return;
9093
}
91-
94+
9295
synchronized (lifecycle) {
93-
if (senderFuture != null) {
94-
senderFuture.cancel(true);
95-
}
9696
//@TRACE 800=stopping sender
9797
log.fine(CLASS_NAME,methodName,"800");
98-
if (isRunning()) {
98+
isRunning = isRunning();
99+
if (isRunning) {
99100
target_state = State.STOPPED;
100101
clientState.notifyQueueLock();
101102
}
102103
}
103-
while (isRunning()) {
104-
try { Thread.sleep(100); } catch (Exception e) { }
105-
clientState.notifyQueueLock();
104+
// This and the clause above will prevent a thread from waiting for itself.
105+
if (isRunning) {
106+
if (senderFuture != null) {
107+
try {
108+
senderFuture.get();
109+
} catch (ExecutionException | InterruptedException e) {
110+
}
111+
} else {
112+
try {
113+
sendThread.join();
114+
} catch (InterruptedException e) {
115+
e.printStackTrace();
116+
}
117+
}
106118
}
107119
//@TRACE 801=stopped
108120
log.fine(CLASS_NAME,methodName,"801");
109121
}
110122

111123
public void run() {
112-
sendThread = Thread.currentThread();
113-
sendThread.setName(threadName);
124+
Thread.currentThread().setName(threadName);
114125
final String methodName = "run";
115126
MqttWireMessage message = null;
116-
117-
synchronized (lifecycle) {
118-
current_state = State.RUNNING;
119-
}
120127

121128
try {
122129
State my_target;
@@ -176,7 +183,6 @@ public void run() {
176183
} finally {
177184
synchronized (lifecycle) {
178185
current_state = State.STOPPED;
179-
sendThread = null;
180186
}
181187
}
182188

0 commit comments

Comments
 (0)