-
Notifications
You must be signed in to change notification settings - Fork 51
fix: reconnect(): respect QoS and fail-safe #254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -241,7 +241,7 @@ def __init__( # noqa: PLR0915, PLR0913, Too many statements, Too many arguments | |
| self._lw_retain = False | ||
|
|
||
| # List of subscribed topics, used for tracking | ||
| self._subscribed_topics: List[str] = [] | ||
| self._subscribed_topics: List[tuple[str, int]] = [] | ||
| self._on_message_filtered = MQTTMatcher() | ||
|
|
||
| # Default topic callback methods | ||
|
|
@@ -837,7 +837,7 @@ def subscribe( # noqa: PLR0912, PLR0915, Too many branches, Too many statements | |
| for t, q in topics: | ||
| if self.on_subscribe is not None: | ||
| self.on_subscribe(self, self.user_data, t, q) | ||
| self._subscribed_topics.append(t) | ||
| self._subscribed_topics.append((t, q)) | ||
|
|
||
| return | ||
|
|
||
|
|
@@ -866,7 +866,7 @@ def unsubscribe( # noqa: PLR0912, Too many branches | |
| self._valid_topic(t) | ||
| topics.append(t) | ||
| for t in topics: | ||
| if t not in self._subscribed_topics: | ||
| if t not in [_t for _t, _ in self._subscribed_topics]: | ||
| raise MMQTTStateError("Topic must be subscribed to before attempting unsubscribe.") | ||
| # Assemble packet | ||
| self.logger.debug("Sending UNSUBSCRIBE to broker...") | ||
|
|
@@ -907,7 +907,8 @@ def unsubscribe( # noqa: PLR0912, Too many branches | |
| for t in topics: | ||
| if self.on_unsubscribe is not None: | ||
| self.on_unsubscribe(self, self.user_data, t, self._pid) | ||
| self._subscribed_topics.remove(t) | ||
| _, q = [(_t, _q) for _t, _q in self._subscribed_topics if _t == t][0] | ||
| self._subscribed_topics.remove((t, q)) | ||
| return | ||
| if op != MQTT_PUBLISH: | ||
| # [3.10.4] The Server may continue to deliver existing messages buffered | ||
|
|
@@ -958,21 +959,31 @@ def reconnect(self, resub_topics: bool = True) -> int: | |
|
|
||
| self.logger.debug("Attempting to reconnect with MQTT broker") | ||
| subscribed_topics = [] | ||
| if self.is_connected(): | ||
| # disconnect() will reset subscribed topics so stash them now. | ||
| if resub_topics: | ||
| subscribed_topics = self._subscribed_topics.copy() | ||
| self.disconnect() | ||
|
|
||
| ret = self.connect(session_id=self.session_id) | ||
| self.logger.debug("Reconnected with broker") | ||
|
|
||
| if resub_topics and subscribed_topics: | ||
| self.logger.debug("Attempting to resubscribe to previously subscribed topics.") | ||
| self._subscribed_topics = [] | ||
| while subscribed_topics: | ||
| feed = subscribed_topics.pop() | ||
| self.subscribe(feed) | ||
|
|
||
| # disconnect() will reset subscribed topics, so stash them now. | ||
| if resub_topics: | ||
| subscribed_topics = self._subscribed_topics.copy() | ||
|
|
||
| try: | ||
| if self.is_connected(): | ||
| self.disconnect() | ||
|
|
||
| ret = self.connect(session_id=self.session_id) | ||
| self.logger.debug("Reconnected with broker") | ||
|
|
||
| if resub_topics and subscribed_topics: | ||
| self.logger.debug("Attempting to resubscribe to previously subscribed topics.") | ||
| self._subscribed_topics = [] | ||
| while subscribed_topics: | ||
| feed = subscribed_topics.pop() | ||
| self.subscribe(*feed) | ||
| except Exception: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder if the broad exception could be reduced to the MQTT exception?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To be clear, no matter what the exception is, we re-raise it (see the bare `raise`). As I see it, there are three options here, in addition to what I've implemented:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks for the detailed information about the thought process, really appreciated. I think the key question here is whether anything besides
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, I see. Thank you for laying that out so clearly! To work out the best path, I think it's helpful to have a real scenario. One of the lines within the try-catch is:

    self.logger.debug("Reconnected with broker")

Let us imagine that a custom global logging handler has a PotM bug, and that an exception will be raised from `self.logger.debug()`. I put myself in the position of an engineer (who doesn't control the MiniMQTT library) who became aware of this bug when it triggered last week, but doesn't quite know how to reproduce it yet. Helpfully, the custom logger handler throws a corresponding `CustomLoggingException` (CLE):

    current_state = State()
    while True:
        try:
            current_state.run_main_loop_once()
        except CustomLoggingException as e:
            # upload lots of debugging info, then...
            pass

And inside of […] Perfect! Now I've got resilient code that won't crash in the face of a CLE, but will give me lots of debugging info. Problem is, […] and we happen to fail our […] However, I think the bigger issue is that, if that bug is found a different way that doesn't result in a partial re-subscription, then even given a very skilled programmer who is tasked with working around that bug (and, let us say, is somehow prevented from directly addressing the bug itself), their solution almost certainly would rely on […]
Both of these require a lot more (branching) code, and both carry costs in at least two of the three categories of CPU, memory, and/or network traffic. Further, they require a level of defensive coding that seems unreasonable to expect from a consumer of this library. Put simply, our API contract isn't supposed to require this sort of legwork from our upstream consumer. They were told that the |
||
| # Overly-broad exception to address #253; if we're about to fail, make sure that we | ||
| # leave a full list of subscribed topics in our class so that we'll properly resub | ||
| # on the next retry. | ||
| if sorted(self._subscribed_topics) != sorted(subscribed_topics): | ||
| self._subscribed_topics = subscribed_topics | ||
| raise | ||
|
|
||
| return ret | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.