Skip to content

Commit 950f005

Browse files
author
Ranjan Dasgupta
authored
Merge pull request #746 from maxpagani/fix-nano-milli-comparisons
Fix #640 Use same time units in time computations
2 parents 3cf991a + 7612bbb commit 950f005

1 file changed

Lines changed: 24 additions & 17 deletions

File tree

  • org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public class ClientState {
119119
private CommsTokenStore tokenStore;
120120
private ClientComms clientComms = null;
121121
private CommsCallback callback = null;
122-
private long keepAlive;
122+
private long keepAliveNanos; // nanoseconds time
123123
private boolean cleanSession;
124124
private MqttClientPersistence persistence;
125125

@@ -131,9 +131,9 @@ public class ClientState {
131131
private final Object quiesceLock = new Object();
132132
private boolean quiescing = false;
133133

134-
private long lastOutboundActivity = 0;
135-
private long lastInboundActivity = 0;
136-
private long lastPing = 0;
134+
private long lastOutboundActivity = 0; // nanoseconds absolute time
135+
private long lastInboundActivity = 0; // nanoseconds absolute time
136+
private long lastPing = 0; // nanoseconds absolute time
137137
private MqttWireMessage pingCommand;
138138
private final Object pingOutstandingLock = new Object();
139139
private int pingOutstanding = 0;
@@ -177,14 +177,14 @@ protected void setMaxInflight(int maxInflight) {
177177
pendingMessages = new Vector(this.maxInflight);
178178
}
179179
protected void setKeepAliveSecs(long keepAliveSecs) {
180-
this.keepAlive = TimeUnit.SECONDS.toNanos(keepAliveSecs);
180+
this.keepAliveNanos = TimeUnit.SECONDS.toNanos(keepAliveSecs);
181181
}
182182
/**
183183
* Returns the keepAlive in Milliseconds
184184
* @return The KeepAlive value in Milliseconds
185185
*/
186186
protected long getKeepAlive() {
187-
return TimeUnit.NANOSECONDS.toMillis(this.keepAlive);
187+
return TimeUnit.NANOSECONDS.toMillis(this.keepAliveNanos);
188188
}
189189
protected void setCleanSession(boolean cleanSession) {
190190
this.cleanSession = cleanSession;
@@ -715,9 +715,9 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
715715
}
716716

717717
MqttToken token = null;
718-
long nextPingTime = this.keepAlive;
718+
long nextPingTime = TimeUnit.NANOSECONDS.toMillis(this.keepAliveNanos); // milliseconds relative time
719719

720-
if (connected && this.keepAlive > 0) {
720+
if (connected && this.keepAliveNanos > 0) {
721721
long time = System.nanoTime();
722722
// Below might not be necessary since move to nanoTime (Issue #278)
723723
//Reduce schedule frequency since System.currentTimeMillis is no accurate, add a buffer
@@ -728,24 +728,24 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
728728
synchronized (pingOutstandingLock) {
729729

730730
// Is the broker connection lost because the broker did not reply to my ping?
731-
if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAlive + delta)) {
731+
if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAliveNanos + delta)) {
732732
// lastInboundActivity will be updated once receiving is done.
733733
// Add a delta, since the timer and System.currentTimeMillis() is not accurate.
734734
// TODO - Remove Delta, maybe?
735735
// A ping is outstanding but no packet has been received in KA so connection is deemed broken
736736
//@TRACE 619=Timed out as no activity, keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2} time={3} lastPing={4}
737-
log.severe(CLASS_NAME,methodName,"619", new Object[]{ Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
737+
log.severe(CLASS_NAME,methodName,"619", new Object[]{ Long.valueOf(this.keepAliveNanos), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
738738

739739
// A ping has already been sent. At this point, assume that the
740740
// broker has hung and the TCP layer hasn't noticed.
741741
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
742742
}
743743

744744
// Is the broker connection lost because I could not get any successful write for 2 keepAlive intervals?
745-
if (pingOutstanding == 0 && (time - lastOutboundActivity >= 2*keepAlive)) {
745+
if (pingOutstanding == 0 && (time - lastOutboundActivity >= 2* keepAliveNanos)) {
746746

747747
// I am probably blocked on a write operations as I should have been able to write at least a ping message
748-
log.severe(CLASS_NAME,methodName,"642", new Object[]{ Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
748+
log.severe(CLASS_NAME,methodName,"642", new Object[]{ Long.valueOf(this.keepAliveNanos), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
749749

750750
// A ping has not been sent but I am not progressing on the current write operation.
751751
// At this point, assume that the broker has hung and the TCP layer hasn't noticed.
@@ -761,11 +761,11 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
761761
// This would be the case when receiving a large message;
762762
// the broker needs to keep receiving a regular ping even if the ping response are queued after the long message
763763
// If lacking to do so, the broker will consider my connection lost and cut my socket.
764-
if ((pingOutstanding == 0 && (time - lastInboundActivity >= keepAlive - delta)) ||
765-
(time - lastOutboundActivity >= keepAlive - delta)) {
764+
if ((pingOutstanding == 0 && (time - lastInboundActivity >= keepAliveNanos - delta)) ||
765+
(time - lastOutboundActivity >= keepAliveNanos - delta)) {
766766

767767
//@TRACE 620=ping needed. keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2}
768-
log.fine(CLASS_NAME,methodName,"620", new Object[]{ Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity)});
768+
log.fine(CLASS_NAME,methodName,"620", new Object[]{ Long.valueOf(this.keepAliveNanos), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity)});
769769

770770
// pingOutstanding++; // it will be set after the ping has been written on the wire
771771
// lastPing = time; // it will be set after the ping has been written on the wire
@@ -784,7 +784,9 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
784784
else {
785785
//@TRACE 634=ping not needed yet. Schedule next ping.
786786
log.fine(CLASS_NAME, methodName, "634", null);
787-
nextPingTime = Math.max(1, getKeepAlive() - (time - lastOutboundActivity));
787+
long elapsedSinceLastActivityNanos = time - lastOutboundActivity;
788+
long elapsedSinceLastActivityMillis = TimeUnit.NANOSECONDS.toMillis( elapsedSinceLastActivityNanos );
789+
nextPingTime = Math.max(1, getKeepAlive() - elapsedSinceLastActivityMillis);
788790
}
789791
}
790792
//@TRACE 624=Schedule next ping at {0}
@@ -882,8 +884,13 @@ protected MqttWireMessage get() throws MqttException {
882884
return result;
883885
}
884886

887+
/**
888+
* Sets the keep alive interval.
889+
*
890+
* @param interval keep alive interval in milliseconds.
891+
*/
885892
public void setKeepAliveInterval(long interval) {
886-
this.keepAlive = interval;
893+
this.keepAliveNanos = TimeUnit.MILLISECONDS.toNanos(interval);
887894
}
888895

889896
public void notifySentBytes(int sentBytesCount) {

0 commit comments

Comments
 (0)