Device Archetypes¶
Cosalette recognises three device archetypes. Every device in an IoT-to-MQTT bridge falls into one of these categories — or can be expressed as a composition of them.
| Aspect | Command (@app.command) | Telemetry (@app.telemetry) | Device (@app.device) |
|---|---|---|---|
| Direction | Bidirectional | Unidirectional (device → broker) | Bidirectional or unidirectional |
| Execution model | Per-message dispatch | Framework-managed polling loop | Long-running coroutine |
| Inbound commands | Automatic: handler receives them | Not applicable | ctx.commands() or @ctx.on_command |
| State publishing | Automatic: return a dict | Automatic: return a dict | Manual via ctx.publish_state() |
| Publish control | Not applicable | publish= strategies | Manual (your loop logic) |
| Typical devices | GPIO relays, WiFi bulbs, simple actuators | BLE sensors, I²C temperature probes | State machines, combined patterns |
Command (@app.command):

graph LR
    A[MQTT /set topic] -->|message| B[Handler function]
    B -->|return dict| C[Framework publishes to /state]

Telemetry (@app.telemetry):

graph LR
    D[Hardware sensor] -->|read| E[Polling function]
    E -->|return dict| F[Framework publishes to /state]

Device (@app.device):

graph LR
    A[MQTT /set topic] -->|command| B[Device coroutine]
    B -->|publish_state| C[MQTT /state topic]
Command & Control Devices¶
Command devices receive MQTT commands and publish state back. The @app.command
decorator is the recommended approach — it registers a simple handler function
that the framework calls on each inbound message.
@app.command("blind")  # (1)!
async def handle_blind(
    payload: str, ctx: cosalette.DeviceContext  # (2)!
) -> dict[str, object]:  # (3)!
    driver = ctx.adapter(VeluxPort)
    position = int(payload)
    await driver.set_position(position)
    return {"position": position}  # (4)!
- @app.command registers a handler for {prefix}/blind/set messages.
- payload is optional and injected by name from the MQTT message; ctx is injected by type annotation. Declare only what you need.
- Returning a dict auto-publishes to {prefix}/blind/state.
- No closure, no main loop, no nonlocal: just a function.
When to Use @app.device Instead¶
For devices that need a long-running coroutine — periodic hardware polling,
custom event loops, state machines, or combined command + telemetry behaviour —
use @app.device with ctx.commands():
@app.device("blind")  # (1)!
async def blind(ctx: cosalette.DeviceContext) -> None:
    driver = ctx.adapter(VeluxPort)
    async for cmd in ctx.commands(timeout=30):  # (2)!
        if cmd is None:
            status = await driver.poll_status()
            await ctx.publish_state(status)
        else:  # (3)!
            position = int(cmd.payload)
            await driver.set_position(position)
            await ctx.publish_state({"position": position})
- @app.device registers the function as a long-running coroutine.
- ctx.commands(timeout=30) drives the loop: it yields None every 30 seconds for periodic work, or a Command when one arrives on {prefix}/blind/set.
- Commands carry payload, topic, sub_topic, and timestamp fields.
Coroutine ownership
The framework creates one asyncio.Task per @app.device. Your coroutine runs
concurrently alongside other devices. When shutdown is signalled, the
framework cancels the task after the current iteration completes.
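The timeout behaviour of a command iterator can be pictured with plain asyncio. The following is a minimal sketch, not the framework's implementation: `commands_sketch` is a hypothetical name, and the assumption is that commands arrive on an `asyncio.Queue` and that `None` is yielded whenever the timeout elapses without a command.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator


async def commands_sketch(
    queue: asyncio.Queue[str], timeout: float
) -> AsyncIterator[str | None]:
    """Yield queued commands, or None whenever `timeout` passes without one."""
    while True:
        try:
            cmd = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            cmd = None  # idle tick: the caller can do periodic work
        yield cmd


async def demo() -> list[str | None]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    queue.put_nowait("50")  # one pending command
    seen: list[str | None] = []
    async for cmd in commands_sketch(queue, timeout=0.01):
        seen.append(cmd)
        if len(seen) == 2:
            break
    return seen


result = asyncio.run(demo())
print(result)  # ['50', None]
```

The first iteration delivers the queued command; the second finds the queue empty, times out, and yields `None`, which is the slot a device loop would use for polling.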
Command Routing¶
When a message arrives on {prefix}/blind/set, the framework's
TopicRouter extracts the device name and dispatches to the registered
handler (@app.command), the command queue (ctx.commands()), or callback
(@ctx.on_command). Sub-topic commands ({prefix}/blind/calibrate/set)
are routed to their specific handler. See MQTT Topics for
the full topic layout.
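As an illustration of the routing step, here is a small sketch of how a command topic could be split into a device name and an optional sub-topic. It is not the framework's TopicRouter: `Route` and `parse_command_topic` are hypothetical names, and the sketch assumes a single-segment prefix and ignores root devices.

```python
from typing import NamedTuple, Optional


class Route(NamedTuple):
    device: str
    sub_topic: Optional[str]


def parse_command_topic(prefix: str, topic: str) -> Optional[Route]:
    """Return the target of a `/set` topic, or None if it is not a command topic."""
    parts = topic.split("/")
    if len(parts) < 3 or parts[0] != prefix or parts[-1] != "set":
        return None
    middle = parts[1:-1]  # segments between the prefix and "set"
    if len(middle) == 1:
        return Route(device=middle[0], sub_topic=None)
    if len(middle) == 2:
        return Route(device=middle[0], sub_topic=middle[1])
    return None  # deeper nesting is out of scope for this sketch


print(parse_command_topic("home", "home/blind/set"))
print(parse_command_topic("home", "home/blind/calibrate/set"))
print(parse_command_topic("home", "home/blind/state"))
```

A dispatcher built on this would look up the handler, queue, or callback registered for the returned device and sub-topic.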
Telemetry Devices¶
A telemetry device is a simple function that reads a sensor and returns a dict. The framework handles the polling schedule and MQTT publication.
The simplest form takes zero arguments:
@app.telemetry("temperature", interval=60)  # (1)!
async def temperature() -> dict[str, object]:
    reading = await read_i2c_sensor()  # (2)!
    return {"celsius": reading.temp, "humidity": reading.rh}  # (3)!
- Framework calls this function every 60 seconds.
- Your code reads the hardware (or adapter).
- The returned dict is JSON-serialised and published to {prefix}/temperature/state as a retained QoS 1 message.
When you need infrastructure access (adapters, settings, MQTT publishing), declare
a ctx: DeviceContext parameter and the framework injects it:
@app.telemetry("temperature", interval=60)
async def temperature(ctx: cosalette.DeviceContext) -> dict[str, object]:
    sensor = ctx.adapter(SensorPort)
    return {"celsius": sensor.read_temp()}
Telemetry Internals¶
Under the hood, @app.telemetry is syntactic sugar for a polling loop inside
the framework:
# Simplified TelemetryRunner.run_telemetry (see _telemetry_runner.py)
import asyncio


async def run_telemetry(self, reg, ctx, error_publisher):
    last_published = None
    last_error_type = None
    while not ctx.shutdown_requested:
        try:
            result = await reg.func(ctx)
            if result is None:
                await ctx.sleep(reg.interval)
                continue
            strategy = reg.publish_strategy
            should_publish = (
                last_published is None  # First → always
                or strategy is None  # No strategy → always
                or strategy.should_publish(result, last_published)
            )
            if should_publish:
                await ctx.publish_state(result)
                last_published = result
                if strategy is not None:
                    strategy.on_published()
            if last_error_type is not None:
                last_error_type = None  # Recovery
        except asyncio.CancelledError:
            raise  # Let shutdown cancellation propagate
        except Exception as exc:
            if type(exc) is not last_error_type:
                await error_publisher.publish(exc, device=reg.name)
            last_error_type = type(exc)
        await ctx.sleep(reg.interval)
The framework wraps each telemetry call in error isolation with state-transition deduplication — the first error of each type is published, but repeated same-type errors are suppressed to prevent flooding. When the sensor recovers, the framework logs recovery and restores the device health status.
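The deduplication rule can be isolated into a few lines. This is a sketch under the assumption that only the most recent error type is tracked, as in the simplified loop; `ErrorDeduplicator` is a hypothetical helper, not a framework class.

```python
from typing import Optional


class ErrorDeduplicator:
    """Report an error only when its type differs from the previous failure."""

    def __init__(self) -> None:
        self._last_error_type: Optional[type] = None

    def should_report(self, exc: BaseException) -> bool:
        is_new = type(exc) is not self._last_error_type
        self._last_error_type = type(exc)
        return is_new

    def mark_success(self) -> bool:
        """Clear the error state; return True if this call was a recovery."""
        recovered = self._last_error_type is not None
        self._last_error_type = None
        return recovered


dedup = ErrorDeduplicator()
events = [
    dedup.should_report(TimeoutError()),  # first failure: reported
    dedup.should_report(TimeoutError()),  # same type again: suppressed
    dedup.should_report(ValueError()),    # different type: reported
    dedup.mark_success(),                 # sensor recovers
    dedup.should_report(TimeoutError()),  # fails again after recovery: reported
]
print(events)  # [True, False, True, True, True]
```

Because a successful read clears the stored type, an error that reappears after a recovery is reported again rather than silently dropped.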
Publish Strategies¶
By default, the framework publishes every probe result. Publish strategies decouple the probing frequency from the publishing frequency — probe often, publish selectively:
from cosalette import Every, OnChange


@app.telemetry("temperature", interval=10, publish=Every(seconds=300))
async def temperature() -> dict[str, object]:
    return {"celsius": await read_sensor()}
Here, interval=10 means the sensor is probed every 10 seconds, but
Every(seconds=300) ensures state is published at most once every 5
minutes. This is useful when you want responsive readings locally (e.g. for
EWMA smoothing) but don't need to flood MQTT.
For threshold modes, composition operators, and the full strategy reference, see Publish Strategies.
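To make the probe-versus-publish split concrete, here is a sketch of a time-based strategy with the `should_publish` / `on_published` shape used in the simplified telemetry loop. `EverySketch` and `FakeClock` are hypothetical and only approximate what a strategy like `Every(seconds=300)` might do; the real class may behave differently.

```python
import time
from typing import Callable, Optional


class EverySketch:
    """Allow a publish only if at least `seconds` passed since the last one."""

    def __init__(
        self, seconds: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._seconds = seconds
        self._clock = clock
        self._last_publish: Optional[float] = None

    def should_publish(self, current: dict, previous: dict) -> bool:
        if self._last_publish is None:
            return True
        return self._clock() - self._last_publish >= self._seconds

    def on_published(self) -> None:
        self._last_publish = self._clock()


class FakeClock:
    """Controllable clock so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
strategy = EverySketch(seconds=300, clock=clock)
published_at = []
for t in range(0, 901, 10):  # probe every 10 s for 15 minutes
    clock.now = float(t)
    if strategy.should_publish({"celsius": 21.0}, {"celsius": 21.0}):
        strategy.on_published()
        published_at.append(t)

print(published_at)  # [0, 300, 600, 900]
```

Ninety-one probes result in four publishes, which is the effect described for `interval=10` combined with a 300-second publish window.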
Coalescing Groups¶
When multiple telemetry handlers share a physical resource — such as a serial bus,
SPI interface, or rate-limited API — they can be grouped into a shared execution
window using the group= parameter:
@app.telemetry("outdoor", interval=300, group="optolink")
async def outdoor(port: OptolinkPort) -> dict[str, object]:
    return await port.read_signals(["outdoor_temp"])


@app.telemetry("hotwater", interval=300, group="optolink")
async def hotwater(port: OptolinkPort) -> dict[str, object]:
    return await port.read_signals(["hot_water_temp"])
Handlers in the same group are managed by a shared tick-aligned scheduler. At t=0 all grouped handlers fire together; at subsequent ticks only those whose interval divides evenly into the elapsed time fire. This reduces resource sessions from N (one per handler) down to 1 per coinciding tick, eliminates timing drift, and enables adapter session sharing.
Each handler retains its own publish strategy, error isolation, persistence policy,
and init function — group= is purely an execution scheduling hint.
See ADR-018 for the full design rationale.
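The "interval divides evenly into the elapsed time" rule can be worked through with a few lines of arithmetic. This sketch is an assumption-laden illustration, not the scheduler from ADR-018: it assumes the base tick is the greatest common divisor of the group's intervals, and the `burner` handler with a 60-second interval is hypothetical, added to show handlers with different intervals.

```python
from functools import reduce
from math import gcd


def tick_length(intervals: dict[str, int]) -> int:
    """Base tick: the greatest common divisor of all intervals, in seconds."""
    return reduce(gcd, intervals.values())


def due_handlers(intervals: dict[str, int], elapsed: int) -> list[str]:
    """Names of handlers whose interval divides the elapsed time evenly."""
    return [name for name, interval in intervals.items() if elapsed % interval == 0]


group = {"outdoor": 300, "hotwater": 300, "burner": 60}
tick = tick_length(group)
schedule = {t: due_handlers(group, t) for t in range(0, 301, tick)}

for t, names in schedule.items():
    print(t, names)
```

At t=0 and t=300 all three handlers fire in the same window, while the ticks in between run only `burner`, so the shared resource is opened once per tick rather than once per handler.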
Manual Telemetry Escape Hatch¶
Some sensors require complex polling logic — backoff, adaptive intervals,
or multi-step reads. For these cases, use @app.device with a manual loop:
@app.device("complex_sensor")
async def complex_sensor(ctx: cosalette.DeviceContext) -> None:
    adapter = ctx.adapter(SensorPort)
    interval = 10.0
    while not ctx.shutdown_requested:
        try:
            data = await adapter.read()
            await ctx.publish_state(data)
            interval = 10.0  # reset on success
        except SensorTimeoutError:
            interval = min(interval * 2, 300)  # exponential backoff
        await ctx.sleep(interval)
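The backoff arithmetic in the loop above can be checked in isolation. A small sketch, with `next_interval` as a hypothetical helper that mirrors the doubling, the 300-second cap, and the reset on success:

```python
def next_interval(
    current: float, succeeded: bool, base: float = 10.0, cap: float = 300.0
) -> float:
    """Reset to `base` after a success; otherwise double, capped at `cap`."""
    if succeeded:
        return base
    return min(current * 2, cap)


interval = 10.0
history = []
# Six consecutive timeouts followed by one successful read.
for ok in [False, False, False, False, False, False, True]:
    interval = next_interval(interval, ok)
    history.append(interval)

print(history)  # [20.0, 40.0, 80.0, 160.0, 300.0, 300.0, 10.0]
```

The interval reaches the cap after five failures and stays there until the first successful read resets it.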
When to use which
Use @app.telemetry for straightforward read-and-return sensors.
Use @app.device when you need custom error handling, adaptive intervals,
or inbound command support alongside telemetry.
When to Use Which¶
Use this decision matrix to choose the right decorator:
| Need | Decorator |
|---|---|
| React to MQTT commands, publish state | @app.command ✓ |
| Poll a sensor on a fixed interval | @app.telemetry ✓ |
| Poll often, publish selectively | @app.telemetry + publish= ✓ |
| Suppress duplicate readings | @app.telemetry + OnChange() ✓ |
| Command + periodic hardware polling | @app.telemetry + @app.command or @app.device |
| Custom event loop or state machine | @app.device (escape hatch) |
| Adaptive intervals or backoff | @app.device (manual loop) |
@app.command and @app.telemetry are the recommended decorators for the
vast majority of devices. With publish strategies, @app.telemetry now covers
use cases that previously required @app.device, such as polling frequently but
publishing only on change. Use @app.device only when you need capabilities
that the simpler decorators cannot provide: adaptive intervals, state machines,
or command and telemetry handling inside a single coroutine.
Choosing an Archetype¶
Use this decision tree to find the right decorator for your device:
graph TD
Start([New device]) --> Q1{Receives MQTT<br/>commands?}
Q1 -->|No| Q2{Polls on a<br/>fixed interval?}
Q2 -->|Yes| T(["@app.telemetry"])
Q2 -->|No| D1(["@app.device"])
Q1 -->|Yes| Q3{Also needs<br/>periodic polling?}
Q3 -->|No| C(["@app.command"])
Q3 -->|Yes| Q4{Needs telemetry features?<br/>publish strategies,<br/>persistence, coalescing}
Q4 -->|Yes| TC(["@app.telemetry +<br/>@app.command"])
Q4 -->|No| D2(["@app.device with<br/>@ctx.on_command"])
style T fill:#2FB170,color:#fff
style D1 fill:#2FB170,color:#fff
style C fill:#2FB170,color:#fff
style TC fill:#2FB170,color:#fff
style D2 fill:#2FB170,color:#fff
- @app.command: WiFi smart plug, GPIO relay
- @app.telemetry: BLE thermometer, I²C humidity sensor
- @app.telemetry + @app.command: hot water controller with periodic temp reads and target temp commands (see ADR-019)
- @app.device: complex state machine, sensor with adaptive backoff, custom event loop
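The decision tree can also be read as a small function. This is only a restatement of the diagram for illustration; `choose_archetype` and its parameter names are hypothetical and not part of the framework.

```python
def choose_archetype(
    receives_commands: bool,
    fixed_interval: bool = False,
    periodic_polling: bool = False,
    needs_telemetry_features: bool = False,
) -> str:
    """Walk the decision tree and return the suggested decorator(s)."""
    if not receives_commands:
        return "@app.telemetry" if fixed_interval else "@app.device"
    if not periodic_polling:
        return "@app.command"
    if needs_telemetry_features:
        return "@app.telemetry + @app.command"
    return "@app.device with @ctx.on_command"


# GPIO relay: commands only.
print(choose_archetype(receives_commands=True))
# BLE thermometer: fixed-interval polling, no commands.
print(choose_archetype(receives_commands=False, fixed_interval=True))
# Hot water controller: commands plus polling with publish strategies.
print(
    choose_archetype(
        receives_commands=True, periodic_polling=True, needs_telemetry_features=True
    )
)
```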
Mixed Applications¶
Most real bridges combine multiple archetypes:
import cosalette

app = cosalette.App(name="home2mqtt", version="1.0.0")


@app.command("relay")
async def handle_relay(
    payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    """Bidirectional: accepts on/off commands, returns state."""
    ...


@app.telemetry("outdoor_temp", interval=120)
async def outdoor_temp() -> dict[str, object]:
    """Unidirectional: reads a BLE thermometer every 2 minutes."""
    ...


@app.telemetry("indoor_temp", interval=60)
async def indoor_temp(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Unidirectional: reads an I²C sensor every minute (uses ctx for adapter)."""
    ...


app.run()
Error Isolation¶
Each device runs in its own asyncio.Task with independent error boundaries.
A crash in one device does not take down others:
- Command (@app.command): if the handler raises, the error is logged and published to the error topic. Subsequent commands are dispatched normally.
- Device (@app.device): if the coroutine raises, the error is logged and published to the device's error topic. Other devices continue running.
- Telemetry: if one polling cycle raises, the error is published and the next cycle runs on schedule.
This isolation is fundamental to daemon reliability — a flaky BLE sensor should never prevent an actuator motor from responding to commands.
CancelledError is special
asyncio.CancelledError is not caught by the error isolation layer.
It propagates normally to allow graceful shutdown via task cancellation.
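The idea of per-task error boundaries can be demonstrated with plain asyncio. A minimal sketch, assuming a wrapper coroutine per device; `isolated`, `flaky_sensor`, and `actuator` are hypothetical names, and errors are collected in a list instead of being published to MQTT.

```python
import asyncio


async def flaky_sensor() -> str:
    raise RuntimeError("BLE read failed")


async def actuator() -> str:
    await asyncio.sleep(0.01)
    return "actuator ok"


async def isolated(name: str, coro_func, errors: list[str]) -> object:
    """Run one device; record ordinary failures instead of letting them escape."""
    try:
        return await coro_func()
    except asyncio.CancelledError:
        raise  # shutdown must still be able to cancel the task
    except Exception as exc:
        errors.append(f"{name}: {exc}")
        return None


async def main() -> tuple[list[object], list[str]]:
    errors: list[str] = []
    tasks = [
        asyncio.create_task(isolated("sensor", flaky_sensor, errors)),
        asyncio.create_task(isolated("actuator", actuator, errors)),
    ]
    results = await asyncio.gather(*tasks)
    return list(results), errors


results, errors = asyncio.run(main())
print(results)  # [None, 'actuator ok']
print(errors)   # ['sensor: BLE read failed']
```

The sensor's failure is recorded while the actuator task completes normally. On Python 3.8 and later `CancelledError` derives from `BaseException`, so `except Exception` would not catch it anyway; the explicit clause documents the intent.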
Naming Constraints¶
Device names must be unique within each registration type. Two telemetry registrations or two command registrations cannot share the same name, because they would conflict on the same MQTT topic suffix.
However, an @app.telemetry and an @app.command registration can share the
same name: they use different MQTT suffixes (telemetry publishes to /state, the
command subscribes to /set) and the framework creates a shared DeviceContext for
both. This enables the ADR-002 topic layout where a single device segment holds
both state and command topics:
import cosalette
@app.telemetry("hot_water", interval=30)
async def read_temps(ctx: cosalette.DeviceContext) -> dict[str, object]: ...
@app.command("hot_water") # Same name — allowed (telemetry + command)
async def set_temp(payload: str, ctx: cosalette.DeviceContext) -> dict[str, object]: ...
# Result:
# {prefix}/hot_water/state ← telemetry publishes here
# {prefix}/hot_water/set ← command subscribes here
@app.device registrations remain globally unique — the device archetype already
handles both state and commands, so collisions with any other type are rejected:
@app.device("sensor")
async def sensor_loop(ctx: cosalette.DeviceContext) -> None: ...
@app.telemetry("sensor", interval=10) # ValueError: name conflicts with device registration
async def sensor_data(ctx: cosalette.DeviceContext) -> dict[str, object]: ...
See ADR-019 for the full decision record.
Device names are used as MQTT topic segments ({prefix}/{name}/state) and must
be unambiguous within their topic suffix.
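The naming rules can be summarised as a small validation routine. This sketch is an assumption about how such a check could be written, not the framework's code; `NameRegistry` and the kind strings are hypothetical, and the error messages are illustrative.

```python
class NameRegistry:
    """Hypothetical registry tracking which registration kinds use each name."""

    def __init__(self) -> None:
        self._kinds_by_name: dict[str, set[str]] = {}

    def register(self, kind: str, name: str) -> None:
        kinds = self._kinds_by_name.setdefault(name, set())
        if kind in kinds:
            # Two telemetry (or two command) registrations with one name.
            raise ValueError(f"duplicate {kind} registration: {name!r}")
        if kinds and (kind == "device" or "device" in kinds):
            # Device registrations may not share a name with anything else.
            raise ValueError(f"name {name!r} conflicts with an existing registration")
        kinds.add(kind)


registry = NameRegistry()
registry.register("telemetry", "hot_water")
registry.register("command", "hot_water")  # allowed: telemetry + command
registry.register("device", "sensor")

try:
    registry.register("telemetry", "sensor")
except ValueError as exc:
    conflict = str(exc)
else:
    conflict = None

print(conflict)  # name 'sensor' conflicts with an existing registration
```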
Root Devices (Unnamed)¶
When name is omitted, the device publishes to root-level topics —
{prefix}/state instead of {prefix}/{device}/state. This is ideal
for single-device apps where a device segment would be redundant:
# Named device — publishes to weather2mqtt/sensor/state
@app.telemetry("sensor", interval=30)
async def sensor() -> dict[str, object]: ...
# Root device — publishes to weather2mqtt/state
@app.telemetry(interval=30)
async def sensor() -> dict[str, object]: ...
At most one root device is allowed per app. Mixing root and named devices is supported but discouraged — the framework logs a warning.
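The difference between named and root topics comes down to whether a device segment is inserted. A short sketch, with `state_topic` as a hypothetical helper rather than a framework function:

```python
from typing import Optional


def state_topic(prefix: str, name: Optional[str] = None) -> str:
    """Build the state topic; a missing name means a root device."""
    return f"{prefix}/state" if name is None else f"{prefix}/{name}/state"


print(state_topic("weather2mqtt", "sensor"))  # weather2mqtt/sensor/state
print(state_topic("weather2mqtt"))            # weather2mqtt/state
```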
See Also¶
- Architecture: composition root and registration API
- MQTT Topics: topic layout for state, commands, and errors
- Error Handling: structured error payloads per device
- Lifecycle: when devices start, run, and stop
- Testing: testing device functions with DeviceContext fixtures
- Publish Strategies: publishing control concepts
- Signal Filters: handler-level data transformations
- ADR-010: Device Archetypes
- ADR-013: Telemetry Publish Strategies