API Reference¶
Complete reference for all public classes, functions, and protocols exported by cosalette.
Application¶
cosalette.App
¶
App(
name: str,
version: str = "0.0.0",
*,
description: str = "IoT-to-MQTT bridge",
settings_class: type[Settings] = Settings,
dry_run: bool = False,
heartbeat_interval: float | None = 60.0,
health_check_interval: float | None = 30.0,
lifespan: LifespanFunc | None = None,
store: Store | Callable[..., Store] | None = None,
adapters: dict[
type,
type
| str
| Callable[..., object]
| tuple[
type | str | Callable[..., object],
type | str | Callable[..., object],
],
]
| None = None,
restart_after_failures: int = 5,
max_restarts: int = 3,
restart_cooldown: float = 5.0,
sustained_health_reset: float = 300.0,
)
Bases: _ConfigureMixin, _DeviceMixin, _CommandMixin, _TelemetryMixin, _StreamMixin, _PeriodicMixin, _AdapterMixin, _LifecycleMixin, _AsyncapiMixin
Central composition root and application orchestrator.
Collects device registrations, adapter mappings, and an optional
lifespan context manager, then runs the full async lifecycle
in :meth:run.
See Also
ADR-001 — Framework architecture (IoC, composition root).
Initialise the application orchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Application name (used as MQTT topic prefix and client ID). |
required |
version
|
str
|
Application version string. |
'0.0.0'
|
description
|
str
|
Short description for CLI help text. |
'IoT-to-MQTT bridge'
|
settings_class
|
type[Settings]
|
Settings subclass to instantiate at startup. |
Settings
|
dry_run
|
bool
|
When True, resolve dry-run adapter variants. |
False
|
heartbeat_interval
|
float | None
|
Seconds between periodic heartbeats
published to |
60.0
|
health_check_interval
|
float | None
|
Seconds between periodic health
checks for adapters implementing
:class: |
30.0
|
lifespan
|
LifespanFunc | None
|
Async context manager for application startup
and shutdown. Code before |
None
|
store
|
Store | Callable[..., Store] | None
|
Optional :class: |
None
|
adapters
|
dict[type, type | str | Callable[..., object] | tuple[type | str | Callable[..., object], type | str | Callable[..., object]]] | None
|
Optional mapping of port types to adapter
implementations. Each key is a Protocol type; each
value is either a single implementation (class,
lazy-import string, or factory callable) or a
|
None
|
Source code in packages/src/cosalette/_app/__init__.py
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 | |
settings
property
¶
settings: Settings
Application settings, instantiated at construction time.
The instance is created eagerly in __init__ from the
settings_class parameter. Environment variables and
.env files are read at that point, so decorator arguments
like interval=app.settings.poll_interval reflect the
actual runtime configuration.
The CLI entrypoint (:meth:cli) re-instantiates settings
with --env-file support and passes the result to
:meth:_run_async, which takes precedence over this
instance.
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the settings class could not be
instantiated at construction time (e.g. required
fields with no defaults and no matching environment
variables). Use |
devices
property
¶
Registered device handlers (read-only view).
telemetry_registrations
property
¶
Registered telemetry handlers (read-only view).
Named telemetry_registrations rather than telemetry to
avoid shadowing the :meth:telemetry registration decorator.
commands
property
¶
Registered command handlers (read-only view).
periodic_registrations
property
¶
Registered periodic handlers (read-only view).
adapters
property
¶
Registered adapter entries keyed by port type (read-only view).
registered_names
¶
Collect registered device/telemetry/command/periodic names.
Source code in packages/src/cosalette/_app/__init__.py
include_router
¶
include_router(
router: Router,
*,
prefix: str | None = None,
tags: list[str] | None = None,
dependencies: list[Any] | None = None,
adapters: dict[
type,
type
| str
| Callable[..., object]
| tuple[
type | str | Callable[..., object],
type | str | Callable[..., object],
],
]
| None = None,
) -> None
Include a router's registrations in this application.
Applies snapshot semantics: registrations are captured at call time. Later mutations to the router do not affect prior inclusions. Multiple inclusions with different prefixes are allowed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
router
|
Router
|
Router instance to include. |
required |
prefix
|
str | None
|
Optional single MQTT topic segment prepended to all
router operation names. Must not contain |
None
|
tags
|
list[str] | None
|
Additional tags applied to all router operations. Accumulates in order: router constructor → include_router → operation. |
None
|
dependencies
|
list[Any] | None
|
Reserved for cos-ebc. Must be None or empty. |
None
|
adapters
|
dict[type, type | str | Callable[..., object] | tuple[type | str | Callable[..., object], type | str | Callable[..., object]]] | None
|
Adapter declarations merged into the app's registry.
Same shape as |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If prefix contains MQTT special characters. |
ValueError
|
If an adapter port type conflict is detected. |
NotImplementedError
|
If dependencies is not None or empty. |
See Also
ADR-044 — Public Router and composition API.
Example::
# sensors.py
router = cosalette.Router(prefix="sensors")
@router.telemetry("temperature", interval=30)
async def read_temperature() -> dict:
return {"celsius": 22.5}
# main.py
app = cosalette.App("bridge")
app.include_router(router, tags=["production"])
# → publishes to: bridge/sensors/temperature/state
Source code in packages/src/cosalette/_app/__init__.py
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 | |
cosalette.AppContext
¶
AppContext(
*, settings: Settings, adapters: dict[type, object]
)
Context for the application lifespan.
Provided to the lifespan async context manager registered via
App(lifespan=...). Offers access to settings and adapter
resolution but NOT per-device features (no publish, no on_command,
no sleep).
See Also
ADR-001 — Framework architecture (lifespan).
Initialise lifecycle-hook context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
settings
|
Settings
|
Application settings instance. |
required |
adapters
|
dict[type, object]
|
Resolved adapter registry mapping port types to instances. |
required |
Source code in packages/src/cosalette/_context/_app_context.py
adapter
¶
Resolve an adapter by port type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
port_type
|
type[T]
|
The Protocol type to look up. |
required |
Returns:
| Type | Description |
|---|---|
T
|
The adapter instance registered for that port type. |
Raises:
| Type | Description |
|---|---|
LookupError
|
If no adapter is registered for the port type. |
Source code in packages/src/cosalette/_context/_app_context.py
cosalette.DeviceContext
¶
DeviceContext(
*,
name: str,
settings: Settings,
mqtt: MqttPort,
topic_prefix: str,
shutdown_event: Event,
adapters: dict[type, object],
clock: ClockPort,
is_root: bool = False,
)
Per-device runtime context injected by the framework.
Provides device-scoped access to MQTT publishing, command registration, shutdown-aware sleeping, and adapter resolution.
Each device function receives its own DeviceContext instance. The context is pre-configured with the device's name and topic prefix so that publish operations target the correct MQTT topics automatically.
Initialise per-device context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Device name as registered (e.g. "blind"). |
required |
settings
|
Settings
|
Application settings instance. |
required |
mqtt
|
MqttPort
|
MQTT port for publishing. |
required |
topic_prefix
|
str
|
Root prefix for MQTT topics (e.g. "velux2mqtt"). |
required |
shutdown_event
|
Event
|
Shared event that signals graceful shutdown. |
required |
adapters
|
dict[type, object]
|
Resolved adapter registry mapping port types to instances. |
required |
clock
|
ClockPort
|
Monotonic clock for timing. |
required |
is_root
|
bool
|
When True, topics omit the device name segment (root-level device). |
False
|
Source code in packages/src/cosalette/_context/_device_context.py
shutdown_requested
property
¶
True when the framework has received a shutdown signal.
command_handler
property
¶
The root command handler, or None. Framework-internal.
command_handlers
property
¶
All registered command handlers keyed by sub-topic. Framework-internal.
get_command_handler
¶
Look up the command handler for a sub-topic (or root).
publish_state
async
¶
Publish device state to {prefix}/{device}/state as JSON.
For root devices (unnamed), publishes to {prefix}/state instead.
This is the primary publication method for device telemetry. The payload dict is JSON-serialised automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
payload
|
dict[str, object]
|
Dict to serialise as JSON. |
required |
retain
|
bool
|
Whether the message should be retained (default True). |
True
|
Source code in packages/src/cosalette/_context/_device_context.py
publish
async
¶
Publish to an arbitrary sub-channel: {prefix}/{device}/{channel}.
For root devices (unnamed), publishes to {prefix}/{channel} instead.
Escape hatch for non-standard topics. Prefer publish_state() for normal device state updates.
Source code in packages/src/cosalette/_context/_device_context.py
sleep
async
¶
Shutdown-aware sleep.
Returns early (without exception) if shutdown is requested during the sleep period. This enables the idiomatic pattern::
while not ctx.shutdown_requested:
await ctx.sleep(10)
# ... do work ...
Source code in packages/src/cosalette/_context/_device_context.py
sleep_until
async
¶
Shutdown-aware sleep until a wall-clock time.
Sleeps until the next occurrence of target (or the nearest
upcoming time if a sequence is given). Uses local timezone
when tz is None.
Returns early (without exception) if shutdown is requested
during the sleep, via :meth:sleep.
Example — poll twice daily at 06:00 and 18:00 local time::
while not ctx.shutdown_requested:
data = await read_data()
await ctx.publish_state(data)
await ctx.sleep_until([time(6, 0), time(18, 0)])
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
target
|
time | Sequence[time]
|
A :class: |
required |
tz
|
tzinfo | None
|
Timezone for interpreting target. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If target is an empty sequence. |
See Also
ADR-032 — Wall-clock scheduling design.
Source code in packages/src/cosalette/_context/_device_context.py
sub_entity
async
¶
sub_entity(name: str) -> AsyncIterator[SubEntityContext]
Scoped sub-entity lifecycle with automatic availability.
Publishes "online" on enter and "offline" on exit to
{topic_base}/{name}/availability. Clears retained state
on exit by publishing an empty payload to the state topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Sub-entity name (single MQTT topic level). |
required |
Yields:
| Name | Type | Description |
|---|---|---|
A |
AsyncIterator[SubEntityContext]
|
class: |
Raises:
| Type | Description |
|---|---|
ValueError
|
If the name fails validation. |
See Also
ADR-031 — Sub-entity context manager.
Source code in packages/src/cosalette/_context/_device_context.py
on_command
¶
on_command(
handler_or_sub_topic: CommandHandler
| str
| None = None,
) -> (
CommandHandler
| Callable[[CommandHandler], CommandHandler]
)
Register a command handler for this device.
Supports three call patterns:
-
Decorator — root handler::
@ctx.on_command async def handle(sub_topic: str | None, payload: str) -> None: ...
-
Direct call — root handler::
ctx.on_command(handle)
-
Decorator factory — sub-topic handler::
@ctx.on_command("calibrate") async def handle_cal(sub_topic: str | None, payload: str) -> None: ...
Handlers may also accept a single :class:Command argument
(new-style), detected automatically via type annotation::
@ctx.on_command
async def handle(cmd: Command) -> None: ...
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If a handler is already registered for the same
sub-topic, or if |
ValueError
|
If the sub-topic string is empty or contains
|
Returns:
| Type | Description |
|---|---|
CommandHandler | Callable[[CommandHandler], CommandHandler]
|
The handler unchanged when called with a callable, or a |
CommandHandler | Callable[[CommandHandler], CommandHandler]
|
decorator function when called with a sub-topic string or None. |
Source code in packages/src/cosalette/_context/_device_context.py
383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 | |
commands
¶
commands(
timeout: float | None = None,
) -> AsyncIterator[Command | None]
Async iterator that yields inbound commands from the internal queue.
Provides a queue-backed async iterator for @app.device loops,
eliminating the need for manual asyncio.Queue bridges.
When timeout is provided, yields None on timeout expiry,
enabling periodic-work patterns::
async for cmd in ctx.commands(timeout=5):
if cmd is None:
await periodic_check()
else:
await process(cmd.payload)
Without timeout, blocks until a command arrives or shutdown is requested. Shutdown is detected immediately via event racing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float | None
|
Seconds to wait for a command before yielding None. When None (default), blocks indefinitely until a command arrives or shutdown is requested. |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[Command | None]
|
Command when a command arrives, or None on timeout expiry. |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If called more than once on the same context,
or if a command handler is already registered via
:meth: |
See Also
ADR-025 — Command channel and sub-topic routing.
Source code in packages/src/cosalette/_context/_device_context.py
adapter
¶
Resolve an adapter by port type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
port_type
|
type[T]
|
The Protocol type to look up. |
required |
Returns:
| Type | Description |
|---|---|
T
|
The adapter instance registered for that port type. |
Raises:
| Type | Description |
|---|---|
LookupError
|
If no adapter is registered for the port type. |
Source code in packages/src/cosalette/_context/_device_context.py
cosalette.SubEntityContext
¶
Context for a sub-entity within a device.
Provides scoped MQTT publishing for a sub-entity's topic namespace.
Created via :meth:DeviceContext.sub_entity context manager — not
instantiated directly by user code.
See Also
ADR-031 — Sub-entity context manager.
Source code in packages/src/cosalette/_context/_sub_entity_context.py
publish_state
async
¶
Publish sub-entity state to {device}/{name}/state as JSON.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
payload
|
dict[str, object]
|
Dict to serialise as JSON. |
required |
retain
|
bool
|
Whether the message should be retained (default True). |
True
|
Source code in packages/src/cosalette/_context/_sub_entity_context.py
on_command
¶
Register a command handler for this sub-entity's sub-topic.
Delegates to the parent device's :meth:~DeviceContext.on_command
with this sub-entity's name as the sub-topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
handler
|
CommandHandler
|
Async callable to handle inbound commands. |
required |
Returns:
| Type | Description |
|---|---|
CommandHandler
|
The handler, unchanged. |
Source code in packages/src/cosalette/_context/_sub_entity_context.py
cosalette.Command
dataclass
¶
An inbound MQTT command.
Attributes:
| Name | Type | Description |
|---|---|---|
topic |
str
|
Full MQTT topic the command arrived on. |
payload |
str
|
Raw payload string. |
sub_topic |
str | None
|
Sub-topic segment, or None for root commands. |
timestamp |
float
|
Monotonic timestamp at receipt (seconds). |
cosalette.CronSchedule
¶
Parsed Quartz-compatible cron expression.
Supports 6-field (second through day-of-week) and 7-field (with year) expressions.
Example::
sched = CronSchedule("0 30 10-13 ? * WED,FRI")
next_dt = sched.next_fire_after(datetime.datetime.now())
Source code in packages/src/cosalette/_cron/_schedule.py
next_fire_after
¶
Return the next datetime strictly after after that matches.
The returned datetime preserves the timezone of after.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
after
|
datetime
|
Reference datetime (exclusive — result is strictly after). |
required |
Returns:
| Type | Description |
|---|---|
datetime
|
Next matching datetime. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no match is found within the scan limit (4 years). |
Source code in packages/src/cosalette/_cron/_schedule.py
__eq__
¶
Check equality based on the raw expression string.
Note: Semantically equivalent expressions with different syntax
(e.g. "0 0 12 * * MON" vs "0 0 12 * * 2") compare
unequal because comparison uses the unparsed expression text.
Source code in packages/src/cosalette/_cron/_schedule.py
Shared-State Factories¶
@app.state registers a factory that runs once at bootstrap, after settings are
resolved and before lifecycle adapters are entered. Its return value is registered
in the DI container by the return type and injected into any handler declaring that
type.
Four factory forms are supported, detected from the return annotation at registration time:
| Form | Teardown |
|---|---|
def f(...) -> T |
None |
def f(...) -> ContextManager[T] |
__exit__ on shutdown |
async def f(...) -> AsyncIterator[T] |
generator finalized on shutdown |
async def f(...) -> AsyncContextManager[T] |
__aexit__ on shutdown |
Teardown runs in reverse registration order (LIFO).
The factory may optionally declare one parameter annotated with Settings or a
subclass — the framework passes the resolved settings instance narrowed to that type.
Zero-parameter factories are also valid.
Registration-time validation:
- Missing return annotation →
TypeError - Unsupported return annotation form →
TypeError - First parameter annotated with a non-
Settingstype →TypeError - Two factories returning the same type →
ValueError
See Share State Between Handlers for usage examples and ADR-039 for design rationale.
Domain-Event Reactors¶
@app.react registers a reactor function that the framework calls automatically
at execution boundaries when a state object has pending domain events.
@app.react(SharedState, drain=lambda s: s.registry.drain_events())
async def on_registry_events(
events: list[RegistryEvent], # reserved name — injected by framework
ctx: cosalette.DeviceContext,
store: DeviceStore,
state: SharedState,
) -> None:
for event in events:
await ctx.publish("registry/event", event.to_dict())
store["registry"] = state.registry.to_dict()
state_type — the @app.state-registered type to watch. Must be registered
before @app.react is called; otherwise ValueError is raised at decoration time.
drain= — optional callable (state_instance) -> Iterable | None. When
None, the framework calls state_instance.drain_events() structurally. If no
drain method exists, AttributeError is raised at runtime.
events parameter — reserved name. If the reactor function declares a
parameter named events, the framework injects the drained event list directly.
The events parameter is not resolved through type-based DI.
Reaction boundaries:
| Handler | When reactors fire |
|---|---|
@app.device |
After each yield and once at normal completion |
@app.stream |
After each item processed and once at handler exit |
@app.telemetry |
After each successful handler return |
@app.command |
After each successful handler return |
Reactors do not fire on cancellation or unhandled exceptions.
Registration-time validation:
state_typenot registered via@app.state→ValueError- Reactor function is not
async def→TypeError
See Share State Between Handlers for usage examples and ADR-043 for design rationale.
Periodic Background Tasks¶
@app.periodic registers a coroutine as a background task that runs on a fixed
interval with no MQTT output. It is the right primitive for side-effect work that runs
alongside devices: flushing write buffers, sending watchdog pings, synchronising LED
state, or warming caches.
import datetime
import cosalette
from cosalette import SettingRef
class AppSettings(cosalette.Settings):
watchdog_enabled: bool = True
led_interval: float = 5.0
app = cosalette.App(name="bridge", version="1.0.0")
@app.periodic("flush-buffer", interval=30.0) # (1)!
async def flush_buffer(cache: BufferCache) -> None:
await cache.flush()
@app.periodic(
"watchdog",
interval=datetime.timedelta(minutes=1), # (2)!
enabled=lambda s: s.watchdog_enabled, # (3)!
)
async def watchdog_ping(settings: AppSettings) -> None:
await ping_watchdog(settings.watchdog_url)
@app.periodic("led-sync", interval=SettingRef("led_interval")) # (4)!
async def led_sync(led: LedPort) -> None:
await led.sync_state()
@app.periodic( # (5)!
"poll-sensor",
interval=lambda s: s.sensor_poll_interval,
)
async def poll_sensor(settings: AppSettings) -> None:
await read_sensor(settings.sensor_url)
intervalas a plainfloat— simplest form; positive number of seconds between invocations.intervalasdatetime.timedelta— converted to seconds at registration time.enabledas a callable — evaluated at bootstrap with the resolvedSettingsinstance;Falsesilently skips registration entirely (ADR-038 deferred-enabled pattern).SettingRef("led_interval")— deferred resolution: the value ofAppSettings.led_intervalis read from settings at bootstrap, not at import time.intervalas aCallable[[Settings], float]— called once at bootstrap with the resolved settings; use when the interval depends on a computed expression or multiple settings fields.
DI injection: handlers may declare Settings subclasses, adapter ports registered
via app.adapter(), ClockPort, and objects registered by @app.state factories.
DeviceContext is not available (periodic tasks have no MQTT lifecycle).
Exception behaviour: asyncio.CancelledError propagates (clean shutdown). All
other exceptions are caught, logged at ERROR level, and the loop continues.
Lifecycle: periodic tasks are spawned as asyncio.Tasks during Phase 3 (Run) and
cancelled during Phase 4 (Teardown) with a 5-second grace period.
App.add_periodic(name, func, *, interval, enabled, init, summary, behavior)¶
Imperative equivalent of @app.periodic. Accepts enabled: bool only (not a
callable) — use inside @app.on_configure where settings are already resolved.
App.periodic_registrations¶
Sequence[_PeriodicRegistration] — read-only view of all registered periodic tasks.
Each entry exposes name, interval, func, and the injection plan.
AppHarness.tick_periodic(name)¶
Invoke one cycle of a named periodic handler synchronously, bypassing the interval sleep. This is the recommended way to test periodic handlers:
async def test_flush_writes_pending_data() -> None:
mock_buf = MockBufferPort()
harness = AppHarness.create()
harness.app.adapter(BufferPort, lambda: mock_buf)
await harness.tick_periodic("flush-buffer")
assert mock_buf.flush_called
The handler runs exactly once. No task is spawned; no sleep occurs.
AppHarness.create(..., run_periodic=False)¶
The run_periodic parameter on AppHarness.create() controls whether periodic tasks
are spawned during harness.run():
| Value | Effect |
|---|---|
False (default) |
Periodic tasks are not spawned — existing tests are unaffected |
True |
Periodic tasks are spawned as asyncio.Tasks for integration-level coverage |
Prefer tick_periodic() for unit-level testing of handler logic. Use
run_periodic=True only when you need to verify that a task actually fires during
the full application lifecycle.
See the Periodic Tasks guide for full usage examples and ADR-041 for design rationale.
MQTT¶
cosalette.MqttPort
¶
Bases: Protocol
Port contract for MQTT publish/subscribe.
Satisfies ADR-006 hexagonal architecture: all MQTT interaction goes through this protocol so adapters are swappable.
cosalette.MqttClient
dataclass
¶
MqttClient(
settings: MqttSettings, will: WillConfig | None = None
)
Production MQTT adapter backed by aiomqtt.
Uses a background task that maintains a persistent connection
with automatic reconnection. aiomqtt is imported lazily
inside _connection_loop() so the mock and null adapters work
without the dependency installed.
See Also
ADR-006 — Hexagonal architecture (lazy imports).
ADR-012 — LWT / availability via WillConfig.
publish
async
¶
publish(
topic: str,
payload: str | dict[str, Any],
*,
retain: bool = False,
qos: int = 1,
) -> None
Publish a message to the broker.
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the client is not connected. |
Source code in packages/src/cosalette/_mqtt/_client.py
subscribe
async
¶
Subscribe to topic.
The subscription is tracked internally so it can be restored after a reconnection.
Source code in packages/src/cosalette/_mqtt/_client.py
on_message
¶
on_message(callback: MessageCallback) -> None
start
async
¶
Start the background connection loop.
Source code in packages/src/cosalette/_mqtt/_client.py
stop
async
¶
Stop the connection loop and clean up.
Idempotent — safe to call multiple times.
Source code in packages/src/cosalette/_mqtt/_client.py
cosalette.MqttLifecycle
¶
Bases: Protocol
Lifecycle management for MQTT adapters.
Adapters that need explicit start/stop (e.g. connecting to a broker)
implement this protocol. Adapters like MockMqttClient and
NullMqttClient that need no lifecycle management simply omit these
methods — the framework detects their absence via isinstance.
See Also
ADR-006 — Interface Segregation: ports are narrow by design. PEP 544 — Structural subtyping (Protocols).
cosalette.MqttMessageHandler
¶
Bases: Protocol
Message dispatch capability for MQTT adapters.
Adapters that can receive inbound messages implement this protocol.
The framework calls on_message to wire up the topic router.
See Also
ADR-006 — Interface Segregation.
cosalette.MockMqttClient
dataclass
¶
MockMqttClient(
published: list[tuple[str, str, bool, int]] = list(),
subscriptions: list[str] = list(),
raise_on_publish: Exception | None = None,
)
In-memory test double that records MQTT interactions.
Records publishes and subscriptions for assertion. Supports
callback registration and simulated message delivery via
deliver().
publish
async
¶
publish(
topic: str,
payload: str | dict[str, Any],
*,
retain: bool = False,
qos: int = 1,
) -> None
Record a publish call, or raise if raise_on_publish is set.
Source code in packages/src/cosalette/_mqtt/__init__.py
subscribe
async
¶
on_message
¶
on_message(callback: MessageCallback) -> None
deliver
async
¶
reset
¶
Clear all recorded data, callbacks, and failure injection.
get_messages_for
¶
Return (payload, retain, qos) tuples for topic.
Source code in packages/src/cosalette/_mqtt/__init__.py
cosalette.NullMqttClient
dataclass
¶
Silent no-op MQTT adapter.
Every method is a no-op that logs at DEBUG level. Useful as a default when MQTT is not configured.
cosalette.WillConfig
dataclass
¶
Last-Will-and-Testament configuration.
Abstracts aiomqtt.Will so that callers never depend on the
aiomqtt package directly. The real client translates this into
the library-specific type inside _connection_loop().
See Also
ADR-012 — Health and availability reporting.
cosalette.MessageCallback
module-attribute
¶
Async callback receiving (topic, payload) for each inbound message.
Error Handling¶
cosalette.ErrorPayload
dataclass
¶
ErrorPayload(
error_type: str,
message: str,
device: str | None,
timestamp: str,
details: dict[str, object] = dict(),
)
Immutable structured error payload.
Represents a single error event ready for JSON serialisation and MQTT publication.
cosalette.ErrorPublisher
dataclass
¶
ErrorPublisher(
mqtt: MqttPort,
topic_prefix: str,
error_type_map: dict[type[Exception], str] = dict(),
clock: Callable[[], datetime] | None = None,
)
Publishes structured error payloads to MQTT.
Wraps :func:build_error_payload with fire-and-forget MQTT
publication. Errors during publication are logged but never
propagated — the main application loop must not crash because
an error report failed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mqtt
|
MqttPort
|
MQTT port used for publishing. |
required |
topic_prefix
|
str
|
Base prefix for error topics (e.g. |
required |
error_type_map
|
dict[type[Exception], str]
|
Pluggable mapping from consumer exception types to machine-readable type strings. |
dict()
|
clock
|
Callable[[], datetime] | None
|
Optional callable returning a :class: |
None
|
publish
async
¶
Build an error payload and publish it to MQTT.
Always publishes to {topic_prefix}/error. When device
is provided, also publishes to {topic_prefix}/{device}/error
(skipped for root devices, whose per-device topic would
duplicate the global topic).
The entire pipeline (build → serialise → publish) is wrapped in fire-and-forget semantics: failures at any stage are logged but never propagated to the caller.
Source code in packages/src/cosalette/_errors.py
cosalette.build_error_payload
¶
build_error_payload(
error: Exception,
*,
error_type_map: dict[type[Exception], str]
| None = None,
device: str | None = None,
details: dict[str, object] | None = None,
clock: Callable[[], datetime] | None = None,
) -> ErrorPayload
Convert an exception into a structured :class:ErrorPayload.
Looks up the exact class of the exception; subclasses are not matched.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
error
|
Exception
|
The exception to convert. |
required |
error_type_map
|
dict[type[Exception], str] | None
|
Optional mapping from exception types to machine-readable
|
None
|
device
|
str | None
|
Optional device name to include in the payload. |
None
|
details
|
dict[str, object] | None
|
Optional dict of additional context to attach to the payload.
Defaults to an empty dict when |
None
|
clock
|
Callable[[], datetime] | None
|
Optional callable returning a :class: |
None
|
Returns:
| Type | Description |
|---|---|
ErrorPayload
|
A frozen dataclass ready for serialisation. |
Source code in packages/src/cosalette/_errors.py
Health and Availability¶
cosalette.DeviceStatus
dataclass
¶
Immutable status snapshot for a single device.
Used inside :class:HeartbeatPayload to report per-device health
in the heartbeat JSON.
cosalette.HeartbeatPayload
dataclass
¶
HeartbeatPayload(
status: str,
uptime_s: float,
version: str,
devices: dict[str, DeviceStatus] = dict(),
)
Immutable structured heartbeat payload.
Represents an app-level status snapshot ready for JSON serialisation and MQTT publication.
to_json
¶
Serialise to a JSON string.
Device entries are expanded to nested dicts via
:meth:DeviceStatus.to_dict.
Source code in packages/src/cosalette/_health/_reporter.py
cosalette.HealthReporter
dataclass
¶
Publishes app heartbeats and per-device availability to MQTT.
Manages device tracking, uptime calculation (via monotonic clock), and graceful shutdown. All publication is fire-and-forget — errors are logged but never propagated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mqtt
|
MqttPort
|
MQTT port used for publishing. |
required |
topic_prefix
|
str
|
Base prefix for health topics (e.g. |
required |
version
|
str
|
Application version string included in heartbeats. |
required |
clock
|
ClockPort
|
Monotonic clock for uptime measurement (see :class: |
required |
__post_init__
¶
set_device_status
¶
Update or add a device's status in the internal tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device
|
str
|
Device name (used in topic paths and heartbeat payload). |
required |
status
|
str
|
Free-form status string, defaults to |
'ok'
|
Source code in packages/src/cosalette/_health/_reporter.py
remove_device
¶
publish_device_available
async
¶
Publish "online" to the device availability topic.
For root devices (unnamed), publishes to {prefix}/availability
instead of {prefix}/{device}/availability.
Also registers the device as "ok" in internal tracking.
Source code in packages/src/cosalette/_health/_reporter.py
publish_device_unavailable
async
¶
Publish "offline" to the device availability topic.
For root devices (unnamed), publishes to {prefix}/availability
instead of {prefix}/{device}/availability.
Also removes the device from internal tracking.
Source code in packages/src/cosalette/_health/_reporter.py
publish_heartbeat
async
¶
Publish a structured JSON heartbeat to {prefix}/status.
The payload includes current uptime, version, and all tracked device statuses.
Source code in packages/src/cosalette/_health/_reporter.py
shutdown
async
¶
Gracefully shut down: publish "offline" for everything.
Publishes "offline" to each tracked device's availability
topic (using root topic for root devices), then publishes
"offline" to the app status topic, and clears internal
device tracking.
Source code in packages/src/cosalette/_health/_reporter.py
cosalette.build_will_config
¶
build_will_config(topic_prefix: str) -> WillConfig
Create a :class:WillConfig for the app's LWT.
The resulting config targets {topic_prefix}/status with payload
"offline", QoS 1, retained. Pass this to :class:MqttClient
so the broker publishes "offline" on unexpected disconnection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic_prefix
|
str
|
Application-level topic prefix (e.g. |
required |
Returns:
| Type | Description |
|---|---|
WillConfig
|
Pre-configured LWT for the app status topic. |
Source code in packages/src/cosalette/_health/_reporter.py
cosalette.HealthCheckable
¶
Bases: Protocol
Adapter health check protocol (ADR-028).
Adapters that implement this single-method protocol are periodically
probed by the framework. Return True when healthy, False
otherwise. The framework sets per-device availability accordingly.
cosalette.AdapterHealthStatus
dataclass
¶
AdapterHealthStatus(
healthy: bool = True,
consecutive_failures: int = 0,
last_check: float = 0.0,
restart_count: int = 0,
restart_exhausted: bool = False,
last_restart: float = 0.0,
last_healthy_since: float = 0.0,
)
Per-adapter health state snapshot for the health check runner.
Tracks whether an adapter is healthy, how many consecutive health check failures have occurred, and the monotonic timestamp of the last health check. Exposed for Epic 6 (auto-restart decisions).
Clock¶
cosalette.ClockPort
¶
Bases: Protocol
Monotonic clock for timing measurements.
Used by device controllers and timing-sensitive components to measure elapsed time without being affected by system clock adjustments (NTP, manual changes, etc.).
The default implementation wraps time.monotonic(). Tests
inject a deterministic fake clock for reproducible timing.
now
¶
Return monotonic time in seconds.
Returns:
| Type | Description |
|---|---|
float
|
A float representing seconds from an arbitrary epoch. |
float
|
Only the difference between two calls is meaningful. |
sleep
async
¶
Sleep for seconds.
Used by :meth:DeviceContext.sleep for shutdown-aware sleeping.
Production implementations delegate to asyncio.sleep; test
doubles may advance virtual time instead.
Source code in packages/src/cosalette/_clock.py
cosalette.SystemClock
¶
Production clock wrapping time.monotonic().
Satisfies :class:ClockPort via structural subtyping.
Usage::
clock = SystemClock()
start = clock.now()
# ... some work ...
elapsed = clock.now() - start
now
¶
Logging¶
cosalette.JsonFormatter
¶
Bases: Formatter
Emit log records as single-line JSON objects (NDJSON).
Each record produces a JSON object with these fields:
timestamp— ISO 8601 with timezone (always UTC)level— Python log level namelogger— dotted logger namemessage— the formatted log messageservice— application name for log correlationversion— application version (omitted when empty)exception— formatted traceback (only present when an exception is logged)stack_info— stack trace (only present whenstack_info=True)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
service
|
str
|
Application name included in every log line. |
''
|
version
|
str
|
Application version string. Omitted from output when empty. |
''
|
Source code in packages/src/cosalette/_logging.py
format
¶
Format a log record as a single-line JSON string.
Overrides :meth:logging.Formatter.format. The returned
string contains no embedded newlines (tracebacks are
escaped by the JSON serialiser), so each call produces exactly
one log line — critical for container log drivers that
split on \n.
Source code in packages/src/cosalette/_logging.py
cosalette.configure_logging
¶
configure_logging(
settings: LoggingSettings,
*,
service: str,
version: str = "",
) -> None
Configure the root logger from settings.
Clears any existing handlers on the root logger, then installs fresh handlers according to settings.
A :class:logging.StreamHandler writing to stderr is
always installed. When settings.file is set, a
:class:~logging.handlers.RotatingFileHandler is added as
well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
settings
|
LoggingSettings
|
Logging configuration (level, format, file). |
required |
service
|
str
|
Application name passed to :class: |
required |
version
|
str
|
Application version passed to
:class: |
''
|
Source code in packages/src/cosalette/_logging.py
Settings¶
cosalette.Settings
¶
Bases: BaseSettings
Root framework settings for cosalette applications.
Loaded from environment variables with the nested delimiter
__ and an optional .env file in the working directory.
No env_prefix is set at the framework level — each
application subclasses Settings and adds its own prefix
(e.g. env_prefix="MYAPP_").
Example .env::
MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
LOGGING__LEVEL=DEBUG
LOGGING__FORMAT=text
Example with an application prefix (subclass)::
class MyAppSettings(Settings):
model_config = SettingsConfigDict(
env_prefix="MYAPP_",
env_nested_delimiter="__",
env_file=".env",
env_file_encoding="utf-8",
)
model_config
class-attribute
instance-attribute
¶
model_config = SettingsConfigDict(
env_nested_delimiter="__",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
Settings uses extra="ignore" because the base class sets
no env_prefix. Without a prefix, pydantic-settings reads
every environment variable; the BaseSettings default of
extra="forbid" would then reject unrelated variables
(GH_TOKEN, PATH, etc.) as validation errors.
Subclasses that set env_prefix only see prefixed variables
and may safely tighten this to extra="forbid" for strict
validation.
cosalette.MqttSettings
¶
Bases: BaseModel
MQTT broker connection and topic configuration.
Environment variables (with __ nesting)::
MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
MQTT__TOPIC_PREFIX=myapp
cosalette.LoggingSettings
¶
Bases: BaseModel
Logging configuration.
When file is set, logs are also written to a rotating file
(size-based rotation, backup_count generations kept). When
None, logs go to stderr only.
The format field selects the output format:
"json"(default) — structured JSON lines for container log aggregators (Loki, Elasticsearch, CloudWatch). Each line is a complete JSON object with correlation metadata."text"— human-readable timestamped format for local development and direct terminal use.
Adapter Lifecycle¶
Adapters registered via app.adapter() that implement the async context manager
protocol (__aenter__/__aexit__) are automatically managed by the framework:
- Entered during startup, before the
lifespan=hook runs - Exited during shutdown, after the
lifespan=hook exits - Managed via
contextlib.AsyncExitStackfor LIFO ordering and exception safety - Adapters without
__aenter__/__aexit__pass through unchanged
The detection is duck-typed — any object with both __aenter__ and __aexit__
attributes qualifies. No base class or registration is needed.
See ADR-016 for the design rationale and Adapter Lifecycle Management for usage examples.
Streaming¶
StreamablePort[T_co] and Stream[T] are
the push-to-pull bridge for hardware devices that deliver data via callbacks
rather than polling. All lifecycle methods (open, close, start_scan, stop_scan)
are coroutines awaited by the stream runner.
See Streaming for a full explanation and
ADR-042 for design rationale.
cosalette.StreamablePort
¶
Bases: Protocol
Contract for hardware ports that push data via callbacks.
Implementers open a connection, optionally start and stop a hardware scan (e.g. BLE discovery, USB enumeration), and let callers register a callback that fires for every inbound datum.
Lifecycle::
await port.open()
port.register_callback(stream.put)
await port.start_scan()
...
await port.stop_scan()
await port.close()
T_co is the type of item produced by the port (covariant: a port
of Sensor satisfies StreamablePort[BaseSensor]).
open
async
¶
close
async
¶
start_scan
async
¶
stop_scan
async
¶
register_callback
¶
Register cb to be called for each inbound datum.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cb
|
Callable[[T_co], None]
|
Sync callable invoked with each item. The callback must
not block; hardware callbacks are inherently synchronous.
Use :class: |
required |
Source code in packages/src/cosalette/_runners/_stream_types.py
cosalette.Stream
¶
Stream(
*,
maxsize: int = 0,
backpressure: BackpressurePolicy = "raise",
thread_safe: bool = False,
)
Async iterator backed by a push-callback bridge.
Bridges hardware callbacks (sync :meth:put) into async for
loops. Shutdown is signalled once via :meth:shutdown;
__anext__ then raises :exc:StopAsyncIteration and all further
iteration stops. Shutdown is immediate — items still in the
queue at shutdown are discarded, not drained.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
maxsize
|
int
|
Maximum number of items buffered before :meth: |
0
|
backpressure
|
BackpressurePolicy
|
Policy applied when |
'raise'
|
thread_safe
|
bool
|
If |
False
|
The iterator uses a sentinel-value pattern: :meth:shutdown enqueues
a module-level _SENTINEL object into the queue, so a waiting
__anext__ wakes immediately without creating extra tasks or sets.
Typical usage::
stream: Stream[SensorReading] = Stream()
port.register_callback(stream.put)
port.open()
port.start_scan()
async for reading in stream:
... # process each pushed item
Source code in packages/src/cosalette/_runners/_stream_types.py
put
¶
Push item onto the queue (sync, never blocks).
When thread_safe=True was passed at construction, this method is safe to call from any OS thread. Otherwise it must be called from the event-loop thread. For off-loop use without thread_safe::
loop.call_soon_threadsafe(stream.put, item)
The backpressure policy takes effect when maxsize > 0 and
the queue is full:
"raise"— raises :exc:asyncio.QueueFull(sync mode) or surfaces the exception on the event-loop thread (thread-safe mode)."drop_newest"— the incoming item is discarded; a DEBUG log is emitted."drop_oldest"— the oldest queued item is evicted and item is enqueued; a DEBUG log is emitted.
When maxsize=0 (unbounded) the policy is never evaluated.
Raises:
| Type | Description |
|---|---|
QueueFull
|
When maxsize > 0, the queue is full,
and backpressure is |
Source code in packages/src/cosalette/_runners/_stream_types.py
shutdown
¶
Signal the iterator to stop.
Idempotent. Once set, __anext__ raises
:exc:StopAsyncIteration on the next call. Any items still in
the queue are discarded — shutdown is immediate, not draining.
Must be called from the event-loop thread.
Source code in packages/src/cosalette/_runners/_stream_types.py
Publish Strategies¶
cosalette.PublishStrategy
¶
Bases: Protocol
Publish-decision contract for the device loop.
The framework calls _bind before the loop to inject the clock,
should_publish each iteration, and on_published after a
successful publish to let the strategy reset internal state
(counters, timestamps, etc.).
should_publish
¶
Decide whether the current reading should be published.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current
|
dict[str, object]
|
The latest telemetry payload. |
required |
previous
|
dict[str, object] | None
|
The last published payload, or |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Source code in packages/src/cosalette/_strategies/_base.py
cosalette.Every
¶
Bases: _StrategyBase
Time-based or count-based publish throttle.
Exactly one of seconds or n must be provided.
Every(seconds=30)
Publish at most once every 30 seconds. Requires a
:class:ClockPort injected via _bind(). Before binding,
should_publish always returns True (safe fallback).
Every(n=5)
Publish every 5th reading. No clock dependency.
Raises:
| Type | Description |
|---|---|
ValueError
|
If both, neither, or non-positive values are given. |
Source code in packages/src/cosalette/_strategies/_every.py
cosalette.OnChange
¶
Bases: _StrategyBase
Publish when the telemetry payload changes.
With threshold=None (default), uses exact equality
(current != previous).
When threshold is a float, it acts as a global numeric
dead-band: a leaf field must change by more than threshold
(strict >) to trigger a publish. Non-numeric fields fall
back to !=.
When threshold is a dict[str, float], each key names a
leaf field with its own dead-band. Use dot-notation for
nested fields (e.g. {"sensor.temp": 0.5}). Fields not
listed in the dict use exact equality.
Thresholds are applied to leaf values only. Nested dicts
are traversed recursively — {"sensor": {"temp": 22.5}}
compares temp numerically, not the intermediate sensor
dict as a whole.
In both threshold modes, structural changes (added or removed keys at any nesting level) always trigger a publish, and fields are combined with OR semantics — any single leaf field exceeding its threshold is sufficient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
threshold
|
float | dict[str, float] | None
|
Optional dead-band for numeric change detection.
|
None
|
Source code in packages/src/cosalette/_strategies/_onchange.py
should_publish
¶
Return True when the payload differs from the last publish.
When a threshold is configured, numeric fields are compared
using abs(current - previous) > threshold (strict
inequality). Non-numeric fields and structural changes always
use exact equality.
Source code in packages/src/cosalette/_strategies/_onchange.py
Composite Strategies¶
cosalette.AllStrategy
¶
Bases: _StrategyBase
AND-composite: publishes only if all children say yes.
Nested AllStrategy instances are automatically flattened::
AllStrategy(AllStrategy(a, b), c) → AllStrategy(a, b, c)
Source code in packages/src/cosalette/_strategies/_composite.py
should_publish
¶
Return True only if all children return True.
All children are evaluated eagerly (no short-circuit) so that
stateful strategies like Every(n=N) always advance their
internal counters.
Source code in packages/src/cosalette/_strategies/_composite.py
cosalette.AnyStrategy
¶
Bases: _StrategyBase
OR-composite: publishes if any child says yes.
Nested AnyStrategy instances are automatically flattened::
AnyStrategy(AnyStrategy(a, b), c) → AnyStrategy(a, b, c)
Source code in packages/src/cosalette/_strategies/_composite.py
should_publish
¶
Return True if any child returns True.
All children are evaluated eagerly (no short-circuit) so that
stateful strategies like Every(n=N) always advance their
internal counters.
Source code in packages/src/cosalette/_strategies/_composite.py
Introspection¶
cosalette.build_registry_snapshot
¶
build_registry_snapshot(app: App) -> dict[str, Any]
Build a JSON-serializable snapshot of all app registrations.
Produces a dict describing the app metadata, devices, telemetry,
commands, and adapters — suitable for json.dumps() without
custom encoders.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
app
|
App
|
The cosalette :class: |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
A plain dict with string keys and JSON-serializable values. |
Source code in packages/src/cosalette/_mcp/_introspect.py
cosalette.format_registry_json
¶
Return the registry snapshot as indented JSON.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
snapshot
|
dict[str, Any]
|
Dict returned by :func: |
required |
Returns:
| Type | Description |
|---|---|
str
|
A pretty-printed JSON string. |
Source code in packages/src/cosalette/_mcp/_introspect.py
cosalette.format_registry_table
¶
Return the registry snapshot as a human-readable plain-text table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
snapshot
|
dict[str, Any]
|
Dict returned by :func: |
required |
Returns:
| Type | Description |
|---|---|
str
|
A multi-line string with aligned columns per section. |
Source code in packages/src/cosalette/_mcp/_introspect.py
Retry / Backoff¶
cosalette.BackoffStrategy
¶
Bases: Protocol
Backoff-delay contract for telemetry retry.
The framework calls delay(attempt) between retry attempts.
Attempt numbers are 1-based.
cosalette.ExponentialBackoff
¶
cosalette.LinearBackoff
¶
cosalette.FixedBackoff
¶
cosalette.CircuitBreaker
¶
Optional circuit breaker for telemetry retry.
Tracks consecutive failed cycles (where all retries were exhausted).
After threshold consecutive failures, the circuit opens and
the handler is skipped until a half-open probe succeeds.
Source code in packages/src/cosalette/_retry.py
record_success
¶
record_failure
¶
Record a cycle where all retries were exhausted.
should_attempt
¶
Return True if the handler should execute this cycle.
- closed: always True
- open: skip this cycle, transition to half-open for next cycle
- half-open: True (probe attempt)
Source code in packages/src/cosalette/_retry.py
Triggerable Telemetry¶
cosalette.TriggerPayload
dataclass
¶
TriggerPayload(
is_triggered: bool = False,
raw: str | None = None,
data: dict[str, Any] | None = None,
)
Trigger context for triggerable telemetry handlers.
Injected via DI when a handler declares a TriggerPayload
parameter. On scheduled runs, is_triggered is False
and raw/data are None. On MQTT-triggered runs,
is_triggered is True and the MQTT payload is available.
Examples:
Simple check::
@app.telemetry("sensor", interval=60, triggerable=True)
async def read_sensor(
adapter: SensorPort,
trigger: TriggerPayload,
) -> dict[str, object]:
days = trigger.get("days", 7) if trigger.is_triggered else 7
return await adapter.read(days=days)
get
¶
Extract a key from parsed JSON data, with fallback.
Returns default when not triggered, when payload was not valid JSON, or when key is absent.
Source code in packages/src/cosalette/_runners/_trigger.py
scheduled
classmethod
¶
scheduled() -> TriggerPayload
from_mqtt
classmethod
¶
from_mqtt(payload: str) -> TriggerPayload
Create a triggered instance from an MQTT payload string.
JSON parsing is best-effort — if payload is not valid JSON,
data is None but raw is still set.
Source code in packages/src/cosalette/_runners/_trigger.py
Filters¶
cosalette.Filter
¶
Bases: Protocol
Signal filter contract.
All filters follow the update → value pattern:
- Call
update(raw)with each new measurement. - The return value is the filtered output.
- Access
valuefor the current filtered state. - Call
reset()to clear internal state.
The first update() call seeds the filter — it returns the raw
value unchanged (no history to smooth against).
update
¶
cosalette.Pt1Filter
¶
First-order low-pass (PT1) filter — Rust drop-in for the Python implementation.
__doc__
class-attribute
¶
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
__module__
class-attribute
¶
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
__new__
builtin
¶
Create and return a new object. See help(type) for accurate signature.
cosalette.MedianFilter
¶
Sliding-window median filter — Rust drop-in for the Python implementation.
__doc__
class-attribute
¶
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
__module__
class-attribute
¶
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
__new__
builtin
¶
Create and return a new object. See help(type) for accurate signature.
cosalette.OneEuroFilter
¶
OneEuroFilter(
min_cutoff: float = 1.0,
beta: float = 0.0,
d_cutoff: float = 1.0,
dt: float = 1.0,
)
Adaptive low-pass filter (1€ Filter) — Rust drop-in for the Python implementation.
__doc__
class-attribute
¶
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
__module__
class-attribute
¶
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
__new__
builtin
¶
Create and return a new object. See help(type) for accurate signature.
Persistence¶
cosalette.PersistPolicy
¶
Bases: Protocol
Save-timing contract for the persistence system.
The framework calls should_save after each telemetry cycle
to decide whether to persist the :class:DeviceStore immediately.
This is simpler than :class:PublishStrategy — no clock binding
or post-publish callback is needed.
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Decide whether to save right now.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
store
|
DeviceStore
|
The :class: |
required |
published
|
bool
|
|
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Source code in packages/src/cosalette/_persistence/_persist.py
cosalette.SaveOnPublish
¶
Bases: _SavePolicyBase
Save after each successful MQTT publish.
This is the most common policy — the store is persisted whenever new data is published to MQTT, ensuring the persisted state matches what's been broadcast.
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
cosalette.SaveOnChange
¶
Bases: _SavePolicyBase
Save whenever the store has been modified (dirty).
Saves on every handler cycle where the store was mutated, regardless of whether MQTT publishing occurred. Most aggressive policy — ensures minimal data loss on crash.
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Return True when the store has uncommitted changes.
cosalette.SaveOnShutdown
¶
Bases: _SavePolicyBase
Save only on graceful shutdown.
The lightest I/O policy — no saves during normal operation. Data accumulated during a session is only persisted when the app shuts down cleanly. Risk: data loss on hard crash/power loss.
Note: The framework always saves on shutdown regardless of policy, so this policy effectively means "never save during the loop".
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Always return False — framework handles shutdown save.
cosalette.AllSavePolicy
¶
Bases: _SavePolicyBase
AND-composite: save only if all children agree.
Nested AllSavePolicy instances are automatically flattened::
AllSavePolicy(AllSavePolicy(a, b), c) → AllSavePolicy(a, b, c)
Source code in packages/src/cosalette/_persistence/_persist.py
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Return True only if all children return True.
cosalette.AnySavePolicy
¶
Bases: _SavePolicyBase
OR-composite: save if any child says yes.
Nested AnySavePolicy instances are automatically flattened::
AnySavePolicy(AnySavePolicy(a, b), c) → AnySavePolicy(a, b, c)
Source code in packages/src/cosalette/_persistence/_persist.py
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Return True if any child returns True.
Stores¶
cosalette.Store
¶
Bases: Protocol
Key-value persistence for device state.
Each key maps to a JSON-serializable dict. Implementations
must handle missing keys (return None) and create storage
locations as needed.
load
¶
cosalette.DeviceStore
¶
DeviceStore(backend: Store, key: str)
Per-device scoped store with dirty tracking.
Wraps a :class:Store backend, automatically keyed by device name.
Behaves like a dict (MutableMapping interface) — handlers read
and write state naturally via store["key"] = value.
The framework creates one DeviceStore per device and manages
its lifecycle:
load()— called before the first handler invocation.- Handler reads/writes via dict-like access.
save()— always called on shutdown (safety net).
Dirty tracking: after load or save, the store is
"clean". Any __setitem__ or __delitem__ marks it "dirty".
For nested mutations the store cannot detect automatically, call
:meth:mark_dirty explicitly.
Source code in packages/src/cosalette/_persistence/_stores.py
load
¶
Load state from backend. Called by framework before first use.
Source code in packages/src/cosalette/_persistence/_stores.py
save
¶
mark_dirty
¶
Explicitly mark the store as dirty.
Use when mutating nested structures that __setitem__
can't detect (e.g. store["list"].append(x)).
get
¶
Return the value for key, or default if not present.
setdefault
¶
Return self[key] if present, else set and return default.
Source code in packages/src/cosalette/_persistence/_stores.py
update
¶
Update the store from a dict and/or keyword arguments.
Source code in packages/src/cosalette/_persistence/_stores.py
to_dict
¶
Return a shallow copy of the underlying data dict.
Useful when returning state from a telemetry handler (the handler returns this dict for MQTT publishing).
Source code in packages/src/cosalette/_persistence/_stores.py
keys
¶
values
¶
cosalette.NullStore
¶
No-op store — load always returns None, save is silent.
Use when persistence is disabled or for dry-run modes.
load
¶
cosalette.MemoryStore
¶
In-memory store backed by a plain dict.
Both load and save deep-copy data so that callers cannot
mutate internal state by accident. Designed for tests — mirrors
the FakeStorage pattern from gas2mqtt.
Parameters¶
initial: Optional seed data. The mapping is deep-copied on construction.
Source code in packages/src/cosalette/_persistence/_stores.py
load
¶
Return a deep copy of the stored dict, or None.
cosalette.JsonFileStore
¶
Single-file JSON store with atomic writes.
All keys live as top-level keys in one JSON object. Writes use a write-to-temp + os.replace pattern for atomicity so that a crash mid-write never corrupts the file.
Parameters¶
path:
Path to the JSON file. Parent directories are created
automatically on the first save.
Source code in packages/src/cosalette/_persistence/_stores.py
load
¶
Load a key from the JSON file.
Returns None when the file does not exist, the key is
missing, or the file contains invalid JSON (a warning is
logged in the latter case).
Source code in packages/src/cosalette/_persistence/_stores.py
save
¶
Persist data under key using an atomic write.
The full JSON object is read (if it exists), the key is updated, and the result is written to a temporary file before being atomically moved into place.
Source code in packages/src/cosalette/_persistence/_stores.py
cosalette.SqliteStore
¶
SQLite-backed store using WAL mode for power-loss resistance.
Each key is a row in a store table; the value is stored as a
JSON text column. The table is auto-created on first use.
Parameters¶
path: Path to the SQLite database file. Parent directories are created automatically.