Skip to content

Commit 5787d6e

Browse files
author
Ranjan Dasgupta
authored
Merge pull request #760 from rdasgupt/fix596rework
Fix conflicts in PR 596
2 parents d7a3bdb + 88b645e commit 5787d6e

4 files changed

Lines changed: 115 additions & 0 deletions

File tree

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/automaticReconnect/OfflineBufferingTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
3030
import org.eclipse.paho.client.mqttv3.MqttException;
3131
import org.eclipse.paho.client.mqttv3.MqttMessage;
32+
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
3233
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
3334
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
3435
import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
@@ -312,6 +313,75 @@ public void testDeleteOldestBufferedMessages() throws Exception {
312313
proxy.disableProxy();
313314
}
314315

316+
317+
/**
318+
* Tests that the buffer correctly handles persisted messages being buffered when the
319+
* buffer is full and deleteOldestBufferedMessage is set to true.
320+
*/
321+
@Test
322+
public void testDeleteOldestBufferedMessagesWithPersistance() throws Exception {
323+
String methodName = Utility.getMethodName();
324+
LoggingUtilities.banner(log, cclass, methodName);
325+
int maxMessages = 10;
326+
327+
// Tokens
328+
IMqttToken connectToken;
329+
330+
MqttClientPersistence persistence = new MemoryPersistence();
331+
332+
// Client Options
333+
MqttConnectOptions options = new MqttConnectOptions();
334+
options.setCleanSession(false);
335+
options.setAutomaticReconnect(false);
336+
MqttAsyncClient client = new MqttAsyncClient("tcp://localhost:" + proxy.getLocalPort(), methodName, persistence);
337+
DisconnectedBufferOptions disconnectedOpts = new DisconnectedBufferOptions();
338+
disconnectedOpts.setBufferEnabled(true);
339+
// Set buffer to 100 to save time
340+
disconnectedOpts.setBufferSize(maxMessages);
341+
disconnectedOpts.setPersistBuffer(true);
342+
disconnectedOpts.setDeleteOldestMessages(true);
343+
client.setBufferOpts(disconnectedOpts);
344+
345+
// Enable Proxy & Connect to server
346+
proxy.enableProxy();
347+
connectToken = client.connect(options);
348+
connectToken.waitForCompletion();
349+
boolean isConnected = client.isConnected();
350+
log.info("First Connection isConnected: " + isConnected);
351+
Assert.assertTrue(isConnected);
352+
353+
// Disable Proxy and cause disconnect
354+
proxy.disableProxy();
355+
isConnected = client.isConnected();
356+
log.info("Proxy Disconnect isConnected: " + isConnected);
357+
Assert.assertFalse(isConnected);
358+
359+
int x;
360+
361+
// Publish 10 messages
362+
for (x = 0; x < maxMessages; x++) {
363+
MqttMessage message = new MqttMessage(Integer.toString(x).getBytes());
364+
client.publish(topicPrefix + methodName, message);
365+
}
366+
367+
// Publish one message too many
368+
log.info("About to publish one message too many");
369+
client.publish(topicPrefix + methodName, new MqttMessage(Integer.toString(x).getBytes()));
370+
371+
Assert.assertFalse(persistence.containsKey("sb-1"));
372+
Assert.assertTrue(persistence.containsKey("sb-2"));
373+
Assert.assertTrue(persistence.containsKey("sb-11"));
374+
375+
// Make sure that the message now at index 0 in the buffer is '1'
376+
// instead of '0'
377+
MqttMessage messageAt0 = client.getBufferedMessage(0);
378+
String msg = new String(messageAt0.getPayload());
379+
Assert.assertEquals("1", msg);
380+
client.close();
381+
client = null;
382+
proxy.disableProxy();
383+
}
384+
315385
/**
316386
* Tests that A message cannot be buffered when the buffer is full and
317387
* deleteOldestBufferedMessage is set to false.

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,13 +874,26 @@ public void notifyConnect() {
874874
log.fine(CLASS_NAME, methodName, "509", null);
875875

876876
disconnectedMessageBuffer.setPublishCallback(new ReconnectDisconnectedBufferCallback(methodName));
877+
disconnectedMessageBuffer.setMessageDiscardedCallBack(new MessageDiscardedCallback());
877878
if (executorService == null) {
878879
new Thread(disconnectedMessageBuffer).start();
879880
} else {
880881
executorService.execute(disconnectedMessageBuffer);
881882
}
882883
}
883884
}
885+
886+
887+
class MessageDiscardedCallback implements IDiscardedBufferMessageCallback {
888+
889+
@Override
890+
public void messageDiscarded(MqttWireMessage message) {
891+
if(disconnectedMessageBuffer.isPersistBuffer()) {
892+
clientState.unPersistBufferedMessage(message);
893+
}
894+
}
895+
}
896+
884897

885898
class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback{
886899

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/DisconnectedMessageBuffer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class DisconnectedMessageBuffer implements Runnable {
3333
private ArrayList<BufferedMessage> buffer;
3434
private final Object bufLock = new Object(); // Used to synchronise the buffer
3535
private IDisconnectedBufferCallback callback;
36+
private IDiscardedBufferMessageCallback messageDiscardedCallBack;
3637

3738
public DisconnectedMessageBuffer(DisconnectedBufferOptions options) {
3839
this.bufferOpts = options;
@@ -63,6 +64,10 @@ public void putMessage(MqttWireMessage message, MqttToken token) throws MqttExce
6364
if (buffer.size() < bufferOpts.getBufferSize()) {
6465
buffer.add(bufferedMessage);
6566
} else if (bufferOpts.isDeleteOldestMessages() == true) {
67+
if(messageDiscardedCallBack != null){
68+
BufferedMessage discardedMessage = (BufferedMessage) buffer.get(0);
69+
messageDiscardedCallBack.messageDiscarded(discardedMessage.getMessage());
70+
}
6671
buffer.remove(0);
6772
buffer.add(bufferedMessage);
6873
} else {
@@ -146,4 +151,8 @@ public boolean isPersistBuffer() {
146151
return bufferOpts.isPersistBuffer();
147152
}
148153

154+
public void setMessageDiscardedCallBack(IDiscardedBufferMessageCallback callback) {
155+
this.messageDiscardedCallBack = callback;
156+
}
157+
149158
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*******************************************************************************
2+
*
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Eclipse Public License v2.0
5+
* and Eclipse Distribution License v1.0 which accompany this distribution.
6+
*
7+
* The Eclipse Public License is available at
8+
* https://www.eclipse.org/legal/epl-2.0
9+
* and the Eclipse Distribution License is available at
10+
* https://www.eclipse.org/org/documents/edl-v10.php
11+
*
12+
* Contributors:
13+
* alexm 26/10/18
14+
*/
15+
16+
package org.eclipse.paho.client.mqttv3.internal;
17+
18+
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
19+
20+
public interface IDiscardedBufferMessageCallback {
21+
void messageDiscarded(MqttWireMessage message);
22+
}
23+

0 commit comments

Comments
 (0)