Skip to content

Commit fdab097

Browse files
committed
Tweak peripheral tests
1 parent 7150f82 commit fdab097

File tree

5 files changed

+152
-48
lines changed

5 files changed

+152
-48
lines changed

ports/zephyr-cp/boards/nrf5340bsim_nrf5340_cpuapp.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ CONFIG_BT_OBSERVER=y
1212
CONFIG_BT_BROADCASTER=y
1313

1414
CONFIG_BT_L2CAP_TX_MTU=253
15+
CONFIG_BT_BUF_CMD_TX_COUNT=2
1516
CONFIG_BT_BUF_CMD_TX_SIZE=255
17+
CONFIG_BT_HCI_VS=y
1618
CONFIG_BT_BUF_EVT_RX_COUNT=16
1719
CONFIG_BT_BUF_EVT_RX_SIZE=255
1820
CONFIG_BT_BUF_ACL_TX_COUNT=3

ports/zephyr-cp/common-hal/_bleio/Adapter.c

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <zephyr/bluetooth/bluetooth.h>
1414
#include <zephyr/bluetooth/conn.h>
1515
#include <zephyr/bluetooth/hci.h>
16+
#include <zephyr/bluetooth/hci_vs.h>
1617

1718
#include "py/gc.h"
1819
#include "py/runtime.h"
@@ -278,11 +279,48 @@ bool common_hal_bleio_adapter_get_enabled(bleio_adapter_obj_t *self) {
278279
}
279280

280281
mp_int_t common_hal_bleio_adapter_get_tx_power(bleio_adapter_obj_t *self) {
281-
mp_raise_NotImplementedError(NULL);
282+
struct bt_hci_cp_vs_read_tx_power_level *cp;
283+
struct bt_hci_rp_vs_read_tx_power_level *rp;
284+
struct net_buf *buf, *rsp = NULL;
285+
286+
buf = bt_hci_cmd_alloc(K_MSEC(1000));
287+
if (!buf) {
288+
mp_raise_msg(&mp_type_MemoryError, NULL);
289+
}
290+
cp = net_buf_add(buf, sizeof(*cp));
291+
cp->handle_type = BT_HCI_VS_LL_HANDLE_TYPE_ADV;
292+
cp->handle = 0;
293+
294+
int err = bt_hci_cmd_send_sync(BT_HCI_OP_VS_READ_TX_POWER_LEVEL, buf, &rsp);
295+
if (err) {
296+
raise_zephyr_error(err);
297+
}
298+
299+
rp = (void *)rsp->data;
300+
int8_t power = rp->tx_power_level;
301+
net_buf_unref(rsp);
302+
return power;
282303
}
283304

284305
void common_hal_bleio_adapter_set_tx_power(bleio_adapter_obj_t *self, mp_int_t tx_power) {
285-
mp_raise_NotImplementedError(NULL);
306+
struct bt_hci_cp_vs_write_tx_power_level *cp;
307+
struct net_buf *buf, *rsp = NULL;
308+
309+
buf = bt_hci_cmd_alloc(K_MSEC(3000));
310+
if (!buf) {
311+
mp_raise_msg(&mp_type_MemoryError, NULL);
312+
}
313+
cp = net_buf_add(buf, sizeof(*cp));
314+
cp->handle_type = BT_HCI_VS_LL_HANDLE_TYPE_ADV;
315+
cp->handle = 0;
316+
cp->tx_power_level = (int8_t)tx_power;
317+
318+
int err = bt_hci_cmd_send_sync(BT_HCI_OP_VS_WRITE_TX_POWER_LEVEL, buf, &rsp);
319+
if (err) {
320+
raise_zephyr_error(err);
321+
}
322+
323+
net_buf_unref(rsp);
286324
}
287325

288326
bleio_address_obj_t *common_hal_bleio_adapter_get_address(bleio_adapter_obj_t *self) {
@@ -321,7 +359,6 @@ void common_hal_bleio_adapter_start_advertising(bleio_adapter_obj_t *self,
321359
mp_buffer_info_t *advertising_data_bufinfo,
322360
mp_buffer_info_t *scan_response_data_bufinfo,
323361
mp_int_t tx_power, const bleio_address_obj_t *directed_to) {
324-
(void)tx_power;
325362
(void)directed_to;
326363
(void)interval;
327364

@@ -391,6 +428,8 @@ void common_hal_bleio_adapter_start_advertising(bleio_adapter_obj_t *self,
391428
NULL);
392429
}
393430

431+
common_hal_bleio_adapter_set_tx_power(self, tx_power);
432+
394433
raise_zephyr_error(bt_le_adv_start(&adv_params,
395434
adv_data,
396435
adv_count,
@@ -555,7 +594,7 @@ mp_obj_t common_hal_bleio_adapter_connect(bleio_adapter_obj_t *self, bleio_addre
555594

556595
if (info.state == BT_CONN_STATE_DISCONNECTED) {
557596
bt_conn_unref(conn);
558-
mp_raise_bleio_BluetoothError(MP_ERROR_TEXT("Failed to connect"));
597+
mp_raise_bleio_BluetoothError(MP_ERROR_TEXT("Failed to connect: timeout"));
559598
}
560599
} else if (err != -ENOTCONN) {
561600
bt_conn_unref(conn);

ports/zephyr-cp/tests/bsim/conftest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ def bsim_phy(request, bsim_phy_binary, native_sim_env, sim_id):
107107
"-v=9", # Cleaning up level is on 9. Connecting is 7.
108108
f"-s={sim_id}",
109109
f"-D={devices}",
110+
"-argschannel",
111+
"-at=40", # 40 dB attenuation (default 60) so RSSI ~ -40 dBm
110112
]
111113
print("Running:", " ".join(cmd))
112114
proc = subprocess.Popen(
@@ -213,11 +215,13 @@ def zephyr_sample(request, bsim_phy, native_sim_env, sim_id):
213215
print(sample_proc.serial.all_output)
214216

215217

218+
# pytest markers are defined inside out meaning the bottom one is first in the
219+
# list and the top is last. So use negative indices to reverse them.
216220
@pytest.fixture
217221
def circuitpython1(circuitpython):
218-
return circuitpython[0]
222+
return circuitpython[-1]
219223

220224

221225
@pytest.fixture
222226
def circuitpython2(circuitpython):
223-
return circuitpython[1]
227+
return circuitpython[-2]

ports/zephyr-cp/tests/bsim/test_bsim_ble_advertising.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""BLE advertising tests for nrf5340bsim."""
55

66
import logging
7+
import re
78

89
import pytest
910

@@ -44,6 +45,39 @@
4445
"""
4546

4647

48+
BSIM_TX_POWER_DEFAULT_CODE = """\
49+
import _bleio
50+
import time
51+
52+
adapter = _bleio.adapter
53+
54+
name = b"CPTXPWR"
55+
advertisement = bytes((2, 0x01, 0x06, len(name) + 1, 0x09)) + name
56+
57+
print("advertising default")
58+
adapter.start_advertising(advertisement)
59+
time.sleep(4)
60+
adapter.stop_advertising()
61+
print("done")
62+
"""
63+
64+
BSIM_TX_POWER_LOW_CODE = """\
65+
import _bleio
66+
import time
67+
68+
adapter = _bleio.adapter
69+
70+
name = b"CPTXPWR"
71+
advertisement = bytes((2, 0x01, 0x06, len(name) + 1, 0x09)) + name
72+
73+
print("advertising low")
74+
adapter.start_advertising(advertisement, tx_power=-20)
75+
time.sleep(4)
76+
adapter.stop_advertising()
77+
print("done")
78+
"""
79+
80+
4781
@pytest.mark.zephyr_sample("bluetooth/observer")
4882
@pytest.mark.circuitpy_drive({"code.py": BSIM_ADV_CODE})
4983
def test_bsim_advertise_and_scan(bsim_phy, circuitpython, zephyr_sample):
@@ -90,3 +124,51 @@ def test_bsim_advertise_ctrl_c_reload(bsim_phy, circuitpython, zephyr_sample):
90124
assert cp_output.count("adv run done") >= 1
91125
assert observer_output.count("Device found:") >= observer_count_before + 1
92126
assert "Already advertising" not in cp_output
127+
128+
129+
@pytest.mark.zephyr_sample("bluetooth/observer")
130+
@pytest.mark.circuitpy_drive({"code.py": BSIM_TX_POWER_DEFAULT_CODE})
131+
def test_bsim_tx_power_default_rssi(bsim_phy, circuitpython, zephyr_sample):
132+
"""Verify default TX power produces expected RSSI."""
133+
observer = zephyr_sample
134+
135+
circuitpython.wait_until_done()
136+
137+
cp_output = circuitpython.serial.all_output
138+
obs_output = observer.serial.all_output
139+
140+
assert "advertising default" in cp_output
141+
assert "done" in cp_output
142+
143+
# Observer: "Device found: <addr> (RSSI <n>), type <t>, AD data len <l>"
144+
# Advertisement is 12 bytes: flags (3) + name (9).
145+
# With 40 dB channel attenuation and 0 dBm TX → RSSI ~ -39
146+
rssi_pattern = re.compile(r"RSSI (-?\d+)\), type \d+, AD data len 12")
147+
all_rssi = [int(m.group(1)) for m in rssi_pattern.finditer(obs_output)]
148+
logger.info("RSSI values: %s", all_rssi)
149+
150+
assert len(all_rssi) > 0, "Observer saw no advertisements"
151+
assert all_rssi[0] == -39, f"Expected RSSI -39 (0 dBm TX), got {all_rssi[0]}"
152+
153+
154+
@pytest.mark.zephyr_sample("bluetooth/observer")
155+
@pytest.mark.circuitpy_drive({"code.py": BSIM_TX_POWER_LOW_CODE})
156+
def test_bsim_tx_power_low_rssi(bsim_phy, circuitpython, zephyr_sample):
157+
"""Verify low TX power reduces RSSI."""
158+
observer = zephyr_sample
159+
160+
circuitpython.wait_until_done()
161+
162+
cp_output = circuitpython.serial.all_output
163+
obs_output = observer.serial.all_output
164+
165+
assert "advertising low" in cp_output
166+
assert "done" in cp_output
167+
168+
# With 40 dB channel attenuation and -20 dBm TX → RSSI ~ -59
169+
rssi_pattern = re.compile(r"RSSI (-?\d+)\), type \d+, AD data len 12")
170+
all_rssi = [int(m.group(1)) for m in rssi_pattern.finditer(obs_output)]
171+
logger.info("RSSI values: %s", all_rssi)
172+
173+
assert len(all_rssi) > 0, "Observer saw no advertisements"
174+
assert all_rssi[0] < -39, f"Expected lower RSSI with -20 dBm TX, got {all_rssi[0]}"

ports/zephyr-cp/tests/bsim/test_bsim_ble_peripheral.py

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
BSIM_PERIPHERAL_CODE = """\
1111
import _bleio
1212
import time
13+
import sys
1314
1415
adapter = _bleio.adapter
1516
@@ -20,50 +21,27 @@
2021
adapter.start_advertising(advertisement, connectable=True)
2122
print("advertising", adapter.advertising)
2223
23-
for _ in range(80):
24-
if adapter.connected:
25-
break
26-
time.sleep(0.1)
27-
28-
print("connected", adapter.connected, "advertising", adapter.advertising)
29-
30-
for _ in range(80):
31-
if not adapter.connected:
32-
break
33-
time.sleep(0.1)
34-
35-
print("disconnected", adapter.connected, len(adapter.connections))
36-
"""
24+
was_connected = False
25+
timeout = time.monotonic() + 8.0
26+
while not was_connected and time.monotonic() < timeout:
27+
time.sleep(0.01)
28+
was_connected = adapter.connected
3729
38-
BSIM_PERIPHERAL_CP_CENTRAL_PERIPHERAL_CODE = """\
39-
import _bleio
40-
import time
30+
if not was_connected:
31+
print("connect timed out")
32+
sys.exit(-1)
4133
42-
adapter = _bleio.adapter
34+
print("connected", was_connected, "advertising", adapter.advertising)
4335
44-
name = b"CPPERIPH"
45-
advertisement = bytes((2, 0x01, 0x06, len(name) + 1, 0x09)) + name
46-
47-
print("peripheral start")
48-
adapter.start_advertising(advertisement, connectable=True)
49-
print("advertising", adapter.advertising)
50-
51-
for _ in range(80):
52-
if adapter.connected:
53-
break
54-
time.sleep(0.1)
55-
56-
print("connected", adapter.connected, "advertising", adapter.advertising)
57-
58-
for _ in range(80):
59-
if not adapter.connected:
60-
break
61-
time.sleep(0.1)
36+
if was_connected:
37+
timeout = time.monotonic() + 8.0
38+
while adapter.connected and time.monotonic() < timeout:
39+
time.sleep(0.1)
6240
6341
print("disconnected", adapter.connected, len(adapter.connections))
6442
"""
6543

66-
BSIM_PERIPHERAL_CP_CENTRAL_CODE = """\
44+
BSIM_CENTRAL_CODE = """\
6745
import _bleio
6846
import time
6947
@@ -86,9 +64,8 @@
8664
print("connected", connection.connected, adapter.connected, len(adapter.connections))
8765
connection.disconnect()
8866
89-
for _ in range(40):
90-
if not connection.connected and not adapter.connected:
91-
break
67+
timeout = time.monotonic() + 4.0
68+
while (connection.connected or adapter.connected) and time.monotonic() < timeout:
9269
time.sleep(0.1)
9370
9471
print("disconnected", connection.connected, adapter.connected, len(adapter.connections))
@@ -118,8 +95,8 @@ def test_bsim_peripheral_zephyr_central(bsim_phy, circuitpython, zephyr_sample):
11895

11996

12097
@pytest.mark.duration(14)
121-
@pytest.mark.circuitpy_drive({"code.py": BSIM_PERIPHERAL_CP_CENTRAL_CODE})
122-
@pytest.mark.circuitpy_drive({"code.py": BSIM_PERIPHERAL_CP_CENTRAL_PERIPHERAL_CODE})
98+
@pytest.mark.circuitpy_drive({"code.py": BSIM_PERIPHERAL_CODE})
99+
@pytest.mark.circuitpy_drive({"code.py": BSIM_CENTRAL_CODE})
123100
def test_bsim_peripheral_cp_central(bsim_phy, circuitpython1, circuitpython2):
124101
"""Two CP instances: device 0 peripheral, device 1 central."""
125102
peripheral = circuitpython1

0 commit comments

Comments
 (0)