Skip to content

Commit 7043ecd

Browse files
committed
Issue #553 - MQTT-3.1.2-21 - Using server keep alive if provided.
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent dbb3e9f commit 7043ecd

5 files changed

Lines changed: 27 additions & 32 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,7 @@ public void connect(MqttConnectionOptions options, MqttToken token) throws MqttE
340340
* conOptions.getUserName(), conOptions.getPassword(),
341341
* conOptions.getWillMessage(), conOptions.getWillDestination()
342342
*/
343-
344-
this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
343+
this.mqttConnection.setKeepAliveSeconds(conOptions.getKeepAliveInterval());
345344
this.clientState.setCleanStart(conOptions.isCleanStart());
346345

347346
tokenStore.open();
@@ -728,7 +727,7 @@ public MqttClientInterface getClient() {
728727
}
729728

730729
public long getKeepAlive() {
731-
return this.clientState.getKeepAlive();
730+
return this.mqttConnection.getKeepAlive();
732731
}
733732

734733
public MqttState getClientState() {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public class ClientState implements MqttState {
118118
private CommsTokenStore tokenStore;
119119
private ClientComms clientComms = null;
120120
private CommsCallback callback = null;
121-
private long keepAlive;
121+
//private long keepAlive;
122122
private boolean cleanStart;
123123
private MqttClientPersistence persistence;
124124

@@ -181,14 +181,6 @@ protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenSt
181181
restoreState();
182182
}
183183

184-
protected void setKeepAliveSecs(long keepAliveSecs) {
185-
this.keepAlive = keepAliveSecs * 1000;
186-
}
187-
188-
protected long getKeepAlive() {
189-
return this.keepAlive;
190-
}
191-
192184
protected void setCleanStart(boolean cleanStart) {
193185
this.cleanStart = cleanStart;
194186
}
@@ -712,9 +704,10 @@ public MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttEx
712704
}
713705

714706
MqttToken token = null;
715-
long nextPingTime = getKeepAlive();
707+
long nextPingTime = this.mqttConnection.getKeepAlive();
708+
long keepAlive = this.mqttConnection.getKeepAlive();
716709

717-
if (connected && this.keepAlive > 0) {
710+
if (connected && this.mqttConnection.getKeepAlive() > 0) {
718711
long time = System.currentTimeMillis();
719712
// Reduce schedule frequency since System.currentTimeMillis is no accurate, add
720713
// a buffer
@@ -733,7 +726,7 @@ public MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttEx
733726
// @TRACE 619=Timed out as no activity, keepAlive={0} lastOutboundActivity={1}
734727
// lastInboundActivity={2} time={3} lastPing={4}
735728
log.severe(CLASS_NAME, methodName, "619",
736-
new Object[] { Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity),
729+
new Object[] { Long.valueOf(keepAlive), Long.valueOf(lastOutboundActivity),
737730
Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing) });
738731

739732
// A ping has already been sent. At this point, assume that the
@@ -748,7 +741,7 @@ public MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttEx
748741
// I am probably blocked on a write operations as I should have been able to
749742
// write at least a ping message
750743
log.severe(CLASS_NAME, methodName, "642",
751-
new Object[] { Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity),
744+
new Object[] { Long.valueOf(keepAlive), Long.valueOf(lastOutboundActivity),
752745
Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing) });
753746

754747
// A ping has not been sent but I am not progressing on the current write
@@ -776,7 +769,7 @@ public MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttEx
776769

777770
// @TRACE 620=ping needed. keepAlive={0} lastOutboundActivity={1}
778771
// lastInboundActivity={2}
779-
log.fine(CLASS_NAME, methodName, "620", new Object[] { Long.valueOf(this.keepAlive),
772+
log.fine(CLASS_NAME, methodName, "620", new Object[] { Long.valueOf(keepAlive),
780773
Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity) });
781774

782775
// pingOutstanding++; // it will be set after the ping has been written on the
@@ -790,13 +783,13 @@ public MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttEx
790783
tokenStore.saveToken(token, pingCommand);
791784
pendingFlows.insertElementAt(pingCommand, 0);
792785

793-
nextPingTime = getKeepAlive();
786+
nextPingTime = this.mqttConnection.getKeepAlive();
794787

795788
// Wake sender thread since it may be in wait state (in ClientState.get())
796789
notifyQueueLock();
797790
} else {
798791
log.fine(CLASS_NAME, methodName, "634", null);
799-
nextPingTime = Math.max(1, getKeepAlive() - (time - lastOutboundActivity));
792+
nextPingTime = Math.max(1, this.mqttConnection.getKeepAlive() - (time - lastOutboundActivity));
800793
}
801794
}
802795
// @TRACE 624=Schedule next ping at {0}
@@ -895,17 +888,6 @@ protected MqttWireMessage get() throws MqttException {
895888
return result;
896889
}
897890

898-
/*
899-
* (non-Javadoc)
900-
*
901-
* @see
902-
* org.eclipse.paho.mqttv5.client.internal.MqttState#setKeepAliveInterval(long)
903-
*/
904-
@Override
905-
public void setKeepAliveInterval(long interval) {
906-
this.keepAlive = interval;
907-
}
908-
909891
/*
910892
* (non-Javadoc)
911893
*

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ConnectActionListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ public void onSuccess(IMqttToken token) {
124124
mqttConnection.setSubscriptionIdentifiersAvailable(
125125
myToken.getMessageProperties().isSubscriptionIdentifiersAvailable());
126126
mqttConnection.setSharedSubscriptionsAvailable(myToken.getMessageProperties().isSharedSubscriptionAvailable());
127+
128+
// If provided, set the server keep alive value.
129+
if(myToken.getMessageProperties().getServerKeepAlive() != null) {
130+
mqttConnection.setKeepAliveSeconds(myToken.getMessageProperties().getServerKeepAlive());
131+
}
127132

128133
// If we are assigning the client ID post connect, then we need to re-initialise
129134
// our persistence layer.

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttConnectionState.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class MqttConnectionState {
3636
private Boolean subscriptionIdentifiersAvailable = true;
3737
private Boolean sharedSubscriptionsAvailable = true;
3838
private boolean sendReasonMessages = false;
39+
private long keepAlive = 60;
3940

4041
// ******* Counters ******//
4142
private AtomicInteger nextOutgoingTopicAlias = new AtomicInteger(1);
@@ -138,4 +139,14 @@ public void setSendReasonMessages(boolean enableReasonMessages) {
138139
this.sendReasonMessages = enableReasonMessages;
139140
}
140141

142+
143+
public long getKeepAlive() {
144+
return keepAlive;
145+
}
146+
147+
148+
public void setKeepAliveSeconds(long keepAlive) {
149+
this.keepAlive = keepAlive * 1000;
150+
}
151+
141152
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttState.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ public interface MqttState {
6666
*/
6767
MqttToken checkForActivity(MqttActionListener pingCallback) throws MqttException;
6868

69-
void setKeepAliveInterval(long interval);
70-
7169
void notifySentBytes(int sentBytesCount);
7270

7371
void notifyReceivedBytes(int receivedBytesCount);

0 commit comments

Comments
 (0)