Skip to content

Commit 576455e

Browse files
author
Ian Craggs
committed
Stop threads after disconnect when no executor service #402
1 parent c426b53 commit 576455e

5 files changed

Lines changed: 143 additions & 11 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/BasicTest.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import java.net.URI;
1717
import java.util.ArrayList;
1818
import java.util.UUID;
19+
import java.util.concurrent.Executors;
1920
import java.util.logging.Level;
2021
import java.util.logging.Logger;
2122

2223
import org.eclipse.paho.client.mqttv3.IMqttClient;
2324
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
2425
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2526
import org.eclipse.paho.client.mqttv3.MqttCallback;
27+
import org.eclipse.paho.client.mqttv3.MqttClient;
2628
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2729
import org.eclipse.paho.client.mqttv3.MqttException;
2830
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -352,6 +354,119 @@ public void test330() throws Exception {
352354
client.close();
353355
}
354356
}
357+
358+
359+
@Test
360+
public void test402() throws Exception {
361+
String methodName = Utility.getMethodName();
362+
LoggingUtilities.banner(log, cclass, methodName);
363+
364+
IMqttClient client = null;
365+
int before_thread_count = Thread.activeCount();
366+
try {
367+
String clientId = methodName;
368+
client = clientFactory.createMqttClient(serverURI, clientId);
369+
370+
log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + clientId);
371+
client.connect();
372+
373+
String clientId2 = client.getClientId();
374+
log.info("clientId = " + clientId2);
375+
376+
boolean isConnected = client.isConnected();
377+
log.info("isConnected = " + isConnected);
378+
379+
String id = client.getServerURI();
380+
log.info("ServerURI = " + id);
381+
382+
log.info("Disconnecting...");
383+
client.disconnect();
384+
385+
log.info("Re-Connecting...");
386+
client.connect();
387+
388+
log.info("Disconnecting...");
389+
client.disconnect();
390+
391+
int after_count = Thread.activeCount();
392+
Thread[] tarray = new Thread[after_count];
393+
after_count = Thread.enumerate(tarray);
394+
for (int i = 0; i < after_count; ++i) {
395+
log.info(i + " " + tarray[i].getName());
396+
}
397+
Assert.assertEquals(before_thread_count, after_count);
398+
}
399+
catch (MqttException exception) {
400+
log.log(Level.SEVERE, "caught exception:", exception);
401+
Assert.fail("Unexpected exception: " + exception);
402+
}
403+
finally {
404+
if (client != null) {
405+
log.info("Close...");
406+
client.close();
407+
}
408+
}
409+
int after_count = Thread.activeCount();
410+
Thread[] tarray = new Thread[after_count];
411+
after_count = Thread.enumerate(tarray);
412+
for (int i = 0; i < after_count; ++i) {
413+
log.info(i + " " + tarray[i].getName());
414+
}
415+
Assert.assertEquals(before_thread_count, after_count);
416+
}
417+
418+
419+
@Test
420+
public void test402a() throws Exception {
421+
String methodName = Utility.getMethodName();
422+
LoggingUtilities.banner(log, cclass, methodName);
423+
424+
IMqttClient client = null;
425+
int before_thread_count = Thread.activeCount();
426+
final int pool_size = 10;
427+
try {
428+
String clientId = methodName;
429+
client = new MqttClient(serverURI.toString(), clientId, null, Executors.newScheduledThreadPool(pool_size));
430+
431+
log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + clientId);
432+
client.connect();
433+
434+
String clientId2 = client.getClientId();
435+
log.info("clientId = " + clientId2);
436+
437+
boolean isConnected = client.isConnected();
438+
log.info("isConnected = " + isConnected);
439+
440+
String id = client.getServerURI();
441+
log.info("ServerURI = " + id);
442+
443+
log.info("Disconnecting...");
444+
client.disconnect();
445+
446+
log.info("Re-Connecting...");
447+
client.connect();
448+
449+
log.info("Disconnecting...");
450+
client.disconnect();
451+
}
452+
catch (MqttException exception) {
453+
log.log(Level.SEVERE, "caught exception:", exception);
454+
Assert.fail("Unexpected exception: " + exception);
455+
}
456+
finally {
457+
if (client != null) {
458+
log.info("Close...");
459+
client.close();
460+
}
461+
}
462+
int after_count = Thread.activeCount();
463+
Thread[] tarray = new Thread[after_count];
464+
after_count = Thread.enumerate(tarray);
465+
for (int i = 0; i < after_count; ++i) {
466+
log.info(i + " " + tarray[i].getName());
467+
}
468+
Assert.assertEquals(before_thread_count, after_count - pool_size);
469+
}
355470

356471

357472
// -------------------------------------------------------------

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ public void close(boolean force) throws MqttException {
242242
}
243243

244244
conState = CLOSED;
245-
shutdownExecutorService();
245+
// Don't shut down an externally supplied executor service
246+
//shutdownExecutorService();
246247
// ShutdownConnection has already cleaned most things
247248
clientState.close();
248249
clientState = null;
@@ -688,7 +689,11 @@ private class ConnectBG implements Runnable {
688689
}
689690

690691
void start() {
691-
executorService.execute(this);
692+
if (executorService == null) {
693+
new Thread(this).start();
694+
} else {
695+
executorService.execute(this);
696+
}
692697
}
693698

694699
public void run() {
@@ -753,7 +758,11 @@ private class DisconnectBG implements Runnable {
753758

754759
void start() {
755760
threadName = "MQTT Disc: "+getClient().getClientId();
756-
executorService.execute(this);
761+
if (executorService == null) {
762+
new Thread(this).start();
763+
} else {
764+
executorService.execute(this);
765+
}
757766
}
758767

759768
public void run() {
@@ -865,7 +874,11 @@ public void notifyConnect() {
865874
log.fine(CLASS_NAME, methodName, "509", null);
866875

867876
disconnectedMessageBuffer.setPublishCallback(new ReconnectDisconnectedBufferCallback(methodName));
868-
executorService.execute(disconnectedMessageBuffer);
877+
if (executorService == null) {
878+
new Thread(disconnectedMessageBuffer).start();
879+
} else {
880+
executorService.execute(disconnectedMessageBuffer);
881+
}
869882
}
870883
}
871884

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -421,8 +421,7 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
421421
* @param pingSender
422422
* Custom {@link MqttPingSender} implementation.
423423
* @param executorService
424-
* used for managing threads. If null then a newScheduledThreadPool
425-
* is used.
424+
* used for managing threads. If null no executor service is used.
426425
* @throws IllegalArgumentException
427426
* if the URI does not start with "tcp://", "ssl://" or
428427
* "local://"
@@ -463,9 +462,6 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
463462
}
464463

465464
this.executorService = executorService;
466-
if (this.executorService == null) {
467-
this.executorService = Executors.newScheduledThreadPool(10);
468-
}
469465

470466
// @TRACE 101=<init> ClientID={0} ServerURI={1} PersistenceType={2}
471467
log.fine(CLASS_NAME, methodName, "101", new Object[] { clientId, serverURI, persistence });

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ public void start(String threadName, ExecutorService executorService) {
9696

9797
running = true;
9898
quiescing = false;
99-
callbackFuture = executorService.submit(this);
99+
if (executorService == null) {
100+
new Thread(this).start();
101+
} else {
102+
callbackFuture = executorService.submit(this);
103+
}
100104
}
101105
}
102106
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ public void start(String threadName, ExecutorService executorService) {
7474
synchronized (lifecycle) {
7575
if (!running) {
7676
running = true;
77-
receiverFuture = executorService.submit(this);
77+
if (executorService == null) {
78+
new Thread(this).start();
79+
} else {
80+
receiverFuture = executorService.submit(this);
81+
}
7882
}
7983
}
8084
}

0 commit comments

Comments
 (0)