@@ -241,7 +241,7 @@ def __init__( # noqa: PLR0915, PLR0913, Too many statements, Too many arguments
241241 self ._lw_retain = False
242242
243243 # List of subscribed topics, used for tracking
244- self ._subscribed_topics : List [str ] = []
244+ self ._subscribed_topics : List [tuple [ str , int ] ] = []
245245 self ._on_message_filtered = MQTTMatcher ()
246246
247247 # Default topic callback methods
@@ -837,7 +837,7 @@ def subscribe( # noqa: PLR0912, PLR0915, Too many branches, Too many statements
837837 for t , q in topics :
838838 if self .on_subscribe is not None :
839839 self .on_subscribe (self , self .user_data , t , q )
840- self ._subscribed_topics .append (t )
840+ self ._subscribed_topics .append (( t , q ) )
841841
842842 return
843843
@@ -866,7 +866,7 @@ def unsubscribe( # noqa: PLR0912, Too many branches
866866 self ._valid_topic (t )
867867 topics .append (t )
868868 for t in topics :
869- if t not in self ._subscribed_topics :
869+ if t not in [ _t for _t , _ in self ._subscribed_topics ] :
870870 raise MMQTTStateError ("Topic must be subscribed to before attempting unsubscribe." )
871871 # Assemble packet
872872 self .logger .debug ("Sending UNSUBSCRIBE to broker..." )
@@ -907,7 +907,8 @@ def unsubscribe( # noqa: PLR0912, Too many branches
907907 for t in topics :
908908 if self .on_unsubscribe is not None :
909909 self .on_unsubscribe (self , self .user_data , t , self ._pid )
910- self ._subscribed_topics .remove (t )
910+ _ , q = [(_t , _q ) for _t , _q in self ._subscribed_topics if _t == t ][0 ]
911+ self ._subscribed_topics .remove ((t , q ))
911912 return
912913 if op != MQTT_PUBLISH :
913914 # [3.10.4] The Server may continue to deliver existing messages buffered
@@ -958,21 +959,31 @@ def reconnect(self, resub_topics: bool = True) -> int:
958959
959960 self .logger .debug ("Attempting to reconnect with MQTT broker" )
960961 subscribed_topics = []
961- if self .is_connected ():
962- # disconnect() will reset subscribed topics so stash them now.
963- if resub_topics :
964- subscribed_topics = self ._subscribed_topics .copy ()
965- self .disconnect ()
966-
967- ret = self .connect (session_id = self .session_id )
968- self .logger .debug ("Reconnected with broker" )
969-
970- if resub_topics and subscribed_topics :
971- self .logger .debug ("Attempting to resubscribe to previously subscribed topics." )
972- self ._subscribed_topics = []
973- while subscribed_topics :
974- feed = subscribed_topics .pop ()
975- self .subscribe (feed )
962+
963+ # disconnect() will reset subscribed topics, so stash them now.
964+ if resub_topics :
965+ subscribed_topics = self ._subscribed_topics .copy ()
966+
967+ try :
968+ if self .is_connected ():
969+ self .disconnect ()
970+
971+ ret = self .connect (session_id = self .session_id )
972+ self .logger .debug ("Reconnected with broker" )
973+
974+ if resub_topics and subscribed_topics :
975+ self .logger .debug ("Attempting to resubscribe to previously subscribed topics." )
976+ self ._subscribed_topics = []
977+ while subscribed_topics :
978+ feed = subscribed_topics .pop ()
979+ self .subscribe (* feed )
980+ except Exception :
981+ # Overly-broad exception to address #253; if we're about to fail, make sure that we
982+ # leave a full list of subscribed topics in our class so that we'll properly resub
983+ # on the next retry.
984+ if sorted (self ._subscribed_topics ) != sorted (subscribed_topics ):
985+ self ._subscribed_topics = subscribed_topics
986+ raise
976987
977988 return ret
978989
0 commit comments