Skip to content

Commit f8a2137

Browse files
author
Ranjan Dasgupta
authored
Merge pull request #671 from sp193/thread-nodelay
Refactored thread stopping and starting, to not introduce 100ms delays per client
2 parents bf2745b + 20d95a4 commit f8a2137

3 files changed

Lines changed: 70 additions & 58 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Enumeration;
2222
import java.util.Hashtable;
2323
import java.util.Vector;
24+
import java.util.concurrent.ExecutionException;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Future;
2627

@@ -101,37 +102,33 @@ public void start(String threadName, ExecutorService executorService) {
101102
completeQueue.clear();
102103

103104
target_state = State.RUNNING;
105+
current_state = State.RUNNING;
104106
if (executorService == null) {
105-
new Thread(this).start();
107+
callbackFuture = null;
108+
callbackThread = new Thread(this);
109+
callbackThread.start();
106110
} else {
111+
callbackThread = null;
107112
callbackFuture = executorService.submit(this);
108113
}
109114
}
110115
}
111-
while (!isRunning()) {
112-
try { Thread.sleep(100); } catch (Exception e) { }
113-
}
114116
}
115-
116117

117118
/**
118119
* Stops the callback thread.
119120
* This call will block until stop has completed.
120121
*/
121122
public void stop() {
122123
final String methodName = "stop";
123-
124-
synchronized (lifecycle) {
125-
if (callbackFuture != null) {
126-
callbackFuture.cancel(true);
127-
}
128-
}
124+
129125
if (isRunning()) {
130126
// @TRACE 700=stopping
131127
log.fine(CLASS_NAME, methodName, "700");
132128
synchronized (lifecycle) {
133129
target_state = State.STOPPED;
134130
}
131+
// Do not allow a thread to wait for itself.
135132
if (!Thread.currentThread().equals(callbackThread)) {
136133
synchronized (workAvailable) {
137134
// @TRACE 701=notify workAvailable and wait for run
@@ -140,9 +137,16 @@ public void stop() {
140137
workAvailable.notifyAll();
141138
}
142139
// Wait for the thread to finish.
143-
while (isRunning()) {
144-
try { Thread.sleep(100); } catch (Exception e) { }
145-
clientState.notifyQueueLock();
140+
if (callbackFuture != null) {
141+
try {
142+
callbackFuture.get();
143+
} catch (ExecutionException | InterruptedException e) {
144+
}
145+
} else {
146+
try {
147+
callbackThread.join();
148+
} catch (InterruptedException e) {
149+
}
146150
}
147151
}
148152
// @TRACE 703=stopped
@@ -166,10 +170,6 @@ public void run() {
166170
final String methodName = "run";
167171
callbackThread = Thread.currentThread();
168172
callbackThread.setName(threadName);
169-
170-
synchronized (lifecycle) {
171-
current_state = State.RUNNING;
172-
}
173173

174174
while (isRunning()) {
175175
try {
@@ -243,7 +243,6 @@ public void run() {
243243
synchronized (lifecycle) {
244244
current_state = State.STOPPED;
245245
}
246-
callbackThread = null;
247246
}
248247

249248
private void handleActionComplete(MqttToken token)

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.IOException;
1919
import java.io.InputStream;
20+
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Future;
2223

@@ -45,7 +46,7 @@ private enum State {STOPPED, RUNNING, STARTING, RECEIVING}
4546
private final Object lifecycle = new Object();
4647
private String threadName;
4748
private Future<?> receiverFuture;
48-
49+
4950
private ClientState clientState = null;
5051
private ClientComms clientComms = null;
5152
private MqttInputStream in;
@@ -73,35 +74,47 @@ public void start(String threadName, ExecutorService executorService) {
7374
synchronized (lifecycle) {
7475
if (current_state == State.STOPPED && target_state == State.STOPPED) {
7576
target_state = State.RUNNING;
77+
current_state = State.RUNNING;
7678
if (executorService == null) {
77-
new Thread(this).start();
79+
receiverFuture = null;
80+
recThread = new Thread(this);
81+
recThread.start();
7882
} else {
83+
recThread = null;
7984
receiverFuture = executorService.submit(this);
8085
}
8186
}
8287
}
83-
while (!isRunning()) {
84-
try { Thread.sleep(100); } catch (Exception e) { }
85-
}
8688
}
8789

8890
/**
8991
* Stops the Receiver's thread. This call will block.
9092
*/
9193
public void stop() {
9294
final String methodName = "stop";
95+
boolean isRunning;
96+
9397
synchronized (lifecycle) {
94-
if (receiverFuture != null) {
95-
receiverFuture.cancel(true);
96-
}
9798
//@TRACE 850=stopping
9899
log.fine(CLASS_NAME,methodName, "850");
99-
if (isRunning()) {
100+
isRunning = isRunning();
101+
if (isRunning) {
100102
target_state = State.STOPPED;
101103
}
102104
}
103-
while (isRunning()) {
104-
try { Thread.sleep(100); } catch (Exception e) { }
105+
// This and the clause above will prevent a thread from waiting for itself.
106+
if (isRunning) {
107+
if (receiverFuture != null) {
108+
try {
109+
receiverFuture.get();
110+
} catch (ExecutionException | InterruptedException e) {
111+
}
112+
} else {
113+
try {
114+
recThread.join();
115+
} catch (InterruptedException e) {
116+
}
117+
}
105118
}
106119
//@TRACE 851=stopped
107120
log.fine(CLASS_NAME,methodName,"851");
@@ -111,15 +124,10 @@ public void stop() {
111124
* Run loop to receive messages from the server.
112125
*/
113126
public void run() {
114-
recThread = Thread.currentThread();
115-
recThread.setName(threadName);
127+
Thread.currentThread().setName(threadName);
116128
final String methodName = "run";
117129
MqttToken token = null;
118130

119-
synchronized (lifecycle) {
120-
current_state = State.RUNNING;
121-
}
122-
123131
try {
124132
State my_target;
125133
synchronized (lifecycle) {
@@ -213,7 +221,6 @@ public void run() {
213221
}
214222
} // end try
215223

216-
recThread = null;
217224
//@TRACE 854=<
218225
log.fine(CLASS_NAME,methodName,"854");
219226
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.IOException;
1919
import java.io.OutputStream;
20+
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Future;
2223

@@ -43,7 +44,7 @@ private enum State {STOPPED, RUNNING, STARTING}
4344
private Thread sendThread = null;
4445
private String threadName;
4546
private Future<?> senderFuture;
46-
47+
4748
private ClientState clientState = null;
4849
private MqttOutputStream out;
4950
private ClientComms clientComms = null;
@@ -68,56 +69,62 @@ public void start(String threadName, ExecutorService executorService) {
6869
synchronized (lifecycle) {
6970
if (current_state == State.STOPPED && target_state == State.STOPPED) {
7071
target_state = State.RUNNING;
72+
current_state = State.RUNNING;
7173
if (executorService == null) {
72-
new Thread(this).start();
74+
senderFuture = null;
75+
sendThread = new Thread(this);
76+
sendThread.start();
7377
} else {
78+
sendThread = null;
7479
senderFuture = executorService.submit(this);
7580
}
7681
}
7782
}
78-
while (!isRunning()) {
79-
try { Thread.sleep(100); } catch (Exception e) { }
80-
}
8183
}
8284

8385
/**
8486
* Stops the Sender's thread. This call will block.
8587
*/
8688
public void stop() {
8789
final String methodName = "stop";
88-
90+
boolean isRunning;
91+
8992
if (!isRunning()) {
9093
return;
9194
}
92-
95+
9396
synchronized (lifecycle) {
94-
if (senderFuture != null) {
95-
senderFuture.cancel(true);
96-
}
9797
//@TRACE 800=stopping sender
9898
log.fine(CLASS_NAME,methodName,"800");
99-
if (isRunning()) {
99+
isRunning = isRunning();
100+
if (isRunning) {
100101
target_state = State.STOPPED;
101102
clientState.notifyQueueLock();
102103
}
103104
}
104-
while (isRunning()) {
105-
try { Thread.sleep(100); } catch (Exception e) { }
106-
clientState.notifyQueueLock();
105+
// This and the clause above will prevent a thread from waiting for itself.
106+
if (isRunning) {
107+
if (senderFuture != null) {
108+
try {
109+
senderFuture.get();
110+
} catch (ExecutionException | InterruptedException e) {
111+
}
112+
} else {
113+
try {
114+
sendThread.join();
115+
} catch (InterruptedException e) {
116+
e.printStackTrace();
117+
}
118+
}
107119
}
108120
//@TRACE 801=stopped
109121
log.fine(CLASS_NAME,methodName,"801");
110122
}
111123

112124
public void run() {
113-
sendThread = Thread.currentThread();
114-
sendThread.setName(threadName);
125+
Thread.currentThread().setName(threadName);
115126
final String methodName = "run";
116127
MqttWireMessage message = null;
117-
118-
synchronized (lifecycle) {
119-
current_state = State.RUNNING;
120-
}
121128

122129
try {
123130
State my_target;
@@ -177,7 +184,6 @@ public void run() {
177184
} finally {
178185
synchronized (lifecycle) {
179186
current_state = State.STOPPED;
180-
sendThread = null;
181187
}
182188
}
183189

0 commit comments

Comments
 (0)