Skip to content

Commit 46edc58

Browse files
david-katzjpwsutton
authored andcommitted
Use ScheduledExecutor for the PingSender (#364)
The use of Timer, which handles its own thread, was causing exceptions when used in a JEE context when the heart beat was missed: the reconnect, which was being executed from the heart beat’s Timer thread, was trying to submit to the ManagedExecutorService, which is not allowed. Signed-off-by: David Katz <David.Katz@bmw-carit.de> Change-Id: I8aa89bcdaf85db6fa03c39f7b19b2e6398431301 Signed-off-by: David Katz <David.Katz@bmw-carit.de>
1 parent a459860 commit 46edc58

3 files changed

Lines changed: 100 additions & 8 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.Properties;
2828
import java.util.Timer;
2929
import java.util.TimerTask;
30-
import java.util.concurrent.ExecutorService;
3130
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
3232

3333
import javax.net.SocketFactory;
3434
import javax.net.ssl.SSLSocketFactory;
@@ -112,7 +112,7 @@ public class MqttAsyncClient implements IMqttAsyncClient {
112112

113113

114114

115-
private ExecutorService executorService;
115+
private ScheduledExecutorService executorService;
116116

117117
/**
118118
* Create an MqttAsyncClient that is used to communicate with an MQTT server.
@@ -366,8 +366,8 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
366366
* @throws IllegalArgumentException if the clientId is null or is greater than 65535 characters in length
367367
* @throws MqttException if any other problem was encountered
368368
*/
369-
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender, ExecutorService executorService) throws MqttException {
370-
final String methodName = "MqttAsyncClient";
369+
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
370+
final String methodName = "MqttAsyncClient";
371371

372372
log.setResourceName(clientId);
373373

@@ -397,7 +397,7 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
397397

398398
this.executorService = executorService;
399399
if (this.executorService == null) {
400-
this.executorService = Executors.newFixedThreadPool(10);
400+
this.executorService = Executors.newScheduledThreadPool(10);
401401
}
402402

403403
// @TRACE 101=<init> ClientID={0} ServerURI={1} PersistenceType={2}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.eclipse.paho.client.mqttv3;
2020

2121
import java.util.Properties;
22-
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.ScheduledExecutorService;
2323

2424
import javax.net.SocketFactory;
2525

@@ -315,8 +315,8 @@ public MqttClient(String serverURI, String clientId, MqttClientPersistence persi
315315
* @throws IllegalArgumentException if the clientId is null or is greater than 65535 characters in length
316316
* @throws MqttException if any other problem was encountered
317317
*/
318-
public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence, ExecutorService executorService) throws MqttException {
319-
aClient = new MqttAsyncClient(serverURI, clientId, persistence, new TimerPingSender(), executorService);
318+
public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence, ScheduledExecutorService executorService) throws MqttException {
319+
aClient = new MqttAsyncClient(serverURI, clientId, persistence, new ScheduledExecutorPingSender(executorService), executorService);
320320
}
321321

322322
/*
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2014 IBM Corp.
3+
* Copyright (c) 2017 BMW Car IT GmbH.
4+
*
5+
* All rights reserved. This program and the accompanying materials
6+
* are made available under the terms of the Eclipse Public License v1.0
7+
* and Eclipse Distribution License v1.0 which accompany this distribution.
8+
*
9+
* The Eclipse Public License is available at
10+
* http://www.eclipse.org/legal/epl-v10.html
11+
* and the Eclipse Distribution License is available at
12+
* http://www.eclipse.org/org/documents/edl-v10.php.
13+
*/
14+
15+
package org.eclipse.paho.client.mqttv3;
16+
17+
import java.util.concurrent.ScheduledExecutorService;
18+
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.eclipse.paho.client.mqttv3.internal.ClientComms;
22+
import org.eclipse.paho.client.mqttv3.logging.Logger;
23+
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
24+
25+
/**
26+
* Default ping sender implementation
27+
*
28+
* <p>This class implements the {@link IMqttPingSender} pinger interface
29+
* allowing applications to send ping packet to server every keep alive interval.
30+
* </p>
31+
*
32+
* @see MqttPingSender
33+
*/
34+
public class ScheduledExecutorPingSender implements MqttPingSender {
35+
private static final String CLASS_NAME = ScheduledExecutorPingSender.class.getName();
36+
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
37+
38+
private ClientComms comms;
39+
private ScheduledExecutorService executorService;
40+
private ScheduledFuture scheduledFuture;
41+
private String clientid;
42+
43+
public ScheduledExecutorPingSender(ScheduledExecutorService executorService) {
44+
if (executorService == null) {
45+
throw new IllegalArgumentException("ExecutorService cannot be null.");
46+
}
47+
this.executorService = executorService;
48+
}
49+
50+
public void init(ClientComms comms) {
51+
if (comms == null) {
52+
throw new IllegalArgumentException("ClientComms cannot be null.");
53+
}
54+
this.comms = comms;
55+
clientid = comms.getClient().getClientId();
56+
}
57+
58+
public void start() {
59+
final String methodName = "start";
60+
61+
//@Trace 659=start timer for client:{0}
62+
log.fine(CLASS_NAME, methodName, "659", new Object[]{ clientid });
63+
//Check ping after first keep alive interval.
64+
schedule(comms.getKeepAlive());
65+
}
66+
67+
public void stop() {
68+
final String methodName = "stop";
69+
//@Trace 661=stop
70+
log.fine(CLASS_NAME, methodName, "661", null);
71+
if (scheduledFuture != null) {
72+
scheduledFuture.cancel(true);
73+
}
74+
}
75+
76+
public void schedule(long delayInMilliseconds) {
77+
scheduledFuture = executorService.schedule(new PingRunnable(), delayInMilliseconds, TimeUnit.MILLISECONDS);
78+
}
79+
80+
private class PingRunnable implements Runnable {
81+
private static final String methodName = "PingTask.run";
82+
83+
public void run() {
84+
String originalThreadName = Thread.currentThread().getName();
85+
Thread.currentThread().setName("MQTT Ping: " + clientid);
86+
//@Trace 660=Check schedule at {0}
87+
log.fine(CLASS_NAME, methodName, "660", new Object[]{ new Long(System.currentTimeMillis()) });
88+
comms.checkForActivity();
89+
Thread.currentThread().setName(originalThreadName);
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)