Skip to content

Build a Command & Control Device

Command & control devices are bidirectional — they receive commands via MQTT and publish state back. The @app.command() decorator is the recommended way to build command devices: you write a simple function, the framework handles subscription, dispatch, error isolation, and state publication.

Multiple similar command devices?

If you're registering several command devices with the same handler logic (e.g. a bank of relays), use dict-name decorators to avoid copy-paste. See Multi-Device Registration.

Prerequisites

This guide assumes you've completed the Quickstart.

How @app.command Works

The @app.command decorator registers a standalone async function that:

  1. Receives MQTT message values by name — parameters named topic and payload are optional and injected only if declared in the handler signature. Declare only what you need.
  2. Receives dependencies by type annotation — all other parameters (e.g. ctx: DeviceContext, adapters) are injected by matching their type.
  3. Auto-publishes state — if the handler returns a dict, the framework JSON-serialises it and publishes to {prefix}/{name}/state with retain=True and qos=1. Return None to skip auto-publishing.
  4. Is error-isolated — if the handler raises an exception, the framework catches it, logs at ERROR level, and publishes a structured error payload to the error topics. Other devices are unaffected.

The framework subscribes to {prefix}/{name}/set and dispatches inbound messages to your handler function.

Which pattern to use

Pattern Use when
@app.command(name) Device reacts to MQTT commands. Simplest, most common.
@app.telemetry(name, interval=N) Device polls/streams data on an interval.
@app.device(name) + ctx.commands() Commands + periodic work in a single loop.
@app.device(name) + @ctx.on_command Fire-and-forget command callbacks in a coroutine.

See Device Archetypes for the full picture.

Why command devices always use DeviceContext

Unlike telemetry handlers (which can be zero-arg), command devices typically request ctx: DeviceContext because they need ctx.publish_state(), ctx.on_command, and ctx.sleep(). Other injectable types (Settings, logging.Logger, ClockPort, adapter ports) are also available via signature-based injection — but DeviceContext bundles them all for the bidirectional use case.

A Minimal Command Device

app.py
import cosalette

app = cosalette.App(name="gas2mqtt", version="1.0.0")


@app.command("valve")  # (1)!
async def handle_valve(payload: str) -> dict[str, object]:  # (2)!
    """Handle valve commands."""
    return {"state": payload}  # (3)!


app.run()
  1. "valve" is the device name — the framework subscribes to gas2mqtt/valve/set for inbound commands.
  2. payload is injected by name from the MQTT message. The return type is dict[str, object] | None. Returning a dict triggers auto-publishing.
  3. The returned dict is published as {"state": "open"} (or whatever payload contains) to gas2mqtt/valve/state with retain=True, qos=1.

Declare only the MQTT params you need

The handler above only declares payload. You can also declare topic: str to receive the full MQTT topic string, add ctx: cosalette.DeviceContext for framework services, or omit both MQTT params entirely:

# payload only (most common)
async def handle(payload: str) -> dict[str, object]: ...

# payload + topic
async def handle(topic: str, payload: str) -> dict[str, object]: ...

# payload + context
async def handle(payload: str, ctx: cosalette.DeviceContext) -> dict[str, object]: ...

# no MQTT params — side-effect only, uses adapter
async def handle(ctx: cosalette.DeviceContext) -> dict[str, object]: ...

When you run this, the framework:

  • Connects to the MQTT broker.
  • Subscribes to gas2mqtt/valve/set.
  • Dispatches each inbound message to handle_valve().
  • Publishes the returned dict as JSON to gas2mqtt/valve/state.
  • Keeps running until SIGTERM or SIGINT.

Single-Device Apps (Root Device)

For apps with a single command device, omit the name to publish at the root level:

app.py
import cosalette

app = cosalette.App(name="relay2mqtt", version="1.0.0")


@app.command()  # (1)!
async def handle(payload: str) -> dict[str, object]:
    """Control the relay."""
    return {"state": payload}


app.run()
  1. No device name — subscribes to relay2mqtt/set and publishes state to relay2mqtt/state.

The same root device rules apply as for telemetry — see Single-Device Apps for details on naming, topics, and constraints.

The Command Handler

An @app.command handler is a plain async def function with two kinds of parameters:

  • topic and payload (by name, both optional) — the full MQTT topic string (e.g. gas2mqtt/valve/set) and the raw message payload string. Declare only the ones your handler needs — the framework inspects the function signature at registration time and injects only what is declared.
  • Everything else (by type annotation) — injected automatically. ctx: cosalette.DeviceContext is the most common, but adapters work too.
Handler with validation
@app.command("valve")
async def handle_valve(payload: str) -> dict[str, object] | None:
    valid_commands = {"open", "close", "toggle"}

    if payload not in valid_commands:
        raise ValueError(f"Unknown command: {payload!r}")  # (1)!

    return {"state": payload}
  1. Raising inside the command handler is safe. The framework catches handler exceptions, logs them at ERROR level, and publishes a structured error payload. Other devices and subsequent commands continue normally.

Return Value Contract

Return value Framework behaviour
dict JSON-serialised and published to {prefix}/{name}/state
None No state publication — use when you publish manually or conditionally

Using DeviceContext

The DeviceContext gives you access to shared infrastructure without globals:

app.py
@app.command("valve")
async def handle_valve(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    settings = ctx.settings          # (1)!
    device_name = ctx.name           # (2)!
    clock_value = ctx.clock.now()    # (3)!

    return {"state": payload, "updated_at": clock_value}
  1. Access the application Settings instance (or your custom subclass).
  2. The device name as registered — "valve" in this case.
  3. The monotonic clock port — useful for timing calculations. In tests, this is a FakeClock you control directly.

DeviceContext vs AppContext

Command and device functions receive DeviceContext, which has publish, sleep, and adapter capabilities. Lifecycle hooks receive AppContext, which only has .settings and .adapter(). Don't mix them up — see Lifespan for details.

DeviceContext API

@app.command handlers can use a subset of the DeviceContext surface (the parts relevant to per-message handling):

Property / Method Description
ctx.name Device name as registered ("valve")
ctx.settings Application Settings instance
ctx.clock Monotonic clock port
ctx.adapter(PortType) Resolve a registered adapter
ctx.publish_state(dict) Publish to {prefix}/{name}/state (retained) — manual override
ctx.publish(channel, str) Publish to {prefix}/{name}/{channel} (arbitrary)

Auto-publish vs manual publish

For most command handlers, simply return a dict and let the framework publish. Use ctx.publish_state() directly only when you need side-effect publishing (e.g. publishing to multiple channels) and return None to skip the auto-publish.

Resolving Adapters

When your command device needs hardware access, use the adapter pattern:

app.py
from gas2mqtt.ports import RelayPort

@app.command("valve")
async def handle_valve(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    relay = ctx.adapter(RelayPort)  # (1)!

    match payload:
        case "on":
            relay.turn_on()
        case "off":
            relay.turn_off()
        case _:
            raise ValueError(f"Unknown command: {payload!r}")

    return {"state": payload}
  1. Resolves the adapter registered for RelayPort. Raises LookupError if no adapter is registered. See Hardware Adapters for registration.

Imperative Registration

The @app.command decorator works great when the handler is defined in the same module as the App. When the handler lives in a separate module — a device library, a shared utility, or a generated function — the decorator forces you to write a pass-through wrapper:

app.py — wrapper approach (verbose)
from my_devices import handle_switch

@app.command("switch")
async def switch_handler(payload: str) -> dict[str, object]:
    return await handle_switch(payload)  # just forwarding

The app.add_command() method eliminates the wrapper — register the imported function directly:

app.py — imperative approach
from my_devices import handle_switch

app.add_command("switch", handle_switch)

Full Signature

app.add_command(
    name,       # device name (always required — no root device)
    func,       # async callable receiving topic/payload by name, others by type
    *,
    init=None,  # optional synchronous factory
)

All keyword parameters behave identically to the decorator form.

Using init=

init= works the same way as the decorator — pass a synchronous factory whose return value is injected by type:

app.py
from dataclasses import dataclass
from my_devices import handle_switch


@dataclass
class SwitchState:
    position: str = "off"
    command_count: int = 0


def make_state() -> SwitchState:
    return SwitchState()


app.add_command("switch", handle_switch, init=make_state)

Choosing Between Decorator and Imperative

Scenario Preferred style
Handler defined inline, same file @app.command decorator
Handler imported from another module app.add_command()
Handler generated dynamically (factory) app.add_command()
Registering in a loop app.add_command()

/// admonition | Identical validation type: info

Both paths run the same registration logic — signature validation, init= type-collision checks, and duplicate-name detection happen identically whether you use the decorator or add_command(). ///

/// admonition | Named devices only type: warning

add_command() always requires a device name — root (unnamed) devices can only be created via the decorator. ///

Conditional Registration

Use enabled= to skip registration based on a settings flag — no if block needed:

Before — imperative if-block
settings = app.settings

if settings.enable_valve:
    @app.command("valve")
    async def handle_valve(payload: str) -> dict[str, object]:
        return {"state": payload}
After — declarative enabled=
settings = app.settings

@app.command("valve", enabled=settings.enable_valve)
async def handle_valve(payload: str) -> dict[str, object]:
    return {"state": payload}

The imperative form works identically:

app.add_command("valve", handle_valve, enabled=settings.enable_valve)

/// admonition | Disabled devices are invisible type: info

When enabled=False, the device is not registered at all — it won't appear in MQTT topics, won't reserve a name slot, and won't consume resources at runtime. ///

Stateful Command Handlers

For most command handlers, the return-dict pattern is sufficient — state is derived from the payload and returned directly. When you need to track state across multiple commands (e.g. toggle), the init= parameter is the recommended approach.

Using init= for Per-Device State

The init= parameter accepts a synchronous factory callable that runs once before the first command is dispatched. The result is injected into the handler by type on every subsequent invocation:

app.py
import cosalette
from dataclasses import dataclass, field

app = cosalette.App(name="gas2mqtt", version="1.0.0")


@dataclass
class ValveState:  # (1)!
    """Tracks valve position across commands."""

    position: str = "closed"
    command_count: int = 0


def make_valve_state() -> ValveState:
    return ValveState()


@app.command("valve", init=make_valve_state)  # (2)!
async def handle_valve(
    payload: str, state: ValveState  # (3)!
) -> dict[str, object]:
    state.command_count += 1

    match payload:
        case "open":
            state.position = "open"
        case "close":
            state.position = "closed"
        case "toggle":
            state.position = (
                "open" if state.position == "closed" else "closed"
            )
        case _:
            raise ValueError(f"Unknown command: {payload!r}")

    return {"state": state.position, "commands_received": state.command_count}


app.run()
  1. A plain dataclass — no framework base class needed. The init= callback creates it, the framework injects it.
  2. init=make_valve_state runs once before the first command. The returned ValveState instance is reused for every subsequent message.
  3. The handler receives the same ValveState instance on every call — no global, no nonlocal, no closures.

Why init= over module-level globals?

  • Explicit ownership — the state is scoped to the device registration, not floating at module level.
  • Testable — create a ValveState() directly in tests without importing the module's global variable.
  • No global keyword — the handler mutates a regular object, which is easier to reason about and lint-friendly.

Legacy Pattern: Module-Level State

Before init=, the idiomatic approach was a module-level variable with global. This still works but is less clean:

app.py (legacy pattern)
_valve_state = "closed"


@app.command("valve")
async def handle_valve(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    global _valve_state

    match payload:
        case "open":
            _valve_state = "open"
        case "close":
            _valve_state = "closed"
        case "toggle":
            _valve_state = (
                "open" if _valve_state == "closed" else "closed"
            )
        case _:
            raise ValueError(f"Unknown command: {payload!r}")

    return {"state": _valve_state}

Rules and Constraints

The init= callback follows the same rules across all three decorators:

  • Synchronous onlyasync def callbacks raise TypeError at decoration time.
  • Type collision guard — returning a framework-provided type raises TypeError.
  • Fail-fast — bad signatures are caught at decoration time.

For full details and examples, see Initialisation Callbacks in the telemetry guide.

When state gets complex

For devices with many state variables or complex transitions, extract a dataclass or a small state class. For devices that need background loops, periodic state refresh, or combined command + telemetry behaviour, use @app.device — see Command Handling in @app.device below.

Practical Example: WiFi Smart Plug

A complete command device for a WiFi relay (smart plug) with on/off/toggle support:

app.py
"""gas2mqtt — Smart plug (relay) command device."""

from __future__ import annotations

from typing import Protocol, runtime_checkable

import cosalette


@runtime_checkable
class RelayPort(Protocol):
    """Hardware abstraction for a relay switch."""

    def turn_on(self) -> None: ...
    def turn_off(self) -> None: ...
    def is_on(self) -> bool: ...


app = cosalette.App(name="gas2mqtt", version="1.0.0")


@app.command("plug")  # (1)!
async def handle_plug(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    """Control a smart plug relay via MQTT commands."""
    relay = ctx.adapter(RelayPort)

    match payload:
        case "on":
            relay.turn_on()
        case "off":
            relay.turn_off()
        case "toggle":
            if relay.is_on():
                relay.turn_off()
            else:
                relay.turn_on()
        case _:
            raise ValueError(
                f"Unknown command: {payload!r}. "
                f"Valid: on, off, toggle"
            )

    state = "on" if relay.is_on() else "off"
    return {"state": state}  # (2)!


app.run()
  1. @app.command("plug") — no closure, no main loop, no nonlocal. Just a function that receives a command and returns state.
  2. The returned dict is auto-published to gas2mqtt/plug/state.

MQTT interaction:

→ gas2mqtt/plug/set       "on"
← gas2mqtt/plug/state     {"state": "on"}
→ gas2mqtt/plug/set       "toggle"
← gas2mqtt/plug/state     {"state": "off"}
→ gas2mqtt/plug/set       "blink"
← gas2mqtt/error           {"error_type": "error", "message": "Unknown command: 'blink'..."}
← gas2mqtt/plug/error     {"error_type": "error", "message": "Unknown command: 'blink'..."}

Error Behaviour

When an @app.command handler raises an exception:

  1. The framework catches it (except CancelledError).
  2. Logs the error at ERROR level.
  3. Publishes a structured error payload to {prefix}/error and {prefix}/{name}/error.
  4. Continues normally — subsequent commands are dispatched as usual. Other devices are unaffected.
Example error flow
@app.command("valve")
async def handle_valve(payload: str) -> dict[str, object]:
    if payload not in {"open", "close"}:
        raise ValueError(f"Invalid command: {payload!r}")  # (1)!
    return {"state": payload}
  1. The framework catches ValueError, publishes the error, and continues listening for the next command. No crash, no restart needed.

Custom error types

For machine-readable error classification, define an error_type_map. See Custom Error Types for details.

Validate early

Check command payloads at the top of your handler and raise with a descriptive message. This gives consumers clear error feedback via the MQTT error topic.

Command Handling in @app.device

@app.command covers most command device use cases. Use @app.device when you need a long-running coroutine — periodic hardware polling, state machines, or combined command + telemetry behaviour.

Need Use
Simple command → state @app.command
Command with hardware adapter @app.command
Periodic state refresh + commands @app.device + ctx.commands()
Fire-and-forget command callbacks @app.device + @ctx.on_command
Multiple command sub-topics @ctx.on_command("sub")

The Command Iterator: ctx.commands()

The ctx.commands() async iterator is the recommended pattern for @app.device loops that need to react to commands. It replaces manual asyncio.Queue bridges:

@app.device("thermostat")
async def thermostat(ctx: cosalette.DeviceContext) -> None:
    target = 20.0

    async for cmd in ctx.commands(timeout=10):  # (1)!
        if cmd is None:                          # (2)!
            current = await read_sensor()
            await ctx.publish_state({"current": current, "target": target})
        else:                                    # (3)!
            target = float(cmd.payload)
            await ctx.publish_state({"target": target})
  1. timeout=10 — yields None every 10 seconds when no command arrives. Without timeout, blocks until a command arrives or shutdown is requested.
  2. None → timeout expired. Poll the sensor and publish current state.
  3. Command object arrived. cmd.payload contains the raw MQTT payload, cmd.topic the full topic string, cmd.timestamp the monotonic receive time.

The iterator drives the device loop — no while / ctx.sleep() / shutdown_requested needed. The framework terminates the iterator on shutdown and drains any queued commands.

Sub-Topic Routing with @ctx.on_command

When a device handles multiple command types, sub-topic routing avoids payload inspection. Each sub-topic gets its own MQTT topic:

@app.device("cover")
async def cover(ctx: cosalette.DeviceContext) -> None:
    driver = ctx.adapter(CoverPort)

    @ctx.on_command                           # {prefix}/cover/set
    async def handle_position(cmd: cosalette.Command) -> None:
        await driver.set_position(int(cmd.payload))
        await ctx.publish_state({"position": int(cmd.payload)})

    @ctx.on_command("calibrate")              # {prefix}/cover/calibrate/set
    async def handle_cal(cmd: cosalette.Command) -> None:
        await driver.calibrate(cmd.payload)
        await ctx.publish_state({"calibrating": True})

    await ctx.publish_state({"position": await driver.read_position()})
    while not ctx.shutdown_requested:
        await ctx.sleep(30)

The framework subscribes to {prefix}/cover/+/set (wildcard) and routes each message to the matching handler based on the sub-topic segment.

Sub-topic naming rules

Sub-topic strings must be non-empty, single-level identifiers — no /, +, or # characters. Invalid names raise ValueError at registration time.

Combining Iterator and Sub-Topic Handlers

ctx.commands() and sub-topic @ctx.on_command("sub") can coexist. The iterator receives root commands while sub-topic handlers fire independently:

@app.device("cover")
async def cover(ctx: cosalette.DeviceContext) -> None:
    driver = ctx.adapter(CoverPort)

    @ctx.on_command("calibrate")              # {prefix}/cover/calibrate/set
    async def handle_cal(cmd: cosalette.Command) -> None:
        await driver.calibrate(cmd.payload)

    async for cmd in ctx.commands(timeout=30): # {prefix}/cover/set
        if cmd is None:
            pos = await driver.read_position()
            await ctx.publish_state({"position": pos})
        else:
            await driver.set_position(int(cmd.payload))
            await ctx.publish_state({"position": int(cmd.payload)})

Exclusivity rules:

  • A root @ctx.on_command (no sub-topic) and ctx.commands() cannot coexist — both claim root commands. Choose one.
  • Sub-topic handlers and ctx.commands() can coexist freely.
  • ctx.commands() can only be called once per device context.

Periodic State Refresh

When a device needs to poll hardware between commands:

@app.device("valve")
async def valve(ctx: cosalette.DeviceContext) -> None:
    controller = ctx.adapter(ValveControllerPort)

    async for cmd in ctx.commands(timeout=10):
        if cmd is None:
            state = controller.read_state()
        else:
            controller.actuate(cmd.payload)
            state = controller.read_state()
        await ctx.publish_state({"state": state})

Every 10 seconds (or after a command), re-read hardware and publish updated state. This catches out-of-band changes (e.g. someone pressing a physical button).

Error Handling in @app.device

Errors can occur in two places:

  1. In a command handler — The framework's command proxy catches the exception and publishes a structured error payload via ErrorPublisher (fire-and-forget). The device loop continues unaffected.
  2. In the main loop — if the device coroutine crashes, the framework catches the exception, logs it, and publishes an error. The device task ends, but other devices continue.

Imperative Registration with add_device()

Like add_command() and add_telemetry(), there is an add_device() method for registering imported device coroutines:

from my_devices import valve_loop

app.add_device("valve", valve_loop)

The signature mirrors the decorator:

app.add_device(
    name,       # device name (always required — no root device)
    func,       # async callable implementing the device loop
    *,
    init=None,  # optional synchronous factory
    enabled=True,  # False to skip registration
)

See Also