Skip to content

Commit 18203b1

Browse files
author
Ian Craggs
committed
Fix for issue #432 - not receiving retained messages
1 parent 8dd628b commit 18203b1

2 files changed

Lines changed: 31 additions & 25 deletions

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,6 @@ public IMqttToken subscribe(String[] topicFilters, int[] qos) throws MqttExcepti
925925
*/
926926
public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback)
927927
throws MqttException {
928-
final String methodName = "subscribe";
929928

930929
if (topicFilters.length != qos.length) {
931930
throw new IllegalArgumentException();
@@ -937,17 +936,22 @@ public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext
937936
MqttTopic.validate(topicFilters[i], true/* allow wildcards */);
938937
this.comms.removeMessageListener(topicFilters[i]);
939938
}
939+
940+
return this.subscribeBase(topicFilters, qos, userContext, callback);
941+
}
940942

943+
private IMqttToken subscribeBase(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback)
944+
throws MqttException {
945+
final String methodName = "subscribe";
946+
941947
// Only Generate Log string if we are logging at FINE level
942948
if (log.isLoggable(Logger.FINE)) {
943949
StringBuffer subs = new StringBuffer();
944950
for (int i = 0; i < topicFilters.length; i++) {
945951
if (i > 0) {
946952
subs.append(", ");
947953
}
948-
subs.append("topic=").append(topicFilters[i]).append(" qos=").append(qos[i]);
949-
950-
954+
subs.append("topic=").append(topicFilters[i]).append(" qos=").append(qos[i]);
951955
}
952956
// @TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2}
953957
log.fine(CLASS_NAME, methodName, "106", new Object[] { subs.toString(), userContext, callback });
@@ -1010,20 +1014,19 @@ public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext
10101014
if ((messageListeners.length != qos.length) || (qos.length != topicFilters.length)) {
10111015
throw new IllegalArgumentException();
10121016
}
1013-
1014-
IMqttToken token = this.subscribe(topicFilters, qos, userContext, callback);
1015-
1017+
10161018
// add or remove message handlers to the list for this client
10171019
for (int i = 0; i < topicFilters.length; ++i) {
1020+
MqttTopic.validate(topicFilters[i], true/* allow wildcards */);
10181021
if (messageListeners[i] == null) {
10191022
this.comms.removeMessageListener(topicFilters[i]);
10201023
}
10211024
else {
10221025
this.comms.setMessageListener(topicFilters[i], messageListeners[i]);
10231026
}
10241027
}
1025-
1026-
return token;
1028+
1029+
return this.subscribeBase(topicFilters, qos, userContext, callback);
10271030
}
10281031

10291032
/*

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2009, 2016 IBM Corp.
2+
* Copyright (c) 2009, 2018 IBM Corp.
33
*
44
* All rights reserved. This program and the accompanying materials
55
* are made available under the terms of the Eclipse Public License v1.0
@@ -11,12 +11,7 @@
1111
* http://www.eclipse.org/org/documents/edl-v10.php.
1212
*
1313
* Contributors:
14-
* Dave Locke - initial API and implementation and/or initial documentation
15-
* Ian Craggs - MQTT 3.1.1 support
16-
* Ian Craggs - per subscription message handlers (bug 466579)
17-
* Ian Craggs - ack control (bug 472172)
18-
* James Sutton - Bug 459142 - WebSocket support for the Java client.
19-
* James Sutton - Automatic Reconnect & Offline Buffering.
14+
* James Sutton - initial API and implementation and/or initial documentation
2015
*/
2116

2217
package org.eclipse.paho.mqttv5.client;
@@ -1107,7 +1102,6 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions) throws MqttExcepti
11071102
@Override
11081103
public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
11091104
MqttProperties subscriptionProperties) throws MqttException {
1110-
final String methodName = "subscribe";
11111105

11121106
// remove any message handlers for individual topics and validate Topics
11131107
for (int i = 0; i < subscriptions.length; ++i) {
@@ -1117,7 +1111,14 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
11171111
this.mqttConnection.isWildcardSubscriptionsAvailable(),
11181112
this.mqttConnection.isSharedSubscriptionsAvailable());
11191113
}
1114+
1115+
return this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
1116+
}
11201117

1118+
private IMqttToken subscribeBase(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
1119+
MqttProperties subscriptionProperties) throws MqttException {
1120+
final String methodName = "subscribe";
1121+
11211122
// Only Generate Log string if we are logging at FINE level
11221123
if (log.isLoggable(Logger.FINE)) {
11231124
StringBuffer subs = new StringBuffer();
@@ -1208,14 +1209,15 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, IMqttMessageListen
12081209
public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext, MqttActionListener callback,
12091210
IMqttMessageListener[] messageListeners, MqttProperties subscriptionProperties) throws MqttException {
12101211

1211-
IMqttToken token = this.subscribe(subscriptions, userContext, callback, subscriptionProperties);
1212-
12131212
// add message handlers to the list for this client
12141213
for (int i = 0; i < subscriptions.length; ++i) {
1214+
MqttTopicValidator.validate(subscriptions[i].getTopic(),
1215+
this.mqttConnection.isWildcardSubscriptionsAvailable(),
1216+
this.mqttConnection.isSharedSubscriptionsAvailable());
12151217
this.comms.setMessageListener(null, subscriptions[i].getTopic(), messageListeners[i]);
12161218
}
1217-
1218-
return token;
1219+
1220+
return this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
12191221
}
12201222

12211223
/*
@@ -1251,15 +1253,16 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12511253
subId = this.mqttSession.getNextSubscriptionIdentifier();
12521254
}
12531255
}
1254-
1255-
IMqttToken token = this.subscribe(subscriptions, userContext, callback, subscriptionProperties);
1256-
1256+
12571257
// add message handlers to the list for this client
12581258
for (int i = 0; i < subscriptions.length; ++i) {
1259+
MqttTopicValidator.validate(subscriptions[i].getTopic(),
1260+
this.mqttConnection.isWildcardSubscriptionsAvailable(),
1261+
this.mqttConnection.isSharedSubscriptionsAvailable());
12591262
this.comms.setMessageListener(subId, subscriptions[i].getTopic(), messageListener);
12601263
}
12611264

1262-
return token;
1265+
return this.subscribeBase(subscriptions, userContext, callback, subscriptionProperties);
12631266
}
12641267

12651268
/*

0 commit comments

Comments
 (0)