Skip to content

Commit dd4ce54

Browse files
committed
First Pass at Basic Unit Tests for Automatic Reconnect
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 8d7c328 commit dd4ce54

4 files changed

Lines changed: 332 additions & 1 deletion

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package org.eclipse.paho.client.mqttv3.test.automaticReconnect;
2+
3+
import java.net.URI;
4+
import java.util.logging.Level;
5+
import java.util.logging.Logger;
6+
7+
import org.eclipse.paho.client.mqttv3.MqttClient;
8+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
9+
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
10+
import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
11+
import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
12+
import org.eclipse.paho.client.mqttv3.test.utilities.ConnectionManipulationProxyServer;
13+
import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
14+
import org.junit.BeforeClass;
15+
import org.junit.Test;
16+
import org.junit.AfterClass;
17+
import org.junit.Assert;
18+
19+
public class AutomaticReconnectTest{
20+
21+
static final Class<?> cclass = AutomaticReconnectTest.class;
22+
private static final String className = cclass.getName();
23+
private static final Logger log = Logger.getLogger(className);
24+
25+
private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
26+
27+
28+
private static URI serverURI;
29+
private String clientId = "device-client-id";
30+
static ConnectionManipulationProxyServer proxy;
31+
32+
33+
@BeforeClass
34+
public static void setUpBeforeClass() throws Exception{
35+
try {
36+
String methodName = Utility.getMethodName();
37+
LoggingUtilities.banner(log, cclass, methodName);
38+
serverURI = TestProperties.getServerURI();
39+
proxy = new ConnectionManipulationProxyServer(serverURI.getHost(), serverURI.getPort(), 4242);
40+
proxy.startProxy();
41+
} catch (Exception exception) {
42+
log.log(Level.SEVERE, "caught exception:", exception);
43+
throw exception;
44+
}
45+
46+
}
47+
48+
@AfterClass
49+
public static void tearDownAfterClass() throws Exception {
50+
log.info("Tests finished, stopping proxy");
51+
proxy.stopProxy();
52+
53+
}
54+
55+
/**
56+
* Tests that if a connection is opened and then is lost that the client automatically reconnects
57+
* @throws Exception
58+
*/
59+
@Test
60+
public void testAutomaticReconnectAfterDisconnect() throws Exception{
61+
String methodName = Utility.getMethodName();
62+
LoggingUtilities.banner(log, cclass, methodName);
63+
MqttConnectOptions options = new MqttConnectOptions();
64+
options.setCleanSession(true);
65+
options.setAutomaticReconnect(true);
66+
final MqttClient client = new MqttClient("tcp://localhost:4242", clientId, DATA_STORE);
67+
68+
proxy.enableProxy();
69+
client.connect(options);
70+
71+
boolean isConnected = client.isConnected();
72+
log.info("First Connection isConnected: " + isConnected);
73+
Assert.assertTrue(isConnected);
74+
75+
proxy.disableProxy();
76+
77+
// Give it a second to close everything down
78+
Thread.sleep(100);
79+
isConnected = client.isConnected();
80+
log.info("Proxy Disconnect isConnected: " + isConnected);
81+
Assert.assertFalse(isConnected);
82+
83+
proxy.enableProxy();
84+
// give it some time to reconnect
85+
long currentTime = System.currentTimeMillis();
86+
int timeout = 16000;
87+
while(client.isConnected() == false){
88+
long now = System.currentTimeMillis();
89+
if((currentTime + timeout) < now){
90+
log.warning("Timeout Exceeded");
91+
break;
92+
}
93+
Thread.sleep(500);
94+
}
95+
isConnected = client.isConnected();
96+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
97+
Assert.assertTrue(isConnected);
98+
client.disconnect();
99+
Assert.assertFalse(client.isConnected());
100+
}
101+
102+
/**
103+
* Tests that if a connection is opened and lost, that when the user calls reconnect() that the
104+
* client will attempt to reconnect straight away
105+
*/
106+
@Test
107+
public void testManualReconnectAfterDisconnect() throws Exception {
108+
String methodName = Utility.getMethodName();
109+
LoggingUtilities.banner(log, cclass, methodName);
110+
MqttConnectOptions options = new MqttConnectOptions();
111+
options.setCleanSession(true);
112+
options.setAutomaticReconnect(true);
113+
114+
final MqttClient client = new MqttClient("tcp://localhost:4242", clientId, DATA_STORE);
115+
116+
proxy.enableProxy();
117+
client.connect(options);
118+
119+
boolean isConnected = client.isConnected();
120+
log.info("First Connection isConnected: " + isConnected);
121+
Assert.assertTrue(isConnected);
122+
123+
proxy.disableProxy();
124+
125+
// Give it a second to close everything down
126+
Thread.sleep(100);
127+
isConnected = client.isConnected();
128+
log.info("Proxy Disconnect isConnected: " + isConnected);
129+
Assert.assertFalse(isConnected);
130+
131+
proxy.enableProxy();
132+
client.reconnect();
133+
// give it some time to reconnect
134+
Thread.sleep(5000);
135+
isConnected = client.isConnected();
136+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
137+
Assert.assertTrue(isConnected);
138+
client.disconnect();
139+
Assert.assertFalse(client.isConnected());
140+
}
141+
142+
143+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package org.eclipse.paho.client.mqttv3.test.utilities;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import java.net.ServerSocket;
7+
import java.net.Socket;
8+
import java.util.logging.Logger;
9+
10+
import org.eclipse.paho.client.mqttv3.test.automaticReconnect.AutomaticReconnectTest;
11+
12+
public class ConnectionManipulationProxyServer implements Runnable {
13+
static final Class<?> cclass = AutomaticReconnectTest.class;
14+
private static final String className = cclass.getName();
15+
private static final Logger log = Logger.getLogger(className);
16+
private int localPort;
17+
private String host;
18+
private int remotePort;
19+
private Thread proxyThread;
20+
private Object enableLock = new Object();
21+
private boolean enableProxy = true;
22+
private boolean running = true;
23+
Socket client = null, server = null;
24+
25+
public ConnectionManipulationProxyServer(String host, int remotePort, int localPort) {
26+
this.localPort = localPort;
27+
this.remotePort = remotePort;
28+
this.host = host;
29+
proxyThread = new Thread(this);
30+
31+
}
32+
33+
public void startProxy(){
34+
synchronized (enableLock) {
35+
enableProxy = true;
36+
}
37+
running = true;
38+
proxyThread.start();
39+
}
40+
41+
public void enableProxy(){
42+
synchronized (enableLock) {
43+
enableProxy = true;
44+
}
45+
running = true;
46+
if(proxyThread.isAlive() == false){
47+
proxyThread.start();
48+
}
49+
}
50+
51+
public void disableProxy(){
52+
synchronized (enableLock) {
53+
enableProxy = false;
54+
}
55+
try {
56+
client.close();
57+
server.close();
58+
} catch (IOException e) {
59+
}
60+
}
61+
62+
public void stopProxy(){
63+
synchronized (enableLock) {
64+
enableProxy = false;
65+
}
66+
running = false;
67+
try {
68+
client.close();
69+
server.close();
70+
} catch (IOException e) {
71+
}
72+
}
73+
74+
@Override
75+
public void run() {
76+
try {
77+
//Create the Listening Server
78+
ServerSocket serverSocket = new ServerSocket(localPort);
79+
final byte[] request = new byte[1024];
80+
byte[] reply = new byte[4096];
81+
boolean canIrun = true;
82+
while(running){
83+
synchronized (enableLock) {
84+
canIrun = enableProxy;
85+
}
86+
while(canIrun){
87+
log.fine("Waiting for incoming connection");
88+
89+
try {
90+
// Wait for a connection on the local Port
91+
client = serverSocket.accept();
92+
log.fine("Proxy: Client Opened Connection to Proxy...");
93+
94+
final InputStream streamFromClient = client.getInputStream();
95+
final OutputStream streamToClient = client.getOutputStream();
96+
97+
// Attempt to make a connection to the real server
98+
try {
99+
server = new Socket(host, remotePort);
100+
} catch (IOException ex){
101+
log.warning("ConnectionManipulationProxyServer cannot connect to " + host + ":" + remotePort);
102+
client.close();
103+
continue;
104+
}
105+
log.fine("Proxy: Proxy Connected to Server");
106+
107+
// Get Server Streams
108+
final InputStream streamFromServer = server.getInputStream();
109+
final OutputStream streamToServer = server.getOutputStream();
110+
111+
Thread thread = new Thread() {
112+
public void run() {
113+
int bytesRead;
114+
try {
115+
while((bytesRead = streamFromClient.read(request)) != -1) {
116+
streamToServer.write(request, 0, bytesRead);
117+
streamToServer.flush();
118+
}
119+
} catch (IOException ex){
120+
//log.warning("Proxy: 1 Connection lost: " + ex.getMessage());
121+
try {
122+
client.close();
123+
server.close();
124+
} catch (IOException e) {
125+
126+
}
127+
128+
}
129+
}
130+
};
131+
132+
thread.start();
133+
134+
// Read the Servers responses and pass them back to the client
135+
int bytesRead;
136+
try {
137+
while ((bytesRead = streamFromServer.read(reply))!= -1){
138+
streamToClient.write(reply, 0, bytesRead);
139+
streamToClient.flush();
140+
}
141+
142+
} catch (IOException ex){
143+
//log.warning("Proxy: 2 Connection lost: " + ex.getMessage());
144+
client.close();
145+
server.close();
146+
}
147+
148+
streamToClient.close();
149+
150+
151+
} catch (IOException ex) {
152+
//log.warning("Proxy: 3 Connection lost: " + ex.getMessage());
153+
} finally {
154+
try {
155+
if(server != null){
156+
server.close();
157+
}
158+
if(client != null){
159+
client.close();
160+
}
161+
} catch(IOException ex) {
162+
//log.warning("Proxy: 4 Connection lost: " + ex.getMessage());
163+
}
164+
}
165+
166+
167+
168+
}
169+
}
170+
log.fine("Proxy: Proxy Thread finishing..");
171+
serverSocket.close();
172+
} catch(IOException ex) {
173+
log.warning("Proxy: 5 Thread Connection lost: " + ex.getMessage());
174+
ex.printStackTrace();
175+
}
176+
177+
178+
}
179+
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,9 @@ public void reconnect() throws MqttException {
10471047
if (comms.isClosed()) {
10481048
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
10491049
}
1050+
// We don't want to spam the server
1051+
stopReconnectCycle();
1052+
10501053
attemptReconnect();
10511054
}
10521055

@@ -1071,7 +1074,7 @@ private void attemptReconnect(){
10711074
public void onSuccess(IMqttToken asyncActionToken) {
10721075
//@Trace 501=Automatic Reconnect Successful: {0}
10731076
log.fine(CLASS_NAME, methodName, "501", new Object[]{asyncActionToken.getClient().getClientId()});
1074-
comms.setRestingState(true);
1077+
comms.setRestingState(false);
10751078
stopReconnectCycle();
10761079
}
10771080

@@ -1109,6 +1112,8 @@ private void stopReconnectCycle(){
11091112
//@Trace 504=Stop reconnect timer for client: {0}
11101113
log.fine(CLASS_NAME, methodName, "504", new Object[]{this.clientId});
11111114
reconnectTimer.cancel();
1115+
reconnectDelay = 1000; // Reset Delay Timer
1116+
11121117
}
11131118

11141119
private void rescheduleReconnectCycle(int delay){

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,10 @@ public void messageArrivedComplete(int messageId, int qos) throws MqttException
519519
public static String generateClientId() {
520520
return MqttAsyncClient.generateClientId();
521521
}
522+
523+
public void reconnect() throws MqttException {
524+
aClient.reconnect();
525+
}
522526

523527
/**
524528
* Return a debug object that can be used to help solve problems.

0 commit comments

Comments
 (0)