Skip to content

Commit 26523a1

Browse files
Logic changed
Signed-off-by: Pavel Anikeichyk <panikeichyk@kaaiot.io>
1 parent b82a843 commit 26523a1

3 files changed

Lines changed: 39 additions & 15 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Vector;
2424
import java.util.concurrent.ExecutorService;
2525
import java.util.concurrent.Future;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627

2728
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
2829
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
@@ -40,6 +41,8 @@
4041
import org.eclipse.paho.client.mqttv3.logging.Logger;
4142
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
4243

44+
import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD;
45+
4346
/**
4447
* Bridge between Receiver and the external API. This class gets called by
4548
* Receiver, and then converts the comms-centric MQTT message objects into ones
@@ -57,10 +60,10 @@ public class CommsCallback implements Runnable {
5760
private final Vector<MqttWireMessage> messageQueue;
5861
private final Vector<MqttToken> completeQueue;
5962

60-
private enum State {READY, RUNNING, QUIESCING, STOPPED}
63+
private enum State {STOPPED, RUNNING, QUIESCING}
6164

62-
private State current_state = State.READY;
63-
private State target_state = State.READY;
65+
private State current_state = State.STOPPED;
66+
private State target_state = State.STOPPED;
6467
private final Object lifecycle = new Object();
6568
private Thread callbackThread;
6669
private String threadName;
@@ -92,7 +95,7 @@ public void start(String threadName, ExecutorService executorService) {
9295
this.threadName = threadName;
9396

9497
synchronized (lifecycle) {
95-
if (current_state == State.READY) {
98+
if (current_state == State.STOPPED) {
9699
// Preparatory work before starting the background thread.
97100
// For safety ensure any old events are cleared.
98101
messageQueue.clear();
@@ -106,10 +109,15 @@ public void start(String threadName, ExecutorService executorService) {
106109
}
107110
}
108111
}
112+
AtomicInteger stoppedStateCounter = new AtomicInteger(0);
109113
while (!isRunning()) {
110114
try { Thread.sleep(100); } catch (Exception e) { }
111115
if (current_state == State.STOPPED) {
112-
break;
116+
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
117+
break;
118+
}
119+
} else {
120+
stoppedStateCounter.set(0);
113121
}
114122
}
115123
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.InputStream;
2020
import java.util.concurrent.ExecutorService;
2121
import java.util.concurrent.Future;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.eclipse.paho.client.mqttv3.MqttException;
2425
import org.eclipse.paho.client.mqttv3.MqttToken;
@@ -31,17 +32,19 @@
3132
import org.eclipse.paho.client.mqttv3.logging.Logger;
3233
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
3334

35+
import static org.eclipse.paho.client.mqttv3.internal.CommsSender.MAX_STOPPED_STATE_TO_STOP_THREAD;
36+
3437
/**
3538
* Receives MQTT packets from the server.
3639
*/
3740
public class CommsReceiver implements Runnable {
3841
private static final String CLASS_NAME = CommsReceiver.class.getName();
3942
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
4043

41-
private enum State {READY, RUNNING, STARTING, RECEIVING, STOPPED}
44+
private enum State {STOPPED, RUNNING, STARTING, RECEIVING}
4245

43-
private State current_state = State.READY;
44-
private State target_state = State.READY;
46+
private State current_state = State.STOPPED;
47+
private State target_state = State.STOPPED;
4548
private final Object lifecycle = new Object();
4649
private String threadName;
4750
private Future<?> receiverFuture;
@@ -71,7 +74,7 @@ public void start(String threadName, ExecutorService executorService) {
7174
//@TRACE 855=starting
7275
log.fine(CLASS_NAME,methodName, "855");
7376
synchronized (lifecycle) {
74-
if (current_state == State.READY && target_state == State.READY) {
77+
if (current_state == State.STOPPED && target_state == State.STOPPED) {
7578
target_state = State.RUNNING;
7679
if (executorService == null) {
7780
new Thread(this).start();
@@ -80,10 +83,15 @@ public void start(String threadName, ExecutorService executorService) {
8083
}
8184
}
8285
}
86+
AtomicInteger stoppedStateCounter = new AtomicInteger(0);
8387
while (!isRunning()) {
8488
try { Thread.sleep(100); } catch (Exception e) { }
8589
if (current_state == State.STOPPED) {
86-
break;
90+
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
91+
break;
92+
}
93+
} else {
94+
stoppedStateCounter.set(0);
8795
}
8896
}
8997
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsSender.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.OutputStream;
2020
import java.util.concurrent.ExecutorService;
2121
import java.util.concurrent.Future;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.eclipse.paho.client.mqttv3.MqttException;
2425
import org.eclipse.paho.client.mqttv3.MqttToken;
@@ -34,11 +35,13 @@ public class CommsSender implements Runnable {
3435
private static final String CLASS_NAME = CommsSender.class.getName();
3536
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3637

38+
public static final int MAX_STOPPED_STATE_TO_STOP_THREAD = 300; // 30 seconds
39+
3740
//Sends MQTT packets to the server on its own thread
38-
private enum State {READY, RUNNING, STARTING, STOPPED}
41+
private enum State {STOPPED, RUNNING, STARTING}
3942

40-
private State current_state = State.READY;
41-
private State target_state = State.READY;
43+
private State current_state = State.STOPPED;
44+
private State target_state = State.STOPPED;
4245
private final Object lifecycle = new Object();
4346
private Thread sendThread = null;
4447
private String threadName;
@@ -66,7 +69,7 @@ public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenS
6669
public void start(String threadName, ExecutorService executorService) {
6770
this.threadName = threadName;
6871
synchronized (lifecycle) {
69-
if (current_state == State.READY && target_state == State.READY) {
72+
if (current_state == State.STOPPED && target_state == State.STOPPED) {
7073
target_state = State.RUNNING;
7174
if (executorService == null) {
7275
new Thread(this).start();
@@ -75,10 +78,15 @@ public void start(String threadName, ExecutorService executorService) {
7578
}
7679
}
7780
}
81+
AtomicInteger stoppedStateCounter = new AtomicInteger(0);
7882
while (!isRunning()) {
7983
try { Thread.sleep(100); } catch (Exception e) { }
8084
if (current_state == State.STOPPED) {
81-
break;
85+
if (stoppedStateCounter.incrementAndGet() > MAX_STOPPED_STATE_TO_STOP_THREAD) {
86+
break;
87+
}
88+
} else {
89+
stoppedStateCounter.set(0);
8290
}
8391
}
8492
}

0 commit comments

Comments
 (0)