Build a Telemetry Device¶
Telemetry devices are the simplest archetype in cosalette. They poll a sensor at a fixed interval and publish a JSON state message; the framework handles the timing loop, serialisation, and error isolation for you.
Prerequisites
This guide assumes you've completed the Quickstart.
How Telemetry Works¶
The @app.telemetry decorator registers a function that:
- Optionally receives a DeviceContext or other injectable parameters.
- Returns a dict: the framework JSON-serialises it and publishes to {prefix}/{name}/state.
- Runs on a fixed interval: the framework calls await ctx.sleep(interval) between invocations under the hood. This is the probing frequency.
- Optionally uses a publish strategy (publish=) to control which probe results are actually published, decoupling probing from publishing.
- Can return None to suppress a single cycle.
- Is error-isolated: if one poll raises an exception, the framework logs the error, publishes it to the error topic, and continues the loop. A single bad reading never stops the daemon.
This is the return-dict contract: your function produces data, the framework
handles delivery. Compare this to @app.device where you own the main loop and
call ctx.publish_state() manually (see
Command & Control Device).
Under the hood
The framework wraps your telemetry function in a loop roughly equivalent to:
strategy = ...  # from the publish= parameter, or None
last_published = None
last_error_type = None

while not ctx.shutdown_requested:
    try:
        result = await your_function(ctx)
        if result is None:
            await ctx.sleep(interval)
            continue
        should_publish = (
            last_published is None  # First → always
            or strategy is None  # No strategy → always
            or strategy.should_publish(result, last_published)
        )
        if should_publish:
            await ctx.publish_state(result)
            last_published = result
            if strategy is not None:
                strategy.on_published()
        if last_error_type is not None:
            log_recovery()
            last_error_type = None
    except Exception as exc:
        if type(exc) is not last_error_type:
            log_and_publish_error(exc)
        last_error_type = type(exc)
    await ctx.sleep(interval)
When triggerable=True, the await ctx.sleep(interval) is replaced with a
combined wait that also listens for MQTT trigger messages — whichever arrives
first wakes the handler.
You never write this loop yourself; that is the framework's job.
A Minimal Telemetry Device¶
The simplest telemetry handler takes zero arguments — just return a dict:
import cosalette

app = cosalette.App(name="gas2mqtt", version="1.0.0")

@app.telemetry("counter", interval=60)  # (1)!
async def counter() -> dict[str, object]:  # (2)!
    """Read the gas meter impulse count."""
    return {"impulses": 42}  # (3)!

app.run()

1. "counter" is the device name; it determines the MQTT topic: gas2mqtt/counter/state. interval=60 means polling every 60 seconds.
2. Zero-arg handlers are valid. The framework injects nothing; your function just returns data. You can also request ctx: DeviceContext if needed.
3. The returned dict is published as {"impulses": 42} to gas2mqtt/counter/state with retain=True and qos=1.
When you run this, the framework:
- Connects to the MQTT broker.
- Calls counter() every 60 seconds.
- Publishes the returned dict as JSON to gas2mqtt/counter/state.
- Keeps running until SIGTERM or SIGINT.
Single-Device Apps (Root Device)¶
When your app has only one device, you can omit the device name entirely.
The framework publishes directly to root-level topics — no /{device}/
segment:
import cosalette

app = cosalette.App(name="weather2mqtt", version="1.0.0")

@app.telemetry(interval=30)  # (1)!
async def read_sensor() -> dict[str, object]:
    """Read weather station sensors."""
    return {"temperature": 21.5, "humidity": 58.0}

app.run()

1. No device name: the function name read_sensor is used internally for logging. The MQTT topic is weather2mqtt/state (no device segment).
Topic layout:
| Pattern | Named device | Root device |
|---|---|---|
| State | weather2mqtt/sensor/state | weather2mqtt/state |
| Availability | weather2mqtt/sensor/availability | weather2mqtt/availability |
| Error | weather2mqtt/sensor/error | (global only) |
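The named and root layouts differ only in whether a device segment is present. As a rough illustration, the rule can be written as a small helper. Note that build_topic is a hypothetical function written for this guide, not part of cosalette, and it assumes the app name doubles as the topic prefix, as in the examples here.

```python
from __future__ import annotations


def build_topic(prefix: str, device: str | None, suffix: str) -> str:
    """Join topic segments, skipping the device segment for a root device."""
    # Hypothetical helper: a root (unnamed) device is represented by None.
    segments = [prefix, suffix] if device is None else [prefix, device, suffix]
    return "/".join(segments)


named = build_topic("weather2mqtt", "sensor", "state")
root = build_topic("weather2mqtt", None, "state")
print(named)
print(root)
```

The first call yields the named-device state topic from the table, the second the root-device one.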
One root device per app
An app can have at most one root (unnamed) device. Registering a
second raises ValueError. You can mix one root device with named
devices, but the framework logs a warning — this combination is unusual
and may indicate a design issue.
Using DeviceContext¶
When your handler needs infrastructure access, declare a ctx: DeviceContext
parameter — the framework injects it automatically:
@app.telemetry("counter", interval=60)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    settings = ctx.settings  # (1)!
    device_name = ctx.name  # (2)!
    clock_value = ctx.clock.now()  # (3)!
    return {"impulses": 42, "read_at": clock_value}

1. Access the application Settings instance (or your custom subclass).
2. The device name as registered: "counter" in this case.
3. The monotonic clock port, useful for timing calculations. In tests, this is a FakeClock you control directly.
DeviceContext vs AppContext
Telemetry and device functions can request DeviceContext, which has publish,
sleep, and on_command capabilities. The lifespan function receives AppContext,
which only has .settings and .adapter(). Don't mix them up — see
Lifespan for details.
Resolving Adapters¶
When your telemetry device needs hardware access, use the adapter pattern:
from gas2mqtt.ports import GasMeterPort

@app.telemetry("counter", interval=60)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    meter = ctx.adapter(GasMeterPort)  # (1)!
    reading = meter.read_impulses()
    return {"impulses": reading}

1. Resolves the adapter registered for GasMeterPort. Raises LookupError if no adapter is registered. See Hardware Adapters for registration.
Multiple Sensors in One App¶
A single app can register multiple telemetry devices, each with its own interval:
import cosalette
from gas2mqtt.ports import GasMeterPort

app = cosalette.App(name="gas2mqtt", version="1.0.0")

@app.telemetry("counter", interval=60)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read impulse count every 60 seconds."""
    meter = ctx.adapter(GasMeterPort)
    return {"impulses": meter.read_impulses()}

@app.telemetry("temperature", interval=30)
async def temperature(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read the meter's temperature sensor every 30 seconds."""
    meter = ctx.adapter(GasMeterPort)
    return {"celsius": meter.read_temperature()}

app.run()
Each telemetry device runs as an independent asyncio task. They share the same MQTT
connection and adapter instances, but their polling loops are completely independent.
If temperature fails, counter keeps running.
Topic layout:
| Device | Topic | Interval |
|---|---|---|
| counter | gas2mqtt/counter/state | 60 s |
| temperature | gas2mqtt/temperature/state | 30 s |
Many similar devices?
When managing a fleet of similar sensors (e.g. 10 BLE thermometers),
manually duplicating decorators doesn't scale. Use dict-name decorators
(name=lambda s: {...}) to register multiple devices from a single handler,
optionally driven by configuration.
See Multi-Device Registration for the full pattern.
Imperative Registration¶
The @app.telemetry decorator works great when the handler is defined in the same
module as the App. When the handler lives in a separate module — a sensor
library, a shared utility, or a generated function — the decorator forces you to
write a pass-through wrapper:
from my_sensors import read_temperature

@app.telemetry("temperature", interval=30)
async def temperature(ctx: cosalette.DeviceContext) -> dict[str, object]:
    return await read_temperature(ctx)  # just forwarding
The app.add_telemetry() method eliminates the wrapper — register the imported
function directly:
from my_sensors import read_temperature
app.add_telemetry("temperature", read_temperature, interval=30)
Full Signature¶
app.add_telemetry(
    name,  # device name (always required; no root device)
    func,  # async callable returning dict | None
    *,
    interval=0.0,  # polling interval in seconds (required unless schedule=)
    schedule=None,  # cron string or CronSchedule (mutually exclusive with interval=)
    schedule_spec=None,  # per-device callable CronSpec: (per_device_config) -> str | CronSchedule (use with name=callable)
    publish=None,  # optional PublishStrategy
    persist=None,  # optional PersistPolicy
    init=None,  # optional synchronous factory
    enabled=True,  # False to skip registration entirely
    group=None,  # coalescing group name (requires interval=, not schedule=)
    triggerable=False,  # listen for MQTT triggers on {prefix}/{device}/set
    retry=0,  # max retry attempts (0 = disabled)
    retry_on=None,  # exception types to retry on
    backoff=None,  # BackoffStrategy (default: ExponentialBackoff)
    circuit_breaker=None,  # optional CircuitBreaker
)
All keyword parameters behave identically to the decorator form.
Using init=¶
init= works the same way as the decorator — pass a synchronous factory whose
return value is injected by type:
from my_sensors import read_temperature
from cosalette.filters import Pt1Filter

def make_filter() -> Pt1Filter:
    return Pt1Filter(tau=5.0, dt=10.0)

app.add_telemetry(
    "temperature",
    read_temperature,
    interval=10,
    publish=cosalette.OnChange(threshold=0.5),
    init=make_filter,
)
Choosing Between Decorator and Imperative¶
| Scenario | Preferred style |
|---|---|
| Handler defined inline, same file | @app.telemetry decorator |
| Handler imported from another module | app.add_telemetry() |
| Handler generated dynamically (factory) | app.add_telemetry() |
| Registering in a loop | app.add_telemetry() |
/// admonition | Identical validation
    type: info
Both paths run the same registration logic — signature validation,
init= type-collision checks, and duplicate-name detection happen
identically whether you use the decorator or add_telemetry().
///
/// admonition | Named devices only
    type: warning
add_telemetry() always requires a device name — root (unnamed)
devices can only be created via the decorator.
///
Conditional Registration¶
Use enabled= to skip registration based on a settings flag, with no if block needed.
Before, with an explicit if block:
settings = app.settings

if settings.enable_temperature:
    @app.telemetry("temperature", interval=30)
    async def temperature() -> dict[str, object]:
        return {"celsius": read_temp()}
After, with enabled=:
settings = app.settings

@app.telemetry("temperature", interval=30, enabled=settings.enable_temperature)
async def temperature() -> dict[str, object]:
    return {"celsius": read_temp()}
The imperative form works identically: pass the same enabled= argument to app.add_telemetry().
/// admonition | Disabled devices are invisible
    type: info
When enabled=False, the device is not registered at all — it won't
appear in MQTT topics, won't reserve a name slot, and won't consume
resources at runtime.
///
Publish Strategies¶
By default, every probe result is published to MQTT. Publish strategies let you
decouple the probing frequency from the publishing frequency — the handler runs on
interval, but only selected results are actually sent.
Basic Usage¶
from cosalette import Every, OnChange

@app.telemetry("temperature", interval=10, publish=Every(seconds=300))
async def temperature() -> dict[str, object]:
    """Probe every 10s, publish at most once every 5 minutes."""
    return {"celsius": await read_sensor()}
Without publish=, the default behaviour applies: every result is published.
Available Strategies¶
| Strategy | Publishes when… |
|---|---|
| Every(seconds=N) | At least N seconds elapsed since last publish |
| Every(n=N) | Every N-th probe result |
| OnChange() | The payload differs from the last published payload |
| OnChange(threshold=T) | Any numeric leaf field changed by more than T |
| OnChange(threshold={…}) | Per-field numeric thresholds (dot-notation for nested) |
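To make the threshold rows more concrete, here is a minimal sketch of how a per-leaf numeric comparison with dot-notation keys could work. The helper names (flatten, exceeds_threshold) are hypothetical, and the logic is an assumption derived from the table above rather than cosalette's actual implementation. In particular, treating unlisted fields as "any change counts" is a guess; the Publish Strategies page documents the real semantics.

```python
from __future__ import annotations

from numbers import Real


def flatten(payload: dict, parent: str = "") -> dict[str, object]:
    """Flatten nested dicts into dot-notation keys: {"a": {"b": 1}} -> {"a.b": 1}."""
    flat: dict[str, object] = {}
    for key, value in payload.items():
        path = f"{parent}.{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def _is_number(value: object) -> bool:
    # bool is a subclass of int, so exclude it explicitly.
    return isinstance(value, Real) and not isinstance(value, bool)


def exceeds_threshold(
    current: dict, previous: dict, threshold: float | dict[str, float]
) -> bool:
    """Return True if any leaf changed by more than its threshold."""
    cur, prev = flatten(current), flatten(previous)
    if cur.keys() != prev.keys():
        return True  # a field appeared or disappeared
    for path, value in cur.items():
        old = prev[path]
        if _is_number(value) and _is_number(old):
            # Assumption: fields without a per-field threshold use 0.0.
            limit = threshold.get(path, 0.0) if isinstance(threshold, dict) else threshold
            if abs(value - old) > limit:
                return True
        elif value != old:
            return True  # non-numeric leaves: any difference counts
    return False


small_step = exceeds_threshold({"celsius": 21.4}, {"celsius": 21.0}, 0.5)
large_step = exceeds_threshold({"celsius": 21.6}, {"celsius": 21.0}, 0.5)
print(small_step, large_step)
```

Under these assumptions a 0.4 degree step stays below a 0.5 threshold and is not published, while a 0.6 degree step is.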
Composing Strategies¶
Combine strategies with | (OR) and & (AND):
# Publish on change OR every 5 minutes (heartbeat guarantee)
@app.telemetry("temp", interval=10, publish=OnChange() | Every(seconds=300))
async def temp() -> dict[str, object]:
    return {"celsius": await read_sensor()}

# Publish only when changed AND at least 30s have passed (debounce)
@app.telemetry("temp", interval=10, publish=OnChange() & Every(seconds=30))
async def temp() -> dict[str, object]:
    return {"celsius": await read_sensor()}

- | (OR): publish if any strategy says yes. Useful for change detection with a periodic heartbeat fallback.
- & (AND): publish only if all strategies agree. Useful for debouncing rapid changes.
For threshold modes, comparison semantics, edge cases, and composition details, see Publish Strategies.
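For readers curious how | and & composition can work in principle, the following is a self-contained sketch built on Python's __or__ and __and__ operators. Every class here (Strategy, Changed, EveryNth, _Combined) and the run driver are hypothetical stand-ins, not the cosalette classes; the driver mirrors the "Under the hood" loop, where the first result is always published and on_published() is called after each publish.

```python
from __future__ import annotations


class Strategy:
    """Hypothetical base class providing | and & composition."""

    def should_publish(self, current: dict, previous: dict) -> bool:
        raise NotImplementedError

    def on_published(self) -> None:
        """Hook called after a publish; stateless strategies ignore it."""

    def __or__(self, other: Strategy) -> Strategy:
        return _Combined(self, other, any)

    def __and__(self, other: Strategy) -> Strategy:
        return _Combined(self, other, all)


class _Combined(Strategy):
    def __init__(self, left: Strategy, right: Strategy, reducer) -> None:
        self._parts = (left, right)
        self._reducer = reducer

    def should_publish(self, current: dict, previous: dict) -> bool:
        # Ask every part (no short-circuit) so stateful parts keep counting.
        votes = [p.should_publish(current, previous) for p in self._parts]
        return self._reducer(votes)

    def on_published(self) -> None:
        for part in self._parts:
            part.on_published()


class Changed(Strategy):
    def should_publish(self, current: dict, previous: dict) -> bool:
        return current != previous


class EveryNth(Strategy):
    def __init__(self, n: int) -> None:
        self._n = n
        self._seen = 0

    def should_publish(self, current: dict, previous: dict) -> bool:
        self._seen += 1
        return self._seen >= self._n

    def on_published(self) -> None:
        self._seen = 0


def run(strategy: Strategy, readings: list[int]) -> list[int]:
    """Return the readings that would be published."""
    published, last = [], None
    for value in readings:
        result = {"celsius": value}
        if last is None or strategy.should_publish(result, last):
            published.append(value)
            last = result
            strategy.on_published()
    return published


heartbeat = run(Changed() | EveryNth(3), [20, 20, 20, 20, 21, 21])
debounced = run(Changed() & EveryNth(3), [20, 21, 22, 23, 24, 25])
print(heartbeat, debounced)
```

With OR, the unchanged value 20 is re-published on the third probe as a heartbeat and the change to 21 goes out at once. With AND, rapid changes are held back until three probes have passed.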
Returning None¶
Handlers can return None to suppress a single cycle, independently of any
strategy. The strategy is not consulted for None returns, and the "last
published" value is not updated.
@app.telemetry("counter", interval=5, publish=OnChange())
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object] | None:
    meter = ctx.adapter(GasMeterPort)
    if not meter.is_ready():
        return None  # skips this cycle entirely
    return {"impulses": meter.read_impulses()}
Initialisation Callbacks (init=)¶
When a telemetry handler needs per-device state — such as a filter instance,
a calibration table, or a connection pool — the init= parameter provides a
clean way to create it once and inject it into every poll cycle.
Without init=, you'd resort to module-level globals or closures.
init= keeps state creation explicit, co-located with the decorator, and
testable in isolation.
Basic Usage¶
class SmoothingFilter:
    """Moving-average filter for noisy sensor readings."""

    def __init__(self, window: int = 5) -> None:
        self.readings: list[float] = []
        self.window = window

    def update(self, value: float) -> float:
        self.readings.append(value)
        if len(self.readings) > self.window:
            self.readings.pop(0)
        return sum(self.readings) / len(self.readings)

def make_filter() -> SmoothingFilter:  # (1)!
    return SmoothingFilter(window=10)

@app.telemetry("temperature", interval=30, init=make_filter)  # (2)!
async def temperature(smoother: SmoothingFilter) -> dict[str, object]:  # (3)!
    raw = read_sensor()
    return {"celsius": smoother.update(raw)}

1. The factory is a plain synchronous callable. It runs once before the first poll cycle, not on every interval.
2. init=make_filter tells the framework to call make_filter() and inject the result into the handler.
3. The handler declares smoother: SmoothingFilter; the framework matches the return type of the init callback to this parameter automatically.
How It Works¶
- The framework calls init() once before the handler's polling loop starts.
- The return value is added to the dependency-injection provider map, keyed by its type.
- Any handler parameter whose type annotation matches the init result type receives the same instance on every invocation.
- The init callback can itself receive injected parameters (e.g. Settings), using the same DI machinery as handler parameters.
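The steps above can be sketched in a few lines of plain Python. This is an illustration of type-keyed injection in general, not cosalette's internals: build_providers and call_with_injection are hypothetical names, and the handler is synchronous here only to keep the sketch short (real telemetry handlers are async).

```python
import inspect
import typing


class SmoothingState:
    """Stand-in for per-device state created by an init factory."""

    def __init__(self) -> None:
        self.calls = 0


def make_state() -> SmoothingState:
    return SmoothingState()


def handler(state: SmoothingState) -> dict:
    state.calls += 1
    return {"calls": state.calls}


def build_providers(init) -> dict:
    """Call the init factory once and key its result by concrete type."""
    instance = init()
    return {type(instance): instance}


def call_with_injection(func, providers: dict):
    """Resolve each parameter from the provider map via its annotation."""
    hints = typing.get_type_hints(func)
    kwargs = {
        name: providers[hints[name]]
        for name in inspect.signature(func).parameters
    }
    return func(**kwargs)


providers = build_providers(make_state)
first = call_with_injection(handler, providers)
second = call_with_injection(handler, providers)
print(first, second)
```

Because the factory runs once and the same instance is injected on every call, the counter keeps increasing across invocations.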
Combining with Filters and Strategies¶
init= pairs naturally with the framework's built-in filters and publish
strategies. Use init= to create the filter instance, and publish= to
control when results are sent:
from cosalette import OnChange
from cosalette.filters import Pt1Filter

def make_pt1() -> Pt1Filter:
    return Pt1Filter(tau=5.0, dt=10.0)

@app.telemetry(
    "temperature",
    interval=10,
    publish=OnChange(threshold=0.5),
    init=make_pt1,
)
async def temperature(pt1: Pt1Filter) -> dict[str, object]:
    raw = await read_sensor()
    return {"celsius": round(pt1.update(raw), 1)}
Compare this to a module-level pt1 = Pt1Filter(...) pattern — init=
achieves the same result but scopes the filter to the device registration,
making it explicit which device owns the state.
Rules and Constraints¶
- Synchronous only: async def init callbacks raise TypeError at decoration time. The callback runs during bootstrap, before the async event loop processes device tasks.
- Type collision guard: if the init callback returns a type the framework already provides (Settings, DeviceContext, Logger, ClockPort, Event), a TypeError is raised immediately. Use a wrapper class if you need to inject something with a colliding type.
- Fail-fast validation: bad signatures (e.g. un-annotated parameters) are caught at decoration time, not at runtime.
Signal Filters¶
Filters are handler-level data transformations that smooth or clean sensor
readings before they reach publish strategies. Unlike strategies that control
when to publish, filters control what is published. They implement the
Filter protocol (update(value) -> float) and compose naturally.
Available Filters¶
cosalette ships three filter implementations in cosalette.filters:
| Filter | Algorithm | Use case |
|---|---|---|
| Pt1Filter(tau, dt) | First-order low-pass (time constant) | Noise smoothing, sample-rate-independent |
| MedianFilter(window) | Sliding-window median | Spike / outlier rejection |
| OneEuroFilter(min_cutoff, beta, d_cutoff, dt) | Adaptive 1€ Filter (Casiez 2012) | Mostly-static signals with occasional movement |
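To give a feel for what the first two algorithms do, here is a stdlib-only sketch. SimplePt1 and SimpleMedian are hypothetical stand-ins, not the cosalette.filters classes. The PT1 version assumes a common discretisation (alpha = dt / (tau + dt), output seeded with the first sample), which may differ from what the library uses.

```python
from collections import deque
from statistics import median


class SimplePt1:
    """First-order low-pass: y += alpha * (x - y), alpha = dt / (tau + dt)."""

    def __init__(self, tau: float, dt: float) -> None:
        self._alpha = dt / (tau + dt)
        self._y = None  # assumption: seeded with the first sample

    def update(self, value: float) -> float:
        if self._y is None:
            self._y = value
        else:
            self._y += self._alpha * (value - self._y)
        return self._y


class SimpleMedian:
    """Sliding-window median over the last `window` samples."""

    def __init__(self, window: int) -> None:
        self._values = deque(maxlen=window)

    def update(self, value: float) -> float:
        self._values.append(value)
        return float(median(self._values))


pt1 = SimplePt1(tau=5.0, dt=10.0)
smoothed = [pt1.update(x) for x in (0.0, 3.0, 3.0)]

med = SimpleMedian(window=3)
despiked = [med.update(x) for x in (20.0, 20.5, 95.0, 20.4)]
print(smoothed, despiked)
```

The low-pass output approaches a step input gradually, while the median output never contains the 95.0 spike.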
Example: PT1 Filter with init=¶
from cosalette import OnChange
from cosalette.filters import Pt1Filter

def make_pt1() -> Pt1Filter:
    return Pt1Filter(tau=5.0, dt=10.0)

@app.telemetry(
    "temperature",
    interval=10,
    publish=OnChange(threshold=0.5),
    init=make_pt1,
)
async def temperature(pt1: Pt1Filter) -> dict[str, object]:
    raw = await read_sensor()
    return {"celsius": round(pt1.update(raw), 1)}
For algorithm details, parameter tuning, and the decision table, see Signal Filters.
Persistence¶
Telemetry devices can persist state across restarts using the store=
and persist= parameters.
Basic Usage¶
Pass a Store backend to the app, then declare store: DeviceStore in
your handler:
import cosalette
from cosalette import JsonFileStore, DeviceStore, SaveOnPublish

app = cosalette.App(
    "myapp", "1.0.0",
    store=JsonFileStore("./data/state.json"),
)

@app.telemetry("counter", interval=30, persist=SaveOnPublish())
async def counter(store: DeviceStore) -> dict[str, object]:
    store["total"] = store.get("total", 0) + 1
    return {"total": store["total"]}
Available Save Policies¶
| Policy | Saves when |
|---|---|
| SaveOnPublish() | After each MQTT publish |
| SaveOnChange() | Whenever the store is dirty |
| SaveOnShutdown() | Only on graceful shutdown |
Policies compose with | (OR) and & (AND), for example SaveOnPublish() | SaveOnShutdown().
Combining with Other Features¶
Persistence works seamlessly with publish strategies, filters, and init callbacks:
from cosalette import DeviceStore, OnChange, SaveOnPublish
from cosalette.filters import Pt1Filter

@app.telemetry(
    "sensor",
    interval=10,
    publish=OnChange(threshold=0.5),
    persist=SaveOnPublish(),
    init=lambda: Pt1Filter(tau=2.0, dt=10.0),
)
async def sensor(
    store: DeviceStore,
    lpf: Pt1Filter,
) -> dict[str, object]:
    raw = 21.5  # e.g. from an adapter
    filtered = lpf.update(raw)
    store["last_value"] = filtered
    return {"value": filtered}
Testing persistence
Use MemoryStore() in tests — it keeps data in memory with no
filesystem access. See the Testing Guide for details.
For full details, see the Persistence concept.
Typed Telemetry Returns¶
Instead of returning a raw dict, you can return a typed object such as a Pydantic model. The framework
serializes it via Pydantic's TypeAdapter (JSON-mode serialization) before publishing, so
BaseModel subclasses, stdlib dataclasses, TypedDict, and primitives are all supported.
The return annotation (or state_model= on the decorator) drives normalization:
from pydantic import BaseModel

class SensorReading(BaseModel):
    celsius: float
    humidity: float

@app.telemetry("climate", interval=60, state_model=SensorReading)
async def climate(ctx: cosalette.DeviceContext) -> SensorReading:
    return SensorReading(celsius=21.5, humidity=58.0)
Primitive / list returns are wrapped as {"value": ...} automatically. Return
None to suppress a cycle as usual.
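The wrapping rule can be pictured with a small stdlib-only sketch. The normalise function below is hypothetical and uses dataclasses.asdict as a stand-in for the Pydantic TypeAdapter step, so it only approximates what the framework does; the Reading class is likewise made up for the example.

```python
from __future__ import annotations

import dataclasses
import json


@dataclasses.dataclass
class Reading:
    celsius: float
    humidity: float


def normalise(result: object) -> dict | None:
    """Turn a handler return value into a publishable dict (or None)."""
    if result is None:
        return None  # suppress this cycle
    if dataclasses.is_dataclass(result) and not isinstance(result, type):
        return dataclasses.asdict(result)
    if isinstance(result, dict):
        return result
    return {"value": result}  # primitives and lists get wrapped


as_json = json.dumps(normalise(Reading(celsius=21.5, humidity=58.0)))
wrapped_number = normalise(42)
wrapped_list = normalise([1, 2])
print(as_json, wrapped_number, wrapped_list)
```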
Triggerable Telemetry¶
By default, telemetry devices are poll-only — the framework calls them on a
fixed interval. Adding triggerable=True makes a device also respond to inbound
MQTT commands on {prefix}/{device}/set, firing the handler immediately when a
message arrives. The regular interval-based polling continues alongside triggers.
This is useful for devices that normally poll on a long interval but need on-demand refresh — e.g. a sensor that reports every 5 minutes but can be read immediately when a user clicks "Refresh" in the UI.
Basic Usage¶
@app.telemetry("sensor", interval=300, triggerable=True)  # (1)!
async def sensor() -> dict[str, object]:
    """Read sensor: every 5 min, or immediately on trigger."""
    return {"temperature": await read_sensor()}

1. The framework subscribes to myapp/sensor/set. Any message on that topic fires the handler immediately. The 300-second interval continues in parallel.
Accessing the Trigger Payload¶
When a handler needs to know whether it was triggered or access the
MQTT payload that caused the trigger, declare a TriggerPayload parameter:
from cosalette import TriggerPayload

@app.telemetry("sensor", interval=300, triggerable=True)
async def sensor(trigger: TriggerPayload) -> dict[str, object]:  # (1)!
    days = trigger.get("days", 7) if trigger.is_triggered else 7  # (2)!
    return {"temperature": await read_sensor(days=days)}

1. TriggerPayload is injected automatically via DI; no init= is needed.
2. On scheduled runs, trigger.is_triggered is False and get() returns the default. On triggered runs, trigger.data contains the parsed JSON payload (if valid), and trigger.raw holds the raw MQTT string.
Typed Trigger Payload¶
Declare Annotated[Model | None, Payload()] to receive the trigger payload as a
parsed Pydantic model. On scheduled runs the parameter is bound to None; on
triggered runs it holds the validated model:
from __future__ import annotations

from typing import Annotated

from pydantic import BaseModel
from cosalette.mqtt import Payload

class RefreshCommand(BaseModel):
    days: int = 7

@app.telemetry("sensor", interval=300, triggerable=True)
async def sensor(
    cmd: Annotated[RefreshCommand | None, Payload()],
) -> dict[str, object]:
    days = cmd.days if cmd is not None else 7
    return {"data": await read_sensor(days=days)}
The raw TriggerPayload approach (see above) remains available when you only
need is_triggered / raw / data without a full Pydantic model.
Constraints¶
/// admonition | Root devices cannot be triggerable
    type: warning
triggerable=True requires a named device — root (unnamed) devices
have no topic segment to subscribe to. Attempting @app.telemetry(interval=60, triggerable=True) raises ValueError.
///
/// admonition | Coalescing groups are incompatible
    type: warning
triggerable=True and group= cannot be combined. Coalescing groups use
a shared tick-aligned scheduler that is incompatible with on-demand triggers.
///
Coalescing Behaviour¶
If multiple MQTT messages arrive before the handler finishes its current
execution, the trigger coalesces — only the latest payload is used.
The handler runs once with the most recent TriggerPayload, not once per
message. This prevents thundering-herd scenarios when a burst of triggers
arrives.
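One way to picture this "latest wins" behaviour is a single-slot mailbox guarded by an asyncio.Event. The LatestTrigger class below is a hypothetical illustration of the idea, not cosalette's implementation.

```python
import asyncio


class LatestTrigger:
    """Single-slot mailbox: a newer payload overwrites an unread older one."""

    def __init__(self) -> None:
        self._payload = None
        self._event = asyncio.Event()

    def post(self, payload: str) -> None:
        self._payload = payload  # overwrite, do not queue
        self._event.set()

    def has_pending(self) -> bool:
        return self._event.is_set()

    async def wait(self) -> str:
        await self._event.wait()
        self._event.clear()
        return self._payload


async def demo() -> tuple:
    slot = LatestTrigger()
    # A burst of three messages arrives before the handler gets to run.
    for payload in ('{"days": 1}', '{"days": 2}', '{"days": 3}'):
        slot.post(payload)
    seen = await slot.wait()
    return seen, slot.has_pending()


result = asyncio.run(demo())
print(result)
```

The handler side wakes once, sees only the last payload of the burst, and nothing remains pending afterwards.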
Contract Metadata¶
Every @app.telemetry registration can carry contract metadata — optional
fields that describe what the device produces and how it behaves. The metadata
is surfaced by cosalette manifest and the MCP server; it has no runtime effect.
from pydantic import BaseModel
import cosalette

class CounterReading(BaseModel):
    impulses: int
    temperature_celsius: float

@app.telemetry(
    "counter",
    interval=cosalette.setting_ref("poll_interval"),  # (1)!
    triggerable=True,
    summary="Gas meter impulse count and ambient temperature",  # (2)!
    state_model=CounterReading,  # (3)!
    behavior=["reads serial port", "applies outlier rejection"],  # (4)!
    effects=["updates Home Assistant energy dashboard"],  # (5)!
)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    meter = ctx.adapter(GasMeterPort)
    return {
        "impulses": meter.read_impulses(),
        "temperature_celsius": meter.read_temperature(),
    }

1. setting_ref("poll_interval") exposes the field name in the manifest. A raw lambda s: s.poll_interval works identically at runtime but shows "<deferred>" in manifest output.
2. summary is a one-line human-readable description of what this device reports.
3. state_model documents the expected shape of the published JSON, which is useful for tooling and AI coding assistants.
4. behavior lists ordered operational steps; each string is one bullet in the manifest.
5. effects lists side effects and downstream mutations.
Use payload_model on triggerable telemetry to document the JSON shape accepted
on the /set topic:
class RefreshRequest(BaseModel):
    days: int = 7

@app.telemetry(
    "counter",
    interval=cosalette.setting_ref("poll_interval"),
    triggerable=True,
    state_model=CounterReading,
    payload_model=RefreshRequest,  # shape accepted on /set
)
async def counter() -> dict[str, object]: ...
For the full contract-first workflow — including the read/write split pattern,
setting_ref inspectability, and viewing the manifest — see the
Contract-First Route Design guide.
Practical Example: Gas Meter Impulse Counter¶
Here's a complete, realistic telemetry device for a gas meter with a reed switch impulse sensor:
"""gas2mqtt — Gas meter impulse counter bridge."""
from __future__ import annotations

from typing import Protocol, runtime_checkable

import cosalette
from pydantic import Field
from pydantic_settings import SettingsConfigDict

# --- Port (Protocol) for hardware abstraction ---
@runtime_checkable
class GasMeterPort(Protocol):
    """Hardware abstraction for gas meter impulse sensors."""

    def read_impulses(self) -> int: ...
    def read_temperature(self) -> float: ...

# --- Settings ---
class Gas2MqttSettings(cosalette.Settings):
    model_config = SettingsConfigDict(
        env_prefix="GAS2MQTT_",
        env_nested_delimiter="__",
        env_file=".env",
        env_file_encoding="utf-8",
    )
    serial_port: str = Field(default="/dev/ttyUSB0")
    poll_interval: int = Field(default=60, ge=1)

# --- App ---
app = cosalette.App(
    name="gas2mqtt",
    version="1.0.0",
    settings_class=Gas2MqttSettings,
)

# --- Telemetry device ---
@app.telemetry("counter", interval=app.settings.poll_interval)  # (1)!
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read gas meter impulses and publish state."""
    meter = ctx.adapter(GasMeterPort)
    impulses = meter.read_impulses()
    temp = meter.read_temperature()
    return {
        "impulses": impulses,
        "temperature_celsius": temp,
    }

app.run()

1. app.settings is available at decoration time because App.__init__ eagerly instantiates the settings class. The poll_interval value here reflects environment variables and .env files, so no hardcoded constants are needed.
Error Behaviour¶
When a telemetry function raises an exception, the framework applies state-transition deduplication:
- First error: caught, logged at ERROR level, and published to gas2mqtt/error and gas2mqtt/counter/error. The device health status in the heartbeat is set to "error".
- Repeated same-type errors: suppressed. No additional MQTT publishes until the error type changes. This prevents flooding the broker when a sensor is persistently broken.
- Different error type: treated as a new error (published and logged).
- Recovery: when the next poll succeeds after a failure, recovery is logged at INFO level and the device health status is restored to "ok" in the heartbeat.
- Continues the polling loop: the next interval always runs.
This means transient failures (sensor timeouts, I/O glitches) are self-healing. The daemon stays up and retries on the next cycle. Persistent failures produce a single error event instead of flooding MQTT with identical messages every interval.
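The deduplication rule is easy to check with a tiny simulation. The error_events function below is a hypothetical model of the rule described in this section (it mirrors the last_error_type bookkeeping in the "Under the hood" loop); it is not framework code.

```python
def error_events(outcomes):
    """Map poll outcomes to the events that would be emitted.

    Each outcome is None (success) or an exception class (failure).
    """
    events = []
    last_error_type = None
    for outcome in outcomes:
        if outcome is None:
            if last_error_type is not None:
                events.append("recovered")
                last_error_type = None
        elif outcome is not last_error_type:
            # First error, or a different error type: publish it.
            events.append(f"error:{outcome.__name__}")
            last_error_type = outcome
        # Same error type as before: suppressed, nothing emitted.
    return events


events = error_events(
    [None, OSError, OSError, OSError, ValueError, None, None, OSError]
)
print(events)
```

Three consecutive OSError polls produce a single error event, the switch to ValueError produces a second one, and the first success afterwards produces the recovery event.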
@app.telemetry("counter", interval=60)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    meter = ctx.adapter(GasMeterPort)
    reading = meter.read_impulses()  # (1)!
    if reading < 0:
        raise ValueError(f"Invalid impulse count: {reading}")  # (2)!
    return {"impulses": reading}

1. If read_impulses() raises OSError, the framework catches it and publishes an error payload. The loop continues.
2. You can also raise explicitly; the framework treats it the same way.
Custom error types
For machine-readable error classification, define an error_type_map. See
Custom Error Types for details.
Retry / Backoff¶
By default, a failed telemetry poll publishes an error and waits for the next
interval. When polling a flaky transport (BLE, serial, HTTP), you often want
the framework to retry the handler a few times before giving up. The retry=
parameter adds exactly that — a configurable retry loop with backoff delays,
all shutdown-aware.
Basic Usage¶
import cosalette

app = cosalette.App(name="ble2mqtt", version="1.0.0")

@app.telemetry("sensor", interval=30, retry=3)  # (1)!
async def sensor(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read a BLE sensor that sometimes times out."""
    adapter = ctx.adapter(BLESensorPort)
    return {"temperature": await adapter.read_temperature()}

app.run()

1. Up to 3 retry attempts on failure. Defaults to retrying on OSError with ExponentialBackoff(base=2.0, max_delay=60.0): delays of ~2 s, ~4 s, ~8 s (with ±20 % jitter).
How It Works¶
1. The framework calls your handler.
2. If it raises an exception matching retry_on, the attempt is logged at WARNING level (not published to MQTT).
3. The framework sleeps for the backoff delay using ctx.sleep(); if a shutdown signal arrives during the wait, the sleep is aborted immediately.
4. Steps 1–3 repeat up to retry times.
5. If the handler still fails after all retries, the exception falls through to the normal error path: logged at ERROR, published to the error topic, and state-transition deduplication applies.
6. On success, the cumulative retry counter resets to zero.
Cumulative counter
The retry counter is not reset between poll cycles. If the handler fails twice in cycle N and once more in cycle N+1, that counts as three total attempts. The counter only resets when a poll succeeds.
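The quoted default delays (~2 s, ~4 s, ~8 s with ±20 % jitter) are consistent with a base ** attempt formula capped at max_delay. The sketch below works under that assumption; exponential_delay is a hypothetical function, and whether the library applies jitter before or after the cap is not stated in this guide.

```python
import random


def exponential_delay(attempt, base=2.0, max_delay=60.0, jitter=0.2, rng=None):
    """Delay before retry number `attempt` (1-based), with optional jitter."""
    nominal = min(base ** attempt, max_delay)
    if jitter == 0:
        return nominal
    rng = rng or random.Random()
    # Assumption: jitter scales the capped delay by a factor in [1-j, 1+j].
    return nominal * rng.uniform(1 - jitter, 1 + jitter)


nominal = [exponential_delay(n, jitter=0) for n in range(1, 8)]
jittered = exponential_delay(3, rng=random.Random(0))
print(nominal)
print(round(jittered, 2))
```

Without jitter the delays double until they hit the 60 second cap; with jitter the third delay falls somewhere between 6.4 and 9.6 seconds.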
Custom Backoff Strategies¶
The default ExponentialBackoff works well for most transports. For
different patterns, choose an alternative or write your own:
from cosalette import LinearBackoff, FixedBackoff

# Linear: 1s, 2s, 3s, ... capped at 30s
@app.telemetry("serial", interval=60, retry=5, backoff=LinearBackoff(step=1.0, max_delay=30.0))
async def serial_sensor(ctx: cosalette.DeviceContext) -> dict[str, object]:
    return {"value": await read_serial(ctx)}

# Fixed: always wait exactly 2s between attempts
@app.telemetry("http", interval=120, retry=3, backoff=FixedBackoff(delay=2.0))
async def http_sensor(ctx: cosalette.DeviceContext) -> dict[str, object]:
    return {"value": await fetch_api(ctx)}
For fully custom logic, implement the BackoffStrategy protocol — a single
method delay(attempt: int) -> float:
class SlowStart:
    """No delay on first retry, then exponential."""

    def delay(self, attempt: int) -> float:
        if attempt <= 1:
            return 0.0
        return min(2.0 ** (attempt - 1), 30.0)

@app.telemetry("sensor", interval=30, retry=4, backoff=SlowStart())
async def sensor(ctx: cosalette.DeviceContext) -> dict[str, object]:
    return {"temperature": await read_ble(ctx)}
Circuit Breaker¶
When a backend is down for an extended period, retrying every poll cycle
wastes resources and floods logs. A CircuitBreaker short-circuits the
retry loop after repeated failures:
from cosalette import CircuitBreaker, ExponentialBackoff

@app.telemetry(
    "inverter",
    interval=60,
    retry=3,
    backoff=ExponentialBackoff(base=2.0, max_delay=30.0),
    circuit_breaker=CircuitBreaker(threshold=5),  # (1)!
)
async def inverter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    adapter = ctx.adapter(ModbusPort)
    return {"power_w": await adapter.read_register(0x0006)}

1. After 5 consecutive failures (across poll cycles), the circuit opens: the handler is skipped entirely until a half-open probe succeeds.
The circuit breaker uses a three-state machine:
| State | Behaviour |
|---|---|
| Closed | Normal operation — handler runs, failures counted |
| Open | Handler skipped — no retries, no error publishes |
| Half-open | A single probe attempt — success closes, failure re-opens |
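The three states can be modelled with a short state machine. SimpleBreaker below is a hypothetical sketch, not cosalette's CircuitBreaker. In particular, this guide does not say when an open circuit moves to half-open, so the cooldown_cycles parameter (number of skipped cycles before a probe) is an assumption made purely for illustration.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half-open"


class SimpleBreaker:
    def __init__(self, threshold: int, cooldown_cycles: int = 3) -> None:
        self.threshold = threshold
        self.cooldown_cycles = cooldown_cycles  # hypothetical parameter
        self.state = State.CLOSED
        self._failures = 0
        self._skipped = 0

    def allow(self) -> bool:
        """Return True if the handler should run this cycle."""
        if self.state is State.OPEN:
            self._skipped += 1
            if self._skipped <= self.cooldown_cycles:
                return False
            self.state = State.HALF_OPEN  # let a single probe through
        return True

    def record(self, ok: bool) -> None:
        """Record the outcome of a handler run."""
        if ok:
            self.state, self._failures = State.CLOSED, 0
            return
        self._failures += 1
        if self.state is State.HALF_OPEN or self._failures >= self.threshold:
            self.state, self._skipped = State.OPEN, 0


def simulate(breaker: SimpleBreaker, outcomes: list) -> list:
    """Feed per-cycle outcomes (True = success); return which cycles ran."""
    ran = []
    for ok in outcomes:
        allowed = breaker.allow()
        if allowed:
            breaker.record(ok)
        ran.append(allowed)
    return ran


breaker = SimpleBreaker(threshold=2, cooldown_cycles=2)
ran = simulate(breaker, [False, False, False, False, False, True, True, True, True])
print(ran, breaker.state)
```

In this run the circuit opens after two failures, skips two cycles, fails its first probe and re-opens, then closes again once a later probe succeeds.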
Combining with Other Features¶
Retry composes naturally with publish strategies, persistence, and coalescing groups. Each feature operates at its own layer:
from cosalette import (
CircuitBreaker,
DeviceStore,
ExponentialBackoff,
OnChange,
SaveOnPublish,
)
@app.telemetry(
"boiler",
interval=30,
publish=OnChange(threshold=0.5),
persist=SaveOnPublish(),
retry=3,
backoff=ExponentialBackoff(base=2.0, max_delay=30.0),
circuit_breaker=CircuitBreaker(threshold=5),
group="optolink", # (1)!
)
async def boiler(
ctx: cosalette.DeviceContext,
store: DeviceStore,
) -> dict[str, object]:
adapter = ctx.adapter(OptolinkPort)
data = await adapter.read_signals(["boiler_temp", "burner_hours"])
store["last_reading"] = data
return data
- Within a coalescing group, each handler has its own independent retry state. If boiler retries while hotwater succeeds, only boiler counts failures.
Constraints
- retry_on defaults to (OSError,) when retry > 0 and no explicit retry_on is provided. Non-matching exceptions bypass retry entirely and go straight to the error path.
- Cumulative counter — retries accumulate across poll cycles and only reset on success.
- Telemetry only — retry= is not available on @app.command or @app.device. Those archetypes have different execution models.
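The retry_on filtering rule can be illustrated with a standalone sketch. `call_with_retry` is a hypothetical helper written for this example, not the framework's retry loop, and it does not model the cumulative counter across poll cycles.

```python
import asyncio


async def call_with_retry(func, retries, retry_on=(OSError,), delay=0.0):
    """Hypothetical helper: retry only exceptions matching retry_on."""
    attempt = 0
    while True:
        try:
            return await func()
        except retry_on:
            attempt += 1
            if attempt > retries:
                raise  # retries exhausted: hand over to the error path
            await asyncio.sleep(delay)
        # Any other exception type is not caught here and propagates at once.


calls = {"flaky": 0, "broken": 0}


async def flaky():
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise OSError("bus busy")
    return {"value": 42}


async def broken():
    calls["broken"] += 1
    raise ValueError("bad payload")


async def main():
    result = await call_with_retry(flaky, retries=3)
    try:
        await call_with_retry(broken, retries=3)
    except ValueError:
        bypassed = True
    else:
        bypassed = False
    return result, bypassed


result, bypassed = asyncio.run(main())
print(result, bypassed, calls)
```

The OSError is retried until the third call succeeds, while the ValueError is raised after a single call because it does not match retry_on.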
Interval Guidelines¶
| Sensor Type | Typical Interval | Notes |
|---|---|---|
| Temperature / humidity | 30–60 s | Slow-changing physical quantities |
| Energy / impulse | 10–60 s | Depends on consumption rate |
| Motion / presence | 1–5 s | Fast-changing binary sensor |
| Battery level | 300–600 s | Very slow-changing |
Coalescing Groups¶
When multiple telemetry handlers share a physical resource (e.g. a serial bus),
use the group= parameter to coalesce them into a shared execution window:
@app.telemetry(name="outdoor", interval=300, group="optolink")
async def outdoor(port: OptolinkPort) -> dict[str, object]:
return await port.read_signals(["outdoor_temp"])
@app.telemetry(name="hotwater", interval=300, group="optolink")
async def hotwater(port: OptolinkPort) -> dict[str, object]:
return await port.read_signals(["hot_water_temp"])
Handlers in the same group execute sequentially within a batch when their intervals coincide. At t=0 all grouped handlers fire together; at subsequent ticks only those whose interval divides evenly into the elapsed time fire.
Each handler retains its own publish strategy, error isolation, persistence
policy, and init function. The group= parameter is purely an execution
scheduling hint.
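The divisibility rule for grouped handlers can be sketched in a few lines. `due_handlers` and the `burner` entry are hypothetical and exist only to show a mixed-interval group; the real scheduler is described in ADR-018.

```python
def due_handlers(intervals: dict[str, int], elapsed: int) -> list[str]:
    """Names whose interval divides the elapsed seconds evenly."""
    return [name for name, interval in intervals.items() if elapsed % interval == 0]


group = {"outdoor": 300, "hotwater": 300, "burner": 600}
for t in (0, 300, 600):
    print(t, due_handlers(group, t))
```

At t=0 and t=600 all three handlers run in one batch; at t=300 only the two 300-second handlers do.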
See ADR-018 for the full design rationale.
Cron-Based Scheduling¶
When your device needs time-of-day-aligned polling — daily at 06:00, twice a day,
or on specific weekdays — use the schedule= parameter instead of interval=:
from cosalette import CronSchedule
@app.telemetry("calendar", schedule="0 0 6,18 * * ?") # (1)!
async def calendar() -> dict[str, object]:
events = await fetch_calendar_events()
return {"events": events}
- Quartz cron format: second minute hour day-of-month month day-of-week. This fires at 06:00 and 18:00 daily. The first execution runs immediately on startup, then waits for the next scheduled time.
schedule= vs interval=¶
- Mutually exclusive — providing both raises ValueError
- One is required — every telemetry registration needs either schedule= or interval=
- schedule= accepts a cron string, a pre-parsed CronSchedule instance, or a CronSpec callable for per-device schedules (see Per-Device Schedules below)
- schedule= cannot combine with group= (coalescing groups require interval=)
- All other telemetry features (publish=, persist=, retry=, init=) work with both schedule= and interval=
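The rules above amount to a small validation step. The sketch below is a hypothetical `check_timing` function, not the framework's code. The guide only states that providing both parameters raises ValueError; using ValueError for the other two cases is an assumption.

```python
def check_timing(interval=None, schedule=None, group=None) -> str:
    """Hypothetical validator mirroring the schedule=/interval= rules."""
    if interval is not None and schedule is not None:
        raise ValueError("interval= and schedule= are mutually exclusive")
    if interval is None and schedule is None:
        # Assumption: the guide does not name the exception type for this case.
        raise ValueError("one of interval= or schedule= is required")
    if schedule is not None and group is not None:
        # Assumption: the guide does not name the exception type for this case.
        raise ValueError("schedule= cannot be combined with group=")
    return "schedule" if schedule is not None else "interval"


print(check_timing(interval=30, group="optolink"))
print(check_timing(schedule="0 0 6,18 * * ?"))
```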
Per-Device Schedules¶
When name= is a callable (dict-name multi-device registration), schedule= can
also be a callable — a CronSpec — that receives the per-device config and
returns a cron string or CronSchedule instance. This lets each device run on its
own wall-clock schedule:
from dataclasses import dataclass
from cosalette import App, DeviceContext
@dataclass
class SensorConfig:
mac: str
cron_expr: str = "0 0 * * * ?" # default: every hour
app = App(name="sensors", version="1.0.0")
@app.telemetry(
name=lambda s: {
"morning_sensor": SensorConfig(mac="AA:...:01", cron_expr="0 0 6 * * ?"),
"evening_sensor": SensorConfig(mac="AA:...:02", cron_expr="0 0 18 * * ?"),
},
schedule=lambda cfg: cfg.cron_expr, # (1)!
)
async def sensor(
ctx: DeviceContext, config: SensorConfig,
) -> dict[str, object]:
return {"value": await read_ble(config.mac)}
- The schedule= callable receives the per-device config object (not Settings). morning_sensor fires at 06:00; evening_sensor fires at 18:00.
Constraints
- Requires name= to be a callable (dict-name form). Static names raise ValueError.
- Cannot combine with group= (coalescing groups require a shared interval=).
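How a schedule callable maps device configs to cron strings can be shown without the framework. `resolve_schedules` is a hypothetical helper used only for illustration; the config class and device names are copied from the example above.

```python
from dataclasses import dataclass


@dataclass
class SensorConfig:
    mac: str
    cron_expr: str = "0 0 * * * ?"  # default: every hour


def resolve_schedules(devices, schedule):
    """Hypothetical helper: call the schedule callable once per device config."""
    return {name: schedule(config) for name, config in devices.items()}


devices = {
    "morning_sensor": SensorConfig(mac="AA:...:01", cron_expr="0 0 6 * * ?"),
    "evening_sensor": SensorConfig(mac="AA:...:02", cron_expr="0 0 18 * * ?"),
}
schedules = resolve_schedules(devices, lambda cfg: cfg.cron_expr)
print(schedules)
```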
Quartz Cron Format¶
Cosalette uses Quartz-compatible cron expressions with 6 or 7 fields:
┌───────────── second (0-59)
│ ┌───────────── minute (0-59)
│ │ ┌───────────── hour (0-23)
│ │ │ ┌───────────── day of month (1-31)
│ │ │ │ ┌───────────── month (1-12 or JAN-DEC)
│ │ │ │ │ ┌───────────── day of week (1-7, 1=SUN, or SUN-SAT)
│ │ │ │ │ │ ┌───────────── year (optional)
│ │ │ │ │ │ │
* * * * * * *
Common examples:
| Expression | Fires at |
|---|---|
| 0 0 6 * * ? | Daily at 06:00:00 |
| 0 0 6,18 * * ? | Daily at 06:00 and 18:00 |
| 0 30 * * * ? | Every hour at :30 |
| 0 0 0 1 * ? | First day of each month at midnight |
| 0 0 8 ? * MON-FRI | Weekdays at 08:00 |
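To make the field order concrete, the sketch below splits an expression into named fields. It is a standalone illustration, not cosalette's parser: `split_quartz` and `FIELD_NAMES` are hypothetical names, and the function only checks the field count, not the field values.

```python
FIELD_NAMES = (
    "second", "minute", "hour", "day_of_month", "month", "day_of_week", "year",
)


def split_quartz(expression: str) -> dict[str, str]:
    """Split a Quartz cron string into named fields (6 or 7 expected)."""
    fields = expression.split()
    if len(fields) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(fields)}")
    # zip stops at the shorter sequence, so "year" is omitted for 6 fields.
    return dict(zip(FIELD_NAMES, fields))


weekdays = split_quartz("0 0 8 ? * MON-FRI")
print(weekdays)
```

A classic 5-field Unix cron string such as `0 8 * * 1-5` fails the field-count check, which is a quick way to catch expressions copied from crontab.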
Timezone
Scheduled times use the system's local timezone by default.
In Docker containers, this is controlled by the TZ environment variable.
DST transitions may shift scheduled times by ±1 hour.
Pre-Parsed Schedules¶
For validation at import time or reuse, parse the expression explicitly:
from cosalette import CronSchedule
morning = CronSchedule("0 0 6 * * ?")
@app.telemetry("calendar", schedule=morning)
async def calendar() -> dict[str, object]:
...
CronSchedule validates eagerly — invalid expressions raise ValueError
at construction time, not when the first fire is due.
When to Use @app.device + ctx.sleep_until() Instead¶
For devices managed via @app.device that need time-of-day alignment
without the @app.telemetry polling model, use ctx.sleep_until():
import cosalette
from datetime import time
@app.device("calendar")
async def calendar(ctx: cosalette.DeviceContext):
while not ctx.shutdown_requested:
events = await fetch_calendar_events()
await ctx.publish_state({"events": events})
yield
await ctx.sleep_until(time(6, 0)) # (1)!
- Sleeps until the next 06:00 (local timezone by default). Pass tz=datetime.timezone.utc for UTC, or tz=ZoneInfo("Europe/Berlin") for an explicit timezone.
ctx.sleep_until() also accepts a sequence of times and sleeps until the
nearest upcoming one.
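The "nearest upcoming" selection can be sketched with the standard library alone. `next_occurrence` is a hypothetical function that shows the idea, not the implementation behind ctx.sleep_until(); treating a time equal to "now" as already passed is an assumption.

```python
from datetime import datetime, time, timedelta


def next_occurrence(now: datetime, times: list[time]) -> datetime:
    """Earliest datetime strictly after `now` that matches one of the wall-clock times."""
    candidates = []
    for t in times:
        candidate = datetime.combine(now.date(), t, tzinfo=now.tzinfo)
        if candidate <= now:
            candidate += timedelta(days=1)  # already passed today: use tomorrow
        candidates.append(candidate)
    return min(candidates)


times = [time(6, 0), time(18, 0)]
at_noon = next_occurrence(datetime(2024, 1, 1, 12, 0), times)
at_evening = next_occurrence(datetime(2024, 1, 1, 19, 0), times)
print(at_noon)     # 2024-01-01 18:00:00
print(at_evening)  # 2024-01-02 06:00:00
```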
Using Telemetry with Router¶
For multi-module applications, define telemetry devices on a Router instead of
directly on App:
import cosalette
router = cosalette.Router(prefix="sensors", tags=["environment"])
@router.telemetry("temperature", interval=30)
async def read_temperature() -> dict[str, object]:
"""Read I2C temperature sensor."""
return {"celsius": 22.5}
@router.telemetry("humidity", interval=30)
async def read_humidity() -> dict[str, object]:
"""Read I2C humidity sensor."""
return {"percent": 55.0}
# In the application module (a separate file from the router module above):
import cosalette
from sensors import router as sensors_router
app = cosalette.App(name="home2mqtt", version="1.0.0")
app.include_router(sensors_router)
if __name__ == "__main__":
app.run()
MQTT topics:
- home2mqtt/sensors/temperature/state
- home2mqtt/sensors/humidity/state
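The topic layout follows from joining the app name, the router prefix, and the device name. The sketch below reproduces the two topics listed here; `state_topic` is a hypothetical helper, not a cosalette function.

```python
def state_topic(app_name: str, device: str, router_prefix: str = "") -> str:
    """Join the non-empty parts into an {app}/{prefix}/{device}/state topic."""
    parts = [app_name, router_prefix, device, "state"]
    return "/".join(part for part in parts if part)


topics = [
    state_topic("home2mqtt", name, router_prefix="sensors")
    for name in ("temperature", "humidity")
]
print(topics)
```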
All telemetry features (publish strategies, filters, retry, triggerable, typed returns) work identically on routers. See Router Composition for full multi-module patterns.
See Also¶
- Router Composition — organize telemetry devices in multi-module apps
- Device Archetypes — telemetry vs command archetypes
- MQTT Topics — the {prefix}/{device}/state topic layout
- Architecture — how devices fit into the framework
- Publish Strategies — publishing control concepts
- Signal Filters — handler-level data transformations
- ADR-010 — the decision behind device archetypes
- ADR-013 — the decision behind publish strategies
- ADR-014 — the decision behind signal filters
- ADR-018 — the decision behind coalescing groups
- ADR-024 — the decision behind retry/backoff
- ADR-032 — the decision behind cron scheduling and wall-clock sleep