Skip to content

Commit 66de59c

Browse files
committed
Issue #527 - Stripping Topic Alias when buffering offline message and adding test
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 33ba034 commit 66de59c

5 files changed

Lines changed: 728 additions & 2 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,12 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
197197
// @TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
198198
// message to buffer. message={0}
199199
log.fine(CLASS_NAME, methodName, "507", new Object[] { message.getKey() });
200+
// If the message is a publish, strip the topic alias:
201+
if(message instanceof MqttPublish && message.getProperties().getTopicAlias()!= null) {
202+
MqttProperties messageProps = message.getProperties();
203+
messageProps.setTopicAlias(null);
204+
message.setProperties(messageProps);
205+
}
200206
if (disconnectedMessageBuffer.isPersistBuffer()) {
201207
this.clientState.persistBufferedMessage(message);
202208
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
package org.eclipse.paho.mqttv5.client.test.automaticReconnect;
2+
3+
import java.net.URI;
4+
import java.util.logging.Level;
5+
import java.util.logging.Logger;
6+
7+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
8+
import org.eclipse.paho.mqttv5.client.MqttLegacyBlockingClient;
9+
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
10+
import org.eclipse.paho.mqttv5.client.test.logging.LoggingUtilities;
11+
import org.eclipse.paho.mqttv5.client.test.properties.TestProperties;
12+
import org.eclipse.paho.mqttv5.client.test.utilities.ConnectionManipulationProxyServer;
13+
import org.eclipse.paho.mqttv5.client.test.utilities.Utility;
14+
import org.eclipse.paho.mqttv5.common.MqttException;
15+
import org.junit.AfterClass;
16+
import org.junit.Assert;
17+
import org.junit.BeforeClass;
18+
import org.junit.Test;
19+
20+
public class AutomaticReconnectTest{
21+
22+
static final Class<?> cclass = AutomaticReconnectTest.class;
23+
private static final String className = cclass.getName();
24+
private static final Logger log = Logger.getLogger(className);
25+
26+
private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
27+
28+
29+
private static URI serverURI;
30+
private String clientId = "device-client-id";
31+
static ConnectionManipulationProxyServer proxy;
32+
33+
34+
@BeforeClass
35+
public static void setUpBeforeClass() throws Exception{
36+
try {
37+
String methodName = Utility.getMethodName();
38+
LoggingUtilities.banner(log, cclass, methodName);
39+
serverURI = TestProperties.getServerURI();
40+
// Use 0 for the first time.
41+
proxy = new ConnectionManipulationProxyServer(serverURI.getHost(), serverURI.getPort(), 0);
42+
proxy.startProxy();
43+
while(!proxy.isPortSet()){
44+
Thread.sleep(0);
45+
}
46+
log.log(Level.INFO, "Proxy Started, port set to: " + proxy.getLocalPort());
47+
} catch (Exception exception) {
48+
log.log(Level.SEVERE, "caught exception:", exception);
49+
throw exception;
50+
}
51+
52+
}
53+
54+
@AfterClass
55+
public static void tearDownAfterClass() throws Exception {
56+
log.info("Tests finished, stopping proxy");
57+
proxy.stopProxy();
58+
59+
}
60+
61+
62+
/**
63+
* Tests that if a connection is opened and then is lost that the client automatically reconnects
64+
* @throws Exception
65+
*/
66+
@Test
67+
public void testAutomaticReconnectAfterDisconnect() throws Exception{
68+
String methodName = Utility.getMethodName();
69+
LoggingUtilities.banner(log, cclass, methodName);
70+
MqttConnectionOptions options = new MqttConnectionOptions();
71+
options.setCleanSession(true);
72+
options.setAutomaticReconnect(true);
73+
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(), clientId, DATA_STORE);
74+
75+
proxy.enableProxy();
76+
client.connect(options);
77+
78+
boolean isConnected = client.isConnected();
79+
log.info("First Connection isConnected: " + isConnected);
80+
Assert.assertTrue(isConnected);
81+
82+
proxy.disableProxy();
83+
isConnected = client.isConnected();
84+
log.info("Proxy Disconnect isConnected: " + isConnected);
85+
Assert.assertFalse(isConnected);
86+
87+
proxy.enableProxy();
88+
// give it some time to reconnect
89+
long currentTime = System.currentTimeMillis();
90+
int timeout = 4000;
91+
while(client.isConnected() == false){
92+
long now = System.currentTimeMillis();
93+
if((currentTime + timeout) < now){
94+
log.warning("Timeout Exceeded");
95+
break;
96+
}
97+
Thread.sleep(500);
98+
}
99+
isConnected = client.isConnected();
100+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
101+
Assert.assertTrue(isConnected);
102+
client.disconnect();
103+
Assert.assertFalse(client.isConnected());
104+
}
105+
106+
/**
107+
* Tests that if a connection is opened and lost, that when the user calls reconnect() that the
108+
* client will attempt to reconnect straight away
109+
*/
110+
@Test
111+
public void testManualReconnectAfterDisconnect() throws Exception {
112+
String methodName = Utility.getMethodName();
113+
LoggingUtilities.banner(log, cclass, methodName);
114+
MqttConnectionOptions options = new MqttConnectionOptions();
115+
options.setCleanSession(true);
116+
options.setAutomaticReconnect(true);
117+
118+
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(), clientId, DATA_STORE);
119+
120+
proxy.enableProxy();
121+
client.connect(options);
122+
123+
boolean isConnected = client.isConnected();
124+
log.info("First Connection isConnected: " + isConnected);
125+
Assert.assertTrue(isConnected);
126+
127+
proxy.disableProxy();
128+
isConnected = client.isConnected();
129+
log.info("Proxy Disconnect isConnected: " + isConnected);
130+
Assert.assertFalse(isConnected);
131+
132+
proxy.enableProxy();
133+
client.reconnect();
134+
// give it some time to reconnect
135+
Thread.sleep(4000);
136+
isConnected = client.isConnected();
137+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
138+
Assert.assertTrue(isConnected);
139+
client.disconnect();
140+
Assert.assertFalse(client.isConnected());
141+
}
142+
143+
144+
/**
145+
* Tests that if the initial connection attempt fails, that the automatic reconnect code does NOT
146+
* engage.
147+
*/
148+
@Test
149+
public void testNoAutomaticReconnectWithNoInitialConnect() throws Exception {
150+
String methodName = Utility.getMethodName();
151+
LoggingUtilities.banner(log, cclass, methodName);
152+
MqttConnectionOptions options = new MqttConnectionOptions();
153+
options.setCleanSession(true);
154+
options.setAutomaticReconnect(true);
155+
options.setConnectionTimeout(15);
156+
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(), clientId, DATA_STORE);
157+
158+
// Make sure the proxy is disabled and give it a second to close everything down
159+
proxy.disableProxy();
160+
try {
161+
client.connect(options);
162+
} catch (MqttException ex) {
163+
// Exceptions are good in this case!
164+
}
165+
boolean isConnected = client.isConnected();
166+
log.info("First Connection isConnected: " + isConnected);
167+
Assert.assertFalse(isConnected);
168+
169+
// Enable The Proxy
170+
proxy.enableProxy();
171+
172+
// Give it some time to make sure we are still not connected
173+
long currentTime = System.currentTimeMillis();
174+
int timeout = 4000;
175+
while(client.isConnected() == false){
176+
long now = System.currentTimeMillis();
177+
if((currentTime + timeout) < now){
178+
Assert.assertFalse(isConnected);
179+
break;
180+
}
181+
Thread.sleep(500);
182+
}
183+
isConnected = client.isConnected();
184+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
185+
Assert.assertFalse(isConnected);
186+
187+
}
188+
}

0 commit comments

Comments
 (0)