Skip to content

Commit 44072a5

Browse files
committed
Add peripheral support and tests
1 parent ac11cb2 commit 44072a5

File tree

2 files changed

+149
-0
lines changed

2 files changed

+149
-0
lines changed

ports/zephyr-cp/common-hal/_bleio/Adapter.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,12 @@ static void bleio_connected_cb(struct bt_conn *conn, uint8_t err) {
148148
return;
149149
}
150150

151+
// When connectable advertising results in a connection, the controller
152+
// auto-stops advertising. Clear our flag to match (we cannot call
153+
// stop_advertising() here because this callback runs in Zephyr's BT
154+
// thread context).
155+
ble_advertising = false;
156+
151157
common_hal_bleio_adapter_obj.connection_objs = NULL;
152158
}
153159

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# SPDX-FileCopyrightText: 2026 Scott Shawcroft for Adafruit Industries
2+
# SPDX-License-Identifier: MIT
3+
4+
"""BLE peripheral connection tests for nrf5340bsim."""
5+
6+
import pytest
7+
8+
pytestmark = pytest.mark.circuitpython_board("native_nrf5340bsim")
9+
10+
BSIM_PERIPHERAL_CODE = """\
11+
import _bleio
12+
import time
13+
14+
adapter = _bleio.adapter
15+
16+
name = b"CPPERIPH"
17+
advertisement = bytes((2, 0x01, 0x06, len(name) + 1, 0x09)) + name
18+
19+
print("peripheral start")
20+
adapter.start_advertising(advertisement, connectable=True)
21+
print("advertising", adapter.advertising)
22+
23+
for _ in range(80):
24+
if adapter.connected:
25+
break
26+
time.sleep(0.1)
27+
28+
print("connected", adapter.connected, "advertising", adapter.advertising)
29+
30+
for _ in range(80):
31+
if not adapter.connected:
32+
break
33+
time.sleep(0.1)
34+
35+
print("disconnected", adapter.connected, len(adapter.connections))
36+
"""
37+
38+
BSIM_PERIPHERAL_CP_CENTRAL_PERIPHERAL_CODE = """\
39+
import _bleio
40+
import time
41+
42+
adapter = _bleio.adapter
43+
44+
name = b"CPPERIPH"
45+
advertisement = bytes((2, 0x01, 0x06, len(name) + 1, 0x09)) + name
46+
47+
print("peripheral start")
48+
adapter.start_advertising(advertisement, connectable=True)
49+
print("advertising", adapter.advertising)
50+
51+
for _ in range(80):
52+
if adapter.connected:
53+
break
54+
time.sleep(0.1)
55+
56+
print("connected", adapter.connected, "advertising", adapter.advertising)
57+
58+
for _ in range(80):
59+
if not adapter.connected:
60+
break
61+
time.sleep(0.1)
62+
63+
print("disconnected", adapter.connected, len(adapter.connections))
64+
"""
65+
66+
BSIM_PERIPHERAL_CP_CENTRAL_CODE = """\
67+
import _bleio
68+
import time
69+
70+
adapter = _bleio.adapter
71+
72+
print("central start")
73+
target = None
74+
for entry in adapter.start_scan(timeout=6.0, active=True):
75+
if entry.connectable and b"CPPERIPH" in entry.advertisement_bytes:
76+
target = entry.address
77+
print("found peripheral")
78+
break
79+
adapter.stop_scan()
80+
print("have target", target is not None)
81+
82+
if target is None:
83+
raise RuntimeError("No connectable peripheral found")
84+
85+
connection = adapter.connect(target, timeout=5.0)
86+
print("connected", connection.connected, adapter.connected, len(adapter.connections))
87+
connection.disconnect()
88+
89+
for _ in range(40):
90+
if not connection.connected and not adapter.connected:
91+
break
92+
time.sleep(0.1)
93+
94+
print("disconnected", connection.connected, adapter.connected, len(adapter.connections))
95+
"""
96+
97+
98+
@pytest.mark.zephyr_sample("bluetooth/central")
99+
@pytest.mark.duration(14)
100+
@pytest.mark.circuitpy_drive({"code.py": BSIM_PERIPHERAL_CODE})
101+
def test_bsim_peripheral_zephyr_central(bsim_phy, circuitpython, zephyr_sample):
102+
"""Advertise as connectable from CP; Zephyr central connects and disconnects."""
103+
central = zephyr_sample
104+
105+
circuitpython.wait_until_done()
106+
107+
cp_output = circuitpython.serial.all_output
108+
central_output = central.serial.all_output
109+
110+
assert "peripheral start" in cp_output
111+
assert "advertising True" in cp_output
112+
assert "connected True advertising False" in cp_output
113+
assert "disconnected False 0" in cp_output
114+
115+
assert "Scanning successfully started" in central_output
116+
assert "Connected:" in central_output
117+
assert "Disconnected:" in central_output
118+
119+
120+
@pytest.mark.duration(14)
121+
@pytest.mark.circuitpy_drive({"code.py": BSIM_PERIPHERAL_CP_CENTRAL_CODE})
122+
@pytest.mark.circuitpy_drive({"code.py": BSIM_PERIPHERAL_CP_CENTRAL_PERIPHERAL_CODE})
123+
def test_bsim_peripheral_cp_central(bsim_phy, circuitpython1, circuitpython2):
124+
"""Two CP instances: device 0 peripheral, device 1 central."""
125+
peripheral = circuitpython1
126+
central = circuitpython2
127+
128+
central.wait_until_done()
129+
peripheral.wait_until_done()
130+
131+
periph_output = peripheral.serial.all_output
132+
central_output = central.serial.all_output
133+
134+
assert "peripheral start" in periph_output
135+
assert "advertising True" in periph_output
136+
assert "connected True advertising False" in periph_output
137+
assert "disconnected False 0" in periph_output
138+
139+
assert "central start" in central_output
140+
assert "found peripheral" in central_output
141+
assert "have target True" in central_output
142+
assert "connected True True 1" in central_output
143+
assert "disconnected False False 0" in central_output

0 commit comments

Comments
 (0)