Skip to content

Commit bf2b33e

Browse files
committed
Issue #540 - Differentiating Connection State and Session State
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent efdba4a commit bf2b33e

7 files changed

Lines changed: 111 additions & 82 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/IMqttClient.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.eclipse.paho.mqttv5.common.MqttPersistenceException;
2525
import org.eclipse.paho.mqttv5.common.MqttSecurityException;
2626
import org.eclipse.paho.mqttv5.common.MqttSubscription;
27-
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
2827

2928
/**
3029
* Enables an application to communicate with an MQTT server using blocking methods.

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
import org.eclipse.paho.mqttv5.client.internal.ClientComms;
4040
import org.eclipse.paho.mqttv5.client.internal.ConnectActionListener;
4141
import org.eclipse.paho.mqttv5.client.internal.DisconnectedMessageBuffer;
42-
import org.eclipse.paho.mqttv5.client.internal.MqttSession;
42+
import org.eclipse.paho.mqttv5.client.internal.MqttConnectionState;
43+
import org.eclipse.paho.mqttv5.client.internal.MqttSessionState;
4344
import org.eclipse.paho.mqttv5.client.internal.NetworkModule;
4445
import org.eclipse.paho.mqttv5.client.internal.SSLNetworkModule;
4546
import org.eclipse.paho.mqttv5.client.internal.TCPNetworkModule;
@@ -252,7 +253,8 @@ public class MqttAsyncClient implements MqttClientInterface, IMqttAsyncClient {
252253
// second
253254
private boolean reconnecting = false;
254255
private static Object clientLock = new Object(); // Simple lock
255-
private MqttSession mqttSession = new MqttSession();
256+
private MqttSessionState mqttSession = new MqttSessionState(); // Variables that exist within the life of an MQTT session
257+
private MqttConnectionState mqttConnection = new MqttConnectionState(); // Variables that exist within the life of an MQTT connection.
256258
private ScheduledExecutorService executorService;
257259
private MqttPingSender pingSender;
258260

@@ -606,7 +608,7 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
606608
log.fine(CLASS_NAME, methodName, "101", new Object[] { clientId, serverURI, persistence });
607609

608610
this.persistence.open(clientId);
609-
this.comms = new ClientComms(this, this.persistence, this.pingSender, this.executorService, this.mqttSession);
611+
this.comms = new ClientComms(this, this.persistence, this.pingSender, this.executorService, this.mqttSession, this.mqttConnection);
610612
this.persistence.close();
611613
this.topics = new Hashtable<String, MqttTopic>();
612614

@@ -894,11 +896,11 @@ public IMqttToken connect(MqttConnectionOptions options, Object userContext, Mqt
894896
// succeeds
895897
MqttToken userToken = new MqttToken(getClientId());
896898
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,
897-
userToken, userContext, callback, reconnecting, mqttSession);
899+
userToken, userContext, callback, reconnecting, mqttSession, mqttConnection);
898900
userToken.setActionCallback(connectActionListener);
899901
userToken.setUserContext(this);
900902

901-
this.mqttSession.setSendReasonMessages(this.connOpts.isSendReasonMessages());
903+
this.mqttConnection.setSendReasonMessages(this.connOpts.isSendReasonMessages());
902904

903905
// If we are using the MqttCallbackExtended, set it on the
904906
// connectActionListener
@@ -907,14 +909,11 @@ public IMqttToken connect(MqttConnectionOptions options, Object userContext, Mqt
907909
}
908910

909911
if (this.connOpts.isCleanStart()) {
910-
this.mqttSession.clearSession();
912+
this.mqttSession.clearSessionState();
911913
}
914+
this.mqttConnection.clearConnectionState();
912915

913-
if (this.connOpts.isCleanStart()) {
914-
this.mqttSession.clearSession();
915-
}
916-
917-
this.mqttSession.setIncomingTopicAliasMax(this.connOpts.getTopicAliasMaximum());
916+
this.mqttConnection.setIncomingTopicAliasMax(this.connOpts.getTopicAliasMaximum());
918917

919918
comms.setNetworkModuleIndex(0);
920919
connectActionListener.connect();
@@ -1373,7 +1372,7 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
13731372
// Check that we are not already using this ID, else throw Illegal Argument
13741373
// Exception
13751374
if (this.comms.doesSubscriptionIdentifierExist(subId)) {
1376-
throw new IllegalArgumentException("The Subscription Identifier " + subId + " already exists.");
1375+
throw new IllegalArgumentException(String.format("The Subscription Identifier %s already exists.", subId));
13771376
}
13781377

13791378
} else {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class ClientComms {
8686
private boolean resting = false;
8787
private DisconnectedMessageBuffer disconnectedMessageBuffer;
8888
private ExecutorService executorService;
89-
private MqttSession mqttSession;
89+
private MqttConnectionState mqttConnection;
9090

9191
/**
9292
* Creates a new ClientComms object, using the specified module to handle the
@@ -101,23 +101,23 @@ public class ClientComms {
101101
* @param executorService
102102
* the {@link ExecutorService}
103103
* @param mqttSession
104-
* the {@link MqttSession}
104+
* the {@link MqttConnectionState}
105105
* @throws MqttException
106106
* if an exception occurs whilst communicating with the server
107107
*/
108108
public ClientComms(MqttClientInterface client, MqttClientPersistence persistence, MqttPingSender pingSender,
109-
ExecutorService executorService, MqttSession mqttSession) throws MqttException {
109+
ExecutorService executorService, MqttSessionState mqttSession, MqttConnectionState mqttConnection) throws MqttException {
110110
this.conState = DISCONNECTED;
111111
this.client = client;
112112
this.persistence = persistence;
113113
this.pingSender = pingSender;
114114
this.pingSender.init(this);
115115
this.executorService = executorService;
116-
this.mqttSession = mqttSession;
116+
this.mqttConnection = mqttConnection;
117117

118118
this.tokenStore = new CommsTokenStore(getClient().getClientId());
119119
this.callback = new CommsCallback(this);
120-
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender, this.mqttSession);
120+
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender, this.mqttConnection);
121121

122122
callback.setClientState(clientState);
123123
log.setResourceName(getClient().getClientId());
@@ -211,17 +211,17 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
211211

212212
if (message instanceof MqttPublish) {
213213
// Override the QoS if the server has set a maximum
214-
if (this.mqttSession.getMaximumQoS() != null
215-
&& ((MqttPublish) message).getMessage().getQos() > this.mqttSession.getMaximumQoS()) {
214+
if (this.mqttConnection.getMaximumQoS() != null
215+
&& ((MqttPublish) message).getMessage().getQos() > this.mqttConnection.getMaximumQoS()) {
216216
MqttMessage mqttMessage = ((MqttPublish) message).getMessage();
217-
mqttMessage.setQos(this.mqttSession.getMaximumQoS());
217+
mqttMessage.setQos(this.mqttConnection.getMaximumQoS());
218218
((MqttPublish) message).setMessage(mqttMessage);
219219
}
220220

221221
// Override the Retain flag if the server has disabled it
222-
if (this.mqttSession.isRetainAvailable() != null
222+
if (this.mqttConnection.isRetainAvailable() != null
223223
&& ((MqttPublish) message).getMessage().isRetained()
224-
&& (this.mqttSession.isRetainAvailable() == false)) {
224+
&& (this.mqttConnection.isRetainAvailable() == false)) {
225225
MqttMessage mqttMessage = ((MqttPublish) message).getMessage();
226226
mqttMessage.setRetained(false);
227227
((MqttPublish) message).setMessage(mqttMessage);
@@ -945,7 +945,7 @@ class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback
945945

946946
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
947947
if (isConnected()) {
948-
while (clientState.getActualInFlight() >= (mqttSession.getReceiveMaximum() - 1)) {
948+
while (clientState.getActualInFlight() >= (mqttConnection.getReceiveMaximum() - 1)) {
949949
// We need to Yield to the other threads to allow the in flight messages to
950950
// clear
951951
Thread.yield();

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -148,19 +148,19 @@ public class ClientState implements MqttState {
148148
// Topic Alias Maps
149149
private Hashtable<String, Integer> outgoingTopicAliases;
150150
private Hashtable<Integer, String> incomingTopicAliases;
151-
private int outgoingTopicAliasCount = 1;
152151

153-
private MqttSession mqttSession;
152+
private MqttConnectionState mqttConnection;
154153

155154
protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore, CommsCallback callback,
156-
ClientComms clientComms, MqttPingSender pingSender, MqttSession mqttSession) throws MqttException {
155+
ClientComms clientComms, MqttPingSender pingSender, MqttConnectionState mqttConnection)
156+
throws MqttException {
157157

158158
log.setResourceName(clientComms.getClient().getClientId());
159159
log.finer(CLASS_NAME, "<Init>", "");
160160

161161
inUseMsgIds = new ConcurrentHashMap<>();
162162
pendingFlows = new Vector<MqttWireMessage>();
163-
pendingMessages = new Vector<MqttWireMessage>(mqttSession.getReceiveMaximum());
163+
pendingMessages = new Vector<MqttWireMessage>(mqttConnection.getReceiveMaximum());
164164
outboundQoS2 = new ConcurrentHashMap<>();
165165
outboundQoS1 = new ConcurrentHashMap<>();
166166
outboundQoS0 = new ConcurrentHashMap<>();
@@ -176,7 +176,7 @@ protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenSt
176176
this.tokenStore = tokenStore;
177177
this.clientComms = clientComms;
178178
this.pingSender = pingSender;
179-
this.mqttSession = mqttSession;
179+
this.mqttConnection = mqttConnection;
180180

181181
restoreState();
182182
}
@@ -460,7 +460,7 @@ protected void restoreState() throws MqttException {
460460

461461
private void restoreInflightMessages() {
462462
final String methodName = "restoreInflightMessages";
463-
pendingMessages = new Vector<MqttWireMessage>(this.mqttSession.getReceiveMaximum());
463+
pendingMessages = new Vector<MqttWireMessage>(this.mqttConnection.getReceiveMaximum());
464464
pendingFlows = new Vector<MqttWireMessage>();
465465

466466
Enumeration<Integer> keys = outboundQoS2.keys();
@@ -519,18 +519,18 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
519519
message.setMessageId(getNextMessageId());
520520
}
521521
// Set Topic Alias if required
522-
if (message instanceof MqttPublish && this.mqttSession.getOutgoingTopicAliasMaximum() > 0) {
522+
if (message instanceof MqttPublish && this.mqttConnection.getOutgoingTopicAliasMaximum() > 0) {
523523
String topic = ((MqttPublish) message).getTopicName();
524524
if (outgoingTopicAliases.containsKey(topic)) {
525525
// Existing Topic Alias, Assign it and remove the topic string
526526
((MqttPublish) message).getProperties().setTopicAlias(outgoingTopicAliases.get(topic));
527527
((MqttPublish) message).setTopicName(null);
528528
} else {
529-
if (outgoingTopicAliasCount <= this.mqttSession.getOutgoingTopicAliasMaximum()) {
529+
int nextOutgoingTopicAlias = this.mqttConnection.getNextOutgoingTopicAlias();
530+
if (nextOutgoingTopicAlias <= this.mqttConnection.getOutgoingTopicAliasMaximum()) {
530531
// Create a new Topic Alias and increment the counter
531-
((MqttPublish) message).getProperties().setTopicAlias(outgoingTopicAliasCount);
532-
outgoingTopicAliases.put(((MqttPublish) message).getTopicName(), outgoingTopicAliasCount);
533-
outgoingTopicAliasCount++;
532+
((MqttPublish) message).getProperties().setTopicAlias(nextOutgoingTopicAlias);
533+
outgoingTopicAliases.put(((MqttPublish) message).getTopicName(), nextOutgoingTopicAlias);
534534
}
535535
}
536536
}
@@ -544,7 +544,7 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
544544

545545
if (message instanceof MqttPublish) {
546546
synchronized (queueLock) {
547-
if (actualInFlight >= this.mqttSession.getReceiveMaximum()) {
547+
if (actualInFlight >= this.mqttConnection.getReceiveMaximum()) {
548548
// @TRACE 613= sending {0} msgs at max inflight window
549549
log.fine(CLASS_NAME, methodName, "613", new Object[] { Integer.valueOf(actualInFlight) });
550550

@@ -830,7 +830,7 @@ protected MqttWireMessage get() throws MqttException {
830830
// freed.
831831
// In both cases queueLock will be notified.
832832
if ((pendingMessages.isEmpty() && pendingFlows.isEmpty())
833-
|| (pendingFlows.isEmpty() && actualInFlight >= this.mqttSession.getReceiveMaximum())) {
833+
|| (pendingFlows.isEmpty() && actualInFlight >= this.mqttConnection.getReceiveMaximum())) {
834834
try {
835835
// @TRACE 644=wait for new work or for space in the inflight window
836836
log.fine(CLASS_NAME, methodName, "644");
@@ -876,7 +876,7 @@ protected MqttWireMessage get() throws MqttException {
876876

877877
// If the inflight window is full then messages are not
878878
// processed until the inflight window has space.
879-
if (actualInFlight < this.mqttSession.getReceiveMaximum()) {
879+
if (actualInFlight < this.mqttConnection.getReceiveMaximum()) {
880880
// The in flight window is not full so process the
881881
// first message in the queue
882882
result = (MqttWireMessage) pendingMessages.elementAt(0);
@@ -1127,7 +1127,7 @@ protected void handleOrphanedAcks(MqttAck ack) throws MqttException {
11271127
} else if (ack instanceof MqttPubRec) {
11281128
// MqttPubRec - Send an MqttPubRel with the appropriate Reason Code
11291129
MqttProperties pubRelProperties = new MqttProperties();
1130-
if (this.mqttSession.isSendReasonMessages()) {
1130+
if (this.mqttConnection.isSendReasonMessages()) {
11311131
String reasonString = String.format("Message identifier [%d] was not found. Discontinuing QoS 2 flow.",
11321132
ack.getMessageId());
11331133
pubRelProperties.setReasonString(reasonString);
@@ -1192,10 +1192,11 @@ protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
11921192
int incomingTopicAlias = send.getProperties().getTopicAlias();
11931193

11941194
// Are incoming Topic Aliases enabled / is it a valid Alias?
1195-
if (incomingTopicAlias > this.mqttSession.getIncomingTopicAliasMax() || incomingTopicAlias == 0) {
1195+
if (incomingTopicAlias > this.mqttConnection.getIncomingTopicAliasMax()
1196+
|| incomingTopicAlias == 0) {
11961197
// @TRACE 653=Invalid Topic Alias: topicAliasMax={0}, publishTopicAlias={1}
11971198
log.severe(CLASS_NAME, methodName, "653",
1198-
new Object[] { Integer.valueOf(this.mqttSession.getIncomingTopicAliasMax()),
1199+
new Object[] { Integer.valueOf(this.mqttConnection.getIncomingTopicAliasMax()),
11991200
Integer.valueOf(incomingTopicAlias) });
12001201
if (callback != null) {
12011202
callback.mqttErrorOccurred(new MqttException(MqttException.REASON_CODE_INVALID_TOPIC_ALAS));
@@ -1613,7 +1614,7 @@ public Properties getDebug() {
16131614
props.put("In use msgids", inUseMsgIds);
16141615
props.put("pendingMessages", pendingMessages);
16151616
props.put("pendingFlows", pendingFlows);
1616-
props.put("serverReceiveMaximum", Integer.valueOf(this.mqttSession.getReceiveMaximum()));
1617+
props.put("serverReceiveMaximum", Integer.valueOf(this.mqttConnection.getReceiveMaximum()));
16171618
props.put("nextMsgID", Integer.valueOf(nextMsgId));
16181619
props.put("actualInFlight", Integer.valueOf(actualInFlight));
16191620
props.put("inFlightPubRels", Integer.valueOf(inFlightPubRels));

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ConnectActionListener.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public class ConnectActionListener implements MqttActionListener {
6363
private Object userContext;
6464
private MqttActionListener userCallback;
6565
private MqttCallback mqttCallback;
66-
private MqttSession mqttSession;
66+
private MqttSessionState mqttSession;
67+
private MqttConnectionState mqttConnection;
6768
private boolean reconnect;
6869

6970
/**
@@ -82,13 +83,13 @@ public class ConnectActionListener implements MqttActionListener {
8283
* @param userCallback
8384
* the {@link MqttActionListener} as the callback for the user
8485
* @param mqttSession
85-
* the {@link MqttSession}
86+
* the {@link MqttConnectionState}
8687
* @param reconnect
8788
* If true, this is a reconnect attempt
8889
*/
8990
public ConnectActionListener(MqttAsyncClient client, MqttClientPersistence persistence, ClientComms comms,
9091
MqttConnectionOptions options, MqttToken userToken, Object userContext, MqttActionListener userCallback,
91-
boolean reconnect, MqttSession mqttSession) {
92+
boolean reconnect, MqttSessionState mqttSession, MqttConnectionState mqttConnection) {
9293
this.persistence = persistence;
9394
this.client = client;
9495
this.comms = comms;
@@ -110,16 +111,16 @@ public ConnectActionListener(MqttAsyncClient client, MqttClientPersistence persi
110111
public void onSuccess(IMqttToken token) {
111112
// Set properties imposed on us by the Server
112113
MqttToken myToken = (MqttToken) token;
113-
mqttSession.setReceiveMaximum(myToken.getMessageProperties().getReceiveMaximum());
114-
mqttSession.setMaximumQoS(myToken.getMessageProperties().getMaximumQoS());
115-
mqttSession.setRetainAvailable(myToken.getMessageProperties().isRetainAvailable());
116-
mqttSession.setMaximumPacketSize(myToken.getMessageProperties().getMaximumPacketSize());
117-
mqttSession.setOutgoingTopicAliasMaximum(myToken.getMessageProperties().getTopicAliasMaximum());
118-
mqttSession
114+
mqttConnection.setReceiveMaximum(myToken.getMessageProperties().getReceiveMaximum());
115+
mqttConnection.setMaximumQoS(myToken.getMessageProperties().getMaximumQoS());
116+
mqttConnection.setRetainAvailable(myToken.getMessageProperties().isRetainAvailable());
117+
mqttConnection.setMaximumPacketSize(myToken.getMessageProperties().getMaximumPacketSize());
118+
mqttConnection.setOutgoingTopicAliasMaximum(myToken.getMessageProperties().getTopicAliasMaximum());
119+
mqttConnection
119120
.setWildcardSubscriptionsAvailable(myToken.getMessageProperties().isWildcardSubscriptionsAvailable());
120-
mqttSession.setSubscriptionIdentifiersAvailable(
121+
mqttConnection.setSubscriptionIdentifiersAvailable(
121122
myToken.getMessageProperties().isSubscriptionIdentifiersAvailable());
122-
mqttSession.setSharedSubscriptionsAvailable(myToken.getMessageProperties().isSharedSubscriptionAvailable());
123+
mqttConnection.setSharedSubscriptionsAvailable(myToken.getMessageProperties().isSharedSubscriptionAvailable());
123124

124125
// If we are assigning the client ID post connect, then we need to re-initialise
125126
// our persistence layer.

0 commit comments

Comments
 (0)