Skip to content

Commit c21f93f

Browse files
author
bzhangxmt
committed
Fix Bug 443142 - Deadlocked 'MQTT Rec': When 'MQTT Call:' Dies
The root cause is the CommsCallback.messageArrived, the while condition will never be false when the messageQueue is full and not quiescing, even when the callback thread is trying to stop. Fixed by checking the running flag in CommsCallback.messageArrived. Bug: 443142 Signed-off-by: bzhangxmt <zhbinbj@cn.ibm.com>
1 parent dd99bf2 commit c21f93f

2 files changed

Lines changed: 117 additions & 12 deletions

File tree

  • org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test
  • org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package org.eclipse.paho.client.mqttv3.test;
2+
3+
import java.net.URI;
4+
import java.util.concurrent.CountDownLatch;
5+
import java.util.logging.Level;
6+
import java.util.logging.Logger;
7+
8+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
9+
import org.eclipse.paho.client.mqttv3.MqttCallback;
10+
import org.eclipse.paho.client.mqttv3.MqttClient;
11+
import org.eclipse.paho.client.mqttv3.MqttMessage;
12+
import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
13+
import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
14+
import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
15+
import org.junit.Assert;
16+
import org.junit.BeforeClass;
17+
import org.junit.Test;
18+
19+
/**
20+
* <a href="https://bugs.eclipse.org/bugs/show_bug.cgi?id=443142"> Bug 443142 </a>: Deadlocked "MQTT Rec:" When
21+
* "MQTT Call:" Dies.
22+
* <p>
23+
*
24+
* This bug is caused by the {@link org.eclipse.paho.client.mqttv3.internal.CommsCallback#messageArrived}, the while
25+
* condition will never be false when the {@code messageQueue} is full and not {@code quiescing}, even when the callback
26+
* thread is trying to stop.
27+
*/
28+
public class Bug443142Test {
29+
private static final Logger log = Logger.getLogger(Bug443142Test.class.getName());
30+
private static URI serverURI;
31+
32+
@BeforeClass
33+
public static void setUpBeforeClass() throws Exception {
34+
try {
35+
String methodName = Utility.getMethodName();
36+
LoggingUtilities.banner(log, Bug443142Test.class, methodName);
37+
serverURI = TestProperties.getServerURI();
38+
}
39+
catch (Exception exception) {
40+
log.log(Level.SEVERE, "caught exception:", exception);
41+
throw exception;
42+
}
43+
}
44+
45+
@Test
46+
public void testBug443142() throws Exception {
47+
CountDownLatch stopLatch = new CountDownLatch(1);
48+
MqttClient client1 = new MqttClient(serverURI.toString(), "foo");
49+
client1.connect();
50+
MqttClient client2 = new MqttClient(serverURI.toString(), "bar");
51+
client2.setCallback(new MyMqttCallback(stopLatch));
52+
client2.connect();
53+
client2.subscribe("bar");
54+
55+
// publish messages until the queue is full > 10
56+
for (int i = 0; i < 16; i++) {
57+
MqttMessage message = new MqttMessage(("foo-" + i).getBytes());
58+
client1.publish("bar", message);
59+
log.info("client1 publish: " + message);
60+
}
61+
62+
// wait until the exception is thrown
63+
stopLatch.await();
64+
65+
// wait some time let client2 to shutdown because of the exception thrown from the callback
66+
Thread.sleep(5000);
67+
68+
// client2 should be disconnected
69+
Assert.assertTrue("client1 should connected", client1.isConnected());
70+
Assert.assertFalse("client2 should disconnected", client2.isConnected());
71+
72+
// close client1
73+
client1.disconnect();
74+
client1.close();
75+
Assert.assertFalse("client1 should disconnected", client1.isConnected());
76+
}
77+
78+
private static class MyMqttCallback implements MqttCallback {
79+
private final CountDownLatch stopLatch;
80+
81+
public MyMqttCallback(CountDownLatch stopLatch) {
82+
this.stopLatch = stopLatch;
83+
}
84+
85+
@Override
86+
public void connectionLost(Throwable cause) {
87+
}
88+
89+
@Override
90+
public void messageArrived(String topic, MqttMessage message) throws Exception {
91+
System.out.println(new String(message.getPayload()));
92+
Thread.sleep(5000);
93+
stopLatch.countDown();
94+
throw new RuntimeException("deadlock");
95+
96+
}
97+
98+
@Override
99+
public void deliveryComplete(IMqttDeliveryToken token) {
100+
}
101+
}
102+
103+
}

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/CommsCallback.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void run() {
122122
// If no work is currently available, then wait until there is some...
123123
try {
124124
synchronized (workAvailable) {
125-
if (running & messageQueue.isEmpty()
125+
if (running && messageQueue.isEmpty()
126126
&& completeQueue.isEmpty()) {
127127
// @TRACE 704=wait for workAvailable
128128
log.fine(CLASS_NAME, methodName, "704");
@@ -166,22 +166,24 @@ public void run() {
166166
if (quiescing) {
167167
clientState.checkQuiesceLock();
168168
}
169-
170-
synchronized (spaceAvailable) {
171-
// Notify the spaceAvailable lock, to say that there's now
172-
// some space on the queue...
173-
174-
// @TRACE 706=notify spaceAvailable
175-
log.fine(CLASS_NAME, methodName, "706");
176-
spaceAvailable.notifyAll();
177-
}
169+
178170
} catch (Throwable ex) {
179171
// Users code could throw an Error or Exception e.g. in the case
180172
// of class NoClassDefFoundError
181173
// @TRACE 714=callback threw exception
182174
log.fine(CLASS_NAME, methodName, "714", null, ex);
183175
running = false;
184176
clientComms.shutdownConnection(null, new MqttException(ex));
177+
178+
} finally {
179+
synchronized (spaceAvailable) {
180+
// Notify the spaceAvailable lock, to say that there's now
181+
// some space on the queue...
182+
183+
// @TRACE 706=notify spaceAvailable
184+
log.fine(CLASS_NAME, methodName, "706");
185+
spaceAvailable.notifyAll();
186+
}
185187
}
186188
}
187189
}
@@ -295,11 +297,11 @@ public void messageArrived(MqttPublish sendMessage) {
295297
// the client protect itself from getting flooded by messages
296298
// from the server.
297299
synchronized (spaceAvailable) {
298-
while (!quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
300+
while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
299301
try {
300302
// @TRACE 709=wait for spaceAvailable
301303
log.fine(CLASS_NAME, methodName, "709");
302-
spaceAvailable.wait();
304+
spaceAvailable.wait(200);
303305
} catch (InterruptedException ex) {
304306
}
305307
}

0 commit comments

Comments
 (0)