Skip to content

Commit 819bb6d

Browse files
committed
fix: reconnect(): respect QoS and fail-safe
Closes #252 Closes #253
1 parent bf47a0e commit 819bb6d

1 file changed

Lines changed: 30 additions & 19 deletions

File tree

adafruit_minimqtt/adafruit_minimqtt.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ def __init__( # noqa: PLR0915, PLR0913, Too many statements, Too many arguments
241241
self._lw_retain = False
242242

243243
# List of subscribed topics, used for tracking
244-
self._subscribed_topics: List[str] = []
244+
self._subscribed_topics: List[tuple[str, int]] = []
245245
self._on_message_filtered = MQTTMatcher()
246246

247247
# Default topic callback methods
@@ -837,7 +837,7 @@ def subscribe( # noqa: PLR0912, PLR0915, Too many branches, Too many statements
837837
for t, q in topics:
838838
if self.on_subscribe is not None:
839839
self.on_subscribe(self, self.user_data, t, q)
840-
self._subscribed_topics.append(t)
840+
self._subscribed_topics.append((t, q))
841841

842842
return
843843

@@ -866,7 +866,7 @@ def unsubscribe( # noqa: PLR0912, Too many branches
866866
self._valid_topic(t)
867867
topics.append(t)
868868
for t in topics:
869-
if t not in self._subscribed_topics:
869+
if t not in [_t for _t, _ in self._subscribed_topics]:
870870
raise MMQTTStateError("Topic must be subscribed to before attempting unsubscribe.")
871871
# Assemble packet
872872
self.logger.debug("Sending UNSUBSCRIBE to broker...")
@@ -907,7 +907,8 @@ def unsubscribe( # noqa: PLR0912, Too many branches
907907
for t in topics:
908908
if self.on_unsubscribe is not None:
909909
self.on_unsubscribe(self, self.user_data, t, self._pid)
910-
self._subscribed_topics.remove(t)
910+
_, q = [(_t, _q) for _t, _q in self._subscribed_topics if _t == t][0]
911+
self._subscribed_topics.remove(t, q)
911912
return
912913
if op != MQTT_PUBLISH:
913914
# [3.10.4] The Server may continue to deliver existing messages buffered
@@ -958,21 +959,31 @@ def reconnect(self, resub_topics: bool = True) -> int:
958959

959960
self.logger.debug("Attempting to reconnect with MQTT broker")
960961
subscribed_topics = []
961-
if self.is_connected():
962-
# disconnect() will reset subscribed topics so stash them now.
963-
if resub_topics:
964-
subscribed_topics = self._subscribed_topics.copy()
965-
self.disconnect()
966-
967-
ret = self.connect(session_id=self.session_id)
968-
self.logger.debug("Reconnected with broker")
969-
970-
if resub_topics and subscribed_topics:
971-
self.logger.debug("Attempting to resubscribe to previously subscribed topics.")
972-
self._subscribed_topics = []
973-
while subscribed_topics:
974-
feed = subscribed_topics.pop()
975-
self.subscribe(feed)
962+
963+
# disconnect() will reset subscribed topics, so stash them now.
964+
if resub_topics:
965+
subscribed_topics = self._subscribed_topics.copy()
966+
967+
try:
968+
if self.is_connected():
969+
self.disconnect()
970+
971+
ret = self.connect(session_id=self.session_id)
972+
self.logger.debug("Reconnected with broker")
973+
974+
if resub_topics and subscribed_topics:
975+
self.logger.debug("Attempting to resubscribe to previously subscribed topics.")
976+
self._subscribed_topics = []
977+
while subscribed_topics:
978+
feed = subscribed_topics.pop()
979+
self.subscribe(feed)
980+
except Exception:
981+
# Overly-broad exception to address #253; if we're about to fail, make sure that we
982+
# leave a full list of subscribed topics in our class so that we'll properly resub
983+
# on the next retry.
984+
if sorted(self._subscribed_topics) != sorted(subscribed_topics):
985+
self._subscribed_topics = subscribed_topics
986+
raise
976987

977988
return ret
978989

0 commit comments

Comments
 (0)