Skip to content

Commit d75a46f

Browse files
author
Ian Craggs
committed
Update V5 client with issue 330 fix to match V3
1 parent 0a2f69f commit d75a46f

12 files changed

Lines changed: 613 additions & 443 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/connectionLoss/ConnectionLossTest.java

Lines changed: 219 additions & 209 deletions
Large diffs are not rendered by default.

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -831,8 +831,8 @@ protected MqttWireMessage get() throws MqttException {
831831
// Handle the case where not connected. This should only be the case if:
832832
// - in the process of disconnecting / shutting down
833833
// - in the process of connecting
834-
if (!connected &&
835-
(pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect))) {
834+
if (pendingFlows == null || (!connected &&
835+
(pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect)))) {
836836
//@TRACE 621=no outstanding flows and not connected
837837
log.fine(CLASS_NAME,methodName,"621");
838838

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2009, 2016 IBM Corp.
2+
* Copyright (c) 2009, 2019 IBM Corp.
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Public License v1.0
@@ -533,5 +533,4 @@ public boolean isQuiescing() {
533533
}
534534
return result;
535535
}
536-
537536
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -579,9 +579,6 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
579579
}
580580

581581
this.executorService = executorService;
582-
if (this.executorService == null) {
583-
this.executorService = Executors.newScheduledThreadPool(10);
584-
}
585582

586583
this.pingSender = pingSender;
587584
if (this.pingSender == null) {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/TimerPingSender.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
package org.eclipse.paho.mqttv5.client;
1616

17+
import java.util.Timer;
18+
import java.util.TimerTask;
19+
1720
import java.util.concurrent.ScheduledExecutorService;
1821
import java.util.concurrent.ScheduledFuture;
1922
import java.util.concurrent.TimeUnit;
@@ -36,14 +39,12 @@ public class TimerPingSender implements MqttPingSender{
3639
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3740

3841
private ClientComms comms;
39-
private ScheduledExecutorService executorService;
42+
private Timer timer;
43+
private ScheduledExecutorService executorService = null;
4044
private ScheduledFuture<?> scheduledFuture;
4145
private String clientid;
4246

4347
public TimerPingSender(ScheduledExecutorService executorService) {
44-
if (executorService == null) {
45-
throw new IllegalArgumentException("ExecutorService cannot be null.");
46-
}
4748
this.executorService = executorService;
4849
}
4950

@@ -61,21 +62,48 @@ public void start() {
6162

6263
//@Trace 659=start timer for client:{0}
6364
log.fine(CLASS_NAME, methodName, "659", new Object[]{ clientid });
64-
//Check ping after first keep alive interval.
65-
schedule(comms.getKeepAlive());
65+
if (executorService == null) {
66+
timer = new Timer("MQTT Ping: " + clientid);
67+
//Check ping after first keep alive interval.
68+
timer.schedule(new PingTask(), comms.getKeepAlive());
69+
} else {
70+
//Check ping after first keep alive interval.
71+
schedule(comms.getKeepAlive());
72+
}
6673
}
6774

6875
public void stop() {
6976
final String methodName = "stop";
7077
//@Trace 661=stop
7178
log.fine(CLASS_NAME, methodName, "661", null);
72-
if (scheduledFuture != null) {
73-
scheduledFuture.cancel(true);
79+
if (executorService == null) {
80+
if (timer != null){
81+
timer.cancel();
82+
}
83+
} else {
84+
if (scheduledFuture != null) {
85+
scheduledFuture.cancel(true);
86+
}
7487
}
7588
}
7689

7790
public void schedule(long delayInMilliseconds) {
78-
scheduledFuture = executorService.schedule(new PingRunnable(), delayInMilliseconds, TimeUnit.MILLISECONDS);
91+
if (executorService == null) {
92+
timer.schedule(new PingTask(), delayInMilliseconds);
93+
} else {
94+
scheduledFuture = executorService.schedule(new PingRunnable(), delayInMilliseconds, TimeUnit.MILLISECONDS);
95+
}
96+
}
97+
98+
private class PingTask extends TimerTask {
99+
private static final String methodName = "PingTask.run";
100+
101+
public void run() {
102+
Thread.currentThread().setName("MQTT Ping: " + clientid);
103+
//@Trace 660=Check schedule at {0}
104+
log.fine(CLASS_NAME, methodName, "660", new Object[]{ new Long(System.nanoTime()) });
105+
comms.checkForActivity();
106+
}
79107
}
80108

81109
private class PingRunnable implements Runnable {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Properties;
2424
import java.util.Vector;
2525
import java.util.concurrent.ExecutorService;
26-
import java.util.concurrent.TimeUnit;
2726

2827
import org.eclipse.paho.mqttv5.client.BufferedMessage;
2928
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
@@ -38,7 +37,6 @@
3837
import org.eclipse.paho.mqttv5.client.MqttToken;
3938
import org.eclipse.paho.mqttv5.client.MqttTopic;
4039
import org.eclipse.paho.mqttv5.client.TimerPingSender;
41-
import org.eclipse.paho.mqttv5.client.alpha.IMqttAsyncClient;
4240
import org.eclipse.paho.mqttv5.client.logging.Logger;
4341
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
4442
import org.eclipse.paho.mqttv5.common.MqttException;
@@ -129,22 +127,6 @@ CommsReceiver getReceiver() {
129127
return receiver;
130128
}
131129

132-
private void shutdownExecutorService() {
133-
String methodName = "shutdownExecutorService";
134-
executorService.shutdown();
135-
try {
136-
if (!executorService.awaitTermination(conOptions.getExecutorServiceTimeout(), TimeUnit.SECONDS)) {
137-
executorService.shutdownNow();
138-
if (!executorService.awaitTermination(conOptions.getExecutorServiceTimeout(), TimeUnit.SECONDS)) {
139-
log.fine(CLASS_NAME, methodName, "executorService did not terminate");
140-
}
141-
}
142-
} catch (InterruptedException ie) {
143-
executorService.shutdownNow();
144-
Thread.currentThread().interrupt();
145-
}
146-
}
147-
148130
/**
149131
* Sends a message to the server. Does not check if connected this validation
150132
* must be done by invoking routines.
@@ -279,7 +261,8 @@ public void close(boolean force) throws MqttException {
279261
}
280262

281263
conState = CLOSED;
282-
shutdownExecutorService();
264+
// Don't shut down an externally supplied executor service
265+
//shutdownExecutorService();
283266
// ShutdownConnection has already cleaned most things
284267
clientState.close();
285268
clientState = null;
@@ -495,7 +478,7 @@ public void shutdownConnection(MqttToken token, MqttException reason, MqttDiscon
495478
// it now. This is done at the end to allow a new connect
496479
// to be processed and now throw a currently disconnecting error.
497480
// any outstanding tokens and unblock any waiters
498-
if (endToken != null & callback != null) {
481+
if (endToken != null && callback != null) {
499482
callback.asyncOperationComplete(endToken);
500483
}
501484
if (wasConnected && callback != null) {
@@ -765,7 +748,11 @@ private class ConnectBG implements Runnable {
765748
}
766749

767750
void start() {
768-
executorService.execute(this);
751+
if (executorService == null) {
752+
new Thread(this).start();
753+
} else {
754+
executorService.execute(this);
755+
}
769756
}
770757

771758
public void run() {
@@ -830,8 +817,12 @@ private class DisconnectBG implements Runnable {
830817
}
831818

832819
void start() {
833-
threadName = "MQTT Disc: " + getClient().getClientId();
834-
executorService.execute(this);
820+
threadName = "MQTT Disc: "+getClient().getClientId();
821+
if (executorService == null) {
822+
new Thread(this).start();
823+
} else {
824+
executorService.execute(this);
825+
}
835826
}
836827

837828
public void run() {
@@ -844,10 +835,19 @@ public void run() {
844835
clientState.quiesce(quiesceTimeout);
845836
try {
846837
internalSend(disconnect, token);
847-
token.internalTok.waitUntilSent();
848-
} catch (MqttException ex) {
849-
} finally {
838+
// do not wait if the sender process is not running
839+
if (sender != null && sender.isRunning()) {
840+
token.internalTok.waitUntilSent();
841+
}
842+
}
843+
catch (MqttException ex) {
844+
}
845+
finally {
850846
token.internalTok.markComplete(null, null);
847+
if (sender == null || !sender.isRunning()) {
848+
// if the sender process is not running
849+
token.internalTok.notifyComplete();
850+
}
851851
shutdownConnection(token, null, null);
852852
}
853853
}
@@ -933,7 +933,11 @@ public void notifyReconnect() {
933933
log.fine(CLASS_NAME, methodName, "509");
934934

935935
disconnectedMessageBuffer.setPublishCallback(new ReconnectDisconnectedBufferCallback(methodName));
936-
executorService.execute(disconnectedMessageBuffer);
936+
if (executorService == null) {
937+
new Thread(disconnectedMessageBuffer).start();
938+
} else {
939+
executorService.execute(disconnectedMessageBuffer);
940+
}
937941
}
938942
}
939943

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -842,8 +842,8 @@ protected MqttWireMessage get() throws MqttException {
842842
// Handle the case where not connected. This should only be the case if:
843843
// - in the process of disconnecting / shutting down
844844
// - in the process of connecting
845-
if (!connected && (pendingFlows.isEmpty()
846-
|| !((MqttWireMessage) pendingFlows.elementAt(0) instanceof MqttConnect))) {
845+
if (pendingFlows == null || (!connected && (pendingFlows.isEmpty()
846+
|| !((MqttWireMessage) pendingFlows.elementAt(0) instanceof MqttConnect)))) {
847847
// @TRACE 621=no outstanding flows and not connected
848848
log.fine(CLASS_NAME, methodName, "621");
849849

@@ -886,8 +886,8 @@ protected MqttWireMessage get() throws MqttException {
886886
log.fine(CLASS_NAME, methodName, "622");
887887
}
888888
}
889-
}
890-
}
889+
} // end while
890+
} // synchronized
891891
return result;
892892
}
893893

@@ -920,6 +920,7 @@ protected void notifySent(MqttWireMessage message) {
920920
log.fine(CLASS_NAME, methodName, "625", new Object[] { message.getKey() });
921921

922922
MqttToken token = tokenStore.getToken(message);
923+
if (token == null) return;
923924
token.internalTok.notifySent();
924925
if (message instanceof MqttPingReq) {
925926
synchronized (pingOutstandingLock) {

0 commit comments

Comments
 (0)