Skip to content

Commit a083bd5

Browse files
committed
Issue #582 - Tweaking Offline Buffer test to work, fixing trace in multiple locations
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 3906943 commit a083bd5

10 files changed

Lines changed: 88 additions & 71 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/automaticReconnect/OfflineBufferingTest.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static void setUpBeforeClass() throws Exception {
6363
LoggingUtilities.banner(log, cclass, methodName);
6464
serverURI = TestProperties.getServerURI();
6565
serverURIString = "tcp://" + serverURI.getHost() + ":" + serverURI.getPort();
66-
topicPrefix = "OfflineBufferingTest-" + UUID.randomUUID().toString() + "-";
66+
topicPrefix = "OfflineBufferingTest-" + UUID.randomUUID().toString() + "-";
6767

6868
// Use 0 for the first time.
6969
proxy = new ConnectionManipulationProxyServer(serverURI.getHost(), serverURI.getPort(), 2883);
@@ -78,16 +78,15 @@ public static void setUpBeforeClass() throws Exception {
7878
}
7979

8080
}
81-
81+
8282
@After
8383
public void clearUpAfterTest() {
8484
proxy.disableProxy();
8585
}
8686

8787
/**
88-
* Tests that A message can be buffered whilst the client is in a
89-
* disconnected state and is then delivered once the client has reconnected
90-
* automatically.
88+
* Tests that A message can be buffered whilst the client is in a disconnected
89+
* state and is then delivered once the client has reconnected automatically.
9190
*/
9291
@Test
9392
public void testSingleMessageBufferAndDeliver() throws Exception {
@@ -157,8 +156,8 @@ public void testSingleMessageBufferAndDeliver() throws Exception {
157156

158157
/**
159158
* Tests that multiple messages can be buffered whilst the client is in a
160-
* disconnected state and that they are all then delivered once the client
161-
* has connected automatically.
159+
* disconnected state and that they are all then delivered once the client has
160+
* connected automatically.
162161
*/
163162
@Test
164163
public void testManyMessageBufferAndDeliver() throws Exception {
@@ -172,13 +171,18 @@ public void testManyMessageBufferAndDeliver() throws Exception {
172171
MqttConnectOptions options = new MqttConnectOptions();
173172
options.setCleanSession(true);
174173
options.setAutomaticReconnect(true);
174+
175+
// Workaround for Issue #582 - Remove once fixed.
176+
options.setMaxInflight(100);
177+
175178
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:" + proxy.getLocalPort(), methodName, DATA_STORE);
176179
DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
177180
disconnectedOpts.setBufferEnabled(true);
178181
client.setBufferOpts(disconnectedOpts);
179182

180183
// Create subscription client that won't be affected by proxy
181-
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, methodName + "sub-client");
184+
MemoryPersistence persistence = new MemoryPersistence();
185+
MqttAsyncClient subClient = new MqttAsyncClient(serverURIString, methodName + "sub-client", persistence);
182186
MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(subClient, LoggingUtilities.getPrintStream());
183187
subClient.setCallback(mqttV3Receiver);
184188
IMqttToken subConnectToken = subClient.connect();
@@ -226,12 +230,13 @@ public void testManyMessageBufferAndDeliver() throws Exception {
226230
isConnected = client.isConnected();
227231
log.info("Proxy Re-Enabled isConnected: " + isConnected);
228232
Assert.assertTrue(isConnected);
229-
233+
230234
Thread.sleep(5000);
231235

232236
// Check that all messages have been delivered
233237
for (int x = 0; x < 100; x++) {
234-
boolean recieved = mqttV3Receiver.validateReceipt(topicPrefix + methodName, 1, Integer.toString(x).getBytes());
238+
boolean recieved = mqttV3Receiver.validateReceipt(topicPrefix + methodName, 1,
239+
Integer.toString(x).getBytes());
235240
Assert.assertTrue(recieved);
236241
}
237242
log.info("All messages sent and Recieved correctly.");
@@ -447,7 +452,7 @@ public void testUnPersistBufferedMessagesOnNewClient() throws Exception {
447452
List<String> persistedKeys = Collections.list(persistence.keys());
448453
log.info("There are now: " + persistedKeys.size() + " keys in persistence");
449454
Assert.assertEquals(1, persistedKeys.size());
450-
455+
451456
// Create Subscription client to watch for the message being published
452457
// as soon as the main client connects
453458
log.info("Creating subscription client");

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2009, 2016 IBM Corp.
2+
* Copyright (c) 2009, 2018 IBM Corp.
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Public License v1.0
@@ -56,8 +56,8 @@
5656
public class ClientComms {
5757
public static String VERSION = "${project.version}";
5858
public static String BUILD_LEVEL = "L${build.level}";
59-
private static final String CLASS_NAME = ClientComms.class.getName();
60-
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);
59+
private final String CLASS_NAME = ClientComms.class.getName();
60+
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);
6161

6262
private static final byte CONNECTED = 0;
6363
private static final byte CONNECTING = 1;
@@ -854,7 +854,7 @@ public void notifyConnect() {
854854
final String methodName = "notifyConnect";
855855
if(disconnectedMessageBuffer != null){
856856
//@TRACE 509=Client Connected, Offline Buffer Available. Sending Buffered Messages.
857-
log.fine(CLASS_NAME, methodName, "509");
857+
log.fine(CLASS_NAME, methodName, "509", null);
858858

859859
disconnectedMessageBuffer.setPublishCallback(new ReconnectDisconnectedBufferCallback(methodName));
860860
executorService.execute(disconnectedMessageBuffer);
@@ -871,14 +871,16 @@ class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback
871871

872872
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
873873
if (isConnected()) {
874-
while(clientState.getActualInFlight() >= (clientState.getMaxInFlight()-1)){
874+
// First pass at making sure that we don't flood the in-flight messages
875+
while(clientState.getActualInFlight() >= (clientState.getMaxInFlight()-3)){
875876
// We need to Yield to the other threads to allow the in flight messages to clear
876877
Thread.yield();
877878

878879
}
879-
//@TRACE 510=Publising Buffered message message={0}
880+
//@TRACE 510=Publishing Buffered message message={0}
880881
log.fine(CLASS_NAME, methodName, "510", new Object[] {bufferedMessage.getMessage().getKey()});
881882
internalSend(bufferedMessage.getMessage(), bufferedMessage.getToken());
883+
882884
// Delete from persistence if in there
883885
clientState.unPersistBufferedMessage(bufferedMessage.getMessage());
884886
} else {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2009, 2015 IBM Corp.
2+
* Copyright (c) 2009, 2018 IBM Corp.
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Public License v1.0
@@ -58,9 +58,7 @@
5858
*
5959
* @see IMqttClient
6060
*/
61-
public class MqttClient implements IMqttClient { //), DestinationProvider {
62-
//private static final String CLASS_NAME = MqttClient.class.getName();
63-
//private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);
61+
public class MqttClient implements IMqttClient {
6462

6563
protected MqttAsyncClient aClient = null; // Delegate implementation to MqttAsyncClient
6664
protected long timeToWait = -1; // How long each method should wait for action to complete

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/ScheduledExecutorPingSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
*/
3434
public class ScheduledExecutorPingSender implements MqttPingSender {
3535
private static final String CLASS_NAME = ScheduledExecutorPingSender.class.getName();
36-
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
36+
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3737

3838
private ClientComms comms;
3939
private ScheduledExecutorService executorService;

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -720,7 +720,7 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
720720
// Below might not be necessary since move to nanoTime (Issue #278)
721721
//Reduce schedule frequency since System.currentTimeMillis is no accurate, add a buffer
722722
//It is 1/10 in minimum keepalive unit.
723-
int delta = 1000;
723+
int delta = 100000;
724724

725725
// ref bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=446663
726726
synchronized (pingOutstandingLock) {
@@ -774,14 +774,15 @@ public MqttToken checkForActivity(IMqttActionListener pingCallback) throws MqttE
774774
tokenStore.saveToken(token, pingCommand);
775775
pendingFlows.insertElementAt(pingCommand, 0);
776776

777-
nextPingTime = getKeepAlive();
777+
nextPingTime = this.keepAlive;
778778

779779
//Wake sender thread since it may be in wait state (in ClientState.get())
780780
notifyQueueLock();
781781
}
782782
else {
783+
//@TRACE 634=ping not needed yet. Schedule next ping.
783784
log.fine(CLASS_NAME, methodName, "634", null);
784-
nextPingTime = Math.max(1, getKeepAlive() - (time - lastOutboundActivity));
785+
nextPingTime = Math.max(1, this.keepAlive - (time - lastOutboundActivity));
785786
}
786787
}
787788
//@TRACE 624=Schedule next ping at {0}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*/
4848
public class CommsCallback implements Runnable {
4949
private static final String CLASS_NAME = CommsCallback.class.getName();
50-
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
50+
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
5151

5252
private static final int INBOUND_QUEUE_SIZE = 10;
5353
private MqttCallback mqttCallback;

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/DisconnectedMessageBuffer.java

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,102 +26,112 @@
2626
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
2727

2828
public class DisconnectedMessageBuffer implements Runnable {
29-
30-
private static final String CLASS_NAME = "DisconnectedMessageBuffer";
29+
30+
private final String CLASS_NAME = "DisconnectedMessageBuffer";
3131
private Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3232
private DisconnectedBufferOptions bufferOpts;
3333
private ArrayList<BufferedMessage> buffer;
34-
private Object bufLock = new Object(); // Used to synchronise the buffer
34+
private Object bufLock = new Object(); // Used to synchronise the buffer
3535
private IDisconnectedBufferCallback callback;
36-
37-
public DisconnectedMessageBuffer(DisconnectedBufferOptions options){
36+
37+
public DisconnectedMessageBuffer(DisconnectedBufferOptions options) {
3838
this.bufferOpts = options;
3939
buffer = new ArrayList<BufferedMessage>();
4040
}
41-
41+
4242
/**
43-
* This will add a new message to the offline buffer,
44-
* if the buffer is full and deleteOldestMessages is enabled
45-
* then the 0th item in the buffer will be deleted and the
46-
* new message will be added. If it is not enabled then an
43+
* This will add a new message to the offline buffer, if the buffer is full and
44+
* deleteOldestMessages is enabled then the 0th item in the buffer will be
45+
* deleted and the new message will be added. If it is not enabled then an
4746
* MqttException will be thrown.
48-
* @param message the {@link MqttWireMessage} that will be buffered
49-
* @param token the associated {@link MqttToken}
50-
* @throws MqttException if the Buffer is full
47+
*
48+
* @param message
49+
* the {@link MqttWireMessage} that will be buffered
50+
* @param token
51+
* the associated {@link MqttToken}
52+
* @throws MqttException
53+
* if the Buffer is full
5154
*/
52-
public void putMessage(MqttWireMessage message, MqttToken token) throws MqttException{
55+
public void putMessage(MqttWireMessage message, MqttToken token) throws MqttException {
5356
BufferedMessage bufferedMessage = new BufferedMessage(message, token);
5457
synchronized (bufLock) {
55-
if(buffer.size() < bufferOpts.getBufferSize()){
58+
if (buffer.size() < bufferOpts.getBufferSize()) {
5659
buffer.add(bufferedMessage);
57-
} else if(bufferOpts.isDeleteOldestMessages() == true){
60+
} else if (bufferOpts.isDeleteOldestMessages() == true) {
5861
buffer.remove(0);
5962
buffer.add(bufferedMessage);
60-
}else {
63+
} else {
6164
throw new MqttException(MqttException.REASON_CODE_DISCONNECTED_BUFFER_FULL);
6265
}
6366
}
6467
}
65-
68+
6669
/**
6770
* Retrieves a message from the buffer at the given index.
68-
* @param messageIndex the index of the message to be retrieved in the buffer
71+
*
72+
* @param messageIndex
73+
* the index of the message to be retrieved in the buffer
6974
* @return the {@link BufferedMessage}
7075
*/
71-
public BufferedMessage getMessage(int messageIndex){
76+
public BufferedMessage getMessage(int messageIndex) {
7277
synchronized (bufLock) {
73-
return((BufferedMessage) buffer.get(messageIndex));
78+
return ((BufferedMessage) buffer.get(messageIndex));
7479
}
7580
}
76-
77-
81+
7882
/**
7983
* Removes a message from the buffer
80-
* @param messageIndex the index of the message to be deleted in the buffer
84+
*
85+
* @param messageIndex
86+
* the index of the message to be deleted in the buffer
8187
*/
82-
public void deleteMessage(int messageIndex){
88+
public void deleteMessage(int messageIndex) {
8389
synchronized (bufLock) {
8490
buffer.remove(messageIndex);
8591
}
8692
}
87-
93+
8894
/**
8995
* Returns the number of messages currently in the buffer
96+
*
9097
* @return The count of messages in the buffer
9198
*/
9299
public int getMessageCount() {
93100
synchronized (bufLock) {
94101
return buffer.size();
95102
}
96103
}
97-
104+
98105
/**
99106
* Flushes the buffer of messages into an open connection
100107
*/
101108
public void run() {
102109
final String methodName = "run";
103110
// @TRACE 516=Restoring all buffered messages.
104111
log.fine(CLASS_NAME, methodName, "516");
105-
while(getMessageCount() > 0){
106-
try {
112+
while (getMessageCount() > 0) {
113+
try {
107114
BufferedMessage bufferedMessage = getMessage(0);
108115
callback.publishBufferedMessage(bufferedMessage);
109116
// Publish was successful, remove message from buffer.
110117
deleteMessage(0);
111-
} catch (MqttException ex) {
112-
// Error occurred attempting to publish buffered message likely because the client is not connected
113-
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}.
114-
log.warning(CLASS_NAME, methodName, "519", new Object[]{ex.getMessage()});
115-
break;
116-
}
118+
} catch (MqttException ex) {
119+
// Error occurred attempting to publish buffered message likely because the
120+
// client is not connected
121+
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}:{1}.
122+
log.severe(CLASS_NAME, methodName, "519", new Object[] { ex.getReasonCode(), ex.getMessage() });
123+
break;
124+
117125
}
126+
127+
}
118128
}
119129

120130
public void setPublishCallback(IDisconnectedBufferCallback callback) {
121131
this.callback = callback;
122132
}
123-
124-
public boolean isPersistBuffer(){
133+
134+
public boolean isPersistBuffer() {
125135
return bufferOpts.isPersistBuffer();
126136
}
127137

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttInputStream.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
* <code>MqttWireMessage</code>.
3535
*/
3636
public class MqttInputStream extends InputStream {
37-
private static final String CLASS_NAME = MqttInputStream.class.getName();
38-
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
37+
private final String CLASS_NAME = MqttInputStream.class.getName();
38+
private final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
3939

4040
private ClientState clientState = null;
4141
private DataInputStream in;
@@ -117,8 +117,8 @@ public MqttWireMessage readMqttWireMessage() throws IOException, MqttException {
117117
byte[] header = bais.toByteArray();
118118
System.arraycopy(header,0,packet,0, header.length);
119119
message = MqttWireMessage.createWireMessage(packet);
120-
// @TRACE 501= received {0}
121-
log.fine(CLASS_NAME, methodName, "501",new Object[] {message});
120+
// @TRACE 301= received {0}
121+
log.fine(CLASS_NAME, methodName, "301",new Object[] {message});
122122
}
123123
} catch (SocketTimeoutException e) {
124124
// ignore socket read timeout

0 commit comments

Comments
 (0)