Skip to content

Commit 23e8c35

Browse files
author
Ing. Jan Kaláb
committed
Merge remote-tracking branch 'upstream/develop' into patch-3
2 parents d5fe475 + 525c705 commit 23e8c35

7 files changed

Lines changed: 65 additions & 29 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/MqttTopicTest.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,18 @@ public void testValidTopicFilterWildcards() throws Exception {
6060
public void testMatchedTopicFilterWildcards() throws Exception {
6161
String methodName = Utility.getMethodName();
6262
LoggingUtilities.banner(log, cclass, methodName);
63-
String[][] matchingTopics = new String[][] { { "sport/tennis/player1/#", "sport/tennis/player1" },
64-
{ "sport/tennis/player1/#", "sport/tennis/player1/ranking" },
65-
{ "sport/tennis/player1/#", "sport/tennis/player1/score/wimbledon" }, { "sport/#", "sport" },
66-
{ "#", "sport/tennis/player1" } };
63+
String[][] matchingTopics = new String[][] {
64+
{ "+/+", "sport/hockey" },
65+
{ "/+", "/sport" },
66+
{ "sport/tennis/player1/#", "sport/tennis/player1" },
67+
{ "sport/tennis/player1/#", "sport/tennis/player1/ranking" },
68+
{ "sport/tennis/player1/#", "sport/tennis/player1/score/wimbledon" },
69+
{ "sport/#", "sport" },
70+
{ "#", "sport/tennis/player1" },
71+
{ "sport/tennis/player1/#", "sport/tennis/player1//wimbledon" },
72+
{ "sport/+/player1/#", "sport/tennis/player1/wimbledon" },
73+
{ "sport/+/player1/#", "sport/soccer/player1/UEFA" }
74+
};
6775

6876
for (String[] pair : matchingTopics) {
6977
Assert.assertTrue(pair[0] + " should match " + pair[1], MqttTopic.isMatched(pair[0], pair[1]));
@@ -74,8 +82,15 @@ public void testMatchedTopicFilterWildcards() throws Exception {
7482
public void testNonMatchedTopicFilterWildcards() throws Exception {
7583
String methodName = Utility.getMethodName();
7684
LoggingUtilities.banner(log, cclass, methodName);
77-
String[][] matchingTopics = new String[][] { { "sport/tennis/player1/#", "sport/tennis/player2" },
78-
{ "sport1/#", "sport2" }, { "sport/tennis1/player/#", "sport/tennis2/player" } };
85+
String[][] matchingTopics = new String[][] {
86+
{ "+/+", "/sport" },
87+
{ "+/+", "a/b/c" },
88+
{ "/sport/+", "/sport/" },
89+
{ "sport/tennis/player1/#", "sport/tennis/player2" },
90+
{ "sport1/#", "sport2" },
91+
{ "sport/tennis1/player/#", "sport/tennis2/player" },
92+
{ "sport//tennis/player1/#", "sport/tennis/player1//wimbledon" }
93+
};
7994

8095
for (String[] pair : matchingTopics) {
8196
Assert.assertFalse(pair[0] + " should NOT match " + pair[1], MqttTopic.isMatched(pair[0], pair[1]));

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/SSLSessionResumptionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public static void setUpBeforeClass() throws Exception {
8282
*
8383
* @throws Exception
8484
*/
85-
@Test(timeout=30000)
85+
@Test(timeout=60000)
8686
public void testSSLSessionInvalidated() throws Exception {
8787
//System.setProperty("javax.net.debug", "all");
8888

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttTopic.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,9 @@ public static boolean isMatched(String topicFilter, String topicName) throws Ill
275275
}
276276

277277
while (filterPos < filterLen && topicPos < topicLen) {
278+
if (topicFilter.charAt(filterPos) == '#')
279+
topicPos = topicLen - 1; // skip until end of string
280+
278281
if (topicName.charAt(topicPos) == '/' && topicFilter.charAt(filterPos) != '/')
279282

280283
break;
@@ -285,8 +288,8 @@ public static boolean isMatched(String topicFilter, String topicName) throws Ill
285288
int nextpos = topicPos + 1;
286289
while (nextpos < topicLen && topicName.charAt(nextpos) != '/')
287290
nextpos = ++topicPos + 1;
288-
} else if (topicFilter.charAt(filterPos) == '#')
289-
topicPos = topicLen - 1; // skip until end of string
291+
}
292+
290293
filterPos++;
291294
topicPos++;
292295
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public class ClientState {
119119
private CommsTokenStore tokenStore;
120120
private ClientComms clientComms = null;
121121
private CommsCallback callback = null;
122-
private long keepAlive;
122+
private long keepAliveNanos; // nanoseconds time
123123
private boolean cleanSession;
124124
private MqttClientPersistence persistence;
125125

@@ -131,9 +131,9 @@ public class ClientState {
131131
private final Object quiesceLock = new Object();
132132
private boolean quiescing = false;
133133

134-
private long lastOutboundActivity = 0;
135-
private long lastInboundActivity = 0;
136-
private long lastPing = 0;
134+
private long lastOutboundActivity = 0; // nanoseconds absolute time
135+
private long lastInboundActivity = 0; // nanoseconds absolute time
136+
private long lastPing = 0; // nanoseconds absolute time
137137
private MqttWireMessage pingCommand;
138138
private final Object pingOutstandingLock = new Object();
139139
private int pingOutstanding = 0;
@@ -177,14 +177,14 @@ protected void setMaxInflight(int maxInflight) {
177177
pendingMessages = new Vector(this.maxInflight);
178178
}
179179
protected void setKeepAliveSecs(long keepAliveSecs) {
180-
this.keepAlive = TimeUnit.SECONDS.toNanos(keepAliveSecs);
180+
this.keepAliveNanos = TimeUnit.SECONDS.toNanos(keepAliveSecs);
181181
}
182182
/**
183183
* Returns the keepAlive in Milliseconds
184184
* @return The KeepAlive value in Milliseconds
185185
*/
186186
protected long getKeepAlive() {
187-
return TimeUnit.NANOSECONDS.toMillis(this.keepAlive);
187+
return TimeUnit.NANOSECONDS.toMillis(this.keepAliveNanos);
188188
}
189189
protected void setCleanSession(boolean cleanSession) {
190190
this.cleanSession = cleanSession;
@@ -715,9 +715,9 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
715715
}
716716

717717
MqttToken token = null;
718-
long nextPingTime = this.keepAlive;
718+
long nextPingTime = TimeUnit.NANOSECONDS.toMillis(this.keepAliveNanos); // milliseconds relative time
719719

720-
if (connected && this.keepAlive > 0) {
720+
if (connected && this.keepAliveNanos > 0) {
721721
long time = System.nanoTime();
722722
// Below might not be necessary since move to nanoTime (Issue #278)
723723
//Reduce schedule frequency since System.currentTimeMillis is no accurate, add a buffer
@@ -728,24 +728,24 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
728728
synchronized (pingOutstandingLock) {
729729

730730
// Is the broker connection lost because the broker did not reply to my ping?
731-
if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAlive + delta)) {
731+
if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAliveNanos + delta)) {
732732
// lastInboundActivity will be updated once receiving is done.
733733
// Add a delta, since the timer and System.currentTimeMillis() is not accurate.
734734
// TODO - Remove Delta, maybe?
735735
// A ping is outstanding but no packet has been received in KA so connection is deemed broken
736736
//@TRACE 619=Timed out as no activity, keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2} time={3} lastPing={4}
737-
log.severe(CLASS_NAME,methodName,"619", new Object[]{ Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
737+
log.severe(CLASS_NAME,methodName,"619", new Object[]{ Long.valueOf(this.keepAliveNanos), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
738738

739739
// A ping has already been sent. At this point, assume that the
740740
// broker has hung and the TCP layer hasn't noticed.
741741
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
742742
}
743743

744744
// Is the broker connection lost because I could not get any successful write for 2 keepAlive intervals?
745-
if (pingOutstanding == 0 && (time - lastOutboundActivity >= 2*keepAlive)) {
745+
if (pingOutstanding == 0 && (time - lastOutboundActivity >= 2* keepAliveNanos)) {
746746

747747
// I am probably blocked on a write operations as I should have been able to write at least a ping message
748-
log.severe(CLASS_NAME,methodName,"642", new Object[]{ Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
748+
log.severe(CLASS_NAME,methodName,"642", new Object[]{ Long.valueOf(this.keepAliveNanos), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity), Long.valueOf(time), Long.valueOf(lastPing)});
749749

750750
// A ping has not been sent but I am not progressing on the current write operation.
751751
// At this point, assume that the broker has hung and the TCP layer hasn't noticed.
@@ -761,11 +761,11 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
761761
// This would be the case when receiving a large message;
762762
// the broker needs to keep receiving a regular ping even if the ping response are queued after the long message
763763
// If lacking to do so, the broker will consider my connection lost and cut my socket.
764-
if ((pingOutstanding == 0 && (time - lastInboundActivity >= keepAlive - delta)) ||
765-
(time - lastOutboundActivity >= keepAlive - delta)) {
764+
if ((pingOutstanding == 0 && (time - lastInboundActivity >= keepAliveNanos - delta)) ||
765+
(time - lastOutboundActivity >= keepAliveNanos - delta)) {
766766

767767
//@TRACE 620=ping needed. keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2}
768-
log.fine(CLASS_NAME,methodName,"620", new Object[]{ Long.valueOf(this.keepAlive), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity)});
768+
log.fine(CLASS_NAME,methodName,"620", new Object[]{ Long.valueOf(this.keepAliveNanos), Long.valueOf(lastOutboundActivity), Long.valueOf(lastInboundActivity)});
769769

770770
// pingOutstanding++; // it will be set after the ping has been written on the wire
771771
// lastPing = time; // it will be set after the ping has been written on the wire
@@ -784,7 +784,9 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
784784
else {
785785
//@TRACE 634=ping not needed yet. Schedule next ping.
786786
log.fine(CLASS_NAME, methodName, "634", null);
787-
nextPingTime = Math.max(1, getKeepAlive() - (time - lastOutboundActivity));
787+
long elapsedSinceLastActivityNanos = time - lastOutboundActivity;
788+
long elapsedSinceLastActivityMillis = TimeUnit.NANOSECONDS.toMillis( elapsedSinceLastActivityNanos );
789+
nextPingTime = Math.max(1, getKeepAlive() - elapsedSinceLastActivityMillis);
788790
}
789791
}
790792
//@TRACE 624=Schedule next ping at {0}
@@ -882,8 +884,13 @@ protected MqttWireMessage get() throws MqttException {
882884
return result;
883885
}
884886

887+
/**
888+
* Sets the keep alive interval.
889+
*
890+
* @param interval keep alive interval in milliseconds.
891+
*/
885892
public void setKeepAliveInterval(long interval) {
886-
this.keepAlive = interval;
893+
this.keepAliveNanos = TimeUnit.MILLISECONDS.toNanos(interval);
887894
}
888895

889896
public void notifySentBytes(int sentBytesCount) {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,13 @@ public void run() {
165165
if (message != null) {
166166
// A new message has arrived
167167
clientState.notifyReceivedMsg(message);
168-
}
168+
}
169+
else {
170+
// fix for bug 719
171+
if (!clientComms.isConnected()) {
172+
throw new IOException("Connection is lost.");
173+
}
174+
}
169175
}
170176
}
171177
catch (MqttException ex) {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/DisconnectedMessageBuffer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
public class DisconnectedMessageBuffer implements Runnable {
2929

30-
private final String CLASS_NAME = "DisconnectedMessageBuffer";
30+
private final String CLASS_NAME = DisconnectedMessageBuffer.class.getName();
3131
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3232
private DisconnectedBufferOptions bufferOpts;
3333
private ArrayList<BufferedMessage> buffer;
@@ -53,6 +53,11 @@ public DisconnectedMessageBuffer(DisconnectedBufferOptions options) {
5353
* if the Buffer is full
5454
*/
5555
public void putMessage(MqttWireMessage message, MqttToken token) throws MqttException {
56+
if (token != null) {
57+
message.setToken(token);
58+
token.internalTok.setMessageID(message.getMessageId());
59+
}
60+
5661
BufferedMessage bufferedMessage = new BufferedMessage(message, token);
5762
synchronized (bufLock) {
5863
if (buffer.size() < bufferOpts.getBufferSize()) {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/DisconnectedMessageBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
public class DisconnectedMessageBuffer implements Runnable {
3030

31-
private static final String CLASS_NAME = "DisconnectedMessageBuffer";
31+
private static final String CLASS_NAME = DisconnectedMessageBuffer.class.getName();
3232
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3333
private DisconnectedBufferOptions bufferOpts;
3434
private ArrayList<BufferedMessage> buffer;

0 commit comments

Comments
 (0)