Skip to content

API Reference

Complete reference for all public classes, functions, and protocols exported by cosalette.

Application

cosalette.App

App(
    name: str,
    version: str = "0.0.0",
    *,
    description: str = "IoT-to-MQTT bridge",
    settings_class: type[Settings] = Settings,
    dry_run: bool = False,
    heartbeat_interval: float | None = 60.0,
    health_check_interval: float | None = 30.0,
    lifespan: LifespanFunc | None = None,
    store: Store | Callable[..., Store] | None = None,
    adapters: dict[
        type,
        type
        | str
        | Callable[..., object]
        | tuple[
            type | str | Callable[..., object],
            type | str | Callable[..., object],
        ],
    ]
    | None = None,
    restart_after_failures: int = 5,
    max_restarts: int = 3,
    restart_cooldown: float = 5.0,
    sustained_health_reset: float = 300.0,
)

Bases: _ConfigureMixin, _DeviceMixin, _CommandMixin, _TelemetryMixin, _StreamMixin, _PeriodicMixin, _AdapterMixin, _LifecycleMixin, _AsyncapiMixin

Central composition root and application orchestrator.

Collects device registrations, adapter mappings, and an optional lifespan context manager, then runs the full async lifecycle in :meth:run.

See Also

ADR-001 — Framework architecture (IoC, composition root).

Initialise the application orchestrator.

Parameters:

Name Type Description Default
name str

Application name (used as MQTT topic prefix and client ID).

required
version str

Application version string.

'0.0.0'
description str

Short description for CLI help text.

'IoT-to-MQTT bridge'
settings_class type[Settings]

Settings subclass to instantiate at startup.

Settings
dry_run bool

When True, resolve dry-run adapter variants.

False
heartbeat_interval float | None

Seconds between periodic heartbeats published to {prefix}/status. Set to None to disable periodic heartbeats entirely. Defaults to 60.

60.0
health_check_interval float | None

Seconds between periodic health checks for adapters implementing :class:~cosalette.HealthCheckable. Set to None to disable health checks entirely. Defaults to 30.

30.0
lifespan LifespanFunc | None

Async context manager for application startup and shutdown. Code before yield runs before devices start; code after yield runs after devices stop. Receives an :class:AppContext. When None, a no-op default is used.

None
store Store | Callable[..., Store] | None

Optional :class:Store backend for device persistence, or a callable factory Callable[..., Store] whose parameters are injected from resolved settings and adapters at bootstrap time. When set, the framework creates a :class:DeviceStore per device and injects it into handlers that declare a DeviceStore parameter.

None
adapters dict[type, type | str | Callable[..., object] | tuple[type | str | Callable[..., object], type | str | Callable[..., object]]] | None

Optional mapping of port types to adapter implementations. Each key is a Protocol type; each value is either a single implementation (class, lazy-import string, or factory callable) or a (impl, dry_run) tuple. Entries are registered via :meth:adapter and coexist with later imperative calls.

None
Source code in packages/src/cosalette/_app/__init__.py
def __init__(
    self,
    name: str,
    version: str = "0.0.0",
    *,
    description: str = "IoT-to-MQTT bridge",
    settings_class: type[Settings] = Settings,
    dry_run: bool = False,
    heartbeat_interval: float | None = 60.0,
    health_check_interval: float | None = 30.0,
    lifespan: LifespanFunc | None = None,
    store: Store | Callable[..., Store] | None = None,
    adapters: dict[
        type,
        type
        | str
        | Callable[..., object]
        | tuple[
            type | str | Callable[..., object],
            type | str | Callable[..., object],
        ],
    ]
    | None = None,
    restart_after_failures: int = 5,
    max_restarts: int = 3,
    restart_cooldown: float = 5.0,
    sustained_health_reset: float = 300.0,
) -> None:
    """Initialise the application orchestrator.

    Args:
        name: Application name (used as MQTT topic prefix and client ID).
        version: Application version string.
        description: Short description for CLI help text.
        settings_class: Settings subclass to instantiate at startup.
        dry_run: When True, resolve dry-run adapter variants.
        heartbeat_interval: Seconds between periodic heartbeats
            published to ``{prefix}/status``.  Set to ``None`` to
            disable periodic heartbeats entirely.  Defaults to 60.
        health_check_interval: Seconds between periodic health
            checks for adapters implementing
            :class:`~cosalette.HealthCheckable`.  Set to ``None`` to
            disable health checks entirely.  Defaults to 30.
        lifespan: Async context manager for application startup
            and shutdown.  Code before ``yield`` runs before devices
            start; code after ``yield`` runs after devices stop.
            Receives an :class:`AppContext`.  When ``None``, a no-op
            default is used.
        store: Optional :class:`Store` backend for device persistence,
            or a callable factory ``Callable[..., Store]`` whose
            parameters are injected from resolved settings and
            adapters at bootstrap time.
            When set, the framework creates a :class:`DeviceStore`
            per device and injects it into handlers that declare a
            ``DeviceStore`` parameter.
        adapters: Optional mapping of port types to adapter
            implementations.  Each key is a Protocol type; each
            value is either a single implementation (class,
            lazy-import string, or factory callable) or a
            ``(impl, dry_run)`` tuple.  Entries are registered via
            :meth:`adapter` and coexist with later imperative calls.
    """
    validate_mqtt_name(name)
    self._name = name
    self._version = version
    self._description = description
    self._settings_class = settings_class
    try:
        self._settings: Settings | None = settings_class()
    except ValidationError:
        self._settings = None
    self._dry_run = dry_run
    _validate_positive_interval("heartbeat_interval", heartbeat_interval)
    self._heartbeat_interval = heartbeat_interval
    _validate_positive_interval("health_check_interval", health_check_interval)
    self._health_check_interval = health_check_interval
    if restart_after_failures < 0:
        msg = f"restart_after_failures must be >= 0, got {restart_after_failures}"
        raise ValueError(msg)
    self._restart_after_failures = restart_after_failures
    if max_restarts < 0:
        msg = f"max_restarts must be >= 0, got {max_restarts}"
        raise ValueError(msg)
    self._max_restarts = max_restarts
    _validate_positive_interval("restart_cooldown", restart_cooldown)
    self._restart_cooldown = restart_cooldown
    _validate_positive_interval("sustained_health_reset", sustained_health_reset)
    self._sustained_health_reset = sustained_health_reset
    self._lifespan: LifespanFunc = (
        lifespan if lifespan is not None else _noop_lifespan
    )
    self._devices: list[_DeviceRegistration] = []
    self._telemetry: list[_TelemetryRegistration] = []
    self._commands: list[_CommandRegistration] = []
    self._streams: list[_StreamRegistration] = []
    self._periodic: list[_PeriodicRegistration] = []
    self._reactors: list[_ReactorRegistration] = []
    self._state_factories: list[StateRegistration] = []
    self._state_overrides: dict[type, Any] = {}  # for tests
    self._adapters: dict[type, _AdapterEntry] = {}
    self._store_factory: Callable[..., Store] | None = None
    self._store: Store | None = None
    self._apply_store_arg(store)
    self._configure_hooks: list[Callable[..., Any]] = []

    def _register(
        pt: type,
        impl: type | str | Callable[..., object],
        dry_run: type | str | Callable[..., object] | None,
    ) -> None:
        self.adapter(pt, impl, dry_run=dry_run)

    process_adapters_dict(adapters, _register)

settings property

settings: Settings

Application settings, instantiated at construction time.

The instance is created eagerly in __init__ from the settings_class parameter. Environment variables and .env files are read at that point, so decorator arguments like interval=app.settings.poll_interval reflect the actual runtime configuration.

The CLI entrypoint (:meth:cli) re-instantiates settings with --env-file support and passes the result to :meth:_run_async, which takes precedence over this instance.

Raises:

Type Description
RuntimeError

If the settings class could not be instantiated at construction time (e.g. required fields with no defaults and no matching environment variables). Use app.cli() with --env-file instead.

name property

name: str

Application name (used as MQTT topic prefix and client ID).

version property

version: str

Application version string.

description property

description: str

Short description for CLI help text.

devices property

devices: Sequence[_DeviceRegistration]

Registered device handlers (read-only view).

telemetry_registrations property

telemetry_registrations: Sequence[_TelemetryRegistration]

Registered telemetry handlers (read-only view).

Named telemetry_registrations rather than telemetry to avoid shadowing the :meth:telemetry registration decorator.

commands property

commands: Sequence[_CommandRegistration]

Registered command handlers (read-only view).

periodic_registrations property

periodic_registrations: Sequence[_PeriodicRegistration]

Registered periodic handlers (read-only view).

adapters property

adapters: Mapping[type, _AdapterEntry]

Registered adapter entries keyed by port type (read-only view).

registered_names

registered_names() -> frozenset[str]

Collect registered device/telemetry/command/periodic names.

Source code in packages/src/cosalette/_app/__init__.py
def registered_names(self) -> frozenset[str]:
    """Collect registered device/telemetry/command/periodic names."""
    all_regs = (
        self._devices,
        self._telemetry,
        self._commands,
        self._periodic,
        self._streams,
    )
    return frozenset(r.name for regs in all_regs for r in regs)

include_router

include_router(
    router: Router,
    *,
    prefix: str | None = None,
    tags: list[str] | None = None,
    dependencies: list[Any] | None = None,
    adapters: dict[
        type,
        type
        | str
        | Callable[..., object]
        | tuple[
            type | str | Callable[..., object],
            type | str | Callable[..., object],
        ],
    ]
    | None = None,
) -> None

Include a router's registrations in this application.

Applies snapshot semantics: registrations are captured at call time. Later mutations to the router do not affect prior inclusions. Multiple inclusions with different prefixes are allowed.

Parameters:

Name Type Description Default
router Router

Router instance to include.

required
prefix str | None

Optional single MQTT topic segment prepended to all router operation names. Must not contain /, +, #, or NUL. Combined with router's own prefix.

None
tags list[str] | None

Additional tags applied to all router operations. Accumulates in order: router constructor → include_router → operation.

None
dependencies list[Any] | None

Reserved for cos-ebc. Must be None or empty.

None
adapters dict[type, type | str | Callable[..., object] | tuple[type | str | Callable[..., object], type | str | Callable[..., object]]] | None

Adapter declarations merged into the app's registry. Same shape as App(adapters=...). Conflicts (same port type already registered) raise ValueError at include time.

None

Raises:

Type Description
ValueError

If prefix contains MQTT special characters.

ValueError

If an adapter port type conflict is detected.

NotImplementedError

If dependencies is not None or empty.

See Also

ADR-044 — Public Router and composition API.

Example::

# sensors.py
router = cosalette.Router(prefix="sensors")

@router.telemetry("temperature", interval=30)
async def read_temperature() -> dict:
    return {"celsius": 22.5}

# main.py
app = cosalette.App("bridge")
app.include_router(router, tags=["production"])
# → publishes to: bridge/sensors/temperature/state
Source code in packages/src/cosalette/_app/__init__.py
def include_router(
    self,
    router: Router,
    *,
    prefix: str | None = None,
    tags: list[str] | None = None,
    dependencies: list[Any] | None = None,
    adapters: dict[
        type,
        type
        | str
        | Callable[..., object]
        | tuple[
            type | str | Callable[..., object],
            type | str | Callable[..., object],
        ],
    ]
    | None = None,
) -> None:
    """Include a router's registrations in this application.

    Applies snapshot semantics: registrations are captured at call time.
    Later mutations to the router do not affect prior inclusions.
    Multiple inclusions with different prefixes are allowed.

    Args:
        router: Router instance to include.
        prefix: Optional single MQTT topic segment prepended to all
            router operation names.  Must not contain ``/``, ``+``,
            ``#``, or NUL.  Combined with router's own prefix.
        tags: Additional tags applied to all router operations.
            Accumulates in order: router constructor → include_router → operation.
        dependencies: Reserved for cos-ebc.  Must be None or empty.
        adapters: Adapter declarations merged into the app's registry.
            Same shape as ``App(adapters=...)``.  Conflicts (same port
            type already registered) raise ValueError at include time.

    Raises:
        ValueError: If *prefix* contains MQTT special characters.
        ValueError: If an adapter port type conflict is detected.
        NotImplementedError: If *dependencies* is not None or empty.

    See Also:
        ADR-044 — Public Router and composition API.

    Example::

        # sensors.py
        router = cosalette.Router(prefix="sensors")

        @router.telemetry("temperature", interval=30)
        async def read_temperature() -> dict:
            return {"celsius": 22.5}

        # main.py
        app = cosalette.App("bridge")
        app.include_router(router, tags=["production"])
        # → publishes to: bridge/sensors/temperature/state
    """
    if prefix is not None:
        validate_mqtt_name(prefix)

    if dependencies is not None and len(dependencies) > 0:
        msg = (
            "dependencies= is reserved for the cos-ebc epic "
            "and is not yet implemented. Pass None or omit the parameter."
        )
        raise NotImplementedError(msg)

    # Compute combined prefix
    combined_prefix = self._compute_combined_prefix(router._prefix, prefix)

    # Merge adapters first (fail fast on conflicts)
    self._merge_include_adapters(adapters)
    self._merge_router_adapters(router)

    # Include tags: accumulate router constructor → include_router → operation
    include_tags = list(tags) if tags is not None else []

    # Snapshot existing names for collision detection across all types
    existing_names: set[str] = set(self.registered_names())

    # Copy standard registrations with prefix/tag transformations
    self._copy_standard_registrations(
        router, combined_prefix, router._tags, include_tags, existing_names
    )

    # Handle periodic registrations: apply prefix, accumulate tags, check collisions
    for reg in router._periodic:
        accumulated_tags = self._accumulate_tags(
            router._tags, include_tags, reg.tags
        )
        new_name = self._apply_prefix(reg.name, combined_prefix)
        if new_name in existing_names:
            msg = f"Name {new_name!r} is already registered on the app"
            raise ValueError(msg)
        existing_names.add(new_name)
        new_reg = replace(reg, name=new_name, tags=tuple(accumulated_tags))
        self._periodic.append(new_reg)

    # Merge reactors with validation
    self._merge_reactors(router)

cosalette.AppContext

AppContext(
    *, settings: Settings, adapters: dict[type, object]
)

Context for the application lifespan.

Provided to the lifespan async context manager registered via App(lifespan=...). Offers access to settings and adapter resolution but NOT per-device features (no publish, no on_command, no sleep).

See Also

ADR-001 — Framework architecture (lifespan).

Initialise lifecycle-hook context.

Parameters:

Name Type Description Default
settings Settings

Application settings instance.

required
adapters dict[type, object]

Resolved adapter registry mapping port types to instances.

required
Source code in packages/src/cosalette/_context/_app_context.py
def __init__(
    self,
    *,
    settings: Settings,
    adapters: dict[type, object],
) -> None:
    """Initialise lifecycle-hook context.

    Args:
        settings: Application settings instance.
        adapters: Resolved adapter registry mapping port types to instances.
    """
    self._settings = settings
    self._adapters = adapters

settings property

settings: Settings

Application settings instance.

adapter

adapter(port_type: type[T]) -> T

Resolve an adapter by port type.

Parameters:

Name Type Description Default
port_type type[T]

The Protocol type to look up.

required

Returns:

Type Description
T

The adapter instance registered for that port type.

Raises:

Type Description
LookupError

If no adapter is registered for the port type.

Source code in packages/src/cosalette/_context/_app_context.py
def adapter[T](self, port_type: type[T]) -> T:
    """Resolve an adapter by port type.

    Args:
        port_type: The Protocol type to look up.

    Returns:
        The adapter instance registered for that port type.

    Raises:
        LookupError: If no adapter is registered for the port type.
    """
    try:
        return cast(T, self._adapters[port_type])
    except KeyError:
        msg = f"No adapter registered for {port_type!r}"
        raise LookupError(msg) from None

cosalette.DeviceContext

DeviceContext(
    *,
    name: str,
    settings: Settings,
    mqtt: MqttPort,
    topic_prefix: str,
    shutdown_event: Event,
    adapters: dict[type, object],
    clock: ClockPort,
    is_root: bool = False,
)

Per-device runtime context injected by the framework.

Provides device-scoped access to MQTT publishing, command registration, shutdown-aware sleeping, and adapter resolution.

Each device function receives its own DeviceContext instance. The context is pre-configured with the device's name and topic prefix so that publish operations target the correct MQTT topics automatically.

See Also

ADR-010 — Device archetypes. ADR-006 — Hexagonal architecture (adapter resolution).

Initialise per-device context.

Parameters:

Name Type Description Default
name str

Device name as registered (e.g. "blind").

required
settings Settings

Application settings instance.

required
mqtt MqttPort

MQTT port for publishing.

required
topic_prefix str

Root prefix for MQTT topics (e.g. "velux2mqtt").

required
shutdown_event Event

Shared event that signals graceful shutdown.

required
adapters dict[type, object]

Resolved adapter registry mapping port types to instances.

required
clock ClockPort

Monotonic clock for timing.

required
is_root bool

When True, topics omit the device name segment (root-level device).

False
Source code in packages/src/cosalette/_context/_device_context.py
def __init__(
    self,
    *,
    name: str,
    settings: Settings,
    mqtt: MqttPort,
    topic_prefix: str,
    shutdown_event: asyncio.Event,
    adapters: dict[type, object],
    clock: ClockPort,
    is_root: bool = False,
) -> None:
    """Initialise per-device context.

    Args:
        name: Device name as registered (e.g. "blind").
        settings: Application settings instance.
        mqtt: MQTT port for publishing.
        topic_prefix: Root prefix for MQTT topics (e.g. "velux2mqtt").
        shutdown_event: Shared event that signals graceful shutdown.
        adapters: Resolved adapter registry mapping port types to instances.
        clock: Monotonic clock for timing.
        is_root: When True, topics omit the device name segment
            (root-level device).
    """
    self._name = name
    self._settings = settings
    self._mqtt = mqtt
    self._topic_prefix = topic_prefix
    self._shutdown_event = shutdown_event
    self._adapters = adapters
    self._clock = clock
    self._command_handlers: dict[str | None, CommandHandler] = {}
    self._is_root = is_root
    self._command_queue: asyncio.Queue[Command] = asyncio.Queue()
    self._commands_consumed: bool = False
    self._topic_base = topic_prefix if is_root else f"{topic_prefix}/{name}"
    self._active_sub_entities: set[str] = set()

name property

name: str

Device name as registered.

settings property

settings: Settings

Application settings instance.

clock property

clock: ClockPort

Monotonic clock for timing.

shutdown_requested property

shutdown_requested: bool

True when the framework has received a shutdown signal.

command_handler property

command_handler: CommandHandler | None

The root command handler, or None. Framework-internal.

command_handlers property

command_handlers: Mapping[str | None, CommandHandler]

All registered command handlers keyed by sub-topic. Framework-internal.

get_command_handler

get_command_handler(
    sub_topic: str | None = None,
) -> CommandHandler | None

Look up the command handler for a sub-topic (or root).

Source code in packages/src/cosalette/_context/_device_context.py
def get_command_handler(
    self,
    sub_topic: str | None = None,
) -> CommandHandler | None:
    """Look up the command handler for a sub-topic (or root)."""
    return self._command_handlers.get(sub_topic)

publish_state async

publish_state(
    payload: dict[str, object], *, retain: bool = True
) -> None

Publish device state to {prefix}/{device}/state as JSON.

For root devices (unnamed), publishes to {prefix}/state instead.

This is the primary publication method for device telemetry. The payload dict is JSON-serialised automatically.

Parameters:

Name Type Description Default
payload dict[str, object]

Dict to serialise as JSON.

required
retain bool

Whether the message should be retained (default True).

True
Source code in packages/src/cosalette/_context/_device_context.py
async def publish_state(
    self,
    payload: dict[str, object],
    *,
    retain: bool = True,
) -> None:
    """Publish device state to ``{prefix}/{device}/state`` as JSON.

    For root devices (unnamed), publishes to ``{prefix}/state`` instead.

    This is the primary publication method for device telemetry.
    The payload dict is JSON-serialised automatically.

    Args:
        payload: Dict to serialise as JSON.
        retain: Whether the message should be retained (default True).
    """
    topic = f"{self._topic_base}/state"
    await self._mqtt.publish(topic, payload, retain=retain, qos=1)

publish async

publish(
    channel: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Publish to an arbitrary sub-channel: {prefix}/{device}/{channel}.

For root devices (unnamed), publishes to {prefix}/{channel} instead.

Escape hatch for non-standard topics. Prefer publish_state() for normal device state updates.

Source code in packages/src/cosalette/_context/_device_context.py
async def publish(
    self,
    channel: str,
    payload: str,
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Publish to an arbitrary sub-channel: ``{prefix}/{device}/{channel}``.

    For root devices (unnamed), publishes to ``{prefix}/{channel}`` instead.

    Escape hatch for non-standard topics. Prefer publish_state() for
    normal device state updates.
    """
    topic = f"{self._topic_base}/{channel}"
    await self._mqtt.publish(topic, payload, retain=retain, qos=qos)

sleep async

sleep(seconds: float) -> None

Shutdown-aware sleep.

Returns early (without exception) if shutdown is requested during the sleep period. This enables the idiomatic pattern::

while not ctx.shutdown_requested:
    await ctx.sleep(10)
    # ... do work ...
Source code in packages/src/cosalette/_context/_device_context.py
async def sleep(self, seconds: float) -> None:
    """Shutdown-aware sleep.

    Returns early (without exception) if shutdown is requested during
    the sleep period. This enables the idiomatic pattern::

        while not ctx.shutdown_requested:
            await ctx.sleep(10)
            # ... do work ...
    """
    if self._shutdown_event.is_set():
        return

    sleep_task = asyncio.ensure_future(self._clock.sleep(seconds))
    shutdown_task = asyncio.ensure_future(self._shutdown_event.wait())

    done, pending = await asyncio.wait(
        {sleep_task, shutdown_task},
        return_when=asyncio.FIRST_COMPLETED,
    )

    for task in pending:
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task

sleep_until async

sleep_until(
    target: time | Sequence[time],
    *,
    tz: tzinfo | None = None,
) -> None

Shutdown-aware sleep until a wall-clock time.

Sleeps until the next occurrence of target (or the nearest upcoming time if a sequence is given). Uses local timezone when tz is None.

Returns early (without exception) if shutdown is requested during the sleep, via :meth:sleep.

Example — poll twice daily at 06:00 and 18:00 local time::

while not ctx.shutdown_requested:
    data = await read_data()
    await ctx.publish_state(data)
    await ctx.sleep_until([time(6, 0), time(18, 0)])

Parameters:

Name Type Description Default
target time | Sequence[time]

A :class:datetime.time or sequence of times to sleep until.

required
tz tzinfo | None

Timezone for interpreting target. None (default) uses the system's local timezone.

None

Raises:

Type Description
ValueError

If target is an empty sequence.

See Also

ADR-032 — Wall-clock scheduling design.

Source code in packages/src/cosalette/_context/_device_context.py
async def sleep_until(
    self,
    target: datetime.time | Sequence[datetime.time],
    *,
    tz: datetime.tzinfo | None = None,
) -> None:
    """Shutdown-aware sleep until a wall-clock time.

    Sleeps until the next occurrence of *target* (or the nearest
    upcoming time if a sequence is given).  Uses local timezone
    when *tz* is ``None``.

    Returns early (without exception) if shutdown is requested
    during the sleep, via :meth:`sleep`.

    Example — poll twice daily at 06:00 and 18:00 local time::

        while not ctx.shutdown_requested:
            data = await read_data()
            await ctx.publish_state(data)
            await ctx.sleep_until([time(6, 0), time(18, 0)])

    Args:
        target: A :class:`datetime.time` or sequence of times to
            sleep until.
        tz: Timezone for interpreting *target*.  ``None`` (default)
            uses the system's local timezone.

    Raises:
        ValueError: If *target* is an empty sequence.

    See Also:
        ADR-032 — Wall-clock scheduling design.
    """
    seconds = _seconds_until(target, tz=tz)
    await self.sleep(seconds)

sub_entity async

sub_entity(name: str) -> AsyncIterator[SubEntityContext]

Scoped sub-entity lifecycle with automatic availability.

Publishes "online" on enter and "offline" on exit to {topic_base}/{name}/availability. Clears retained state on exit by publishing an empty payload to the state topic.

Parameters:

Name Type Description Default
name str

Sub-entity name (single MQTT topic level).

required

Yields:

Name Type Description
A AsyncIterator[SubEntityContext]

class:SubEntityContext scoped to the sub-entity's topics.

Raises:

Type Description
ValueError

If the name fails validation.

See Also

ADR-031 — Sub-entity context manager.

Source code in packages/src/cosalette/_context/_device_context.py
@contextlib.asynccontextmanager
async def sub_entity(self, name: str) -> AsyncIterator[SubEntityContext]:
    """Scoped sub-entity lifecycle with automatic availability.

    Publishes ``"online"`` on enter and ``"offline"`` on exit to
    ``{topic_base}/{name}/availability``.  Clears retained state
    on exit by publishing an empty payload to the state topic.

    Args:
        name: Sub-entity name (single MQTT topic level).

    Yields:
        A :class:`SubEntityContext` scoped to the sub-entity's topics.

    Raises:
        ValueError: If the name fails validation.

    See Also:
        ADR-031 — Sub-entity context manager.
    """
    from cosalette._context._sub_entity_context import SubEntityContext

    self._validate_sub_entity_name(name)
    self._active_sub_entities.add(name)
    sub = SubEntityContext(name=name, parent=self)
    avail_topic = f"{self._topic_base}/{name}/availability"
    try:
        await self._mqtt.publish(avail_topic, "online", retain=True, qos=1)
    except Exception:
        self._active_sub_entities.discard(name)
        raise
    try:
        yield sub
    finally:
        try:
            state_topic = f"{self._topic_base}/{name}/state"
            await self._mqtt.publish(state_topic, "", retain=True, qos=1)
            await self._mqtt.publish(avail_topic, "offline", retain=True, qos=1)
        finally:
            self._active_sub_entities.discard(name)

on_command

on_command(
    handler_or_sub_topic: CommandHandler,
) -> CommandHandler
on_command(
    handler_or_sub_topic: str | None = ...,
) -> Callable[[CommandHandler], CommandHandler]
on_command(
    handler_or_sub_topic: CommandHandler
    | str
    | None = None,
) -> (
    CommandHandler
    | Callable[[CommandHandler], CommandHandler]
)

Register a command handler for this device.

Supports three call patterns:

  1. Decorator — root handler::

    @ctx.on_command async def handle(sub_topic: str | None, payload: str) -> None: ...

  2. Direct call — root handler::

    ctx.on_command(handle)

  3. Decorator factory — sub-topic handler::

    @ctx.on_command("calibrate") async def handle_cal(sub_topic: str | None, payload: str) -> None: ...

Handlers may also accept a single :class:Command argument (new-style), detected automatically via type annotation::

@ctx.on_command
async def handle(cmd: Command) -> None: ...

Raises:

Type Description
RuntimeError

If a handler is already registered for the same sub-topic, or if commands() is active and a root handler is being registered.

ValueError

If the sub-topic string is empty or contains /, +, or #.

Returns:

Type Description
CommandHandler | Callable[[CommandHandler], CommandHandler]

The handler unchanged when called with a callable, or a

CommandHandler | Callable[[CommandHandler], CommandHandler]

decorator function when called with a sub-topic string or None.

Source code in packages/src/cosalette/_context/_device_context.py
def on_command(
    self,
    handler_or_sub_topic: CommandHandler | str | None = None,
    /,
) -> CommandHandler | Callable[[CommandHandler], CommandHandler]:
    """Register a command handler for this device.

    Supports three call patterns:

    1. Decorator — root handler::

        @ctx.on_command
        async def handle(sub_topic: str | None, payload: str) -> None: ...

    2. Direct call — root handler::

        ctx.on_command(handle)

    3. Decorator factory — sub-topic handler::

        @ctx.on_command("calibrate")
        async def handle_cal(sub_topic: str | None, payload: str) -> None: ...

    Handlers may also accept a single :class:`Command` argument
    (new-style), detected automatically via type annotation::

        @ctx.on_command
        async def handle(cmd: Command) -> None: ...

    Raises:
        RuntimeError: If a handler is already registered for the same
            sub-topic, or if ``commands()`` is active and a root
            handler is being registered.
        ValueError: If the sub-topic string is empty or contains
            ``/``, ``+``, or ``#``.

    Returns:
        The handler unchanged when called with a callable, or a
        decorator function when called with a sub-topic string or None.
    """

    def _register(
        handler: CommandHandler,
        sub_topic: str | None,
    ) -> CommandHandler:
        if sub_topic is None and self._commands_consumed:
            msg = (
                f"Cannot register on_command — commands() iterator already "
                f"active for device '{self._name}'"
            )
            raise RuntimeError(msg)
        if sub_topic in self._command_handlers:
            label = f"sub-topic '{sub_topic}'" if sub_topic else "root"
            msg = (
                f"Command handler already registered for {label} "
                f"on device '{self._name}'"
            )
            raise RuntimeError(msg)
        self._command_handlers[sub_topic] = handler
        return handler

    # --- callable → register as root handler immediately ---
    if callable(handler_or_sub_topic):
        return _register(handler_or_sub_topic, None)  # ty: ignore[invalid-argument-type]

    # --- string → validate and return decorator for that sub-topic ---
    if isinstance(handler_or_sub_topic, str):
        if not handler_or_sub_topic:
            msg = "Sub-topic must not be empty"
            raise ValueError(msg)
        _invalid = set(handler_or_sub_topic) & {"/", "+", "#"}
        if _invalid:
            msg = (
                f"Sub-topic contains invalid MQTT characters "
                f"{_invalid}: '{handler_or_sub_topic}'"
            )
            raise ValueError(msg)
        sub = handler_or_sub_topic

        def _decorator(handler: CommandHandler) -> CommandHandler:
            return _register(handler, sub)

        return _decorator

    # --- None → return decorator for root handler ---
    def _root_decorator(handler: CommandHandler) -> CommandHandler:
        return _register(handler, None)

    return _root_decorator

commands

commands(
    timeout: float | None = None,
) -> AsyncIterator[Command | None]

Async iterator that yields inbound commands from the internal queue.

Provides a queue-backed async iterator for @app.device loops, eliminating the need for manual asyncio.Queue bridges.

When timeout is provided, yields None on timeout expiry, enabling periodic-work patterns::

async for cmd in ctx.commands(timeout=5):
    if cmd is None:
        await periodic_check()
    else:
        await process(cmd.payload)

Without timeout, blocks until a command arrives or shutdown is requested. Shutdown is detected immediately via event racing.

Parameters:

Name Type Description Default
timeout float | None

Seconds to wait for a command before yielding None. When None (default), blocks indefinitely until a command arrives or shutdown is requested.

None

Yields:

Type Description
AsyncIterator[Command | None]

Command when a command arrives, or None on timeout expiry.

Raises:

Type Description
RuntimeError

If called more than once on the same context, or if a command handler is already registered via :meth:on_command.

See Also

ADR-025 — Command channel and sub-topic routing.

Source code in packages/src/cosalette/_context/_device_context.py
def commands(
    self,
    timeout: float | None = None,
) -> AsyncIterator[Command | None]:
    """Async iterator that yields inbound commands from the internal queue.

    Provides a queue-backed async iterator for ``@app.device`` loops,
    eliminating the need for manual ``asyncio.Queue`` bridges.

    When *timeout* is provided, yields ``None`` on timeout expiry,
    enabling periodic-work patterns::

        async for cmd in ctx.commands(timeout=5):
            if cmd is None:
                await periodic_check()
            else:
                await process(cmd.payload)

    Without *timeout*, blocks until a command arrives or shutdown is
    requested. Shutdown is detected immediately via event racing.

    Args:
        timeout: Seconds to wait for a command before yielding None.
            When None (default), blocks indefinitely until a command
            arrives or shutdown is requested.

    Yields:
        Command when a command arrives, or None on timeout expiry.

    Raises:
        RuntimeError: If called more than once on the same context,
            or if a command handler is already registered via
            :meth:`on_command`.

    See Also:
        ADR-025 — Command channel and sub-topic routing.
    """
    if self._commands_consumed:
        msg = f"commands() already active for device '{self._name}'"
        raise RuntimeError(msg)
    if None in self._command_handlers:
        msg = (
            f"Cannot use commands() — on_command handler already "
            f"registered for device '{self._name}'"
        )
        raise RuntimeError(msg)
    self._commands_consumed = True

    async def _iter() -> AsyncIterator[Command | None]:
        while not self.shutdown_requested:
            result = await self._await_command(timeout)
            if result is not None:
                yield result
            elif self.shutdown_requested:
                break
            elif timeout is not None:
                yield None
        # Drain any commands that arrived before/during shutdown
        while not self._command_queue.empty():
            yield self._command_queue.get_nowait()

    return _iter()

adapter

adapter(port_type: type[T]) -> T

Resolve an adapter by port type.

Parameters:

Name Type Description Default
port_type type[T]

The Protocol type to look up.

required

Returns:

Type Description
T

The adapter instance registered for that port type.

Raises:

Type Description
LookupError

If no adapter is registered for the port type.

Source code in packages/src/cosalette/_context/_device_context.py
def adapter[T](self, port_type: type[T]) -> T:
    """Resolve an adapter by port type.

    Args:
        port_type: The Protocol type to look up.

    Returns:
        The adapter instance registered for that port type.

    Raises:
        LookupError: If no adapter is registered for the port type.
    """
    try:
        return cast(T, self._adapters[port_type])
    except KeyError:
        msg = f"No adapter registered for {port_type!r}"
        raise LookupError(msg) from None

cosalette.SubEntityContext

SubEntityContext(*, name: str, parent)

Context for a sub-entity within a device.

Provides scoped MQTT publishing for a sub-entity's topic namespace. Created via :meth:DeviceContext.sub_entity context manager — not instantiated directly by user code.

See Also

ADR-031 — Sub-entity context manager.

Source code in packages/src/cosalette/_context/_sub_entity_context.py
def __init__(self, *, name: str, parent) -> None:
    from cosalette._context._device_context import DeviceContext

    self.name = name
    self.parent: DeviceContext = parent

publish_state async

publish_state(
    payload: dict[str, object], *, retain: bool = True
) -> None

Publish sub-entity state to {device}/{name}/state as JSON.

Parameters:

Name Type Description Default
payload dict[str, object]

Dict to serialise as JSON.

required
retain bool

Whether the message should be retained (default True).

True
Source code in packages/src/cosalette/_context/_sub_entity_context.py
async def publish_state(
    self,
    payload: dict[str, object],
    *,
    retain: bool = True,
) -> None:
    """Publish sub-entity state to ``{device}/{name}/state`` as JSON.

    Args:
        payload: Dict to serialise as JSON.
        retain: Whether the message should be retained (default True).
    """
    topic = f"{self.parent._topic_base}/{self.name}/state"
    await self.parent._mqtt.publish(topic, payload, retain=retain, qos=1)

on_command

on_command(handler: CommandHandler) -> CommandHandler

Register a command handler for this sub-entity's sub-topic.

Delegates to the parent device's :meth:~DeviceContext.on_command with this sub-entity's name as the sub-topic.

Parameters:

Name Type Description Default
handler CommandHandler

Async callable to handle inbound commands.

required

Returns:

Type Description
CommandHandler

The handler, unchanged.

Source code in packages/src/cosalette/_context/_sub_entity_context.py
def on_command(
    self,
    handler: CommandHandler,
) -> CommandHandler:
    """Register a command handler for this sub-entity's sub-topic.

    Delegates to the parent device's :meth:`~DeviceContext.on_command`
    with this sub-entity's name as the sub-topic.

    Args:
        handler: Async callable to handle inbound commands.

    Returns:
        The handler, unchanged.
    """
    return self.parent.on_command(self.name)(handler)

cosalette.Command dataclass

Command(
    topic: str,
    payload: str,
    sub_topic: str | None = None,
    timestamp: float = 0.0,
)

An inbound MQTT command.

Attributes:

Name Type Description
topic str

Full MQTT topic the command arrived on.

payload str

Raw payload string.

sub_topic str | None

Sub-topic segment, or None for root commands.

timestamp float

Monotonic timestamp at receipt (seconds).

cosalette.CronSchedule

CronSchedule(expression: str)

Parsed Quartz-compatible cron expression.

Supports 6-field (second through day-of-week) and 7-field (with year) expressions.

Example::

sched = CronSchedule("0 30 10-13 ? * WED,FRI")
next_dt = sched.next_fire_after(datetime.datetime.now())
Source code in packages/src/cosalette/_cron/_schedule.py
def __init__(self, expression: str) -> None:
    parts = expression.strip().split()
    if len(parts) not in (6, 7):
        msg = (
            f"Cron expression must have 6 or 7 fields, "
            f"got {len(parts)}: {expression!r}"
        )
        raise ValueError(msg)

    self._expression = expression
    self._seconds: set[int] = _parse_simple_field(parts[0], 0, 59)
    self._minutes: set[int] = _parse_simple_field(parts[1], 0, 59)
    self._hours: set[int] = _parse_simple_field(parts[2], 0, 23)
    self._dom: set[int] | _DomSpecial = _parse_dom_field(parts[3])
    self._dom_is_unspecified: bool = parts[3].strip() == "?"
    self._months: set[int] = _parse_simple_field(parts[4], 1, 12, _MONTH_NAMES)
    self._dow: set[int] | _DowSpecial = _parse_dow_field(parts[5])
    self._dow_is_unspecified: bool = parts[5].strip() == "?"
    self._py_dows: frozenset[int] | None = (
        frozenset(_quartz_to_python_dow(q) for q in self._dow)
        if isinstance(self._dow, set)
        else None
    )
    self._years: set[int] | None = (
        _parse_simple_field(parts[6], _MIN_YEAR, _MAX_YEAR)
        if len(parts) == 7
        else None
    )

    self._validate()

expression property

expression: str

The original cron expression string.

next_fire_after

next_fire_after(after: datetime) -> datetime

Return the next datetime strictly after after that matches.

The returned datetime preserves the timezone of after.

Parameters:

Name Type Description Default
after datetime

Reference datetime (exclusive — result is strictly after).

required

Returns:

Type Description
datetime

Next matching datetime.

Raises:

Type Description
ValueError

If no match is found within the scan limit (4 years).

Source code in packages/src/cosalette/_cron/_schedule.py
def next_fire_after(
    self,
    after: datetime.datetime,
) -> datetime.datetime:
    """Return the next datetime strictly after *after* that matches.

    The returned datetime preserves the timezone of *after*.

    Args:
        after: Reference datetime (exclusive — result is strictly after).

    Returns:
        Next matching datetime.

    Raises:
        ValueError: If no match is found within the scan limit
            (4 years).
    """
    tz = after.tzinfo

    # Start scanning from the next second
    dt = after.replace(microsecond=0) + datetime.timedelta(seconds=1)

    limit = after.year + _MAX_SCAN_YEARS
    checks = [
        self._advance_month,
        self._advance_day,
        self._advance_hour,
        self._advance_minute,
        self._advance_second,
    ]

    while dt.year <= limit:
        # Year check (handled separately — can terminate the scan)
        if self._years is not None and dt.year not in self._years:
            future = sorted(y for y in self._years if y > dt.year)
            if not future:
                break
            dt = datetime.datetime(future[0], 1, 1, tzinfo=tz)
            continue

        for check in checks:
            result = check(dt, tz)
            if result is not None:
                dt = result
                break
        else:
            # All fields matched
            return dt

    msg = (
        f"No matching fire time found within {_MAX_SCAN_YEARS} years "
        f"for expression {self._expression!r}"
    )
    raise ValueError(msg)

__eq__

__eq__(other: object) -> bool

Check equality based on the raw expression string.

Note: Semantically equivalent expressions with different syntax (e.g. "0 0 12 * * MON" vs "0 0 12 * * 2") compare unequal because comparison uses the unparsed expression text.

Source code in packages/src/cosalette/_cron/_schedule.py
def __eq__(self, other: object) -> bool:
    """Check equality based on the raw expression string.

    Note: Semantically equivalent expressions with different syntax
    (e.g. ``"0 0 12 * * MON"`` vs ``"0 0 12 * * 2"``) compare
    unequal because comparison uses the unparsed expression text.
    """
    if not isinstance(other, CronSchedule):
        return NotImplemented
    return self._expression == other._expression

Shared-State Factories

@app.state registers a factory that runs once at bootstrap, after settings are resolved and before lifecycle adapters are entered. Its return value is registered in the DI container by the return type and injected into any handler declaring that type.

Four factory forms are supported, detected from the return annotation at registration time:

Form Teardown
def f(...) -> T None
def f(...) -> ContextManager[T] __exit__ on shutdown
async def f(...) -> AsyncIterator[T] generator finalized on shutdown
async def f(...) -> AsyncContextManager[T] __aexit__ on shutdown

Teardown runs in reverse registration order (LIFO).

The factory may optionally declare one parameter annotated with Settings or a subclass — the framework passes the resolved settings instance narrowed to that type. Zero-parameter factories are also valid.

Registration-time validation:

  • Missing return annotation → TypeError
  • Unsupported return annotation form → TypeError
  • First parameter annotated with a non-Settings type → TypeError
  • Two factories returning the same type → ValueError

See Share State Between Handlers for usage examples and ADR-039 for design rationale.

Domain-Event Reactors

@app.react registers a reactor function that the framework calls automatically at execution boundaries when a state object has pending domain events.

@app.react(SharedState, drain=lambda s: s.registry.drain_events())
async def on_registry_events(
    events: list[RegistryEvent],   # reserved name — injected by framework
    ctx: cosalette.DeviceContext,
    store: DeviceStore,
    state: SharedState,
) -> None:
    for event in events:
        await ctx.publish("registry/event", event.to_dict())
    store["registry"] = state.registry.to_dict()

state_type — the @app.state-registered type to watch. Must be registered before @app.react is called; otherwise ValueError is raised at decoration time.

drain= — optional callable (state_instance) -> Iterable | None. When None, the framework calls state_instance.drain_events() structurally. If no drain method exists, AttributeError is raised at runtime.

events parameter — reserved name. If the reactor function declares a parameter named events, the framework injects the drained event list directly. The events parameter is not resolved through type-based DI.

Reaction boundaries:

Handler When reactors fire
@app.device After each yield and once at normal completion
@app.stream After each item processed and once at handler exit
@app.telemetry After each successful handler return
@app.command After each successful handler return

Reactors do not fire on cancellation or unhandled exceptions.

Registration-time validation:

  • state_type not registered via @app.stateValueError
  • Reactor function is not async defTypeError

See Share State Between Handlers for usage examples and ADR-043 for design rationale.

Periodic Background Tasks

@app.periodic registers a coroutine as a background task that runs on a fixed interval with no MQTT output. It is the right primitive for side-effect work that runs alongside devices: flushing write buffers, sending watchdog pings, synchronising LED state, or warming caches.

import datetime
import cosalette
from cosalette import SettingRef


class AppSettings(cosalette.Settings):
    watchdog_enabled: bool = True
    led_interval: float = 5.0


app = cosalette.App(name="bridge", version="1.0.0")


@app.periodic("flush-buffer", interval=30.0)  # (1)!
async def flush_buffer(cache: BufferCache) -> None:
    await cache.flush()


@app.periodic(
    "watchdog",
    interval=datetime.timedelta(minutes=1),  # (2)!
    enabled=lambda s: s.watchdog_enabled,    # (3)!
)
async def watchdog_ping(settings: AppSettings) -> None:
    await ping_watchdog(settings.watchdog_url)


@app.periodic("led-sync", interval=SettingRef("led_interval"))  # (4)!
async def led_sync(led: LedPort) -> None:
    await led.sync_state()


@app.periodic(  # (5)!
    "poll-sensor",
    interval=lambda s: s.sensor_poll_interval,
)
async def poll_sensor(settings: AppSettings) -> None:
    await read_sensor(settings.sensor_url)
  1. interval as a plain float — simplest form; positive number of seconds between invocations.
  2. interval as datetime.timedelta — converted to seconds at registration time.
  3. enabled as a callable — evaluated at bootstrap with the resolved Settings instance; False silently skips registration entirely (ADR-038 deferred-enabled pattern).
  4. SettingRef("led_interval") — deferred resolution: the value of AppSettings.led_interval is read from settings at bootstrap, not at import time.
  5. interval as a Callable[[Settings], float] — called once at bootstrap with the resolved settings; use when the interval depends on a computed expression or multiple settings fields.

DI injection: handlers may declare Settings subclasses, adapter ports registered via app.adapter(), ClockPort, and objects registered by @app.state factories. DeviceContext is not available (periodic tasks have no MQTT lifecycle).

Exception behaviour: asyncio.CancelledError propagates (clean shutdown). All other exceptions are caught, logged at ERROR level, and the loop continues.

Lifecycle: periodic tasks are spawned as asyncio.Tasks during Phase 3 (Run) and cancelled during Phase 4 (Teardown) with a 5-second grace period.

App.add_periodic(name, func, *, interval, enabled, init, summary, behavior)

Imperative equivalent of @app.periodic. Accepts enabled: bool only (not a callable) — use inside @app.on_configure where settings are already resolved.

App.periodic_registrations

Sequence[_PeriodicRegistration] — read-only view of all registered periodic tasks. Each entry exposes name, interval, func, and the injection plan.

AppHarness.tick_periodic(name)

Invoke one cycle of a named periodic handler synchronously, bypassing the interval sleep. This is the recommended way to test periodic handlers:

async def test_flush_writes_pending_data() -> None:
    mock_buf = MockBufferPort()
    harness = AppHarness.create()
    harness.app.adapter(BufferPort, lambda: mock_buf)

    await harness.tick_periodic("flush-buffer")

    assert mock_buf.flush_called

The handler runs exactly once. No task is spawned; no sleep occurs.

AppHarness.create(..., run_periodic=False)

The run_periodic parameter on AppHarness.create() controls whether periodic tasks are spawned during harness.run():

Value Effect
False (default) Periodic tasks are not spawned — existing tests are unaffected
True Periodic tasks are spawned as asyncio.Tasks for integration-level coverage

Prefer tick_periodic() for unit-level testing of handler logic. Use run_periodic=True only when you need to verify that a task actually fires during the full application lifecycle.

See the Periodic Tasks guide for full usage examples and ADR-041 for design rationale.

MQTT

cosalette.MqttPort

Bases: Protocol

Port contract for MQTT publish/subscribe.

Satisfies ADR-006 hexagonal architecture: all MQTT interaction goes through this protocol so adapters are swappable.

cosalette.MqttClient dataclass

MqttClient(
    settings: MqttSettings, will: WillConfig | None = None
)

Production MQTT adapter backed by aiomqtt.

Uses a background task that maintains a persistent connection with automatic reconnection. aiomqtt is imported lazily inside _connection_loop() so the mock and null adapters work without the dependency installed.

See Also

ADR-006 — Hexagonal architecture (lazy imports). ADR-012 — LWT / availability via WillConfig.

is_connected property

is_connected: bool

Whether the client is currently connected to the broker.

publish async

publish(
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Publish a message to the broker.

Raises:

Type Description
RuntimeError

If the client is not connected.

Source code in packages/src/cosalette/_mqtt/_client.py
async def publish(
    self,
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Publish a message to the broker.

    Raises:
        RuntimeError: If the client is not connected.
    """
    if self._client is None:
        msg = "MqttClient is not connected"
        raise RuntimeError(msg)
    if isinstance(payload, dict):
        from cosalette._json import dumps

        payload = dumps(payload)
    await self._client.publish(
        topic,
        payload,
        retain=retain,
        qos=qos,
    )
    logger.debug(
        "Published to %s (qos=%d, retain=%s)",
        topic,
        qos,
        retain,
    )

subscribe async

subscribe(topic: str) -> None

Subscribe to topic.

The subscription is tracked internally so it can be restored after a reconnection.

Source code in packages/src/cosalette/_mqtt/_client.py
async def subscribe(self, topic: str) -> None:
    """Subscribe to *topic*.

    The subscription is tracked internally so it can be restored
    after a reconnection.
    """
    self._subscriptions.add(topic)
    if self._client is not None:
        await self._client.subscribe(
            topic,
            qos=1,
        )

on_message

on_message(callback: MessageCallback) -> None

Register a callback for inbound messages.

Source code in packages/src/cosalette/_mqtt/_client.py
def on_message(self, callback: MessageCallback) -> None:
    """Register a callback for inbound messages."""
    self._callbacks.append(callback)

start async

start() -> None

Start the background connection loop.

Source code in packages/src/cosalette/_mqtt/_client.py
async def start(self) -> None:
    """Start the background connection loop."""
    if self._listen_task is not None and not self._listen_task.done():
        logger.debug("MqttClient.start() called while already running")
        return
    # Build SSL context once — avoids re-reading CA file on every reconnect.
    if self._ssl_context is None:
        self._ssl_context = self._build_ssl_context()
    self._stopping = False
    self._listen_task = asyncio.create_task(
        self._connection_loop(),
    )

stop async

stop() -> None

Stop the connection loop and clean up.

Idempotent — safe to call multiple times.

Source code in packages/src/cosalette/_mqtt/_client.py
async def stop(self) -> None:
    """Stop the connection loop and clean up.

    Idempotent — safe to call multiple times.
    """
    self._stopping = True
    if self._listen_task is not None:
        self._listen_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._listen_task
        self._listen_task = None
    self._client = None
    self._connected.clear()

cosalette.MqttLifecycle

Bases: Protocol

Lifecycle management for MQTT adapters.

Adapters that need explicit start/stop (e.g. connecting to a broker) implement this protocol. Adapters like MockMqttClient and NullMqttClient that need no lifecycle management simply omit these methods — the framework detects their absence via isinstance.

See Also

ADR-006 — Interface Segregation: ports are narrow by design. PEP 544 — Structural subtyping (Protocols).

cosalette.MqttMessageHandler

Bases: Protocol

Message dispatch capability for MQTT adapters.

Adapters that can receive inbound messages implement this protocol. The framework calls on_message to wire up the topic router.

See Also

ADR-006 — Interface Segregation.

cosalette.MockMqttClient dataclass

MockMqttClient(
    published: list[tuple[str, str, bool, int]] = list(),
    subscriptions: list[str] = list(),
    raise_on_publish: Exception | None = None,
)

In-memory test double that records MQTT interactions.

Records publishes and subscriptions for assertion. Supports callback registration and simulated message delivery via deliver().

publish_count property

publish_count: int

Number of recorded publishes.

subscribe_count property

subscribe_count: int

Number of recorded subscriptions.

publish async

publish(
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Record a publish call, or raise if raise_on_publish is set.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def publish(
    self,
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Record a publish call, or raise if ``raise_on_publish`` is set."""
    if self.raise_on_publish is not None:
        raise self.raise_on_publish
    if isinstance(payload, dict):
        from cosalette._json import dumps

        payload = dumps(payload)
    self.published.append((topic, payload, retain, qos))

subscribe async

subscribe(topic: str) -> None

Record a subscribe call.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def subscribe(self, topic: str) -> None:
    """Record a subscribe call."""
    self.subscriptions.append(topic)

on_message

on_message(callback: MessageCallback) -> None

Register an inbound-message callback.

Source code in packages/src/cosalette/_mqtt/__init__.py
def on_message(self, callback: MessageCallback) -> None:
    """Register an inbound-message callback."""
    self._callbacks.append(callback)

deliver async

deliver(topic: str, payload: str) -> None

Simulate an inbound message by invoking all callbacks.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def deliver(self, topic: str, payload: str) -> None:
    """Simulate an inbound message by invoking all callbacks."""
    for cb in self._callbacks:
        await cb(topic, payload)

reset

reset() -> None

Clear all recorded data, callbacks, and failure injection.

Source code in packages/src/cosalette/_mqtt/__init__.py
def reset(self) -> None:
    """Clear all recorded data, callbacks, and failure injection."""
    self.published.clear()
    self.subscriptions.clear()
    self._callbacks.clear()
    self.raise_on_publish = None

get_messages_for

get_messages_for(topic: str) -> list[tuple[str, bool, int]]

Return (payload, retain, qos) tuples for topic.

Source code in packages/src/cosalette/_mqtt/__init__.py
def get_messages_for(
    self,
    topic: str,
) -> list[tuple[str, bool, int]]:
    """Return ``(payload, retain, qos)`` tuples for *topic*."""
    return [
        (payload, retain, qos)
        for t, payload, retain, qos in self.published
        if t == topic
    ]

cosalette.NullMqttClient dataclass

NullMqttClient()

Silent no-op MQTT adapter.

Every method is a no-op that logs at DEBUG level. Useful as a default when MQTT is not configured.

publish async

publish(
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Silently discard a publish request.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def publish(
    self,
    topic: str,
    payload: str | dict[str, Any],  # noqa: ARG002
    *,
    retain: bool = False,  # noqa: ARG002
    qos: int = 1,  # noqa: ARG002
) -> None:
    """Silently discard a publish request."""
    logger.debug("NullMqttClient.publish(%s) — discarded", topic)

subscribe async

subscribe(topic: str) -> None

Silently discard a subscribe request.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def subscribe(self, topic: str) -> None:
    """Silently discard a subscribe request."""
    logger.debug("NullMqttClient.subscribe(%s) — discarded", topic)

cosalette.WillConfig dataclass

WillConfig(
    topic: str,
    payload: str = "offline",
    qos: int = 1,
    retain: bool = True,
)

Last-Will-and-Testament configuration.

Abstracts aiomqtt.Will so that callers never depend on the aiomqtt package directly. The real client translates this into the library-specific type inside _connection_loop().

See Also

ADR-012 — Health and availability reporting.

cosalette.MessageCallback module-attribute

MessageCallback = Callable[[str, str], Awaitable[None]]

Async callback receiving (topic, payload) for each inbound message.

Error Handling

cosalette.ErrorPayload dataclass

ErrorPayload(
    error_type: str,
    message: str,
    device: str | None,
    timestamp: str,
    details: dict[str, object] = dict(),
)

Immutable structured error payload.

Represents a single error event ready for JSON serialisation and MQTT publication.

to_json

to_json() -> str

Serialise to a JSON string.

Source code in packages/src/cosalette/_errors.py
def to_json(self) -> str:
    """Serialise to a JSON string."""
    return dumps(asdict(self))

cosalette.ErrorPublisher dataclass

ErrorPublisher(
    mqtt: MqttPort,
    topic_prefix: str,
    error_type_map: dict[type[Exception], str] = dict(),
    clock: Callable[[], datetime] | None = None,
)

Publishes structured error payloads to MQTT.

Wraps :func:build_error_payload with fire-and-forget MQTT publication. Errors during publication are logged but never propagated — the main application loop must not crash because an error report failed.

Parameters:

Name Type Description Default
mqtt MqttPort

MQTT port used for publishing.

required
topic_prefix str

Base prefix for error topics (e.g. "velux2mqtt").

required
error_type_map dict[type[Exception], str]

Pluggable mapping from consumer exception types to machine-readable type strings.

dict()
clock Callable[[], datetime] | None

Optional callable returning a :class:~datetime.datetime for deterministic testing.

None

publish async

publish(
    error: Exception,
    *,
    device: str | None = None,
    is_root: bool = False,
) -> None

Build an error payload and publish it to MQTT.

Always publishes to {topic_prefix}/error. When device is provided, also publishes to {topic_prefix}/{device}/error (skipped for root devices, whose per-device topic would duplicate the global topic).

The entire pipeline (build → serialise → publish) is wrapped in fire-and-forget semantics: failures at any stage are logged but never propagated to the caller.

Source code in packages/src/cosalette/_errors.py
async def publish(
    self,
    error: Exception,
    *,
    device: str | None = None,
    is_root: bool = False,
) -> None:
    """Build an error payload and publish it to MQTT.

    Always publishes to ``{topic_prefix}/error``.  When *device*
    is provided, also publishes to ``{topic_prefix}/{device}/error``
    (skipped for root devices, whose per-device topic would
    duplicate the global topic).

    The entire pipeline (build → serialise → publish) is wrapped
    in fire-and-forget semantics: failures at *any* stage are
    logged but never propagated to the caller.
    """
    try:
        payload = build_error_payload(
            error,
            error_type_map=self.error_type_map,
            device=device,
            clock=self.clock,
        )
        payload_json = payload.to_json()
    except Exception:
        logger.exception(
            "Failed to build error payload for %r (device=%s)",
            error,
            device,
        )
        return

    global_topic = f"{self.topic_prefix}/error"
    logger.warning(
        "Publishing error: %s (type=%s, device=%s)",
        payload.message,
        payload.error_type,
        device,
    )
    await self._safe_publish(global_topic, payload_json)

    # Skip per-device topic for root devices (same as global)
    if device is not None and not is_root:
        device_topic = f"{self.topic_prefix}/{device}/error"
        await self._safe_publish(device_topic, payload_json)

cosalette.build_error_payload

build_error_payload(
    error: Exception,
    *,
    error_type_map: dict[type[Exception], str]
    | None = None,
    device: str | None = None,
    details: dict[str, object] | None = None,
    clock: Callable[[], datetime] | None = None,
) -> ErrorPayload

Convert an exception into a structured :class:ErrorPayload.

Looks up the exact class of the exception; subclasses are not matched.

Parameters:

Name Type Description Default
error Exception

The exception to convert.

required
error_type_map dict[type[Exception], str] | None

Optional mapping from exception types to machine-readable error_type strings. Falls back to "error" for unmapped types.

None
device str | None

Optional device name to include in the payload.

None
details dict[str, object] | None

Optional dict of additional context to attach to the payload. Defaults to an empty dict when None.

None
clock Callable[[], datetime] | None

Optional callable returning a :class:~datetime.datetime. Defaults to datetime.now(UTC).

None

Returns:

Type Description
ErrorPayload

A frozen dataclass ready for serialisation.

Source code in packages/src/cosalette/_errors.py
def build_error_payload(
    error: Exception,
    *,
    error_type_map: dict[type[Exception], str] | None = None,
    device: str | None = None,
    details: dict[str, object] | None = None,
    clock: Callable[[], datetime] | None = None,
) -> ErrorPayload:
    """Convert an exception into a structured :class:`ErrorPayload`.

    Looks up the exact class of the exception; subclasses are not matched.

    Args:
        error: The exception to convert.
        error_type_map: Optional mapping from exception types to machine-readable
            ``error_type`` strings.  Falls back to ``"error"`` for
            unmapped types.
        device: Optional device name to include in the payload.
        details: Optional dict of additional context to attach to the payload.
            Defaults to an empty dict when ``None``.
        clock: Optional callable returning a :class:`~datetime.datetime`.
            Defaults to ``datetime.now(UTC)``.

    Returns:
        A frozen dataclass ready for serialisation.
    """
    resolved_map = error_type_map or {}
    error_type = resolved_map.get(type(error), "error")
    now = clock() if clock is not None else datetime.now(UTC)
    return ErrorPayload(
        error_type=error_type,
        message=str(error),
        device=device,
        timestamp=now.isoformat(),
        details=details or {},
    )

Health and Availability

cosalette.DeviceStatus dataclass

DeviceStatus(status: str = 'ok')

Immutable status snapshot for a single device.

Used inside :class:HeartbeatPayload to report per-device health in the heartbeat JSON.

to_dict

to_dict() -> dict[str, str]

Serialise to a plain dictionary.

Source code in packages/src/cosalette/_health/_reporter.py
def to_dict(self) -> dict[str, str]:
    """Serialise to a plain dictionary."""
    return asdict(self)

cosalette.HeartbeatPayload dataclass

HeartbeatPayload(
    status: str,
    uptime_s: float,
    version: str,
    devices: dict[str, DeviceStatus] = dict(),
)

Immutable structured heartbeat payload.

Represents an app-level status snapshot ready for JSON serialisation and MQTT publication.

to_json

to_json() -> str

Serialise to a JSON string.

Device entries are expanded to nested dicts via :meth:DeviceStatus.to_dict.

Source code in packages/src/cosalette/_health/_reporter.py
def to_json(self) -> str:
    """Serialise to a JSON string.

    Device entries are expanded to nested dicts via
    :meth:`DeviceStatus.to_dict`.
    """
    data: dict[str, object] = {
        "status": self.status,
        "uptime_s": self.uptime_s,
        "version": self.version,
        "devices": {
            name: device.to_dict() for name, device in self.devices.items()
        },
    }
    return dumps(data)

cosalette.HealthReporter dataclass

HealthReporter(
    mqtt: MqttPort,
    topic_prefix: str,
    version: str,
    clock: ClockPort,
)

Publishes app heartbeats and per-device availability to MQTT.

Manages device tracking, uptime calculation (via monotonic clock), and graceful shutdown. All publication is fire-and-forget — errors are logged but never propagated.

Parameters:

Name Type Description Default
mqtt MqttPort

MQTT port used for publishing.

required
topic_prefix str

Base prefix for health topics (e.g. "velux2mqtt").

required
version str

Application version string included in heartbeats.

required
clock ClockPort

Monotonic clock for uptime measurement (see :class:ClockPort).

required

__post_init__

__post_init__() -> None

Capture the start time for uptime calculation.

Source code in packages/src/cosalette/_health/_reporter.py
def __post_init__(self) -> None:
    """Capture the start time for uptime calculation."""
    self._start_time = self.clock.now()

set_device_status

set_device_status(device: str, status: str = 'ok') -> None

Update or add a device's status in the internal tracker.

Parameters:

Name Type Description Default
device str

Device name (used in topic paths and heartbeat payload).

required
status str

Free-form status string, defaults to "ok".

'ok'
Source code in packages/src/cosalette/_health/_reporter.py
def set_device_status(self, device: str, status: str = "ok") -> None:
    """Update or add a device's status in the internal tracker.

    Args:
        device: Device name (used in topic paths and heartbeat payload).
        status: Free-form status string, defaults to ``"ok"``.
    """
    self._devices[device] = DeviceStatus(status=status)

remove_device

remove_device(device: str) -> None

Remove a device from internal tracking, if present.

Source code in packages/src/cosalette/_health/_reporter.py
def remove_device(self, device: str) -> None:
    """Remove a device from internal tracking, if present."""
    self._devices.pop(device, None)

publish_device_available async

publish_device_available(
    device: str, *, is_root: bool = False
) -> None

Publish "online" to the device availability topic.

For root devices (unnamed), publishes to {prefix}/availability instead of {prefix}/{device}/availability.

Also registers the device as "ok" in internal tracking.

Source code in packages/src/cosalette/_health/_reporter.py
async def publish_device_available(
    self,
    device: str,
    *,
    is_root: bool = False,
) -> None:
    """Publish ``"online"`` to the device availability topic.

    For root devices (unnamed), publishes to ``{prefix}/availability``
    instead of ``{prefix}/{device}/availability``.

    Also registers the device as ``"ok"`` in internal tracking.
    """
    if is_root:
        topic = f"{self.topic_prefix}/availability"
        self._root_devices.add(device)
    else:
        topic = f"{self.topic_prefix}/{device}/availability"
    await self._safe_publish(topic, "online")
    self.set_device_status(device)

publish_device_unavailable async

publish_device_unavailable(
    device: str, *, is_root: bool = False
) -> None

Publish "offline" to the device availability topic.

For root devices (unnamed), publishes to {prefix}/availability instead of {prefix}/{device}/availability.

Also removes the device from internal tracking.

Source code in packages/src/cosalette/_health/_reporter.py
async def publish_device_unavailable(
    self,
    device: str,
    *,
    is_root: bool = False,
) -> None:
    """Publish ``"offline"`` to the device availability topic.

    For root devices (unnamed), publishes to ``{prefix}/availability``
    instead of ``{prefix}/{device}/availability``.

    Also removes the device from internal tracking.
    """
    if is_root:
        topic = f"{self.topic_prefix}/availability"
        self._root_devices.discard(device)
    else:
        topic = f"{self.topic_prefix}/{device}/availability"
    await self._safe_publish(topic, "offline")
    self.remove_device(device)

publish_heartbeat async

publish_heartbeat() -> None

Publish a structured JSON heartbeat to {prefix}/status.

The payload includes current uptime, version, and all tracked device statuses.

Source code in packages/src/cosalette/_health/_reporter.py
async def publish_heartbeat(self) -> None:
    """Publish a structured JSON heartbeat to ``{prefix}/status``.

    The payload includes current uptime, version, and all tracked
    device statuses.
    """
    uptime = self.clock.now() - self._start_time
    payload = HeartbeatPayload(
        status="online",
        uptime_s=uptime,
        version=self.version,
        devices=dict(self._devices),
    )
    topic = f"{self.topic_prefix}/status"
    logger.debug("Publishing heartbeat to %s", topic)
    await self._safe_publish(topic, payload.to_json())

shutdown async

shutdown() -> None

Gracefully shut down: publish "offline" for everything.

Publishes "offline" to each tracked device's availability topic (using root topic for root devices), then publishes "offline" to the app status topic, and clears internal device tracking.

Source code in packages/src/cosalette/_health/_reporter.py
async def shutdown(self) -> None:
    """Gracefully shut down: publish ``"offline"`` for everything.

    Publishes ``"offline"`` to each tracked device's availability
    topic (using root topic for root devices), then publishes
    ``"offline"`` to the app status topic, and clears internal
    device tracking.
    """
    logger.info("Health reporter shutting down — publishing offline")
    for device in list(self._devices):
        if device in self._root_devices:
            topic = f"{self.topic_prefix}/availability"
        else:
            topic = f"{self.topic_prefix}/{device}/availability"
        await self._safe_publish(topic, "offline")

    status_topic = f"{self.topic_prefix}/status"
    await self._safe_publish(status_topic, "offline")
    self._devices.clear()
    self._root_devices.clear()

cosalette.build_will_config

build_will_config(topic_prefix: str) -> WillConfig

Create a :class:WillConfig for the app's LWT.

The resulting config targets {topic_prefix}/status with payload "offline", QoS 1, retained. Pass this to :class:MqttClient so the broker publishes "offline" on unexpected disconnection.

Parameters:

Name Type Description Default
topic_prefix str

Application-level topic prefix (e.g. "velux2mqtt").

required

Returns:

Type Description
WillConfig

Pre-configured LWT for the app status topic.

Source code in packages/src/cosalette/_health/_reporter.py
def build_will_config(topic_prefix: str) -> WillConfig:
    """Create a :class:`WillConfig` for the app's LWT.

    The resulting config targets ``{topic_prefix}/status`` with payload
    ``"offline"``, QoS 1, retained.  Pass this to :class:`MqttClient`
    so the broker publishes ``"offline"`` on unexpected disconnection.

    Args:
        topic_prefix: Application-level topic prefix (e.g. ``"velux2mqtt"``).

    Returns:
        Pre-configured LWT for the app status topic.
    """
    return WillConfig(
        topic=f"{topic_prefix}/status",
        payload="offline",
        qos=1,
        retain=True,
    )

cosalette.HealthCheckable

Bases: Protocol

Adapter health check protocol (ADR-028).

Adapters that implement this single-method protocol are periodically probed by the framework. Return True when healthy, False otherwise. The framework sets per-device availability accordingly.

cosalette.AdapterHealthStatus dataclass

AdapterHealthStatus(
    healthy: bool = True,
    consecutive_failures: int = 0,
    last_check: float = 0.0,
    restart_count: int = 0,
    restart_exhausted: bool = False,
    last_restart: float = 0.0,
    last_healthy_since: float = 0.0,
)

Per-adapter health state snapshot for the health check runner.

Tracks whether an adapter is healthy, how many consecutive health check failures have occurred, and the monotonic timestamp of the last health check. Exposed for Epic 6 (auto-restart decisions).

Clock

cosalette.ClockPort

Bases: Protocol

Monotonic clock for timing measurements.

Used by device controllers and timing-sensitive components to measure elapsed time without being affected by system clock adjustments (NTP, manual changes, etc.).

The default implementation wraps time.monotonic(). Tests inject a deterministic fake clock for reproducible timing.

now

now() -> float

Return monotonic time in seconds.

Returns:

Type Description
float

A float representing seconds from an arbitrary epoch.

float

Only the difference between two calls is meaningful.

Source code in packages/src/cosalette/_clock.py
def now(self) -> float:
    """Return monotonic time in seconds.

    Returns:
        A float representing seconds from an arbitrary epoch.
        Only the *difference* between two calls is meaningful.
    """
    ...

sleep async

sleep(seconds: float) -> None

Sleep for seconds.

Used by :meth:DeviceContext.sleep for shutdown-aware sleeping. Production implementations delegate to asyncio.sleep; test doubles may advance virtual time instead.

Source code in packages/src/cosalette/_clock.py
async def sleep(self, seconds: float) -> None:
    """Sleep for *seconds*.

    Used by :meth:`DeviceContext.sleep` for shutdown-aware sleeping.
    Production implementations delegate to ``asyncio.sleep``; test
    doubles may advance virtual time instead.
    """
    ...

cosalette.SystemClock

Production clock wrapping time.monotonic().

Satisfies :class:ClockPort via structural subtyping.

Usage::

clock = SystemClock()
start = clock.now()
# ... some work ...
elapsed = clock.now() - start

now

now() -> float

Return monotonic time in seconds.

Source code in packages/src/cosalette/_clock.py
def now(self) -> float:
    """Return monotonic time in seconds."""
    return time.monotonic()

sleep async

sleep(seconds: float) -> None

Sleep for seconds using asyncio.sleep.

Source code in packages/src/cosalette/_clock.py
async def sleep(self, seconds: float) -> None:
    """Sleep for *seconds* using ``asyncio.sleep``."""
    await asyncio.sleep(seconds)

Logging

cosalette.JsonFormatter

JsonFormatter(*, service: str = '', version: str = '')

Bases: Formatter

Emit log records as single-line JSON objects (NDJSON).

Each record produces a JSON object with these fields:

  • timestamp — ISO 8601 with timezone (always UTC)
  • level — Python log level name
  • logger — dotted logger name
  • message — the formatted log message
  • service — application name for log correlation
  • version — application version (omitted when empty)
  • exception — formatted traceback (only present when an exception is logged)
  • stack_info — stack trace (only present when stack_info=True)

Parameters:

Name Type Description Default
service str

Application name included in every log line.

''
version str

Application version string. Omitted from output when empty.

''
Source code in packages/src/cosalette/_logging.py
def __init__(
    self,
    *,
    service: str = "",
    version: str = "",
) -> None:
    super().__init__()
    self._service = service
    self._version = version

format

format(record: LogRecord) -> str

Format a log record as a single-line JSON string.

Overrides :meth:logging.Formatter.format. The returned string contains no embedded newlines (tracebacks are escaped by the JSON serialiser), so each call produces exactly one log line — critical for container log drivers that split on \n.

Source code in packages/src/cosalette/_logging.py
def format(self, record: logging.LogRecord) -> str:
    """Format a log record as a single-line JSON string.

    Overrides :meth:`logging.Formatter.format`.  The returned
    string contains no embedded newlines (tracebacks are
    escaped by the JSON serialiser), so each call produces exactly
    one log line — critical for container log drivers that
    split on ``\\n``.
    """
    entry: dict[str, Any] = {
        "timestamp": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
        "level": record.levelname,
        "logger": record.name,
        "message": record.getMessage(),
        "service": self._service,
    }

    if self._version:
        entry["version"] = self._version

    if record.exc_info and record.exc_info[0] is not None:
        entry["exception"] = self.formatException(record.exc_info)

    if record.stack_info:
        entry["stack_info"] = self.formatStack(record.stack_info)

    return dumps(entry, default=str)

cosalette.configure_logging

configure_logging(
    settings: LoggingSettings,
    *,
    service: str,
    version: str = "",
) -> None

Configure the root logger from settings.

Clears any existing handlers on the root logger, then installs fresh handlers according to settings.

A :class:logging.StreamHandler writing to stderr is always installed. When settings.file is set, a :class:~logging.handlers.RotatingFileHandler is added as well.

Parameters:

Name Type Description Default
settings LoggingSettings

Logging configuration (level, format, file).

required
service str

Application name passed to :class:JsonFormatter.

required
version str

Application version passed to :class:JsonFormatter. Defaults to "".

''
Source code in packages/src/cosalette/_logging.py
def configure_logging(
    settings: LoggingSettings,
    *,
    service: str,
    version: str = "",
) -> None:
    """Configure the root logger from settings.

    Clears any existing handlers on the root logger, then
    installs fresh handlers according to *settings*.

    A :class:`logging.StreamHandler` writing to ``stderr`` is
    always installed.  When ``settings.file`` is set, a
    :class:`~logging.handlers.RotatingFileHandler` is added as
    well.

    Args:
        settings: Logging configuration (level, format, file).
        service: Application name passed to :class:`JsonFormatter`.
        version: Application version passed to
            :class:`JsonFormatter`.  Defaults to ``""``.
    """
    root = logging.getLogger()

    # Clear existing handlers
    for handler in root.handlers[:]:
        root.removeHandler(handler)
        with contextlib.suppress(Exception):
            handler.close()

    # Build formatter
    if settings.format == "json":
        formatter: logging.Formatter = JsonFormatter(service=service, version=version)
    else:
        formatter = logging.Formatter(_TEXT_FORMAT)

    # Stream handler (always present → stderr)
    stream_handler = logging.StreamHandler(sys.stderr)
    stream_handler.setFormatter(formatter)
    root.addHandler(stream_handler)

    # Optional rotating file handler
    if settings.file is not None:
        file_handler = RotatingFileHandler(
            settings.file,
            maxBytes=settings.max_file_size_mb * 1024 * 1024,
            backupCount=settings.backup_count,
            encoding="utf-8",
        )
        file_handler.setFormatter(formatter)
        root.addHandler(file_handler)

    root.setLevel(settings.level)

Settings

cosalette.Settings

Bases: BaseSettings

Root framework settings for cosalette applications.

Loaded from environment variables with the nested delimiter __ and an optional .env file in the working directory.

No env_prefix is set at the framework level — each application subclasses Settings and adds its own prefix (e.g. env_prefix="MYAPP_").

Example .env::

MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
LOGGING__LEVEL=DEBUG
LOGGING__FORMAT=text

Example with an application prefix (subclass)::

class MyAppSettings(Settings):
    model_config = SettingsConfigDict(
        env_prefix="MYAPP_",
        env_nested_delimiter="__",
        env_file=".env",
        env_file_encoding="utf-8",
    )

model_config class-attribute instance-attribute

model_config = SettingsConfigDict(
    env_nested_delimiter="__",
    env_file=".env",
    env_file_encoding="utf-8",
    extra="ignore",
)

Settings uses extra="ignore" because the base class sets no env_prefix. Without a prefix, pydantic-settings reads every environment variable; the BaseSettings default of extra="forbid" would then reject unrelated variables (GH_TOKEN, PATH, etc.) as validation errors.

Subclasses that set env_prefix only see prefixed variables and may safely tighten this to extra="forbid" for strict validation.

cosalette.MqttSettings

Bases: BaseModel

MQTT broker connection and topic configuration.

Environment variables (with __ nesting)::

MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
MQTT__TOPIC_PREFIX=myapp

cosalette.LoggingSettings

Bases: BaseModel

Logging configuration.

When file is set, logs are also written to a rotating file (size-based rotation, backup_count generations kept). When None, logs go to stderr only.

The format field selects the output format:

  • "json" (default) — structured JSON lines for container log aggregators (Loki, Elasticsearch, CloudWatch). Each line is a complete JSON object with correlation metadata.
  • "text" — human-readable timestamped format for local development and direct terminal use.

Adapter Lifecycle

Adapters registered via app.adapter() that implement the async context manager protocol (__aenter__/__aexit__) are automatically managed by the framework:

  • Entered during startup, before the lifespan= hook runs
  • Exited during shutdown, after the lifespan= hook exits
  • Managed via contextlib.AsyncExitStack for LIFO ordering and exception safety
  • Adapters without __aenter__/__aexit__ pass through unchanged

The detection is duck-typed — any object with both __aenter__ and __aexit__ attributes qualifies. No base class or registration is needed.

See ADR-016 for the design rationale and Adapter Lifecycle Management for usage examples.

Streaming

StreamablePort[T_co] and Stream[T] are the push-to-pull bridge for hardware devices that deliver data via callbacks rather than polling. All lifecycle methods (open, close, start_scan, stop_scan) are coroutines awaited by the stream runner. See Streaming for a full explanation and ADR-042 for design rationale.

cosalette.StreamablePort

Bases: Protocol

Contract for hardware ports that push data via callbacks.

Implementers open a connection, optionally start and stop a hardware scan (e.g. BLE discovery, USB enumeration), and let callers register a callback that fires for every inbound datum.

Lifecycle::

await port.open()
port.register_callback(stream.put)
await port.start_scan()
...
await port.stop_scan()
await port.close()

T_co is the type of item produced by the port (covariant: a port of Sensor satisfies StreamablePort[BaseSensor]).

open async

open() -> None

Open the hardware connection.

Source code in packages/src/cosalette/_runners/_stream_types.py
async def open(self) -> None:
    """Open the hardware connection."""
    ...

close async

close() -> None

Close the hardware connection and release resources.

Source code in packages/src/cosalette/_runners/_stream_types.py
async def close(self) -> None:
    """Close the hardware connection and release resources."""
    ...

start_scan async

start_scan() -> None

Begin emitting data (start scan / polling loop).

Source code in packages/src/cosalette/_runners/_stream_types.py
async def start_scan(self) -> None:
    """Begin emitting data (start scan / polling loop)."""
    ...

stop_scan async

stop_scan() -> None

Stop emitting data without closing the connection.

Source code in packages/src/cosalette/_runners/_stream_types.py
async def stop_scan(self) -> None:
    """Stop emitting data without closing the connection."""
    ...

register_callback

register_callback(cb: Callable[[T_co], None]) -> None

Register cb to be called for each inbound datum.

Parameters:

Name Type Description Default
cb Callable[[T_co], None]

Sync callable invoked with each item. The callback must not block; hardware callbacks are inherently synchronous. Use :class:Stream to bridge into async code.

required
Source code in packages/src/cosalette/_runners/_stream_types.py
def register_callback(self, cb: Callable[[T_co], None]) -> None:
    """Register *cb* to be called for each inbound datum.

    Args:
        cb: Sync callable invoked with each item.  The callback must
            not block; hardware callbacks are inherently synchronous.
            Use :class:`Stream` to bridge into async code.
    """
    ...

cosalette.Stream

Stream(
    *,
    maxsize: int = 0,
    backpressure: BackpressurePolicy = "raise",
    thread_safe: bool = False,
)

Async iterator backed by a push-callback bridge.

Bridges hardware callbacks (sync :meth:put) into async for loops. Shutdown is signalled once via :meth:shutdown; __anext__ then raises :exc:StopAsyncIteration and all further iteration stops. Shutdown is immediate — items still in the queue at shutdown are discarded, not drained.

Parameters:

Name Type Description Default
maxsize int

Maximum number of items buffered before :meth:put raises :exc:asyncio.QueueFull. 0 (default) means unbounded, matching :class:asyncio.Queue semantics.

0
backpressure BackpressurePolicy

Policy applied when maxsize > 0 and the queue is full. "raise" (default) raises :exc:asyncio.QueueFull to preserve pre-backpressure behaviour. "drop_newest" silently discards the incoming item. "drop_oldest" evicts the oldest queued item to make room for the incoming one. When :class:Stream is created by @app.stream, the decorator defaults to "drop_newest" — a safer choice for IoT producers that cannot block. The policy is a no-op when maxsize=0.

'raise'
thread_safe bool

If True, :meth:put may be called from any OS thread. The stream captures the running event loop at construction time and uses :meth:~asyncio.AbstractEventLoop.call_soon_threadsafe to marshal enqueue calls. When False (default), :meth:put must be called from the event-loop thread.

False

The iterator uses a sentinel-value pattern: :meth:shutdown enqueues a module-level _SENTINEL object into the queue, so a waiting __anext__ wakes immediately without creating extra tasks or sets.

Typical usage::

stream: Stream[SensorReading] = Stream()
port.register_callback(stream.put)
port.open()
port.start_scan()
async for reading in stream:
    ...  # process each pushed item
Source code in packages/src/cosalette/_runners/_stream_types.py
def __init__(
    self,
    *,
    maxsize: int = 0,
    backpressure: BackpressurePolicy = "raise",
    thread_safe: bool = False,
) -> None:
    self._queue: asyncio.Queue[T] = asyncio.Queue(maxsize=maxsize)
    self._backpressure = backpressure
    self._shutdown: asyncio.Event = asyncio.Event()
    self._thread_safe = thread_safe
    if thread_safe:
        self._loop = asyncio.get_running_loop()

put

put(item: T) -> None

Push item onto the queue (sync, never blocks).

When thread_safe=True was passed at construction, this method is safe to call from any OS thread. Otherwise it must be called from the event-loop thread. For off-loop use without thread_safe::

loop.call_soon_threadsafe(stream.put, item)

The backpressure policy takes effect when maxsize > 0 and the queue is full:

  • "raise" — raises :exc:asyncio.QueueFull (sync mode) or surfaces the exception on the event-loop thread (thread-safe mode).
  • "drop_newest" — the incoming item is discarded; a DEBUG log is emitted.
  • "drop_oldest" — the oldest queued item is evicted and item is enqueued; a DEBUG log is emitted.

When maxsize=0 (unbounded) the policy is never evaluated.

Raises:

Type Description
QueueFull

When maxsize > 0, the queue is full, and backpressure is "raise". In thread-safe mode the exception surfaces on the event-loop thread.

Source code in packages/src/cosalette/_runners/_stream_types.py
def put(self, item: T) -> None:
    """Push *item* onto the queue (sync, never blocks).

    When *thread_safe=True* was passed at construction, this method
    is safe to call from any OS thread.  Otherwise it must be called
    from the event-loop thread.  For off-loop use without
    *thread_safe*::

        loop.call_soon_threadsafe(stream.put, item)

    The backpressure policy takes effect when ``maxsize > 0`` and
    the queue is full:

    - ``"raise"`` — raises :exc:`asyncio.QueueFull` (sync mode) or
      surfaces the exception on the event-loop thread (thread-safe
      mode).
    - ``"drop_newest"`` — the incoming *item* is discarded; a DEBUG
      log is emitted.
    - ``"drop_oldest"`` — the oldest queued item is evicted and
      *item* is enqueued; a DEBUG log is emitted.

    When ``maxsize=0`` (unbounded) the policy is never evaluated.

    Raises:
        asyncio.QueueFull: When *maxsize* > 0, the queue is full,
            and *backpressure* is ``"raise"``.  In thread-safe mode
            the exception surfaces on the event-loop thread.
    """
    if self._thread_safe:
        self._loop.call_soon_threadsafe(self._enqueue_with_policy, item)
    else:
        self._enqueue_with_policy(item)

shutdown

shutdown() -> None

Signal the iterator to stop.

Idempotent. Once set, __anext__ raises :exc:StopAsyncIteration on the next call. Any items still in the queue are discarded — shutdown is immediate, not draining. Must be called from the event-loop thread.

Source code in packages/src/cosalette/_runners/_stream_types.py
def shutdown(self) -> None:
    """Signal the iterator to stop.

    Idempotent.  Once set, ``__anext__`` raises
    :exc:`StopAsyncIteration` on the next call.  Any items still in
    the queue are discarded — shutdown is immediate, not draining.
    Must be called from the event-loop thread.
    """
    self._shutdown.set()
    with contextlib.suppress(asyncio.QueueFull):
        self._queue.put_nowait(_SENTINEL)  # ty: ignore[invalid-argument-type]

Publish Strategies

cosalette.PublishStrategy

Bases: Protocol

Publish-decision contract for the device loop.

The framework calls _bind before the loop to inject the clock, should_publish each iteration, and on_published after a successful publish to let the strategy reset internal state (counters, timestamps, etc.).

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Decide whether the current reading should be published.

Parameters:

Name Type Description Default
current dict[str, object]

The latest telemetry payload.

required
previous dict[str, object] | None

The last published payload, or None on the very first call.

required

Returns:

Type Description
bool

True if the framework should publish current.

Source code in packages/src/cosalette/_strategies/_base.py
def should_publish(
    self,
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool:
    """Decide whether the current reading should be published.

    Args:
        current: The latest telemetry payload.
        previous: The last *published* payload, or ``None`` on the
            very first call.

    Returns:
        ``True`` if the framework should publish ``current``.
    """
    ...

on_published

on_published() -> None

Called after a successful publish to reset internal state.

Source code in packages/src/cosalette/_strategies/_base.py
def on_published(self) -> None:
    """Called after a successful publish to reset internal state."""
    ...

cosalette.Every

Every(
    *, seconds: float | None = None, n: int | None = None
)

Bases: _StrategyBase

Time-based or count-based publish throttle.

Exactly one of seconds or n must be provided.

Every(seconds=30) Publish at most once every 30 seconds. Requires a :class:ClockPort injected via _bind(). Before binding, should_publish always returns True (safe fallback).

Every(n=5) Publish every 5th reading. No clock dependency.

Raises:

Type Description
ValueError

If both, neither, or non-positive values are given.

Source code in packages/src/cosalette/_strategies/_every.py
def __init__(
    self,
    *,
    seconds: float | None = None,
    n: int | None = None,
) -> None:
    if seconds is not None and n is not None:
        msg = "Specify exactly one of 'seconds' or 'n', not both"
        raise ValueError(msg)
    if seconds is None and n is None:
        msg = "Specify exactly one of 'seconds' or 'n'"
        raise ValueError(msg)

    if seconds is not None and seconds <= 0:
        msg = "'seconds' must be positive"
        raise ValueError(msg)
    if n is not None and n <= 0:
        msg = "'n' must be positive"
        raise ValueError(msg)

    self._seconds = seconds
    self._n = n

    # Time-mode state
    self._clock: ClockPort | None = None
    self._last_publish_time: float | None = None

    # Count-mode state
    self._counter: int = 0

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Return True when enough time/calls have elapsed.

Source code in packages/src/cosalette/_strategies/_every.py
def should_publish(
    self,
    current: dict[str, object],  # noqa: ARG002
    previous: dict[str, object] | None,  # noqa: ARG002
) -> bool:
    """Return ``True`` when enough time/calls have elapsed."""
    if self._seconds is not None:
        return self._should_publish_time()
    return self._should_publish_count()

on_published

on_published() -> None

Record publish timestamp or reset counter.

Source code in packages/src/cosalette/_strategies/_every.py
def on_published(self) -> None:
    """Record publish timestamp or reset counter."""
    if self._seconds is not None:
        if self._clock is not None:
            self._last_publish_time = self._clock.now()
    else:
        self._counter = 0

cosalette.OnChange

OnChange(
    *, threshold: float | dict[str, float] | None = None
)

Bases: _StrategyBase

Publish when the telemetry payload changes.

With threshold=None (default), uses exact equality (current != previous).

When threshold is a float, it acts as a global numeric dead-band: a leaf field must change by more than threshold (strict >) to trigger a publish. Non-numeric fields fall back to !=.

When threshold is a dict[str, float], each key names a leaf field with its own dead-band. Use dot-notation for nested fields (e.g. {"sensor.temp": 0.5}). Fields not listed in the dict use exact equality.

Thresholds are applied to leaf values only. Nested dicts are traversed recursively — {"sensor": {"temp": 22.5}} compares temp numerically, not the intermediate sensor dict as a whole.

In both threshold modes, structural changes (added or removed keys at any nesting level) always trigger a publish, and fields are combined with OR semantics — any single leaf field exceeding its threshold is sufficient.

Parameters:

Name Type Description Default
threshold float | dict[str, float] | None

Optional dead-band for numeric change detection. None → exact equality, float → global threshold, dict[str, float] → per-field thresholds (dot-notation for nested keys).

None
Source code in packages/src/cosalette/_strategies/_onchange.py
def __init__(
    self,
    *,
    threshold: float | dict[str, float] | None = None,
) -> None:
    if isinstance(threshold, dict):
        for field, value in threshold.items():
            if isinstance(value, bool):
                msg = f"Threshold for '{field}' must be a number, got bool"
                raise TypeError(msg)
            if value < 0:
                msg = f"Threshold for '{field}' must be non-negative, got {value}"
                raise ValueError(msg)
    elif isinstance(threshold, bool):
        msg = "Threshold must be a number, got bool"
        raise TypeError(msg)
    elif isinstance(threshold, (int, float)) and threshold < 0:
        msg = f"Threshold must be non-negative, got {threshold}"
        raise ValueError(msg)
    self._threshold = threshold

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Return True when the payload differs from the last publish.

When a threshold is configured, numeric fields are compared using abs(current - previous) > threshold (strict inequality). Non-numeric fields and structural changes always use exact equality.

Source code in packages/src/cosalette/_strategies/_onchange.py
def should_publish(
    self,
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool:
    """Return ``True`` when the payload differs from the last publish.

    When a threshold is configured, numeric fields are compared
    using ``abs(current - previous) > threshold`` (strict
    inequality).  Non-numeric fields and structural changes always
    use exact equality.
    """
    if previous is None:
        return True
    if self._threshold is None:
        return current != previous
    return self._check_with_threshold(current, previous)

on_published

on_published() -> None

No-op — OnChange is stateless.

Source code in packages/src/cosalette/_strategies/_onchange.py
def on_published(self) -> None:
    """No-op — ``OnChange`` is stateless."""

Composite Strategies

cosalette.AllStrategy

AllStrategy(*children: _StrategyBase)

Bases: _StrategyBase

AND-composite: publishes only if all children say yes.

Nested AllStrategy instances are automatically flattened::

AllStrategy(AllStrategy(a, b), c)  →  AllStrategy(a, b, c)
Source code in packages/src/cosalette/_strategies/_composite.py
def __init__(self, *children: _StrategyBase) -> None:
    self._children: list[_StrategyBase] = []
    for child in children:
        if isinstance(child, AllStrategy):
            self._children.extend(child._children)
        else:
            self._children.append(child)
    if not self._children:
        msg = "AllStrategy requires at least one child strategy"
        raise ValueError(msg)

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Return True only if all children return True.

All children are evaluated eagerly (no short-circuit) so that stateful strategies like Every(n=N) always advance their internal counters.

Source code in packages/src/cosalette/_strategies/_composite.py
def should_publish(
    self,
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool:
    """Return ``True`` only if **all** children return ``True``.

    All children are evaluated eagerly (no short-circuit) so that
    stateful strategies like ``Every(n=N)`` always advance their
    internal counters.
    """
    # IMPORTANT: list comprehension, not generator — eager evaluation
    # ensures stateful children (e.g. Every(n=N)) always advance.
    results = [c.should_publish(current, previous) for c in self._children]
    return all(results)

on_published

on_published() -> None

Notify all children of a publish event.

Source code in packages/src/cosalette/_strategies/_composite.py
def on_published(self) -> None:
    """Notify all children of a publish event."""
    for child in self._children:
        child.on_published()

cosalette.AnyStrategy

AnyStrategy(*children: _StrategyBase)

Bases: _StrategyBase

OR-composite: publishes if any child says yes.

Nested AnyStrategy instances are automatically flattened::

AnyStrategy(AnyStrategy(a, b), c)  →  AnyStrategy(a, b, c)
Source code in packages/src/cosalette/_strategies/_composite.py
def __init__(self, *children: _StrategyBase) -> None:
    self._children: list[_StrategyBase] = []
    for child in children:
        if isinstance(child, AnyStrategy):
            self._children.extend(child._children)
        else:
            self._children.append(child)
    if not self._children:
        msg = "AnyStrategy requires at least one child strategy"
        raise ValueError(msg)

should_publish

should_publish(
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool

Return True if any child returns True.

All children are evaluated eagerly (no short-circuit) so that stateful strategies like Every(n=N) always advance their internal counters.

Source code in packages/src/cosalette/_strategies/_composite.py
def should_publish(
    self,
    current: dict[str, object],
    previous: dict[str, object] | None,
) -> bool:
    """Return ``True`` if **any** child returns ``True``.

    All children are evaluated eagerly (no short-circuit) so that
    stateful strategies like ``Every(n=N)`` always advance their
    internal counters.
    """
    # IMPORTANT: list comprehension, not generator — eager evaluation
    # ensures stateful children (e.g. Every(n=N)) always advance.
    results = [c.should_publish(current, previous) for c in self._children]
    return any(results)

on_published

on_published() -> None

Notify all children of a publish event.

Source code in packages/src/cosalette/_strategies/_composite.py
def on_published(self) -> None:
    """Notify all children of a publish event."""
    for child in self._children:
        child.on_published()

Introspection

cosalette.build_registry_snapshot

build_registry_snapshot(app: App) -> dict[str, Any]

Build a JSON-serializable snapshot of all app registrations.

Produces a dict describing the app metadata, devices, telemetry, commands, and adapters — suitable for json.dumps() without custom encoders.

Parameters:

Name Type Description Default
app App

The cosalette :class:App instance to introspect.

required

Returns:

Type Description
dict[str, Any]

A plain dict with string keys and JSON-serializable values.

Source code in packages/src/cosalette/_mcp/_introspect.py
def build_registry_snapshot(app: App) -> dict[str, Any]:
    """Build a JSON-serializable snapshot of all app registrations.

    Produces a dict describing the app metadata, devices, telemetry,
    commands, and adapters — suitable for ``json.dumps()`` without
    custom encoders.

    Args:
        app: The cosalette :class:`App` instance to introspect.

    Returns:
        A plain dict with string keys and JSON-serializable values.
    """
    return {
        "app": {
            "name": app.name,
            "version": app.version,
            "description": app.description,
        },
        "devices": [_describe_device(reg) for reg in app.devices],
        "telemetry": [_describe_telemetry(reg) for reg in app.telemetry_registrations],
        "commands": [_describe_command(reg) for reg in app.commands],
        "adapters": [
            _describe_adapter(port_type, entry)
            for port_type, entry in app.adapters.items()
        ],
    }

cosalette.format_registry_json

format_registry_json(snapshot: dict[str, Any]) -> str

Return the registry snapshot as indented JSON.

Parameters:

Name Type Description Default
snapshot dict[str, Any]

Dict returned by :func:build_registry_snapshot.

required

Returns:

Type Description
str

A pretty-printed JSON string.

Source code in packages/src/cosalette/_mcp/_introspect.py
def format_registry_json(snapshot: dict[str, Any]) -> str:
    """Return the registry *snapshot* as indented JSON.

    Args:
        snapshot: Dict returned by :func:`build_registry_snapshot`.

    Returns:
        A pretty-printed JSON string.
    """
    result: str = orjson.dumps(snapshot, option=orjson.OPT_INDENT_2).decode()
    return result

cosalette.format_registry_table

format_registry_table(snapshot: dict[str, Any]) -> str

Return the registry snapshot as a human-readable plain-text table.

Parameters:

Name Type Description Default
snapshot dict[str, Any]

Dict returned by :func:build_registry_snapshot.

required

Returns:

Type Description
str

A multi-line string with aligned columns per section.

Source code in packages/src/cosalette/_mcp/_introspect.py
def format_registry_table(snapshot: dict[str, Any]) -> str:
    """Return the registry *snapshot* as a human-readable plain-text table.

    Args:
        snapshot: Dict returned by :func:`build_registry_snapshot`.

    Returns:
        A multi-line string with aligned columns per section.
    """
    lines: list[str] = []
    app_info = snapshot["app"]
    desc = app_info.get("description") or ""
    header = f"{app_info['name']} v{app_info['version']}"
    if desc:
        header += f" — {desc}"
    lines.append(header)

    _append_devices_section(lines, snapshot.get("devices", []))
    _append_telemetry_section(lines, snapshot.get("telemetry", []))
    _append_commands_section(lines, snapshot.get("commands", []))
    _append_adapters_section(lines, snapshot.get("adapters", []))

    return "\n".join(lines)

Retry / Backoff

cosalette.BackoffStrategy

Bases: Protocol

Backoff-delay contract for telemetry retry.

The framework calls delay(attempt) between retry attempts. Attempt numbers are 1-based.

delay

delay(attempt: int) -> float

Return delay in seconds for the given attempt (1-based).

Source code in packages/src/cosalette/_retry.py
def delay(self, attempt: int) -> float:
    """Return delay in seconds for the given attempt (1-based)."""
    ...

cosalette.ExponentialBackoff

ExponentialBackoff(
    base: float = 2.0, max_delay: float = 60.0
)

Exponential backoff with ±20% jitter.

min(base * 2^(attempt-1), max_delay)

Source code in packages/src/cosalette/_retry.py
def __init__(self, base: float = 2.0, max_delay: float = 60.0) -> None:
    self._base = base
    self._max_delay = max_delay

cosalette.LinearBackoff

LinearBackoff(step: float = 2.0, max_delay: float = 60.0)

Linear backoff: min(step * attempt, max_delay) with ±20% jitter.

Source code in packages/src/cosalette/_retry.py
def __init__(self, step: float = 2.0, max_delay: float = 60.0) -> None:
    self._step = step
    self._max_delay = max_delay

cosalette.FixedBackoff

FixedBackoff(delay: float = 5.0)

Fixed backoff: constant delay with ±20% jitter.

Source code in packages/src/cosalette/_retry.py
def __init__(self, delay: float = 5.0) -> None:
    self._delay = delay

cosalette.CircuitBreaker

CircuitBreaker(threshold: int = 5)

Optional circuit breaker for telemetry retry.

Tracks consecutive failed cycles (where all retries were exhausted). After threshold consecutive failures, the circuit opens and the handler is skipped until a half-open probe succeeds.

Source code in packages/src/cosalette/_retry.py
def __init__(self, threshold: int = 5) -> None:
    if not isinstance(threshold, int) or threshold < 1:
        msg = "threshold must be a positive integer (>= 1)"
        raise ValueError(msg)
    self._threshold = threshold
    self._consecutive_failures = 0
    self._state: str = "closed"  # closed | open | half-open

record_success

record_success() -> None

Record a successful handler execution. Resets all state.

Source code in packages/src/cosalette/_retry.py
def record_success(self) -> None:
    """Record a successful handler execution. Resets all state."""
    self._consecutive_failures = 0
    self._state = "closed"

record_failure

record_failure() -> None

Record a cycle where all retries were exhausted.

Source code in packages/src/cosalette/_retry.py
def record_failure(self) -> None:
    """Record a cycle where all retries were exhausted."""
    self._consecutive_failures += 1
    if self._consecutive_failures >= self._threshold:
        self._state = "open"

should_attempt

should_attempt() -> bool

Return True if the handler should execute this cycle.

  • closed: always True
  • open: skip this cycle, transition to half-open for next cycle
  • half-open: True (probe attempt)
Source code in packages/src/cosalette/_retry.py
def should_attempt(self) -> bool:
    """Return True if the handler should execute this cycle.

    - closed: always True
    - open: skip this cycle, transition to half-open for next cycle
    - half-open: True (probe attempt)
    """
    if self._state == "closed":
        return True
    if self._state == "open":
        # Skip this cycle, but transition to half-open for the NEXT cycle
        self._state = "half-open"
        return False
    # half-open: probe attempt
    return True

Triggerable Telemetry

cosalette.TriggerPayload dataclass

TriggerPayload(
    is_triggered: bool = False,
    raw: str | None = None,
    data: dict[str, Any] | None = None,
)

Trigger context for triggerable telemetry handlers.

Injected via DI when a handler declares a TriggerPayload parameter. On scheduled runs, is_triggered is False and raw/data are None. On MQTT-triggered runs, is_triggered is True and the MQTT payload is available.

Examples:

Simple check::

@app.telemetry("sensor", interval=60, triggerable=True)
async def read_sensor(
    adapter: SensorPort,
    trigger: TriggerPayload,
) -> dict[str, object]:
    days = trigger.get("days", 7) if trigger.is_triggered else 7
    return await adapter.read(days=days)

get

get(key: str, default: Any = None) -> Any

Extract a key from parsed JSON data, with fallback.

Returns default when not triggered, when payload was not valid JSON, or when key is absent.

Source code in packages/src/cosalette/_runners/_trigger.py
def get(self, key: str, default: Any = None) -> Any:
    """Extract a key from parsed JSON data, with fallback.

    Returns *default* when not triggered, when payload was not
    valid JSON, or when *key* is absent.
    """
    if self.data is None:
        return default
    return self.data.get(key, default)

scheduled classmethod

scheduled() -> TriggerPayload

Return the singleton scheduled-run instance.

Source code in packages/src/cosalette/_runners/_trigger.py
@classmethod
def scheduled(cls) -> TriggerPayload:
    """Return the singleton scheduled-run instance."""
    assert _SCHEDULED is not None  # noqa: S101
    return _SCHEDULED

from_mqtt classmethod

from_mqtt(payload: str) -> TriggerPayload

Create a triggered instance from an MQTT payload string.

JSON parsing is best-effort — if payload is not valid JSON, data is None but raw is still set.

Source code in packages/src/cosalette/_runners/_trigger.py
@classmethod
def from_mqtt(cls, payload: str) -> TriggerPayload:
    """Create a triggered instance from an MQTT payload string.

    JSON parsing is best-effort — if *payload* is not valid JSON,
    ``data`` is ``None`` but ``raw`` is still set.
    """
    data: dict[str, Any] | None = None
    if payload:
        try:
            parsed = json.loads(payload)
            if isinstance(parsed, dict):
                data = parsed
        except json.JSONDecodeError, ValueError:
            logger.debug("Trigger payload is not valid JSON: %r", payload[:100])
    return cls(is_triggered=True, raw=payload or None, data=data)

Filters

cosalette.Filter

Bases: Protocol

Signal filter contract.

All filters follow the update → value pattern:

  1. Call update(raw) with each new measurement.
  2. The return value is the filtered output.
  3. Access value for the current filtered state.
  4. Call reset() to clear internal state.

The first update() call seeds the filter — it returns the raw value unchanged (no history to smooth against).

value property

value: float | None

Current filtered value, or None before the first update.

update

update(raw: float) -> float

Feed a raw measurement and return the filtered value.

Source code in packages/src/cosalette/_strategies/_filters.py
def update(self, raw: float) -> float:
    """Feed a raw measurement and return the filtered value."""
    ...

reset

reset() -> None

Clear internal state so the next update re-seeds.

Source code in packages/src/cosalette/_strategies/_filters.py
def reset(self) -> None:
    """Clear internal state so the next update re-seeds."""
    ...

cosalette.Pt1Filter

Pt1Filter(tau: float, dt: float)

First-order low-pass (PT1) filter — Rust drop-in for the Python implementation.

__doc__ class-attribute

__doc__ = "First-order low-pass (PT1) filter — Rust drop-in for the Python implementation."

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__module__ class-attribute

__module__ = 'cosalette._filters_rs'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__new__ builtin

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

__repr__ method descriptor

__repr__()

Return repr(self).

cosalette.MedianFilter

MedianFilter(window: int)

Sliding-window median filter — Rust drop-in for the Python implementation.

__doc__ class-attribute

__doc__ = "Sliding-window median filter — Rust drop-in for the Python implementation."

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__module__ class-attribute

__module__ = 'cosalette._filters_rs'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__new__ builtin

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

__repr__ method descriptor

__repr__()

Return repr(self).

cosalette.OneEuroFilter

OneEuroFilter(
    min_cutoff: float = 1.0,
    beta: float = 0.0,
    d_cutoff: float = 1.0,
    dt: float = 1.0,
)

Adaptive low-pass filter (1€ Filter) — Rust drop-in for the Python implementation.

__doc__ class-attribute

__doc__ = "Adaptive low-pass filter (1€ Filter) — Rust drop-in for the Python implementation."

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__module__ class-attribute

__module__ = 'cosalette._filters_rs'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

__new__ builtin

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

__repr__ method descriptor

__repr__()

Return repr(self).

Persistence

cosalette.PersistPolicy

Bases: Protocol

Save-timing contract for the persistence system.

The framework calls should_save after each telemetry cycle to decide whether to persist the :class:DeviceStore immediately. This is simpler than :class:PublishStrategy — no clock binding or post-publish callback is needed.

should_save

should_save(store: DeviceStore, published: bool) -> bool

Decide whether to save right now.

Parameters:

Name Type Description Default
store DeviceStore

The :class:DeviceStore being managed.

required
published bool

True if an MQTT publish just occurred this cycle.

required

Returns:

Type Description
bool

True if the store should be saved now.

Source code in packages/src/cosalette/_persistence/_persist.py
def should_save(self, store: DeviceStore, published: bool) -> bool:
    """Decide whether to save right now.

    Args:
        store: The :class:`DeviceStore` being managed.
        published: ``True`` if an MQTT publish just occurred
            this cycle.

    Returns:
        ``True`` if the store should be saved now.
    """
    ...

cosalette.SaveOnPublish

Bases: _SavePolicyBase

Save after each successful MQTT publish.

This is the most common policy — the store is persisted whenever new data is published to MQTT, ensuring the persisted state matches what's been broadcast.

should_save

should_save(store: DeviceStore, published: bool) -> bool

Return True when an MQTT publish just occurred.

Source code in packages/src/cosalette/_persistence/_persist.py
def should_save(
    self,
    store: DeviceStore,  # noqa: ARG002
    published: bool,
) -> bool:
    """Return ``True`` when an MQTT publish just occurred."""
    return published

cosalette.SaveOnChange

Bases: _SavePolicyBase

Save whenever the store has been modified (dirty).

Saves on every handler cycle where the store was mutated, regardless of whether MQTT publishing occurred. Most aggressive policy — ensures minimal data loss on crash.

should_save

should_save(store: DeviceStore, published: bool) -> bool

Return True when the store has uncommitted changes.

Source code in packages/src/cosalette/_persistence/_persist.py
def should_save(
    self,
    store: DeviceStore,
    published: bool,  # noqa: ARG002
) -> bool:
    """Return ``True`` when the store has uncommitted changes."""
    return store.dirty

cosalette.SaveOnShutdown

Bases: _SavePolicyBase

Save only on graceful shutdown.

The lightest I/O policy — no saves during normal operation. Data accumulated during a session is only persisted when the app shuts down cleanly. Risk: data loss on hard crash/power loss.

Note: The framework always saves on shutdown regardless of policy, so this policy effectively means "never save during the loop".

should_save

should_save(store: DeviceStore, published: bool) -> bool

Always return False — framework handles shutdown save.

Source code in packages/src/cosalette/_persistence/_persist.py
def should_save(
    self,
    store: DeviceStore,  # noqa: ARG002
    published: bool,  # noqa: ARG002
) -> bool:
    """Always return ``False`` — framework handles shutdown save."""
    return False

cosalette.AllSavePolicy

AllSavePolicy(*children: _SavePolicyBase)

Bases: _SavePolicyBase

AND-composite: save only if all children agree.

Nested AllSavePolicy instances are automatically flattened::

AllSavePolicy(AllSavePolicy(a, b), c)  →  AllSavePolicy(a, b, c)
Source code in packages/src/cosalette/_persistence/_persist.py
def __init__(self, *children: _SavePolicyBase) -> None:
    self._children: list[_SavePolicyBase] = []
    for child in children:
        if isinstance(child, AllSavePolicy):
            self._children.extend(child._children)
        else:
            self._children.append(child)
    if not self._children:
        msg = "AllSavePolicy requires at least one child policy"
        raise ValueError(msg)

should_save

should_save(store: DeviceStore, published: bool) -> bool

Return True only if all children return True.

Source code in packages/src/cosalette/_persistence/_persist.py
def should_save(self, store: DeviceStore, published: bool) -> bool:
    """Return ``True`` only if **all** children return ``True``."""
    return all(c.should_save(store, published) for c in self._children)

cosalette.AnySavePolicy

AnySavePolicy(*children: _SavePolicyBase)

Bases: _SavePolicyBase

OR-composite: save if any child says yes.

Nested AnySavePolicy instances are automatically flattened::

AnySavePolicy(AnySavePolicy(a, b), c)  →  AnySavePolicy(a, b, c)
Source code in packages/src/cosalette/_persistence/_persist.py
def __init__(self, *children: _SavePolicyBase) -> None:
    self._children: list[_SavePolicyBase] = []
    for child in children:
        if isinstance(child, AnySavePolicy):
            self._children.extend(child._children)
        else:
            self._children.append(child)
    if not self._children:
        msg = "AnySavePolicy requires at least one child policy"
        raise ValueError(msg)

should_save

should_save(store: DeviceStore, published: bool) -> bool

Return True if any child returns True.

Source code in packages/src/cosalette/_persistence/_persist.py
def should_save(self, store: DeviceStore, published: bool) -> bool:
    """Return ``True`` if **any** child returns ``True``."""
    return any(c.should_save(store, published) for c in self._children)

Stores

cosalette.Store

Bases: Protocol

Key-value persistence for device state.

Each key maps to a JSON-serializable dict. Implementations must handle missing keys (return None) and create storage locations as needed.

load

load(key: str) -> dict[str, object] | None

Load state for the given key. Returns None if not found.

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Load state for the given key.  Returns ``None`` if not found."""
    ...

save

save(key: str, data: dict[str, object]) -> None

Persist state for the given key.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Persist state for the given key."""
    ...

cosalette.DeviceStore

DeviceStore(backend: Store, key: str)

Per-device scoped store with dirty tracking.

Wraps a :class:Store backend, automatically keyed by device name. Behaves like a dict (MutableMapping interface) — handlers read and write state naturally via store["key"] = value.

The framework creates one DeviceStore per device and manages its lifecycle:

  1. load() — called before the first handler invocation.
  2. Handler reads/writes via dict-like access.
  3. save() — always called on shutdown (safety net).

Dirty tracking: after load or save, the store is "clean". Any __setitem__ or __delitem__ marks it "dirty". For nested mutations the store cannot detect automatically, call :meth:mark_dirty explicitly.

Source code in packages/src/cosalette/_persistence/_stores.py
def __init__(self, backend: Store, key: str) -> None:
    self._backend = backend
    self._key = key
    self._data: dict[str, object] = {}
    self._dirty = False
    self._loaded = False

dirty property

dirty: bool

True if state has been modified since last load/save.

load

load() -> None

Load state from backend. Called by framework before first use.

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self) -> None:
    """Load state from backend.  Called by framework before first use."""
    saved = self._backend.load(self._key)
    self._data = saved if saved is not None else {}
    self._dirty = False
    self._loaded = True

save

save() -> None

Persist current state to backend.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self) -> None:
    """Persist current state to backend."""
    self._backend.save(self._key, dict(self._data))
    self._dirty = False

mark_dirty

mark_dirty() -> None

Explicitly mark the store as dirty.

Use when mutating nested structures that __setitem__ can't detect (e.g. store["list"].append(x)).

Source code in packages/src/cosalette/_persistence/_stores.py
def mark_dirty(self) -> None:
    """Explicitly mark the store as dirty.

    Use when mutating nested structures that ``__setitem__``
    can't detect (e.g. ``store["list"].append(x)``).
    """
    self._dirty = True

get

get(key: str, default: object = None) -> object

Return the value for key, or default if not present.

Source code in packages/src/cosalette/_persistence/_stores.py
def get(self, key: str, default: object = None) -> object:
    """Return the value for *key*, or *default* if not present."""
    self._check_loaded()
    return self._data.get(key, default)

setdefault

setdefault(key: str, default: object = None) -> object

Return self[key] if present, else set and return default.

Source code in packages/src/cosalette/_persistence/_stores.py
def setdefault(self, key: str, default: object = None) -> object:
    """Return ``self[key]`` if present, else set and return *default*."""
    self._check_loaded()
    if key not in self._data:
        self._data[key] = default
        self._dirty = True
    return self._data[key]

update

update(
    other: dict[str, object] | None = None, **kwargs: object
) -> None

Update the store from a dict and/or keyword arguments.

Source code in packages/src/cosalette/_persistence/_stores.py
def update(self, other: dict[str, object] | None = None, **kwargs: object) -> None:
    """Update the store from a dict and/or keyword arguments."""
    self._check_loaded()
    if other:
        self._data.update(other)
        self._dirty = True
    if kwargs:
        self._data.update(kwargs)
        self._dirty = True

to_dict

to_dict() -> dict[str, object]

Return a shallow copy of the underlying data dict.

Useful when returning state from a telemetry handler (the handler returns this dict for MQTT publishing).

Source code in packages/src/cosalette/_persistence/_stores.py
def to_dict(self) -> dict[str, object]:
    """Return a shallow copy of the underlying data dict.

    Useful when returning state from a telemetry handler
    (the handler returns this dict for MQTT publishing).
    """
    self._check_loaded()
    return dict(self._data)

keys

keys() -> KeysView[str]

Return a view of the store's keys.

Source code in packages/src/cosalette/_persistence/_stores.py
def keys(self) -> KeysView[str]:
    """Return a view of the store's keys."""
    self._check_loaded()
    return self._data.keys()

values

values() -> ValuesView[object]

Return a view of the store's values.

Source code in packages/src/cosalette/_persistence/_stores.py
def values(self) -> ValuesView[object]:
    """Return a view of the store's values."""
    self._check_loaded()
    return self._data.values()

items

items() -> ItemsView[str, object]

Return a view of the store's items.

Source code in packages/src/cosalette/_persistence/_stores.py
def items(self) -> ItemsView[str, object]:
    """Return a view of the store's items."""
    self._check_loaded()
    return self._data.items()

cosalette.NullStore

No-op store — load always returns None, save is silent.

Use when persistence is disabled or for dry-run modes.

load

load(key: str) -> dict[str, object] | None

Always returns None.

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self, key: str) -> dict[str, object] | None:  # noqa: ARG002
    """Always returns ``None``."""
    return None

save

save(key: str, data: dict[str, object]) -> None

Does nothing.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:  # noqa: ARG002
    """Does nothing."""

cosalette.MemoryStore

MemoryStore(
    initial: dict[str, dict[str, object]] | None = None,
)

In-memory store backed by a plain dict.

Both load and save deep-copy data so that callers cannot mutate internal state by accident. Designed for tests — mirrors the FakeStorage pattern from gas2mqtt.

Parameters

initial: Optional seed data. The mapping is deep-copied on construction.

Source code in packages/src/cosalette/_persistence/_stores.py
def __init__(
    self,
    initial: dict[str, dict[str, object]] | None = None,
) -> None:
    self._data: dict[str, dict[str, object]] = (
        copy.deepcopy(initial) if initial else {}
    )

load

load(key: str) -> dict[str, object] | None

Return a deep copy of the stored dict, or None.

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Return a deep copy of the stored dict, or ``None``."""
    value = self._data.get(key)
    if value is None:
        return None
    return copy.deepcopy(value)

save

save(key: str, data: dict[str, object]) -> None

Store a deep copy of data.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Store a deep copy of *data*."""
    self._data[key] = copy.deepcopy(data)

cosalette.JsonFileStore

JsonFileStore(path: Path | str)

Single-file JSON store with atomic writes.

All keys live as top-level keys in one JSON object. Writes use a write-to-temp + os.replace pattern for atomicity so that a crash mid-write never corrupts the file.

Parameters

path: Path to the JSON file. Parent directories are created automatically on the first save.

Source code in packages/src/cosalette/_persistence/_stores.py
def __init__(self, path: Path | str) -> None:
    self._path = Path(path)

load

load(key: str) -> dict[str, object] | None

Load a key from the JSON file.

Returns None when the file does not exist, the key is missing, or the file contains invalid JSON (a warning is logged in the latter case).

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Load a key from the JSON file.

    Returns ``None`` when the file does not exist, the key is
    missing, or the file contains invalid JSON (a warning is
    logged in the latter case).
    """
    if not self._path.exists():
        return None

    try:
        text = self._path.read_text(encoding="utf-8")
        data = loads(text)
    except (JSONDecodeError, OSError) as exc:
        logger.warning("Corrupt or unreadable store file %s: %s", self._path, exc)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "Store file %s contains non-object JSON, treating as empty",
            self._path,
        )
        return None

    return data.get(key)

save

save(key: str, data: dict[str, object]) -> None

Persist data under key using an atomic write.

The full JSON object is read (if it exists), the key is updated, and the result is written to a temporary file before being atomically moved into place.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Persist *data* under *key* using an atomic write.

    The full JSON object is read (if it exists), the key is
    updated, and the result is written to a temporary file
    before being atomically moved into place.
    """
    self._path.parent.mkdir(parents=True, exist_ok=True)

    # Read existing content (if any)
    existing: dict[str, object] = {}
    if self._path.exists():
        try:
            text = self._path.read_text(encoding="utf-8")
            parsed = loads(text)
            if isinstance(parsed, dict):
                existing = parsed
            else:
                logger.warning(
                    "Overwriting non-object JSON in store file %s",
                    self._path,
                )
        except (JSONDecodeError, OSError) as exc:
            logger.warning(
                "Overwriting corrupt store file %s: %s",
                self._path,
                exc,
            )

    existing[key] = data

    tmp_path = self._path.with_suffix(".tmp")
    tmp_path.write_text(
        dumps_pretty(existing) + "\n",
        encoding="utf-8",
    )
    os.replace(tmp_path, self._path)

cosalette.SqliteStore

SqliteStore(path: Path | str)

SQLite-backed store using WAL mode for power-loss resistance.

Each key is a row in a store table; the value is stored as a JSON text column. The table is auto-created on first use.

Parameters

path: Path to the SQLite database file. Parent directories are created automatically.

Source code in packages/src/cosalette/_persistence/_stores.py
def __init__(self, path: Path | str) -> None:
    self._path = Path(path)
    self._path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = sqlite3.connect(str(self._path))
    self._conn.execute("PRAGMA journal_mode=WAL")
    self._conn.execute(
        "CREATE TABLE IF NOT EXISTS store "
        "(key TEXT PRIMARY KEY, data TEXT NOT NULL)"
    )
    self._conn.commit()

load

load(key: str) -> dict[str, object] | None

Load JSON data for key, or None if not present.

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Load JSON data for *key*, or ``None`` if not present."""
    cur = self._conn.execute("SELECT data FROM store WHERE key = ?", (key,))
    row = cur.fetchone()
    if row is None:
        return None
    return loads(row[0])  # type: ignore[no-any-return]

save

save(key: str, data: dict[str, object]) -> None

Insert or replace data for key.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Insert or replace *data* for *key*."""
    self._conn.execute(
        "INSERT OR REPLACE INTO store (key, data) VALUES (?, ?)",
        (key, dumps_pretty(data)),
    )
    self._conn.commit()

close

close() -> None

Close the underlying database connection.

Source code in packages/src/cosalette/_persistence/_stores.py
def close(self) -> None:
    """Close the underlying database connection."""
    self._conn.close()