Skip to content

Commit 43bc967

Browse files
committed
Issue #510 - MQTTv5 - Don't persist topic Aliases
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 37113b1 commit 43bc967

3 files changed

Lines changed: 233 additions & 1 deletion

File tree

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package org.eclipse.paho.mqttv5.client.test;
2+
3+
import java.net.URI;
4+
import java.net.URISyntaxException;
5+
import java.util.logging.Logger;
6+
7+
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
8+
import org.eclipse.paho.mqttv5.client.IMqttToken;
9+
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
10+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
11+
import org.eclipse.paho.mqttv5.client.internal.MqttPersistentData;
12+
import org.eclipse.paho.mqttv5.client.test.properties.TestProperties;
13+
import org.eclipse.paho.mqttv5.client.test.utilities.TestByteArrayMemoryPersistence;
14+
import org.eclipse.paho.mqttv5.common.MqttException;
15+
import org.eclipse.paho.mqttv5.common.MqttMessage;
16+
import org.eclipse.paho.mqttv5.common.MqttPersistable;
17+
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
18+
import org.junit.Assert;
19+
import org.junit.Test;
20+
21+
public class PersistenceTests {
22+
private static URI serverURI;
23+
private static final String className = PersistenceTests.class.getName();
24+
private static final Logger log = Logger.getLogger(className);
25+
26+
27+
@Test
28+
/**
29+
* Tests that an MQTTv5 client persists MQTTPublish Messages correctly,
30+
* specifically that Topic Aliases are not persisted.
31+
*/
32+
public void persistMessageWithTopicAlias() throws URISyntaxException, MqttException {
33+
serverURI = TestProperties.getServerURI();
34+
TestByteArrayMemoryPersistence memoryPersistence = new TestByteArrayMemoryPersistence();
35+
36+
// Create an MqttAsyncClient with a null Client ID.
37+
MqttAsyncClient client = new MqttAsyncClient(serverURI.toString(), "testClientId", memoryPersistence, null, null);
38+
39+
MqttConnectionOptions options = new MqttConnectionOptions();
40+
options.setTopicAliasMaximum(10);
41+
42+
IMqttToken connectToken = client.connect(options);
43+
connectToken.waitForCompletion(1000);
44+
Assert.assertTrue("The client should be connected.", client.isConnected());
45+
46+
// Publish a message at QoS 2
47+
MqttMessage message = new MqttMessage("Test Message".getBytes(), 2, false, null);
48+
IMqttDeliveryToken deliveryToken = client.publish("testTopic", message);
49+
deliveryToken.waitForCompletion(1000);
50+
51+
// Validate that the message that was persisted does not have a topic alias.
52+
String expectedKey = "s-1";
53+
Assert.assertNotNull(memoryPersistence.getDataCache());
54+
Assert.assertNotNull(memoryPersistence.getDataCache().get(expectedKey));
55+
byte[] messageBytes = (byte[]) memoryPersistence.getDataCache().get(expectedKey);
56+
MqttPersistable persistable = new MqttPersistentData(expectedKey, messageBytes, 0, messageBytes.length, null, 0, 0);
57+
MqttWireMessage wireMessage = MqttWireMessage.createWireMessage(persistable);
58+
Assert.assertNull(wireMessage.getProperties().getTopicAlias());
59+
60+
// Cleanup
61+
IMqttToken disconnectToken = client.disconnect();
62+
disconnectToken.waitForCompletion(1000);
63+
Assert.assertFalse("The client should now be disconnected.", client.isConnected());
64+
client.close();
65+
66+
}
67+
68+
69+
70+
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2009, 2014 IBM Corp.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v1.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* http://www.eclipse.org/legal/epl-v10.html
10+
* and the Eclipse Distribution License is available at
11+
* http://www.eclipse.org/org/documents/edl-v10.php.
12+
*
13+
* Contributors:
14+
* Dave Locke - initial API and implementation and/or initial documentation
15+
*/
16+
package org.eclipse.paho.mqttv5.client.test.utilities;
17+
18+
import java.nio.ByteBuffer;
19+
import java.util.Enumeration;
20+
import java.util.Hashtable;
21+
22+
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
23+
import org.eclipse.paho.mqttv5.client.internal.MqttPersistentData;
24+
import org.eclipse.paho.mqttv5.common.MqttPersistable;
25+
import org.eclipse.paho.mqttv5.common.MqttPersistenceException;
26+
27+
/**
28+
* Persistence that uses memory
29+
*
30+
* In cases where reliability is not required across client or device
31+
* restarts memory this memory persistence can be used. In cases where
32+
* reliability is required like when clean session is set to false
33+
* then a non-volatile form of persistence should be used.
34+
*/
35+
public class TestByteArrayMemoryPersistence implements MqttClientPersistence {
36+
37+
private Hashtable<String, byte[]> data;
38+
private Hashtable<String, byte[]> dataCache;
39+
private String clientId;
40+
private String serverURI;
41+
42+
/* (non-Javadoc)
43+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#close()
44+
*/
45+
public void close() throws MqttPersistenceException {
46+
//data.clear();
47+
}
48+
49+
/* (non-Javadoc)
50+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#keys()
51+
*/
52+
public Enumeration<String> keys() throws MqttPersistenceException {
53+
return data.keys();
54+
}
55+
56+
/* (non-Javadoc)
57+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#get(java.lang.String)
58+
*/
59+
public MqttPersistable get(String key) throws MqttPersistenceException {
60+
byte[] persistedMessage = data.get(key);
61+
MqttPersistable message = new MqttPersistentData(key, persistedMessage, 0, persistedMessage.length, null, 0, 0);
62+
return message;
63+
}
64+
65+
/* (non-Javadoc)
66+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#open(java.lang.String, java.lang.String)
67+
*/
68+
public void open(String clientId, String serverURI) throws MqttPersistenceException {
69+
this.clientId = clientId;
70+
this.serverURI = serverURI;
71+
if(this.data == null){
72+
this.data = new Hashtable<String, byte[]>();
73+
}
74+
}
75+
76+
/* (non-Javadoc)
77+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#put(java.lang.String, org.eclipse.paho.client.mqttv3.MqttPersistable)
78+
*/
79+
public void put(String key, MqttPersistable persistable) throws MqttPersistenceException {
80+
int length = persistable.getHeaderLength() + persistable.getPayloadLength();
81+
ByteBuffer messageByteBuffer = ByteBuffer.wrap(new byte[length]).put(persistable.getHeaderBytes());
82+
if(persistable.getPayloadBytes() != null) {
83+
messageByteBuffer.put(persistable.getPayloadBytes());
84+
}
85+
byte[] messageBytes = messageByteBuffer.array();
86+
data.put(key, messageBytes);
87+
dataCache.put(key, messageBytes);
88+
}
89+
90+
/* (non-Javadoc)
91+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#remove(java.lang.String)
92+
*/
93+
public void remove(String key) throws MqttPersistenceException {
94+
data.remove(key);
95+
}
96+
97+
/* (non-Javadoc)
98+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#clear()
99+
*/
100+
public void clear() throws MqttPersistenceException {
101+
data.clear();
102+
}
103+
104+
/* (non-Javadoc)
105+
* @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#containsKey(java.lang.String)
106+
*/
107+
public boolean containsKey(String key) throws MqttPersistenceException {
108+
return data.containsKey(key);
109+
}
110+
111+
@Override
112+
public void open(String clientId) throws MqttPersistenceException {
113+
data = new Hashtable<String, byte[]>();
114+
dataCache = new Hashtable<String, byte[]>();
115+
116+
}
117+
118+
// Returns the Data cache
119+
public Hashtable<String, byte[]> getDataCache() {
120+
return dataCache;
121+
}
122+
123+
public Hashtable<String, byte[]> getData() {
124+
return data;
125+
}
126+
127+
public void setData(Hashtable<String, byte[]> data) {
128+
this.data = data;
129+
}
130+
131+
public String getClientId() {
132+
return clientId;
133+
}
134+
135+
public void setClientId(String clientId) {
136+
this.clientId = clientId;
137+
}
138+
139+
public String getServerURI() {
140+
return serverURI;
141+
}
142+
143+
public void setServerURI(String serverURI) {
144+
this.serverURI = serverURI;
145+
}
146+
147+
148+
}

org.eclipse.paho.mqttv5.common/src/main/java/org/eclipse/paho/mqttv5/common/packet/MqttPersistableWireMessage.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,21 @@ public MqttPersistableWireMessage(byte type) {
2727
}
2828

2929
public byte[] getHeaderBytes() throws MqttPersistenceException {
30+
byte[] headerBytes = null;
3031
try {
31-
return getHeader();
32+
if(this.getClass() == MqttPublish.class && this.getProperties().getTopicAlias() != null) {
33+
// Remove the Topic Alias temporarily.
34+
MqttProperties props = this.getProperties();
35+
Integer topicAlias = props.getTopicAlias();
36+
props.setTopicAlias(null);
37+
headerBytes = getHeader();
38+
// Re Set Topic Alias
39+
props.setTopicAlias(topicAlias);
40+
this.properties = props;
41+
} else {
42+
headerBytes = getHeader();
43+
}
44+
return headerBytes;
3245
}
3346
catch (MqttException ex) {
3447
throw new MqttPersistenceException(ex.getCause());

0 commit comments

Comments
 (0)