Skip to content

Commit b753a04

Browse files
author
Ian Craggs
committed
Latest MQTT V5 updates
1 parent 0500133 commit b753a04

8 files changed

Lines changed: 851 additions & 372 deletions

File tree

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/IMqttAsyncClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,9 @@ void disconnectForcibly(long quiesceTimeout, long disconnectTimeout, boolean sen
392392
*/
393393
IMqttToken subscribe(String topicFilter, int qos, Object userContext, MqttActionListener callback)
394394
throws MqttException;
395+
396+
IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, MqttActionListener callback)
397+
throws MqttException;
395398

396399
/**
397400
* Subscribe to a topic, which may include wildcards.
@@ -412,6 +415,8 @@ IMqttToken subscribe(String topicFilter, int qos, Object userContext, MqttAction
412415
* if there was an error registering the subscription.
413416
*/
414417
IMqttToken subscribe(String topicFilter, int qos) throws MqttException;
418+
419+
IMqttToken subscribe(String[] topicFilters, int[] qos) throws MqttException;
415420

416421
/**
417422
* Subscribe to multiple topics, each of which may include wildcards.

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/IMqttClient.java

Lines changed: 4 additions & 237 deletions
Original file line numberDiff line numberDiff line change
@@ -181,27 +181,6 @@ public interface IMqttClient { //extends IMqttAsyncClient {
181181
*/
182182
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException;
183183

184-
/**
185-
* Subscribe to a topic, which may include wildcards using a QoS of 1.
186-
*
187-
* @see #subscribe(String[], int[])
188-
*
189-
* @param topicFilter the topic to subscribe to, which can include wildcards.
190-
* @throws MqttException if there was an error registering the subscription.
191-
* @throws MqttSecurityException if the client is not authorized to register the subscription
192-
*/
193-
public void subscribe(String topicFilter) throws MqttException, MqttSecurityException;
194-
195-
/**
196-
* Subscribes to a one or more topics, which may include wildcards using a QoS of 1.
197-
*
198-
* @see #subscribe(String[], int[])
199-
*
200-
* @param topicFilters the topic to subscribe to, which can include wildcards.
201-
* @throws MqttException if there was an error registering the subscription.
202-
*/
203-
public void subscribe(String[] topicFilters) throws MqttException;
204-
205184
/**
206185
* Subscribe to a topic, which may include wildcards.
207186
*
@@ -214,7 +193,7 @@ public interface IMqttClient { //extends IMqttAsyncClient {
214193
* the QoS specified on the subscribe.
215194
* @throws MqttException if there was an error registering the subscription.
216195
*/
217-
public void subscribe(String topicFilter, int qos) throws MqttException;
196+
public IMqttToken subscribe(String topicFilter, int qos) throws MqttException;
218197

219198
/**
220199
* Subscribes to multiple topics, each of which may include wildcards.
@@ -312,20 +291,8 @@ public interface IMqttClient { //extends IMqttAsyncClient {
312291
* @throws MqttException if there was an error registering the subscription.
313292
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
314293
*/
315-
public void subscribe(String[] topicFilters, int[] qos) throws MqttException;
294+
public IMqttToken subscribe(String[] topicFilters, int[] qos) throws MqttException;
316295

317-
/**
318-
* Subscribe to a topic, which may include wildcards using a QoS of 1.
319-
*
320-
* @see #subscribe(String[], int[])
321-
*
322-
* @param topicFilter the topic to subscribe to, which can include wildcards.
323-
* @param messageListener a callback to handle incoming messages
324-
* @throws MqttException if there was an error registering the subscription.
325-
* @throws MqttSecurityException if the client is not authorized to register the subscription
326-
*/
327-
public void subscribe(String topicFilter, IMqttMessageListener messageListener) throws MqttException, MqttSecurityException;
328-
329296
/**
330297
* Subscribes to a one or more topics, which may include wildcards using a QoS of 1.
331298
*
@@ -335,7 +302,7 @@ public interface IMqttClient { //extends IMqttAsyncClient {
335302
* @param messageListener one callbacks to handle incoming messages
336303
* @throws MqttException if there was an error registering the subscription.
337304
*/
338-
public void subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException;
305+
public IMqttToken subscribe(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException;
339306

340307
/**
341308
* Subscribes to multiple topics, each of which may include wildcards.
@@ -432,207 +399,7 @@ public interface IMqttClient { //extends IMqttAsyncClient {
432399
* @throws MqttException if there was an error registering the subscription.
433400
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
434401
*/
435-
public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException;
436-
437-
/**
438-
* Subscribe to a topic, which may include wildcards using a QoS of 1.
439-
*
440-
* @see #subscribeWithResponse(MqttSubscription[], IMqttMessageListener[])
441-
*
442-
* @param topicFilter the topic to subscribe to, which can include wildcards.
443-
* @return token used to track the subscribe after it has completed.
444-
* @throws MqttException if there was an error registering the subscription.
445-
*/
446-
public IMqttToken subscribeWithResponse(String topicFilter) throws MqttException;
447-
448-
/**
449-
* Subscribes to multiple topics, each of which may include wildcards.
450-
* <p>The {@link #setCallback(MqttCallback)} method
451-
* should be called before this method, otherwise any received messages
452-
* will be discarded.
453-
* </p>
454-
* <p>
455-
* If (@link MqttConnectOptions#setCleanStart(boolean)} was set to true
456-
* when when connecting to the server then the subscription remains in place
457-
* until either:</p>
458-
* <ul>
459-
* <li>The client disconnects</li>
460-
* <li>An unsubscribe method is called to un-subscribe the topic</li>
461-
* </ul>
462-
* <p>
463-
* If (@link MqttConnectOptions#setCleanStart(boolean)} was set to false
464-
* when when connecting to the server then the subscription remains in place
465-
* until either:</p>
466-
* <ul>
467-
* <li>An unsubscribe method is called to unsubscribe the topic</li>
468-
* <li>The client connects with cleanStart set to true</li>
469-
* </ul>
470-
* <p>
471-
* With cleanStart set to false the MQTT server will store messages on
472-
* behalf of the client when the client is not connected. The next time the
473-
* client connects with the <b>same client ID</b> the server will
474-
* deliver the stored messages to the client.
475-
* </p>
476-
*
477-
* <p>The "topic filter" string used when subscribing
478-
* may contain special characters, which allow you to subscribe to multiple topics
479-
* at once.</p>
480-
* <p>The topic level separator is used to introduce structure into the topic, and
481-
* can therefore be specified within the topic for that purpose. The multi-level
482-
* wildcard and single-level wildcard can be used for subscriptions, but they
483-
* cannot be used within a topic by the publisher of a message.
484-
* <dl>
485-
* <dt>Topic level separator</dt>
486-
* <dd>The forward slash (/) is used to separate each level within
487-
* a topic tree and provide a hierarchical structure to the topic space. The
488-
* use of the topic level separator is significant when the two wildcard characters
489-
* are encountered in topics specified by subscribers.</dd>
490-
*
491-
* <dt>Multi-level wildcard</dt>
492-
* <dd><p>The number sign (#) is a wildcard character that matches
493-
* any number of levels within a topic. For example, if you subscribe to
494-
* <span><span class="filepath">finance/stock/ibm/#</span></span>, you receive
495-
* messages on these topics:</p>
496-
* <ul>
497-
* <li><pre>finance/stock/ibm</pre></li>
498-
* <li><pre>finance/stock/ibm/closingprice</pre></li>
499-
* <li><pre>finance/stock/ibm/currentprice</pre></li>
500-
* </ul>
501-
* <p>The multi-level wildcard
502-
* can represent zero or more levels. Therefore, <em>finance/#</em> can also match
503-
* the singular <em>finance</em>, where <em>#</em> represents zero levels. The topic
504-
* level separator is meaningless in this context, because there are no levels
505-
* to separate.</p>
506-
*
507-
* <p>The <span>multi-level</span> wildcard can
508-
* be specified only on its own or next to the topic level separator character.
509-
* Therefore, <em>#</em> and <em>finance/#</em> are both valid, but <em>finance#</em> is
510-
* not valid. <span>The multi-level wildcard must be the last character
511-
* used within the topic tree. For example, <em>finance/#</em> is valid but
512-
* <em>finance/#/closingprice</em> is not valid.</span></p></dd>
513-
*
514-
* <dt>Single-level wildcard</dt>
515-
* <dd><p>The plus sign (+) is a wildcard character that matches only one topic
516-
* level. For example, <em>finance/stock/+</em> matches
517-
* <em>finance/stock/ibm</em> and <em>finance/stock/xyz</em>,
518-
* but not <em>finance/stock/ibm/closingprice</em>. Also, because the single-level
519-
* wildcard matches only a single level, <em>finance/+</em> does not match <em>finance</em>.</p>
520-
*
521-
* <p>Use
522-
* the single-level wildcard at any level in the topic tree, and in conjunction
523-
* with the multilevel wildcard. Specify the single-level wildcard next to the
524-
* topic level separator, except when it is specified on its own. Therefore,
525-
* <em>+</em> and <em>finance/+</em> are both valid, but <em>finance+</em> is
526-
* not valid. <span>The single-level wildcard can be used at the end of the
527-
* topic tree or within the topic tree.
528-
* For example, <em>finance/+</em> and <em>finance/+/ibm</em> are both valid.</span></p>
529-
* </dd>
530-
* </dl>
531-
*
532-
* <p>This is a blocking method that returns once subscribe completes</p>
533-
*
534-
* @param subscriptions one or more {@link MqttSubscription} defining the subscription to be made.
535-
* @throws MqttException if there was an error registering the subscription.
536-
* @return token used to track the subscribe after it has completed.
537-
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
538-
*/
539-
public IMqttToken subscribeWithResponse(MqttSubscription[] subscriptions) throws MqttException;
540-
541-
/**
542-
* Subscribes to multiple topics, each of which may include wildcards.
543-
* <p>The {@link #setCallback(MqttCallback)} method
544-
* should be called before this method, otherwise any received messages
545-
* will be discarded.
546-
* </p>
547-
* <p>
548-
* If (@link MqttConnectOptions#setCleanStart(boolean)} was set to true
549-
* when when connecting to the server then the subscription remains in place
550-
* until either:</p>
551-
* <ul>
552-
* <li>The client disconnects</li>
553-
* <li>An unsubscribe method is called to un-subscribe the topic</li>
554-
* </ul>
555-
*
556-
* <p>
557-
* If (@link MqttConnectOptions#setCleanStart(boolean)} was set to false
558-
* when when connecting to the server then the subscription remains in place
559-
* until either:</p>
560-
* <ul>
561-
* <li>An unsubscribe method is called to unsubscribe the topic</li>
562-
* <li>The client connects with cleanStart set to true</li>
563-
* </ul>
564-
* <p>
565-
* With cleanStart set to false the MQTT server will store messages on
566-
* behalf of the client when the client is not connected. The next time the
567-
* client connects with the <b>same client ID</b> the server will
568-
* deliver the stored messages to the client.
569-
* </p>
570-
*
571-
* <p>The "topic filter" string used when subscribing
572-
* may contain special characters, which allow you to subscribe to multiple topics
573-
* at once.</p>
574-
* <p>The topic level separator is used to introduce structure into the topic, and
575-
* can therefore be specified within the topic for that purpose. The multi-level
576-
* wildcard and single-level wildcard can be used for subscriptions, but they
577-
* cannot be used within a topic by the publisher of a message.
578-
* <dl>
579-
* <dt>Topic level separator</dt>
580-
* <dd>The forward slash (/) is used to separate each level within
581-
* a topic tree and provide a hierarchical structure to the topic space. The
582-
* use of the topic level separator is significant when the two wildcard characters
583-
* are encountered in topics specified by subscribers.</dd>
584-
*
585-
* <dt>Multi-level wildcard</dt>
586-
* <dd><p>The number sign (#) is a wildcard character that matches
587-
* any number of levels within a topic. For example, if you subscribe to
588-
* <span><span class="filepath">finance/stock/ibm/#</span></span>, you receive
589-
* messages on these topics:</p>
590-
* <ul>
591-
* <li><pre>finance/stock/ibm</pre></li>
592-
* <li><pre>finance/stock/ibm/closingprice</pre></li>
593-
* <li><pre>finance/stock/ibm/currentprice</pre></li>
594-
* </ul>
595-
* <p>The multi-level wildcard
596-
* can represent zero or more levels. Therefore, <em>finance/#</em> can also match
597-
* the singular <em>finance</em>, where <em>#</em> represents zero levels. The topic
598-
* level separator is meaningless in this context, because there are no levels
599-
* to separate.</p>
600-
*
601-
* <p>The <span>multi-level</span> wildcard can
602-
* be specified only on its own or next to the topic level separator character.
603-
* Therefore, <em>#</em> and <em>finance/#</em> are both valid, but <em>finance#</em> is
604-
* not valid. <span>The multi-level wildcard must be the last character
605-
* used within the topic tree. For example, <em>finance/#</em> is valid but
606-
* <em>finance/#/closingprice</em> is not valid.</span></p></dd>
607-
*
608-
* <dt>Single-level wildcard</dt>
609-
* <dd><p>The plus sign (+) is a wildcard character that matches only one topic
610-
* level. For example, <em>finance/stock/+</em> matches
611-
* <em>finance/stock/ibm</em> and <em>finance/stock/xyz</em>,
612-
* but not <em>finance/stock/ibm/closingprice</em>. Also, because the single-level
613-
* wildcard matches only a single level, <em>finance/+</em> does not match <em>finance</em>.</p>
614-
*
615-
* <p>Use
616-
* the single-level wildcard at any level in the topic tree, and in conjunction
617-
* with the multilevel wildcard. Specify the single-level wildcard next to the
618-
* topic level separator, except when it is specified on its own. Therefore,
619-
* <em>+</em> and <em>finance/+</em> are both valid, but <em>finance+</em> is
620-
* not valid. <span>The single-level wildcard can be used at the end of the
621-
* topic tree or within the topic tree.
622-
* For example, <em>finance/+</em> and <em>finance/+/ibm</em> are both valid.</span></p>
623-
* </dd>
624-
* </dl>
625-
626-
*
627-
* <p>This is a blocking method that returns once subscribe completes</p>
628-
*
629-
* @param subscriptions one or more {@link MqttSubscription} defining the subscription to be made.
630-
* @param messageListeners one or more callbacks to handle incoming messages
631-
* @throws MqttException if there was an error registering the subscription.
632-
* @return token used to track the subscribe after it has completed.
633-
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
634-
*/
635-
public IMqttToken subscribeWithResponse(MqttSubscription[] subscriptions, IMqttMessageListener[] messageListeners) throws MqttException;
402+
public IMqttToken subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException;
636403

637404
/**
638405
* Requests the server unsubscribe the client from a topic.

org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/MqttAsyncClient.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@
3333
import org.eclipse.paho.mqttv5.client.internal.MqttSessionState;
3434
import org.eclipse.paho.mqttv5.client.internal.NetworkModule;
3535
import org.eclipse.paho.mqttv5.client.internal.NetworkModuleService;
36-
import org.eclipse.paho.mqttv5.client.logging.Logger;
37-
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
3836
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
3937
import org.eclipse.paho.mqttv5.client.persist.MqttDefaultFilePersistence;
4038
import org.eclipse.paho.mqttv5.client.util.Debug;
39+
import org.eclipse.paho.mqttv5.client.logging.Logger;
40+
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
4141
import org.eclipse.paho.mqttv5.common.ExceptionHelper;
4242
import org.eclipse.paho.mqttv5.common.MqttException;
4343
import org.eclipse.paho.mqttv5.common.MqttMessage;
@@ -1049,6 +1049,16 @@ public IMqttToken subscribe(String topicFilter, int qos, Object userContext, Mqt
10491049
return this.subscribe(new MqttSubscription[] { new MqttSubscription(topicFilter, qos) }, userContext, callback,
10501050
new MqttProperties());
10511051
}
1052+
1053+
@Override
1054+
public IMqttToken subscribe(String[] topicFilters, int[] qoss, Object userContext, MqttActionListener callback)
1055+
throws MqttException {
1056+
MqttSubscription[] subs = new MqttSubscription[topicFilters.length];
1057+
for (int i = 0; i < topicFilters.length; ++ i) {
1058+
subs[i] = new MqttSubscription(topicFilters[i], qoss[i]);
1059+
}
1060+
return this.subscribe(subs, userContext, callback, new MqttProperties());
1061+
}
10521062

10531063
/*
10541064
* (non-Javadoc)
@@ -1062,6 +1072,11 @@ public IMqttToken subscribe(String topicFilter, int qos) throws MqttException {
10621072
return this.subscribe(new MqttSubscription[] { new MqttSubscription(topicFilter, qos) }, null, null,
10631073
new MqttProperties());
10641074
}
1075+
1076+
@Override
1077+
public IMqttToken subscribe(String[] topicFilters, int[] qoss) throws MqttException {
1078+
return this.subscribe(topicFilters, qoss, null, null);
1079+
}
10651080

10661081
/*
10671082
* (non-Javadoc)
@@ -1211,7 +1226,11 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12111226
MqttTopicValidator.validate(subscriptions[i].getTopic(),
12121227
this.mqttConnection.isWildcardSubscriptionsAvailable(),
12131228
this.mqttConnection.isSharedSubscriptionsAvailable());
1214-
this.comms.setMessageListener(null, subscriptions[i].getTopic(), messageListeners[i]);
1229+
if (messageListeners == null || messageListeners[i] == null) {
1230+
this.comms.removeMessageListener(subscriptions[i].getTopic());
1231+
} else {
1232+
this.comms.setMessageListener(null, subscriptions[i].getTopic(), messageListeners[i]);
1233+
}
12151234
}
12161235

12171236
IMqttToken token = null;
@@ -1266,7 +1285,11 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12661285
MqttTopicValidator.validate(subscriptions[i].getTopic(),
12671286
this.mqttConnection.isWildcardSubscriptionsAvailable(),
12681287
this.mqttConnection.isSharedSubscriptionsAvailable());
1269-
this.comms.setMessageListener(subId, subscriptions[i].getTopic(), messageListener);
1288+
if (messageListener == null) {
1289+
this.comms.removeMessageListener(subscriptions[i].getTopic());
1290+
} else {
1291+
this.comms.setMessageListener(subId, subscriptions[i].getTopic(), messageListener);
1292+
}
12701293
}
12711294

12721295
IMqttToken token = null;

0 commit comments

Comments
 (0)