Skip to content

Commit 7b9a209

Browse files
author
Ian Craggs
committed
Refactor MQTTClient so it relies on MQTTAsync fixes for #432
1 parent d32134b commit 7b9a209

2 files changed

Lines changed: 16 additions & 32 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,14 +1008,14 @@ public IMqttToken subscribe(String[] topicFilters, int[] qos, IMqttMessageListen
10081008
public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback,
10091009
IMqttMessageListener[] messageListeners) throws MqttException {
10101010

1011-
if ((messageListeners.length != qos.length) || (qos.length != topicFilters.length)) {
1011+
if (messageListeners != null && (messageListeners.length != qos.length) || (qos.length != topicFilters.length)) {
10121012
throw new IllegalArgumentException();
10131013
}
10141014

10151015
// add or remove message handlers to the list for this client
10161016
for (int i = 0; i < topicFilters.length; ++i) {
10171017
MqttTopic.validate(topicFilters[i], true/* allow wildcards */);
1018-
if (messageListeners[i] == null) {
1018+
if (messageListeners == null || messageListeners[i] == null) {
10191019
this.comms.removeMessageListener(topicFilters[i]);
10201020
}
10211021
else {

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -426,15 +426,7 @@ public void subscribe(String topicFilter, int qos) throws MqttException {
426426
* @see IMqttClient#subscribe(String[], int[])
427427
*/
428428
public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
429-
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
430-
tok.waitForCompletion(getTimeToWait());
431-
int[] grantedQos = tok.getGrantedQos();
432-
for (int i = 0; i < grantedQos.length; ++i) {
433-
qos[i] = grantedQos[i];
434-
}
435-
if (grantedQos.length == 1 && qos[0] == 0x80) {
436-
throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
437-
}
429+
this.subscribe(topicFilters, qos, null);
438430
}
439431

440432
/* (non-Javadoc)
@@ -463,17 +455,15 @@ public void subscribe(String topicFilter, int qos, IMqttMessageListener messageL
463455
}
464456

465457

466-
public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException {
467-
this.subscribe(topicFilters, qos);
468-
469-
// add or remove message handlers to the list for this client
470-
for (int i = 0; i < topicFilters.length; ++i) {
471-
if (messageListeners[i] == null) {
472-
aClient.comms.removeMessageListener(topicFilters[i]);
473-
}
474-
else {
475-
aClient.comms.setMessageListener(topicFilters[i], messageListeners[i]);
476-
}
458+
public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException {
459+
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null, messageListeners);
460+
tok.waitForCompletion(getTimeToWait());
461+
int[] grantedQos = tok.getGrantedQos();
462+
for (int i = 0; i < grantedQos.length; ++i) {
463+
qos[i] = grantedQos[i];
464+
}
465+
if (grantedQos.length == 1 && qos[0] == 0x80) {
466+
throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
477467
}
478468
}
479469

@@ -533,22 +523,16 @@ public IMqttToken subscribeWithResponse(String[] topicFilters, IMqttMessageListe
533523
* @see IMqttClient#subscribeWithResponse(String[], int[])
534524
*/
535525
public IMqttToken subscribeWithResponse(String[] topicFilters, int[] qos) throws MqttException {
536-
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
537-
tok.waitForCompletion(getTimeToWait());
538-
return tok;
526+
return this.subscribeWithResponse(topicFilters, qos, null);
539527
}
540528

541529
/*
542530
* @see IMqttClient#subscribeWithResponse(String[], int[], IMqttMessageListener[])
543531
*/
544532
public IMqttToken subscribeWithResponse(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners)
545-
throws MqttException {
546-
IMqttToken tok = this.subscribeWithResponse(topicFilters, qos);
547-
548-
// add message handlers to the list for this client
549-
for (int i = 0; i < topicFilters.length; ++i) {
550-
aClient.comms.setMessageListener(topicFilters[i], messageListeners[i]);
551-
}
533+
throws MqttException {
534+
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null, messageListeners);
535+
tok.waitForCompletion(getTimeToWait());
552536
return tok;
553537
}
554538

0 commit comments

Comments
 (0)