Skip to content

Commit 2bd8a39

Browse files
committed
Fix for bug 446663. Please see bugs comments for the description.
Signed-off-by: Marco Carrer <marco.carrer@eurotech.com>
1 parent fe628bf commit 2bd8a39

19 files changed

Lines changed: 559 additions & 73 deletions

File tree

org.eclipse.paho.android.service/org.eclipse.paho.android.service/.classpath

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@
2020
<attribute name="maven.pomderived" value="true"/>
2121
</attributes>
2222
</classpathentry>
23-
<classpathentry kind="output" path="bin/classes"/>
23+
<classpathentry kind="output" path="target/classes"/>
2424
</classpath>

org.eclipse.paho.android.service/org.eclipse.paho.android.service/.project

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
</arguments>
2222
</buildCommand>
2323
<buildCommand>
24-
<name>org.eclipse.m2e.core.maven2Builder</name>
24+
<name>com.android.ide.eclipse.adt.ApkBuilder</name>
2525
<arguments>
2626
</arguments>
2727
</buildCommand>
2828
<buildCommand>
29-
<name>com.android.ide.eclipse.adt.ApkBuilder</name>
29+
<name>org.eclipse.m2e.core.maven2Builder</name>
3030
<arguments>
3131
</arguments>
3232
</buildCommand>

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ public void testVeryLargeMessageWithShortKeepAlive() {
550550

551551
String topic = "testLargeMsg/Topic";
552552
//10MB
553-
int largeSize = 60 * (1 << 20);
553+
int largeSize = 20 * (1 << 20);
554554
byte[] message = new byte[largeSize];
555555

556556
java.util.Arrays.fill(message, (byte) 's');
@@ -562,6 +562,7 @@ public void testVeryLargeMessageWithShortKeepAlive() {
562562
IMqttToken pubToken = mqttClient.publish(topic, message, 0, false, null, null);
563563
log.info("Publishing to..." + topic);
564564
pubToken.waitForCompletion();
565+
log.info("Published");
565566

566567
boolean ok = mqttV3Receiver.validateReceipt(topic, 0, message);
567568
if (!ok) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
package org.eclipse.paho.client.mqttv3.test.connectionLoss;
2+
3+
import java.util.Date;
4+
import java.util.Timer;
5+
import java.util.TimerTask;
6+
import java.util.logging.Logger;
7+
8+
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
9+
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
10+
import org.eclipse.paho.client.mqttv3.MqttCallback;
11+
import org.eclipse.paho.client.mqttv3.MqttClient;
12+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
13+
import org.eclipse.paho.client.mqttv3.MqttException;
14+
import org.eclipse.paho.client.mqttv3.MqttMessage;
15+
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
16+
import org.junit.Assert;
17+
import org.junit.Test;
18+
19+
public class ConnectionLossTest implements MqttCallback
20+
{
21+
static final Class<?> cclass = ConnectionLossTest.class;
22+
private static final String className = cclass.getName();
23+
private static final Logger log = Logger.getLogger(className);
24+
25+
private static final MqttDefaultFilePersistence DATA_STORE = new MqttDefaultFilePersistence("/tmp");
26+
27+
private String username = "username";
28+
private char[] password = "password".toCharArray();
29+
private String clientId = "device-client-id";
30+
private String message = "12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890";
31+
32+
@Test
33+
public void testConnectionLossWhilePublishingQos0()
34+
throws Exception
35+
{
36+
final int keepAlive = 15;
37+
38+
MqttConnectOptions options = new MqttConnectOptions();
39+
options.setCleanSession(true);
40+
options.setUserName(username);
41+
options.setPassword(password);
42+
options.setKeepAliveInterval(keepAlive);
43+
44+
MqttClient client = new MqttClient("tcp://iot.eclipse.org:1883", clientId, DATA_STORE);
45+
client.setCallback(this);
46+
client.connect(options);
47+
48+
log.info((new Date())+" - Connected.");
49+
for (int i=0; i<10; i++) {
50+
log.info("Disconnect your network in "+(10-i)+" sec...");
51+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 0, false);
52+
Thread.sleep(1000);
53+
}
54+
55+
final int[] res = new int[1];
56+
new Timer().schedule( new TimerTask() {
57+
@Override
58+
public void run() {
59+
res[0]++;
60+
if (res[0] == keepAlive + 1) {
61+
log.info((new Date())+" - Connection should be lost...");
62+
}
63+
}
64+
}, 0, 1000);
65+
66+
while (client.isConnected() && res[0] < 2*keepAlive) {
67+
try {
68+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 0, false);
69+
Thread.sleep(1000);
70+
}
71+
catch (MqttException e) {
72+
// ignore
73+
}
74+
}
75+
76+
Assert.assertFalse("Disconected", client.isConnected());
77+
if (client.isConnected()) client.disconnect(0);
78+
client.close();
79+
}
80+
81+
82+
@Test
83+
public void testConnectionLossWhilePublishingQos1()
84+
throws Exception
85+
{
86+
final int keepAlive = 15;
87+
88+
MqttConnectOptions options = new MqttConnectOptions();
89+
options.setCleanSession(true);
90+
options.setUserName(username);
91+
options.setPassword(password);
92+
options.setKeepAliveInterval(keepAlive);
93+
94+
MqttClient client = new MqttClient("tcp://iot.eclipse.org:1883", clientId, DATA_STORE);
95+
client.setCallback(this);
96+
client.connect(options);
97+
98+
log.info((new Date())+" - Connected.");
99+
for (int i=0; i<10; i++) {
100+
log.info("Disconnect your network in "+(10-i)+" sec...");
101+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 1, false);
102+
Thread.sleep(1000);
103+
}
104+
105+
final int[] res = new int[1];
106+
new Timer().schedule( new TimerTask() {
107+
@Override
108+
public void run() {
109+
res[0]++;
110+
if (res[0] == keepAlive + 1) {
111+
log.info((new Date())+" - Connection should be lost...");
112+
}
113+
}
114+
}, 0, 1000);
115+
116+
while (client.isConnected() && res[0] < 2*keepAlive) {
117+
try {
118+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 1, false);
119+
Thread.sleep(1000);
120+
}
121+
catch (MqttException e) {
122+
// ignore
123+
}
124+
}
125+
log.info("Finished publishing...");
126+
127+
Assert.assertFalse("Disconected", client.isConnected());
128+
if (client.isConnected()) client.disconnect(0);
129+
client.close();
130+
}
131+
132+
133+
@Test
134+
public void testConnectionLossWhilePublishingQos2()
135+
throws Exception
136+
{
137+
final int keepAlive = 15;
138+
139+
MqttConnectOptions options = new MqttConnectOptions();
140+
options.setCleanSession(true);
141+
options.setUserName(username);
142+
options.setPassword(password);
143+
options.setKeepAliveInterval(keepAlive);
144+
145+
MqttClient client = new MqttClient("tcp://iot.eclipse.org:1883", clientId, DATA_STORE);
146+
client.setCallback(this);
147+
client.connect(options);
148+
149+
log.info((new Date())+" - Connected.");
150+
for (int i=0; i<10; i++) {
151+
log.info("Disconnect your network in "+(10-i)+" sec...");
152+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 2, false);
153+
Thread.sleep(1000);
154+
}
155+
156+
final int[] res = new int[1];
157+
new Timer().schedule( new TimerTask() {
158+
@Override
159+
public void run() {
160+
res[0]++;
161+
if (res[0] == keepAlive + 1) {
162+
log.info((new Date())+" - Connection should be lost...");
163+
}
164+
}
165+
}, 0, 1000);
166+
167+
while (client.isConnected() && res[0] < 2*keepAlive) {
168+
try {
169+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 2, false);
170+
Thread.sleep(1000);
171+
}
172+
catch (MqttException e) {
173+
// ignore
174+
}
175+
}
176+
177+
Assert.assertFalse("Disconected", client.isConnected());
178+
if (client.isConnected()) client.disconnect(0);
179+
client.close();
180+
}
181+
182+
183+
@Test
184+
public void testConnectionLossWhilePublishingQos1Async()
185+
throws Exception
186+
{
187+
final int keepAlive = 15;
188+
189+
MqttConnectOptions options = new MqttConnectOptions();
190+
options.setCleanSession(true);
191+
options.setUserName(username);
192+
options.setPassword(password);
193+
options.setKeepAliveInterval(keepAlive);
194+
195+
MqttAsyncClient client = new MqttAsyncClient("tcp://iot.eclipse.org:1883", clientId, DATA_STORE);
196+
client.setCallback(this);
197+
198+
log.info((new Date())+" - Connecting...");
199+
client.connect(options);
200+
while (!client.isConnected()) {
201+
Thread.sleep(1000);
202+
}
203+
204+
log.info((new Date())+" - Connected.");
205+
for (int i=0; i<10; i++) {
206+
log.info("Disconnect your network in "+(10-i)+" sec...");
207+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 1, false);
208+
Thread.sleep(1000);
209+
}
210+
211+
final int[] res = new int[1];
212+
new Timer().schedule( new TimerTask() {
213+
@Override
214+
public void run() {
215+
res[0]++;
216+
if (res[0] == keepAlive + 1) {
217+
log.info((new Date())+" - Connection should be lost...");
218+
}
219+
}
220+
}, 0, 1000);
221+
222+
boolean stopPublishing = false;
223+
while (client.isConnected() && res[0] < 10*keepAlive) {
224+
if (!stopPublishing) {
225+
try {
226+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 1, false);
227+
log.info((new Date())+" - Published...");
228+
Thread.sleep(1000);
229+
}
230+
catch (MqttException e) {
231+
stopPublishing = true;
232+
}
233+
}
234+
}
235+
236+
Assert.assertFalse("Disconected", client.isConnected());
237+
if (client.isConnected()) client.disconnect(0);
238+
client.close();
239+
}
240+
241+
242+
@Test
243+
public void testKeepConnectionOpenWhilePublishingQos0()
244+
throws Exception
245+
{
246+
final int keepAlive = 15;
247+
248+
MqttConnectOptions options = new MqttConnectOptions();
249+
options.setCleanSession(true);
250+
options.setUserName(username);
251+
options.setPassword(password);
252+
options.setKeepAliveInterval(keepAlive);
253+
254+
MqttClient client = new MqttClient("tcp://iot.eclipse.org:1883", clientId, DATA_STORE);
255+
client.setCallback(this);
256+
client.connect(options);
257+
258+
log.info((new Date())+" - Connected.");
259+
260+
final int[] res = new int[1];
261+
new Timer().schedule( new TimerTask() {
262+
@Override
263+
public void run() {
264+
res[0]++;
265+
if (res[0] % keepAlive == 0) {
266+
log.info((new Date())+" - Still running keep alive count: "+res[0]+"...");
267+
}
268+
}
269+
}, 0, 1000);
270+
271+
while (client.isConnected() && res[0] < 10*keepAlive) {
272+
client.publish(username+"/"+clientId+"/abc", message.getBytes(), 0, false);
273+
Thread.sleep(1000);
274+
}
275+
276+
Assert.assertTrue("Connected", client.isConnected());
277+
if (client.isConnected()) client.disconnect(0);
278+
client.close();
279+
}
280+
281+
282+
@Test
283+
public void testKeepConnectionOpenIdle()
284+
throws Exception
285+
{
286+
final int keepAlive = 15;
287+
288+
MqttConnectOptions options = new MqttConnectOptions();
289+
options.setCleanSession(true);
290+
options.setUserName(username);
291+
options.setPassword(password);
292+
options.setKeepAliveInterval(keepAlive);
293+
294+
MqttClient client = new MqttClient("tcp://iot.eclipse.org:1883", clientId, DATA_STORE);
295+
client.setCallback(this);
296+
client.connect(options);
297+
298+
log.info((new Date())+" - Connected.");
299+
300+
final int[] res = new int[1];
301+
new Timer().schedule( new TimerTask() {
302+
@Override
303+
public void run() {
304+
res[0]++;
305+
if (res[0] % keepAlive == 0) {
306+
log.info((new Date())+" - Still running keep alive count: "+res[0]+"...");
307+
}
308+
}
309+
}, 0, 1000);
310+
311+
while (client.isConnected() && res[0] < 10*keepAlive) {
312+
Thread.sleep(1000);
313+
}
314+
315+
Assert.assertTrue("Connected", client.isConnected());
316+
if (client.isConnected()) client.disconnect(0);
317+
client.close();
318+
}
319+
320+
public void connectionLost(Throwable cause) {
321+
log.info((new Date())+" - Connection Lost");
322+
}
323+
324+
public void messageArrived(String topic, MqttMessage message) throws Exception {
325+
log.info("Message Arrived on " + topic + " with " + new String(message.getPayload()));
326+
}
327+
328+
public void deliveryComplete(IMqttDeliveryToken token) {
329+
// log.info("Delivery Complete: " + token.getMessageId());
330+
}
331+
}

org.eclipse.paho.client.mqttv3.test/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/MqttV3Receiver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ public synchronized ReceivedMessage receiveNext(long waitMilliseconds) throws In
136136
* @throws InterruptedException
137137
*/
138138
public boolean validateReceipt(String sendTopic, int expectedQos, byte[] sentBytes) throws MqttException, InterruptedException {
139-
final String methodName = "validateReceipt";
140-
log.entering(className, methodName, new Object[]{sendTopic, expectedQos, sentBytes});
139+
final String methodName = "validateReceipt";
140+
log.entering(className, methodName, new Object[]{sendTopic, expectedQos});
141141

142-
long waitMilliseconds = 10000;
142+
long waitMilliseconds = 40*30000;
143143
ReceivedMessage receivedMessage = receiveNext(waitMilliseconds);
144144
if (receivedMessage == null) {
145145
report(" No message received in waitMilliseconds=" + waitMilliseconds);

org.eclipse.paho.client.mqttv3/.classpath

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<classpath>
3-
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
3+
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.4"/>
44
<classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
55
<classpathentry kind="src" path="src/main/resources/"/>
66
<classpathentry kind="src" path="src/main/java-templates/"/>

0 commit comments

Comments
 (0)