Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 184 additions & 4 deletions roborock/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from roborock.devices.device import RoborockDevice
from roborock.devices.device_manager import DeviceManager, UserParams, create_device_manager
from roborock.devices.traits import Trait
from roborock.devices.traits.b01.q10 import Q10PropertiesApi
from roborock.devices.traits.b01.q10.vacuum import VacuumTrait
from roborock.devices.traits.v1 import V1TraitMixin
from roborock.devices.traits.v1.consumeable import ConsumableAttribute
Expand Down Expand Up @@ -521,6 +522,46 @@ async def maps(ctx, device_id: str):
await _display_v1_trait(context, device_id, lambda v1: v1.maps)


# The Q10 pushes its map ~9s after a dpRequestDps; firmware throttles pushes to
# ~once per 60-70s, so a single request is answered quickly but rapid re-requests
# may not be. This bounds how long a one-shot CLI command waits for that push.
_Q10_MAP_PUSH_TIMEOUT = 30.0


async def _await_q10_map_push(
properties: Q10PropertiesApi,
predicate: Callable[[], bool],
*,
timeout: float = _Q10_MAP_PUSH_TIMEOUT,
) -> bool:
"""Nudge a Q10 to push its map/trace and wait until ``predicate`` holds.

The Q10 map API is entirely push-driven: there is no synchronous get-map
request. A ``dpRequestDps`` causes the device to publish a ``MAP_RESPONSE``,
which the device's subscribe loop feeds into the map trait. Here we register
an update listener, send the request, and wait for the pushed data to satisfy
``predicate``. Returns whether it did within ``timeout``.
"""
if predicate():
return True
loop = asyncio.get_running_loop()
updated: asyncio.Future[None] = loop.create_future()

def on_update() -> None:
if predicate() and not updated.done():
updated.set_result(None)

unsub = properties.map.add_update_listener(on_update)
try:
await properties.refresh()
await asyncio.wait_for(updated, timeout=timeout)
return True
except TimeoutError:
return False
finally:
unsub()


@session.command()
@click.option("--device_id", required=True)
@click.option("--output-file", required=True, help="Path to save the map image.")
Expand All @@ -529,15 +570,76 @@ async def maps(ctx, device_id: str):
async def map_image(ctx, device_id: str, output_file: str):
"""Get device map image and save it to a file."""
context: RoborockContext = ctx.obj
trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content)
if trait.image_content:
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is not None:
properties = device.b01_q10_properties
await _await_q10_map_push(properties, lambda: properties.map.image_content is not None)
image_content = properties.map.image_content
else:
v1_trait: MapContentTrait = await _v1_trait(context, device_id, lambda v1: v1.map_content)
image_content = v1_trait.image_content
if image_content:
with open(output_file, "wb") as f:
f.write(trait.image_content)
f.write(image_content)
click.echo(f"Map image saved to {output_file}")
else:
click.echo("No map image content available.")


@session.command()
@click.option("--device_id", required=True)
@click.option("--output-dir", default=None, help="If set, write one transparent PNG per layer here.")
@click.pass_context
@async_command
async def q10_map_layers(ctx, device_id: str, output_dir: str | None):
"""List the Q10 map's separable layers (background/wall/floor/per-room).

With --output-dir, also exports each layer as a transparent PNG that can be
stacked in a frontend (background, then floor, then walls, then each room).
"""
import os

context: RoborockContext = ctx.obj
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is None:
click.echo("Feature not supported by device")
return
properties = device.b01_q10_properties
await _await_q10_map_push(properties, lambda: properties.map.layers is not None)
layers = properties.map.layers
if layers is None:
click.echo("No map layers available.")
return

summary = {
"size": {"width": layers.width, "height": layers.height},
"class_counts": layers.class_counts,
"rooms": [
{"id": r.id, "name": r.name, "pixel_count": r.pixel_count, "bbox": list(r.bbox)} for r in layers.rooms
],
}
click.echo(dump_json(summary))

if output_dir:
os.makedirs(output_dir, exist_ok=True)
exports = {
"background": layers.render_class("background", (210, 210, 215, 255), scale=2),
"floor": layers.render_class("floor", (70, 170, 95, 200), scale=2),
"wall": layers.render_class("wall", (20, 20, 25, 255), scale=2),
}
for name, png in exports.items():
with open(os.path.join(output_dir, f"layer_{name}.png"), "wb") as f:
f.write(png)
for room in layers.rooms:
png = layers.render_room(room.id, (90, 140, 220, 200), scale=2)
safe = "".join(c if c.isalnum() else "_" for c in room.name) or f"room{room.id}"
with open(os.path.join(output_dir, f"room_{room.id}_{safe}.png"), "wb") as f:
f.write(png)
click.echo(f"Wrote {3 + len(layers.rooms)} layer PNGs to {output_dir}")


@session.command()
@click.option("--device_id", required=True)
@click.option("--include_path", is_flag=True, default=False, help="Include path data in the output.")
Expand All @@ -564,6 +666,75 @@ async def map_data(ctx, device_id: str, include_path: bool):
click.echo(dump_json(data_summary))


@session.command()
@click.option("--device_id", required=True)
@click.option("--include_path", is_flag=True, default=False, help="Include all path points in the output.")
@click.pass_context
@async_command
async def q10_position(ctx, device_id: str, include_path: bool):
"""Get the current Q10 robot position and live cleaning path.

The Q10 only streams its position/path while it is actively cleaning, so this
will report that no live trace is available for an idle/docked robot.
"""
context: RoborockContext = ctx.obj
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is None:
click.echo("Feature not supported by device")
return
properties = device.b01_q10_properties
got_trace = await _await_q10_map_push(properties, lambda: bool(properties.map.path))
if not got_trace:
click.echo("No live trace available (the robot only reports position while cleaning).")
return
map_trait = properties.map
position = map_trait.robot_position
summary: dict[str, Any] = {
"robot_position": {"x": position.x, "y": position.y} if position else None,
"path_points": len(map_trait.path),
}
if include_path:
summary["path"] = [[p.x, p.y] for p in map_trait.path]
click.echo(dump_json(summary))


@session.command()
@click.option("--device_id", required=True)
@click.option("--output-file", required=True, help="Path to save the map image with the path drawn.")
@click.pass_context
@async_command
async def q10_map_with_path(ctx, device_id: str, output_file: str):
"""Render the Q10 map with the current cleaning path + robot position drawn.

Needs the robot to be actively cleaning (the path/calibration come from the
live trace). Fetches the map and the path, solves the world<->pixel
calibration, and writes the annotated PNG.
"""
context: RoborockContext = ctx.obj
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is None:
click.echo("Feature not supported by device")
return
properties = device.b01_q10_properties
map_trait = properties.map
await _await_q10_map_push(properties, lambda: map_trait.image_content is not None)
got_path = await _await_q10_map_push(properties, lambda: bool(map_trait.path))
if not got_path:
click.echo("No live path available (the robot only reports its path while cleaning).")
return
try:
image = map_trait.render_path_on_map()
except RoborockException as err:
click.echo(f"Could not render path on map: {err}")
return
with open(output_file, "wb") as f:
f.write(image)
cal = map_trait.calibration
click.echo(f"Saved map with {len(map_trait.path)}-point path to {output_file} (calibration: {cal})")


@session.command()
@click.option("--device_id", required=True)
@click.pass_context
Expand Down Expand Up @@ -705,7 +876,16 @@ async def set_child_lock(ctx, device_id: str, enabled: bool):
async def rooms(ctx, device_id: str):
"""Get device room mapping info."""
context: RoborockContext = ctx.obj
await _display_v1_trait(context, device_id, lambda v1: v1.rooms)
device_manager = await context.get_device_manager()
device = await device_manager.get_device(device_id)
if device.b01_q10_properties is not None:
properties = device.b01_q10_properties
# A valid map may have no room records, so wait on the map arriving
# (image_content) rather than on rooms being non-empty.
await _await_q10_map_push(properties, lambda: properties.map.image_content is not None)
click.echo(dump_json({room.id: room.name for room in properties.map.rooms}))
else:
await _display_v1_trait(context, device_id, lambda v1: v1.rooms)


@session.command()
Expand Down
4 changes: 4 additions & 0 deletions roborock/data/b01_q10/b01_q10_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ class Q10Status(RoborockBase):
back_type: YXBackType | None = field(default=None, metadata={"dps": B01_Q10_DP.BACK_TYPE})
cleaning_progress: int | None = field(default=None, metadata={"dps": B01_Q10_DP.CLEAN_PROGRESS})
fault: int | None = field(default=None, metadata={"dps": B01_Q10_DP.FAULT})
# Raw base64 map-overlay blobs (decoded by roborock.map.b01_q10_overlays).
restricted_zone_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.RESTRICTED_ZONE_UP})
virtual_wall_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.VIRTUAL_WALL_UP})
zoned_up: str | None = field(default=None, metadata={"dps": B01_Q10_DP.ZONED_UP})
7 changes: 6 additions & 1 deletion roborock/devices/rpc/b01_q10_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
async def stream_decoded_responses(
mqtt_channel: MqttChannel,
) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]:
"""Stream decoded DPS messages received via MQTT."""
"""Stream decoded DPS messages received via MQTT.

Messages that are not decodable DPS responses (e.g. protocol-301
``MAP_RESPONSE`` map pushes) are skipped; callers that need the raw
messages should subscribe to :meth:`MqttChannel.subscribe_stream` directly.
"""

async for response_message in mqtt_channel.subscribe_stream():
try:
Expand Down
52 changes: 43 additions & 9 deletions roborock/devices/traits/b01/q10/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
import logging

from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.exceptions import RoborockException
from roborock.protocols.b01_q10_protocol import decode_rpc_response
from roborock.roborock_message import RoborockMessage

from .command import CommandTrait
from .map import MapContentTrait
from .remote import RemoteTrait
from .status import StatusTrait
from .vacuum import VacuumTrait

__all__ = [
"Q10PropertiesApi",
"MapContentTrait",
]

_LOGGER = logging.getLogger(__name__)
Expand All @@ -35,13 +39,17 @@ class Q10PropertiesApi(Trait):
remote: RemoteTrait
"""Trait for sending remote control related commands to Q10 devices."""

map: MapContentTrait
"""Trait for fetching the current parsed map (image + rooms)."""

def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.command = CommandTrait(channel)
self.vacuum = VacuumTrait(self.command)
self.remote = RemoteTrait(self.command)
self.status = StatusTrait()
self.map = MapContentTrait()
self._subscribe_task: asyncio.Task[None] | None = None

async def start(self) -> None:
Expand All @@ -65,14 +73,40 @@ async def refresh(self) -> None:
await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

async def _subscribe_loop(self) -> None:
"""Persistent loop to listen for status updates."""
async for decoded_dps in stream_decoded_responses(self._channel):
_LOGGER.debug("Received Q10 status update: %s", decoded_dps)

# Notify all traits about a new message and each trait will
# only update what fields that it is responsible for.
# More traits can be added here below.
self.status.update_from_dps(decoded_dps)
"""Persistent loop dispatching pushed messages to the read-model traits."""
async for message in self._channel.subscribe_stream():
self._handle_message(message)

def _handle_message(self, message: RoborockMessage) -> None:
"""Route a single pushed message to the trait responsible for it.

Map/trace pushes arrive as protocol-301 ``MAP_RESPONSE`` messages (not
DPS), so they are handled separately from the status DPS stream. The Q10
is entirely push-driven: there is no synchronous get-map request, the
device just publishes its current map (a ``dpRequestDps`` nudges it to).
"""
if self.map.update_from_map_response(message):
return

try:
decoded_dps = decode_rpc_response(message)
except RoborockException as ex:
_LOGGER.debug("Failed to decode Q10 RPC response: %s: %s", message, ex)
return

_LOGGER.debug("Received Q10 status update: %s", decoded_dps)
# Notify all traits about a new message and each trait will
# only update what fields that it is responsible for.
# More traits can be added here below.
self.status.update_from_dps(decoded_dps)

# Feed the map's vector-overlay data points (no-go zones / virtual
# walls) to the map trait so they are decoded as they arrive.
if B01_Q10_DP.RESTRICTED_ZONE_UP in decoded_dps or B01_Q10_DP.VIRTUAL_WALL_UP in decoded_dps:
self.map.load_overlays(
restricted_zone_up=decoded_dps.get(B01_Q10_DP.RESTRICTED_ZONE_UP),
virtual_wall_up=decoded_dps.get(B01_Q10_DP.VIRTUAL_WALL_UP),
)


def create(channel: MqttChannel) -> Q10PropertiesApi:
Expand Down
Loading