Skip to content

Commit b4e26f0

Browse files
authored
Issue 216 (#266)
Adding subscribeWithResponse call to MqttClient Signed-off-by: James Sutton <james.sutton@uk.ibm.com>
1 parent c098026 commit b4e26f0

2 files changed

Lines changed: 345 additions & 1 deletion

File tree

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttClient.java

Lines changed: 270 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,277 @@ public interface IMqttClient { //extends IMqttAsyncClient {
435435
* @throws MqttException if there was an error registering the subscription.
436436
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
437437
*/
438-
public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException;
438+
public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException;
439439

440+
/**
441+
* Subscribe to a topic, which may include wildcards using a QoS of 1.
442+
*
443+
* @see #subscribeWithResponse(String[], int[])
444+
*
445+
* @param topicFilter the topic to subscribe to, which can include wildcards.
446+
* @return token used to track the subscribe after it has completed.
447+
* @throws MqttException if there was an error registering the subscription.
448+
*/
449+
public IMqttToken subscribeWithResponse(String topicFilter) throws MqttException;
450+
451+
/**
452+
* Subscribe to a topic, which may include wildcards using a QoS of 1.
453+
*
454+
* @see #subscribeWithResponse(String[], int[])
455+
*
456+
* @param topicFilter the topic to subscribe to, which can include wildcards.
457+
* @param messageListener a callback to handle incoming messages
458+
* @return token used to track the subscribe after it has completed.
459+
* @throws MqttException if there was an error registering the subscription.
460+
*/
461+
public IMqttToken subscribeWithResponse(String topicFilter, IMqttMessageListener messageListener) throws MqttException;
462+
463+
464+
/**
465+
* Subscribe to a topic, which may include wildcards.
466+
*
467+
* @see #subscribeWithResponse(String[], int[])
468+
*
469+
* @param topicFilter the topic to subscribe to, which can include wildcards.
470+
* @param qos the maximum quality of service at which to subscribe. Messages
471+
* published at a lower quality of service will be received at the published
472+
* QoS. Messages published at a higher quality of service will be received using
473+
* the QoS specified on the subscribe.
474+
* @return token used to track the subscribe after it has completed.
475+
* @throws MqttException if there was an error registering the subscription.
476+
*/
477+
public IMqttToken subscribeWithResponse(String topicFilter, int qos) throws MqttException;
478+
479+
/**
480+
* Subscribe to a topic, which may include wildcards.
481+
*
482+
* @see #subscribeWithResponse(String[], int[])
483+
*
484+
* @param topicFilter the topic to subscribe to, which can include wildcards.
485+
* @param qos the maximum quality of service at which to subscribe. Messages
486+
* published at a lower quality of service will be received at the published
487+
* QoS. Messages published at a higher quality of service will be received using
488+
* the QoS specified on the subscribe.
489+
* @param messageListener a callback to handle incoming messages
490+
* @return token used to track the subscribe after it has completed.
491+
* @throws MqttException if there was an error registering the subscription.
492+
*/
493+
public IMqttToken subscribeWithResponse(String topicFilter, int qos, IMqttMessageListener messageListener) throws MqttException;
494+
495+
/**
496+
* Subscribes to a one or more topics, which may include wildcards using a QoS of 1.
497+
*
498+
* @see #subscribeWithResponse(String[], int[])
499+
*
500+
* @param topicFilters the topic to subscribe to, which can include wildcards.
501+
* @return token used to track the subscribe after it has completed.
502+
* @throws MqttException if there was an error registering the subscription.
503+
*/
504+
public IMqttToken subscribeWithResponse(String[] topicFilters) throws MqttException;
505+
506+
/**
507+
* Subscribes to a one or more topics, which may include wildcards using a QoS of 1.
508+
*
509+
* @see #subscribeWithResponse(String[], int[])
510+
*
511+
* @param topicFilters the topic to subscribe to, which can include wildcards.
512+
* @param messageListeners one or more callbacks to handle incoming messages
513+
* @return token used to track the subscribe after it has completed.
514+
* @throws MqttException if there was an error registering the subscription.
515+
*/
516+
public IMqttToken subscribeWithResponse(String[] topicFilters, IMqttMessageListener[] messageListeners) throws MqttException;
517+
518+
/**
519+
* Subscribes to multiple topics, each of which may include wildcards.
520+
* <p>The {@link #setCallback(MqttCallback)} method
521+
* should be called before this method, otherwise any received messages
522+
* will be discarded.
523+
* </p>
524+
* <p>
525+
* If (@link MqttConnectOptions#setCleanSession(boolean)} was set to true
526+
* when when connecting to the server then the subscription remains in place
527+
* until either:
528+
* <ul>
529+
* <li>The client disconnects</li>
530+
* <li>An unsubscribe method is called to un-subscribe the topic</li>
531+
* </li>
532+
* </p>
533+
* <p>
534+
* If (@link MqttConnectOptions#setCleanSession(boolean)} was set to false
535+
* when when connecting to the server then the subscription remains in place
536+
* until either:
537+
* <ul>
538+
* <li>An unsubscribe method is called to unsubscribe the topic</li>
539+
* <li>The client connects with cleanSession set to true</ul>
540+
* </li>
541+
* With cleanSession set to false the MQTT server will store messages on
542+
* behalf of the client when the client is not connected. The next time the
543+
* client connects with the <bold>same client ID</bold> the server will
544+
* deliver the stored messages to the client.
545+
* </p>
546+
*
547+
* <p>The "topic filter" string used when subscribing
548+
* may contain special characters, which allow you to subscribe to multiple topics
549+
* at once.</p>
550+
* <p>The topic level separator is used to introduce structure into the topic, and
551+
* can therefore be specified within the topic for that purpose. The multi-level
552+
* wildcard and single-level wildcard can be used for subscriptions, but they
553+
* cannot be used within a topic by the publisher of a message.
554+
* <dl>
555+
* <dt>Topic level separator</dt>
556+
* <dd>The forward slash (/) is used to separate each level within
557+
* a topic tree and provide a hierarchical structure to the topic space. The
558+
* use of the topic level separator is significant when the two wildcard characters
559+
* are encountered in topics specified by subscribers.</dd>
560+
*
561+
* <dt>Multi-level wildcard</dt>
562+
* <dd><p>The number sign (#) is a wildcard character that matches
563+
* any number of levels within a topic. For example, if you subscribe to
564+
* <span><span class="filepath">finance/stock/ibm/#</span></span>, you receive
565+
* messages on these topics:
566+
* <pre> finance/stock/ibm<br /> finance/stock/ibm/closingprice<br /> finance/stock/ibm/currentprice</pre>
567+
* </p>
568+
* <p>The multi-level wildcard
569+
* can represent zero or more levels. Therefore, <em>finance/#</em> can also match
570+
* the singular <em>finance</em>, where <em>#</em> represents zero levels. The topic
571+
* level separator is meaningless in this context, because there are no levels
572+
* to separate.</p>
573+
*
574+
* <p>The <span>multi-level</span> wildcard can
575+
* be specified only on its own or next to the topic level separator character.
576+
* Therefore, <em>#</em> and <em>finance/#</em> are both valid, but <em>finance#</em> is
577+
* not valid. <span>The multi-level wildcard must be the last character
578+
* used within the topic tree. For example, <em>finance/#</em> is valid but
579+
* <em>finance/#/closingprice</em> is not valid.</span></p></dd>
580+
*
581+
* <dt>Single-level wildcard</dt>
582+
* <dd><p>The plus sign (+) is a wildcard character that matches only one topic
583+
* level. For example, <em>finance/stock/+</em> matches
584+
* <em>finance/stock/ibm</em> and <em>finance/stock/xyz</em>,
585+
* but not <em>finance/stock/ibm/closingprice</em>. Also, because the single-level
586+
* wildcard matches only a single level, <em>finance/+</em> does not match <em>finance</em>.</p>
587+
*
588+
* <p>Use
589+
* the single-level wildcard at any level in the topic tree, and in conjunction
590+
* with the multilevel wildcard. Specify the single-level wildcard next to the
591+
* topic level separator, except when it is specified on its own. Therefore,
592+
* <em>+</em> and <em>finance/+</em> are both valid, but <em>finance+</em> is
593+
* not valid. <span>The single-level wildcard can be used at the end of the
594+
* topic tree or within the topic tree.
595+
* For example, <em>finance/+</em> and <em>finance/+/ibm</em> are both valid.</span></p>
596+
* </dd>
597+
* </dl>
598+
* </p>
599+
*
600+
* <p>This is a blocking method that returns once subscribe completes</p>
601+
*
602+
* @param topicFilters one or more topics to subscribe to, which can include wildcards.
603+
* @param qos the maximum quality of service to subscribe each topic at.Messages
604+
* published at a lower quality of service will be received at the published
605+
* QoS. Messages published at a higher quality of service will be received using
606+
* the QoS specified on the subscribe.
607+
* @throws MqttException if there was an error registering the subscription.
608+
* @return token used to track the subscribe after it has completed.
609+
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
610+
*/
611+
public IMqttToken subscribeWithResponse(String[] topicFilters, int[] qos) throws MqttException;
612+
613+
/**
614+
* Subscribes to multiple topics, each of which may include wildcards.
615+
* <p>The {@link #setCallback(MqttCallback)} method
616+
* should be called before this method, otherwise any received messages
617+
* will be discarded.
618+
* </p>
619+
* <p>
620+
* If (@link MqttConnectOptions#setCleanSession(boolean)} was set to true
621+
* when when connecting to the server then the subscription remains in place
622+
* until either:
623+
* <ul>
624+
* <li>The client disconnects</li>
625+
* <li>An unsubscribe method is called to un-subscribe the topic</li>
626+
* </li>
627+
* </p>
628+
* <p>
629+
* If (@link MqttConnectOptions#setCleanSession(boolean)} was set to false
630+
* when when connecting to the server then the subscription remains in place
631+
* until either:
632+
* <ul>
633+
* <li>An unsubscribe method is called to unsubscribe the topic</li>
634+
* <li>The client connects with cleanSession set to true</ul>
635+
* </li>
636+
* With cleanSession set to false the MQTT server will store messages on
637+
* behalf of the client when the client is not connected. The next time the
638+
* client connects with the <bold>same client ID</bold> the server will
639+
* deliver the stored messages to the client.
640+
* </p>
641+
*
642+
* <p>The "topic filter" string used when subscribing
643+
* may contain special characters, which allow you to subscribe to multiple topics
644+
* at once.</p>
645+
* <p>The topic level separator is used to introduce structure into the topic, and
646+
* can therefore be specified within the topic for that purpose. The multi-level
647+
* wildcard and single-level wildcard can be used for subscriptions, but they
648+
* cannot be used within a topic by the publisher of a message.
649+
* <dl>
650+
* <dt>Topic level separator</dt>
651+
* <dd>The forward slash (/) is used to separate each level within
652+
* a topic tree and provide a hierarchical structure to the topic space. The
653+
* use of the topic level separator is significant when the two wildcard characters
654+
* are encountered in topics specified by subscribers.</dd>
655+
*
656+
* <dt>Multi-level wildcard</dt>
657+
* <dd><p>The number sign (#) is a wildcard character that matches
658+
* any number of levels within a topic. For example, if you subscribe to
659+
* <span><span class="filepath">finance/stock/ibm/#</span></span>, you receive
660+
* messages on these topics:
661+
* <pre> finance/stock/ibm<br /> finance/stock/ibm/closingprice<br /> finance/stock/ibm/currentprice</pre>
662+
* </p>
663+
* <p>The multi-level wildcard
664+
* can represent zero or more levels. Therefore, <em>finance/#</em> can also match
665+
* the singular <em>finance</em>, where <em>#</em> represents zero levels. The topic
666+
* level separator is meaningless in this context, because there are no levels
667+
* to separate.</p>
668+
*
669+
* <p>The <span>multi-level</span> wildcard can
670+
* be specified only on its own or next to the topic level separator character.
671+
* Therefore, <em>#</em> and <em>finance/#</em> are both valid, but <em>finance#</em> is
672+
* not valid. <span>The multi-level wildcard must be the last character
673+
* used within the topic tree. For example, <em>finance/#</em> is valid but
674+
* <em>finance/#/closingprice</em> is not valid.</span></p></dd>
675+
*
676+
* <dt>Single-level wildcard</dt>
677+
* <dd><p>The plus sign (+) is a wildcard character that matches only one topic
678+
* level. For example, <em>finance/stock/+</em> matches
679+
* <em>finance/stock/ibm</em> and <em>finance/stock/xyz</em>,
680+
* but not <em>finance/stock/ibm/closingprice</em>. Also, because the single-level
681+
* wildcard matches only a single level, <em>finance/+</em> does not match <em>finance</em>.</p>
682+
*
683+
* <p>Use
684+
* the single-level wildcard at any level in the topic tree, and in conjunction
685+
* with the multilevel wildcard. Specify the single-level wildcard next to the
686+
* topic level separator, except when it is specified on its own. Therefore,
687+
* <em>+</em> and <em>finance/+</em> are both valid, but <em>finance+</em> is
688+
* not valid. <span>The single-level wildcard can be used at the end of the
689+
* topic tree or within the topic tree.
690+
* For example, <em>finance/+</em> and <em>finance/+/ibm</em> are both valid.</span></p>
691+
* </dd>
692+
* </dl>
693+
* </p>
694+
*
695+
* <p>This is a blocking method that returns once subscribe completes</p>
696+
*
697+
* @param topicFilters one or more topics to subscribe to, which can include wildcards.
698+
* @param qos the maximum quality of service to subscribe each topic at.Messages
699+
* published at a lower quality of service will be received at the published
700+
* QoS. Messages published at a higher quality of service will be received using
701+
* the QoS specified on the subscribe.
702+
* @param messageListeners one or more callbacks to handle incoming messages
703+
* @throws MqttException if there was an error registering the subscription.
704+
* @return token used to track the subscribe after it has completed.
705+
* @throws IllegalArgumentException if the two supplied arrays are not the same size.
706+
*/
707+
public IMqttToken subscribeWithResponse(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException;
708+
440709
/**
441710
* Requests the server unsubscribe the client from a topic.
442711
*

org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,81 @@ public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] m
368368
aClient.comms.setMessageListener(topicFilters[i], messageListeners[i]);
369369
}
370370
}
371+
372+
/*
373+
* @see IMqttClient#subscribeWithResponse(String)
374+
*/
375+
public IMqttToken subscribeWithResponse(String topicFilter) throws MqttException {
376+
return this.subscribeWithResponse(new String[] {topicFilter}, new int[] {1});
377+
}
378+
379+
/*
380+
* @see IMqttClient#subscribeWithResponse(String, IMqttMessageListener)
381+
*/
382+
public IMqttToken subscribeWithResponse(String topicFilter, IMqttMessageListener messageListener) throws MqttException {
383+
return this.subscribeWithResponse(new String[] {topicFilter}, new int[] {1}, new IMqttMessageListener[] {messageListener});
384+
}
385+
386+
/*
387+
* @see IMqttClient#subscribeWithResponse(String, int)
388+
*/
389+
public IMqttToken subscribeWithResponse(String topicFilter, int qos) throws MqttException {
390+
return this.subscribeWithResponse(new String[] {topicFilter}, new int[] {qos});
391+
}
392+
393+
/*
394+
* @see IMqttClient#subscribeWithResponse(String, int, IMqttMessageListener)
395+
*/
396+
public IMqttToken subscribeWithResponse(String topicFilter, int qos, IMqttMessageListener messageListener)
397+
throws MqttException {
398+
return this.subscribeWithResponse(new String[] {topicFilter}, new int[] {qos}, new IMqttMessageListener[] {messageListener});
399+
}
400+
401+
/*
402+
* @see IMqttClient#subscribeWithResponse(String[])
403+
*/
404+
public IMqttToken subscribeWithResponse(String[] topicFilters) throws MqttException {
405+
int[] qos = new int[topicFilters.length];
406+
for (int i=0; i<qos.length; i++) {
407+
qos[i] = 1;
408+
}
409+
return this.subscribeWithResponse(topicFilters, qos);
410+
}
411+
412+
/*
413+
* @see IMqttClient#subscribeWithResponse(String[], IMqttMessageListener[])
414+
*/
415+
public IMqttToken subscribeWithResponse(String[] topicFilters, IMqttMessageListener[] messageListeners)
416+
throws MqttException {
417+
int[] qos = new int[topicFilters.length];
418+
for (int i=0; i<qos.length; i++) {
419+
qos[i] = 1;
420+
}
421+
return this.subscribeWithResponse(topicFilters, qos, messageListeners);
422+
}
423+
424+
/*
425+
* @see IMqttClient#subscribeWithResponse(String[], int[])
426+
*/
427+
public IMqttToken subscribeWithResponse(String[] topicFilters, int[] qos) throws MqttException {
428+
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
429+
tok.waitForCompletion(getTimeToWait());
430+
return tok;
431+
}
432+
433+
/*
434+
* @see IMqttClient#subscribeWithResponse(String[], int[], IMqttMessageListener[])
435+
*/
436+
public IMqttToken subscribeWithResponse(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners)
437+
throws MqttException {
438+
IMqttToken tok = this.subscribeWithResponse(topicFilters, qos);
439+
440+
// add message handlers to the list for this client
441+
for (int i = 0; i < topicFilters.length; ++i) {
442+
aClient.comms.setMessageListener(topicFilters[i], messageListeners[i]);
443+
}
444+
return tok;
445+
}
371446

372447
/*
373448
* @see IMqttClient#unsubscribe(String)

0 commit comments

Comments
 (0)