Skip to content

Commit 8d7c328

Browse files
committed
Automatic Reconnect & Offline Buffering functionality
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent e71b62a commit 8d7c328

14 files changed

Lines changed: 764 additions & 19 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2009, 2015 IBM Corp.
2+
* Copyright (c) 2009, 2016 IBM Corp.
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Public License v1.0
@@ -15,21 +15,25 @@
1515
* Ian Craggs - per subscription message handlers (bug 466579)
1616
* Ian Craggs - ack control (bug 472172)
1717
* James Sutton - checkForActivity Token (bug 473928)
18+
* James Sutton - Automatic Reconnect & Offline Buffering.
1819
*/
1920
package org.eclipse.paho.client.mqttv3.internal;
2021

2122
import java.util.Enumeration;
2223
import java.util.Properties;
2324
import java.util.Vector;
2425

26+
import org.eclipse.paho.client.mqttv3.BufferedMessage;
2527
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
2628
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2729
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
2830
import org.eclipse.paho.client.mqttv3.MqttCallback;
31+
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
2932
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
3033
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
3134
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
3235
import org.eclipse.paho.client.mqttv3.MqttException;
36+
import org.eclipse.paho.client.mqttv3.MqttMessage;
3337
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
3438
import org.eclipse.paho.client.mqttv3.MqttPingSender;
3539
import org.eclipse.paho.client.mqttv3.MqttToken;
@@ -74,7 +78,9 @@ public class ClientComms {
7478
private byte conState = DISCONNECTED;
7579
private Object conLock = new Object(); // Used to synchronize connection state
7680
private boolean closePending = false;
77-
81+
private boolean resting = false;
82+
private DisconnectedMessageBuffer disconnectedMessageBuffer;
83+
7884
/**
7985
* Creates a new ClientComms object, using the specified module to handle
8086
* the network calls.
@@ -141,7 +147,19 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
141147
if (isConnected() ||
142148
(!isConnected() && message instanceof MqttConnect) ||
143149
(isDisconnecting() && message instanceof MqttDisconnect)) {
144-
this.internalSend(message, token);
150+
if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){
151+
//@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding message to buffer. message={0}
152+
log.fine(CLASS_NAME, methodName, "507", new Object[] {message.getKey()});
153+
this.clientState.persistBufferedMessage(message);
154+
disconnectedMessageBuffer.putMessage(message, token);
155+
} else {
156+
this.internalSend(message, token);
157+
}
158+
} else if(disconnectedMessageBuffer != null && isResting()){
159+
//@TRACE 508=Client Resting, Offline Buffer available. Adding message to buffer. message={0}
160+
log.fine(CLASS_NAME, methodName, "508", new Object[] {message.getKey()});
161+
this.clientState.persistBufferedMessage(message);
162+
disconnectedMessageBuffer.putMessage(message, token);
145163
} else {
146164
//@TRACE 208=failed: not connected
147165
log.fine(CLASS_NAME, methodName, "208");
@@ -336,7 +354,10 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
336354
}
337355

338356
try {
339-
if (persistence != null) {persistence.close();}
357+
if(disconnectedMessageBuffer == null && persistence != null){
358+
persistence.close();
359+
}
360+
340361
}catch(Exception ex) {
341362
// Ignore as we are shutting down
342363
}
@@ -499,12 +520,22 @@ public boolean isClosed() {
499520
return conState == CLOSED;
500521
}
501522
}
523+
524+
public boolean isResting() {
525+
synchronized (conLock) {
526+
return resting;
527+
}
528+
}
502529

503530

504531
public void setCallback(MqttCallback mqttCallback) {
505532
this.callback.setCallback(mqttCallback);
506533
}
507534

535+
public void setReconnectCallback(MqttCallbackExtended callback){
536+
this.callback.setReconnectCallback(callback);
537+
}
538+
508539
public void setManualAcks(boolean manualAcks) {
509540
this.callback.setManualAcks(manualAcks);
510541
}
@@ -720,4 +751,62 @@ private void handleRunException(Exception ex) {
720751

721752
shutdownConnection(null, mex);
722753
}
754+
755+
/**
756+
* When Automatic reconnect is enabled, we want ClientComs to enter the
757+
* 'resting' state if disconnected. This will allow us to publish messages
758+
* @param resting
759+
*/
760+
public void setRestingState(boolean resting) {
761+
this.resting = resting;
762+
}
763+
764+
public void setDisconnectedMessageBuffer(DisconnectedMessageBuffer disconnectedMessageBuffer) {
765+
this.disconnectedMessageBuffer = disconnectedMessageBuffer;
766+
}
767+
768+
public int getBufferedMessageCount(){
769+
return this.disconnectedMessageBuffer.getMessageCount();
770+
}
771+
772+
public MqttMessage getBufferedMessage(int bufferIndex){
773+
MqttPublish send = (MqttPublish) this.disconnectedMessageBuffer.getMessage(bufferIndex).getMessage();
774+
return send.getMessage();
775+
}
776+
777+
public void deleteBufferedMessage(int bufferIndex){
778+
this.disconnectedMessageBuffer.deleteMessage(bufferIndex);
779+
}
780+
781+
782+
/**
783+
* When the client automatically reconnects, we want to send all messages from the
784+
* buffer first before allowing the user to send any messages
785+
* @throws MqttException
786+
*/
787+
public void notifyReconnect() {
788+
final String methodName = "notifyReconnect";
789+
if(disconnectedMessageBuffer != null){
790+
//@TRACE 509=Client Reconnected, Offline Buffer Available. Sending Buffered Messages.
791+
log.fine(CLASS_NAME, methodName, "509");
792+
disconnectedMessageBuffer.setPublishCallback(new IDisconnectedBufferCallback() {
793+
794+
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
795+
if (isConnected()) {
796+
//@TRACE 510=Publising Buffered message message={0}
797+
log.fine(CLASS_NAME, methodName, "510", new Object[] {bufferedMessage.getMessage().getKey()});
798+
internalSend(bufferedMessage.getMessage(), bufferedMessage.getToken());
799+
// Delete from persistence if in there
800+
clientState.unPersistBufferedMessage(bufferedMessage.getMessage());
801+
} else {
802+
//@TRACE 208=failed: not connected
803+
log.fine(CLASS_NAME, methodName, "208");
804+
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
805+
}
806+
}
807+
});
808+
new Thread(disconnectedMessageBuffer).start();
809+
}
810+
}
811+
723812
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2016 IBM Corp.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v1.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* http://www.eclipse.org/legal/epl-v10.html
10+
* and the Eclipse Distribution License is available at
11+
* http://www.eclipse.org/org/documents/edl-v10.php.
12+
*
13+
* Contributors:
14+
* James Sutton - Initial Contribution for Automatic Reconnect & Offline Buffering
15+
*/
16+
package org.eclipse.paho.client.mqttv3;
17+
18+
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
19+
20+
/**
21+
* A BufferedMessage contains an MqttWire Message and token
22+
* it allows both message and token to be buffered when the client
23+
* is in resting state
24+
*/
25+
public class BufferedMessage {
26+
27+
private MqttWireMessage message;
28+
private MqttToken token;
29+
30+
public BufferedMessage(MqttWireMessage message, MqttToken token){
31+
this.message = message;
32+
this.token = token;
33+
}
34+
35+
public MqttWireMessage getMessage() {
36+
return message;
37+
}
38+
39+
public MqttToken getToken() {
40+
return token;
41+
}
42+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2016 IBM Corp.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v1.0
6+
* and Eclipse Distribution License v1.0 which accompany this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* http://www.eclipse.org/legal/epl-v10.html
10+
* and the Eclipse Distribution License is available at
11+
* http://www.eclipse.org/org/documents/edl-v10.php.
12+
*
13+
* Contributors:
14+
* James Sutton - Initial Contribution for Automatic Reconnect & Offline Buffering
15+
*/
16+
package org.eclipse.paho.client.mqttv3;
17+
18+
/**
19+
* Holds the set of options that govern the behaviour
20+
* of Offline (or Disconnected) buffering of messages
21+
*/
22+
public class DisconnectedBufferOptions {
23+
24+
/**
25+
* The default size of the disconnected buffer
26+
*/
27+
public static final int DISCONNECTED_BUFFER_SIZE_DEFAULT = 5000;
28+
29+
public static final boolean DISCONNECTED_BUFFER_ENABLED_DEFAULT = false;
30+
31+
public static final boolean PERSIST_DISCONNECTED_BUFFER_DEFAULT = false;
32+
33+
public static final boolean DELETE_OLDEST_MESSAGES_DEFAULT = false;
34+
35+
private int bufferSize = DISCONNECTED_BUFFER_SIZE_DEFAULT;
36+
private boolean bufferEnabled = DISCONNECTED_BUFFER_ENABLED_DEFAULT;
37+
private boolean persistBuffer = PERSIST_DISCONNECTED_BUFFER_DEFAULT;
38+
private boolean deleteOldestMessages = DELETE_OLDEST_MESSAGES_DEFAULT;
39+
40+
/**
41+
* Constructs a new <code>DisconnectedBufferOptions</code> object using the
42+
* default values.
43+
*
44+
* The defaults are:
45+
* <ul>
46+
* <li>The disconnected buffer is disabled</li>
47+
* <li>The buffer holds 5000 messages</li>
48+
* <li>The buffer is not persisted</li>
49+
* <li>Once the buffer is full, old messages are not deleted</li>
50+
* </ul>
51+
* More information about these values can be found in the setter methods.
52+
*/
53+
public DisconnectedBufferOptions() {
54+
}
55+
56+
public int getBufferSize() {
57+
return bufferSize;
58+
}
59+
60+
public void setBufferSize(int bufferSize) {
61+
if (bufferSize < 1) {
62+
throw new IllegalArgumentException();
63+
}
64+
this.bufferSize = bufferSize;
65+
}
66+
67+
public boolean isBufferEnabled() {
68+
return bufferEnabled;
69+
}
70+
71+
public void setBufferEnabled(boolean bufferEnabled) {
72+
this.bufferEnabled = bufferEnabled;
73+
}
74+
75+
public boolean isPersistBuffer() {
76+
return persistBuffer;
77+
}
78+
79+
public void setPersistBuffer(boolean persistBuffer) {
80+
this.persistBuffer = persistBuffer;
81+
}
82+
83+
public boolean isDeleteOldestMessages() {
84+
return deleteOldestMessages;
85+
}
86+
87+
public void setDeleteOldestMessages(boolean deleteOldestMessages) {
88+
this.deleteOldestMessages = deleteOldestMessages;
89+
}
90+
}

0 commit comments

Comments
 (0)