Skip to content

Commit ba2abaf

Browse files
committed
Issue #530 - Fixing client initialisation and adding test case for user properties
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent f78775a commit ba2abaf

4 files changed

Lines changed: 136 additions & 7 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenSt
164164

165165
inUseMsgIds = new ConcurrentHashMap<>();
166166
pendingFlows = new Vector<MqttWireMessage>();
167+
pendingMessages = new Vector<MqttWireMessage>(mqttSession.getReceiveMaximum());
167168
outboundQoS2 = new ConcurrentHashMap<>();
168169
outboundQoS1 = new ConcurrentHashMap<>();
169170
outboundQoS0 = new ConcurrentHashMap<>();
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package org.eclipse.paho.mqttv5.client.test;
2+
3+
import java.net.URI;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.UUID;
7+
import java.util.logging.Level;
8+
import java.util.logging.Logger;
9+
10+
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
11+
import org.eclipse.paho.mqttv5.client.IMqttToken;
12+
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
13+
import org.eclipse.paho.mqttv5.client.test.client.MqttClientFactoryPaho;
14+
import org.eclipse.paho.mqttv5.client.test.logging.LoggingUtilities;
15+
import org.eclipse.paho.mqttv5.client.test.properties.TestProperties;
16+
import org.eclipse.paho.mqttv5.client.test.utilities.MqttV5Receiver;
17+
import org.eclipse.paho.mqttv5.client.test.utilities.MqttV5Receiver.ReceivedMessage;
18+
import org.eclipse.paho.mqttv5.client.test.utilities.Utility;
19+
import org.eclipse.paho.mqttv5.common.MqttMessage;
20+
import org.eclipse.paho.mqttv5.common.MqttSubscription;
21+
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
22+
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
23+
import org.junit.AfterClass;
24+
import org.junit.Assert;
25+
import org.junit.BeforeClass;
26+
import org.junit.Test;
27+
28+
public class SubscribeTests {
29+
30+
static final Class<?> cclass = SubscribeTests.class;
31+
private static final String className = cclass.getName();
32+
private static final Logger log = Logger.getLogger(className);
33+
34+
private static URI serverURI;
35+
private static MqttClientFactoryPaho clientFactory;
36+
private static String topicPrefix;
37+
38+
39+
@BeforeClass
40+
public static void setUpBeforeClass() throws Exception {
41+
try {
42+
String methodName = Utility.getMethodName();
43+
LoggingUtilities.banner(log, cclass, methodName);
44+
45+
serverURI = TestProperties.getServerURI();
46+
clientFactory = new MqttClientFactoryPaho();
47+
clientFactory.open();
48+
topicPrefix = "BasicTest-" + UUID.randomUUID().toString() + "-";
49+
50+
} catch (Exception exception) {
51+
log.log(Level.SEVERE , "caught exception:", exception);
52+
throw exception;
53+
}
54+
}
55+
56+
@AfterClass
57+
public static void tearDownAfterClass() throws Exception {
58+
String methodName = Utility.getMethodName();
59+
LoggingUtilities.banner(log, cclass, methodName);
60+
61+
try {
62+
if(clientFactory != null) {
63+
clientFactory.close();
64+
clientFactory.disconnect();
65+
}
66+
} catch (Exception exception) {
67+
log.log(Level.SEVERE, "caught exception:", exception);
68+
}
69+
}
70+
71+
@Test
72+
public void testPublishRecieveUserProperties() throws Exception {
73+
String methodName = Utility.getMethodName();
74+
LoggingUtilities.banner(log, cclass, methodName);
75+
String clientId = methodName;
76+
String exampleKey = "exampleKey";
77+
String exampleValue = "exampleValue";
78+
MqttAsyncClient asyncClient = new MqttAsyncClient(serverURI.toString(), clientId);
79+
MqttV5Receiver mqttV5Receiver = new MqttV5Receiver(asyncClient, LoggingUtilities.getPrintStream());
80+
asyncClient.setCallback(mqttV5Receiver);
81+
82+
// Connect to the server
83+
log.info("Connecting: [serverURI: " + serverURI + ", ClientId: " + clientId + "]");
84+
IMqttToken connectToken = asyncClient.connect();
85+
connectToken.waitForCompletion(5000);
86+
String clientId2 = asyncClient.getClientId();
87+
log.info("Client ID = " + clientId2);
88+
boolean isConnected = asyncClient.isConnected();
89+
log.info("isConnected: " + isConnected);
90+
91+
// Subscribe to a topic
92+
log.info("Subscribing to: " + topicPrefix + methodName);
93+
MqttSubscription subscription = new MqttSubscription(topicPrefix + methodName);
94+
IMqttToken subscribeToken = asyncClient.subscribe(new MqttSubscription[] {subscription});
95+
subscribeToken.waitForCompletion(5000);
96+
97+
// Publish a message to a random topic
98+
MqttProperties messageProps = new MqttProperties();
99+
List<UserProperty> userProps = new ArrayList<UserProperty>();
100+
userProps.add(new UserProperty(exampleKey, exampleValue));
101+
102+
messageProps.setUserProperties(userProps);
103+
MqttMessage testMessage = new MqttMessage("Test Payload".getBytes(), 2, false, messageProps);
104+
log.info("Publishing Message with User Properties to: " + topicPrefix + methodName);
105+
IMqttDeliveryToken deliveryToken = asyncClient.publish(topicPrefix + methodName, testMessage);
106+
deliveryToken.waitForCompletion(5000);
107+
108+
109+
log.info("Waiting for delivery and validating message.");
110+
ReceivedMessage receivedMessage = mqttV5Receiver.receiveNext(10000);
111+
MqttProperties receivedProps = receivedMessage.message.getProperties();
112+
Assert.assertEquals(1, receivedProps.getUserProperties().size());
113+
UserProperty recievedUserProps = receivedProps.getUserProperties().get(0);
114+
Assert.assertEquals(exampleKey, recievedUserProps.getKey());
115+
Assert.assertEquals(exampleValue, recievedUserProps.getValue());
116+
log.info("Received User Property: " + recievedUserProps.toString());
117+
118+
119+
log.info("Disconnecting...");
120+
IMqttToken disconnectToken = asyncClient.disconnect();
121+
disconnectToken.waitForCompletion(5000);
122+
Assert.assertFalse(asyncClient.isConnected());
123+
asyncClient.close();
124+
125+
}
126+
127+
128+
}

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/automaticReconnect/OfflineBufferingTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.eclipse.paho.mqttv5.client.test.logging.LoggingUtilities;
1717
import org.eclipse.paho.mqttv5.client.test.properties.TestProperties;
1818
import org.eclipse.paho.mqttv5.client.test.utilities.ConnectionManipulationProxyServer;
19-
import org.eclipse.paho.mqttv5.client.test.utilities.MqttV3Receiver;
19+
import org.eclipse.paho.mqttv5.client.test.utilities.MqttV5Receiver;
2020
import org.eclipse.paho.mqttv5.client.test.utilities.TestMemoryPersistence;
2121
import org.eclipse.paho.mqttv5.client.test.utilities.Utility;
2222
import org.eclipse.paho.mqttv5.client.util.Debug;
@@ -160,7 +160,7 @@ public void testManyMessageBufferAndDeliver() throws Exception {
160160

161161
// Create subscription client that won't be affected by proxy
162162
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, methodName + "sub-client");
163-
MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(subClient, LoggingUtilities.getPrintStream());
163+
MqttV5Receiver mqttV3Receiver = new MqttV5Receiver(subClient, LoggingUtilities.getPrintStream());
164164
subClient.setCallback(mqttV3Receiver);
165165
IMqttToken subConnectToken = subClient.connect();
166166
subConnectToken.waitForCompletion(5000);
@@ -433,7 +433,7 @@ public void testUnPersistBufferedMessagesOnNewClient() throws Exception {
433433
// as soon as the main client connects
434434
log.info("Creating subscription client");
435435
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, methodName + "sub-client");
436-
MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(subClient, LoggingUtilities.getPrintStream());
436+
MqttV5Receiver mqttV3Receiver = new MqttV5Receiver(subClient, LoggingUtilities.getPrintStream());
437437
subClient.setCallback(mqttV3Receiver);
438438
IMqttToken subConnectToken = subClient.connect();
439439
subConnectToken.waitForCompletion(5000);

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/utilities/MqttV3Receiver.java renamed to org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/utilities/MqttV5Receiver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
/**
3333
* Listen for in bound messages and connection loss.
3434
*/
35-
public class MqttV3Receiver implements MqttCallback {
35+
public class MqttV5Receiver implements MqttCallback {
3636

37-
static final String className = MqttV3Receiver.class.getName();
37+
static final String className = MqttV5Receiver.class.getName();
3838
static final Logger log = Logger.getLogger(className);
3939

4040
final static String TRACE_GROUP = "Test";
@@ -66,7 +66,7 @@ public class ReceivedMessage {
6666
* @param mqttClient
6767
* @param reportStream
6868
*/
69-
public MqttV3Receiver(IMqttClient mqttClient, PrintStream reportStream) {
69+
public MqttV5Receiver(IMqttClient mqttClient, PrintStream reportStream) {
7070
String methodName = Utility.getMethodName();
7171
log.entering(className, methodName);
7272

@@ -82,7 +82,7 @@ public MqttV3Receiver(IMqttClient mqttClient, PrintStream reportStream) {
8282
* @param mqttClient
8383
* @param reportStream
8484
*/
85-
public MqttV3Receiver(MqttAsyncClient mqttClient, PrintStream reportStream) {
85+
public MqttV5Receiver(MqttAsyncClient mqttClient, PrintStream reportStream) {
8686
String methodName = Utility.getMethodName();
8787
log.entering(className, methodName);
8888

0 commit comments

Comments
 (0)