Skip to content

Commit 191b155

Browse files
authored
Merge pull request #225 from jpwsutton/develop
Resolving main issues in Issue #224
2 parents 69e91eb + 26f7214 commit 191b155

3 files changed

Lines changed: 9 additions & 2 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java-templates/org/eclipse/paho/client/mqttv3/internal/ClientComms.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
150150
if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){
151151
//@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding message to buffer. message={0}
152152
log.fine(CLASS_NAME, methodName, "507", new Object[] {message.getKey()});
153-
this.clientState.persistBufferedMessage(message);
153+
if(disconnectedMessageBuffer.isPersistBuffer()){
154+
this.clientState.persistBufferedMessage(message);
155+
}
154156
disconnectedMessageBuffer.putMessage(message, token);
155157
} else {
156158
this.internalSend(message, token);

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPubRec;
4646
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPubRel;
4747
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
48+
import org.eclipse.paho.client.mqttv3.internal.wire.MqttSuback;
4849
import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage;
4950
import org.eclipse.paho.client.mqttv3.logging.Logger;
5051
import org.eclipse.paho.client.mqttv3.logging.LoggerFactory;
@@ -562,6 +563,7 @@ public void persistBufferedMessage(MqttWireMessage message) {
562563
// Because the client will have disconnected, we will want to re-open persistence
563564
try {
564565
message.setMessageId(getNextMessageId());
566+
key = getSendBufferedPersistenceKey(message);
565567
try {
566568
persistence.put(key, (MqttPublish) message);
567569
} catch (MqttPersistenceException mpe){
@@ -1000,7 +1002,6 @@ protected void notifyReceivedAck(MqttAck ack) throws MqttException {
10001002
queueLock.notifyAll();
10011003
}
10021004
} else {
1003-
// Sub ack or unsuback
10041005
notifyResult(ack, token, mex);
10051006
releaseMessageId(ack.getMessageId());
10061007
tokenStore.removeToken(ack);

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/DisconnectedMessageBuffer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,5 +122,9 @@ public void run() {
122122
public void setPublishCallback(IDisconnectedBufferCallback callback) {
123123
this.callback = callback;
124124
}
125+
126+
public boolean isPersistBuffer(){
127+
return bufferOpts.isPersistBuffer();
128+
}
125129

126130
}

0 commit comments

Comments
 (0)