Skip to content

Commit a6887fd

Browse files
committed
Merge pull request #183 from jpwsutton/automatic-reconnect
Automatic reconnect & Offline Buffering functionality
2 parents 8abd71c + d0f85f5 commit a6887fd

19 files changed

Lines changed: 1773 additions & 19 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package org.eclipse.paho.client.mqttv3.test.automaticReconnect;
2+
3+
import java.net.URI;
4+
import java.util.logging.Level;
5+
import java.util.logging.Logger;
6+
7+
import org.eclipse.paho.client.mqttv3.MqttClient;
8+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
9+
import org.eclipse.paho.client.mqttv3.MqttException;
10+
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
11+
import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
12+
import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
13+
import org.eclipse.paho.client.mqttv3.test.utilities.ConnectionManipulationProxyServer;
14+
import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
15+
import org.junit.BeforeClass;
16+
import org.junit.Test;
17+
import org.junit.AfterClass;
18+
import org.junit.Assert;
19+
20+
public class AutomaticReconnectTest{
21+
22+
static final Class<?> cclass = AutomaticReconnectTest.class;
23+
private static final String className = cclass.getName();
24+
private static final Logger log = Logger.getLogger(className);
25+
26+
private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
27+
28+
29+
private static URI serverURI;
30+
private String clientId = "device-client-id";
31+
static ConnectionManipulationProxyServer proxy;
32+
33+
34+
@BeforeClass
35+
public static void setUpBeforeClass() throws Exception{
36+
try {
37+
String methodName = Utility.getMethodName();
38+
LoggingUtilities.banner(log, cclass, methodName);
39+
serverURI = TestProperties.getServerURI();
40+
proxy = new ConnectionManipulationProxyServer(serverURI.getHost(), serverURI.getPort(), 4242);
41+
proxy.startProxy();
42+
} catch (Exception exception) {
43+
log.log(Level.SEVERE, "caught exception:", exception);
44+
throw exception;
45+
}
46+
47+
}
48+
49+
@AfterClass
50+
public static void tearDownAfterClass() throws Exception {
51+
log.info("Tests finished, stopping proxy");
52+
proxy.stopProxy();
53+
54+
}
55+
56+
57+
/**
58+
* Tests that if a connection is opened and then is lost that the client automatically reconnects
59+
* @throws Exception
60+
*/
61+
@Test
62+
public void testAutomaticReconnectAfterDisconnect() throws Exception{
63+
String methodName = Utility.getMethodName();
64+
LoggingUtilities.banner(log, cclass, methodName);
65+
MqttConnectOptions options = new MqttConnectOptions();
66+
options.setCleanSession(true);
67+
options.setAutomaticReconnect(true);
68+
final MqttClient client = new MqttClient("tcp://localhost:4242", clientId, DATA_STORE);
69+
70+
proxy.enableProxy();
71+
client.connect(options);
72+
73+
boolean isConnected = client.isConnected();
74+
log.info("First Connection isConnected: " + isConnected);
75+
Assert.assertTrue(isConnected);
76+
77+
proxy.disableProxy();
78+
isConnected = client.isConnected();
79+
log.info("Proxy Disconnect isConnected: " + isConnected);
80+
Assert.assertFalse(isConnected);
81+
82+
proxy.enableProxy();
83+
// give it some time to reconnect
84+
long currentTime = System.currentTimeMillis();
85+
int timeout = 4000;
86+
while(client.isConnected() == false){
87+
long now = System.currentTimeMillis();
88+
if((currentTime + timeout) < now){
89+
log.warning("Timeout Exceeded");
90+
break;
91+
}
92+
Thread.sleep(500);
93+
}
94+
isConnected = client.isConnected();
95+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
96+
Assert.assertTrue(isConnected);
97+
client.disconnect();
98+
Assert.assertFalse(client.isConnected());
99+
}
100+
101+
/**
102+
* Tests that if a connection is opened and lost, that when the user calls reconnect() that the
103+
* client will attempt to reconnect straight away
104+
*/
105+
@Test
106+
public void testManualReconnectAfterDisconnect() throws Exception {
107+
String methodName = Utility.getMethodName();
108+
LoggingUtilities.banner(log, cclass, methodName);
109+
MqttConnectOptions options = new MqttConnectOptions();
110+
options.setCleanSession(true);
111+
options.setAutomaticReconnect(true);
112+
113+
final MqttClient client = new MqttClient("tcp://localhost:4242", clientId, DATA_STORE);
114+
115+
proxy.enableProxy();
116+
client.connect(options);
117+
118+
boolean isConnected = client.isConnected();
119+
log.info("First Connection isConnected: " + isConnected);
120+
Assert.assertTrue(isConnected);
121+
122+
proxy.disableProxy();
123+
isConnected = client.isConnected();
124+
log.info("Proxy Disconnect isConnected: " + isConnected);
125+
Assert.assertFalse(isConnected);
126+
127+
proxy.enableProxy();
128+
client.reconnect();
129+
// give it some time to reconnect
130+
Thread.sleep(4000);
131+
isConnected = client.isConnected();
132+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
133+
Assert.assertTrue(isConnected);
134+
client.disconnect();
135+
Assert.assertFalse(client.isConnected());
136+
}
137+
138+
139+
/**
140+
* Tests that if the initial connection attempt fails, that the automatic reconnect code does NOT
141+
* engage.
142+
*/
143+
@Test
144+
public void testNoAutomaticReconnectWithNoInitialConnect() throws Exception {
145+
String methodName = Utility.getMethodName();
146+
LoggingUtilities.banner(log, cclass, methodName);
147+
MqttConnectOptions options = new MqttConnectOptions();
148+
options.setCleanSession(true);
149+
options.setAutomaticReconnect(true);
150+
options.setConnectionTimeout(15);
151+
final MqttClient client = new MqttClient("tcp://localhost:4242", clientId, DATA_STORE);
152+
153+
// Make sure the proxy is disabled and give it a second to close everything down
154+
proxy.disableProxy();
155+
try {
156+
client.connect(options);
157+
} catch (MqttException ex) {
158+
// Exceptions are good in this case!
159+
}
160+
boolean isConnected = client.isConnected();
161+
log.info("First Connection isConnected: " + isConnected);
162+
Assert.assertFalse(isConnected);
163+
164+
// Enable The Proxy
165+
proxy.enableProxy();
166+
167+
// Give it some time to make sure we are still not connected
168+
long currentTime = System.currentTimeMillis();
169+
int timeout = 4000;
170+
while(client.isConnected() == false){
171+
long now = System.currentTimeMillis();
172+
if((currentTime + timeout) < now){
173+
Assert.assertFalse(isConnected);
174+
break;
175+
}
176+
Thread.sleep(500);
177+
}
178+
isConnected = client.isConnected();
179+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
180+
Assert.assertFalse(isConnected);
181+
182+
}
183+
}

0 commit comments

Comments
 (0)