Build a Command & Control Device¶
Command & control devices are bidirectional — they receive commands via MQTT and
publish state back. The @app.command() decorator is the recommended way to build
command devices: you write a simple function, the framework handles subscription,
dispatch, error isolation, and state publication.
Multiple similar command devices?
If you're registering several command devices with the same handler logic (e.g. a bank of relays), use dict-name decorators to avoid copy-paste. See Multi-Device Registration.
Prerequisites
This guide assumes you've completed the Quickstart.
How @app.command Works¶
The @app.command decorator registers a standalone async function that:
- Receives MQTT message values by name — parameters named
topicandpayloadare optional and injected only if declared in the handler signature. Declare only what you need. - Receives dependencies by type annotation — all other parameters (e.g.
ctx: DeviceContext, adapters) are injected by matching their type. - Auto-publishes state — if the handler returns a
dict, the framework JSON-serialises it and publishes to{prefix}/{name}/statewithretain=Trueandqos=1. ReturnNoneto skip auto-publishing. - Is error-isolated — if the handler raises an exception, the framework catches it, logs at ERROR level, and publishes a structured error payload to the error topics. Other devices are unaffected.
The framework subscribes to {prefix}/{name}/set and dispatches inbound messages
to your handler function.
Which pattern to use
| Pattern | Use when |
|---|---|
@app.command(name) |
Device reacts to MQTT commands. Simplest, most common. |
@app.telemetry(name, interval=N) |
Device polls/streams data on an interval. |
@app.device(name) + ctx.commands() |
Commands + periodic work in a single loop. |
@app.device(name) + @ctx.on_command |
Fire-and-forget command callbacks in a coroutine. |
See Device Archetypes for the full picture.
Why command devices always use DeviceContext
Unlike telemetry handlers (which can be zero-arg), command devices typically
request ctx: DeviceContext because they need ctx.publish_state(),
ctx.on_command, and ctx.sleep(). Other injectable types (Settings,
logging.Logger, ClockPort, adapter ports) are also available via
signature-based injection — but DeviceContext bundles them all for the
bidirectional use case.
A Minimal Command Device¶
import cosalette
app = cosalette.App(name="gas2mqtt", version="1.0.0")
@app.command("valve") # (1)!
async def handle_valve(payload: str) -> dict[str, object]: # (2)!
"""Handle valve commands."""
return {"state": payload} # (3)!
app.run()
"valve"is the device name — the framework subscribes togas2mqtt/valve/setfor inbound commands.payloadis injected by name from the MQTT message. The return type isdict[str, object] | None. Returning a dict triggers auto-publishing.- The returned dict is published as
{"state": "open"}(or whateverpayloadcontains) togas2mqtt/valve/statewithretain=True,qos=1.
Declare only the MQTT params you need
The handler above only declares payload. You can also declare topic: str
to receive the full MQTT topic string, add ctx: cosalette.DeviceContext
for framework services, or omit both MQTT params entirely:
# payload only (most common)
async def handle(payload: str) -> dict[str, object]: ...
# payload + topic
async def handle(topic: str, payload: str) -> dict[str, object]: ...
# payload + context
async def handle(payload: str, ctx: cosalette.DeviceContext) -> dict[str, object]: ...
# no MQTT params — side-effect only, uses adapter
async def handle(ctx: cosalette.DeviceContext) -> dict[str, object]: ...
When you run this, the framework:
- Connects to the MQTT broker.
- Subscribes to
gas2mqtt/valve/set. - Dispatches each inbound message to
handle_valve(). - Publishes the returned dict as JSON to
gas2mqtt/valve/state. - Keeps running until
SIGTERMorSIGINT.
Single-Device Apps (Root Device)¶
For apps with a single command device, omit the name to publish at the root level:
import cosalette
app = cosalette.App(name="relay2mqtt", version="1.0.0")
@app.command() # (1)!
async def handle(payload: str) -> dict[str, object]:
"""Control the relay."""
return {"state": payload}
app.run()
- No device name — subscribes to
relay2mqtt/setand publishes state torelay2mqtt/state.
The same root device rules apply as for telemetry — see Single-Device Apps for details on naming, topics, and constraints.
The Command Handler¶
An @app.command handler is a plain async def function with two kinds of
parameters:
topicandpayload(by name, both optional) — the full MQTT topic string (e.g.gas2mqtt/valve/set) and the raw message payload string. Declare only the ones your handler needs — the framework inspects the function signature at registration time and injects only what is declared.- Everything else (by type annotation) — injected automatically.
ctx: cosalette.DeviceContextis the most common, but adapters work too.
@app.command("valve")
async def handle_valve(payload: str) -> dict[str, object] | None:
valid_commands = {"open", "close", "toggle"}
if payload not in valid_commands:
raise ValueError(f"Unknown command: {payload!r}") # (1)!
return {"state": payload}
- Raising inside the command handler is safe. The framework catches handler exceptions, logs them at ERROR level, and publishes a structured error payload. Other devices and subsequent commands continue normally.
Return Value Contract¶
| Return value | Framework behaviour |
|---|---|
dict |
JSON-serialised and published to {prefix}/{name}/state |
None |
No state publication — use when you publish manually or conditionally |
Using DeviceContext¶
The DeviceContext gives you access to shared infrastructure without globals:
@app.command("valve")
async def handle_valve(
payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
settings = ctx.settings # (1)!
device_name = ctx.name # (2)!
clock_value = ctx.clock.now() # (3)!
return {"state": payload, "updated_at": clock_value}
- Access the application
Settingsinstance (or your custom subclass). - The device name as registered —
"valve"in this case. - The monotonic clock port — useful for timing calculations. In tests, this is a
FakeClockyou control directly.
DeviceContext vs AppContext
Command and device functions receive DeviceContext, which has publish, sleep,
and adapter capabilities. Lifecycle hooks receive AppContext, which only has
.settings and .adapter(). Don't mix them up — see
Lifespan for details.
DeviceContext API¶
@app.command handlers can use a subset of the DeviceContext surface (the parts
relevant to per-message handling):
| Property / Method | Description |
|---|---|
ctx.name |
Device name as registered ("valve") |
ctx.settings |
Application Settings instance |
ctx.clock |
Monotonic clock port |
ctx.adapter(PortType) |
Resolve a registered adapter |
ctx.publish_state(dict) |
Publish to {prefix}/{name}/state (retained) — manual override |
ctx.publish(channel, str) |
Publish to {prefix}/{name}/{channel} (arbitrary) |
Auto-publish vs manual publish
For most command handlers, simply return a dict and let the framework publish.
Use ctx.publish_state() directly only when you need side-effect publishing
(e.g. publishing to multiple channels) and return None to skip the
auto-publish.
Resolving Adapters¶
When your command device needs hardware access, use the adapter pattern:
from gas2mqtt.ports import RelayPort
@app.command("valve")
async def handle_valve(
payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
relay = ctx.adapter(RelayPort) # (1)!
match payload:
case "on":
relay.turn_on()
case "off":
relay.turn_off()
case _:
raise ValueError(f"Unknown command: {payload!r}")
return {"state": payload}
- Resolves the adapter registered for
RelayPort. RaisesLookupErrorif no adapter is registered. See Hardware Adapters for registration.
Imperative Registration¶
The @app.command decorator works great when the handler is defined in the same
module as the App. When the handler lives in a separate module — a device
library, a shared utility, or a generated function — the decorator forces you to
write a pass-through wrapper:
from my_devices import handle_switch
@app.command("switch")
async def switch_handler(payload: str) -> dict[str, object]:
return await handle_switch(payload) # just forwarding
The app.add_command() method eliminates the wrapper — register the imported
function directly:
from my_devices import handle_switch
app.add_command("switch", handle_switch)
Full Signature¶
app.add_command(
name, # device name (always required — no root device)
func, # async callable receiving topic/payload by name, others by type
*,
init=None, # optional synchronous factory
)
All keyword parameters behave identically to the decorator form.
Using init=¶
init= works the same way as the decorator — pass a synchronous factory whose
return value is injected by type:
from dataclasses import dataclass
from my_devices import handle_switch
@dataclass
class SwitchState:
position: str = "off"
command_count: int = 0
def make_state() -> SwitchState:
return SwitchState()
app.add_command("switch", handle_switch, init=make_state)
Choosing Between Decorator and Imperative¶
| Scenario | Preferred style |
|---|---|
| Handler defined inline, same file | @app.command decorator |
| Handler imported from another module | app.add_command() |
| Handler generated dynamically (factory) | app.add_command() |
| Registering in a loop | app.add_command() |
/// admonition | Identical validation type: info
Both paths run the same registration logic — signature validation,
init= type-collision checks, and duplicate-name detection happen
identically whether you use the decorator or add_command().
///
/// admonition | Named devices only type: warning
add_command() always requires a device name — root (unnamed)
devices can only be created via the decorator.
///
Conditional Registration¶
Use enabled= to skip registration based on a settings flag — no if block needed:
settings = app.settings
if settings.enable_valve:
@app.command("valve")
async def handle_valve(payload: str) -> dict[str, object]:
return {"state": payload}
settings = app.settings
@app.command("valve", enabled=settings.enable_valve)
async def handle_valve(payload: str) -> dict[str, object]:
return {"state": payload}
The imperative form works identically:
/// admonition | Disabled devices are invisible type: info
When enabled=False, the device is not registered at all — it won't
appear in MQTT topics, won't reserve a name slot, and won't consume
resources at runtime.
///
Stateful Command Handlers¶
For most command handlers, the return-dict pattern is sufficient — state is
derived from the payload and returned directly. When you need to track state
across multiple commands (e.g. toggle), the init= parameter is the
recommended approach.
Using init= for Per-Device State¶
The init= parameter accepts a synchronous factory callable that runs
once before the first command is dispatched. The result is injected into
the handler by type on every subsequent invocation:
import cosalette
from dataclasses import dataclass, field
app = cosalette.App(name="gas2mqtt", version="1.0.0")
@dataclass
class ValveState: # (1)!
"""Tracks valve position across commands."""
position: str = "closed"
command_count: int = 0
def make_valve_state() -> ValveState:
return ValveState()
@app.command("valve", init=make_valve_state) # (2)!
async def handle_valve(
payload: str, state: ValveState # (3)!
) -> dict[str, object]:
state.command_count += 1
match payload:
case "open":
state.position = "open"
case "close":
state.position = "closed"
case "toggle":
state.position = (
"open" if state.position == "closed" else "closed"
)
case _:
raise ValueError(f"Unknown command: {payload!r}")
return {"state": state.position, "commands_received": state.command_count}
app.run()
- A plain dataclass — no framework base class needed. The
init=callback creates it, the framework injects it. init=make_valve_stateruns once before the first command. The returnedValveStateinstance is reused for every subsequent message.- The handler receives the same
ValveStateinstance on every call — noglobal, nononlocal, no closures.
Why init= over module-level globals?
- Explicit ownership — the state is scoped to the device registration, not floating at module level.
- Testable — create a
ValveState()directly in tests without importing the module's global variable. - No
globalkeyword — the handler mutates a regular object, which is easier to reason about and lint-friendly.
Legacy Pattern: Module-Level State¶
Before init=, the idiomatic approach was a module-level variable with
global. This still works but is less clean:
_valve_state = "closed"
@app.command("valve")
async def handle_valve(
payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
global _valve_state
match payload:
case "open":
_valve_state = "open"
case "close":
_valve_state = "closed"
case "toggle":
_valve_state = (
"open" if _valve_state == "closed" else "closed"
)
case _:
raise ValueError(f"Unknown command: {payload!r}")
return {"state": _valve_state}
Rules and Constraints¶
The init= callback follows the same rules across all three decorators:
- Synchronous only —
async defcallbacks raiseTypeErrorat decoration time. - Type collision guard — returning a framework-provided type raises
TypeError. - Fail-fast — bad signatures are caught at decoration time.
For full details and examples, see Initialisation Callbacks in the telemetry guide.
When state gets complex
For devices with many state variables or complex transitions, extract a
dataclass or a small state class. For devices that need background loops,
periodic state refresh, or combined command + telemetry behaviour, use
@app.device — see Command Handling in @app.device
below.
Practical Example: WiFi Smart Plug¶
A complete command device for a WiFi relay (smart plug) with on/off/toggle support:
"""gas2mqtt — Smart plug (relay) command device."""
from __future__ import annotations
from typing import Protocol, runtime_checkable
import cosalette
@runtime_checkable
class RelayPort(Protocol):
"""Hardware abstraction for a relay switch."""
def turn_on(self) -> None: ...
def turn_off(self) -> None: ...
def is_on(self) -> bool: ...
app = cosalette.App(name="gas2mqtt", version="1.0.0")
@app.command("plug") # (1)!
async def handle_plug(
payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
"""Control a smart plug relay via MQTT commands."""
relay = ctx.adapter(RelayPort)
match payload:
case "on":
relay.turn_on()
case "off":
relay.turn_off()
case "toggle":
if relay.is_on():
relay.turn_off()
else:
relay.turn_on()
case _:
raise ValueError(
f"Unknown command: {payload!r}. "
f"Valid: on, off, toggle"
)
state = "on" if relay.is_on() else "off"
return {"state": state} # (2)!
app.run()
@app.command("plug")— no closure, no main loop, nononlocal. Just a function that receives a command and returns state.- The returned dict is auto-published to
gas2mqtt/plug/state.
MQTT interaction:
Error Behaviour¶
When an @app.command handler raises an exception:
- The framework catches it (except
CancelledError). - Logs the error at
ERRORlevel. - Publishes a structured error payload to
{prefix}/errorand{prefix}/{name}/error. - Continues normally — subsequent commands are dispatched as usual. Other devices are unaffected.
@app.command("valve")
async def handle_valve(payload: str) -> dict[str, object]:
if payload not in {"open", "close"}:
raise ValueError(f"Invalid command: {payload!r}") # (1)!
return {"state": payload}
- The framework catches
ValueError, publishes the error, and continues listening for the next command. No crash, no restart needed.
Custom error types
For machine-readable error classification, define an error_type_map. See
Custom Error Types for details.
Validate early
Check command payloads at the top of your handler and raise with a descriptive message. This gives consumers clear error feedback via the MQTT error topic.
Command Handling in @app.device¶
@app.command covers most command device use cases. Use @app.device when you
need a long-running coroutine — periodic hardware polling, state machines, or
combined command + telemetry behaviour.
| Need | Use |
|---|---|
| Simple command → state | @app.command ✓ |
| Command with hardware adapter | @app.command ✓ |
| Periodic state refresh + commands | @app.device + ctx.commands() |
| Fire-and-forget command callbacks | @app.device + @ctx.on_command |
| Multiple command sub-topics | @ctx.on_command("sub") |
The Command Iterator: ctx.commands()¶
The ctx.commands() async iterator is the recommended pattern for @app.device
loops that need to react to commands. It replaces manual asyncio.Queue bridges:
@app.device("thermostat")
async def thermostat(ctx: cosalette.DeviceContext) -> None:
target = 20.0
async for cmd in ctx.commands(timeout=10): # (1)!
if cmd is None: # (2)!
current = await read_sensor()
await ctx.publish_state({"current": current, "target": target})
else: # (3)!
target = float(cmd.payload)
await ctx.publish_state({"target": target})
timeout=10— yieldsNoneevery 10 seconds when no command arrives. Withouttimeout, blocks until a command arrives or shutdown is requested.None→ timeout expired. Poll the sensor and publish current state.Commandobject arrived.cmd.payloadcontains the raw MQTT payload,cmd.topicthe full topic string,cmd.timestampthe monotonic receive time.
The iterator drives the device loop — no while / ctx.sleep() / shutdown_requested
needed. The framework terminates the iterator on shutdown and drains any queued
commands.
Sub-Topic Routing with @ctx.on_command¶
When a device handles multiple command types, sub-topic routing avoids payload inspection. Each sub-topic gets its own MQTT topic:
@app.device("cover")
async def cover(ctx: cosalette.DeviceContext) -> None:
driver = ctx.adapter(CoverPort)
@ctx.on_command # {prefix}/cover/set
async def handle_position(cmd: cosalette.Command) -> None:
await driver.set_position(int(cmd.payload))
await ctx.publish_state({"position": int(cmd.payload)})
@ctx.on_command("calibrate") # {prefix}/cover/calibrate/set
async def handle_cal(cmd: cosalette.Command) -> None:
await driver.calibrate(cmd.payload)
await ctx.publish_state({"calibrating": True})
await ctx.publish_state({"position": await driver.read_position()})
while not ctx.shutdown_requested:
await ctx.sleep(30)
The framework subscribes to {prefix}/cover/+/set (wildcard) and routes each
message to the matching handler based on the sub-topic segment.
Sub-topic naming rules
Sub-topic strings must be non-empty, single-level identifiers — no /, +,
or # characters. Invalid names raise ValueError at registration time.
Combining Iterator and Sub-Topic Handlers¶
ctx.commands() and sub-topic @ctx.on_command("sub") can coexist. The iterator
receives root commands while sub-topic handlers fire independently:
@app.device("cover")
async def cover(ctx: cosalette.DeviceContext) -> None:
driver = ctx.adapter(CoverPort)
@ctx.on_command("calibrate") # {prefix}/cover/calibrate/set
async def handle_cal(cmd: cosalette.Command) -> None:
await driver.calibrate(cmd.payload)
async for cmd in ctx.commands(timeout=30): # {prefix}/cover/set
if cmd is None:
pos = await driver.read_position()
await ctx.publish_state({"position": pos})
else:
await driver.set_position(int(cmd.payload))
await ctx.publish_state({"position": int(cmd.payload)})
Exclusivity rules:
- A root
@ctx.on_command(no sub-topic) andctx.commands()cannot coexist — both claim root commands. Choose one. - Sub-topic handlers and
ctx.commands()can coexist freely. ctx.commands()can only be called once per device context.
Periodic State Refresh¶
When a device needs to poll hardware between commands:
@app.device("valve")
async def valve(ctx: cosalette.DeviceContext) -> None:
controller = ctx.adapter(ValveControllerPort)
async for cmd in ctx.commands(timeout=10):
if cmd is None:
state = controller.read_state()
else:
controller.actuate(cmd.payload)
state = controller.read_state()
await ctx.publish_state({"state": state})
Every 10 seconds (or after a command), re-read hardware and publish updated state. This catches out-of-band changes (e.g. someone pressing a physical button).
Error Handling in @app.device¶
Errors can occur in two places:
- In a command handler — The framework's command proxy catches the exception
and publishes a structured error payload via
ErrorPublisher(fire-and-forget). The device loop continues unaffected. - In the main loop — if the device coroutine crashes, the framework catches the exception, logs it, and publishes an error. The device task ends, but other devices continue.
Imperative Registration with add_device()¶
Like add_command() and add_telemetry(), there is an add_device()
method for registering imported device coroutines:
The signature mirrors the decorator:
app.add_device(
name, # device name (always required — no root device)
func, # async callable implementing the device loop
*,
init=None, # optional synchronous factory
enabled=True, # False to skip registration
)
See Also¶
- Device Archetypes — command vs telemetry vs device archetypes
- Telemetry Device — deep dive into
@app.telemetry - MQTT Topics — topic layout for commands and state
- Error Handling — how the framework isolates errors
- ADR-010 — the decision behind device archetypes
- ADR-002 — MQTT topic conventions
- ADR-025 — command channel and sub-topic routing design