forked from micropython/micropython-lib
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathl2cap_file_server.py
More file actions
183 lines (143 loc) · 5.49 KB
/
l2cap_file_server.py
File metadata and controls
183 lines (143 loc) · 5.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# MIT license; Copyright (c) 2021 Jim Mussared
# This is a BLE file server, based very loosely on the Object Transfer Service
# specification. It demonstrated transferring data over an L2CAP channel, as
# well as using notifications and GATT writes on a characteristic.
# The server supports downloading and uploading files, as well as querying
# directory listings and file sizes.
# In order to access the file server, a client must connect, then establish an
# L2CAP channel. To being an operation, a command is written to the control
# characteristic, including a command number, sequence number, and filesystem
# path. The response will be either via a notification on the control
# characteristic (e.g. file size), or via the L2CAP channel (file contents or
# directory listing).
import sys
# ruff: noqa: E402
sys.path.append("")
from micropython import const
import asyncio
import aioble
import bluetooth
import struct
import os
# Randomly generated UUIDs.
_FILE_SERVICE_UUID = bluetooth.UUID("0492fcec-7194-11eb-9439-0242ac130002")
_CONTROL_CHARACTERISTIC_UUID = bluetooth.UUID("0492fcec-7194-11eb-9439-0242ac130003")
# How frequently to send advertising beacons.
_ADV_INTERVAL_US = 250_000
_COMMAND_SEND = const(0)
_COMMAND_RECV = const(1) # Not yet implemented.
_COMMAND_LIST = const(2)
_COMMAND_SIZE = const(3)
_COMMAND_DONE = const(4)
_STATUS_OK = const(0)
_STATUS_NOT_IMPLEMENTED = const(1)
_STATUS_NOT_FOUND = const(2)
_L2CAP_PSN = const(22)
_L2CAP_MTU = const(128)
# Register GATT server.
file_service = aioble.Service(_FILE_SERVICE_UUID)
control_characteristic = aioble.Characteristic(
file_service, _CONTROL_CHARACTERISTIC_UUID, write=True, notify=True
)
aioble.register_services(file_service)
send_file = None
recv_file = None
list_path = None
op_seq = None
l2cap_event = asyncio.Event()
def send_done_notification(connection, status=_STATUS_OK):
global op_seq
control_characteristic.notify(connection, struct.pack("<BBB", _COMMAND_DONE, op_seq, status))
op_seq = None
async def l2cap_task(connection):
global send_file, recv_file, list_path
try:
channel = await connection.l2cap_accept(_L2CAP_PSN, _L2CAP_MTU)
print("channel accepted")
while True:
await l2cap_event.wait()
l2cap_event.clear()
if send_file:
print("Sending:", send_file)
with open(send_file, "rb") as f: # noqa: ASYNC230
buf = bytearray(channel.peer_mtu)
mv = memoryview(buf)
while n := f.readinto(buf):
await channel.send(mv[:n])
await channel.flush()
send_done_notification(connection)
send_file = None
if recv_file:
print("Receiving:", recv_file)
send_done_notification(connection, _STATUS_NOT_IMPLEMENTED)
recv_file = None
if list_path:
print("List:", list_path)
try:
for name, _, _, size in os.ilistdir(list_path):
await channel.send("{}:{}\n".format(size, name))
await channel.send("\n")
await channel.flush()
send_done_notification(connection)
except OSError:
send_done_notification(connection, _STATUS_NOT_FOUND)
list_path = None
except aioble.DeviceDisconnectedError:
print("Stopping l2cap")
return
async def control_task(connection):
global send_file, recv_file, list_path
try:
with connection.timeout(None):
while True:
print("Waiting for write")
await control_characteristic.written()
msg = control_characteristic.read()
control_characteristic.write(b"")
if len(msg) < 3:
continue
# Message is <command><seq><path...>.
command = msg[0]
seq = msg[1]
file = msg[2:].decode()
if command == _COMMAND_SEND:
send_file = file
l2cap_event.set()
elif command == _COMMAND_RECV:
recv_file = file
l2cap_event.set()
elif command == _COMMAND_LIST:
list_path = file
l2cap_event.set()
elif command == _COMMAND_SIZE:
try:
stat = os.stat(file)
size = stat[6]
status = 0
except OSError:
size = 0
status = _STATUS_NOT_FOUND
control_characteristic.notify(
connection, struct.pack("<BBI", seq, status, size)
)
except aioble.DeviceDisconnectedError:
return
# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
while True:
print("Waiting for connection")
connection = await aioble.advertise(
_ADV_INTERVAL_US,
name="mpy-file",
services=[_FILE_SERVICE_UUID],
)
print("Connection from", connection.device)
t = asyncio.create_task(l2cap_task(connection))
await control_task(connection)
t.cancel()
await connection.disconnected()
# Run both tasks.
async def main():
await peripheral_task()
asyncio.run(main())