Skip to content

Commit 7fb3938

Browse files
committed
Issue #548 - Further tweaks that should fix Dup PubCom on QoS2 as well as dup message delivery.
Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent 01967fb commit 7fb3938

10 files changed

Lines changed: 39 additions & 48 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/TimerPingSender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,19 @@ public class TimerPingSender implements MqttPingSender {
3535

3636
private ClientComms comms;
3737
private Timer timer;
38+
private String clientid;
3839

3940
public void init(ClientComms comms) {
4041
if (comms == null) {
4142
throw new IllegalArgumentException("ClientComms cannot be null.");
4243
}
4344
this.comms = comms;
45+
clientid = comms.getClient().getClientId();
46+
log.setResourceName(clientid);
4447
}
4548

4649
public void start() {
4750
final String methodName = "start";
48-
String clientid = comms.getClient().getClientId();
4951

5052
//@Trace 659=start timer for client:{0}
5153
log.fine(CLASS_NAME, methodName, "659", new Object[]{clientid});

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/TimerPingSender.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void init(ClientComms comms) {
5353
}
5454
this.comms = comms;
5555
clientid = comms.getClient().getClientId();
56+
log.setResourceName(clientid);
5657
}
5758

5859
public void start() {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,25 +1150,18 @@ protected void handleOrphanedAcks(MqttAck ack) throws MqttException {
11501150
*/
11511151
protected void handleInboundPubRel(MqttPubRel pubRel) throws MqttException {
11521152
final String methodName = "handleInboundPubRel";
1153-
MqttPublish sendMsg = (MqttPublish) inboundQoS2.get(Integer.valueOf(pubRel.getMessageId()));
11541153
if (pubRel.getReasonCodes()[0] > MqttReturnCode.RETURN_CODE_UNSPECIFIED_ERROR) {
11551154
// @TRACE 667=MqttPubRel was received with an error code: key={0} message={1},
11561155
// Reason Code={2}
11571156
log.severe(CLASS_NAME, methodName, "667",
11581157
new Object[] { pubRel.getMessageId(), pubRel.toString(), pubRel.getReasonCodes()[0] });
11591158
throw new MqttException(pubRel.getReasonCodes()[0]);
1160-
}
1161-
if (sendMsg != null) {
1162-
if (callback != null) {
1163-
callback.messageArrived(sendMsg);
1164-
}
11651159
} else {
1166-
// Original publish has already been delivered.
11671160
// Currently this client has no need of the properties, so this is left empty.
11681161
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, pubRel.getMessageId(),
11691162
new MqttProperties());
1170-
// TODO - Fix Trace
1171-
log.info(CLASS_NAME, methodName, "Creating MqttPubComp due to pubRel: " + pubComp.toString());
1163+
// @TRACE 668=Creating MqttPubComp: {0}
1164+
log.info(CLASS_NAME, methodName, "668", new Object[] { pubComp.toString()});
11721165
this.send(pubComp, null);
11731166
}
11741167
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ public void connectionLost(MqttException cause, MqttDisconnect message) {
302302
// @TRACE 722=Server initiated disconnect, connection closed. Disconnect={0}
303303
log.fine(CLASS_NAME, methodName, "722", new Object[] { message.toString() });
304304
MqttDisconnectResponse disconnectResponse = new MqttDisconnectResponse(message.getReturnCode(),
305-
message.getProperties().getReasonString(), (ArrayList<UserProperty>) message.getProperties().getUserProperties(),
305+
message.getProperties().getReasonString(),
306+
(ArrayList<UserProperty>) message.getProperties().getUserProperties(),
306307
message.getProperties().getServerReference());
307308
mqttCallback.disconnected(disconnectResponse);
308309
} else if (mqttCallback != null && cause != null) {
@@ -385,13 +386,15 @@ public void messageArrived(MqttPublish sendMessage) {
385386
}
386387
}
387388
}
388-
389+
389390
/**
390391
* This method is called when an Auth Message is received.
391-
* @param authMessage The {@link MqttAuth} message.
392+
*
393+
* @param authMessage
394+
* The {@link MqttAuth} message.
392395
*/
393396
public void authMessageReceived(MqttAuth authMessage) {
394-
if(mqttCallback != null) {
397+
if (mqttCallback != null) {
395398
mqttCallback.authPacketArrived(authMessage.getReturnCode(), authMessage.getProperties());
396399
}
397400
}
@@ -427,11 +430,11 @@ public void quiesce() {
427430
spaceAvailable.notifyAll();
428431
}
429432
}
430-
433+
431434
boolean areQueuesEmpty() {
432-
synchronized (workAvailable) {
433-
return completeQueue.isEmpty() && messageQueue.isEmpty();
434-
}
435+
synchronized (workAvailable) {
436+
return completeQueue.isEmpty() && messageQueue.isEmpty();
437+
}
435438
}
436439

437440
public boolean isQuiesced() {
@@ -441,39 +444,30 @@ public boolean isQuiesced() {
441444
private void handleMessage(MqttPublish publishMessage) throws Exception {
442445
final String methodName = "handleMessage";
443446
// If quisecing process any pending messages.
444-
445447
String destName = publishMessage.getTopicName();
446448

447449
// @TRACE 713=call messageArrived key={0} topic={1}
448450
log.fine(CLASS_NAME, methodName, "713", new Object[] { new Integer(publishMessage.getMessageId()), destName });
449451
deliverMessage(destName, publishMessage.getMessageId(), publishMessage.getMessage());
450452

451-
if (!this.manualAcks) {
452-
if (publishMessage.getMessage().getQos() == 1) {
453-
this.clientComms.internalSend(
454-
new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, publishMessage.getMessageId(), new MqttProperties()),
455-
new MqttToken(clientComms.getClient().getClientId()));
456-
}
457-
458-
/*else if (publishMessage.getMessage().getQos() == 2) {
459-
this.clientComms.deliveryComplete(publishMessage);
460-
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS,
461-
publishMessage.getMessageId(), new MqttProperties());
462-
log.info(CLASS_NAME, methodName, "Creating MqttPubComp due to handleMessage: " + pubComp.toString());
463-
464-
this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));
465-
}*/
453+
// If we are not in manual ACK mode:
454+
if (!this.manualAcks && publishMessage.getMessage().getQos() == 1) {
455+
this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS,
456+
publishMessage.getMessageId(), new MqttProperties()),
457+
new MqttToken(clientComms.getClient().getClientId()));
466458
}
467459
}
468460

469461
public void messageArrivedComplete(int messageId, int qos) throws MqttException {
470462
if (qos == 1) {
471-
this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, messageId, new MqttProperties()),
463+
this.clientComms.internalSend(
464+
new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, messageId, new MqttProperties()),
472465
new MqttToken(clientComms.getClient().getClientId()));
473466
} else if (qos == 2) {
474467
this.clientComms.deliveryComplete(messageId);
475468
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, messageId, new MqttProperties());
476-
log.info(CLASS_NAME, "messageArrivedComplete", "Creating MqttPubComp due to messageArrivedComplete: " + pubComp.toString());
469+
// @TRACE 723=Creating MqttPubComp due to manual ACK: {0}
470+
log.info(CLASS_NAME, "messageArrivedComplete", "723", new Object[] {pubComp.toString()});
477471

478472
this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));
479473
}

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class CommsReceiver implements Runnable {
5151
private Future<?> receiverFuture;
5252

5353
public CommsReceiver(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, InputStream in) {
54-
this.in = new MqttInputStream(clientState, in);
54+
this.in = new MqttInputStream(clientState, in, clientComms.getClient().getClientId());
5555
this.clientComms = clientComms;
5656
this.clientState = clientState;
5757
this.tokenStore = tokenStore;

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class CommsSender implements Runnable {
5151
private Future<?> senderFuture;
5252

5353
public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, OutputStream out) {
54-
this.out = new MqttOutputStream(clientState, out);
54+
this.out = new MqttOutputStream(clientState, out, clientComms.getClient().getClientId());
5555
this.clientComms = clientComms;
5656
this.clientState = clientState;
5757
this.tokenStore = tokenStore;

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/wire/MqttInputStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@ public class MqttInputStream extends InputStream {
4747
private int packetLen;
4848
private byte[] packet;
4949

50-
public MqttInputStream(MqttState clientState, InputStream in) {
50+
public MqttInputStream(MqttState clientState, InputStream in, String clientId) {
5151
this.clientState = clientState;
5252
this.in = new DataInputStream(in);
5353
this.bais = new ByteArrayOutputStream();
5454
this.remLen = -1;
55+
log.setResourceName(clientId);
5556
}
5657

5758
public int read() throws IOException {

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/wire/MqttOutputStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ public class MqttOutputStream extends OutputStream {
3737
private MqttState clientState = null;
3838
private BufferedOutputStream out;
3939

40-
public MqttOutputStream(MqttState clientState, OutputStream out) {
40+
public MqttOutputStream(MqttState clientState, OutputStream out, String clientId) {
4141
this.clientState = clientState;
4242
this.out = new BufferedOutputStream(out);
43+
log.setResourceName(clientId);
4344
}
4445

4546
public void close() throws IOException {

org.eclipse.paho.mqttv5.client/src/main/resources/org/eclipse/paho/mqttv5/client/internal/nls/logcat.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@
145145
665=Clearing Connection State (Topic Aliases)
146146
666=Orphaned Ack key={0} message={1}
147147
667=MqttPubRel was received with an error code: key={0} message={1}, Reason Code= {2}
148+
668=Creating MqttPubComp: {0}
148149
700=stopping
149150
701=notify workAvailable and wait for run
150151
703=stopped
@@ -164,6 +165,7 @@
164165
720=exception from connectionLost {0}
165166
721=Non-Critical MQTT error thrown, passing back to application={0}
166167
722=Server initiated disconnect, connection closed. Disconnect={0}
168+
723=Creating MqttPubComp due to manual ACK: {0}
167169
800=stopping sender
168170
801=stopped
169171
802=network send key={0} msg={1}

org.eclipse.paho.mqttv5.client/src/test/java/org/eclipse/paho/mqttv5/client/test/BasicTest.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
99
import org.eclipse.paho.mqttv5.client.IMqttToken;
1010
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
11+
import org.eclipse.paho.mqttv5.client.MqttCallback;
12+
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
1113
import org.eclipse.paho.mqttv5.client.test.logging.LoggingUtilities;
1214
import org.eclipse.paho.mqttv5.client.test.properties.TestProperties;
1315
import org.eclipse.paho.mqttv5.client.test.utilities.MqttV5Receiver;
@@ -16,6 +18,7 @@
1618
import org.eclipse.paho.mqttv5.common.MqttException;
1719
import org.eclipse.paho.mqttv5.common.MqttMessage;
1820
import org.eclipse.paho.mqttv5.common.MqttSubscription;
21+
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
1922
import org.junit.Assert;
2023
import org.junit.BeforeClass;
2124
import org.junit.Test;
@@ -144,20 +147,14 @@ public void testQoS2DuplicateIssue() throws MqttException, InterruptedException
144147
String samplePayload = "Hello World";
145148

146149
log.info("Publishing messages on topics.");
147-
//publishMessage(samplePayload, 0, "a/b/c", asyncClient, timeout);
148-
//publishMessage(samplePayload, 0, "a/0/c", asyncClient, timeout);
149-
//publishMessage(samplePayload, 1, "a/1/c", asyncClient, timeout);
150150
publishMessage(samplePayload, 2, "a/2/c", asyncClient, timeout);
151151

152152

153153
log.info("Waiting for delivery and validating message.");
154-
//Assert.assertTrue(mqttV5Receiver.validateReceipt("a/b/c", 0, samplePayload.getBytes()));
155-
//Assert.assertTrue(mqttV5Receiver.validateReceipt("a/0/c", 0, samplePayload.getBytes()));
156-
//Assert.assertTrue(mqttV5Receiver.validateReceipt("a/1/c", 1, samplePayload.getBytes()));
157154
Assert.assertTrue(mqttV5Receiver.validateReceipt("a/2/c", 2, samplePayload.getBytes()));
155+
log.info("Number of received message: " + mqttV5Receiver.receivedMessageCount());
156+
158157

159-
Thread.sleep(10000);
160-
161158

162159
TestClientUtilities.disconnectAndCloseClient(asyncClient, timeout);
163160
}

0 commit comments

Comments
 (0)