Skip to content

Commit 2716d29

Browse files
author
Ian Craggs
committed
Fix for max inflight overrun in V5 buffered reconnect #582
1 parent 5aedf07 commit 2716d29

3 files changed

Lines changed: 9 additions & 13 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientComms.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ void internalSend(MqttWireMessage message, MqttToken token) throws MqttException
173173
// Persist if needed and send the message
174174
this.clientState.send(message, token);
175175
} catch (MqttException e) {
176+
token.internalTok.setClient(null); // undo client setting on error
176177
if (message instanceof MqttPublish) {
177178
this.clientState.undo((MqttPublish) message);
178179
}
@@ -946,12 +947,6 @@ class ReconnectDisconnectedBufferCallback implements IDisconnectedBufferCallback
946947

947948
public void publishBufferedMessage(BufferedMessage bufferedMessage) throws MqttException {
948949
if (isConnected()) {
949-
//int qos = ((MqttPublish) bufferedMessage.getMessage()).getQos();
950-
while (clientState.getActualInFlight() >= (mqttConnection.getReceiveMaximum() - 4)) {
951-
// We need to Yield to the other threads to allow the in flight messages to
952-
// clear
953-
Thread.yield();
954-
}
955950
// @TRACE 510=Publising Buffered message message={0}
956951
log.fine(CLASS_NAME, methodName, "510", new Object[] { bufferedMessage.getMessage().getKey() });
957952
internalSend(bufferedMessage.getMessage(), bufferedMessage.getToken());

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,8 @@ public void send(MqttWireMessage message, MqttToken token) throws MqttException
512512
message.setMessageId(getNextMessageId());
513513
}
514514
// Set Topic Alias if required
515-
if (message instanceof MqttPublish && this.mqttConnection.getOutgoingTopicAliasMaximum() > 0) {
515+
if (message instanceof MqttPublish && ((MqttPublish) message).getTopicName() != null
516+
&& this.mqttConnection.getOutgoingTopicAliasMaximum() > 0) {
516517
String topic = ((MqttPublish) message).getTopicName();
517518
if (outgoingTopicAliases.containsKey(topic)) {
518519
// Existing Topic Alias, Assign it and remove the topic string

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/DisconnectedMessageBuffer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,15 @@ public void run() {
105105
log.fine(CLASS_NAME, methodName, "516");
106106
while(getMessageCount() > 0){
107107
try {
108-
BufferedMessage bufferedMessage = getMessage(0);
109-
callback.publishBufferedMessage(bufferedMessage);
110-
// Publish was successful, remove message from buffer.
111-
deleteMessage(0);
108+
BufferedMessage bufferedMessage = getMessage(0);
109+
callback.publishBufferedMessage(bufferedMessage);
110+
// Publish was successful, remove message from buffer.
111+
deleteMessage(0);
112112
} catch (MqttException ex) {
113113
// Error occurred attempting to publish buffered message likely because the client is not connected
114114
// @TRACE 519=Error occurred attempting to publish buffered message due to disconnect. Exception: {0}.
115-
log.warning(CLASS_NAME, methodName, "519", new Object[]{ex.getMessage()});
116-
break;
115+
//log.warning(CLASS_NAME, methodName, "519", new Object[]{ex.getMessage()});
116+
Thread.yield();
117117
}
118118
}
119119
}

0 commit comments

Comments
 (0)