API Reference
Complete reference for all public classes, functions, and protocols exported by cosalette.
Application
cosalette.App
App(
name: str,
version: str = "0.0.0",
*,
description: str = "IoT-to-MQTT bridge",
settings_class: type[Settings] = Settings,
dry_run: bool = False,
heartbeat_interval: float | None = 60.0,
health_check_interval: float | None = 30.0,
lifespan: LifespanFunc | None = None,
store: Store | None = None,
adapters: dict[
type,
type
| str
| Callable[..., object]
| tuple[
type | str | Callable[..., object],
type | str | Callable[..., object],
],
]
| None = None,
restart_after_failures: int = 5,
max_restarts: int = 3,
restart_cooldown: float = 5.0,
sustained_health_reset: float = 300.0,
)
Central composition root and application orchestrator.
Collects device registrations, adapter mappings, and an optional
lifespan context manager, then runs the full async lifecycle
in :meth:run.
See Also
ADR-001 — Framework architecture (IoC, composition root).
Initialise the application orchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Application name (used as MQTT topic prefix and client ID). | required |
| version | str | Application version string. | '0.0.0' |
| description | str | Short description for CLI help text. | 'IoT-to-MQTT bridge' |
| settings_class | type[Settings] | Settings subclass to instantiate at startup. | Settings |
| dry_run | bool | When True, resolve dry-run adapter variants. | False |
| heartbeat_interval | float \| None | Seconds between periodic heartbeats published to | 60.0 |
| health_check_interval | float \| None | Seconds between periodic health checks for adapters implementing :class: | 30.0 |
| lifespan | LifespanFunc \| None | Async context manager for application startup and shutdown. Code before | None |
| store | Store \| None | Optional :class: | None |
| adapters | dict[type, type \| str \| Callable[..., object] \| tuple[type \| str \| Callable[..., object], type \| str \| Callable[..., object]]] \| None | Optional mapping of port types to adapter implementations. Each key is a Protocol type; each value is either a single implementation (class, lazy-import string, or factory callable) or a | None |
Source code in packages/src/cosalette/_app.py
settings
property
settings: Settings
Application settings, instantiated at construction time.
The instance is created eagerly in __init__ from the
settings_class parameter. Environment variables and
.env files are read at that point, so decorator arguments
like interval=app.settings.poll_interval reflect the
actual runtime configuration.
The CLI entrypoint (:meth:cli) re-instantiates settings
with --env-file support and passes the result to
:meth:_run_async, which takes precedence over this
instance.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the settings class could not be instantiated at construction time (e.g. required fields with no defaults and no matching environment variables). Use |
on_configure
Register a configuration hook called before devices start.
The hook runs after settings and adapters are resolved but before the run-loop. Parameters are injected by type annotation (Settings, adapter ports, Logger, ClockPort).
Use @app.on_configure (no parentheses).
See Also
ADR-023 — on_configure lifecycle phase.
Source code in packages/src/cosalette/_app.py
device
device(
name: str | None = None,
*,
init: Callable[..., Any] | None = None,
enabled: bool = True,
) -> Callable[..., Any]
Register a command & control device.
The decorated function runs as a concurrent asyncio task.
Parameters are injected based on type annotations — declare
only what you need (e.g. ctx: DeviceContext,
settings: Settings, logger: logging.Logger).
Zero-parameter handlers are valid.
The framework subscribes to {name}/set and routes commands
to the handler registered via ctx.on_command.
When name is None, the function name is used internally
and the device publishes to root-level topics ({prefix}/state
instead of {prefix}/{device}/state).
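The type-based injection described above can be sketched roughly as follows. This is a minimal illustration that assumes a plain dict of providers keyed by type; `Settings`, `resolve_kwargs`, and `providers` are hypothetical stand-ins, not cosalette's internals.

```python
import inspect
import logging
from typing import Any, Callable, get_type_hints


class Settings:
    """Hypothetical stand-in for cosalette.Settings."""


def resolve_kwargs(func: Callable[..., Any], providers: dict) -> dict:
    """Match each annotated parameter of func to a provider registered by type."""
    hints = get_type_hints(func)
    kwargs = {}
    for name in inspect.signature(func).parameters:
        if name not in hints:
            # Same failure mode the reference documents for un-annotated parameters.
            raise TypeError(f"parameter {name!r} lacks a type annotation")
        kwargs[name] = providers[hints[name]]
    return kwargs


async def handler(settings: Settings, logger: logging.Logger) -> None:
    logger.info("running with %r", settings)


# Assumed provider registry: one ready-made instance per injectable type.
providers = {Settings: Settings(), logging.Logger: logging.getLogger("demo")}
kwargs = resolve_kwargs(handler, providers)
```

A zero-parameter handler resolves to an empty dict, which is why such handlers are valid.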
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Device name for MQTT topics and logging. When | None |
| init | Callable[..., Any] \| None | Optional synchronous factory called once before the handler loop. Its return value is injected into the handler by type. | None |
| enabled | bool | When | True |
Raises:
| Type | Description |
|---|---|
| ValueError | If a device with this name is already registered. |
| ValueError | If a second root (unnamed) device is registered. |
| TypeError | If any handler parameter lacks a type annotation. |
Source code in packages/src/cosalette/_app.py
add_device
add_device(
name: str | Callable[..., Any],
func: Callable[..., Awaitable[None]],
*,
init: Callable[..., Any] | None = None,
enabled: bool = True,
is_root: bool = False,
) -> None
Register a command & control device imperatively.
This is the imperative counterpart to :meth:device. By default
it creates a named (non-root) registration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| Callable[..., Any] | Device name for MQTT topics and logging. | required |
| func | Callable[..., Awaitable[None]] | Async callable that implements the device loop. | required |
| init | Callable[..., Any] \| None | Optional synchronous factory called once before the handler loop. Its return value is injected into func by type. | None |
| enabled | bool | When | True |
| is_root | bool | When | False |
Raises:
| Type | Description |
|---|---|
| ValueError | If a device with this name is already registered. |
| TypeError | If init is async or has un-annotated parameters. |
| TypeError | If func has un-annotated parameters. |
See Also
:meth:device — decorator equivalent.
Source code in packages/src/cosalette/_app.py
command
command(
name: str | None = None,
*,
init: Callable[..., Any] | None = None,
enabled: bool = True,
) -> Callable[..., Any]
Register a command handler for an MQTT device.
The decorated function is called each time a command arrives
on the {prefix}/{name}/set topic. Parameters named
topic and payload receive the MQTT message values;
all other parameters are injected by type annotation, exactly
like @app.device and @app.telemetry handlers.
If the handler returns a dict, the framework publishes it
as device state via publish_state(). Return None to
skip auto-publishing.
When name is None, the function name is used internally
and the device publishes to root-level topics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Device name used for MQTT topics and logging. When | None |
| init | Callable[..., Any] \| None | Optional synchronous factory called once before the handler loop. Its return value is injected into the handler by type. | None |
| enabled | bool | When | True |
Raises:
| Type | Description |
|---|---|
| ValueError | If a device with this name is already registered. |
| ValueError | If a second root (unnamed) device is registered. |
| TypeError | If any handler parameter lacks a type annotation. |
Source code in packages/src/cosalette/_app.py
add_command
add_command(
name: str | Callable[..., Any],
func: Callable[
..., Awaitable[dict[str, object] | None]
],
*,
init: Callable[..., Any] | None = None,
enabled: bool = True,
is_root: bool = False,
) -> None
Register a command handler imperatively.
This is the imperative counterpart to :meth:command. By default
it creates a named (non-root) registration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| Callable[..., Any] | Device name for MQTT topics and logging. | required |
| func | Callable[..., Awaitable[dict[str, object] \| None]] | Async callable invoked on each incoming command. Parameters named | required |
| init | Callable[..., Any] \| None | Optional synchronous factory called once before the handler loop. Its return value is injected into func by type. | None |
| enabled | bool | When | True |
| is_root | bool | When | False |
Raises:
| Type | Description |
|---|---|
| ValueError | If a device with this name is already registered. |
| TypeError | If init is async or has un-annotated parameters. |
| TypeError | If func has un-annotated parameters. |
See Also
:meth:command — decorator equivalent.
Source code in packages/src/cosalette/_app.py
telemetry
telemetry(
name: str | None = None,
*,
interval: IntervalSpec,
publish: PublishStrategy | None = None,
persist: PersistPolicy | None = None,
init: Callable[..., Any] | None = None,
enabled: bool = True,
group: str | None = None,
retry: int = 0,
retry_on: tuple[type[BaseException], ...] | None = None,
backoff: BackoffStrategy | None = None,
circuit_breaker: CircuitBreaker | None = None,
) -> Callable[..., Any]
Register a telemetry device with periodic polling.
The decorated function returns a dict published as JSON
state, or None to suppress publishing for that cycle.
Parameters are injected based on type annotations — declare
only what you need. Zero-parameter handlers are valid.
The framework calls the handler at the specified interval
and publishes the returned dict (unless suppressed by a
None return or a publish strategy).
When name is None, the function name is used internally
and the device publishes to root-level topics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| None | Device name for MQTT topics and logging. When | None |
| interval | IntervalSpec | Polling interval in seconds, or a callable | required |
| publish | PublishStrategy \| None | Optional publish strategy controlling when readings are actually published (e.g. | None |
| persist | PersistPolicy \| None | Optional save policy controlling when the :class: | None |
| init | Callable[..., Any] \| None | Optional synchronous factory called once before the handler loop. Its return value is injected into the handler by type. | None |
| enabled | bool | When | True |
| group | str \| None | Optional coalescing group name. Telemetry devices in the same group share a single scheduler tick so their readings are published together. When | None |
| retry | int | Maximum number of retry attempts after a failure. Defaults to | 0 |
| retry_on | tuple[type[BaseException], ...] \| None | Exception types to retry on. Defaults to | None |
| backoff | BackoffStrategy \| None | Backoff strategy controlling delay between retries (e.g. | None |
| circuit_breaker | CircuitBreaker \| None | Optional circuit breaker that stops retrying after consecutive failed cycles. Works independently of | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If a device with this name is already registered. |
| ValueError | If a second root (unnamed) device is registered. |
| ValueError | If interval is a float and <= 0. For callable intervals, validation is deferred to :meth: |
| ValueError | If |
| ValueError | If group is an empty string. |
| ValueError | If |
| TypeError | If any handler parameter lacks a type annotation. |
Source code in packages/src/cosalette/_app.py
add_telemetry
add_telemetry(
name: str | Callable[..., Any],
func: Callable[
..., Awaitable[dict[str, object] | None]
],
*,
interval: IntervalSpec,
publish: PublishStrategy | None = None,
persist: PersistPolicy | None = None,
init: Callable[..., Any] | None = None,
enabled: bool = True,
group: str | None = None,
is_root: bool = False,
retry: int = 0,
retry_on: tuple[type[BaseException], ...] | None = None,
backoff: BackoffStrategy | None = None,
circuit_breaker: CircuitBreaker | None = None,
) -> None
Register a telemetry device imperatively.
This is the imperative counterpart to :meth:telemetry. By default
it creates a named (non-root) registration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str \| Callable[..., Any] | Device name for MQTT topics and logging. | required |
| func | Callable[..., Awaitable[dict[str, object] \| None]] | Async callable returning a | required |
| interval | IntervalSpec | Polling interval in seconds, or a callable | required |
| publish | PublishStrategy \| None | Optional publish strategy (e.g. | None |
| persist | PersistPolicy \| None | Optional save policy. Requires | None |
| init | Callable[..., Any] \| None | Optional synchronous factory called once before the handler loop. Its return value is injected into func by type. | None |
| enabled | bool | When | True |
| group | str \| None | Optional coalescing group name. Telemetry devices in the same group share a single scheduler tick so their readings are published together. When | None |
| is_root | bool | When | False |
Raises:
| Type | Description |
|---|---|
| ValueError | If a device with this name is already registered. |
| ValueError | If interval is a float and <= 0. For callable intervals, validation is deferred to :meth: |
| ValueError | If persist is set but no |
| ValueError | If group is an empty string. |
| TypeError | If init is async or has un-annotated parameters. |
| TypeError | If func has un-annotated parameters. |
See Also
:meth:telemetry — decorator equivalent.
Source code in packages/src/cosalette/_app.py
adapter
adapter(
port_type: type,
impl: type | str | Callable[..., object],
*,
dry_run: type
| str
| Callable[..., object]
| None = None,
) -> None
Register an adapter for a port type.
All adapter forms support dependency injection: if a class
__init__ or factory callable declares a parameter
annotated with Settings (or a subclass), the parsed
settings instance is auto-injected at resolution time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| port_type | type | The Protocol type to register. | required |
| impl | type \| str \| Callable[..., object] | The adapter class, a | required |
| dry_run | type \| str \| Callable[..., object] \| None | Optional dry-run variant (class, lazy import string, or factory callable). | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If an adapter is already registered for this port type. |
| TypeError | If a callable (class or factory) has invalid signatures (e.g. un-annotated parameters or unresolvable types). |
Source code in packages/src/cosalette/_app.py
run
run(
*,
mqtt: MqttPort | None = None,
settings: Settings | None = None,
shutdown_event: Event | None = None,
clock: ClockPort | None = None,
) -> None
Start the application (blocking, synchronous entrypoint).
Wraps :meth:_run_async in :func:asyncio.run, handling
KeyboardInterrupt for clean Ctrl-C shutdown. This is the
recommended way to launch a cosalette application::
    import cosalette

    app = cosalette.App(name="mybridge", version="0.1.0")
    app.run()
All parameters are optional and intended for programmatic or
test use — production apps typically call run() with no
arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mqtt | MqttPort \| None | Override MQTT client (e.g. | None |
| settings | Settings \| None | Override settings (skip env-file loading). | None |
| shutdown_event | Event \| None | Override shutdown event (skip OS signal handlers). Useful in tests to control shutdown timing. | None |
| clock | ClockPort \| None | Override clock (e.g. | None |
See Also
:meth:cli — CLI entrypoint with Typer argument parsing.
Source code in packages/src/cosalette/_app.py
cli
Start the application with CLI argument parsing.
Builds a Typer CLI from the application's configuration,
parses command-line arguments (--dry-run, --version,
--log-level, --log-format, --env-file), and
orchestrates the full async lifecycle.
For production use without CLI parsing, prefer :meth:run.
See Also
ADR-005 — CLI framework.
Source code in packages/src/cosalette/_app.py
cosalette.AppContext
AppContext(
*, settings: Settings, adapters: dict[type, object]
)
Context for the application lifespan.
Provided to the lifespan async context manager registered via
App(lifespan=...). Offers access to settings and adapter
resolution but NOT per-device features (no publish, no on_command,
no sleep).
See Also
ADR-001 — Framework architecture (lifespan).
Initialise lifecycle-hook context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settings | Settings | Application settings instance. | required |
| adapters | dict[type, object] | Resolved adapter registry mapping port types to instances. | required |
Source code in packages/src/cosalette/_context.py
adapter
Resolve an adapter by port type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| port_type | type[T] | The Protocol type to look up. | required |
Returns:
| Type | Description |
|---|---|
| T | The adapter instance registered for that port type. |
Raises:
| Type | Description |
|---|---|
| LookupError | If no adapter is registered for the port type. |
Source code in packages/src/cosalette/_context.py
cosalette.DeviceContext
DeviceContext(
*,
name: str,
settings: Settings,
mqtt: MqttPort,
topic_prefix: str,
shutdown_event: Event,
adapters: dict[type, object],
clock: ClockPort,
is_root: bool = False,
)
Per-device runtime context injected by the framework.
Provides device-scoped access to MQTT publishing, command registration, shutdown-aware sleeping, and adapter resolution.
Each device function receives its own DeviceContext instance. The context is pre-configured with the device's name and topic prefix so that publish operations target the correct MQTT topics automatically.
See Also
ADR-010 — Device archetypes. ADR-006 — Hexagonal architecture (adapter resolution).
Initialise per-device context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Device name as registered (e.g. "blind"). | required |
| settings | Settings | Application settings instance. | required |
| mqtt | MqttPort | MQTT port for publishing. | required |
| topic_prefix | str | Root prefix for MQTT topics (e.g. "velux2mqtt"). | required |
| shutdown_event | Event | Shared event that signals graceful shutdown. | required |
| adapters | dict[type, object] | Resolved adapter registry mapping port types to instances. | required |
| clock | ClockPort | Monotonic clock for timing. | required |
| is_root | bool | When True, topics omit the device name segment (root-level device). | False |
Source code in packages/src/cosalette/_context.py
shutdown_requested
property
True when the framework has received a shutdown signal.
command_handler
property
The root command handler, or None. Framework-internal.
command_handlers
property
All registered command handlers keyed by sub-topic. Framework-internal.
get_command_handler
Look up the command handler for a sub-topic (or root).
publish_state
async
Publish device state to {prefix}/{device}/state as JSON.
For root devices (unnamed), publishes to {prefix}/state instead.
This is the primary publication method for device telemetry. The payload dict is JSON-serialised automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| payload | dict[str, object] | Dict to serialise as JSON. | required |
| retain | bool | Whether the message should be retained (default True). | True |
Source code in packages/src/cosalette/_context.py
publish
async
Publish to an arbitrary sub-channel: {prefix}/{device}/{channel}.
For root devices (unnamed), publishes to {prefix}/{channel} instead.
Escape hatch for non-standard topics. Prefer publish_state() for normal device state updates.
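The topic layout described for publish_state() and publish() can be summarised in a small helper. This is only a sketch of the naming rule; `device_topic` is a hypothetical function, not part of cosalette.

```python
def device_topic(prefix: str, device: str, channel: str, *, is_root: bool = False) -> str:
    """Build a topic following the documented layout (hypothetical helper)."""
    if is_root:
        # Root (unnamed) devices omit the device segment.
        return f"{prefix}/{channel}"
    return f"{prefix}/{device}/{channel}"


named_state = device_topic("velux2mqtt", "blind", "state")
root_state = device_topic("velux2mqtt", "blind", "state", is_root=True)
```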
Source code in packages/src/cosalette/_context.py
sleep
async
Shutdown-aware sleep.
Returns early (without exception) if shutdown is requested during the sleep period. This enables the idiomatic pattern::
    while not ctx.shutdown_requested:
        await ctx.sleep(10)
        # ... do work ...
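One common way to get a sleep that returns early on shutdown is to wait on the shutdown event with a timeout. The sketch below assumes an `asyncio.Event` as the shutdown signal; `shutdown_aware_sleep` is a hypothetical name and may differ from how cosalette implements `ctx.sleep`.

```python
import asyncio


async def shutdown_aware_sleep(shutdown_event: asyncio.Event, seconds: float) -> None:
    """Sleep for up to `seconds`, returning early (no exception) once the event is set."""
    try:
        await asyncio.wait_for(shutdown_event.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        # Normal case: the full sleep elapsed without a shutdown request.
        pass


async def demo() -> float:
    event = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, event.set)  # simulate a shutdown signal arriving early
    start = loop.time()
    await shutdown_aware_sleep(event, 10.0)
    return loop.time() - start


elapsed = asyncio.run(demo())
```

Here the ten-second sleep ends shortly after the event is set, so a device loop can exit promptly.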
Source code in packages/src/cosalette/_context.py
on_command
on_command(
handler_or_sub_topic: CommandHandler
| str
| None = None,
) -> (
CommandHandler
| Callable[[CommandHandler], CommandHandler]
)
Register a command handler for this device.
Supports three call patterns:
- Decorator (root handler)::

      @ctx.on_command
      async def handle(sub_topic: str | None, payload: str) -> None: ...

- Direct call (root handler)::

      ctx.on_command(handle)

- Decorator factory (sub-topic handler)::

      @ctx.on_command("calibrate")
      async def handle_cal(sub_topic: str | None, payload: str) -> None: ...

Handlers may also accept a single :class:Command argument
(new-style), detected automatically via type annotation::
@ctx.on_command
async def handle(cmd: Command) -> None: ...
Raises:
| Type | Description |
|---|---|
| RuntimeError | If a handler is already registered for the same sub-topic, or if |
| ValueError | If the sub-topic string is empty or contains |
Returns:
| Type | Description |
|---|---|
| CommandHandler \| Callable[[CommandHandler], CommandHandler] | The handler unchanged when called with a callable, or a decorator function when called with a sub-topic string or None. |
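A registration method that accepts all three call patterns can dispatch on the type of its argument. The following is a rough sketch under that assumption; `CommandRegistry` is a hypothetical stand-in, and it checks only the empty-string case of the documented ValueError.

```python
from typing import Awaitable, Callable, Dict, Optional, Union

Handler = Callable[..., Awaitable[None]]


class CommandRegistry:
    """Hypothetical stand-in showing the dispatch, not cosalette's DeviceContext."""

    def __init__(self) -> None:
        # Key None holds the root handler; string keys hold sub-topic handlers.
        self.handlers: Dict[Optional[str], Handler] = {}

    def on_command(self, handler_or_sub_topic: Union[Handler, str, None] = None):
        if callable(handler_or_sub_topic):
            # Bare decorator or direct call: register as root and return unchanged.
            self._register(None, handler_or_sub_topic)
            return handler_or_sub_topic
        sub_topic = handler_or_sub_topic
        if sub_topic is not None and not sub_topic:
            raise ValueError("sub-topic must not be empty")

        def decorator(handler: Handler) -> Handler:
            self._register(sub_topic, handler)
            return handler

        return decorator

    def _register(self, key: Optional[str], handler: Handler) -> None:
        if key in self.handlers:
            raise RuntimeError(f"handler already registered for {key!r}")
        self.handlers[key] = handler


registry = CommandRegistry()


@registry.on_command
async def handle(sub_topic, payload) -> None: ...


@registry.on_command("calibrate")
async def handle_cal(sub_topic, payload) -> None: ...
```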
Source code in packages/src/cosalette/_context.py
commands
Async iterator that yields inbound commands from the internal queue.
Provides a queue-backed async iterator for @app.device loops,
eliminating the need for manual asyncio.Queue bridges.
When timeout is provided, yields None on timeout expiry,
enabling periodic-work patterns::
    async for cmd in ctx.commands(timeout=5):
        if cmd is None:
            await periodic_check()
        else:
            await process(cmd.payload)
Without timeout, blocks until a command arrives or shutdown is requested. Shutdown is detected immediately via event racing.
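The behaviour described here (yield a command, yield None on timeout, stop on shutdown) can be approximated by racing a queue read against the shutdown event. This sketch assumes an `asyncio.Queue` of plain strings and an `asyncio.Event`; the free function `commands` below is illustrative and is not the cosalette method.

```python
import asyncio
from typing import AsyncIterator, Optional


async def commands(
    queue: asyncio.Queue,
    shutdown: asyncio.Event,
    timeout: Optional[float] = None,
) -> AsyncIterator[Optional[str]]:
    """Yield queued items, None on timeout expiry, and stop once shutdown is set."""
    shutdown_task = asyncio.ensure_future(shutdown.wait())
    try:
        while not shutdown.is_set():
            get_task = asyncio.ensure_future(queue.get())
            done, _ = await asyncio.wait(
                {get_task, shutdown_task},
                timeout=timeout,
                return_when=asyncio.FIRST_COMPLETED,
            )
            if get_task in done:
                yield get_task.result()
                continue
            get_task.cancel()  # nothing arrived; drop the pending read
            if shutdown_task in done:
                return
            yield None  # timeout expired without a command
    finally:
        shutdown_task.cancel()


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    shutdown = asyncio.Event()
    await queue.put("open")
    seen = []
    async for cmd in commands(queue, shutdown, timeout=0.01):
        seen.append(cmd)
        if cmd is None:
            shutdown.set()  # end the demo after the first idle tick
    return seen


seen = asyncio.run(demo())
```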
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float \| None | Seconds to wait for a command before yielding None. When None (default), blocks indefinitely until a command arrives or shutdown is requested. | None |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[Command \| None] | Command when a command arrives, or None on timeout expiry. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If called more than once on the same context, or if a command handler is already registered via :meth: |
See Also
ADR-025 — Command channel and sub-topic routing.
Source code in packages/src/cosalette/_context.py
adapter
Resolve an adapter by port type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| port_type | type[T] | The Protocol type to look up. | required |
Returns:
| Type | Description |
|---|---|
| T | The adapter instance registered for that port type. |
Raises:
| Type | Description |
|---|---|
| LookupError | If no adapter is registered for the port type. |
Source code in packages/src/cosalette/_context.py
MQTT
cosalette.MqttPort
Bases: Protocol
Port contract for MQTT publish/subscribe.
Satisfies ADR-006 hexagonal architecture: all MQTT interaction goes through this protocol so adapters are swappable.
cosalette.MqttClient
dataclass
MqttClient(
settings: MqttSettings, will: WillConfig | None = None
)
Production MQTT adapter backed by aiomqtt.
Uses a background task that maintains a persistent connection
with automatic reconnection. aiomqtt is imported lazily
inside _connection_loop() so the mock and null adapters work
without the dependency installed.
See Also
ADR-006 — Hexagonal architecture (lazy imports).
ADR-012 — LWT / availability via WillConfig.
publish
async
Publish a message to the broker.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If the client is not connected. |
Source code in packages/src/cosalette/_mqtt_client.py
subscribe
async
Subscribe to topic.
The subscription is tracked internally so it can be restored after a reconnection.
Source code in packages/src/cosalette/_mqtt_client.py
on_message
on_message(callback: MessageCallback) -> None
start
async
Start the background connection loop.
Source code in packages/src/cosalette/_mqtt_client.py
stop
async
Stop the connection loop and clean up.
Idempotent — safe to call multiple times.
Source code in packages/src/cosalette/_mqtt_client.py
cosalette.MqttLifecycle
Bases: Protocol
Lifecycle management for MQTT adapters.
Adapters that need explicit start/stop (e.g. connecting to a broker)
implement this protocol. Adapters like MockMqttClient and
NullMqttClient that need no lifecycle management simply omit these
methods — the framework detects their absence via isinstance.
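Detecting an optional capability through isinstance relies on structural subtyping with a runtime-checkable protocol. The sketch below shows the general mechanism with hypothetical classes; the `start`/`stop` method names are taken from MqttClient above, and whether MqttLifecycle is declared exactly this way is an assumption.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Lifecycle(Protocol):
    """Hypothetical stand-in for a lifecycle protocol."""

    async def start(self) -> None: ...
    async def stop(self) -> None: ...


class BrokerClient:
    # Has both lifecycle methods, so it matches the protocol structurally.
    async def start(self) -> None: ...
    async def stop(self) -> None: ...


class InMemoryClient:
    # Omits start/stop entirely, as a mock or null adapter might.
    async def publish(self, topic: str, payload: str) -> None: ...


needs_lifecycle = [isinstance(c, Lifecycle) for c in (BrokerClient(), InMemoryClient())]
```

No inheritance is involved: the check passes or fails purely on whether the methods exist.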
See Also
ADR-006 — Interface Segregation: ports are narrow by design. PEP 544 — Structural subtyping (Protocols).
cosalette.MqttMessageHandler
Bases: Protocol
Message dispatch capability for MQTT adapters.
Adapters that can receive inbound messages implement this protocol.
The framework calls on_message to wire up the topic router.
See Also
ADR-006 — Interface Segregation.
cosalette.MockMqttClient
dataclass
MockMqttClient(
published: list[tuple[str, str, bool, int]] = list(),
subscriptions: list[str] = list(),
raise_on_publish: Exception | None = None,
)
In-memory test double that records MQTT interactions.
Records publishes and subscriptions for assertion. Supports
callback registration and simulated message delivery via
deliver().
publish
async
Record a publish call, or raise if raise_on_publish is set.
Source code in packages/src/cosalette/_mqtt.py
subscribe
async
on_message
on_message(callback: MessageCallback) -> None
deliver
async
reset
Clear all recorded data, callbacks, and failure injection.
get_messages_for
Return (payload, retain, qos) tuples for topic.
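To illustrate how a recording test double of this shape supports assertions, here is a self-contained stand-in. `RecordingMqtt` is hypothetical: the tuple order (topic, payload, retain, qos) is inferred from the field type and from get_messages_for, and the keyword names and defaults of `publish` are assumptions.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RecordingMqtt:
    """Hypothetical stand-in; not the cosalette MockMqttClient itself."""

    # Each entry is (topic, payload, retain, qos).
    published: list = field(default_factory=list)
    raise_on_publish: Optional[Exception] = None

    async def publish(self, topic: str, payload: str, *, retain: bool = False, qos: int = 1) -> None:
        # Failure injection: raise instead of recording when configured.
        if self.raise_on_publish is not None:
            raise self.raise_on_publish
        self.published.append((topic, payload, retain, qos))

    def get_messages_for(self, topic: str) -> list:
        # Drop the topic and keep (payload, retain, qos) for matching entries.
        return [(p, r, q) for t, p, r, q in self.published if t == topic]


async def demo() -> RecordingMqtt:
    mqtt = RecordingMqtt()
    await mqtt.publish("velux2mqtt/blind/state", '{"position": 40}', retain=True)
    await mqtt.publish("velux2mqtt/status", "online")
    return mqtt


mqtt = asyncio.run(demo())
state_messages = mqtt.get_messages_for("velux2mqtt/blind/state")
```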
Source code in packages/src/cosalette/_mqtt.py
cosalette.NullMqttClient
dataclass
Silent no-op MQTT adapter.
Every method is a no-op that logs at DEBUG level. Useful as a default when MQTT is not configured.
publish
async
Silently discard a publish request.
Source code in packages/src/cosalette/_mqtt.py
cosalette.WillConfig
dataclass
Last-Will-and-Testament configuration.
Abstracts aiomqtt.Will so that callers never depend on the
aiomqtt package directly. The real client translates this into
the library-specific type inside _connection_loop().
See Also
ADR-012 — Health and availability reporting.
cosalette.MessageCallback
module-attribute
Async callback receiving (topic, payload) for each inbound message.
Error Handling
cosalette.ErrorPayload
dataclass
ErrorPayload(
error_type: str,
message: str,
device: str | None,
timestamp: str,
details: dict[str, object] = dict(),
)
Immutable structured error payload.
Represents a single error event ready for JSON serialisation and MQTT publication.
cosalette.ErrorPublisher
dataclass
ErrorPublisher(
mqtt: MqttPort,
topic_prefix: str,
error_type_map: dict[type[Exception], str] = dict(),
clock: Callable[[], datetime] | None = None,
)
Publishes structured error payloads to MQTT.
Wraps :func:build_error_payload with fire-and-forget MQTT
publication. Errors during publication are logged but never
propagated — the main application loop must not crash because
an error report failed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mqtt | MqttPort | MQTT port used for publishing. | required |
| topic_prefix | str | Base prefix for error topics (e.g. | required |
| error_type_map | dict[type[Exception], str] | Pluggable mapping from consumer exception types to machine-readable type strings. | dict() |
| clock | Callable[[], datetime] \| None | Optional callable returning a :class: | None |
publish
async
Build an error payload and publish it to MQTT.
Always publishes to {topic_prefix}/error. When device
is provided, also publishes to {topic_prefix}/{device}/error
(skipped for root devices, whose per-device topic would
duplicate the global topic).
The entire pipeline (build → serialise → publish) is wrapped in fire-and-forget semantics: failures at any stage are logged but never propagated to the caller.
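The topic selection and fire-and-forget behaviour described above might look roughly like this. It is a simplified sketch: `error_topics` and `publish_error` are hypothetical names, the `publish` callable stands in for an MQTT port, and failures are caught per topic here rather than around one whole pipeline.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger("error-publisher-sketch")


def error_topics(prefix: str, device: Optional[str] = None, *, is_root: bool = False) -> list:
    """Global error topic always; per-device topic only for named devices."""
    topics = [f"{prefix}/error"]
    if device is not None and not is_root:
        topics.append(f"{prefix}/{device}/error")
    return topics


async def publish_error(
    publish: Callable[[str, str], Awaitable[None]],
    prefix: str,
    payload: str,
    device: Optional[str] = None,
    *,
    is_root: bool = False,
) -> int:
    """Publish to every error topic; log failures and never raise. Returns the success count."""
    sent = 0
    for topic in error_topics(prefix, device, is_root=is_root):
        try:
            await publish(topic, payload)
            sent += 1
        except Exception:
            logger.exception("could not publish error report to %s", topic)
    return sent


async def broken_publish(topic: str, payload: str) -> None:
    raise ConnectionError("broker unreachable")


# The caller sees a count of zero instead of an exception.
sent = asyncio.run(publish_error(broken_publish, "velux2mqtt", "{}", "blind"))
```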
Source code in packages/src/cosalette/_errors.py
cosalette.build_error_payload
build_error_payload(
error: Exception,
*,
error_type_map: dict[type[Exception], str]
| None = None,
device: str | None = None,
details: dict[str, object] | None = None,
clock: Callable[[], datetime] | None = None,
) -> ErrorPayload
Convert an exception into a structured :class:ErrorPayload.
Looks up the exact class of the exception; subclasses are not matched.
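Exact-class lookup means the map is consulted with `type(error)` only, without walking the class hierarchy. A minimal sketch follows; `lookup_error_type` is a hypothetical helper and the fallback string `"error"` is an assumption, since the source does not state the default.

```python
def lookup_error_type(error: Exception, error_type_map: dict, default: str = "error") -> str:
    """Return the mapped type string for the exception's exact class, else a default."""
    # type(error) is the concrete class; base classes are deliberately not consulted.
    return error_type_map.get(type(error), default)


class SensorError(Exception):
    pass


class SensorTimeout(SensorError):
    pass


type_map = {SensorError: "sensor_failure"}
exact = lookup_error_type(SensorError("no reading"), type_map)
subclass = lookup_error_type(SensorTimeout("too slow"), type_map)
```

Because subclasses fall through to the default, each exception class that needs its own type string has to be listed explicitly.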
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error | Exception | The exception to convert. | required |
| error_type_map | dict[type[Exception], str] \| None | Optional mapping from exception types to machine-readable | None |
| device | str \| None | Optional device name to include in the payload. | None |
| details | dict[str, object] \| None | Optional dict of additional context to attach to the payload. Defaults to an empty dict when | None |
| clock | Callable[[], datetime] \| None | Optional callable returning a :class: | None |
Returns:
| Type | Description |
|---|---|
| ErrorPayload | A frozen dataclass ready for serialisation. |
Source code in packages/src/cosalette/_errors.py
Health and Availability
cosalette.DeviceStatus
dataclass
Immutable status snapshot for a single device.
Used inside :class:HeartbeatPayload to report per-device health
in the heartbeat JSON.
cosalette.HeartbeatPayload
dataclass
HeartbeatPayload(
status: str,
uptime_s: float,
version: str,
devices: dict[str, DeviceStatus] = dict(),
)
Immutable structured heartbeat payload.
Represents an app-level status snapshot ready for JSON serialisation and MQTT publication.
to_json
Serialise to a JSON string.
Device entries are expanded to nested dicts via
:meth:DeviceStatus.to_dict.
Source code in packages/src/cosalette/_health.py
cosalette.HealthReporter
dataclass
Publishes app heartbeats and per-device availability to MQTT.
Manages device tracking, uptime calculation (via monotonic clock), and graceful shutdown. All publication is fire-and-forget — errors are logged but never propagated.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mqtt | MqttPort | MQTT port used for publishing. | required |
| topic_prefix | str | Base prefix for health topics (e.g. | required |
| version | str | Application version string included in heartbeats. | required |
| clock | ClockPort | Monotonic clock for uptime measurement (see :class: | required |
__post_init__
¶
set_device_status
¶
Update or add a device's status in the internal tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
device
|
str
|
Device name (used in topic paths and heartbeat payload). |
required |
status
|
str
|
Free-form status string, defaults to |
'ok'
|
Source code in packages/src/cosalette/_health.py
remove_device
¶
publish_device_available
async
¶
Publish "online" to the device availability topic.
For root devices (unnamed), publishes to {prefix}/availability
instead of {prefix}/{device}/availability.
Also registers the device as "ok" in internal tracking.
Source code in packages/src/cosalette/_health.py
publish_device_unavailable
async
¶
Publish "offline" to the device availability topic.
For root devices (unnamed), publishes to {prefix}/availability
instead of {prefix}/{device}/availability.
Also removes the device from internal tracking.
Source code in packages/src/cosalette/_health.py
publish_heartbeat
async
¶
Publish a structured JSON heartbeat to {prefix}/status.
The payload includes current uptime, version, and all tracked device statuses.
Source code in packages/src/cosalette/_health.py
shutdown
async
¶
Gracefully shut down: publish "offline" for everything.
Publishes "offline" to each tracked device's availability
topic (using root topic for root devices), then publishes
"offline" to the app status topic, and clears internal
device tracking.
Source code in packages/src/cosalette/_health.py
cosalette.build_will_config
¶
build_will_config(topic_prefix: str) -> WillConfig
Create a :class:WillConfig for the app's LWT.
The resulting config targets {topic_prefix}/status with payload
"offline", QoS 1, retained. Pass this to :class:MqttClient
so the broker publishes "offline" on unexpected disconnection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic_prefix
|
str
|
Application-level topic prefix (e.g. |
required |
Returns:
| Type | Description |
|---|---|
WillConfig
|
Pre-configured LWT for the app status topic. |
Source code in packages/src/cosalette/_health.py
Clock¶
cosalette.ClockPort
¶
Bases: Protocol
Monotonic clock for timing measurements.
Used by device controllers and timing-sensitive components to measure elapsed time without being affected by system clock adjustments (NTP, manual changes, etc.).
The default implementation wraps time.monotonic(). Tests
inject a deterministic fake clock for reproducible timing.
now
¶
Return monotonic time in seconds.
Returns:
| Type | Description |
|---|---|
float
|
A float representing seconds from an arbitrary epoch. Only the difference between two calls is meaningful. |
sleep
async
¶
Sleep for seconds.
Used by :meth:DeviceContext.sleep for shutdown-aware sleeping.
Production implementations delegate to asyncio.sleep; test
doubles may advance virtual time instead.
Source code in packages/src/cosalette/_clock.py
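A deterministic test double for this protocol might look like the sketch below. `ClockLike` and `FakeClock` are illustrative names, not cosalette exports; the fake advances virtual time rather than waiting.

```python
import asyncio
from typing import Protocol


class ClockLike(Protocol):
    """Stand-in for the shape described above: now() plus an async sleep()."""

    def now(self) -> float: ...

    async def sleep(self, seconds: float) -> None: ...


class FakeClock:
    """Deterministic test double that advances virtual time instead of waiting."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    async def sleep(self, seconds: float) -> None:
        self._now += seconds  # no real delay
        await asyncio.sleep(0)  # still yield to the event loop


async def measure(clock: ClockLike) -> float:
    start = clock.now()
    await clock.sleep(30.0)
    return clock.now() - start


elapsed = asyncio.run(measure(FakeClock(start=100.0)))
```

Because `FakeClock` satisfies the protocol structurally, no inheritance is needed, and a 30-second sleep completes immediately in tests.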
cosalette.SystemClock
¶
Production clock wrapping time.monotonic().
Satisfies :class:ClockPort via structural subtyping.
Usage::
clock = SystemClock()
start = clock.now()
# ... some work ...
elapsed = clock.now() - start
now
¶
Logging¶
cosalette.JsonFormatter
¶
Bases: Formatter
Emit log records as single-line JSON objects (NDJSON).
Each record produces a JSON object with these fields:
- timestamp: ISO 8601 with timezone (always UTC)
- level: Python log level name
- logger: dotted logger name
- message: the formatted log message
- service: application name for log correlation
- version: application version (omitted when empty)
- exception: formatted traceback (only present when an exception is logged)
- stack_info: stack trace (only present when stack_info=True)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
service
|
str
|
Application name included in every log line. |
''
|
version
|
str
|
Application version string. Omitted from output when empty. |
''
|
Source code in packages/src/cosalette/_logging.py
format
¶
Format a log record as a single-line JSON string.
Overrides :meth:logging.Formatter.format. The returned
string contains no embedded newlines (tracebacks are
escaped by the JSON serialiser), so each call produces exactly
one log line — critical for container log drivers that
split on \n.
Source code in packages/src/cosalette/_logging.py
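To show why a JSON serialiser keeps each record on one line, here is a minimal formatter sketch built only on the standard library. `JsonLineFormatter` is an illustrative name and not the cosalette class; the field names follow the list given for `JsonFormatter`.

```python
import json
import logging
import sys
from datetime import datetime, timezone


class JsonLineFormatter(logging.Formatter):
    """Illustrative formatter: one JSON object per record, no raw newlines."""

    def __init__(self, service: str = "", version: str = "") -> None:
        super().__init__()
        self._service = service
        self._version = version

    def format(self, record: logging.LogRecord) -> str:
        entry: dict[str, object] = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": self._service,
        }
        if self._version:
            entry["version"] = self._version
        if record.exc_info:
            # json.dumps escapes the newlines inside the traceback text.
            entry["exception"] = self.formatException(record.exc_info)
        if record.stack_info:
            entry["stack_info"] = self.formatStack(record.stack_info)
        return json.dumps(entry)


try:
    raise ValueError("sensor timeout")
except ValueError:
    record = logging.LogRecord(
        name="demo", level=logging.ERROR, pathname="demo.py", lineno=1,
        msg="read failed: %s", args=("bus 1",), exc_info=sys.exc_info(),
    )

line = JsonLineFormatter(service="myapp").format(record)
```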
cosalette.configure_logging
¶
configure_logging(
settings: LoggingSettings,
*,
service: str,
version: str = "",
) -> None
Configure the root logger from settings.
Clears any existing handlers on the root logger, then installs fresh handlers according to settings.
A :class:logging.StreamHandler writing to stderr is
always installed. When settings.file is set, a
:class:~logging.handlers.RotatingFileHandler is added as
well.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
settings
|
LoggingSettings
|
Logging configuration (level, format, file). |
required |
service
|
str
|
Application name passed to :class: |
required |
version
|
str
|
Application version passed to
:class: |
''
|
Source code in packages/src/cosalette/_logging.py
Settings¶
cosalette.Settings
¶
Bases: BaseSettings
Root framework settings for cosalette applications.
Loaded from environment variables with the nested delimiter
__ and an optional .env file in the working directory.
No env_prefix is set at the framework level — each
application subclasses Settings and adds its own prefix
(e.g. env_prefix="MYAPP_").
Example .env::
MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
LOGGING__LEVEL=DEBUG
LOGGING__FORMAT=text
Example with an application prefix (subclass)::
from pydantic_settings import SettingsConfigDict

from cosalette import Settings


class MyAppSettings(Settings):
    model_config = SettingsConfigDict(
        env_prefix="MYAPP_",
        env_nested_delimiter="__",
        env_file=".env",
        env_file_encoding="utf-8",
    )
model_config
class-attribute
instance-attribute
¶
model_config = SettingsConfigDict(
env_nested_delimiter="__",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
Settings uses extra="ignore" because the base class sets
no env_prefix. Without a prefix, pydantic-settings reads
every environment variable; the BaseSettings default of
extra="forbid" would then reject unrelated variables
(GH_TOKEN, PATH, etc.) as validation errors.
Subclasses that set env_prefix only see prefixed variables
and may safely tighten this to extra="forbid" for strict
validation.
cosalette.MqttSettings
¶
Bases: BaseModel
MQTT broker connection and topic configuration.
Environment variables (with __ nesting)::
MQTT__HOST=broker.local
MQTT__PORT=1883
MQTT__USERNAME=user
MQTT__PASSWORD=secret
MQTT__TOPIC_PREFIX=myapp
cosalette.LoggingSettings
¶
Bases: BaseModel
Logging configuration.
When file is set, logs are also written to a rotating file
(size-based rotation, backup_count generations kept). When
None, logs go to stderr only.
The format field selects the output format:
- "json" (default): structured JSON lines for container log aggregators (Loki, Elasticsearch, CloudWatch). Each line is a complete JSON object with correlation metadata.
- "text": human-readable timestamped format for local development and direct terminal use.
Adapter Lifecycle¶
Adapters registered via app.adapter() that implement the async context manager
protocol (__aenter__/__aexit__) are automatically managed by the framework:
- Entered during startup, before the lifespan= hook runs
- Exited during shutdown, after the lifespan= hook exits
- Managed via contextlib.AsyncExitStack for LIFO ordering and exception safety
- Adapters without __aenter__/__aexit__ pass through unchanged
The detection is duck-typed — any object with both __aenter__ and __aexit__
attributes qualifies. No base class or registration is needed.
See ADR-016 for the design rationale and Adapter Lifecycle Management for usage examples.
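The ordering rules above can be reproduced with `contextlib.AsyncExitStack` directly. The sketch below is a standalone approximation, not the framework's code; the adapter classes and the `has_lifecycle` helper are hypothetical.

```python
import asyncio
from contextlib import AsyncExitStack

events: list[str] = []


class SerialAdapter:
    """Hypothetical adapter that opens and closes a resource."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def __aenter__(self) -> "SerialAdapter":
        events.append(f"enter {self.name}")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        events.append(f"exit {self.name}")


class PlainAdapter:
    """No lifecycle hooks, so it passes through unchanged."""


def has_lifecycle(obj: object) -> bool:
    # Duck-typed check: both dunder methods must be present.
    return hasattr(obj, "__aenter__") and hasattr(obj, "__aexit__")


async def run(adapters: list[object]) -> None:
    async with AsyncExitStack() as stack:
        for adapter in adapters:
            if has_lifecycle(adapter):
                await stack.enter_async_context(adapter)
        events.append("lifespan")  # stands in for the lifespan= hook and run phase
    # Leaving the block unwinds the stack in reverse (LIFO) order.


asyncio.run(run([SerialAdapter("a"), PlainAdapter(), SerialAdapter("b")]))
```

After the run, `events` shows adapters entered in registration order and exited in reverse, with the lifespan placeholder in between.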
Publish Strategies¶
cosalette.PublishStrategy
¶
Bases: Protocol
Publish-decision contract for the device loop.
The framework calls _bind before the loop to inject the clock,
should_publish each iteration, and on_published after a
successful publish to let the strategy reset internal state
(counters, timestamps, etc.).
should_publish
¶
Decide whether the current reading should be published.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current
|
dict[str, object]
|
The latest telemetry payload. |
required |
previous
|
dict[str, object] | None
|
The last published payload, or |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Source code in packages/src/cosalette/_strategies.py
cosalette.Every
¶
Bases: _StrategyBase
Time-based or count-based publish throttle.
Exactly one of seconds or n must be provided.
Every(seconds=30)
Publish at most once every 30 seconds. Requires a
:class:ClockPort injected via _bind(). Before binding,
should_publish always returns True (safe fallback).
Every(n=5)
Publish every 5th reading. No clock dependency.
Raises:
| Type | Description |
|---|---|
ValueError
|
If both or neither of seconds and n are given, or if the given value is not positive. |
Source code in packages/src/cosalette/_strategies.py
cosalette.OnChange
¶
Bases: _StrategyBase
Publish when the telemetry payload changes.
With threshold=None (default), uses exact equality
(current != previous).
When threshold is a float, it acts as a global numeric
dead-band: a leaf field must change by more than threshold
(strict >) to trigger a publish. Non-numeric fields fall
back to !=.
When threshold is a dict[str, float], each key names a
leaf field with its own dead-band. Use dot-notation for
nested fields (e.g. {"sensor.temp": 0.5}). Fields not
listed in the dict use exact equality.
Thresholds are applied to leaf values only. Nested dicts
are traversed recursively — {"sensor": {"temp": 22.5}}
compares temp numerically, not the intermediate sensor
dict as a whole.
In both threshold modes, structural changes (added or removed keys at any nesting level) always trigger a publish, and fields are combined with OR semantics — any single leaf field exceeding its threshold is sufficient.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
threshold
|
float | dict[str, float] | None
|
Optional dead-band for numeric change detection.
|
None
|
Source code in packages/src/cosalette/_strategies.py
should_publish
¶
Return True when the payload differs from the last publish.
When a threshold is configured, numeric fields are compared
using abs(current - previous) > threshold (strict
inequality). Non-numeric fields and structural changes always
use exact equality.
Source code in packages/src/cosalette/_strategies.py
Introspection¶
cosalette.build_registry_snapshot
¶
build_registry_snapshot(app: App) -> dict[str, Any]
Build a JSON-serializable snapshot of all app registrations.
Produces a dict describing the app metadata, devices, telemetry,
commands, and adapters — suitable for json.dumps() without
custom encoders.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
app
|
App
|
The cosalette :class: |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
A plain dict with string keys and JSON-serializable values. |
Source code in packages/src/cosalette/_introspect.py
Retry / Backoff¶
cosalette.BackoffStrategy
¶
Bases: Protocol
Backoff-delay contract for telemetry retry.
The framework calls delay(attempt) between retry attempts.
Attempt numbers are 1-based.
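For illustration, a strategy satisfying this contract could look like the sketch below. The constructor parameters `base` and `max_delay` are hypothetical; the reference does not list the parameters of the built-in strategies.

```python
from typing import Protocol


class BackoffLike(Protocol):
    """Stand-in for the delay(attempt) contract described above."""

    def delay(self, attempt: int) -> float: ...


class ExponentialSketch:
    """Doubles the delay per attempt, capped at max_delay (hypothetical parameters)."""

    def __init__(self, base: float = 1.0, max_delay: float = 60.0) -> None:
        self.base = base
        self.max_delay = max_delay

    def delay(self, attempt: int) -> float:
        # Attempts are 1-based, so the first retry waits exactly `base` seconds.
        return min(self.base * 2 ** (attempt - 1), self.max_delay)


def schedule(strategy: BackoffLike, retries: int) -> list[float]:
    """Collect the delays for attempts 1..retries."""
    return [strategy.delay(attempt) for attempt in range(1, retries + 1)]


delays = schedule(ExponentialSketch(base=0.5, max_delay=4.0), retries=5)
```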
cosalette.ExponentialBackoff
¶
cosalette.LinearBackoff
¶
cosalette.FixedBackoff
¶
cosalette.CircuitBreaker
¶
Optional circuit breaker for telemetry retry.
Tracks consecutive failed cycles (where all retries were exhausted).
After threshold consecutive failures, the circuit opens and
the handler is skipped until a half-open probe succeeds.
Source code in packages/src/cosalette/_retry.py
record_success
¶
record_failure
¶
Record a cycle where all retries were exhausted.
should_attempt
¶
Return True if the handler should execute this cycle.
- closed: always True
- open: skip this cycle, transition to half-open for next cycle
- half-open: True (probe attempt)
Source code in packages/src/cosalette/_retry.py
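The three states can be modelled with a small state machine. This is a sketch, not the cosalette implementation: the attribute names are invented, and reopening the circuit after a failed half-open probe is an assumption.

```python
class CircuitBreakerSketch:
    """Illustrative closed / open / half-open state machine."""

    def __init__(self, threshold: int = 3) -> None:
        self.threshold = threshold
        self.failures = 0
        self.state = "closed"

    def should_attempt(self) -> bool:
        if self.state == "open":
            self.state = "half-open"  # skip now, probe on the next cycle
            return False
        return True  # closed, or half-open probe

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failures += 1
        # Assumed: a failed probe reopens the circuit immediately.
        if self.state == "half-open" or self.failures >= self.threshold:
            self.state = "open"


breaker = CircuitBreakerSketch(threshold=2)
handler_results = iter([False, False, True])  # outcome of each executed cycle
trace: list[str] = []
for _ in range(4):
    if not breaker.should_attempt():
        trace.append("skipped")
        continue
    if next(handler_results):
        breaker.record_success()
        trace.append("ok")
    else:
        breaker.record_failure()
        trace.append("failed")
```

In this run, two failed cycles open the circuit, the third cycle is skipped, and the fourth acts as the probe that closes it again.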
Filters¶
cosalette.Filter
¶
Bases: Protocol
Signal filter contract.
All filters follow the update → value pattern:
- Call update(raw) with each new measurement.
- The return value is the filtered output.
- Access value for the current filtered state.
- Call reset() to clear internal state.
The first update() call seeds the filter — it returns the raw
value unchanged (no history to smooth against).
update
¶
cosalette.Pt1Filter
¶
First-order low-pass (PT1) filter — Rust drop-in for the Python implementation.
cosalette.MedianFilter
¶
Sliding-window median filter — Rust drop-in for the Python implementation.
cosalette.OneEuroFilter
¶
OneEuroFilter(
min_cutoff: float = 1.0,
beta: float = 0.0,
d_cutoff: float = 1.0,
dt: float = 1.0,
)
Adaptive low-pass filter (1€ Filter) — Rust drop-in for the Python implementation.
Persistence¶
cosalette.PersistPolicy
¶
Bases: Protocol
Save-timing contract for the persistence system.
The framework calls should_save after each telemetry cycle
to decide whether to persist the :class:DeviceStore immediately.
This is simpler than :class:PublishStrategy — no clock binding
or post-publish callback is needed.
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Decide whether to save right now.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
store
|
DeviceStore
|
The :class: |
required |
published
|
bool
|
|
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
Source code in packages/src/cosalette/_persist.py
cosalette.SaveOnPublish
¶
Bases: _SavePolicyBase
Save after each successful MQTT publish.
This is the most common policy — the store is persisted whenever new data is published to MQTT, ensuring the persisted state matches what's been broadcast.
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
cosalette.SaveOnChange
¶
Bases: _SavePolicyBase
Save whenever the store has been modified (dirty).
Saves on every handler cycle where the store was mutated, regardless of whether MQTT publishing occurred. Most aggressive policy — ensures minimal data loss on crash.
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
cosalette.SaveOnShutdown
¶
Bases: _SavePolicyBase
Save only on graceful shutdown.
The lightest I/O policy — no saves during normal operation. Data accumulated during a session is only persisted when the app shuts down cleanly. Risk: data loss on hard crash/power loss.
Note: The framework always saves on shutdown regardless of policy, so this policy effectively means "never save during the loop".
should_save
¶
should_save(store: DeviceStore, published: bool) -> bool
Always return False — framework handles shutdown save.
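The difference between the three policies is easiest to see side by side. The classes below are simplified stand-ins: `StoreStub` and its `dirty` attribute are hypothetical, and the real policies may apply extra conditions not described here.

```python
class StoreStub:
    """Minimal stand-in exposing only a dirty flag (attribute name assumed)."""

    def __init__(self, dirty: bool) -> None:
        self.dirty = dirty


class SaveOnPublishSketch:
    def should_save(self, store: StoreStub, published: bool) -> bool:
        return published  # follow the MQTT publish decision


class SaveOnChangeSketch:
    def should_save(self, store: StoreStub, published: bool) -> bool:
        return store.dirty  # ignore publishing, follow mutations


class SaveOnShutdownSketch:
    def should_save(self, store: StoreStub, published: bool) -> bool:
        return False  # never save inside the loop


# One cycle where the handler mutated the store but nothing was published.
store = StoreStub(dirty=True)
decisions = {
    type(policy).__name__: policy.should_save(store, published=False)
    for policy in (SaveOnPublishSketch(), SaveOnChangeSketch(), SaveOnShutdownSketch())
}
```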
Stores¶
cosalette.Store
¶
Bases: Protocol
Key-value persistence for device state.
Each key maps to a JSON-serializable dict. Implementations
must handle missing keys (return None) and create storage
locations as needed.
load
¶
cosalette.DeviceStore
¶
DeviceStore(backend: Store, key: str)
Per-device scoped store with dirty tracking.
Wraps a :class:Store backend, automatically keyed by device name.
Behaves like a dict (MutableMapping interface) — handlers read
and write state naturally via store["key"] = value.
The framework creates one DeviceStore per device and manages
its lifecycle:
- load(): called before the first handler invocation.
- Handler reads/writes via dict-like access.
- save(): always called on shutdown (safety net).
Dirty tracking: after load or save, the store is
"clean". Any __setitem__ or __delitem__ marks it "dirty".
For nested mutations the store cannot detect automatically, call
:meth:mark_dirty explicitly.
Source code in packages/src/cosalette/_stores.py
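The reason nested mutations escape dirty tracking can be shown with a small `MutableMapping` sketch. `DirtyTrackingStore` is an illustrative class; its `mark_clean` method stands in for the clean state that follows `load()` or `save()` and is not part of the documented API.

```python
from collections.abc import Iterator, MutableMapping


class DirtyTrackingStore(MutableMapping):
    """Illustrative dict-like store that flags top-level writes."""

    def __init__(self) -> None:
        self._data: dict[str, object] = {}
        self.dirty = False

    def __getitem__(self, key: str) -> object:
        return self._data[key]

    def __setitem__(self, key: str, value: object) -> None:
        self._data[key] = value
        self.dirty = True

    def __delitem__(self, key: str) -> None:
        del self._data[key]
        self.dirty = True

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)

    def mark_dirty(self) -> None:
        self.dirty = True

    def mark_clean(self) -> None:
        self.dirty = False  # stands in for the state after load()/save()


store = DirtyTrackingStore()
store["readings"] = []
store.mark_clean()

# Appending goes through __getitem__, not __setitem__, so it is not detected.
store["readings"].append(21.5)
missed = store.dirty

store.mark_dirty()
flagged = store.dirty
```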
load
¶
Load state from backend. Called by framework before first use.
save
¶
mark_dirty
¶
Explicitly mark the store as dirty.
Use when mutating nested structures that __setitem__
can't detect (e.g. store["list"].append(x)).
get
¶
setdefault
¶
Return self[key] if present, else set and return default.
Source code in packages/src/cosalette/_stores.py
update
¶
Update the store from a dict and/or keyword arguments.
Source code in packages/src/cosalette/_stores.py
to_dict
¶
Return a shallow copy of the underlying data dict.
Useful when returning state from a telemetry handler (the handler returns this dict for MQTT publishing).
Source code in packages/src/cosalette/_stores.py
keys
¶
values
¶
cosalette.NullStore
¶
No-op store — load always returns None, save is silent.
Use when persistence is disabled or for dry-run modes.
load
¶
cosalette.MemoryStore
¶
In-memory store backed by a plain dict.
Both load and save deep-copy data so that callers cannot
mutate internal state by accident. Designed for tests — mirrors
the FakeStorage pattern from gas2mqtt.
Parameters¶
initial: Optional seed data. The mapping is deep-copied on construction.
Source code in packages/src/cosalette/_stores.py
load
¶
Return a deep copy of the stored dict, or None.
cosalette.JsonFileStore
¶
Single-file JSON store with atomic writes.
All keys live as top-level keys in one JSON object. Writes use a write-to-temp + os.replace pattern for atomicity so that a crash mid-write never corrupts the file.
Parameters¶
path:
Path to the JSON file. Parent directories are created
automatically on the first save.
Source code in packages/src/cosalette/_stores.py
load
¶
Load a key from the JSON file.
Returns None when the file does not exist, the key is
missing, or the file contains invalid JSON (a warning is
logged in the latter case).
Source code in packages/src/cosalette/_stores.py
save
¶
Persist data under key using an atomic write.
The full JSON object is read (if it exists), the key is updated, and the result is written to a temporary file before being atomically moved into place.
Source code in packages/src/cosalette/_stores.py
cosalette.SqliteStore
¶
SQLite-backed store using WAL mode for power-loss resistance.
Each key is a row in a store table; the value is stored as a
JSON text column. The table is auto-created on first use.
Parameters¶
path: Path to the SQLite database file. Parent directories are created automatically.