Skip to content

Commit 1c8ad37

Browse files
committed
Merge branch 'develop' of github.com:eclipse/paho.mqtt.java into develop
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
2 parents 0ae15cf + ec51902 commit 1c8ad37

5 files changed

Lines changed: 67 additions & 32 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class MqttAsyncClient implements IMqttAsyncClient {
104104
private Timer reconnectTimer; // Automatic reconnect timer
105105
private static int reconnectDelay = 1000; // Reconnect delay, starts at 1 second
106106
private boolean reconnecting = false;
107-
107+
private static Object clientLock = new Object(); // Simple lock
108108

109109

110110

@@ -1217,21 +1217,32 @@ private void stopReconnectCycle(){
12171217
String methodName = "stopReconnectCycle";
12181218
//@Trace 504=Stop reconnect timer for client: {0}
12191219
log.fine(CLASS_NAME, methodName, "504", new Object[]{this.clientId});
1220-
if(reconnectTimer != null){
1221-
reconnectTimer.cancel();
1220+
synchronized(clientLock) {
1221+
if (this.connOpts.isAutomaticReconnect()) {
1222+
if(reconnectTimer != null){
1223+
reconnectTimer.cancel();
1224+
reconnectTimer = null;
1225+
}
1226+
reconnectDelay = 1000; // Reset Delay Timer
1227+
}
12221228
}
1223-
reconnectDelay = 1000; // Reset Delay Timer
1224-
12251229
}
12261230

12271231
private void rescheduleReconnectCycle(int delay){
12281232
String methodName = "rescheduleReconnectCycle";
12291233
//@Trace 505=Rescheduling reconnect timer for client: {0}, delay: {1}
12301234
log.fine(CLASS_NAME, methodName, "505", new Object[]{this.clientId, new Long(reconnectDelay)});
1231-
if(reconnectTimer != null){
1232-
reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
1235+
synchronized(clientLock) {
1236+
if(this.connOpts.isAutomaticReconnect()) {
1237+
if (reconnectTimer != null) {
1238+
reconnectTimer.schedule(new ReconnectTask(), delay);
1239+
} else {
1240+
// The previous reconnect timer was cancelled
1241+
reconnectDelay = delay;
1242+
startReconnectCycle();
1243+
}
1244+
}
12331245
}
1234-
12351246
}
12361247

12371248
private class ReconnectTask extends TimerTask {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -500,26 +500,29 @@ public void setServerURIs(String[] array) {
500500
* @param srvURI The Server URI
501501
* @return the URI type
502502
*/
503-
protected static int validateURI(String srvURI) {
503+
public static int validateURI(String srvURI) {
504504
try {
505505
URI vURI = new URI(srvURI);
506-
if (vURI.getScheme().equals("ws")){
506+
if ("ws".equals(vURI.getScheme())){
507507
return URI_TYPE_WS;
508508
}
509-
else if (vURI.getScheme().equals("wss")) {
509+
else if ("wss".equals(vURI.getScheme())) {
510510
return URI_TYPE_WSS;
511511
}
512512

513-
if (!vURI.getPath().equals("")) {
514-
throw new IllegalArgumentException(srvURI);
513+
if ((vURI.getPath() == null) || vURI.getPath().isEmpty()) {
514+
// No op path must be empty
515515
}
516-
if (vURI.getScheme().equals("tcp")) {
516+
else {
517+
throw new IllegalArgumentException(srvURI);
518+
}
519+
if ("tcp".equals(vURI.getScheme())) {
517520
return URI_TYPE_TCP;
518521
}
519-
else if (vURI.getScheme().equals("ssl")) {
522+
else if ("ssl".equals(vURI.getScheme())) {
520523
return URI_TYPE_SSL;
521524
}
522-
else if (vURI.getScheme().equals("local")) {
525+
else if ("local".equals(vURI.getScheme())) {
523526
return URI_TYPE_LOCAL;
524527
}
525528
else {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,13 @@ protected void undo(MqttPublish message) throws MqttPersistenceException {
615615
pendingMessages.removeElement(message);
616616
persistence.remove(getSendPersistenceKey(message));
617617
tokenStore.removeToken(message);
618+
if(message.getMessage().getQos() > 0){
619+
//Free this message Id so it can be used again
620+
releaseMessageId(message.getMessageId());
621+
//Set the messageId to 0 so if it's ever retried, it will get a new messageId
622+
message.setMessageId(0);
623+
}
624+
618625
checkQuiesceLock();
619626
}
620627
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsReceiver.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Public License v1.0
6-
* and Eclipse Distribution License v1.0 which accompany this distribution.
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
77
*
8-
* The Eclipse Public License is available at
8+
* The Eclipse Public License is available at
99
* http://www.eclipse.org/legal/epl-v10.html
10-
* and the Eclipse Distribution License is available at
10+
* and the Eclipse Distribution License is available at
1111
* http://www.eclipse.org/org/documents/edl-v10.php.
1212
*
1313
* Contributors:
@@ -44,15 +44,15 @@ public class CommsReceiver implements Runnable {
4444
private CommsTokenStore tokenStore = null;
4545
private Thread recThread = null;
4646
private volatile boolean receiving;
47-
47+
4848
public CommsReceiver(ClientComms clientComms, ClientState clientState,CommsTokenStore tokenStore, InputStream in) {
4949
this.in = new MqttInputStream(clientState, in);
5050
this.clientComms = clientComms;
5151
this.clientState = clientState;
5252
this.tokenStore = tokenStore;
5353
log.setResourceName(clientComms.getClient().getClientId());
5454
}
55-
55+
5656
/**
5757
* Starts up the Receiver's thread.
5858
* @param threadName The name of the thread
@@ -100,29 +100,29 @@ public void stop() {
100100
//@TRACE 851=stopped
101101
log.fine(CLASS_NAME,methodName,"851");
102102
}
103-
103+
104104
/**
105105
* Run loop to receive messages from the server.
106106
*/
107107
public void run() {
108108
final String methodName = "run";
109109
MqttToken token = null;
110-
110+
111111
while (running && (in != null)) {
112112
try {
113113
//@TRACE 852=network read message
114114
log.fine(CLASS_NAME,methodName,"852");
115115
receiving = in.available() > 0;
116116
MqttWireMessage message = in.readMqttWireMessage();
117117
receiving = false;
118-
118+
119119
// instanceof checks if message is null
120120
if (message instanceof MqttAck) {
121121
token = tokenStore.getToken(message);
122122
if (token!=null) {
123123
synchronized (token) {
124124
// Ensure the notify processing is done under a lock on the token
125-
// This ensures that the send processing can complete before the
125+
// This ensures that the send processing can complete before the
126126
// receive processing starts! ( request and ack and ack processing
127127
// can occur before request processing is complete if not!
128128
clientState.notifyReceivedAck((MqttAck)message);
@@ -133,7 +133,6 @@ public void run() {
133133
//because of timeouts, crashes, disconnects, restarts etc.
134134
//It should be safe to ignore these unexpected messages.
135135
log.fine(CLASS_NAME, methodName, "857");
136-
137136
} else {
138137
// It its an ack and there is no token then something is not right.
139138
// An ack should always have a token assoicated with it.
@@ -152,13 +151,13 @@ public void run() {
152151
running = false;
153152
// Token maybe null but that is handled in shutdown
154153
clientComms.shutdownConnection(token, ex);
155-
}
154+
}
156155
catch (IOException ioe) {
157156
//@TRACE 853=Stopping due to IOException
158157
log.fine(CLASS_NAME,methodName,"853");
159158

160159
running = false;
161-
// An EOFException could be raised if the broker processes the
160+
// An EOFException could be raised if the broker processes the
162161
// DISCONNECT and ends the socket before we complete. As such,
163162
// only shutdown the connection if we're not already shutting down.
164163
if (!clientComms.isDisconnecting()) {
@@ -169,18 +168,18 @@ public void run() {
169168
receiving = false;
170169
}
171170
}
172-
171+
173172
//@TRACE 854=<
174173
log.fine(CLASS_NAME,methodName,"854");
175174
}
176-
175+
177176
public boolean isRunning() {
178177
return running;
179178
}
180-
179+
181180
/**
182181
* Returns the receiving state.
183-
*
182+
*
184183
* @return true if the receiver is receiving data, false otherwise.
185184
*/
186185
public boolean isReceiving() {

pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,21 @@
9494
<target>${java.version}</target>
9595
</configuration>
9696
</plugin>
97+
<plugin>
98+
<groupId>org.apache.maven.plugins</groupId>
99+
<artifactId>maven-javadoc-plugin</artifactId>
100+
<version>2.9.1</version>
101+
</plugin>
102+
<plugin>
103+
<groupId>org.apache.maven.plugins</groupId>
104+
<artifactId>maven-resources-plugin</artifactId>
105+
<version>2.4.2</version>
106+
</plugin>
107+
<plugin>
108+
<groupId>org.apache.maven.plugins</groupId>
109+
<artifactId>maven-source-plugin</artifactId>
110+
<version>3.0.1</version>
111+
</plugin>
97112
<plugin>
98113
<groupId>org.apache.maven.plugins</groupId>
99114
<artifactId>maven-deploy-plugin</artifactId>

0 commit comments

Comments
 (0)