Skip to content

Commit 75370af

Browse files
author
Ian Craggs
committed
Cleanup subscribe listeners after failed subscribe #432
1 parent 1dbafa6 commit 75370af

2 files changed

Lines changed: 36 additions & 5 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
4545
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
4646
import org.eclipse.paho.client.mqttv3.util.Debug;
47+
import org.eclipse.paho.client.mqttv3.IMqttToken;
4748

4849
/**
4950
* Lightweight client for talking to an MQTT server using non-blocking methods
@@ -1014,7 +1015,7 @@ public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext
10141015
if ((messageListeners.length != qos.length) || (qos.length != topicFilters.length)) {
10151016
throw new IllegalArgumentException();
10161017
}
1017-
1018+
10181019
// add or remove message handlers to the list for this client
10191020
for (int i = 0; i < topicFilters.length; ++i) {
10201021
MqttTopic.validate(topicFilters[i], true/* allow wildcards */);
@@ -1025,8 +1026,18 @@ public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext
10251026
this.comms.setMessageListener(topicFilters[i], messageListeners[i]);
10261027
}
10271028
}
1028-
1029-
return this.subscribeBase(topicFilters, qos, userContext, callback);
1029+
1030+
IMqttToken token = null;
1031+
try {
1032+
token = this.subscribeBase(topicFilters, qos, userContext, callback);
1033+
} catch(Exception e) {
1034+
// if the subscribe fails, then we have to remove the message handlers
1035+
for (int i = 0; i < topicFilters.length; ++i) {
1036+
this.comms.removeMessageListener(topicFilters[i]);
1037+
}
1038+
throw e;
1039+
}
1040+
return token;
10301041
}
10311042

10321043
/*

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1217,7 +1217,17 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12171217
this.comms.setMessageListener(null, subscriptions[i].getTopic(), messageListeners[i]);
12181218
}
12191219

1220-
return this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
1220+
IMqttToken token = null;
1221+
try {
1222+
token = this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
1223+
} catch(Exception e) {
1224+
// if the subscribe fails, then we have to remove the message handlers
1225+
for (int i = 0; i < subscriptions.length; ++i) {
1226+
this.comms.removeMessageListener(subscriptions[i].getTopic());
1227+
}
1228+
throw e;
1229+
}
1230+
return token;
12211231
}
12221232

12231233
/*
@@ -1262,7 +1272,17 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12621272
this.comms.setMessageListener(subId, subscriptions[i].getTopic(), messageListener);
12631273
}
12641274

1265-
return this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
1275+
IMqttToken token = null;
1276+
try {
1277+
token = this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
1278+
} catch(Exception e) {
1279+
// if the subscribe fails, then we have to remove the message handlers
1280+
for (int i = 0; i < subscriptions.length; ++i) {
1281+
this.comms.removeMessageListener(subscriptions[i].getTopic());
1282+
}
1283+
throw e;
1284+
}
1285+
return token;
12661286
}
12671287

12681288
/*

0 commit comments

Comments
 (0)