Skip to content

API Reference

Complete reference for all public classes, functions, and protocols exported by cosalette.

Application

cosalette.App

App(
    name: str,
    version: str = "0.0.0",
    *,
    description: str = "IoT-to-MQTT bridge",
    settings_class: type[Settings] = Settings,
    dry_run: bool = False,
    heartbeat_interval: float | None = 60.0,
    health_check_interval: float | None = 30.0,
    lifespan: LifespanFunc | None = None,
    store: Store | None = None,
    adapters: dict[
        type,
        type
        | str
        | Callable[..., object]
        | tuple[
            type | str | Callable[..., object],
            type | str | Callable[..., object],
        ],
    ]
    | None = None,
    restart_after_failures: int = 5,
    max_restarts: int = 3,
    restart_cooldown: float = 5.0,
    sustained_health_reset: float = 300.0,
)

Central composition root and application orchestrator.

Collects device registrations, adapter mappings, and an optional lifespan context manager, then runs the full async lifecycle in :meth:run.

See Also

ADR-001 — Framework architecture (IoC, composition root).

Initialise the application orchestrator.

Parameters:

Name Type Description Default
name str

Application name (used as MQTT topic prefix and client ID).

required
version str

Application version string.

'0.0.0'
description str

Short description for CLI help text.

'IoT-to-MQTT bridge'
settings_class type[Settings]

Settings subclass to instantiate at startup.

Settings
dry_run bool

When True, resolve dry-run adapter variants.

False
heartbeat_interval float | None

Seconds between periodic heartbeats published to {prefix}/status. Set to None to disable periodic heartbeats entirely. Defaults to 60.

60.0
health_check_interval float | None

Seconds between periodic health checks for adapters implementing :class:~cosalette.HealthCheckable. Set to None to disable health checks entirely. Defaults to 30.

30.0
lifespan LifespanFunc | None

Async context manager for application startup and shutdown. Code before yield runs before devices start; code after yield runs after devices stop. Receives an :class:AppContext. When None, a no-op default is used.

None
store Store | None

Optional :class:Store backend for device persistence. When set, the framework creates a :class:DeviceStore per device and injects it into handlers that declare a DeviceStore parameter.

None
adapters dict[type, type | str | Callable[..., object] | tuple[type | str | Callable[..., object], type | str | Callable[..., object]]] | None

Optional mapping of port types to adapter implementations. Each key is a Protocol type; each value is either a single implementation (class, lazy-import string, or factory callable) or a (impl, dry_run) tuple. Entries are registered via :meth:adapter and coexist with later imperative calls.

None
Source code in packages/src/cosalette/_app.py
def __init__(
    self,
    name: str,
    version: str = "0.0.0",
    *,
    description: str = "IoT-to-MQTT bridge",
    settings_class: type[Settings] = Settings,
    dry_run: bool = False,
    heartbeat_interval: float | None = 60.0,
    health_check_interval: float | None = 30.0,
    lifespan: LifespanFunc | None = None,
    store: Store | None = None,
    adapters: dict[
        type,
        type
        | str
        | Callable[..., object]
        | tuple[
            type | str | Callable[..., object],
            type | str | Callable[..., object],
        ],
    ]
    | None = None,
    restart_after_failures: int = 5,
    max_restarts: int = 3,
    restart_cooldown: float = 5.0,
    sustained_health_reset: float = 300.0,
) -> None:
    """Initialise the application orchestrator.

    Args:
        name: Application name (used as MQTT topic prefix and client ID).
        version: Application version string.
        description: Short description for CLI help text.
        settings_class: Settings subclass to instantiate at startup.
        dry_run: When True, resolve dry-run adapter variants.
        heartbeat_interval: Seconds between periodic heartbeats
            published to ``{prefix}/status``.  Set to ``None`` to
            disable periodic heartbeats entirely.  Defaults to 60.
        health_check_interval: Seconds between periodic health
            checks for adapters implementing
            :class:`~cosalette.HealthCheckable`.  Set to ``None`` to
            disable health checks entirely.  Defaults to 30.
        lifespan: Async context manager for application startup
            and shutdown.  Code before ``yield`` runs before devices
            start; code after ``yield`` runs after devices stop.
            Receives an :class:`AppContext`.  When ``None``, a no-op
            default is used.
        store: Optional :class:`Store` backend for device persistence.
            When set, the framework creates a :class:`DeviceStore`
            per device and injects it into handlers that declare a
            ``DeviceStore`` parameter.
        adapters: Optional mapping of port types to adapter
            implementations.  Each key is a Protocol type; each
            value is either a single implementation (class,
            lazy-import string, or factory callable) or a
            ``(impl, dry_run)`` tuple.  Entries are registered via
            :meth:`adapter` and coexist with later imperative calls.
    """
    self._name = name
    self._version = version
    self._description = description
    self._settings_class = settings_class
    try:
        self._settings: Settings | None = settings_class()
    except ValidationError:
        self._settings = None
    self._dry_run = dry_run
    _validate_positive_interval("heartbeat_interval", heartbeat_interval)
    self._heartbeat_interval = heartbeat_interval
    _validate_positive_interval("health_check_interval", health_check_interval)
    self._health_check_interval = health_check_interval
    if restart_after_failures < 0:
        msg = f"restart_after_failures must be >= 0, got {restart_after_failures}"
        raise ValueError(msg)
    self._restart_after_failures = restart_after_failures
    if max_restarts < 0:
        msg = f"max_restarts must be >= 0, got {max_restarts}"
        raise ValueError(msg)
    self._max_restarts = max_restarts
    _validate_positive_interval("restart_cooldown", restart_cooldown)
    self._restart_cooldown = restart_cooldown
    _validate_positive_interval("sustained_health_reset", sustained_health_reset)
    self._sustained_health_reset = sustained_health_reset
    self._lifespan: LifespanFunc = (
        lifespan if lifespan is not None else _noop_lifespan
    )
    self._devices: list[_DeviceRegistration] = []
    self._telemetry: list[_TelemetryRegistration] = []
    self._commands: list[_CommandRegistration] = []
    self._adapters: dict[type, _AdapterEntry] = {}
    self._store = store
    self._configure_hooks: list[Callable[..., Any]] = []

    if adapters is not None:
        for port_type, value in adapters.items():
            if isinstance(value, tuple):
                if len(value) != 2:  # noqa: PLR2004
                    msg = (
                        f"adapters value for {port_type!r} must be an impl "
                        f"or (impl, dry_run) 2-tuple, got {len(value)}-tuple"
                    )
                    raise ValueError(msg)
                impl, dry_run_impl = value
                self.adapter(port_type, impl, dry_run=dry_run_impl)
            else:
                self.adapter(port_type, value)

settings property

settings: Settings

Application settings, instantiated at construction time.

The instance is created eagerly in __init__ from the settings_class parameter. Environment variables and .env files are read at that point, so decorator arguments like interval=app.settings.poll_interval reflect the actual runtime configuration.

The CLI entrypoint (:meth:cli) re-instantiates settings with --env-file support and passes the result to :meth:_run_async, which takes precedence over this instance.

Raises:

Type Description
RuntimeError

If the settings class could not be instantiated at construction time (e.g. required fields with no defaults and no matching environment variables). Use app.cli() with --env-file instead.

on_configure

on_configure(
    func: Callable[..., Any],
) -> Callable[..., Any]

Register a configuration hook called before devices start.

The hook runs after settings and adapters are resolved but before the run-loop. Parameters are injected by type annotation (Settings, adapter ports, Logger, ClockPort).

Use @app.on_configure (no parentheses).

See Also

ADR-023 — on_configure lifecycle phase.

Source code in packages/src/cosalette/_app.py
def on_configure(self, func: Callable[..., Any]) -> Callable[..., Any]:
    """Register a configuration hook called before devices start.

    The hook runs after settings and adapters are resolved but
    before the run-loop.  Parameters are injected by type
    annotation (Settings, adapter ports, Logger, ClockPort).

    Use ``@app.on_configure`` (no parentheses).

    See Also:
        ADR-023 — on_configure lifecycle phase.
    """
    self._configure_hooks.append(func)
    return func

device

device(
    name: str | None = None,
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
) -> Callable[..., Any]

Register a command & control device.

The decorated function runs as a concurrent asyncio task. Parameters are injected based on type annotations — declare only what you need (e.g. ctx: DeviceContext, settings: Settings, logger: logging.Logger). Zero-parameter handlers are valid.

The framework subscribes to {name}/set and routes commands to the handler registered via ctx.on_command.

When name is None, the function name is used internally and the device publishes to root-level topics ({prefix}/state instead of {prefix}/{device}/state).

Parameters:

Name Type Description Default
name str | None

Device name for MQTT topics and logging. When None, the function name is used internally and topics omit the device segment.

None
init Callable[..., Any] | None

Optional synchronous factory called once before the handler loop. Its return value is injected into the handler by type.

None
enabled bool

When False, registration is silently skipped. The decorator returns the original function unmodified and no name slot is reserved. Defaults to True.

True

Raises:

Type Description
ValueError

If a device with this name is already registered.

ValueError

If a second root (unnamed) device is registered.

TypeError

If any handler parameter lacks a type annotation.

Source code in packages/src/cosalette/_app.py
def device(
    self,
    name: str | None = None,
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
) -> Callable[..., Any]:
    """Register a command & control device.

    The decorated function runs as a concurrent asyncio task.
    Parameters are injected based on type annotations — declare
    only what you need (e.g. ``ctx: DeviceContext``,
    ``settings: Settings``, ``logger: logging.Logger``).
    Zero-parameter handlers are valid.

    The framework subscribes to ``{name}/set`` and routes commands
    to the handler registered via ``ctx.on_command``.

    When *name* is ``None``, the function name is used internally
    and the device publishes to root-level topics (``{prefix}/state``
    instead of ``{prefix}/{device}/state``).

    Args:
        name: Device name for MQTT topics and logging.  When
            ``None``, the function name is used internally and
            topics omit the device segment.
        init: Optional synchronous factory called once before the
            handler loop.  Its return value is injected into
            the handler by type.
        enabled: When ``False``, registration is silently skipped.
            The decorator returns the original function unmodified
            and no name slot is reserved.  Defaults to ``True``.

    Raises:
        ValueError: If a device with this name is already registered.
        ValueError: If a second root (unnamed) device is registered.
        TypeError: If any handler parameter lacks a type annotation.
    """
    if callable(name) and asyncio.iscoroutinefunction(name):
        raise TypeError("Use @app.device(), not @app.device (parentheses required)")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if callable(name):
            self.add_device(name, func, init=init, enabled=enabled, is_root=False)
        else:
            resolved_name = name if name is not None else func.__name__
            self.add_device(
                resolved_name,
                func,
                init=init,
                enabled=enabled,
                is_root=name is None,
            )
        return func

    return decorator

add_device

add_device(
    name: str | Callable[..., Any],
    func: Callable[..., Awaitable[None]],
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    is_root: bool = False,
) -> None

Register a command & control device imperatively.

This is the imperative counterpart to :meth:device. It always creates a named (non-root) registration by default.

Parameters:

Name Type Description Default
name str | Callable[..., Any]

Device name for MQTT topics and logging.

required
func Callable[..., Awaitable[None]]

Async callable that implements the device loop.

required
init Callable[..., Any] | None

Optional synchronous factory called once before the handler loop. Its return value is injected into func by type.

None
enabled bool

When False, registration is silently skipped — no entry in the registry and no name slot reserved. Defaults to True.

True
is_root bool

When True, the device publishes to root-level topics ({prefix}/state instead of {prefix}/{name}/state). Defaults to False.

False

Raises:

Type Description
ValueError

If a device with this name is already registered.

TypeError

If init is async or has un-annotated parameters.

TypeError

If func has un-annotated parameters.

See Also

:meth:device — decorator equivalent.

Source code in packages/src/cosalette/_app.py
def add_device(
    self,
    name: str | Callable[..., Any],
    func: Callable[..., Awaitable[None]],
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    is_root: bool = False,
) -> None:
    """Register a command & control device imperatively.

    This is the imperative counterpart to :meth:`device`.  It
    always creates a *named* (non-root) registration by default.

    Args:
        name: Device name for MQTT topics and logging.
        func: Async callable that implements the device loop.
        init: Optional synchronous factory called once before the
            handler loop.  Its return value is injected into
            *func* by type.
        enabled: When ``False``, registration is silently skipped
            — no entry in the registry and no name slot reserved.
            Defaults to ``True``.
        is_root: When ``True``, the device publishes to root-level
            topics (``{prefix}/state`` instead of
            ``{prefix}/{name}/state``).  Defaults to ``False``.

    Raises:
        ValueError: If a device with this name is already registered.
        TypeError: If *init* is async or has un-annotated parameters.
        TypeError: If *func* has un-annotated parameters.

    See Also:
        :meth:`device` — decorator equivalent.
    """
    if not enabled:
        return
    if init is not None:
        _validate_init(init)
    init_plan = build_injection_plan(init) if init is not None else None
    if not callable(name):
        check_device_name(
            name,
            registry_type="device",
            is_root=is_root,
            devices=self._devices,
            telemetry=self._telemetry,
            commands=self._commands,
        )
    plan = build_injection_plan(func)
    if callable(name):
        self._devices.append(
            _DeviceRegistration(
                name=func.__qualname__,
                func=func,
                injection_plan=plan,
                is_root=is_root,
                init=init,
                init_injection_plan=init_plan,
                name_spec=name,
            ),
        )
    else:
        self._devices.append(
            _DeviceRegistration(
                name=name,
                func=func,
                injection_plan=plan,
                is_root=is_root,
                init=init,
                init_injection_plan=init_plan,
            ),
        )

command

command(
    name: str | None = None,
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
) -> Callable[..., Any]

Register a command handler for an MQTT device.

The decorated function is called each time a command arrives on the {prefix}/{name}/set topic. Parameters named topic and payload receive the MQTT message values; all other parameters are injected by type annotation, exactly like @app.device and @app.telemetry handlers.

If the handler returns a dict, the framework publishes it as device state via publish_state(). Return None to skip auto-publishing.

When name is None, the function name is used internally and the device publishes to root-level topics.

Parameters:

Name Type Description Default
name str | None

Device name used for MQTT topics and logging. When None, the function name is used internally and topics omit the device segment.

None
init Callable[..., Any] | None

Optional synchronous factory called once before the handler loop. Its return value is injected into the handler by type.

None
enabled bool

When False, registration is silently skipped. The decorator returns the original function unmodified and no name slot is reserved. Defaults to True.

True

Raises:

Type Description
ValueError

If a device with this name is already registered.

ValueError

If a second root (unnamed) device is registered.

TypeError

If any handler parameter lacks a type annotation.

Source code in packages/src/cosalette/_app.py
def command(
    self,
    name: str | None = None,
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
) -> Callable[..., Any]:
    """Register a command handler for an MQTT device.

    The decorated function is called each time a command arrives
    on the ``{prefix}/{name}/set`` topic.  Parameters named
    ``topic`` and ``payload`` receive the MQTT message values;
    all other parameters are injected by type annotation, exactly
    like ``@app.device`` and ``@app.telemetry`` handlers.

    If the handler returns a ``dict``, the framework publishes it
    as device state via ``publish_state()``.  Return ``None`` to
    skip auto-publishing.

    When *name* is ``None``, the function name is used internally
    and the device publishes to root-level topics.

    Args:
        name: Device name used for MQTT topics and logging.  When
            ``None``, the function name is used internally and
            topics omit the device segment.
        init: Optional synchronous factory called once before the
            handler loop.  Its return value is injected into
            the handler by type.
        enabled: When ``False``, registration is silently skipped.
            The decorator returns the original function unmodified
            and no name slot is reserved.  Defaults to ``True``.

    Raises:
        ValueError: If a device with this name is already registered.
        ValueError: If a second root (unnamed) device is registered.
        TypeError: If any handler parameter lacks a type annotation.
    """
    if callable(name) and asyncio.iscoroutinefunction(name):
        raise TypeError(
            "Use @app.command(), not @app.command (parentheses required)"
        )

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if callable(name):
            self.add_command(name, func, init=init, enabled=enabled, is_root=False)
        else:
            resolved_name = name if name is not None else func.__name__
            self.add_command(
                resolved_name,
                func,
                init=init,
                enabled=enabled,
                is_root=name is None,
            )
        return func

    return decorator

add_command

add_command(
    name: str | Callable[..., Any],
    func: Callable[
        ..., Awaitable[dict[str, object] | None]
    ],
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    is_root: bool = False,
) -> None

Register a command handler imperatively.

This is the imperative counterpart to :meth:command. It always creates a named (non-root) registration by default.

Parameters:

Name Type Description Default
name str | Callable[..., Any]

Device name for MQTT topics and logging.

required
func Callable[..., Awaitable[dict[str, object] | None]]

Async callable invoked on each incoming command. Parameters named topic and payload receive the MQTT message values; others are injected by type.

required
init Callable[..., Any] | None

Optional synchronous factory called once before the handler loop. Its return value is injected into func by type.

None
enabled bool

When False, registration is silently skipped — no entry in the registry and no name slot reserved. Defaults to True.

True
is_root bool

When True, the device publishes to root-level topics ({prefix}/state instead of {prefix}/{name}/state). Defaults to False.

False

Raises:

Type Description
ValueError

If a device with this name is already registered.

TypeError

If init is async or has un-annotated parameters.

TypeError

If func has un-annotated parameters.

See Also

:meth:command — decorator equivalent.

Source code in packages/src/cosalette/_app.py
def add_command(
    self,
    name: str | Callable[..., Any],
    func: Callable[..., Awaitable[dict[str, object] | None]],
    *,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    is_root: bool = False,
) -> None:
    """Register a command handler imperatively.

    This is the imperative counterpart to :meth:`command`.  It
    always creates a *named* (non-root) registration by default.

    Args:
        name: Device name for MQTT topics and logging.
        func: Async callable invoked on each incoming command.
            Parameters named ``topic`` and ``payload`` receive the
            MQTT message values; others are injected by type.
        init: Optional synchronous factory called once before the
            handler loop.  Its return value is injected into
            *func* by type.
        enabled: When ``False``, registration is silently skipped
            — no entry in the registry and no name slot reserved.
            Defaults to ``True``.
        is_root: When ``True``, the device publishes to root-level
            topics (``{prefix}/state`` instead of
            ``{prefix}/{name}/state``).  Defaults to ``False``.

    Raises:
        ValueError: If a device with this name is already registered.
        TypeError: If *init* is async or has un-annotated parameters.
        TypeError: If *func* has un-annotated parameters.

    See Also:
        :meth:`command` — decorator equivalent.
    """
    if not enabled:
        return
    if init is not None:
        _validate_init(init)
    init_plan = build_injection_plan(init) if init is not None else None
    if not callable(name):
        check_device_name(
            name,
            registry_type="command",
            is_root=is_root,
            devices=self._devices,
            telemetry=self._telemetry,
            commands=self._commands,
        )
    plan = build_injection_plan(func, mqtt_params={"topic", "payload"})
    sig = inspect.signature(func)
    declared_mqtt = frozenset({"topic", "payload"} & sig.parameters.keys())
    if callable(name):
        self._commands.append(
            _CommandRegistration(
                name=func.__qualname__,
                func=func,
                injection_plan=plan,
                mqtt_params=declared_mqtt,
                is_root=is_root,
                init=init,
                init_injection_plan=init_plan,
                name_spec=name,
            ),
        )
    else:
        self._commands.append(
            _CommandRegistration(
                name=name,
                func=func,
                injection_plan=plan,
                mqtt_params=declared_mqtt,
                is_root=is_root,
                init=init,
                init_injection_plan=init_plan,
            ),
        )

telemetry

telemetry(
    name: str | None = None,
    *,
    interval: IntervalSpec,
    publish: PublishStrategy | None = None,
    persist: PersistPolicy | None = None,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    group: str | None = None,
    retry: int = 0,
    retry_on: tuple[type[BaseException], ...] | None = None,
    backoff: BackoffStrategy | None = None,
    circuit_breaker: CircuitBreaker | None = None,
) -> Callable[..., Any]

Register a telemetry device with periodic polling.

The decorated function returns a dict published as JSON state, or None to suppress publishing for that cycle. Parameters are injected based on type annotations — declare only what you need. Zero-parameter handlers are valid.

The framework calls the handler at the specified interval and publishes the returned dict (unless suppressed by a None return or a publish strategy).

When name is None, the function name is used internally and the device publishes to root-level topics.

Parameters:

Name Type Description Default
name str | None

Device name for MQTT topics and logging. When None, the function name is used internally and topics omit the device segment.

None
interval IntervalSpec

Polling interval in seconds, or a callable (Settings) -> float for deferred resolution. When a callable is provided, it is invoked once in :meth:_run_async after settings are resolved — this allows reading intervals from settings without requiring valid settings at registration time (e.g. during --help / --version).

required
publish PublishStrategy | None

Optional publish strategy controlling when readings are actually published (e.g. OnChange(), Every(seconds=60)). When None, every reading is published unconditionally.

None
persist PersistPolicy | None

Optional save policy controlling when the :class:DeviceStore is persisted (e.g. SaveOnPublish(), SaveOnChange()). Requires store= on the :class:App. When None, the store is saved only on shutdown (the safety net).

None
init Callable[..., Any] | None

Optional synchronous factory called once before the handler loop. Its return value is injected into the handler by type.

None
enabled bool

When False, registration is silently skipped. The decorator returns the original function unmodified and no name slot is reserved. Defaults to True.

True
group str | None

Optional coalescing group name. Telemetry devices in the same group share a single scheduler tick so their readings are published together. When None (the default), the device runs on its own independent timer.

None
retry int

Maximum number of retry attempts after a failure. Defaults to 0 (no retry). The retry counter persists across poll cycles and resets on success.

0
retry_on tuple[type[BaseException], ...] | None

Exception types to retry on. Defaults to (OSError,) when retry > 0 and not explicitly set. Exceptions not matching this tuple propagate immediately to the error handler.

None
backoff BackoffStrategy | None

Backoff strategy controlling delay between retries (e.g. ExponentialBackoff(), LinearBackoff(), FixedBackoff()). Defaults to ExponentialBackoff(base=2.0, max_delay=60.0) when retry > 0 and not explicitly set.

None
circuit_breaker CircuitBreaker | None

Optional circuit breaker that stops retrying after consecutive failed cycles. Works independently of retry — even with retry=0, it tracks per-cycle failures.

None

Raises:

Type Description
ValueError

If a device with this name is already registered.

ValueError

If a second root (unnamed) device is registered.

ValueError

If interval is a float and <= 0. For callable intervals, validation is deferred to :meth:_run_async.

ValueError

If persist is set but no store= backend was configured on the App.

ValueError

If group is an empty string.

ValueError

If retry > 0 and retry_on is explicitly empty.

TypeError

If any handler parameter lacks a type annotation.

Source code in packages/src/cosalette/_app.py
def telemetry(
    self,
    name: str | None = None,
    *,
    interval: IntervalSpec,
    publish: PublishStrategy | None = None,
    persist: PersistPolicy | None = None,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    group: str | None = None,
    retry: int = 0,
    retry_on: tuple[type[BaseException], ...] | None = None,
    backoff: BackoffStrategy | None = None,
    circuit_breaker: CircuitBreaker | None = None,
) -> Callable[..., Any]:
    """Register a telemetry device with periodic polling.

    The decorated function returns a ``dict`` published as JSON
    state, or ``None`` to suppress publishing for that cycle.
    Parameters are injected based on type annotations — declare
    only what you need.  Zero-parameter handlers are valid.

    The framework calls the handler at the specified interval
    and publishes the returned dict (unless suppressed by a
    ``None`` return or a publish strategy).

    When *name* is ``None``, the function name is used internally
    and the device publishes to root-level topics.

    Args:
        name: Device name for MQTT topics and logging.  When
            ``None``, the function name is used internally and
            topics omit the device segment.
        interval: Polling interval in seconds, or a callable
            ``(Settings) -> float`` for deferred resolution.
            When a callable is provided, it is invoked once in
            :meth:`_run_async` after settings are resolved —
            this allows reading intervals from settings without
            requiring valid settings at registration time (e.g.
            during ``--help`` / ``--version``).
        publish: Optional publish strategy controlling when
            readings are actually published (e.g. ``OnChange()``,
            ``Every(seconds=60)``).  When ``None``, every reading
            is published unconditionally.
        persist: Optional save policy controlling when the
            :class:`DeviceStore` is persisted (e.g.
            ``SaveOnPublish()``, ``SaveOnChange()``).  Requires
            ``store=`` on the :class:`App`.  When ``None``, the
            store is saved only on shutdown (the safety net).
        init: Optional synchronous factory called once before the
            handler loop.  Its return value is injected into
            the handler by type.
        enabled: When ``False``, registration is silently skipped.
            The decorator returns the original function unmodified
            and no name slot is reserved.  Defaults to ``True``.
        group: Optional coalescing group name.  Telemetry devices
            in the same group share a single scheduler tick so
            their readings are published together.  When ``None``
            (the default), the device runs on its own independent
            timer.
        retry: Maximum number of retry attempts after a failure.
            Defaults to ``0`` (no retry).  The retry counter
            persists across poll cycles and resets on success.
        retry_on: Exception types to retry on.  Defaults to
            ``(OSError,)`` when ``retry > 0`` and not explicitly
            set.  Exceptions not matching this tuple propagate
            immediately to the error handler.
        backoff: Backoff strategy controlling delay between retries
            (e.g. ``ExponentialBackoff()``, ``LinearBackoff()``,
            ``FixedBackoff()``).  Defaults to
            ``ExponentialBackoff(base=2.0, max_delay=60.0)`` when
            ``retry > 0`` and not explicitly set.
        circuit_breaker: Optional circuit breaker that stops
            retrying after consecutive failed cycles.  Works
            independently of ``retry`` — even with ``retry=0``,
            it tracks per-cycle failures.

    Raises:
        ValueError: If a device with this name is already registered.
        ValueError: If a second root (unnamed) device is registered.
        ValueError: If *interval* is a float and <= 0.  For
            callable intervals, validation is deferred to
            :meth:`_run_async`.
        ValueError: If ``persist`` is set but no ``store=`` backend
            was configured on the App.
        ValueError: If *group* is an empty string.
        ValueError: If ``retry > 0`` and ``retry_on`` is
            explicitly empty.
        TypeError: If any handler parameter lacks a type annotation.
    """
    # Skip all validation when disabled — a disabled device shouldn't raise.
    if enabled and group is not None and group == "":
        msg = "group must be non-empty"
        raise ValueError(msg)

    # Eagerly validate persist/store at decoration time
    # (add_telemetry re-checks for the imperative path).
    # Skip when disabled — a disabled device shouldn't raise.
    if enabled and persist is not None and self._store is None:
        msg = (
            "persist= requires a store= backend on the App. "
            "Pass store=MemoryStore() (or another Store) to App()."
        )
        raise ValueError(msg)

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if callable(name):
            self.add_telemetry(
                name,
                func,
                interval=interval,
                publish=publish,
                persist=persist,
                init=init,
                enabled=enabled,
                group=group,
                is_root=False,
                retry=retry,
                retry_on=retry_on,
                backoff=backoff,
                circuit_breaker=circuit_breaker,
            )
        else:
            resolved_name = name if name is not None else func.__name__
            self.add_telemetry(
                resolved_name,
                func,
                interval=interval,
                publish=publish,
                persist=persist,
                init=init,
                enabled=enabled,
                group=group,
                is_root=name is None,
                retry=retry,
                retry_on=retry_on,
                backoff=backoff,
                circuit_breaker=circuit_breaker,
            )
        return func

    return decorator

add_telemetry

add_telemetry(
    name: str | Callable[..., Any],
    func: Callable[
        ..., Awaitable[dict[str, object] | None]
    ],
    *,
    interval: IntervalSpec,
    publish: PublishStrategy | None = None,
    persist: PersistPolicy | None = None,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    group: str | None = None,
    is_root: bool = False,
    retry: int = 0,
    retry_on: tuple[type[BaseException], ...] | None = None,
    backoff: BackoffStrategy | None = None,
    circuit_breaker: CircuitBreaker | None = None,
) -> None

Register a telemetry device imperatively.

This is the imperative counterpart to :meth:telemetry. It always creates a named (non-root) registration by default.

Parameters:

Name Type Description Default
name str | Callable[..., Any]

Device name for MQTT topics and logging.

required
func Callable[..., Awaitable[dict[str, object] | None]]

Async callable returning a dict (published as state) or None (suppresses that cycle).

required
interval IntervalSpec

Polling interval in seconds, or a callable (Settings) -> float for deferred resolution. When a callable is provided, it is invoked once in :meth:_run_async after settings are resolved — this allows reading intervals from settings without requiring valid settings at registration time (e.g. during --help / --version).

required
publish PublishStrategy | None

Optional publish strategy (e.g. OnChange()) controlling when readings are actually published.

None
persist PersistPolicy | None

Optional save policy. Requires store= on the :class:App.

None
init Callable[..., Any] | None

Optional synchronous factory called once before the handler loop. Its return value is injected into func by type.

None
enabled bool

When False, registration is silently skipped — no entry in the registry and no name slot reserved. Defaults to True.

True
group str | None

Optional coalescing group name. Telemetry devices in the same group share a single scheduler tick so their readings are published together. When None (the default), the device runs on its own independent timer.

None
is_root bool

When True, the device publishes to root-level topics ({prefix}/state instead of {prefix}/{name}/state). Defaults to False.

False

Raises:

Type Description
ValueError

If a device with this name is already registered.

ValueError

If interval is a float and <= 0. For callable intervals, validation is deferred to :meth:_run_async.

ValueError

If persist is set but no store= backend was configured on the App.

ValueError

If group is an empty string.

TypeError

If init is async or has un-annotated parameters.

TypeError

If func has un-annotated parameters.

See Also

:meth:telemetry — decorator equivalent.

Source code in packages/src/cosalette/_app.py
def add_telemetry(
    self,
    name: str | Callable[..., Any],
    func: Callable[..., Awaitable[dict[str, object] | None]],
    *,
    interval: IntervalSpec,
    publish: PublishStrategy | None = None,
    persist: PersistPolicy | None = None,
    init: Callable[..., Any] | None = None,
    enabled: bool = True,
    group: str | None = None,
    is_root: bool = False,
    retry: int = 0,
    retry_on: tuple[type[BaseException], ...] | None = None,
    backoff: BackoffStrategy | None = None,
    circuit_breaker: CircuitBreaker | None = None,
) -> None:
    """Register a telemetry device imperatively.

    This is the imperative counterpart to :meth:`telemetry`.  It
    always creates a *named* (non-root) registration by default.

    Args:
        name: Device name for MQTT topics and logging.
        func: Async callable returning a ``dict`` (published as
            state) or ``None`` (suppresses that cycle).
        interval: Polling interval in seconds, or a callable
            ``(Settings) -> float`` for deferred resolution.
            When a callable is provided, it is invoked once in
            :meth:`_run_async` after settings are resolved —
            this allows reading intervals from settings without
            requiring valid settings at registration time (e.g.
            during ``--help`` / ``--version``).
        publish: Optional publish strategy (e.g. ``OnChange()``)
            controlling when readings are actually published.
        persist: Optional save policy.  Requires ``store=`` on the
            :class:`App`.
        init: Optional synchronous factory called once before the
            handler loop.  Its return value is injected into
            *func* by type.
        enabled: When ``False``, registration is silently skipped
            — no entry in the registry and no name slot reserved.
            Defaults to ``True``.
        group: Optional coalescing group name.  Telemetry devices
            in the same group share a single scheduler tick so
            their readings are published together.  When ``None``
            (the default), the device runs on its own independent
            timer.
        is_root: When ``True``, the device publishes to root-level
            topics (``{prefix}/state`` instead of
            ``{prefix}/{name}/state``).  Defaults to ``False``.

    Raises:
        ValueError: If a device with this name is already registered.
        ValueError: If *interval* is a float and <= 0.  For
            callable intervals, validation is deferred to
            :meth:`_run_async`.
        ValueError: If *persist* is set but no ``store=`` backend
            was configured on the App.
        ValueError: If *group* is an empty string.
        TypeError: If *init* is async or has un-annotated parameters.
        TypeError: If *func* has un-annotated parameters.

    See Also:
        :meth:`telemetry` — decorator equivalent.
    """
    if not enabled:
        return
    self._validate_telemetry_args(
        name,
        interval,
        persist,
        init,
        group,
        retry=retry,
        retry_on=retry_on,
    )
    init_plan = build_injection_plan(init) if init is not None else None
    if not callable(name):
        check_device_name(
            name,
            registry_type="telemetry",
            is_root=is_root,
            devices=self._devices,
            telemetry=self._telemetry,
            commands=self._commands,
        )
    plan = build_injection_plan(func)
    resolved_name = func.__qualname__ if callable(name) else name
    name_spec = name if callable(name) else None

    # Resolve retry defaults
    resolved_retry_on = retry_on
    resolved_backoff = backoff
    if retry > 0:
        if resolved_retry_on is None:
            resolved_retry_on = _DEFAULT_RETRY_ON
        if resolved_backoff is None:
            resolved_backoff = _DEFAULT_BACKOFF

    self._telemetry.append(
        _TelemetryRegistration(
            name=resolved_name,
            func=func,
            injection_plan=plan,
            interval=interval,
            is_root=is_root,
            publish_strategy=publish,
            persist_policy=persist,
            init=init,
            init_injection_plan=init_plan,
            group=group,
            name_spec=name_spec,
            retry=retry,
            retry_on=resolved_retry_on if resolved_retry_on is not None else (),
            backoff=resolved_backoff,
            circuit_breaker=circuit_breaker,
        ),
    )

adapter

adapter(
    port_type: type,
    impl: type | str | Callable[..., object],
    *,
    dry_run: type
    | str
    | Callable[..., object]
    | None = None,
) -> None

Register an adapter for a port type.

All adapter forms support dependency injection: if a class __init__ or factory callable declares a parameter annotated with Settings (or a subclass), the parsed settings instance is auto-injected at resolution time.

Parameters:

Name Type Description Default
port_type type

The Protocol type to register.

required
impl type | str | Callable[..., object]

The adapter class, a module:ClassName lazy import string, or a factory callable returning an adapter instance.

required
dry_run type | str | Callable[..., object] | None

Optional dry-run variant (class, lazy import string, or factory callable).

None

Raises:

Type Description
ValueError

If an adapter is already registered for this port type.

TypeError

If a callable (class or factory) has invalid signatures (e.g. un-annotated parameters or unresolvable types).

Source code in packages/src/cosalette/_app.py
def adapter(
    self,
    port_type: type,
    impl: type | str | Callable[..., object],
    *,
    dry_run: type | str | Callable[..., object] | None = None,
) -> None:
    """Register an adapter for a port type.

    All adapter forms support dependency injection: if a class
    ``__init__`` or factory callable declares a parameter
    annotated with ``Settings`` (or a subclass), the parsed
    settings instance is auto-injected at resolution time.

    Args:
        port_type: The Protocol type to register.
        impl: The adapter class, a ``module:ClassName`` lazy import
            string, or a factory callable returning an adapter instance.
        dry_run: Optional dry-run variant (class, lazy import string,
            or factory callable).

    Raises:
        ValueError: If an adapter is already registered for this port type.
        TypeError: If a callable (class or factory) has invalid
            signatures (e.g. un-annotated parameters or
            unresolvable types).
    """
    if port_type in self._adapters:
        msg = f"Adapter already registered for {port_type!r}"
        raise ValueError(msg)

    # Fail-fast: validate callable signatures at registration time
    # so errors surface here rather than at runtime resolution.
    # Classes are included — inspect.signature(cls) inspects __init__.
    for candidate in (impl, dry_run):
        if (
            candidate is not None
            and callable(candidate)
            and not isinstance(candidate, str)
        ):
            build_injection_plan(candidate)

    self._adapters[port_type] = _AdapterEntry(impl=impl, dry_run=dry_run)

run

run(
    *,
    mqtt: MqttPort | None = None,
    settings: Settings | None = None,
    shutdown_event: Event | None = None,
    clock: ClockPort | None = None,
) -> None

Start the application (blocking, synchronous entrypoint).

Wraps :meth:_run_async in :func:asyncio.run, handling KeyboardInterrupt for clean Ctrl-C shutdown. This is the recommended way to launch a cosalette application::

app = cosalette.App(name="mybridge", version="0.1.0")
app.run()

All parameters are optional and intended for programmatic or test use — production apps typically call run() with no arguments.

Parameters:

Name Type Description Default
mqtt MqttPort | None

Override MQTT client (e.g. MockMqttClient for testing). When None, a real MqttClient is created from settings.

None
settings Settings | None

Override settings (skip env-file loading).

None
shutdown_event Event | None

Override shutdown event (skip OS signal handlers). Useful in tests to control shutdown timing.

None
clock ClockPort | None

Override clock (e.g. FakeClock for tests).

None
See Also

:meth:cli — CLI entrypoint with Typer argument parsing.

Source code in packages/src/cosalette/_app.py
def run(
    self,
    *,
    mqtt: MqttPort | None = None,
    settings: Settings | None = None,
    shutdown_event: asyncio.Event | None = None,
    clock: ClockPort | None = None,
) -> None:
    """Start the application (blocking, synchronous entrypoint).

    Wraps :meth:`_run_async` in :func:`asyncio.run`, handling
    ``KeyboardInterrupt`` for clean Ctrl-C shutdown.  This is the
    recommended way to launch a cosalette application::

        app = cosalette.App(name="mybridge", version="0.1.0")
        app.run()

    All parameters are optional and intended for programmatic or
    test use — production apps typically call ``run()`` with no
    arguments.

    Args:
        mqtt: Override MQTT client (e.g. ``MockMqttClient`` for
            testing).  When ``None``, a real ``MqttClient`` is
            created from settings.
        settings: Override settings (skip env-file loading).
        shutdown_event: Override shutdown event (skip OS signal
            handlers).  Useful in tests to control shutdown timing.
        clock: Override clock (e.g. ``FakeClock`` for tests).

    See Also:
        :meth:`cli` — CLI entrypoint with Typer argument parsing.
    """
    with contextlib.suppress(KeyboardInterrupt):
        asyncio.run(
            self._run_async(
                mqtt=mqtt,
                settings=settings,
                shutdown_event=shutdown_event,
                clock=clock,
            ),
        )

cli

cli() -> None

Start the application with CLI argument parsing.

Builds a Typer CLI from the application's configuration, parses command-line arguments (--dry-run, --version, --log-level, --log-format, --env-file), and orchestrates the full async lifecycle.

For production use without CLI parsing, prefer :meth:run.

See Also

ADR-005 — CLI framework.

Source code in packages/src/cosalette/_app.py
def cli(self) -> None:
    """Start the application with CLI argument parsing.

    Builds a Typer CLI from the application's configuration,
    parses command-line arguments (``--dry-run``, ``--version``,
    ``--log-level``, ``--log-format``, ``--env-file``), and
    orchestrates the full async lifecycle.

    For production use without CLI parsing, prefer :meth:`run`.

    See Also:
        ADR-005 — CLI framework.
    """
    from cosalette._cli import build_cli

    cli = build_cli(self)
    cli(standalone_mode=True)

cosalette.AppContext

AppContext(
    *, settings: Settings, adapters: dict[type, object]
)

Context for the application lifespan.

Provided to the lifespan async context manager registered via App(lifespan=...). Offers access to settings and adapter resolution but NOT per-device features (no publish, no on_command, no sleep).

See Also

ADR-001 — Framework architecture (lifespan).

Initialise lifecycle-hook context.

Parameters:

Name Type Description Default
settings Settings

Application settings instance.

required
adapters dict[type, object]

Resolved adapter registry mapping port types to instances.

required
Source code in packages/src/cosalette/_context.py
def __init__(
    self,
    *,
    settings: Settings,
    adapters: dict[type, object],
) -> None:
    """Initialise lifecycle-hook context.

    Args:
        settings: Application settings instance.
        adapters: Resolved adapter registry mapping port types to instances.
    """
    self._settings = settings
    self._adapters = adapters

settings property

settings: Settings

Application settings instance.

adapter

adapter(port_type: type[T]) -> T

Resolve an adapter by port type.

Parameters:

Name Type Description Default
port_type type[T]

The Protocol type to look up.

required

Returns:

Type Description
T

The adapter instance registered for that port type.

Raises:

Type Description
LookupError

If no adapter is registered for the port type.

Source code in packages/src/cosalette/_context.py
def adapter[T](self, port_type: type[T]) -> T:
    """Resolve an adapter by port type.

    Args:
        port_type: The Protocol type to look up.

    Returns:
        The adapter instance registered for that port type.

    Raises:
        LookupError: If no adapter is registered for the port type.
    """
    try:
        return self._adapters[port_type]  # type: ignore[return-value]
    except KeyError:
        msg = f"No adapter registered for {port_type!r}"
        raise LookupError(msg) from None

cosalette.DeviceContext

DeviceContext(
    *,
    name: str,
    settings: Settings,
    mqtt: MqttPort,
    topic_prefix: str,
    shutdown_event: Event,
    adapters: dict[type, object],
    clock: ClockPort,
    is_root: bool = False,
)

Per-device runtime context injected by the framework.

Provides device-scoped access to MQTT publishing, command registration, shutdown-aware sleeping, and adapter resolution.

Each device function receives its own DeviceContext instance. The context is pre-configured with the device's name and topic prefix so that publish operations target the correct MQTT topics automatically.

See Also

ADR-010 — Device archetypes. ADR-006 — Hexagonal architecture (adapter resolution).

Initialise per-device context.

Parameters:

Name Type Description Default
name str

Device name as registered (e.g. "blind").

required
settings Settings

Application settings instance.

required
mqtt MqttPort

MQTT port for publishing.

required
topic_prefix str

Root prefix for MQTT topics (e.g. "velux2mqtt").

required
shutdown_event Event

Shared event that signals graceful shutdown.

required
adapters dict[type, object]

Resolved adapter registry mapping port types to instances.

required
clock ClockPort

Monotonic clock for timing.

required
is_root bool

When True, topics omit the device name segment (root-level device).

False
Source code in packages/src/cosalette/_context.py
def __init__(
    self,
    *,
    name: str,
    settings: Settings,
    mqtt: MqttPort,
    topic_prefix: str,
    shutdown_event: asyncio.Event,
    adapters: dict[type, object],
    clock: ClockPort,
    is_root: bool = False,
) -> None:
    """Initialise per-device context.

    Args:
        name: Device name as registered (e.g. "blind").
        settings: Application settings instance.
        mqtt: MQTT port for publishing.
        topic_prefix: Root prefix for MQTT topics (e.g. "velux2mqtt").
        shutdown_event: Shared event that signals graceful shutdown.
        adapters: Resolved adapter registry mapping port types to instances.
        clock: Monotonic clock for timing.
        is_root: When True, topics omit the device name segment
            (root-level device).
    """
    self._name = name
    self._settings = settings
    self._mqtt = mqtt
    self._topic_prefix = topic_prefix
    self._shutdown_event = shutdown_event
    self._adapters = adapters
    self._clock = clock
    self._command_handlers: dict[str | None, CommandHandler] = {}
    self._is_root = is_root
    self._command_queue: asyncio.Queue[Command] = asyncio.Queue()
    self._commands_consumed: bool = False
    self._topic_base = topic_prefix if is_root else f"{topic_prefix}/{name}"

name property

name: str

Device name as registered.

settings property

settings: Settings

Application settings instance.

clock property

clock: ClockPort

Monotonic clock for timing.

shutdown_requested property

shutdown_requested: bool

True when the framework has received a shutdown signal.

command_handler property

command_handler: CommandHandler | None

The root command handler, or None. Framework-internal.

command_handlers property

command_handlers: Mapping[str | None, CommandHandler]

All registered command handlers keyed by sub-topic. Framework-internal.

get_command_handler

get_command_handler(
    sub_topic: str | None = None,
) -> CommandHandler | None

Look up the command handler for a sub-topic (or root).

Source code in packages/src/cosalette/_context.py
def get_command_handler(
    self,
    sub_topic: str | None = None,
) -> CommandHandler | None:
    """Look up the command handler for a sub-topic (or root)."""
    return self._command_handlers.get(sub_topic)

publish_state async

publish_state(
    payload: dict[str, object], *, retain: bool = True
) -> None

Publish device state to {prefix}/{device}/state as JSON.

For root devices (unnamed), publishes to {prefix}/state instead.

This is the primary publication method for device telemetry. The payload dict is JSON-serialised automatically.

Parameters:

Name Type Description Default
payload dict[str, object]

Dict to serialise as JSON.

required
retain bool

Whether the message should be retained (default True).

True
Source code in packages/src/cosalette/_context.py
async def publish_state(
    self,
    payload: dict[str, object],
    *,
    retain: bool = True,
) -> None:
    """Publish device state to ``{prefix}/{device}/state`` as JSON.

    For root devices (unnamed), publishes to ``{prefix}/state`` instead.

    This is the primary publication method for device telemetry.
    The payload dict is JSON-serialised automatically.

    Args:
        payload: Dict to serialise as JSON.
        retain: Whether the message should be retained (default True).
    """
    topic = f"{self._topic_base}/state"
    await self._mqtt.publish(topic, dumps(payload), retain=retain, qos=1)

publish async

publish(
    channel: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Publish to an arbitrary sub-channel: {prefix}/{device}/{channel}.

For root devices (unnamed), publishes to {prefix}/{channel} instead.

Escape hatch for non-standard topics. Prefer publish_state() for normal device state updates.

Source code in packages/src/cosalette/_context.py
async def publish(
    self,
    channel: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Publish to an arbitrary sub-channel: ``{prefix}/{device}/{channel}``.

    For root devices (unnamed), publishes to ``{prefix}/{channel}`` instead.

    Escape hatch for non-standard topics. Prefer publish_state() for
    normal device state updates.
    """
    topic = f"{self._topic_base}/{channel}"
    await self._mqtt.publish(topic, payload, retain=retain, qos=qos)

sleep async

sleep(seconds: float) -> None

Shutdown-aware sleep.

Returns early (without exception) if shutdown is requested during the sleep period. This enables the idiomatic pattern::

while not ctx.shutdown_requested:
    await ctx.sleep(10)
    # ... do work ...
Source code in packages/src/cosalette/_context.py
async def sleep(self, seconds: float) -> None:
    """Shutdown-aware sleep.

    Returns early (without exception) if shutdown is requested during
    the sleep period. This enables the idiomatic pattern::

        while not ctx.shutdown_requested:
            await ctx.sleep(10)
            # ... do work ...
    """
    if self._shutdown_event.is_set():
        return

    sleep_task = asyncio.ensure_future(self._clock.sleep(seconds))
    shutdown_task = asyncio.ensure_future(self._shutdown_event.wait())

    done, pending = await asyncio.wait(
        {sleep_task, shutdown_task},
        return_when=asyncio.FIRST_COMPLETED,
    )

    for task in pending:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

on_command

on_command(
    handler_or_sub_topic: CommandHandler
    | str
    | None = None,
) -> (
    CommandHandler
    | Callable[[CommandHandler], CommandHandler]
)

Register a command handler for this device.

Supports three call patterns:

  1. Decorator — root handler::

    @ctx.on_command async def handle(sub_topic: str | None, payload: str) -> None: ...

  2. Direct call — root handler::

    ctx.on_command(handle)

  3. Decorator factory — sub-topic handler::

    @ctx.on_command("calibrate") async def handle_cal(sub_topic: str | None, payload: str) -> None: ...

Handlers may also accept a single :class:Command argument (new-style), detected automatically via type annotation::

@ctx.on_command
async def handle(cmd: Command) -> None: ...

Raises:

Type Description
RuntimeError

If a handler is already registered for the same sub-topic, or if commands() is active and a root handler is being registered.

ValueError

If the sub-topic string is empty or contains /, +, or #.

Returns:

Type Description
CommandHandler | Callable[[CommandHandler], CommandHandler]

The handler unchanged when called with a callable, or a

CommandHandler | Callable[[CommandHandler], CommandHandler]

decorator function when called with a sub-topic string or None.

Source code in packages/src/cosalette/_context.py
def on_command(
    self,
    handler_or_sub_topic: CommandHandler | str | None = None,
    /,
) -> CommandHandler | Callable[[CommandHandler], CommandHandler]:
    """Register a command handler for this device.

    Supports three call patterns:

    1. Decorator — root handler::

        @ctx.on_command
        async def handle(sub_topic: str | None, payload: str) -> None: ...

    2. Direct call — root handler::

        ctx.on_command(handle)

    3. Decorator factory — sub-topic handler::

        @ctx.on_command("calibrate")
        async def handle_cal(sub_topic: str | None, payload: str) -> None: ...

    Handlers may also accept a single :class:`Command` argument
    (new-style), detected automatically via type annotation::

        @ctx.on_command
        async def handle(cmd: Command) -> None: ...

    Raises:
        RuntimeError: If a handler is already registered for the same
            sub-topic, or if ``commands()`` is active and a root
            handler is being registered.
        ValueError: If the sub-topic string is empty or contains
            ``/``, ``+``, or ``#``.

    Returns:
        The handler unchanged when called with a callable, or a
        decorator function when called with a sub-topic string or None.
    """

    def _register(
        handler: CommandHandler,
        sub_topic: str | None,
    ) -> CommandHandler:
        if sub_topic is None and self._commands_consumed:
            msg = (
                f"Cannot register on_command — commands() iterator already "
                f"active for device '{self._name}'"
            )
            raise RuntimeError(msg)
        if sub_topic in self._command_handlers:
            label = f"sub-topic '{sub_topic}'" if sub_topic else "root"
            msg = (
                f"Command handler already registered for {label} "
                f"on device '{self._name}'"
            )
            raise RuntimeError(msg)
        self._command_handlers[sub_topic] = handler
        return handler

    # --- callable → register as root handler immediately ---
    if callable(handler_or_sub_topic):
        return _register(handler_or_sub_topic, None)

    # --- string → validate and return decorator for that sub-topic ---
    if isinstance(handler_or_sub_topic, str):
        if not handler_or_sub_topic:
            msg = "Sub-topic must not be empty"
            raise ValueError(msg)
        _invalid = set(handler_or_sub_topic) & {"/", "+", "#"}
        if _invalid:
            msg = (
                f"Sub-topic contains invalid MQTT characters "
                f"{_invalid}: '{handler_or_sub_topic}'"
            )
            raise ValueError(msg)
        sub = handler_or_sub_topic

        def _decorator(handler: CommandHandler) -> CommandHandler:
            return _register(handler, sub)

        return _decorator

    # --- None → return decorator for root handler ---
    def _root_decorator(handler: CommandHandler) -> CommandHandler:
        return _register(handler, None)

    return _root_decorator

commands

commands(
    timeout: float | None = None,
) -> AsyncIterator[Command | None]

Async iterator that yields inbound commands from the internal queue.

Provides a queue-backed async iterator for @app.device loops, eliminating the need for manual asyncio.Queue bridges.

When timeout is provided, yields None on timeout expiry, enabling periodic-work patterns::

async for cmd in ctx.commands(timeout=5):
    if cmd is None:
        await periodic_check()
    else:
        await process(cmd.payload)

Without timeout, blocks until a command arrives or shutdown is requested. Shutdown is detected immediately via event racing.

Parameters:

Name Type Description Default
timeout float | None

Seconds to wait for a command before yielding None. When None (default), blocks indefinitely until a command arrives or shutdown is requested.

None

Yields:

Type Description
AsyncIterator[Command | None]

Command when a command arrives, or None on timeout expiry.

Raises:

Type Description
RuntimeError

If called more than once on the same context, or if a command handler is already registered via :meth:on_command.

See Also

ADR-025 — Command channel and sub-topic routing.

Source code in packages/src/cosalette/_context.py
def commands(
    self,
    timeout: float | None = None,
) -> AsyncIterator[Command | None]:
    """Async iterator that yields inbound commands from the internal queue.

    Provides a queue-backed async iterator for ``@app.device`` loops,
    eliminating the need for manual ``asyncio.Queue`` bridges.

    When *timeout* is provided, yields ``None`` on timeout expiry,
    enabling periodic-work patterns::

        async for cmd in ctx.commands(timeout=5):
            if cmd is None:
                await periodic_check()
            else:
                await process(cmd.payload)

    Without *timeout*, blocks until a command arrives or shutdown is
    requested. Shutdown is detected immediately via event racing.

    Args:
        timeout: Seconds to wait for a command before yielding None.
            When None (default), blocks indefinitely until a command
            arrives or shutdown is requested.

    Yields:
        Command when a command arrives, or None on timeout expiry.

    Raises:
        RuntimeError: If called more than once on the same context,
            or if a command handler is already registered via
            :meth:`on_command`.

    See Also:
        ADR-025 — Command channel and sub-topic routing.
    """
    if self._commands_consumed:
        msg = f"commands() already active for device '{self._name}'"
        raise RuntimeError(msg)
    if None in self._command_handlers:
        msg = (
            f"Cannot use commands() — on_command handler already "
            f"registered for device '{self._name}'"
        )
        raise RuntimeError(msg)
    self._commands_consumed = True

    async def _iter() -> AsyncIterator[Command | None]:
        while not self.shutdown_requested:
            result = await self._await_command(timeout)
            if result is not None:
                yield result
            elif self.shutdown_requested:
                break
            elif timeout is not None:
                yield None
        # Drain any commands that arrived before/during shutdown
        while not self._command_queue.empty():
            yield self._command_queue.get_nowait()

    return _iter()

adapter

adapter(port_type: type[T]) -> T

Resolve an adapter by port type.

Parameters:

Name Type Description Default
port_type type[T]

The Protocol type to look up.

required

Returns:

Type Description
T

The adapter instance registered for that port type.

Raises:

Type Description
LookupError

If no adapter is registered for the port type.

Source code in packages/src/cosalette/_context.py
def adapter[T](self, port_type: type[T]) -> T:
    """Resolve an adapter by port type.

    Args:
        port_type: The Protocol type to look up.

    Returns:
        The adapter instance registered for that port type.

    Raises:
        LookupError: If no adapter is registered for the port type.
    """
    try:
        return self._adapters[port_type]  # type: ignore[return-value]
    except KeyError:
        msg = f"No adapter registered for {port_type!r}"
        raise LookupError(msg) from None

MQTT

cosalette.MqttPort

Bases: Protocol

Port contract for MQTT publish/subscribe.

Satisfies ADR-006 hexagonal architecture: all MQTT interaction goes through this protocol so adapters are swappable.

cosalette.MqttClient dataclass

MqttClient(
    settings: MqttSettings, will: WillConfig | None = None
)

Production MQTT adapter backed by aiomqtt.

Uses a background task that maintains a persistent connection with automatic reconnection. aiomqtt is imported lazily inside _connection_loop() so the mock and null adapters work without the dependency installed.

See Also

ADR-006 — Hexagonal architecture (lazy imports). ADR-012 — LWT / availability via WillConfig.

is_connected property

is_connected: bool

Whether the client is currently connected to the broker.

publish async

publish(
    topic: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Publish a message to the broker.

Raises:

Type Description
RuntimeError

If the client is not connected.

Source code in packages/src/cosalette/_mqtt_client.py
async def publish(
    self,
    topic: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Publish a message to the broker.

    Raises:
        RuntimeError: If the client is not connected.
    """
    if self._client is None:
        msg = "MqttClient is not connected"
        raise RuntimeError(msg)
    await self._client.publish(
        topic,
        payload,
        retain=retain,
        qos=qos,
    )
    logger.debug(
        "Published to %s (qos=%d, retain=%s)",
        topic,
        qos,
        retain,
    )

subscribe async

subscribe(topic: str) -> None

Subscribe to topic.

The subscription is tracked internally so it can be restored after a reconnection.

Source code in packages/src/cosalette/_mqtt_client.py
async def subscribe(self, topic: str) -> None:
    """Subscribe to *topic*.

    The subscription is tracked internally so it can be restored
    after a reconnection.
    """
    self._subscriptions.add(topic)
    if self._client is not None:
        await self._client.subscribe(
            topic,
            qos=1,
        )

on_message

on_message(callback: MessageCallback) -> None

Register a callback for inbound messages.

Source code in packages/src/cosalette/_mqtt_client.py
def on_message(self, callback: MessageCallback) -> None:
    """Register a callback for inbound messages."""
    self._callbacks.append(callback)

start async

start() -> None

Start the background connection loop.

Source code in packages/src/cosalette/_mqtt_client.py
async def start(self) -> None:
    """Start the background connection loop."""
    if self._listen_task is not None and not self._listen_task.done():
        logger.debug("MqttClient.start() called while already running")
        return
    self._stopping = False
    self._listen_task = asyncio.create_task(
        self._connection_loop(),
    )

stop async

stop() -> None

Stop the connection loop and clean up.

Idempotent — safe to call multiple times.

Source code in packages/src/cosalette/_mqtt_client.py
async def stop(self) -> None:
    """Stop the connection loop and clean up.

    Idempotent — safe to call multiple times.
    """
    self._stopping = True
    if self._listen_task is not None:
        self._listen_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._listen_task
        self._listen_task = None
    self._client = None
    self._connected.clear()

cosalette.MqttLifecycle

Bases: Protocol

Lifecycle management for MQTT adapters.

Adapters that need explicit start/stop (e.g. connecting to a broker) implement this protocol. Adapters like MockMqttClient and NullMqttClient that need no lifecycle management simply omit these methods — the framework detects their absence via isinstance.

See Also

ADR-006 — Interface Segregation: ports are narrow by design. PEP 544 — Structural subtyping (Protocols).

cosalette.MqttMessageHandler

Bases: Protocol

Message dispatch capability for MQTT adapters.

Adapters that can receive inbound messages implement this protocol. The framework calls on_message to wire up the topic router.

See Also

ADR-006 — Interface Segregation.

cosalette.MockMqttClient dataclass

MockMqttClient(
    published: list[tuple[str, str, bool, int]] = list(),
    subscriptions: list[str] = list(),
    raise_on_publish: Exception | None = None,
)

In-memory test double that records MQTT interactions.

Records publishes and subscriptions for assertion. Supports callback registration and simulated message delivery via deliver().

publish_count property

publish_count: int

Number of recorded publishes.

subscribe_count property

subscribe_count: int

Number of recorded subscriptions.

publish async

publish(
    topic: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Record a publish call, or raise if raise_on_publish is set.

Source code in packages/src/cosalette/_mqtt.py
async def publish(
    self,
    topic: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Record a publish call, or raise if ``raise_on_publish`` is set."""
    if self.raise_on_publish is not None:
        raise self.raise_on_publish
    self.published.append((topic, payload, retain, qos))

subscribe async

subscribe(topic: str) -> None

Record a subscribe call.

Source code in packages/src/cosalette/_mqtt.py
async def subscribe(self, topic: str) -> None:
    """Record a subscribe call."""
    self.subscriptions.append(topic)

on_message

on_message(callback: MessageCallback) -> None

Register an inbound-message callback.

Source code in packages/src/cosalette/_mqtt.py
def on_message(self, callback: MessageCallback) -> None:
    """Register an inbound-message callback."""
    self._callbacks.append(callback)

deliver async

deliver(topic: str, payload: str) -> None

Simulate an inbound message by invoking all callbacks.

Source code in packages/src/cosalette/_mqtt.py
async def deliver(self, topic: str, payload: str) -> None:
    """Simulate an inbound message by invoking all callbacks."""
    for cb in self._callbacks:
        await cb(topic, payload)

reset

reset() -> None

Clear all recorded data, callbacks, and failure injection.

Source code in packages/src/cosalette/_mqtt.py
def reset(self) -> None:
    """Clear all recorded data, callbacks, and failure injection."""
    self.published.clear()
    self.subscriptions.clear()
    self._callbacks.clear()
    self.raise_on_publish = None

get_messages_for

get_messages_for(topic: str) -> list[tuple[str, bool, int]]

Return (payload, retain, qos) tuples for topic.

Source code in packages/src/cosalette/_mqtt.py
def get_messages_for(
    self,
    topic: str,
) -> list[tuple[str, bool, int]]:
    """Return ``(payload, retain, qos)`` tuples for *topic*."""
    return [
        (payload, retain, qos)
        for t, payload, retain, qos in self.published
        if t == topic
    ]

cosalette.NullMqttClient dataclass

NullMqttClient()

Silent no-op MQTT adapter.

Every method is a no-op that logs at DEBUG level. Useful as a default when MQTT is not configured.

publish async

publish(
    topic: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Silently discard a publish request.

Source code in packages/src/cosalette/_mqtt.py
async def publish(
    self,
    topic: str,
    payload: str,  # noqa: ARG002
    *,
    retain: bool = False,  # noqa: ARG002
    qos: int = 1,  # noqa: ARG002
) -> None:
    """Silently discard a publish request."""
    logger.debug("NullMqttClient.publish(%s) — discarded", topic)

subscribe async

subscribe(topic: str) -> None

Silently discard a subscribe request.

Source code in packages/src/cosalette/_mqtt.py
async def subscribe(self, topic: str) -> None:
    """Silently discard a subscribe request."""
    logger.debug("NullMqttClient.subscribe(%s) — discarded", topic)

cosalette.WillConfig dataclass

WillConfig(
    topic: str,
    payload: str = "offline",
    qos: int = 1,
    retain: bool = True,
)

Last-Will-and-Testament configuration.

Abstracts aiomqtt.Will so that callers never depend on the aiomqtt package directly. The real client translates this into the library-specific type inside _connection_loop().

See Also

ADR-012 — Health and availability reporting.

cosalette.MessageCallback module-attribute

MessageCallback = Callable[[str, str], Awaitable[None]]

Async callback receiving (topic, payload) for each inbound message.

Error Handling

cosalette.ErrorPayload dataclass

ErrorPayload(
    error_type: str,
    message: str,
    device: str | None,
    timestamp: str,
    details: dict[str, object] = dict(),
)

Immutable structured error payload.

Represents a single error event ready for JSON serialisation and MQTT publication.

to_json

to_json() -> str

Serialise to a JSON string.

Source code in packages/src/cosalette/_errors.py
def to_json(self) -> str:
    """Serialise to a JSON string."""
    return dumps(asdict(self))

cosalette.ErrorPublisher dataclass

ErrorPublisher(
    mqtt: MqttPort,
    topic_prefix: str,
    error_type_map: dict[type[Exception], str] = dict(),
    clock: Callable[[], datetime] | None = None,
)

Publishes structured error payloads to MQTT.

Wraps :func:build_error_payload with fire-and-forget MQTT publication. Errors during publication are logged but never propagated — the main application loop must not crash because an error report failed.

Parameters:

Name Type Description Default
mqtt MqttPort

MQTT port used for publishing.

required
topic_prefix str

Base prefix for error topics (e.g. "velux2mqtt").

required
error_type_map dict[type[Exception], str]

Pluggable mapping from consumer exception types to machine-readable type strings.

dict()
clock Callable[[], datetime] | None

Optional callable returning a :class:~datetime.datetime for deterministic testing.

None

publish async

publish(
    error: Exception,
    *,
    device: str | None = None,
    is_root: bool = False,
) -> None

Build an error payload and publish it to MQTT.

Always publishes to {topic_prefix}/error. When device is provided, also publishes to {topic_prefix}/{device}/error (skipped for root devices, whose per-device topic would duplicate the global topic).

The entire pipeline (build → serialise → publish) is wrapped in fire-and-forget semantics: failures at any stage are logged but never propagated to the caller.

Source code in packages/src/cosalette/_errors.py
async def publish(
    self,
    error: Exception,
    *,
    device: str | None = None,
    is_root: bool = False,
) -> None:
    """Build an error payload and publish it to MQTT.

    Always publishes to ``{topic_prefix}/error``.  When *device*
    is provided, also publishes to ``{topic_prefix}/{device}/error``
    (skipped for root devices, whose per-device topic would
    duplicate the global topic).

    The entire pipeline (build → serialise → publish) is wrapped
    in fire-and-forget semantics: failures at *any* stage are
    logged but never propagated to the caller.
    """
    try:
        payload = build_error_payload(
            error,
            error_type_map=self.error_type_map,
            device=device,
            clock=self.clock,
        )
        payload_json = payload.to_json()
    except Exception:
        logger.exception(
            "Failed to build error payload for %r (device=%s)",
            error,
            device,
        )
        return

    global_topic = f"{self.topic_prefix}/error"
    logger.warning(
        "Publishing error: %s (type=%s, device=%s)",
        payload.message,
        payload.error_type,
        device,
    )
    await self._safe_publish(global_topic, payload_json)

    # Skip per-device topic for root devices (same as global)
    if device is not None and not is_root:
        device_topic = f"{self.topic_prefix}/{device}/error"
        await self._safe_publish(device_topic, payload_json)

cosalette.build_error_payload

build_error_payload(
    error: Exception,
    *,
    error_type_map: dict[type[Exception], str]
    | None = None,
    device: str | None = None,
    details: dict[str, object] | None = None,
    clock: Callable[[], datetime] | None = None,
) -> ErrorPayload

Convert an exception into a structured :class:ErrorPayload.

Looks up the exact class of the exception; subclasses are not matched.

Parameters:

Name Type Description Default
error Exception

The exception to convert.

required
error_type_map dict[type[Exception], str] | None

Optional mapping from exception types to machine-readable error_type strings. Falls back to "error" for unmapped types.

None
device str | None

Optional device name to include in the payload.

None
details dict[str, object] | None

Optional dict of additional context to attach to the payload. Defaults to an empty dict when None.

None
clock Callable[[], datetime] | None

Optional callable returning a :class:~datetime.datetime. Defaults to datetime.now(UTC).

None

Returns:

Type Description
ErrorPayload

A frozen dataclass ready for serialisation.

Source code in packages/src/cosalette/_errors.py
def build_error_payload(
    error: Exception,
    *,
    error_type_map: dict[type[Exception], str] | None = None,
    device: str | None = None,
    details: dict[str, object] | None = None,
    clock: Callable[[], datetime] | None = None,
) -> ErrorPayload:
    """Convert an exception into a structured :class:`ErrorPayload`.

    Looks up the exact class of the exception; subclasses are not matched.

    Args:
        error: The exception to convert.
        error_type_map: Optional mapping from exception types to machine-readable
            ``error_type`` strings.  Falls back to ``"error"`` for
            unmapped types.
        device: Optional device name to include in the payload.
        details: Optional dict of additional context to attach to the payload.
            Defaults to an empty dict when ``None``.
        clock: Optional callable returning a :class:`~datetime.datetime`.
            Defaults to ``datetime.now(UTC)``.

    Returns:
        A frozen dataclass ready for serialisation.
    """
    resolved_map = error_type_map or {}
    error_type = resolved_map.get(type(error), "error")
    now = clock() if clock is not None else datetime.now(UTC)
    return ErrorPayload(
        error_type=error_type,
        message=str(error),
        device=device,
        timestamp=now.isoformat(),
        details=details or {},
    )

Health and Availability

cosalette.DeviceStatus dataclass

DeviceStatus(status: str = 'ok')

Immutable status snapshot for a single device.

Used inside :class:HeartbeatPayload to report per-device health in the heartbeat JSON.

to_dict

to_dict() -> dict[str, str]

Serialise to a plain dictionary.

Source code in packages/src/cosalette/_health.py
def to_dict(self) -> dict[str, str]:
    """Serialise to a plain dictionary."""
    return asdict(self)

cosalette.HeartbeatPayload dataclass

HeartbeatPayload(
    status: str,
    uptime_s: float,
    version: str,
    devices: dict[str, DeviceStatus] = dict(),
)

Immutable structured heartbeat payload.

Represents an app-level status snapshot ready for JSON serialisation and MQTT publication.

to_json

to_json() -> str

Serialise to a JSON string.

Device entries are expanded to nested dicts via :meth:DeviceStatus.to_dict.

Source code in packages/src/cosalette/_health.py
def to_json(self) -> str:
    """Serialise to a JSON string.

    Device entries are expanded to nested dicts via
    :meth:`DeviceStatus.to_dict`.
    """
    data: dict[str, object] = {
        "status": self.status,
        "uptime_s": self.uptime_s,
        "version": self.version,
        "devices": {
            name: device.to_dict() for name, device in self.devices.items()
        },
    }
    return dumps(data)

cosalette.HealthReporter dataclass

HealthReporter(
    mqtt: MqttPort,
    topic_prefix: str,
    version: str,
    clock: ClockPort,
)

Publishes app heartbeats and per-device availability to MQTT.

Manages device tracking, uptime calculation (via monotonic clock), and graceful shutdown. All publication is fire-and-forget — errors are logged but never propagated.

Parameters:

Name Type Description Default
mqtt MqttPort

MQTT port used for publishing.

required
topic_prefix str

Base prefix for health topics (e.g. "velux2mqtt").

required
version str

Application version string included in heartbeats.

required
clock ClockPort

Monotonic clock for uptime measurement (see :class:ClockPort).

required

__post_init__

__post_init__() -> None

Capture the start time for uptime calculation.

Source code in packages/src/cosalette/_health.py
def __post_init__(self) -> None:
    """Capture the start time for uptime calculation."""
    self._start_time = self.clock.now()

set_device_status

set_device_status(device: str, status: str = 'ok') -> None

Update or add a device's status in the internal tracker.

Parameters:

Name Type Description Default
device str

Device name (used in topic paths and heartbeat payload).

required
status str

Free-form status string, defaults to "ok".

'ok'
Source code in packages/src/cosalette/_health.py
def set_device_status(self, device: str, status: str = "ok") -> None:
    """Update or add a device's status in the internal tracker.

    Args:
        device: Device name (used in topic paths and heartbeat payload).
        status: Free-form status string, defaults to ``"ok"``.
    """
    self._devices[device] = DeviceStatus(status=status)

remove_device

remove_device(device: str) -> None

Remove a device from internal tracking, if present.

Source code in packages/src/cosalette/_health.py
def remove_device(self, device: str) -> None:
    """Remove a device from internal tracking, if present."""
    self._devices.pop(device, None)

publish_device_available async

publish_device_available(
    device: str, *, is_root: bool = False
) -> None

Publish "online" to the device availability topic.

For root devices (unnamed), publishes to {prefix}/availability instead of {prefix}/{device}/availability.

Also registers the device as "ok" in internal tracking.

Source code in packages/src/cosalette/_health.py
async def publish_device_available(
    self,
    device: str,
    *,
    is_root: bool = False,
) -> None:
    """Publish ``"online"`` to the device availability topic.

    For root devices (unnamed), publishes to ``{prefix}/availability``
    instead of ``{prefix}/{device}/availability``.

    Also registers the device as ``"ok"`` in internal tracking.
    """
    if is_root:
        topic = f"{self.topic_prefix}/availability"
        self._root_devices.add(device)
    else:
        topic = f"{self.topic_prefix}/{device}/availability"
    await self._safe_publish(topic, "online")
    self.set_device_status(device)

publish_device_unavailable async

publish_device_unavailable(
    device: str, *, is_root: bool = False
) -> None

Publish "offline" to the device availability topic.

For root devices (unnamed), publishes to {prefix}/availability instead of {prefix}/{device}/availability.

Also removes the device from internal tracking.

Source code in packages/src/cosalette/_health.py
async def publish_device_unavailable(
    self,
    device: str,
    *,
    is_root: bool = False,
) -> None:
    """Publish ``"offline"`` to the device availability topic.

    For root devices (unnamed), publishes to ``{prefix}/availability``
    instead of ``{prefix}/{device}/availability``.

    Also removes the device from internal tracking.
    """
    if is_root:
        topic = f"{self.topic_prefix}/availability"
        self._root_devices.discard(device)
    else:
        topic = f"{self.topic_prefix}/{device}/availability"
    await self._safe_publish(topic, "offline")
    self.remove_device(device)

publish_heartbeat async

publish_heartbeat() -> None

Publish a structured JSON heartbeat to {prefix}/status.

The payload includes current uptime, version, and all tracked device statuses.

Source code in packages/src/cosalette/_health.py
async def publish_heartbeat(self) -> None:
    """Publish a structured JSON heartbeat to ``{prefix}/status``.

    The payload includes current uptime, version, and all tracked
    device statuses.
    """
    uptime = self.clock.now() - self._start_time
    payload = HeartbeatPayload(
        status="online",
        uptime_s=uptime,
        version=self.version,
        devices=dict(self._devices),
    )
    topic = f"{self.topic_prefix}/status"
    logger.debug("Publishing heartbeat to %s", topic)
    await self._safe_publish(topic, payload.to_json())

shutdown async

shutdown() -> None

Gracefully shut down: publish "offline" for everything.

Publishes "offline" to each tracked device's availability topic (using root topic for root devices), then publishes "offline" to the app status topic, and clears internal device tracking.

Source code in packages/src/cosalette/_health.py
async def shutdown(self) -> None:
    """Gracefully shut down: publish ``"offline"`` for everything.

    Publishes ``"offline"`` to each tracked device's availability
    topic (using root topic for root devices), then publishes
    ``"offline"`` to the app status topic, and clears internal
    device tracking.
    """
    logger.info("Health reporter shutting down — publishing offline")
    for device in list(self._devices):
        if device in self._root_devices:
            topic = f"{self.topic_prefix}/availability"
        else:
            topic = f"{self.topic_prefix}/{device}/availability"
        await self._safe_publish(topic, "offline")

    status_topic = f"{self.topic_prefix}/status"
    await self._safe_publish(status_topic, "offline")
    self._devices.clear()
    self._root_devices.clear()

cosalette.build_will_config

build_will_config(topic_prefix: str) -> WillConfig

Create a :class:WillConfig for the app's LWT.

The resulting config targets {topic_prefix}/status with payload "offline", QoS 1, retained. Pass this to :class:MqttClient so the broker publishes "offline" on unexpected disconnection.

Parameters:

Name Type Description Default
topic_prefix str

Application-level topic prefix (e.g. "velux2mqtt").

required

Returns:

Type Description
WillConfig

Pre-configured LWT for the app status topic.

Source code in packages/src/cosalette/_health.py
def build_will_config(topic_prefix: str) -> WillConfig:
    """Create a :class:`WillConfig` for the app's LWT.

    The resulting config targets ``{topic_prefix}/status`` with payload
    ``"offline"``, QoS 1, retained.  Pass this to :class:`MqttClient`
    so the broker publishes ``"offline"`` on unexpected disconnection.

    Args:
        topic_prefix: Application-level topic prefix (e.g. ``"velux2mqtt"``).

    Returns:
        Pre-configured LWT for the app status topic.
    """
    return WillConfig(
        topic=f"{topic_prefix}/status",
        payload="offline",
        qos=1,
        retain=True,
    )

Clock

cosalette.ClockPort

Bases: Protocol

Monotonic clock for timing measurements.

Used by device controllers and timing-sensitive components to measure elapsed time without being affected by system clock adjustments (NTP, manual changes, etc.).

The default implementation wraps time.monotonic(). Tests inject a deterministic fake clock for reproducible timing.

now

now() -> float

Return monotonic time in seconds.

Returns:

Type Description
float

A float representing seconds from an arbitrary epoch.

float

Only the difference between two calls is meaningful.

Source code in packages/src/cosalette/_clock.py
def now(self) -> float:
    """Return monotonic time in seconds.

    Returns:
        A float representing seconds from an arbitrary epoch.
        Only the *difference* between two calls is meaningful.
    """
    ...

sleep async

sleep(seconds: float) -> None

Sleep for seconds.

Used by :meth:DeviceContext.sleep for shutdown-aware sleeping. Production implementations delegate to asyncio.sleep; test doubles may advance virtual time instead.

Source code in packages/src/cosalette/_clock.py
async def sleep(self, seconds: float) -> None:
    """Sleep for *seconds*.

    Used by :meth:`DeviceContext.sleep` for shutdown-aware sleeping.
    Production implementations delegate to ``asyncio.sleep``; test
    doubles may advance virtual time instead.
    """
    ...

cosalette.SystemClock

Production clock wrapping time.monotonic().

Satisfies :class:ClockPort via structural subtyping.

Usage::

clock = SystemClock()
start = clock.now()
# ... some work ...
elapsed = clock.now() - start

now

now() -> float

Return monotonic time in seconds.

Source code in packages/src/cosalette/_clock.py
def now(self) -> float:
    """Return monotonic time in seconds."""
    return time.monotonic()

sleep async

sleep(seconds: float) -> None

Sleep for seconds using asyncio.sleep.

Source code in packages/src/cosalette/_clock.py
async def sleep(self, seconds: float) -> None:
    """Sleep for *seconds* using ``asyncio.sleep``."""
    await asyncio.sleep(seconds)

Logging

cosalette.JsonFormatter

JsonFormatter(*, service: str = '', version: str = '')

Bases: Formatter

Emit log records as single-line JSON objects (NDJSON).

Each record produces a JSON object with these fields:

  • timestamp — ISO 8601 with timezone (always UTC)
  • level — Python log level name
  • logger — dotted logger name
  • message — the formatted log message
  • service — application name for log correlation
  • version — application version (omitted when empty)
  • exception — formatted traceback (only present when an exception is logged)
  • stack_info — stack trace (only present when stack_info=True)

Parameters:

Name Type Description Default
service str

Application name included in every log line.

''
version str

Application version string. Omitted from output when empty.

''
Source code in packages/src/cosalette/_logging.py
def __init__(
    self,
    *,
    service: str = "",
    version: str = "",
) -> None:
    super().__init__()
    self._service = service
    self._version = version

format

format(record: LogRecord) -> str

Format a log record as a single-line JSON string.

Overrides :meth:logging.Formatter.format. The returned string contains no embedded newlines (tracebacks are escaped by the JSON serialiser), so each call produces exactly one log line — critical for container log drivers that split on \n.

Source code in packages/src/cosalette/_logging.py
def format(self, record: logging.LogRecord) -> str:
    """Format a log record as a single-line JSON string.

    Overrides :meth:`logging.Formatter.format`.  The returned
    string contains no embedded newlines (tracebacks are
    escaped by the JSON serialiser), so each call produces exactly
    one log line — critical for container log drivers that
    split on ``\\n``.
    """
    entry: dict[str, Any] = {
        "timestamp": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
        "level": record.levelname,
        "logger": record.name,
        "message": record.getMessage(),
        "service": self._service,
    }

    if self._version:
        entry["version"] = self._version

    if record.exc_info and record.exc_info[0] is not None:
        entry["exception"] = self.formatException(record.exc_info)

    if record.stack_info:
        entry["stack_info"] = self.formatStack(record.stack_info)

    return dumps(entry, default=str)

cosalette.configure_logging

configure_logging(
    settings: LoggingSettings,
    *,
    service: str,
    version: str = "",
) -> None

Configure the root logger from settings.

Clears any existing handlers on the root logger, then installs fresh handlers according to settings.

A :class:logging.StreamHandler writing to stderr is always installed. When settings.file is set, a :class:~logging.handlers.RotatingFileHandler is added as well.

Parameters:

Name Type Description Default
settings LoggingSettings

Logging configuration (level, format, file).

required
service str

Application name passed to :class:JsonFormatter.

required
version str

Application version passed to :class:JsonFormatter. Defaults to "".

''
Source code in packages/src/cosalette/_logging.py
def configure_logging(
    settings: LoggingSettings,
    *,
    service: str,
    version: str = "",
) -> None:
    """Configure the root logger from settings.

    Clears any existing handlers on the root logger, then
    installs fresh handlers according to *settings*.

    A :class:`logging.StreamHandler` writing to ``stderr`` is
    always installed.  When ``settings.file`` is set, a
    :class:`~logging.handlers.RotatingFileHandler` is added as
    well.

    Args:
        settings: Logging configuration (level, format, file).
        service: Application name passed to :class:`JsonFormatter`.
        version: Application version passed to
            :class:`JsonFormatter`.  Defaults to ``""``.
    """
    root = logging.getLogger()

    # Clear existing handlers
    for handler in root.handlers[:]:
        root.removeHandler(handler)
        with contextlib.suppress(Exception):
            handler.close()

    # Build formatter
    if settings.format == "json":
        formatter: logging.Formatter = JsonFormatter(service=service, version=version)
    else:
        formatter = logging.Formatter(_TEXT_FORMAT)

    # Stream handler (always present → stderr)
    stream_handler = logging.StreamHandler(sys.stderr)
    stream_handler.setFormatter(formatter)
    root.addHandler(stream_handler)

    # Optional rotating file handler
    if settings.file is not None:
        file_handler = RotatingFileHandler(
            settings.file,
            maxBytes=settings.max_file_size_mb * 1024 * 1024,
            backupCount=settings.backup_count,
            encoding="utf-8",
        )
        file_handler.setFormatter(formatter)
        root.addHandler(file_handler)

    root.setLevel(settings.level)

Settings

cosalette.Settings

Bases: BaseSettings

Root framework settings for cosalette applications.

Loaded from environment variables with the nested delimiter __ and an optional .env file in the working directory.

No env_prefix is set at the framework level — each application subclasses Settings and adds its own prefix (e.g. env_prefix="MYAPP_").

Example .env::

MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
LOGGING__LEVEL=DEBUG
LOGGING__FORMAT=text

Example with an application prefix (subclass)::

class MyAppSettings(Settings):
    model_config = SettingsConfigDict(
        env_prefix="MYAPP_",
        env_nested_delimiter="__",
        env_file=".env",
        env_file_encoding="utf-8",
    )

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(
    env_nested_delimiter="__",
    env_file=".env",
    env_file_encoding="utf-8",
    extra="ignore",
)

Settings uses extra="ignore" because the base class sets no env_prefix. Without a prefix, pydantic-settings reads every environment variable; the BaseSettings default of extra="forbid" would then reject unrelated variables (GH_TOKEN, PATH, etc.) as validation errors.

Subclasses that set env_prefix only see prefixed variables and may safely tighten this to extra="forbid" for strict validation.

cosalette.MqttSettings

Bases: BaseModel

MQTT broker connection and topic configuration.

Environment variables (with __ nesting)::

MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
MQTT__TOPIC_PREFIX=myapp

cosalette.LoggingSettings

Bases: BaseModel

Logging configuration.

When file is set, logs are also written to a rotating file (size-based rotation, backup_count generations kept). When None, logs go to stderr only.

The format field selects the output format:

  • "json" (default) — structured JSON lines for container log aggregators (Loki, Elasticsearch, CloudWatch). Each line is a complete JSON object with correlation metadata.
  • "text" — human-readable timestamped format for local development and direct terminal use.

Adapter Lifecycle

Adapters registered via app.adapter() that implement the async context manager protocol (__aenter__/__aexit__) are automatically managed by the framework:

  • Entered during startup, before the lifespan= hook runs
  • Exited during shutdown, after the lifespan= hook exits
  • Managed via contextlib.AsyncExitStack for LIFO ordering and exception safety
  • Adapters without __aenter__/__aexit__ pass through unchanged

The detection is duck-typed — any object with both __aenter__ and __aexit__ attributes qualifies. No base class or registration is needed.

See ADR-016 for the design rationale and Adapter Lifecycle Management for usage examples.

Publish Strategies

cosalette.PublishStrategy

Bases: Protocol

Publish-decision contract for the device loop.

The framework calls _bind before the loop to inject the clock, should_publish each iteration, and on_published after a successful publish to let the strategy reset internal state (counters, timestamps, etc.).

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Decide whether the current reading should be published.

Parameters:

Name Type Description Default
current dict[str, object]

The latest telemetry payload.

required
previous dict[str, object] | None

The last published payload, or None on the very first call.

required

Returns:

Type Description
bool

True if the framework should publish current.

Source code in packages/src/cosalette/_strategies.py
def should_publish(
    self,
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool:
    """Decide whether the current reading should be published.

    Args:
        current: The latest telemetry payload.
        previous: The last *published* payload, or ``None`` on the
            very first call.

    Returns:
        ``True`` if the framework should publish ``current``.
    """
    ...

on_published

on_published() -> None

Called after a successful publish to reset internal state.

Source code in packages/src/cosalette/_strategies.py
def on_published(self) -> None:
    """Called after a successful publish to reset internal state."""
    ...

cosalette.Every

Every(
    *, seconds: float | None = None, n: int | None = None
)

Bases: _StrategyBase

Time-based or count-based publish throttle.

Exactly one of seconds or n must be provided.

Every(seconds=30) Publish at most once every 30 seconds. Requires a :class:ClockPort injected via _bind(). Before binding, should_publish always returns True (safe fallback).

Every(n=5) Publish every 5th reading. No clock dependency.

Raises:

Type Description
ValueError

If both, neither, or non-positive values are given.

Source code in packages/src/cosalette/_strategies.py
def __init__(
    self,
    *,
    seconds: float | None = None,
    n: int | None = None,
) -> None:
    if seconds is not None and n is not None:
        msg = "Specify exactly one of 'seconds' or 'n', not both"
        raise ValueError(msg)
    if seconds is None and n is None:
        msg = "Specify exactly one of 'seconds' or 'n'"
        raise ValueError(msg)

    if seconds is not None and seconds <= 0:
        msg = "'seconds' must be positive"
        raise ValueError(msg)
    if n is not None and n <= 0:
        msg = "'n' must be positive"
        raise ValueError(msg)

    self._seconds = seconds
    self._n = n

    # Time-mode state
    self._clock: ClockPort | None = None
    self._last_publish_time: float | None = None

    # Count-mode state
    self._counter: int = 0

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Return True when enough time/calls have elapsed.

Source code in packages/src/cosalette/_strategies.py
def should_publish(
    self,
    current: dict[str, object],  # noqa: ARG002
    previous: dict[str, object] | None,  # noqa: ARG002
) -> bool:
    """Return ``True`` when enough time/calls have elapsed."""
    if self._seconds is not None:
        return self._should_publish_time()
    return self._should_publish_count()

on_published

on_published() -> None

Record publish timestamp or reset counter.

Source code in packages/src/cosalette/_strategies.py
def on_published(self) -> None:
    """Record publish timestamp or reset counter."""
    if self._seconds is not None:
        if self._clock is not None:
            self._last_publish_time = self._clock.now()
    else:
        self._counter = 0

cosalette.OnChange

OnChange(
    *, threshold: float | dict[str, float] | None = None
)

Bases: _StrategyBase

Publish when the telemetry payload changes.

With threshold=None (default), uses exact equality (current != previous).

When threshold is a float, it acts as a global numeric dead-band: a leaf field must change by more than threshold (strict >) to trigger a publish. Non-numeric fields fall back to !=.

When threshold is a dict[str, float], each key names a leaf field with its own dead-band. Use dot-notation for nested fields (e.g. {"sensor.temp": 0.5}). Fields not listed in the dict use exact equality.

Thresholds are applied to leaf values only. Nested dicts are traversed recursively — {"sensor": {"temp": 22.5}} compares temp numerically, not the intermediate sensor dict as a whole.

In both threshold modes, structural changes (added or removed keys at any nesting level) always trigger a publish, and fields are combined with OR semantics — any single leaf field exceeding its threshold is sufficient.

Parameters:

Name Type Description Default
threshold float | dict[str, float] | None

Optional dead-band for numeric change detection. None → exact equality, float → global threshold, dict[str, float] → per-field thresholds (dot-notation for nested keys).

None
Source code in packages/src/cosalette/_strategies.py
def __init__(
    self,
    *,
    threshold: float | dict[str, float] | None = None,
) -> None:
    if isinstance(threshold, dict):
        for field, value in threshold.items():
            if isinstance(value, bool):
                msg = f"Threshold for '{field}' must be a number, got bool"
                raise TypeError(msg)
            if value < 0:
                msg = f"Threshold for '{field}' must be non-negative, got {value}"
                raise ValueError(msg)
    elif isinstance(threshold, bool):
        msg = "Threshold must be a number, got bool"
        raise TypeError(msg)
    elif isinstance(threshold, (int, float)) and threshold < 0:
        msg = f"Threshold must be non-negative, got {threshold}"
        raise ValueError(msg)
    self._threshold = threshold

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Return True when the payload differs from the last publish.

When a threshold is configured, numeric fields are compared using abs(current - previous) > threshold (strict inequality). Non-numeric fields and structural changes always use exact equality.

Source code in packages/src/cosalette/_strategies.py
def should_publish(
    self,
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool:
    """Return ``True`` when the payload differs from the last publish.

    When a threshold is configured, numeric fields are compared
    using ``abs(current - previous) > threshold`` (strict
    inequality).  Non-numeric fields and structural changes always
    use exact equality.
    """
    if previous is None:
        return True
    if self._threshold is None:
        return current != previous
    return self._check_with_threshold(current, previous)

on_published

on_published() -> None

No-op — OnChange is stateless.

Source code in packages/src/cosalette/_strategies.py
def on_published(self) -> None:
    """No-op — ``OnChange`` is stateless."""

Introspection

cosalette.build_registry_snapshot

build_registry_snapshot(app: App) -> dict[str, Any]

Build a JSON-serializable snapshot of all app registrations.

Produces a dict describing the app metadata, devices, telemetry, commands, and adapters — suitable for json.dumps() without custom encoders.

Parameters:

Name Type Description Default
app App

The cosalette :class:App instance to introspect.

required

Returns:

Type Description
dict[str, Any]

A plain dict with string keys and JSON-serializable values.

Source code in packages/src/cosalette/_introspect.py
def build_registry_snapshot(app: App) -> dict[str, Any]:
    """Build a JSON-serializable snapshot of all app registrations.

    Produces a dict describing the app metadata, devices, telemetry,
    commands, and adapters — suitable for ``json.dumps()`` without
    custom encoders.

    Args:
        app: The cosalette :class:`App` instance to introspect.

    Returns:
        A plain dict with string keys and JSON-serializable values.
    """
    return {
        "app": {
            "name": app._name,
            "version": app._version,
            "description": app._description,
        },
        "devices": [_describe_device(reg) for reg in app._devices],
        "telemetry": [_describe_telemetry(reg) for reg in app._telemetry],
        "commands": [_describe_command(reg) for reg in app._commands],
        "adapters": [
            _describe_adapter(port_type, entry)
            for port_type, entry in app._adapters.items()
        ],
    }

Retry / Backoff

cosalette.BackoffStrategy

Bases: Protocol

Backoff-delay contract for telemetry retry.

The framework calls delay(attempt) between retry attempts. Attempt numbers are 1-based.

delay

delay(attempt: int) -> float

Return delay in seconds for the given attempt (1-based).

Source code in packages/src/cosalette/_retry.py
def delay(self, attempt: int) -> float:
    """Return delay in seconds for the given attempt (1-based)."""
    ...

cosalette.ExponentialBackoff

ExponentialBackoff(
    base: float = 2.0, max_delay: float = 60.0
)

Exponential backoff with ±20% jitter.

min(base * 2^(attempt-1), max_delay)

Source code in packages/src/cosalette/_retry.py
def __init__(self, base: float = 2.0, max_delay: float = 60.0) -> None:
    self._base = base
    self._max_delay = max_delay

cosalette.LinearBackoff

LinearBackoff(step: float = 2.0, max_delay: float = 60.0)

Linear backoff: min(step * attempt, max_delay) with ±20% jitter.

Source code in packages/src/cosalette/_retry.py
def __init__(self, step: float = 2.0, max_delay: float = 60.0) -> None:
    self._step = step
    self._max_delay = max_delay

cosalette.FixedBackoff

FixedBackoff(delay: float = 5.0)

Fixed backoff: constant delay with ±20% jitter.

Source code in packages/src/cosalette/_retry.py
def __init__(self, delay: float = 5.0) -> None:
    self._delay = delay

cosalette.CircuitBreaker

CircuitBreaker(threshold: int = 5)

Optional circuit breaker for telemetry retry.

Tracks consecutive failed cycles (where all retries were exhausted). After threshold consecutive failures, the circuit opens and the handler is skipped until a half-open probe succeeds.

Source code in packages/src/cosalette/_retry.py
def __init__(self, threshold: int = 5) -> None:
    if not isinstance(threshold, int) or threshold < 1:
        msg = "threshold must be a positive integer (>= 1)"
        raise ValueError(msg)
    self._threshold = threshold
    self._consecutive_failures = 0
    self._state: str = "closed"  # closed | open | half-open

record_success

record_success() -> None

Record a successful handler execution. Resets all state.

Source code in packages/src/cosalette/_retry.py
def record_success(self) -> None:
    """Record a successful handler execution. Resets all state."""
    self._consecutive_failures = 0
    self._state = "closed"

record_failure

record_failure() -> None

Record a cycle where all retries were exhausted.

Source code in packages/src/cosalette/_retry.py
def record_failure(self) -> None:
    """Record a cycle where all retries were exhausted."""
    self._consecutive_failures += 1
    if self._consecutive_failures >= self._threshold:
        self._state = "open"

should_attempt

should_attempt() -> bool

Return True if the handler should execute this cycle.

  • closed: always True
  • open: skip this cycle, transition to half-open for next cycle
  • half-open: True (probe attempt)
Source code in packages/src/cosalette/_retry.py
def should_attempt(self) -> bool:
    """Return True if the handler should execute this cycle.

    - closed: always True
    - open: skip this cycle, transition to half-open for next cycle
    - half-open: True (probe attempt)
    """
    if self._state == "closed":
        return True
    if self._state == "open":
        # Skip this cycle, but transition to half-open for the NEXT cycle
        self._state = "half-open"
        return False
    # half-open: probe attempt
    return True

Filters

cosalette.Filter

Bases: Protocol

Signal filter contract.

All filters follow the update → value pattern:

  1. Call update(raw) with each new measurement.
  2. The return value is the filtered output.
  3. Access value for the current filtered state.
  4. Call reset() to clear internal state.

The first update() call seeds the filter — it returns the raw value unchanged (no history to smooth against).

value property

value: float | None

Current filtered value, or None before the first update.

update

update(raw: float) -> float

Feed a raw measurement and return the filtered value.

Source code in packages/src/cosalette/_filters.py
def update(self, raw: float) -> float:
    """Feed a raw measurement and return the filtered value."""
    ...

reset

reset() -> None

Clear internal state so the next update re-seeds.

Source code in packages/src/cosalette/_filters.py
def reset(self) -> None:
    """Clear internal state so the next update re-seeds."""
    ...

cosalette.Pt1Filter

Pt1Filter(tau: float, dt: float)

First-order low-pass (PT1) filter — Rust drop-in for the Python implementation.

__doc__ class-attribute

__doc__ = "First-order low-pass (PT1) filter — Rust drop-in for the Python implementation."

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__module__ class-attribute

__module__ = 'cosalette._filters_rs'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

alpha property

alpha: float

dt property

dt: float

tau property

tau: float

value property

value: float | None

__new__ builtin

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

__repr__ method descriptor

__repr__()

Return repr(self).

cosalette.MedianFilter

MedianFilter(window: int)

Sliding-window median filter — Rust drop-in for the Python implementation.

__doc__ class-attribute

__doc__ = "Sliding-window median filter — Rust drop-in for the Python implementation."

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__module__ class-attribute

__module__ = 'cosalette._filters_rs'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

value property

value: float | None

window property

window: int

__new__ builtin

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

__repr__ method descriptor

__repr__()

Return repr(self).

cosalette.OneEuroFilter

OneEuroFilter(
    min_cutoff: float = 1.0,
    beta: float = 0.0,
    d_cutoff: float = 1.0,
    dt: float = 1.0,
)

Adaptive low-pass filter (1€ Filter) — Rust drop-in for the Python implementation.

__doc__ class-attribute

__doc__ = "Adaptive low-pass filter (1€ Filter) — Rust drop-in for the Python implementation."

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__module__ class-attribute

__module__ = 'cosalette._filters_rs'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

beta property

beta: float

d_cutoff property

d_cutoff: float

dt property

dt: float

min_cutoff property

min_cutoff: float

value property

value: float | None

__new__ builtin

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

__repr__ method descriptor

__repr__()

Return repr(self).

Persistence

cosalette.PersistPolicy

Bases: Protocol

Save-timing contract for the persistence system.

The framework calls should_save after each telemetry cycle to decide whether to persist the :class:DeviceStore immediately. This is simpler than :class:PublishStrategy — no clock binding or post-publish callback is needed.

should_save

should_save(store: DeviceStore, published: bool) -> bool

Decide whether to save right now.

Parameters:

Name Type Description Default
store DeviceStore

The :class:DeviceStore being managed.

required
published bool

True if an MQTT publish just occurred this cycle.

required

Returns:

Type Description
bool

True if the store should be saved now.

Source code in packages/src/cosalette/_persist.py
def should_save(self, store: DeviceStore, published: bool) -> bool:
    """Decide whether to save right now.

    Args:
        store: The :class:`DeviceStore` being managed.
        published: ``True`` if an MQTT publish just occurred
            this cycle.

    Returns:
        ``True`` if the store should be saved now.
    """
    ...

cosalette.SaveOnPublish

Bases: _SavePolicyBase

Save after each successful MQTT publish.

This is the most common policy — the store is persisted whenever new data is published to MQTT, ensuring the persisted state matches what's been broadcast.

should_save

should_save(store: DeviceStore, published: bool) -> bool

Return True when an MQTT publish just occurred.

Source code in packages/src/cosalette/_persist.py
def should_save(
    self,
    store: DeviceStore,  # noqa: ARG002
    published: bool,
) -> bool:
    """Return ``True`` when an MQTT publish just occurred."""
    return published

cosalette.SaveOnChange

Bases: _SavePolicyBase

Save whenever the store has been modified (dirty).

Saves on every handler cycle where the store was mutated, regardless of whether MQTT publishing occurred. Most aggressive policy — ensures minimal data loss on crash.

should_save

should_save(store: DeviceStore, published: bool) -> bool

Return True when the store has uncommitted changes.

Source code in packages/src/cosalette/_persist.py
def should_save(
    self,
    store: DeviceStore,
    published: bool,  # noqa: ARG002
) -> bool:
    """Return ``True`` when the store has uncommitted changes."""
    return store.dirty

cosalette.SaveOnShutdown

Bases: _SavePolicyBase

Save only on graceful shutdown.

The lightest I/O policy — no saves during normal operation. Data accumulated during a session is only persisted when the app shuts down cleanly. Risk: data loss on hard crash/power loss.

Note: The framework always saves on shutdown regardless of policy, so this policy effectively means "never save during the loop".

should_save

should_save(store: DeviceStore, published: bool) -> bool

Always return False — framework handles shutdown save.

Source code in packages/src/cosalette/_persist.py
def should_save(
    self,
    store: DeviceStore,  # noqa: ARG002
    published: bool,  # noqa: ARG002
) -> bool:
    """Always return ``False`` — framework handles shutdown save."""
    return False

Stores

cosalette.Store

Bases: Protocol

Key-value persistence for device state.

Each key maps to a JSON-serializable dict. Implementations must handle missing keys (return None) and create storage locations as needed.

load

load(key: str) -> dict[str, object] | None

Load state for the given key. Returns None if not found.

Source code in packages/src/cosalette/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Load state for the given key.  Returns ``None`` if not found."""
    ...

save

save(key: str, data: dict[str, object]) -> None

Persist state for the given key.

Source code in packages/src/cosalette/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Persist state for the given key."""
    ...

cosalette.DeviceStore

DeviceStore(backend: Store, key: str)

Per-device scoped store with dirty tracking.

Wraps a :class:Store backend, automatically keyed by device name. Behaves like a dict (MutableMapping interface) — handlers read and write state naturally via store["key"] = value.

The framework creates one DeviceStore per device and manages its lifecycle:

  1. load() — called before the first handler invocation.
  2. Handler reads/writes via dict-like access.
  3. save() — always called on shutdown (safety net).

Dirty tracking: after load or save, the store is "clean". Any __setitem__ or __delitem__ marks it "dirty". For nested mutations the store cannot detect automatically, call :meth:mark_dirty explicitly.

Source code in packages/src/cosalette/_stores.py
def __init__(self, backend: Store, key: str) -> None:
    self._backend = backend
    self._key = key
    self._data: dict[str, object] = {}
    self._dirty = False
    self._loaded = False

dirty property

dirty: bool

True if state has been modified since last load/save.

load

load() -> None

Load state from backend. Called by framework before first use.

Source code in packages/src/cosalette/_stores.py
def load(self) -> None:
    """Load state from backend.  Called by framework before first use."""
    saved = self._backend.load(self._key)
    self._data = saved if saved is not None else {}
    self._dirty = False
    self._loaded = True

save

save() -> None

Persist current state to backend.

Source code in packages/src/cosalette/_stores.py
def save(self) -> None:
    """Persist current state to backend."""
    self._backend.save(self._key, dict(self._data))
    self._dirty = False

mark_dirty

mark_dirty() -> None

Explicitly mark the store as dirty.

Use when mutating nested structures that __setitem__ can't detect (e.g. store["list"].append(x)).

Source code in packages/src/cosalette/_stores.py
def mark_dirty(self) -> None:
    """Explicitly mark the store as dirty.

    Use when mutating nested structures that ``__setitem__``
    can't detect (e.g. ``store["list"].append(x)``).
    """
    self._dirty = True

get

get(key: str, default: object = None) -> object

Return the value for key, or default if not present.

Source code in packages/src/cosalette/_stores.py
def get(self, key: str, default: object = None) -> object:
    """Return the value for *key*, or *default* if not present."""
    self._check_loaded()
    return self._data.get(key, default)

setdefault

setdefault(key: str, default: object = None) -> object

Return self[key] if present, else set and return default.

Source code in packages/src/cosalette/_stores.py
def setdefault(self, key: str, default: object = None) -> object:
    """Return ``self[key]`` if present, else set and return *default*."""
    self._check_loaded()
    if key not in self._data:
        self._data[key] = default
        self._dirty = True
    return self._data[key]

update

update(
    other: dict[str, object] | None = None, **kwargs: object
) -> None

Update the store from a dict and/or keyword arguments.

Source code in packages/src/cosalette/_stores.py
def update(self, other: dict[str, object] | None = None, **kwargs: object) -> None:
    """Update the store from a dict and/or keyword arguments."""
    self._check_loaded()
    if other:
        self._data.update(other)
        self._dirty = True
    if kwargs:
        self._data.update(kwargs)
        self._dirty = True

to_dict

to_dict() -> dict[str, object]

Return a shallow copy of the underlying data dict.

Useful when returning state from a telemetry handler (the handler returns this dict for MQTT publishing).

Source code in packages/src/cosalette/_stores.py
def to_dict(self) -> dict[str, object]:
    """Return a shallow copy of the underlying data dict.

    Useful when returning state from a telemetry handler
    (the handler returns this dict for MQTT publishing).
    """
    self._check_loaded()
    return dict(self._data)

keys

keys() -> KeysView[str]

Return a view of the store's keys.

Source code in packages/src/cosalette/_stores.py
def keys(self) -> KeysView[str]:
    """Return a view of the store's keys."""
    self._check_loaded()
    return self._data.keys()

values

values() -> ValuesView[object]

Return a view of the store's values.

Source code in packages/src/cosalette/_stores.py
def values(self) -> ValuesView[object]:
    """Return a view of the store's values."""
    self._check_loaded()
    return self._data.values()

items

items() -> ItemsView[str, object]

Return a view of the store's items.

Source code in packages/src/cosalette/_stores.py
def items(self) -> ItemsView[str, object]:
    """Return a view of the store's items."""
    self._check_loaded()
    return self._data.items()

cosalette.NullStore

No-op store — load always returns None, save is silent.

Use when persistence is disabled or for dry-run modes.

load

load(key: str) -> dict[str, object] | None

Always returns None.

Source code in packages/src/cosalette/_stores.py
def load(self, key: str) -> dict[str, object] | None:  # noqa: ARG002
    """Always returns ``None``."""
    return None

save

save(key: str, data: dict[str, object]) -> None

Does nothing.

Source code in packages/src/cosalette/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:  # noqa: ARG002
    """Does nothing."""

cosalette.MemoryStore

MemoryStore(
    initial: dict[str, dict[str, object]] | None = None,
)

In-memory store backed by a plain dict.

Both load and save deep-copy data so that callers cannot mutate internal state by accident. Designed for tests — mirrors the FakeStorage pattern from gas2mqtt.

Parameters

initial: Optional seed data. The mapping is deep-copied on construction.

Source code in packages/src/cosalette/_stores.py
def __init__(
    self,
    initial: dict[str, dict[str, object]] | None = None,
) -> None:
    self._data: dict[str, dict[str, object]] = (
        copy.deepcopy(initial) if initial else {}
    )

load

load(key: str) -> dict[str, object] | None

Return a deep copy of the stored dict, or None.

Source code in packages/src/cosalette/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Return a deep copy of the stored dict, or ``None``."""
    value = self._data.get(key)
    if value is None:
        return None
    return copy.deepcopy(value)

save

save(key: str, data: dict[str, object]) -> None

Store a deep copy of data.

Source code in packages/src/cosalette/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Store a deep copy of *data*."""
    self._data[key] = copy.deepcopy(data)

cosalette.JsonFileStore

JsonFileStore(path: Path | str)

Single-file JSON store with atomic writes.

All keys live as top-level keys in one JSON object. Writes use a write-to-temp + os.replace pattern for atomicity so that a crash mid-write never corrupts the file.

Parameters

path: Path to the JSON file. Parent directories are created automatically on the first save.

Source code in packages/src/cosalette/_stores.py
def __init__(self, path: Path | str) -> None:
    self._path = Path(path)

load

load(key: str) -> dict[str, object] | None

Load a key from the JSON file.

Returns None when the file does not exist, the key is missing, or the file contains invalid JSON (a warning is logged in the latter case).

Source code in packages/src/cosalette/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Load a key from the JSON file.

    Returns ``None`` when the file does not exist, the key is
    missing, or the file contains invalid JSON (a warning is
    logged in the latter case).
    """
    if not self._path.exists():
        return None

    try:
        text = self._path.read_text(encoding="utf-8")
        data = loads(text)
    except (JSONDecodeError, OSError) as exc:
        logger.warning("Corrupt or unreadable store file %s: %s", self._path, exc)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "Store file %s contains non-object JSON, treating as empty",
            self._path,
        )
        return None

    return data.get(key)

save

save(key: str, data: dict[str, object]) -> None

Persist data under key using an atomic write.

The full JSON object is read (if it exists), the key is updated, and the result is written to a temporary file before being atomically moved into place.

Source code in packages/src/cosalette/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Persist *data* under *key* using an atomic write.

    The full JSON object is read (if it exists), the key is
    updated, and the result is written to a temporary file
    before being atomically moved into place.
    """
    self._path.parent.mkdir(parents=True, exist_ok=True)

    # Read existing content (if any)
    existing: dict[str, object] = {}
    if self._path.exists():
        try:
            text = self._path.read_text(encoding="utf-8")
            parsed = loads(text)
            if isinstance(parsed, dict):
                existing = parsed
            else:
                logger.warning(
                    "Overwriting non-object JSON in store file %s",
                    self._path,
                )
        except (JSONDecodeError, OSError) as exc:
            logger.warning(
                "Overwriting corrupt store file %s: %s",
                self._path,
                exc,
            )

    existing[key] = data

    tmp_path = self._path.with_suffix(".tmp")
    tmp_path.write_text(
        dumps_pretty(existing) + "\n",
        encoding="utf-8",
    )
    os.replace(tmp_path, self._path)

cosalette.SqliteStore

SqliteStore(path: Path | str)

SQLite-backed store using WAL mode for power-loss resistance.

Each key is a row in a store table; the value is stored as a JSON text column. The table is auto-created on first use.

Parameters

path: Path to the SQLite database file. Parent directories are created automatically.

Source code in packages/src/cosalette/_stores.py
def __init__(self, path: Path | str) -> None:
    self._path = Path(path)
    self._path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = sqlite3.connect(str(self._path))
    self._conn.execute("PRAGMA journal_mode=WAL")
    self._conn.execute(
        "CREATE TABLE IF NOT EXISTS store "
        "(key TEXT PRIMARY KEY, data TEXT NOT NULL)"
    )
    self._conn.commit()

load

load(key: str) -> dict[str, object] | None

Load JSON data for key, or None if not present.

Source code in packages/src/cosalette/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Load JSON data for *key*, or ``None`` if not present."""
    cur = self._conn.execute("SELECT data FROM store WHERE key = ?", (key,))
    row = cur.fetchone()
    if row is None:
        return None
    return loads(row[0])  # type: ignore[no-any-return]

save

save(key: str, data: dict[str, object]) -> None

Insert or replace data for key.

Source code in packages/src/cosalette/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Insert or replace *data* for *key*."""
    self._conn.execute(
        "INSERT OR REPLACE INTO store (key, data) VALUES (?, ?)",
        (key, dumps_pretty(data)),
    )
    self._conn.commit()

close

close() -> None

Close the underlying database connection.

Source code in packages/src/cosalette/_stores.py
def close(self) -> None:
    """Close the underlying database connection."""
    self._conn.close()