
Device Archetypes

Cosalette recognises three device archetypes. Every device in an IoT-to-MQTT bridge falls into one of these categories — or can be expressed as a composition of them.


| Aspect | Command (@app.command) | Telemetry (@app.telemetry) | Device (@app.device) |
| --- | --- | --- | --- |
| Direction | Bidirectional | Unidirectional (default) or bidirectional (triggerable=True) | Bidirectional or unidirectional |
| Execution model | Per-message dispatch | Framework-managed polling loop | Long-running async generator |
| Inbound commands | Automatic: handler receives them | Optional via triggerable=True | ctx.commands() or @ctx.on_command |
| State publishing | Automatic: return a dict | Automatic: return a dict | Manual via ctx.publish_state() |
| Publish control | Not applicable | publish= strategies | Manual (your loop logic) |
| Reaction boundary | After successful return | After successful return | After each yield |
| Typical devices | GPIO relays, WiFi bulbs, simple actuators | BLE sensors, I²C temperature probes | State machines, combined patterns |
| Scheduling | On-demand (per message) | interval= or schedule= (cron) | Manual via ctx.sleep() / ctx.sleep_until() |
Command:
graph LR
    A[MQTT /set topic] -->|message| B[Handler function]
    B -->|return dict| C[Framework publishes to /state]

Telemetry:
graph LR
    D[Hardware sensor] -->|read| E[Polling function]
    E -->|return dict| F[Framework publishes to /state]

Device:
graph LR
    A[MQTT /set topic] -->|command| B[Device coroutine]
    B -->|publish_state| C[MQTT /state topic]

Command & Control Devices

Command devices receive MQTT commands and publish state back. The @app.command decorator is the recommended approach — it registers a simple handler function that the framework calls on each inbound message.

@app.command("blind")  # (1)!
async def handle_blind(
    payload: str, ctx: cosalette.DeviceContext  # (2)!
) -> dict[str, object]:  # (3)!
    driver = ctx.adapter(VeluxPort)
    position = int(payload)
    await driver.set_position(position)
    return {"position": position}  # (4)!
  1. @app.command registers a handler for {prefix}/blind/set messages.
  2. payload is optional and injected by name from the MQTT message; ctx is injected by type annotation. Declare only what you need.
  3. Returning a dict auto-publishes to {prefix}/blind/state.
  4. No closure, no main loop, no nonlocal — just a function.

When to Use @app.device Instead

For devices that need full lifecycle control — periodic hardware polling, custom event loops, state machines, or combined command + telemetry behaviour — use @app.device. The handler must be an async generator: yield after each unit of work marks the reaction boundary.

@app.device("blind")  # (1)!
async def blind(ctx: cosalette.DeviceContext):  # (2)!
    driver = ctx.adapter(VeluxPort)

    async for cmd in ctx.commands(timeout=30):  # (3)!
        if cmd is None:
            status = await driver.poll_status()
            await ctx.publish_state(status)
        else:  # (4)!
            position = int(cmd.payload)
            await driver.set_position(position)
            await ctx.publish_state({"position": position})
        yield  # (5)!
  1. @app.device registers the function as a long-running async generator task.
  2. Return annotation is omitted — async generators do not return a value.
  3. ctx.commands(timeout=30) drives the loop — yields None every 30 seconds for periodic work, or a Command when one arrives on {prefix}/blind/set.
  4. Commands carry payload, topic, sub_topic, and timestamp fields.
  5. yield is the reaction boundary. The framework dispatches any registered @app.react reactors for state objects here, before the next loop iteration.

Async generator ownership

The framework creates one asyncio.Task per @app.device. The generator runs concurrently alongside other devices. When shutdown is signalled, the framework cancels the task; cancellation does not trigger reactor dispatch.
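
The reaction boundary can be pictured with a small driver written in plain asyncio. This is a sketch under stated assumptions, not the framework's implementation: `drive`, `demo_device`, and the synchronous `on_boundary` callback are hypothetical names, and real reactors may well be asynchronous.

```python
import asyncio


async def drive(gen, on_boundary):
    """Advance an async generator and call on_boundary after every yield."""
    async for _ in gen:
        on_boundary()  # stand-in for reactor dispatch (assumption)


async def demo_device(log):
    """Hypothetical device: three units of work, one yield per unit."""
    for step in range(3):
        log.append(f"work {step}")
        yield  # reaction boundary


def run_demo():
    log = []
    asyncio.run(drive(demo_device(log), lambda: log.append("react")))
    return log
```

Calling `run_demo()` interleaves each unit of work with one "react" entry, which is the ordering the annotation on yield describes.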

Command Routing

When a message arrives on {prefix}/blind/set, the framework's TopicRouter extracts the device name and dispatches to the registered handler (@app.command), the command queue (ctx.commands()), or callback (@ctx.on_command). Sub-topic commands ({prefix}/blind/calibrate/set) are routed to their specific handler. See MQTT Topics for the full topic layout.
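
The routing rule can be sketched as a plain topic parser. This is an illustration only, not the framework's TopicRouter: the function name `parse_command_topic` and its return shape are assumptions, and it models only the two named-device topic shapes mentioned above (root devices are ignored).

```python
from typing import Optional, Tuple


def parse_command_topic(topic: str, prefix: str) -> Optional[Tuple[str, Optional[str]]]:
    """Split a command topic into (device, sub_topic); hypothetical helper.

    Returns None when the topic is not a command topic under `prefix`.
    """
    parts = topic.split("/")
    prefix_parts = prefix.split("/")
    n = len(prefix_parts)
    # The topic must start with the prefix and end with the "set" suffix.
    if parts[:n] != prefix_parts or parts[-1] != "set":
        return None
    middle = parts[n:-1]
    if len(middle) == 1:  # {prefix}/{device}/set
        return middle[0], None
    if len(middle) == 2:  # {prefix}/{device}/{sub_topic}/set
        return middle[0], middle[1]
    return None
```

With a prefix of "home", the topic "home/blind/set" resolves to the device "blind" with no sub-topic, while "home/blind/calibrate/set" resolves to the same device with the sub-topic "calibrate".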

Telemetry Devices

A telemetry device is a simple function that reads a sensor and returns a dict. The framework handles the polling schedule and MQTT publication.

The simplest form takes zero arguments:

@app.telemetry("temperature", interval=60)  # (1)!
async def temperature() -> dict[str, object]:
    reading = await read_i2c_sensor()  # (2)!
    return {"celsius": reading.temp, "humidity": reading.rh}  # (3)!
  1. Framework calls this function every 60 seconds.
  2. Your code reads the hardware (or adapter).
  3. The returned dict is JSON-serialised and published to {prefix}/temperature/state as a retained QoS 1 message.

When you need infrastructure access (adapters, settings, MQTT publishing), declare a ctx: DeviceContext parameter and the framework injects it:

@app.telemetry("temperature", interval=60)
async def temperature(ctx: cosalette.DeviceContext) -> dict[str, object]:
    sensor = ctx.adapter(SensorPort)
    return {"celsius": sensor.read_temp()}

Telemetry devices are normally poll-only, but adding triggerable=True makes them also respond to inbound MQTT commands — see the Triggerable Telemetry guide.

Telemetry Internals

Under the hood, @app.telemetry is syntactic sugar for a polling loop inside the framework:

# Simplified TelemetryRunner.run_telemetry (see _telemetry_runner.py)
async def run_telemetry(self, reg, ctx, error_publisher):
    last_published = None
    last_error_type = None
    while not ctx.shutdown_requested:
        try:
            result = await reg.func(ctx)
            if result is None:
                await ctx.sleep(reg.interval)
                continue
            strategy = reg.publish_strategy
            should_publish = (
                last_published is None          # First → always
                or strategy is None             # No strategy → always
                or strategy.should_publish(result, last_published)
            )
            if should_publish:
                await ctx.publish_state(result)
                last_published = result
                if strategy is not None:
                    strategy.on_published()
            if last_error_type is not None:
                last_error_type = None  # Recovery
        except asyncio.CancelledError:
            raise  # Let shutdown cancellation propagate
        except Exception as exc:
            if type(exc) is not last_error_type:
                await error_publisher.publish(exc, device=reg.name)
            last_error_type = type(exc)
        await ctx.sleep(reg.interval)

The framework wraps each telemetry call in error isolation with state-transition deduplication — the first error of each type is published, but repeated same-type errors are suppressed to prevent flooding. When the sensor recovers, the framework logs recovery and restores the device health status.

Publish Strategies

By default, the framework publishes every probe result. Publish strategies decouple the probing frequency from the publishing frequency — probe often, publish selectively:

from cosalette import Every, OnChange

@app.telemetry("temperature", interval=10, publish=Every(seconds=300))
async def temperature() -> dict[str, object]:
    return {"celsius": await read_sensor()}

Here, interval=10 means the sensor is probed every 10 seconds, but Every(seconds=300) ensures state is published at most once every 5 minutes. This is useful when you want responsive readings locally (e.g. for EWMA smoothing) but don't need to flood MQTT.
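
To make the probe/publish split concrete, here is a minimal time-based throttle with an injectable clock. It is a sketch, not cosalette's Every: the class name `EverySketch` and the `simulate` helper are hypothetical, and the `should_publish` / `on_published` method names merely mirror the simplified runner shown under Telemetry Internals.

```python
import time
from typing import Callable, List, Optional


class EverySketch:
    """Hypothetical stand-in for a time-based publish strategy."""

    def __init__(self, seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._seconds = seconds
        self._clock = clock
        self._last_publish: Optional[float] = None

    def should_publish(self, current, previous) -> bool:
        # Publish once the configured time has passed since the last publish.
        if self._last_publish is None:
            return True
        return self._clock() - self._last_publish >= self._seconds

    def on_published(self) -> None:
        self._last_publish = self._clock()


def simulate(probe_interval: int, publish_every: int, duration: int) -> List[int]:
    """Return the probe times (in seconds) at which a publish would happen."""
    clock = {"now": 0.0}
    strategy = EverySketch(publish_every, clock=lambda: clock["now"])
    last_published = None
    published_at = []
    for t in range(0, duration + 1, probe_interval):
        clock["now"] = float(t)
        result = {"t": t}
        if last_published is None or strategy.should_publish(result, last_published):
            published_at.append(t)
            last_published = result
            strategy.on_published()
    return published_at
```

Probing every 10 seconds for 10 minutes with a 300-second throttle gives 61 probes but only three publishes, at 0, 300, and 600 seconds.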

For threshold modes, composition operators, and the full strategy reference, see Publish Strategies.

Coalescing Groups

When multiple telemetry handlers share a physical resource — such as a serial bus, SPI interface, or rate-limited API — they can be grouped into a shared execution window using the group= parameter:

@app.telemetry("outdoor", interval=300, group="optolink")
async def outdoor(port: OptolinkPort) -> dict[str, object]:
    return await port.read_signals(["outdoor_temp"])

@app.telemetry("hotwater", interval=300, group="optolink")
async def hotwater(port: OptolinkPort) -> dict[str, object]:
    return await port.read_signals(["hot_water_temp"])

Handlers in the same group are managed by a shared tick-aligned scheduler. At t=0 all grouped handlers fire together; at subsequent ticks only those whose interval divides evenly into the elapsed time fire. This reduces resource sessions from N (one per handler) down to 1 per coinciding tick, eliminates timing drift, and enables adapter session sharing.

Each handler retains its own publish strategy, error isolation, persistence policy, and init function — group= is purely an execution scheduling hint.
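
The tick rule can be worked through in a few lines. This sketch assumes the shared tick is the greatest common divisor of the group's intervals, which the text above does not state; `due_handlers`, `tick_schedule`, and the "burner" handler are hypothetical names used only for illustration.

```python
from functools import reduce
from math import gcd


def due_handlers(intervals: dict, elapsed: int) -> list:
    """Names of handlers whose interval divides the elapsed time evenly."""
    return [name for name, interval in intervals.items() if elapsed % interval == 0]


def tick_schedule(intervals: dict, horizon: int) -> dict:
    """Map each tick (seconds since start) to the handlers that fire on it."""
    tick = reduce(gcd, intervals.values())  # assumption: tick = GCD of intervals
    schedule = {}
    for elapsed in range(0, horizon + 1, tick):
        due = due_handlers(intervals, elapsed)
        if due:
            schedule[elapsed] = due
    return schedule
```

For intervals of 300, 300, and 600 seconds, all three handlers fire together at t=0 and t=600, and only the two 300-second handlers fire at t=300, so the shared resource is opened once per tick instead of once per handler.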

See ADR-018 for the full design rationale.

Deferred enabled=

Sometimes a device should only be registered when the app's settings dictate it. All three decorator forms (@app.telemetry, @app.device, @app.command) accept enabled= as a callable that receives the resolved Settings instance and returns a bool:

@app.telemetry(
    "magnetometer",
    interval=lambda s: s.poll_interval,
    enabled=lambda s: s.enable_debug_device,  # resolved at bootstrap
)
async def magnetometer(mag: MagnetometerPort) -> dict[str, object]:
    reading = mag.read()
    return {"bx": reading.bx, "by": reading.by, "bz": reading.bz}

When enabled= is a callable, the framework defers the decision to the bootstrap phase — after settings are resolved — alongside interval= deferred resolution. Devices where the callable returns False are silently dropped from the registry before MQTT wiring begins.
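
The bootstrap-time resolution described here can be sketched with plain dataclasses. None of these names come from cosalette: `Settings`, `Registration`, and `resolve` are hypothetical stand-ins that only show the order of operations (resolve enabled= first, drop disabled entries, then resolve interval=).

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass
class Settings:
    """Hypothetical settings model."""
    poll_interval: float = 5.0
    enable_debug_device: bool = False


@dataclass
class Registration:
    """Hypothetical registry entry; values may be literals or callables."""
    name: str
    interval: Union[float, Callable[[Settings], float]]
    enabled: Union[bool, Callable[[Settings], bool]] = True


def resolve(registrations: List[Registration], settings: Settings) -> List[Registration]:
    """Resolve deferred values and drop disabled registrations."""
    resolved = []
    for reg in registrations:
        enabled = reg.enabled(settings) if callable(reg.enabled) else reg.enabled
        if not enabled:
            continue  # dropped before any MQTT wiring would begin
        interval = reg.interval(settings) if callable(reg.interval) else reg.interval
        resolved.append(Registration(reg.name, interval, True))
    return resolved
```

Because the callables run only inside `resolve`, the registrations themselves can be created at import time, before any settings exist.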

This preserves the fully-declarative main.py style: every device is visible at module level, and no @app.on_configure boilerplate is needed just to conditionally register one device.

Imperative add_*() methods

app.add_telemetry(), app.add_device(), and app.add_command() accept only enabled: bool. Inside @app.on_configure, settings are already available, so a callable is unnecessary.

See ADR-038 for the full decision record.

Manual Telemetry Escape Hatch

Some sensors require complex polling logic — backoff, adaptive intervals, or multi-step reads. For these cases, use @app.device with a manual loop:

@app.device("complex_sensor")
async def complex_sensor(ctx: cosalette.DeviceContext):
    adapter = ctx.adapter(SensorPort)
    interval = 10.0

    while not ctx.shutdown_requested:
        try:
            data = await adapter.read()
            await ctx.publish_state(data)
            interval = 10.0  # reset on success
        except SensorTimeoutError:
            interval = min(interval * 2, 300)  # exponential backoff
        yield
        await ctx.sleep(interval)

When to use which

Use @app.telemetry for straightforward read-and-return sensors. Use @app.device when you need custom error handling, adaptive intervals, or inbound command support alongside telemetry.
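
As a worked example of the backoff arithmetic in the manual loop above, the helper below lists the sleep interval after each consecutive failure. The function name `backoff_intervals` is hypothetical; the doubling and the cap mirror the min(interval * 2, 300) expression in the loop.

```python
from typing import List


def backoff_intervals(start: float, cap: float, failures: int) -> List[float]:
    """Sleep interval after each consecutive failure (doubling, capped)."""
    interval = start
    intervals = []
    for _ in range(failures):
        interval = min(interval * 2, cap)
        intervals.append(interval)
    return intervals
```

Starting from 10 seconds with a 300-second cap, six consecutive timeouts produce 20, 40, 80, 160, 300, and 300 seconds; a single successful read resets the interval to 10.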

When to Use Which

Use this decision matrix to choose the right decorator:

| Need | Decorator |
| --- | --- |
| React to MQTT commands, publish state | @app.command |
| Poll a sensor on a fixed interval | @app.telemetry |
| Poll often, publish selectively | @app.telemetry + publish= |
| Suppress duplicate readings | @app.telemetry + OnChange() |
| On-demand refresh + polling fallback | @app.telemetry + triggerable=True |
| Hardware-fired callbacks (BLE, serial, HID) | @app.stream |
| Command + periodic hardware polling | @app.telemetry + @app.command or @app.device |
| Custom event loop or state machine | @app.device (escape hatch) |
| Time-of-day-aligned polling (e.g. 06:00) | @app.telemetry + schedule= or @app.device + ctx.sleep_until() |
| Adaptive intervals or backoff | @app.device (manual loop) |

@app.command and @app.telemetry are the recommended decorators for the vast majority of devices. With publish strategies, @app.telemetry now covers use cases that previously required @app.device — like polling frequently but publishing only on change. Use @app.device only when you need capabilities that the simpler decorators cannot provide (adaptive intervals, state machines, or combined command + telemetry behaviour).

Choosing an Archetype

Use this decision tree to find the right decorator for your device:

graph TD
    Start([New device]) --> Q1{Receives MQTT<br/>commands?}

    Q1 -->|No| Q2{Polls on a<br/>fixed interval?}
    Q2 -->|Yes| T(["@app.telemetry"])
    Q2 -->|No| Q2b{Hardware fires<br/>callbacks?}
    Q2b -->|Yes| S(["@app.stream"])
    Q2b -->|No| D1(["@app.device"])

    Q1 -->|Yes| Q1a{On-demand refresh<br/>of polled data?}
    Q1a -->|Yes| TT(["@app.telemetry +<br/>triggerable=True"])
    Q1a -->|No| Q3{Also needs<br/>periodic polling?}
    Q3 -->|No| C(["@app.command"])
    Q3 -->|Yes| Q4{Needs telemetry features?<br/>publish strategies,<br/>persistence, coalescing}
    Q4 -->|Yes| TC(["@app.telemetry +<br/>@app.command"])
    Q4 -->|No| D2(["@app.device with<br/>@ctx.on_command"])

    style T fill:#FFC105,color:#000000
    style D1 fill:#FFC105,color:#000000
    style C fill:#FFC105,color:#000000
    style TT fill:#FFC105,color:#000000
    style TC fill:#FFC105,color:#000000
    style D2 fill:#FFC105,color:#000000
    style S fill:#FFC105,color:#000000

Example devices for each choice:

  • @app.command: WiFi smart plug, GPIO relay
  • @app.telemetry: BLE thermometer, I²C humidity sensor
  • @app.telemetry + @app.command: Hot water controller with periodic temp reads and target temp commands (see ADR-019)
  • @app.device: Complex state machine, sensor with adaptive backoff, custom event loop

Mixed Applications

Most real bridges combine multiple archetypes:

app = cosalette.App(name="home2mqtt", version="1.0.0")

@app.command("relay")
async def handle_relay(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    """Bidirectional: accepts on/off commands, returns state."""
    ...

@app.telemetry("outdoor_temp", interval=120)
async def outdoor_temp() -> dict[str, object]:
    """Unidirectional: reads a BLE thermometer every 2 minutes."""
    ...

@app.telemetry("indoor_temp", interval=60)
async def indoor_temp(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Unidirectional: reads an I²C sensor every minute (uses ctx for adapter)."""
    ...

app.run()

Error Isolation

Each device runs in its own asyncio.Task with independent error boundaries. A crash in one device does not take down others:

  • Command (@app.command): if the handler raises, the error is logged and published to the error topic. Subsequent commands are dispatched normally.
  • Device (@app.device): if the async generator raises, the error is logged and published to the device's error topic. Other devices continue running.
  • Telemetry: if one polling cycle raises, the error is published and the next cycle runs on schedule.

This isolation is fundamental to daemon reliability — a flaky BLE sensor should never prevent an actuator motor from responding to commands.

CancelledError is special

asyncio.CancelledError is not caught by the error isolation layer. It propagates normally to allow graceful shutdown via task cancellation.
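
The isolation idea can be demonstrated with plain asyncio. This is a minimal sketch, not the framework's error layer: `isolated`, `flaky_sensor`, and `actuator` are hypothetical names, and failures are collected in a list where the framework would log and publish them.

```python
import asyncio


async def isolated(name, coro_fn, errors):
    """Run one device coroutine; record failures instead of propagating them."""
    try:
        await coro_fn()
    except asyncio.CancelledError:
        raise  # never swallowed: shutdown relies on cancellation
    except Exception as exc:
        errors.append((name, type(exc).__name__))


async def flaky_sensor():
    raise OSError("BLE link lost")


async def actuator(results):
    await asyncio.sleep(0)  # give the other task a chance to run (and fail) first
    results.append("actuator handled command")


async def main():
    errors, results = [], []
    await asyncio.gather(
        asyncio.create_task(isolated("sensor", flaky_sensor, errors)),
        asyncio.create_task(isolated("actuator", lambda: actuator(results), errors)),
    )
    return errors, results
```

Running `asyncio.run(main())` records the sensor's OSError while the actuator task still completes its work.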

Naming Constraints

Device names must be unique within each registration type. Two telemetry registrations or two command registrations cannot share the same name, because they would conflict on the same MQTT topic suffix.

However, a @app.telemetry and a @app.command registration can share the same name — they publish to different MQTT suffixes (/state vs /set) and the framework creates a shared DeviceContext for both. This enables the ADR-002 topic layout where a single device segment holds both state and command topics:

import cosalette

@app.telemetry("hot_water", interval=30)
async def read_temps(ctx: cosalette.DeviceContext) -> dict[str, object]: ...

@app.command("hot_water")  # Same name — allowed (telemetry + command)
async def set_temp(payload: str, ctx: cosalette.DeviceContext) -> dict[str, object]: ...

# Result:
#   {prefix}/hot_water/state   ← telemetry publishes here
#   {prefix}/hot_water/set     ← command subscribes here

@app.device registrations remain globally unique — the device archetype already handles both state and commands, so collisions with any other type are rejected:

@app.device("sensor")
async def sensor_loop(ctx: cosalette.DeviceContext): ...

@app.telemetry("sensor", interval=10)  # ValueError: name conflicts with device registration
async def sensor_data(ctx: cosalette.DeviceContext) -> dict[str, object]: ...

See ADR-019 for the full decision record.

Device names are used as MQTT topic segments ({prefix}/{name}/state) and must be unambiguous within their topic suffix.
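
The naming rules above reduce to a small check. The sketch below is an assumption about how such a rule could be written, not the framework's registry code; `register_name` is a hypothetical helper and it covers only the telemetry, command, and device kinds discussed in this section.

```python
def register_name(registry: dict, kind: str, name: str) -> None:
    """Record (kind, name) or raise ValueError; hypothetical rule check."""
    kinds = registry.get(name, set())
    if kind in kinds:
        # Two registrations of the same type would share one topic suffix.
        raise ValueError(f"duplicate {kind} registration: {name!r}")
    if kinds and ("device" in kinds or kind == "device"):
        # Device registrations are globally unique across all types.
        raise ValueError(f"{name!r} conflicts with an existing registration")
    registry[name] = kinds | {kind}
```

Registering a telemetry handler and a command handler under "hot_water" succeeds, while adding a telemetry handler named "sensor" after a device of that name raises ValueError.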

The Read/Write Split Pattern

When @app.telemetry and @app.command share a device name they model a resource with distinct read and write paths — the telemetry handler produces state, the command handler accepts mutations. This is the correct cosalette pattern for bidirectional devices where reading and writing require different code paths.

import cosalette

@app.telemetry("gas_counter", interval=60, triggerable=True)
async def read_counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read impulse count; also fires on demand when /set receives a message."""
    return {"impulses": ctx.adapter(GasMeterPort).read_impulses()}


@app.command("gas_counter")   # same name — distinct MQTT suffix
async def write_counter(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    """Accept counter reset or offset mutations."""
    await ctx.adapter(GasMeterPort).set_offset(int(payload))
    return {"impulses": ctx.adapter(GasMeterPort).read_impulses()}

Topic layout:

| Topic | Direction | Handler |
| --- | --- | --- |
| {prefix}/gas_counter/state | outbound | telemetry publishes |
| {prefix}/gas_counter/set | inbound | command subscribes |

This is different from triggerable=True alone — triggerable=True causes a message on /set to re-fire the read handler immediately (no mutation). The read/write split uses @app.command for mutations and keeps the telemetry handler as a pure reader.

For a full walkthrough and contract metadata examples, see the Contract-First Route Design guide.

Root Devices (Unnamed)

When name is omitted, the device publishes to root-level topics — {prefix}/state instead of {prefix}/{device}/state. This is ideal for single-device apps where a device segment would be redundant:

# Named device — publishes to weather2mqtt/sensor/state
@app.telemetry("sensor", interval=30)
async def sensor() -> dict[str, object]: ...

# Root device — publishes to weather2mqtt/state
@app.telemetry(interval=30)
async def sensor() -> dict[str, object]: ...

At most one root device is allowed per app. Mixing root and named devices is supported but discouraged — the framework logs a warning.


Streaming handlers

@app.stream is a managed-lifecycle decorator for hardware that delivers data via push callbacks: BLE characteristic notifications, serial port events, HID input reports, USB bulk transfers. The framework owns the port lifecycle. The handler iterates a Stream[T] and may additionally declare DeviceContext, DeviceStore, and the concrete adapter type as parameters; for the adapter type, the framework injects a capability-limited proxy rather than the adapter itself. Non-lifecycle attributes and methods on the proxy forward to the real adapter, but open(), close(), start_scan(), and stop_scan() raise AttributeError because lifecycle belongs to the framework. AppHarness.inject_stream() bypasses this proxy and may inject raw test instances.
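
A capability-limited proxy of this kind can be built on `__getattr__`. The sketch below is illustrative only and is not the framework's proxy class: `LifecycleGuardProxy` and `FakeScanner` are hypothetical names.

```python
class LifecycleGuardProxy:
    """Forward everything except lifecycle methods; hypothetical sketch."""

    _BLOCKED = frozenset({"open", "close", "start_scan", "stop_scan"})

    def __init__(self, target):
        self._target = target

    def __getattr__(self, name):
        # Called only when normal attribute lookup on the proxy fails.
        if name in self._BLOCKED:
            raise AttributeError(f"{name}() is owned by the framework")
        return getattr(self._target, name)


class FakeScanner:
    """Hypothetical adapter used only to exercise the proxy."""

    device = "/dev/hidraw0"

    def open(self):
        return "opened"

    def battery_level(self):
        return 87
```

Wrapping `FakeScanner()` keeps `device` and `battery_level()` reachable, while any access to `open` on the proxy raises AttributeError.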

When to use @app.stream

| Need | Primitive |
| --- | --- |
| Callback-based hardware with managed port lifecycle | @app.stream |
| Publish MQTT from a stream handler | @app.stream + DeviceContext injection |
| Persist state across restarts | @app.stream + DeviceStore injection |
| Full port control, multiple streams, or inbound MQTT commands | @app.device (manual) |

Minimal example

from cosalette import Stream, StreamablePort
from myapp.models import Barcode

app.adapter(StreamablePort[Barcode], lambda: UsbScannerAdapter(device="/dev/hidraw0"))


@app.stream("barcode-scanner")
async def handle_scans(stream: Stream[Barcode]):
    async for barcode in stream:
        await process_barcode(barcode)
        yield

The framework calls await port.open(), port.register_callback(stream.put), and await port.start_scan() before invoking the handler. On shutdown, it calls stream.shutdown(), then await port.stop_scan() and await port.close().
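
The callback-to-iterator bridge behind this lifecycle can be sketched with an asyncio.Queue. This is not cosalette's Stream: `QueueStream` is a hypothetical class whose put() and shutdown() methods only mirror the names used above, and the demo calls put() directly where real hardware would invoke the registered callback.

```python
import asyncio

_SENTINEL = object()  # marks end-of-stream


class QueueStream:
    """Hypothetical push-to-pull bridge backed by an asyncio.Queue."""

    def __init__(self):
        self._queue = asyncio.Queue()

    def put(self, item):
        """Callback the port would invoke for each hardware event."""
        self._queue.put_nowait(item)

    def shutdown(self):
        """Signal the consumer that no more items will arrive."""
        self._queue.put_nowait(_SENTINEL)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _SENTINEL:
            raise StopAsyncIteration
        return item


async def demo():
    stream = QueueStream()  # created inside the running event loop
    for code in ("A1", "B2"):  # simulated hardware callbacks
        stream.put(code)
    stream.shutdown()
    return [item async for item in stream]
```

asyncio.Queue is not thread-safe, so callbacks arriving from another thread would need to be marshalled onto the event loop, for example with loop.call_soon_threadsafe.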

Stateful example

Handlers may also declare DeviceContext (to publish MQTT) and DeviceStore (to persist state). DeviceStore requires the app to be configured with a store backend (App(store=...)):

from collections.abc import AsyncIterator

from cosalette import DeviceContext, DeviceStore, Stream, StreamablePort
from myapp.models import SensorReading

app.adapter(StreamablePort[SensorReading], lambda: BleAdapter("AA:BB:CC:DD"))


@app.stream("ble-sensor")
async def handle_readings(
    stream: Stream[SensorReading],
    ctx: DeviceContext,
    store: DeviceStore,
) -> AsyncIterator[None]:
    registry.restore_from(store)

    async for reading in stream:
        result = registry.record(reading)
        if result.is_new:
            await ctx.publish_state({"sensor": result.name, "value": reading.value})
        store["last_seen"] = reading.sensor_id
        yield  # reaction boundary

Testing

AppHarness.inject_stream feeds items directly into the handler's stream, bypassing the hardware adapter. It provides production-equivalent DI:

# Stateless handler
await harness.inject_stream("barcode-scanner", barcode, shutdown=True)

# Stateful handler — store is auto-created from app._store when configured
await harness.inject_stream("ble-sensor", reading)

Pass ctx=, store=, providers=, or adapters= to override specific DI providers for the test.

Exception isolation

@app.stream handlers run in their own asyncio.Task. Uncaught exceptions are logged and the task exits. Unlike errors from @app.device and @app.telemetry, these exceptions are not published to the device error topic. asyncio.CancelledError is never caught.

See the Using @app.stream guide for step-by-step setup, DI patterns, and testing.


Periodic Companion Tasks

@app.periodic registers a background coroutine that runs on a fixed interval with no MQTT output. It is not a device archetype in the MQTT sense — it has no topic ownership, no DeviceContext, and no publish strategy — but it frequently accompanies device handlers as a side-effect partner.

When to use @app.periodic

| Need | Primitive |
| --- | --- |
| Publish sensor data to MQTT | @app.telemetry |
| MQTT command + state | @app.command or @app.device |
| Side-effect task with no MQTT output | @app.periodic |

Typical uses: write-buffer flushing, watchdog pings, cache warming, LED state synchronisation, background database sync.

Companion pattern: flush buffer alongside telemetry

The most common @app.periodic use case is a telemetry handler that accumulates readings into an in-process buffer, with a periodic task that flushes the buffer to an upstream API on a longer cadence:

app = cosalette.App(name="sensor-bridge", version="1.0.0")


@app.state
def reading_buffer() -> ReadingBuffer:
    return ReadingBuffer(capacity=100)


@app.telemetry("temperature", interval=10.0)  # (1)!
async def read_temperature(
    sensor: SensorPort,
    buf: ReadingBuffer,
) -> dict[str, object]:
    reading = await sensor.read()
    buf.append(reading)
    return {"celsius": reading.temp}


@app.periodic("upstream-flush", interval=300.0)  # (2)!
async def flush_upstream(buf: ReadingBuffer) -> None:
    if buf.pending_count() > 0:
        await buf.flush_to_api()
  1. Telemetry publishes to MQTT every 10 s for Home Assistant / MQTT consumers.
  2. The periodic task flushes accumulated readings to the upstream API every 5 minutes — independently of the MQTT cadence.

Both handlers share ReadingBuffer via @app.state DI. Neither owns the other's timing. Exception isolation is independent: a flush failure logs an error and retries next cycle; it does not affect MQTT telemetry.

Exception isolation

Periodic tasks catch all exceptions except asyncio.CancelledError, log at ERROR level, and continue the loop. There is no MQTT error topic for periodic tasks — unlike @app.telemetry and @app.device, which publish structured errors to {prefix}/{name}/error.
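
The catch, log, and continue behaviour can be sketched as a plain asyncio loop. This is an assumption-laden illustration, not the framework's runner: `run_periodic` and `flaky_flush` are hypothetical, and the loop runs a fixed number of cycles where a real one would run until shutdown.

```python
import asyncio
import logging

logger = logging.getLogger("periodic-sketch")


async def run_periodic(func, interval, cycles):
    """Call func every `interval` seconds; log failures and keep going."""
    for _ in range(cycles):  # a real loop would run until shutdown
        try:
            await func()
        except asyncio.CancelledError:
            raise  # let cancellation propagate
        except Exception:
            # logger.exception logs at ERROR level with the traceback.
            logger.exception("periodic task failed; retrying next cycle")
        await asyncio.sleep(interval)


def demo():
    calls = []

    async def flaky_flush():
        calls.append(len(calls))
        if len(calls) == 2:
            raise ConnectionError("upstream unavailable")

    asyncio.run(run_periodic(flaky_flush, interval=0, cycles=3))
    return calls
```

In `demo()` the second cycle fails, yet the third still runs, so all three calls are recorded.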

Shutdown behaviour

Periodic tasks are cancelled during Phase 4 (Teardown) with a 5-second grace period. The framework waits for running handlers to complete and logs a timeout warning if any exceed the grace period.

See the Periodic Tasks guide for the full API and testing patterns, and ADR-041 for the design rationale.


See Also