@@ -253,8 +253,13 @@ public class MqttAsyncClient implements MqttClientInterface, IMqttAsyncClient {
253253 // second
254254 private boolean reconnecting = false ;
255255 private static Object clientLock = new Object (); // Simple lock
256- private MqttSessionState mqttSession = new MqttSessionState (); // Variables that exist within the life of an MQTT session
257- private MqttConnectionState mqttConnection = new MqttConnectionState (); // Variables that exist within the life of an MQTT connection.
256+
257+ // Variables that exist within the life of an MQTT session
258+ private MqttSessionState mqttSession = new MqttSessionState ();
259+
260+ // Variables that exist within the life of an MQTT connection.
261+ private MqttConnectionState mqttConnection = new MqttConnectionState ();
262+
258263 private ScheduledExecutorService executorService ;
259264 private MqttPingSender pingSender ;
260265
@@ -424,9 +429,9 @@ public MqttAsyncClient(String serverURI, String clientId) throws MqttException {
424429 * {@link MqttClientPersistence} interface. An implementer of this interface
425430 * that safely stores messages must be specified in order for delivery of
426431 * messages to be reliable. In addition
427- * {@link MqttConnectionOptions#setCleanStart(boolean)} must be set to false.
428- * In the event that only QoS 0 messages are sent or received or cleanStart is
429- * set to true then a safe store is not needed.
432+ * {@link MqttConnectionOptions#setCleanStart(boolean)} must be set to false. In
433+ * the event that only QoS 0 messages are sent or received or cleanStart is set
434+ * to true then a safe store is not needed.
430435 * </p>
431436 * <p>
432437 * An implementation of file-based persistence is provided in class
@@ -527,9 +532,9 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
527532 * {@link MqttClientPersistence} interface. An implementer of this interface
528533 * that safely stores messages must be specified in order for delivery of
529534 * messages to be reliable. In addition
530- * {@link MqttConnectionOptions#setCleanStart(boolean)} must be set to false.
531- * In the event that only QoS 0 messages are sent or received or cleanStart is
532- * set to true then a safe store is not needed.
535+ * {@link MqttConnectionOptions#setCleanStart(boolean)} must be set to false. In
536+ * the event that only QoS 0 messages are sent or received or cleanStart is set
537+ * to true then a safe store is not needed.
533538 * </p>
534539 * <p>
535540 * An implementation of file-based persistence is provided in class
@@ -575,14 +580,13 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
575580 DataOutputStream dos = new DataOutputStream (baos );
576581 MqttDataTypes .encodeUTF8 (dos , clientId );
577582 // Remove the two size bytes.
578- if (dos .size () - 2 > 65535 ) {
583+ if (dos .size () - 2 > 65535 ) {
579584 throw new IllegalArgumentException ("ClientId longer than 65535 characters" );
580585 }
581-
586+
582587 } else {
583588 clientId = "" ;
584589 }
585-
586590
587591 MqttConnectionOptions .validateURI (serverURI );
588592
@@ -608,7 +612,8 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
608612 log .fine (CLASS_NAME , methodName , "101" , new Object [] { clientId , serverURI , persistence });
609613
610614 this .persistence .open (clientId );
611- this .comms = new ClientComms (this , this .persistence , this .pingSender , this .executorService , this .mqttSession , this .mqttConnection );
615+ this .comms = new ClientComms (this , this .persistence , this .pingSender , this .executorService , this .mqttSession ,
616+ this .mqttConnection );
612617 this .persistence .close ();
613618 this .topics = new Hashtable <String , MqttTopic >();
614619
@@ -899,7 +904,7 @@ public IMqttToken connect(MqttConnectionOptions options, Object userContext, Mqt
899904 userToken , userContext , callback , reconnecting , mqttSession , mqttConnection );
900905 userToken .setActionCallback (connectActionListener );
901906 userToken .setUserContext (this );
902-
907+
903908 this .mqttConnection .setSendReasonMessages (this .connOpts .isSendReasonMessages ());
904909
905910 // If we are using the MqttCallbackExtended, set it on the
@@ -1134,7 +1139,7 @@ public String getCurrentServerURI() {
11341139 * if the topic contains a '+' or '#' wildcard character.
11351140 */
11361141 protected MqttTopic getTopic (String topic ) {
1137- MqttTopicValidator .validate (topic , false /* wildcards NOT allowed */ );
1142+ MqttTopicValidator .validate (topic , false /* wildcards NOT allowed */ , true );
11381143
11391144 MqttTopic result = (MqttTopic ) topics .get (topic );
11401145 if (result == null ) {
@@ -1200,7 +1205,7 @@ public IMqttToken subscribe(String topicFilter, int qos) throws MqttException {
12001205 return this .subscribe (new MqttSubscription [] { new MqttSubscription (topicFilter , qos ) }, null , null ,
12011206 new MqttProperties ());
12021207 }
1203-
1208+
12041209 /*
12051210 * (non-Javadoc)
12061211 *
@@ -1210,8 +1215,7 @@ public IMqttToken subscribe(String topicFilter, int qos) throws MqttException {
12101215 */
12111216 @ Override
12121217 public IMqttToken subscribe (MqttSubscription subscription ) throws MqttException {
1213- return this .subscribe (new MqttSubscription [] { subscription }, null , null ,
1214- new MqttProperties ());
1218+ return this .subscribe (new MqttSubscription [] { subscription }, null , null , new MqttProperties ());
12151219 }
12161220
12171221 /*
@@ -1240,9 +1244,13 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12401244 MqttProperties subscriptionProperties ) throws MqttException {
12411245 final String methodName = "subscribe" ;
12421246
1243- // remove any message handlers for individual topics
1247+ // remove any message handlers for individual topics and validate Topics
12441248 for (int i = 0 ; i < subscriptions .length ; ++i ) {
12451249 this .comms .removeMessageListener (subscriptions [i ].getTopic ());
1250+ // Check if the topic filter is valid before subscribing
1251+ MqttTopicValidator .validate (subscriptions [i ].getTopic (),
1252+ this .mqttConnection .isWildcardSubscriptionsAvailable (),
1253+ this .mqttConnection .isSharedSubscriptionsAvailable ());
12461254 }
12471255
12481256 // Only Generate Log string if we are logging at FINE level
@@ -1253,9 +1261,6 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
12531261 subs .append (", " );
12541262 }
12551263 subs .append (subscriptions [i ].toString ());
1256-
1257- // Check if the topic filter is valid before subscribing
1258- MqttTopicValidator .validate (subscriptions [i ].getTopic (), true /* allow wildcards */ );
12591264 }
12601265 // @TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2}
12611266 log .fine (CLASS_NAME , methodName , "106" , new Object [] { subs .toString (), userContext , callback });
@@ -1365,14 +1370,15 @@ public IMqttToken subscribe(MqttSubscription[] subscriptions, Object userContext
13651370 int subId = subscriptionProperties .getSubscriptionIdentifiers ().get (0 );
13661371
13671372 // Automatic Subscription Identifier Assignment is enabled
1368- if (connOpts .useSubscriptionIdentifiers ()) {
1373+ if (connOpts .useSubscriptionIdentifiers () && this . mqttConnection . isSubscriptionIdentifiersAvailable () ) {
13691374
13701375 // Application is overriding the subscription Identifier
13711376 if (subId != 0 ) {
13721377 // Check that we are not already using this ID, else throw Illegal Argument
13731378 // Exception
13741379 if (this .comms .doesSubscriptionIdentifierExist (subId )) {
1375- throw new IllegalArgumentException (String .format ("The Subscription Identifier %s already exists." , subId ));
1380+ throw new IllegalArgumentException (
1381+ String .format ("The Subscription Identifier %s already exists." , subId ));
13761382 }
13771383
13781384 } else {
@@ -1459,7 +1465,7 @@ public IMqttToken unsubscribe(String[] topicFilters, Object userContext, MqttAct
14591465 // Although we already checked when subscribing, but invalid
14601466 // topic filter is meanless for unsubscribing, just prohibit it
14611467 // to reduce unnecessary control packet send to broker.
1462- MqttTopicValidator .validate (topicFilters [i ], true /* allow wildcards */ );
1468+ MqttTopicValidator .validate (topicFilters [i ], true /* allow wildcards */ , this . mqttConnection . isSharedSubscriptionsAvailable () );
14631469 }
14641470
14651471 // remove message handlers from the list for this client
@@ -1588,7 +1594,7 @@ public IMqttDeliveryToken publish(String topic, MqttMessage message, Object user
15881594 log .fine (CLASS_NAME , methodName , "111" , new Object [] { topic , userContext , callback });
15891595
15901596 // Checks if a topic is valid when publishing a message.
1591- MqttTopicValidator .validate (topic , false /* wildcards NOT allowed */ );
1597+ MqttTopicValidator .validate (topic , false /* wildcards NOT allowed */ , true );
15921598
15931599 MqttDeliveryToken token = new MqttDeliveryToken (getClientId ());
15941600 token .setActionCallback (callback );
0 commit comments