|
6 | 6 |
|
7 | 7 | import logging |
8 | 8 | import ssl |
9 | | -from unittest import mock |
10 | 9 |
|
11 | 10 | import pytest |
| 11 | +from mocket import Mocket |
12 | 12 |
|
13 | 13 | import adafruit_minimqtt.adafruit_minimqtt as MQTT |
14 | 14 |
|
15 | 15 |
|
16 | | -class Mocket: |
17 | | - """ |
18 | | - Mock Socket tailored for MiniMQTT testing. Records sent data, |
19 | | - hands out pre-recorded reply. |
20 | | -
|
21 | | - Inspired by the Mocket class from Adafruit_CircuitPython_Requests |
22 | | - """ |
23 | | - |
24 | | - def __init__(self, to_send): |
25 | | - self._to_send = to_send |
26 | | - |
27 | | - self.sent = bytearray() |
28 | | - |
29 | | - self.timeout = mock.Mock() |
30 | | - self.connect = mock.Mock() |
31 | | - self.close = mock.Mock() |
32 | | - |
33 | | - def send(self, bytes_to_send): |
34 | | - """merely record the bytes. return the length of this bytearray.""" |
35 | | - self.sent.extend(bytes_to_send) |
36 | | - return len(bytes_to_send) |
37 | | - |
38 | | - # MiniMQTT checks for the presence of "recv_into" and switches behavior based on that. |
39 | | - def recv_into(self, retbuf, bufsize): |
40 | | - """return data from internal buffer""" |
41 | | - size = min(bufsize, len(self._to_send)) |
42 | | - if size == 0: |
43 | | - return size |
44 | | - chop = self._to_send[0:size] |
45 | | - retbuf[0:] = chop |
46 | | - self._to_send = self._to_send[size:] |
47 | | - return size |
48 | | - |
49 | | - |
50 | 16 | # pylint: disable=unused-argument |
51 | 17 | def handle_subscribe(client, user_data, topic, qos): |
52 | 18 | """ |
|
0 commit comments