ADR-025: Command Channel and Sub-Topic Routing¶
Status¶
Accepted Date: 2026-03-31
Context¶
Three command-handling mechanisms exist today:
@app.command("name")— module-level standalone async function with DI-injected parameters. Best for stateless, fire-and-forget commands.@ctx.on_command— runtime callback registered onDeviceContext. Single handler slot per device;DeviceContextraisesRuntimeErroron duplicate registration. Best for devices that react to commands but don't need a long-running loop.@app.devicewith manual loop — escape hatch for devices that maintain complex state machines across command boundaries (a cover controller tracking position, a GPIO-driven actuator with multi-step sequences).
The third pattern is where problems concentrate. Devices with long-running command loops repeatedly reinvent three workaround patterns:
Inline execution in callback. The @ctx.on_command handler does all the work
directly. This blocks on long-running commands and prevents concurrent processing —
a cover controller that takes 30 seconds to reach a target position cannot accept
new commands during traversal.
Explicit asyncio.Queue bridge. The developer creates a queue, forwards
commands from the callback into it, and awaits from the queue in the device loop.
This works correctly but adds ~20 lines of mechanical boilerplate per device:
@app.device("actuator")
async def actuator_device(ctx):
queue: asyncio.Queue[str] = asyncio.Queue()
@ctx.on_command
async def handle(topic, payload):
await queue.put(payload)
while not ctx.shutdown_requested:
try:
payload = await asyncio.wait_for(queue.get(), timeout=5)
await process(payload)
except TimeoutError:
await periodic_check()
nonlocal variable mutation. The callback sets a flag or value via closure,
and the device loop polls it — fragile, race-prone, and difficult to reason about
under concurrent access.
All three workarounds exist because DeviceContext has no built-in mechanism to
bridge commands into a running device loop.
Sub-topic problem¶
Devices that handle multiple command types — position commands alongside
calibration commands, or mode-switch commands alongside parameter-set commands —
must currently demultiplex by payload inspection. The single {prefix}/{device}/set
topic carries all command types, so handlers parse JSON payloads to determine intent.
This couples command routing to payload structure and makes adding new command types a
change to the parsing logic rather than a new subscription.
MQTT's topic hierarchy is the natural routing mechanism. A cover controller should
receive position commands on {prefix}/cover/set and calibration commands on
{prefix}/cover/calibrate/set — separate topics, separate handlers, no payload
inspection needed.
No Command dataclass¶
Handlers receive raw (topic: str, payload: str) tuples with no metadata —
no timestamp, no sub-topic context. Downstream code that needs to know which
sub-topic a command arrived on must re-parse the topic string.
Decision¶
Introduce two complementary features: a queue-backed async iterator on
DeviceContext and sub-topic routing via @ctx.on_command.
Command dataclass¶
A frozen, slotted dataclass represents inbound commands:
@dataclass(frozen=True, slots=True)
class Command:
topic: str
payload: str
sub_topic: str | None = None
timestamp: float = 0.0
Immutable and hashable. Commands are messages from MQTT — semantically they should
not be mutated after receipt. sub_topic is None for root-level commands
({device}/set). timestamp defaults to 0.0 and is set by the framework at
receipt time.
ctx.commands() async iterator¶
DeviceContext gains a commands(timeout: float | None = None) method that
returns an async iterator backed by an internal asyncio.Queue[Command]:
@app.device("actuator")
async def actuator_device(ctx):
async for cmd in ctx.commands(timeout=5):
if cmd is None:
await periodic_check()
else:
await process(cmd.payload)
When timeout is provided, the iterator yields None on timeout expiry,
enabling periodic-work patterns without external asyncio wrappers. Without
timeout, the iterator blocks until a command arrives or ctx.shutdown_requested
becomes true. Internally, a 1-second polling interval checks the shutdown flag.
Sub-topic routing¶
@ctx.on_command accepts an optional sub-topic string:
@ctx.on_command("calibrate")
async def handle_calibrate(topic, payload):
await run_calibration(payload)
This subscribes to {prefix}/{device}/calibrate/set. The root
@ctx.on_command (no argument) continues to subscribe to
{prefix}/{device}/set.
Topic segment order is {device}/{sub}/set — sub-topic before the /set
suffix. This matches Home Assistant and Zigbee2MQTT conventions and is
consistent with ADR-002 where /set is the terminal command suffix.
Sub-topic depth is limited to a single level. The sub-topic string must not
contain /; the framework raises ValueError at registration time if it does.
The TopicRouter currently rejects topics with / in the device segment —
single-level sub-topics preserve this invariant.
Per-sub-topic exclusivity¶
ctx.commands() and @ctx.on_command cannot both handle the same sub-topic on
the same device. Attempting to register both raises RuntimeError at
registration time. However, they can coexist on the same device for
different sub-topics:
@app.device("controller")
async def controller_device(ctx):
# Calibration: separate callback on sub-topic
@ctx.on_command("calibrate")
async def handle_calibrate(topic, payload):
await run_calibration(payload)
# Position: root commands via async iterator
async for cmd in ctx.commands(timeout=60):
if cmd is None:
await periodic_maintenance()
else:
await set_position(cmd.payload)
This extends the existing single-handler constraint (DeviceContext already
raises RuntimeError on duplicate on_command registration) to the
per-sub-topic level.
Command DI injection in @app.command handlers¶
@app.command handlers can declare cmd: Command to receive the full
Command object via dependency injection, in addition to the existing topic
and payload parameters:
@app.command("reset")
async def handle_reset(cmd: Command) -> None:
log.info("Reset at %s on %s", cmd.timestamp, cmd.topic)
await do_reset(cmd.payload)
This makes the interface consistent across @app.command and @app.device
patterns — both can work with the structured Command object.
Backward compatibility¶
Existing @ctx.on_command handlers with (topic, payload) signatures continue
to work unchanged. The Command dataclass is additive — handlers that don't
declare it never see it. Sub-topic routing is opt-in; devices with no sub-topic
argument behave exactly as before.
Decision Drivers¶
- Three independent workarounds. The queue bridge, inline execution, and nonlocal mutation patterns appear across multiple device implementations. A framework-level primitive eliminates the pattern.
- MQTT-native routing. Topic hierarchies are the protocol's built-in demultiplexing mechanism. Payload inspection for command routing works against the grain of MQTT.
- ADR-002 consistency. The
/setsuffix convention naturally extends to{device}/{sub}/setwithout inventing new conventions. - Structured command metadata. Handlers need timestamp and sub-topic context without re-parsing raw topic strings.
- Backward compatibility. Both features are additive — no existing API changes, no migration required.
Decision Matrix¶
Decision 1 (exclusivity model) is the most nuanced trade-off with three viable options. The remaining five decisions are binary or near-obvious — their rationale is covered in Considered Options below.
| Criterion | A: Fully exclusive | B: Exclusive per sub-topic (chosen) | C: Non-exclusive (both receive) |
|---|---|---|---|
| Simplicity of mental model | 5 | 4 | 2 |
| Flexibility for mixed cases | 1 | 4 | 5 |
| Risk of double-processing | 5 | 4 | 1 |
| Backward compatibility | 4 | 5 | 3 |
Scale: 1 (poor) to 5 (excellent)
Considered Options¶
Decision 1: Exclusivity model¶
Option A: Fully exclusive. ctx.commands() and @ctx.on_command cannot
coexist on the same device at all. Simple rule, but too restrictive — devices
that want a main command loop plus a separate calibration callback would need
to route everything through the queue.
Option B: Exclusive per sub-topic (chosen). Each sub-topic has exactly one owner. Allows clean separation of concerns on devices with multiple command types while preventing accidental double-processing.
Option C: Non-exclusive (both receive). Both the callback and the queue receive every command. Maximally flexible, but introduces ordering ambiguity and makes it easy to process commands twice.
Decision 2: Command dataclass mutability¶
Option A: Mutable dataclass. Allows handlers to annotate commands with processing metadata. Rejected — commands are inbound messages, not working state. Mutation invites action-at-a-distance bugs when commands are shared.
Option B: Frozen dataclass (chosen). Immutable and hashable. Matches the semantic nature of MQTT messages and enables safe passing across async boundaries.
Decision 3: Command DI in @app.command¶
Option A: No injection — handlers parse topic/payload manually. Status quo. Keeps the DI surface small but forces handlers that need metadata to re-derive it.
Option B: Injectable (chosen). Handlers declare cmd: Command to opt in.
Consistent with the framework's existing DI model (Settings, adapters, Logger).
Non-breaking — handlers without the parameter are unaffected.
Decision 4: Sub-topic depth¶
Option A: Single level only (chosen). @ctx.on_command("calibrate") maps
to {device}/calibrate/set. Simple, predictable, preserves the
TopicRouter's single-segment device constraint.
Option B: Arbitrary depth. @ctx.on_command("calibrate/step") maps to
{device}/calibrate/step/set. More expressive, but complicates topic matching,
introduces ambiguity in wildcard subscriptions, and breaks the TopicRouter's
assumption that device names are single segments.
Decision 5: Topic segment order¶
Option A: {device}/{sub}/set (chosen). Sub-topic before suffix. Matches
Home Assistant conventions (light/brightness/set) and keeps /set as the
terminal segment per ADR-002.
Option B: {device}/set/{sub}. Suffix before sub-topic. Allows wildcard
{device}/set/+ to catch all command types, but breaks the convention that
/set terminates the topic and is inconsistent with the broader ecosystem.
Decision 6: Timeout model¶
Option A: No built-in timeout. Callers wrap with asyncio.wait_for or
asyncio.timeout. Pushes boilerplate back to the user — the exact problem
ctx.commands() is solving.
Option B: Built-in timeout parameter (chosen). ctx.commands(timeout=60)
yields None on timeout. Clean periodic-work pattern without external
wrappers. The None sentinel is explicit — callers must handle it, which
prevents silent timeout bugs.
Option C: Separate ctx.commands_with_timeout() method. Avoids the
Command | None union type by splitting the API. Rejected — two methods for
the same underlying queue is unnecessary indirection.
Consequences¶
Positive¶
- Eliminates three boilerplate workaround patterns (inline, queue bridge, nonlocal mutation) with a single framework primitive
- Command routing via MQTT topics — natural for the protocol — instead of payload inspection
- Structured
Commandobject with timestamp and sub-topic metadata - Per-sub-topic exclusivity prevents accidental double-processing while allowing mixed patterns on the same device
- Fully backward-compatible — no breaking changes to existing APIs
- Timeout model enables clean periodic-work patterns without external
asynciowrappers - Consistent DI model —
Commandis injectable in@app.commandthe same waySettingsand adapters are
Negative¶
ctx.commands()yield type isCommand | Nonewhentimeoutis used — callers must handle the union- Internal queue adds a small per-command allocation
- Sub-topic routing changes
TopicRouterinternals — must handle the{device}/{sub}/setpattern alongside existing{device}/set @app.commandDI injection adds one more annotation-inspection path to the framework's startup logic
Neutral¶
@app.command(module-level) and@ctx.on_command(device-level callback) remain the recommended patterns for simple devices;ctx.commands()is for devices that need a control loop with command integration
References¶
- ADR-002: MQTT Topic Conventions
- ADR-010: Device Archetypes
- ADR-023:
on_configureLifecycle Phase
2026-03-31