-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathtest_unsubscribe.py
More file actions
186 lines (166 loc) · 4.84 KB
/
test_unsubscribe.py
File metadata and controls
186 lines (166 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# SPDX-FileCopyrightText: 2023 Vladimír Kotal
#
# SPDX-License-Identifier: Unlicense
"""unsubscribe tests"""
import logging
import ssl
import pytest
from mocket import Mocket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
def handle_unsubscribe(client, user_data, topic, pid):
    """
    Unsubscribe callback used by the tests: remember every topic it is called with.

    The topic is appended to ``user_data``; ``client`` and ``pid`` are accepted
    but not used.
    """
    # An empty/None topic would indicate a bogus callback invocation.
    assert topic
    user_data.append(topic)
# The MQTT packet contents below were captured using Mosquitto client+server.
# These are all verbatim, except:
# - message ID that was changed from 2 to 1 since in the real world
# the UNSUBSCRIBE packet followed the SUBSCRIBE packet.
# - the long list of topics, which Mosquitto sends as individual UNSUBSCRIBE packets
# Each entry is a (topic, to_send, exp_recv) tuple:
#   topic    - topic name, or list of topic names, passed to unsubscribe()
#   to_send  - bytes the fake broker delivers to the client
#   exp_recv - bytes the client is expected to write to the socket
testdata = [
    # short topic with remaining length encoded as single byte
    (
        "foo/bar",
        bytearray([0xB0, 0x02, 0x00, 0x01]),
        bytearray(
            [
                0xA2,  # fixed header
                0x0B,  # remaining length
                0x00,  # message ID
                0x01,
                0x00,  # topic length
                0x07,
                0x66,  # topic
                0x6F,
                0x6F,
                0x2F,
                0x62,
                0x61,
                0x72,
            ]
        ),
    ),
    # remaining length is encoded as 2 bytes due to long topic name.
    (
        "f" + "o" * 257,
        bytearray([0xB0, 0x02, 0x00, 0x01]),
        bytearray(
            [
                0xA2,  # fixed header
                0x86,  # remaining length
                0x02,
                0x00,  # message ID
                0x01,
                0x01,  # topic length
                0x02,
                0x66,  # topic
            ]
            + [0x6F] * 257
        ),
    ),
    # UNSUBSCRIBE responded to by PUBLISH followed by UNSUBACK
    (
        "foo/bar",
        bytearray(
            [
                0x30,  # PUBLISH
                0x0C,  # remaining length
                0x00,  # topic length
                0x07,
                0x66,  # topic
                0x6F,
                0x6F,
                0x2F,
                0x62,
                0x61,
                0x72,
                0x66,  # payload
                0x6F,
                0x6F,
                0xB0,  # UNSUBACK
                0x02,  # remaining length
                0x00,  # message ID
                0x01,
            ]
        ),
        bytearray(
            [
                0xA2,  # fixed header
                0x0B,  # remaining length
                0x00,  # message ID
                0x01,
                0x00,  # topic length
                0x07,
                0x66,  # topic
                0x6F,
                0x6F,
                0x2F,
                0x62,
                0x61,
                0x72,
            ]
        ),
    ),
    # use list of topics for more coverage. If the range was (1, 10000), that would be
    # long enough to use 3 bytes for remaining length, however that would make the test
    # run for many minutes even on modern systems, so 1000 is used instead.
    # This results in 2 bytes for the remaining length.
    (
        [f"foo/bar{x:04}" for x in range(1, 1000)],
        bytearray([0xB0, 0x02, 0x00, 0x01]),
        bytearray(
            [
                0xA2,  # fixed header
                0xBD,  # remaining length
                0x65,
                0x00,  # message ID
                0x01,
            ]
        )
        # One (2-byte length + 11-byte name) record per topic. Joined in a single
        # linear pass instead of sum(list_of_lists, []), which re-copies the
        # accumulated list for every topic.
        + b"".join(
            b"\x00\x0b" + f"foo/bar{x:04}".encode("ascii") for x in range(1, 1000)
        ),
    ),
]
@pytest.mark.parametrize(
    "topic,to_send,exp_recv",
    testdata,
    ids=["short_topic", "long_topic", "publish_first", "topic_list_long"],
)
def test_unsubscribe(topic, to_send, exp_recv) -> None:
    """
    Exercise UNSUBSCRIBE / UNSUBACK packet handling at the protocol level.

    No real network traffic takes place: the client's socket is replaced by a
    fake one fed with ``to_send``, and whatever the client writes is compared
    against ``exp_recv``. The topics are never actually subscribed to; they are
    only placed into the client's list of subscribed topics.
    """
    logging.basicConfig()
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)

    # Filled in by handle_unsubscribe() via the client's user data.
    seen_topics = []
    client = MQTT.MQTT(
        broker="localhost",
        port=1883,
        ssl_context=ssl.create_default_context(),
        connect_retries=1,
        user_data=seen_topics,
    )
    client.on_unsubscribe = handle_unsubscribe

    # Pretend the client is already connected so that no CONNECT/CONNACK
    # exchange is needed.
    client.is_connected = lambda: True

    fake_sock = Mocket(to_send)
    client._sock = fake_sock
    client.logger = log

    # Make the client believe the topic(s) are currently subscribed.
    # NOTE(review): the second tuple element is presumably the QoS - confirm.
    if isinstance(topic, str):
        client._subscribed_topics = [(topic, 1)]
    elif isinstance(topic, list):
        already_tuples = topic and isinstance(topic[0], tuple)
        client._subscribed_topics = (
            topic if already_tuples else [(name, 1) for name in topic]
        )

    log.info(f"unsubscribing from {topic}")
    client.unsubscribe(topic)

    # Every requested topic must have been reported through the callback.
    if isinstance(topic, str):
        assert topic in seen_topics
    elif isinstance(topic, list):
        for name in topic:
            assert name in seen_topics

    # The client must have sent exactly the expected bytes and consumed
    # everything the fake socket had queued for it.
    assert fake_sock.sent == exp_recv
    assert len(fake_sock._to_send) == 0