Skip to content

Commit ac6b9ca

Browse files
Ranjan-DasguptaRanjan-Dasgupta
authored andcommitted
Add/update for MQTTv5 release
Signed-off-by: Ranjan-Dasgupta <Ranjan.Dasgupta@us.ibm.com>
1 parent 7ed9556 commit ac6b9ca

12 files changed

Lines changed: 124 additions & 57 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/IMqttToken.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.eclipse.paho.mqttv5.common.MqttException;
2020
import org.eclipse.paho.mqttv5.common.MqttMessage;
21+
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
2122
import org.eclipse.paho.mqttv5.common.packet.MqttWireMessage;
2223

2324
/**
@@ -177,6 +178,11 @@ public interface IMqttToken {
177178
*/
178179
public MqttWireMessage getResponse();
179180

181+
/**
182+
* @return the response wire message properties
183+
*/
184+
public MqttProperties getResponseProperties();
185+
180186
/**
181187
* Returns the message associated with this token.
182188
* <p>Until the message has been delivered, the message being delivered will
@@ -187,6 +193,14 @@ public interface IMqttToken {
187193
*/
188194
public MqttMessage getMessage() throws MqttException;
189195

190-
public boolean isDeliveryToken();
196+
/**
197+
* @return the request wire message
198+
*/
199+
public MqttWireMessage getRequestMessage();
200+
201+
/**
202+
* @return the request wire message properties
203+
*/
204+
public MqttProperties getRequestProperties();
191205

192206
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ public class MqttAsyncClient implements MqttClientInterface, IMqttAsyncClient {
244244
private MqttSessionState mqttSession = new MqttSessionState();
245245

246246
// Variables that exist within the life of an MQTT connection.
247-
private MqttConnectionState mqttConnection = new MqttConnectionState();
247+
private MqttConnectionState mqttConnection;
248248

249249
private ScheduledExecutorService executorService;
250250
private MqttPingSender pingSender;
@@ -568,6 +568,8 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
568568
clientId = "";
569569
}
570570

571+
mqttConnection = new MqttConnectionState(clientId);
572+
571573
NetworkModuleService.validateURI(serverURI);
572574

573575
this.serverURI = serverURI;
@@ -758,7 +760,7 @@ public IMqttToken connect(MqttConnectionOptions options, Object userContext, Mqt
758760
// succeeds
759761
MqttToken userToken = new MqttToken(getClientId());
760762
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,
761-
userToken, userContext, callback, reconnecting, mqttSession, mqttConnection);
763+
userToken, userContext, callback, reconnecting, mqttSession, this.mqttConnection);
762764
userToken.setActionCallback(connectActionListener);
763765
userToken.setUserContext(this);
764766

@@ -1163,7 +1165,7 @@ private IMqttToken subscribeBase(MqttSubscription[] subscriptions, Object userCo
11631165
// TODO - Build up MQTT Subscriptions properly here
11641166

11651167
MqttSubscribe register = new MqttSubscribe(subscriptions, subscriptionProperties);
1166-
1168+
token.setRequestMessage(register);
11671169
comms.sendNoWait(register, token);
11681170
// @TRACE 109=<
11691171
log.fine(CLASS_NAME, methodName, "109");
@@ -1398,6 +1400,7 @@ public IMqttToken unsubscribe(String[] topicFilters, Object userContext, MqttAct
13981400
token.internalTok.setTopics(topicFilters);
13991401

14001402
MqttUnsubscribe unregister = new MqttUnsubscribe(topicFilters, unsubscribeProperties);
1403+
token.setRequestMessage(unregister);
14011404

14021405
comms.sendNoWait(unregister, token);
14031406
// @TRACE 110=<
@@ -1515,13 +1518,15 @@ public IMqttToken publish(String topic, MqttMessage message, Object userContext,
15151518
// Checks if a topic is valid when publishing a message.
15161519
MqttTopicValidator.validate(topic, false/* wildcards NOT allowed */, true);
15171520

1518-
MqttToken token = new MqttToken(getClientId(), true);
1521+
MqttToken token = new MqttToken(getClientId());
1522+
token.internalTok.setDeliveryToken(true);
15191523
token.setActionCallback(callback);
15201524
token.setUserContext(userContext);
15211525
token.setMessage(message);
15221526
token.internalTok.setTopics(new String[] { topic });
15231527

15241528
MqttPublish pubMsg = new MqttPublish(topic, message, message.getProperties());
1529+
token.setRequestMessage(pubMsg);
15251530
comms.sendNoWait(pubMsg, token);
15261531

15271532
// @TRACE 112=<

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttToken.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,21 @@
3333
*/
3434

3535
public class MqttToken implements IMqttToken {
36-
private boolean deliveryToken = false;
36+
private MqttAsyncClient client = null;
3737
/**
3838
* A reference to the the class that provides most of the implementation of the
3939
* MqttToken. MQTT application programs must not use the internal class.
4040
*/
4141
public Token internalTok = null;
4242

43-
public MqttToken() {
44-
}
43+
public MqttToken() {
44+
}
4545

46-
public MqttToken(String logContext) {
47-
internalTok = new Token(logContext);
48-
}
46+
public MqttToken(MqttAsyncClient client) {
47+
this.client = client;
48+
}
4949

50-
public MqttToken(String logContext, boolean dToken) {
51-
this.deliveryToken = dToken;
50+
public MqttToken(String logContext) {
5251
internalTok = new Token(logContext);
5352
}
5453

@@ -109,10 +108,22 @@ public MqttWireMessage getResponse() {
109108
return internalTok.getResponse();
110109
}
111110

112-
public MqttProperties getMessageProperties() {
111+
public MqttProperties getResponseProperties() {
113112
return (internalTok.getWireMessage() == null) ? null : internalTok.getWireMessage().getProperties();
114113
}
115114

115+
public MqttWireMessage getRequestMessage() {
116+
return internalTok.getRequestMessage();
117+
}
118+
119+
public void setRequestMessage(MqttWireMessage request) {
120+
internalTok.setRequestMessage(request);
121+
}
122+
123+
public MqttProperties getRequestProperties() {
124+
return (internalTok.getRequestMessage() == null) ? null : internalTok.getRequestMessage().getProperties();
125+
}
126+
116127
@Override
117128
public int[] getReasonCodes() {
118129
return internalTok.getReasonCodes();
@@ -134,8 +145,4 @@ protected void setMessage(MqttMessage msg) {
134145
internalTok.setMessage(msg);
135146
}
136147

137-
public boolean isDeliveryToken() {
138-
return this.deliveryToken;
139-
}
140-
141148
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttTopic.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ public MqttToken publish(byte[] payload, int qos, boolean retained)
9090
* if an error occurs persisting the message
9191
*/
9292
public MqttToken publish(MqttMessage message) throws MqttException, MqttPersistenceException {
93-
MqttToken token = new MqttToken(comms.getClient().getClientId(), true);
93+
MqttToken token = new MqttToken(comms.getClient().getClientId());
94+
token.internalTok.setDeliveryToken(true);
9495
token.setMessage(message);
9596
comms.sendNoWait(createPublish(message, new MqttProperties()), token);
9697
token.internalTok.waitUntilSent();

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,10 @@ void internalSend(MqttWireMessage message, MqttToken token) throws MqttException
175175
*/
176176
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
177177
final String methodName = "sendNoWait";
178+
178179
if (isConnected() || (!isConnected() && message instanceof MqttConnect)
179180
|| (isDisconnecting() && message instanceof MqttDisconnect)) {
181+
180182
if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
181183
// @TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
182184
// message to buffer. message={0}
@@ -191,6 +193,7 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
191193
this.clientState.persistBufferedMessage(message);
192194
}
193195
disconnectedMessageBuffer.putMessage(message, token);
196+
194197
} else {
195198

196199
if (message instanceof MqttPublish) {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,8 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
512512
}
513513
// Set Topic Alias if required
514514
if (message instanceof MqttPublish && ((MqttPublish) message).getTopicName() != null
515-
&& this.mqttConnection.getOutgoingTopicAliasMaximum() > 0) {
516-
String topic = ((MqttPublish) message).getTopicName();
515+
&& this.mqttConnection != null && this.mqttConnection.getOutgoingTopicAliasMaximum() > 0) {
516+
String topic = ((MqttPublish) message).getTopicName();
517517
if (outgoingTopicAliases.containsKey(topic)) {
518518
// Existing Topic Alias, Assign it and remove the topic string
519519
((MqttPublish) message).getProperties().setTopicAlias(outgoingTopicAliases.get(topic));
@@ -1379,7 +1379,7 @@ public Vector<MqttToken> resolveOldTokens(MqttException reason) {
13791379
tok.internalTok.setException(shutReason);
13801380
}
13811381
}
1382-
if (!(tok.isDeliveryToken())) {
1382+
if (!(tok.internalTok.isDeliveryToken())) {
13831383
// If not a delivery token it is not valid on
13841384
// restart so remove
13851385
tokenStore.removeToken(tok.internalTok.getKey());

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ private void handleActionComplete(MqttToken token) throws MqttException {
272272
if (!token.internalTok.isNotified()) {
273273
// If a callback is registered and delivery has finished
274274
// call delivery complete callback.
275-
if (mqttCallback != null && token.isDeliveryToken() == true && token.isComplete()) {
275+
if (mqttCallback != null && token.internalTok.isDeliveryToken() == true && token.isComplete()) {
276276
try {
277277
mqttCallback.deliveryComplete(token);
278278
} catch (Throwable ex) {
@@ -287,7 +287,7 @@ private void handleActionComplete(MqttToken token) throws MqttException {
287287

288288
// Set notified so we don't tell the user again about this action.
289289
if (token.isComplete()) {
290-
if (token.isDeliveryToken() == true || token.getActionCallback() instanceof MqttActionListener) {
290+
if (token.internalTok.isDeliveryToken() == true || token.getActionCallback() instanceof MqttActionListener) {
291291
token.internalTok.setNotified(true);
292292
}
293293
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsTokenStore.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ protected MqttToken restoreToken(MqttPublish message) {
114114
//@TRACE 302=existing key={0} message={1} token={2}
115115
log.fine(CLASS_NAME,methodName, "302",new Object[]{key, message,token});
116116
} else {
117-
token = new MqttToken(logContext, true);
117+
token = new MqttToken(logContext);
118+
token.internalTok.setDeliveryToken(true);
118119
token.internalTok.setKey(key);
119120
this.tokens.put(key, token);
120121
//@TRACE 303=creating new token key={0} message={1} token={2}
@@ -188,7 +189,7 @@ public MqttToken[] getOutstandingDelTokens() {
188189
while(enumeration.hasMoreElements()) {
189190
token = (MqttToken)enumeration.nextElement();
190191
if (token != null
191-
&& token.isDeliveryToken() == true
192+
&& token.internalTok.isDeliveryToken() == true
192193
&& !token.internalTok.isNotified()) {
193194

194195
list.addElement(token);

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ConnectActionListener.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,30 +114,30 @@ public ConnectActionListener(MqttAsyncClient client, MqttClientPersistence persi
114114
public void onSuccess(IMqttToken token) {
115115
// Set properties imposed on us by the Server
116116
MqttToken myToken = (MqttToken) token;
117-
if (myToken.getMessageProperties() != null) {
118-
mqttConnection.setReceiveMaximum(myToken.getMessageProperties().getReceiveMaximum());
119-
mqttConnection.setMaximumQoS(myToken.getMessageProperties().getMaximumQoS());
120-
mqttConnection.setRetainAvailable(myToken.getMessageProperties().isRetainAvailable());
121-
mqttConnection.setOutgoingMaximumPacketSize(myToken.getMessageProperties().getMaximumPacketSize());
117+
if (myToken.getResponseProperties() != null) {
118+
mqttConnection.setReceiveMaximum(myToken.getResponseProperties().getReceiveMaximum());
119+
mqttConnection.setMaximumQoS(myToken.getResponseProperties().getMaximumQoS());
120+
mqttConnection.setRetainAvailable(myToken.getResponseProperties().isRetainAvailable());
121+
mqttConnection.setOutgoingMaximumPacketSize(myToken.getResponseProperties().getMaximumPacketSize());
122122
mqttConnection.setIncomingMaximumPacketSize(options.getMaximumPacketSize());
123-
mqttConnection.setOutgoingTopicAliasMaximum(myToken.getMessageProperties().getTopicAliasMaximum());
123+
mqttConnection.setOutgoingTopicAliasMaximum(myToken.getResponseProperties().getTopicAliasMaximum());
124124
mqttConnection
125-
.setWildcardSubscriptionsAvailable(myToken.getMessageProperties().isWildcardSubscriptionsAvailable());
125+
.setWildcardSubscriptionsAvailable(myToken.getResponseProperties().isWildcardSubscriptionsAvailable());
126126
mqttConnection.setSubscriptionIdentifiersAvailable(
127-
myToken.getMessageProperties().isSubscriptionIdentifiersAvailable());
128-
mqttConnection.setSharedSubscriptionsAvailable(myToken.getMessageProperties().isSharedSubscriptionAvailable());
127+
myToken.getResponseProperties().isSubscriptionIdentifiersAvailable());
128+
mqttConnection.setSharedSubscriptionsAvailable(myToken.getResponseProperties().isSharedSubscriptionAvailable());
129129

130130
// If provided, set the server keep alive value.
131-
if(myToken.getMessageProperties().getServerKeepAlive() != null) {
132-
mqttConnection.setKeepAliveSeconds(myToken.getMessageProperties().getServerKeepAlive());
131+
if(myToken.getResponseProperties().getServerKeepAlive() != null) {
132+
mqttConnection.setKeepAliveSeconds(myToken.getResponseProperties().getServerKeepAlive());
133133
}
134134

135135
// If we are assigning the client ID post connect, then we need to re-initialise
136136
// our persistence layer.
137-
if (myToken.getMessageProperties().getAssignedClientIdentifier() != null) {
138-
mqttSession.setClientId(myToken.getMessageProperties().getAssignedClientIdentifier());
137+
if (myToken.getResponseProperties().getAssignedClientIdentifier() != null) {
138+
mqttSession.setClientId(myToken.getResponseProperties().getAssignedClientIdentifier());
139139
try {
140-
persistence.open(myToken.getMessageProperties().getAssignedClientIdentifier());
140+
persistence.open(myToken.getResponseProperties().getAssignedClientIdentifier());
141141

142142
if (options.isCleanStart()) {
143143
persistence.clear();

0 commit comments

Comments
 (0)