Skip to content

Commit 7de5c13

Browse files
committed
Issue #520 - Clearing Topic Alias Map on socket disconnect and adding Test to verify.
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 66de59c commit 7de5c13

21 files changed

Lines changed: 332 additions & 170 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,17 @@ protected void clearState() throws MqttException {
240240
outboundQoS0.clear();
241241
inboundQoS2.clear();
242242
tokenStore.clear();
243+
outgoingTopicAliases.clear();
244+
incomingTopicAliases.clear();
245+
}
246+
247+
protected void clearConnectionState() throws MqttException {
248+
final String methodName = "clearConnectionState";
249+
// @TRACE=665=Clearing Connection State (Topic Aliases)
250+
log.fine(CLASS_NAME, methodName, "665");
251+
outgoingTopicAliases.clear();
252+
incomingTopicAliases.clear();
253+
243254
}
244255

245256
private MqttWireMessage restoreMessage(String key, MqttPersistable persistable) throws MqttException {
@@ -1350,6 +1361,8 @@ public void disconnected(MqttException reason) {
13501361
if (cleanSession) {
13511362
clearState();
13521363
}
1364+
1365+
clearConnectionState();
13531366

13541367
pendingMessages.clear();
13551368
pendingFlows.clear();

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void run() {
162162
}
163163
} else if (message != null && message instanceof MqttDisconnect) {
164164
// This is a Disconnect Message
165-
clientComms.shutdownConnection(null, new MqttException(MqttClientException.REASON_CODE_SERVER_DISCONNECTED), (MqttDisconnect) message);
165+
clientComms.shutdownConnection(null, new MqttException(MqttClientException.REASON_CODE_SERVER_DISCONNECTED, (MqttDisconnect) message), (MqttDisconnect) message);
166166
} else {
167167
if (message != null) {
168168
// A new message has arrived

org.eclipse.paho.mqttv5.client/src/main/resources/org/eclipse/paho/mqttv5/client/internal/nls/logcat.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@
141141
662=no message found for ack id={0}
142142
663=Disconnect message received from Server. Details={0}
143143
664=[MQTT-4.3.3-4] - A Reason code greater than 0x80 (128) was received in an incoming PUBREC id={0} rc={1}, halting QoS 2 flow.
144+
665=Clearing Connection State (Topic Aliases)
144145
700=stopping
145146
701=notify workAvailable and wait for run
146147
703=stopped

org.eclipse.paho.mqttv5.client/src/main/resources/org/eclipse/paho/mqttv5/client/internal/nls/messages.properties

Lines changed: 0 additions & 38 deletions
This file was deleted.

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/automaticReconnect/AutomaticReconnectTest.java

Lines changed: 125 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -17,172 +17,173 @@
1717
import org.junit.BeforeClass;
1818
import org.junit.Test;
1919

20-
public class AutomaticReconnectTest{
21-
20+
public class AutomaticReconnectTest {
21+
2222
static final Class<?> cclass = AutomaticReconnectTest.class;
2323
private static final String className = cclass.getName();
2424
private static final Logger log = Logger.getLogger(className);
25-
25+
2626
private static final MemoryPersistence DATA_STORE = new MemoryPersistence();
2727

28-
2928
private static URI serverURI;
30-
private String clientId = "device-client-id";
29+
private String clientId = "device-client-id";
3130
static ConnectionManipulationProxyServer proxy;
32-
33-
31+
3432
@BeforeClass
35-
public static void setUpBeforeClass() throws Exception{
33+
public static void setUpBeforeClass() throws Exception {
3634
try {
3735
String methodName = Utility.getMethodName();
3836
LoggingUtilities.banner(log, cclass, methodName);
3937
serverURI = TestProperties.getServerURI();
4038
// Use 0 for the first time.
4139
proxy = new ConnectionManipulationProxyServer(serverURI.getHost(), serverURI.getPort(), 0);
4240
proxy.startProxy();
43-
while(!proxy.isPortSet()){
41+
while (!proxy.isPortSet()) {
4442
Thread.sleep(0);
4543
}
4644
log.log(Level.INFO, "Proxy Started, port set to: " + proxy.getLocalPort());
4745
} catch (Exception exception) {
48-
log.log(Level.SEVERE, "caught exception:", exception);
49-
throw exception;
50-
}
51-
46+
log.log(Level.SEVERE, "caught exception:", exception);
47+
throw exception;
48+
}
49+
5250
}
53-
51+
5452
@AfterClass
5553
public static void tearDownAfterClass() throws Exception {
5654
log.info("Tests finished, stopping proxy");
5755
proxy.stopProxy();
58-
56+
5957
}
60-
6158

6259
/**
63-
* Tests that if a connection is opened and then is lost that the client automatically reconnects
60+
* Tests that if a connection is opened and then is lost that the client
61+
* automatically reconnects
62+
*
6463
* @throws Exception
6564
*/
6665
@Test
67-
public void testAutomaticReconnectAfterDisconnect() throws Exception{
68-
String methodName = Utility.getMethodName();
69-
LoggingUtilities.banner(log, cclass, methodName);
70-
MqttConnectionOptions options = new MqttConnectionOptions();
71-
options.setCleanSession(true);
72-
options.setAutomaticReconnect(true);
73-
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(), clientId, DATA_STORE);
74-
75-
proxy.enableProxy();
76-
client.connect(options);
77-
78-
boolean isConnected = client.isConnected();
79-
log.info("First Connection isConnected: " + isConnected);
80-
Assert.assertTrue(isConnected);
81-
82-
proxy.disableProxy();
83-
isConnected = client.isConnected();
84-
log.info("Proxy Disconnect isConnected: " + isConnected);
85-
Assert.assertFalse(isConnected);
86-
87-
proxy.enableProxy();
88-
// give it some time to reconnect
89-
long currentTime = System.currentTimeMillis();
90-
int timeout = 4000;
91-
while(client.isConnected() == false){
92-
long now = System.currentTimeMillis();
93-
if((currentTime + timeout) < now){
94-
log.warning("Timeout Exceeded");
95-
break;
96-
}
97-
Thread.sleep(500);
98-
}
99-
isConnected = client.isConnected();
100-
log.info("Proxy Re-Enabled isConnected: " + isConnected);
101-
Assert.assertTrue(isConnected);
102-
client.disconnect();
103-
Assert.assertFalse(client.isConnected());
66+
public void testAutomaticReconnectAfterDisconnect() throws Exception {
67+
String methodName = Utility.getMethodName();
68+
LoggingUtilities.banner(log, cclass, methodName);
69+
MqttConnectionOptions options = new MqttConnectionOptions();
70+
options.setCleanSession(true);
71+
options.setAutomaticReconnect(true);
72+
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(),
73+
clientId, DATA_STORE);
74+
75+
proxy.enableProxy();
76+
client.connect(options);
77+
78+
boolean isConnected = client.isConnected();
79+
log.info("First Connection isConnected: " + isConnected);
80+
Assert.assertTrue(isConnected);
81+
82+
proxy.disableProxy();
83+
isConnected = client.isConnected();
84+
log.info("Proxy Disconnect isConnected: " + isConnected);
85+
Assert.assertFalse(isConnected);
86+
87+
proxy.enableProxy();
88+
// give it some time to reconnect
89+
long currentTime = System.currentTimeMillis();
90+
int timeout = 4000;
91+
while (client.isConnected() == false) {
92+
long now = System.currentTimeMillis();
93+
if ((currentTime + timeout) < now) {
94+
log.warning("Timeout Exceeded");
95+
break;
96+
}
97+
Thread.sleep(500);
98+
}
99+
isConnected = client.isConnected();
100+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
101+
Assert.assertTrue(isConnected);
102+
client.disconnect();
103+
Assert.assertFalse(client.isConnected());
104104
}
105-
105+
106106
/**
107-
* Tests that if a connection is opened and lost, that when the user calls reconnect() that the
108-
* client will attempt to reconnect straight away
107+
* Tests that if a connection is opened and lost, that when the user calls
108+
* reconnect() that the client will attempt to reconnect straight away
109109
*/
110110
@Test
111111
public void testManualReconnectAfterDisconnect() throws Exception {
112-
String methodName = Utility.getMethodName();
113-
LoggingUtilities.banner(log, cclass, methodName);
112+
String methodName = Utility.getMethodName();
113+
LoggingUtilities.banner(log, cclass, methodName);
114114
MqttConnectionOptions options = new MqttConnectionOptions();
115-
options.setCleanSession(true);
116-
options.setAutomaticReconnect(true);
117-
118-
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(), clientId, DATA_STORE);
119-
120-
proxy.enableProxy();
121-
client.connect(options);
122-
123-
boolean isConnected = client.isConnected();
124-
log.info("First Connection isConnected: " + isConnected);
125-
Assert.assertTrue(isConnected);
126-
127-
proxy.disableProxy();
128-
isConnected = client.isConnected();
129-
log.info("Proxy Disconnect isConnected: " + isConnected);
130-
Assert.assertFalse(isConnected);
131-
132-
proxy.enableProxy();
133-
client.reconnect();
134-
// give it some time to reconnect
135-
Thread.sleep(4000);
136-
isConnected = client.isConnected();
137-
log.info("Proxy Re-Enabled isConnected: " + isConnected);
138-
Assert.assertTrue(isConnected);
139-
client.disconnect();
140-
Assert.assertFalse(client.isConnected());
141-
}
115+
options.setCleanSession(true);
116+
options.setAutomaticReconnect(true);
117+
118+
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(),
119+
clientId, DATA_STORE);
120+
121+
proxy.enableProxy();
122+
client.connect(options);
142123

124+
boolean isConnected = client.isConnected();
125+
log.info("First Connection isConnected: " + isConnected);
126+
Assert.assertTrue(isConnected);
127+
128+
proxy.disableProxy();
129+
isConnected = client.isConnected();
130+
log.info("Proxy Disconnect isConnected: " + isConnected);
131+
Assert.assertFalse(isConnected);
132+
133+
proxy.enableProxy();
134+
client.reconnect();
135+
// give it some time to reconnect
136+
Thread.sleep(4000);
137+
isConnected = client.isConnected();
138+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
139+
Assert.assertTrue(isConnected);
140+
client.disconnect();
141+
Assert.assertFalse(client.isConnected());
142+
}
143143

144144
/**
145-
* Tests that if the initial connection attempt fails, that the automatic reconnect code does NOT
146-
* engage.
145+
* Tests that if the initial connection attempt fails, that the automatic
146+
* reconnect code does NOT engage.
147147
*/
148148
@Test
149149
public void testNoAutomaticReconnectWithNoInitialConnect() throws Exception {
150-
String methodName = Utility.getMethodName();
151-
LoggingUtilities.banner(log, cclass, methodName);
152-
MqttConnectionOptions options = new MqttConnectionOptions();
153-
options.setCleanSession(true);
154-
options.setAutomaticReconnect(true);
155-
options.setConnectionTimeout(15);
156-
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(), clientId, DATA_STORE);
157-
158-
// Make sure the proxy is disabled and give it a second to close everything down
159-
proxy.disableProxy();
160-
try {
161-
client.connect(options);
162-
} catch (MqttException ex) {
163-
// Exceptions are good in this case!
164-
}
165-
boolean isConnected = client.isConnected();
166-
log.info("First Connection isConnected: " + isConnected);
167-
Assert.assertFalse(isConnected);
168-
169-
// Enable The Proxy
170-
proxy.enableProxy();
171-
172-
// Give it some time to make sure we are still not connected
173-
long currentTime = System.currentTimeMillis();
174-
int timeout = 4000;
175-
while(client.isConnected() == false){
176-
long now = System.currentTimeMillis();
177-
if((currentTime + timeout) < now){
178-
Assert.assertFalse(isConnected);
179-
break;
180-
}
181-
Thread.sleep(500);
182-
}
183-
isConnected = client.isConnected();
184-
log.info("Proxy Re-Enabled isConnected: " + isConnected);
185-
Assert.assertFalse(isConnected);
150+
String methodName = Utility.getMethodName();
151+
LoggingUtilities.banner(log, cclass, methodName);
152+
MqttConnectionOptions options = new MqttConnectionOptions();
153+
options.setCleanSession(true);
154+
options.setAutomaticReconnect(true);
155+
options.setConnectionTimeout(15);
156+
final MqttLegacyBlockingClient client = new MqttLegacyBlockingClient("tcp://localhost:" + proxy.getLocalPort(),
157+
clientId, DATA_STORE);
158+
159+
// Make sure the proxy is disabled and give it a second to close everything down
160+
proxy.disableProxy();
161+
try {
162+
client.connect(options);
163+
} catch (MqttException ex) {
164+
// Exceptions are good in this case!
165+
}
166+
boolean isConnected = client.isConnected();
167+
log.info("First Connection isConnected: " + isConnected);
168+
Assert.assertFalse(isConnected);
169+
170+
// Enable The Proxy
171+
proxy.enableProxy();
172+
173+
// Give it some time to make sure we are still not connected
174+
long currentTime = System.currentTimeMillis();
175+
int timeout = 4000;
176+
while (client.isConnected() == false) {
177+
long now = System.currentTimeMillis();
178+
if ((currentTime + timeout) < now) {
179+
Assert.assertFalse(isConnected);
180+
break;
181+
}
182+
Thread.sleep(500);
183+
}
184+
isConnected = client.isConnected();
185+
log.info("Proxy Re-Enabled isConnected: " + isConnected);
186+
Assert.assertFalse(isConnected);
186187

187188
}
188189
}

0 commit comments

Comments
 (0)