diff --git a/adafruit_minimqtt/adafruit_minimqtt.py b/adafruit_minimqtt/adafruit_minimqtt.py index 2a1d600..ac5f233 100644 --- a/adafruit_minimqtt/adafruit_minimqtt.py +++ b/adafruit_minimqtt/adafruit_minimqtt.py @@ -240,8 +240,8 @@ def __init__( # noqa: PLR0915, PLR0913, Too many statements, Too many arguments self._lw_msg = None self._lw_retain = False - # List of subscribed topics, used for tracking - self._subscribed_topics: List[str] = [] + # List of subscribed topics and the QoS for each, used for tracking + self._subscribed_topics: List[tuple[str, int]] = [] self._on_message_filtered = MQTTMatcher() # Default topic callback methods @@ -837,7 +837,7 @@ def subscribe( # noqa: PLR0912, PLR0915, Too many branches, Too many statements for t, q in topics: if self.on_subscribe is not None: self.on_subscribe(self, self.user_data, t, q) - self._subscribed_topics.append(t) + self._subscribed_topics.append((t, q)) return @@ -866,7 +866,7 @@ def unsubscribe( # noqa: PLR0912, Too many branches self._valid_topic(t) topics.append(t) for t in topics: - if t not in self._subscribed_topics: + if t not in [_t for _t, _ in self._subscribed_topics]: raise MMQTTStateError("Topic must be subscribed to before attempting unsubscribe.") # Assemble packet self.logger.debug("Sending UNSUBSCRIBE to broker...") @@ -907,7 +907,8 @@ def unsubscribe( # noqa: PLR0912, Too many branches for t in topics: if self.on_unsubscribe is not None: self.on_unsubscribe(self, self.user_data, t, self._pid) - self._subscribed_topics.remove(t) + _, q = [(_t, _q) for _t, _q in self._subscribed_topics if _t == t][0] + self._subscribed_topics.remove((t, q)) return if op != MQTT_PUBLISH: # [3.10.4] The Server may continue to deliver existing messages buffered @@ -958,21 +959,31 @@ def reconnect(self, resub_topics: bool = True) -> int: self.logger.debug("Attempting to reconnect with MQTT broker") subscribed_topics = [] - if self.is_connected(): - # disconnect() will reset subscribed topics so stash them now. - if resub_topics: - subscribed_topics = self._subscribed_topics.copy() - self.disconnect() - - ret = self.connect(session_id=self.session_id) - self.logger.debug("Reconnected with broker") - - if resub_topics and subscribed_topics: - self.logger.debug("Attempting to resubscribe to previously subscribed topics.") - self._subscribed_topics = [] - while subscribed_topics: - feed = subscribed_topics.pop() - self.subscribe(feed) + + # disconnect() will reset subscribed topics, so stash them now. + if resub_topics: + subscribed_topics = self._subscribed_topics.copy() + + try: + if self.is_connected(): + self.disconnect() + + ret = self.connect(session_id=self.session_id) + self.logger.debug("Reconnected with broker") + + if resub_topics and subscribed_topics: + self.logger.debug("Attempting to resubscribe to previously subscribed topics.") + self._subscribed_topics = [] + while subscribed_topics: + feed = subscribed_topics.pop() + self.subscribe(*feed) + except Exception: + # Overly-broad exception to address #253; if we're about to fail, make sure that we + # leave a full list of subscribed topics in our class so that we'll properly resub + # on the next retry. + if sorted(self._subscribed_topics) != sorted(subscribed_topics): + self._subscribed_topics = subscribed_topics + raise return ret @@ -1195,7 +1206,7 @@ def _valid_qos(qos_level: int) -> None: """ if isinstance(qos_level, int): if qos_level < 0 or qos_level > 2: - raise NotImplementedError("QoS must be between 1 and 2.") + raise NotImplementedError("QoS must be between 0 and 2.") else: raise ValueError("QoS must be an integer.") diff --git a/tests/test_unsubscribe.py b/tests/test_unsubscribe.py index 0f9ed2f..ff3ac53 100644 --- a/tests/test_unsubscribe.py +++ b/tests/test_unsubscribe.py @@ -167,9 +167,12 @@ def test_unsubscribe(topic, to_send, exp_recv) -> None: mqtt_client.logger = logger if isinstance(topic, str): - mqtt_client._subscribed_topics = [topic] + mqtt_client._subscribed_topics = [(topic, 1)] elif isinstance(topic, list): - mqtt_client._subscribed_topics = topic + if topic and isinstance(topic[0], tuple): + mqtt_client._subscribed_topics = topic + else: + mqtt_client._subscribed_topics = [(t, 1) for t in topic] logger.info(f"unsubscribing from {topic}") mqtt_client.unsubscribe(topic)