Architecture
Cosalette follows the composition root pattern: a single App object acts as
the wiring point where all infrastructure, devices, and lifespan logic come
together. The framework calls your code — not the other way around.
Architecture Overview
```mermaid
graph TB
    subgraph User["User Code"]
        direction LR
        dev["@app.device"]
        tel["@app.telemetry"]
        cmd["@app.command"]
        lifespan["lifespan()"]
    end
    subgraph App["App — Composition Root"]
        direction LR
        reg["Registry<br/><i>decorators + adapter()</i>"]
        inj["Injection Engine<br/><i>signature → plan → resolve</i>"]
        orch["Orchestrator<br/><i>_run_async()</i>"]
    end
    subgraph Ports["Ports — PEP 544 Protocols"]
        direction LR
        mqtt_port["MqttPort"]
        clock_port["ClockPort"]
        store_port["Store"]
        filter_port["Filter"]
        strategy["PublishStrategy"]
        persist["PersistPolicy"]
        custom_port["<i>User-defined ports</i>"]
    end
    subgraph Adapters["Adapters — Implementations"]
        direction LR
        mqtt_client["MqttClient<br/><small>aiomqtt</small>"]
        sys_clock["SystemClock"]
        stores["MemoryStore / JsonFileStore<br/>SqliteStore"]
        filters["Pt1 / Median / OneEuro<br/><small>Rust pyo3</small>"]
        strategies["Every / OnChange"]
        persists["SaveOnPublish / SaveOnChange"]
        custom_adapter["<i>User adapters</i>"]
    end
    subgraph Infra["Infrastructure"]
        direction LR
        broker["MQTT Broker"]
        fs["Filesystem"]
        hw["Hardware / Sensors"]
    end

    User -->|"register"| reg
    reg -->|"build plan"| inj
    inj -->|"inject at call time"| User
    orch -->|"resolve"| Ports
    mqtt_port -.->|"impl"| mqtt_client
    clock_port -.->|"impl"| sys_clock
    store_port -.->|"impl"| stores
    filter_port -.->|"impl"| filters
    strategy -.->|"impl"| strategies
    persist -.->|"impl"| persists
    custom_port -.->|"impl"| custom_adapter
    mqtt_client --> broker
    stores --> fs
    custom_adapter --> hw

    classDef userStyle fill:#fff8e1,stroke:#FFC105
    classDef appStyle fill:#fff3e0,stroke:#FF9100
    classDef portStyle fill:#e8eaf6,stroke:#6791E0
    classDef adapterStyle fill:#e8f5e9,stroke:#2FB170
    classDef infraStyle fill:#fce4ec,stroke:#E6695B
    class dev,tel,cmd,lifespan userStyle
    class reg,inj,orch appStyle
    class mqtt_port,clock_port,store_port,filter_port,strategy,persist,custom_port portStyle
    class mqtt_client,sys_clock,stores,filters,strategies,persists,custom_adapter adapterStyle
    class broker,fs,hw infraStyle
```
Dependency rule: User Code → App → Ports ← Adapters → Infrastructure. User code depends on framework APIs and port protocols — never on concrete adapter implementations.
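The dependency rule can be illustrated with a minimal sketch in plain Python. `TemperaturePort`, `FixedTemperature`, and `read_celsius` are hypothetical names chosen for illustration only; none of them is part of cosalette.

```python
from typing import Protocol


class TemperaturePort(Protocol):
    """Hypothetical port: the only type that user code depends on."""

    def read(self) -> float: ...


class FixedTemperature:
    """Hypothetical adapter; it satisfies the port structurally (PEP 544)."""

    def __init__(self, value: float) -> None:
        self._value = value

    def read(self) -> float:
        return self._value


def read_celsius(sensor: TemperaturePort) -> dict[str, object]:
    # User code sees only the protocol, never the concrete adapter class.
    return {"celsius": sensor.read()}


reading = read_celsius(FixedTemperature(21.5))
```

Because `FixedTemperature` never inherits from `TemperaturePort`, the adapter can live in a separate module that user code does not import.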
Bootstrap Lifecycle
```mermaid
graph LR
    B["① Bootstrap<br/><small>settings, logging,<br/>adapters, MQTT</small>"]
    W["② Wire<br/><small>availability, contexts,<br/>router, subscriptions</small>"]
    R["③ Run<br/><small>lifespan, device tasks,<br/>heartbeat, await shutdown</small>"]
    T["④ Teardown<br/><small>cancel tasks, lifespan exit,<br/>offline, disconnect</small>"]
    B --> W --> R --> T

    classDef phase fill:#fff3e0,stroke:#FF9100
    class B,W,R,T phase
```
The FastAPI Analogy
If you have used FastAPI, the programming model will feel familiar:
```python
import cosalette

# GpioPort, RpiGpioAdapter, and MockGpio are application-defined (not shown).

app = cosalette.App(name="velux2mqtt", version="0.3.0")  # (1)!


@app.device("blind")  # (2)!
async def blind(ctx: cosalette.DeviceContext) -> None:
    ...


@app.command("valve")  # (3)!
async def handle_valve(
    topic: str, payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    return {"state": payload}


@app.telemetry("temp", interval=60)  # (4)!
async def temp(ctx: cosalette.DeviceContext) -> dict[str, object]:
    return {"celsius": 21.5}


app.adapter(GpioPort, RpiGpioAdapter, dry_run=MockGpio)  # (5)!
app.run()  # (6)!
```
- Composition root: the App is constructed once at module level.
- Device decorator: @app.device registers a long-running command & control coroutine (escape hatch).
- Command decorator: @app.command registers a per-message command handler (recommended for most command devices).
- Telemetry decorator: @app.telemetry registers a periodic polling function.
- Adapter binding: maps a Protocol port to a concrete implementation (with optional dry-run variant).
- Entry point: builds the CLI, parses arguments, and runs the async lifecycle.
This is Inversion of Control (IoC): the framework owns the asyncio event
loop, signal handling, MQTT connection management, and task supervision. Your
device functions and hooks are called back by the framework at the appropriate
point in the lifecycle.
Registration API
All registration happens at import time through decorators and method calls on
the App instance:
| API | Purpose |
|---|---|
| @app.command(name?, init=?, enabled=?) | Register a per-message command handler with optional init callback (recommended). Omit name for root-level topics. name= accepts a callable for multi-device registration. |
| @app.device(name?, init=?, enabled=?) | Register a long-running command & control coroutine with optional init callback. Omit name for root-level topics. name= accepts a callable for multi-device registration. |
| @app.telemetry(name?, interval=N, publish=?, persist=?, init=?, enabled=?, group=?) | Register a periodic telemetry device with optional publish strategy, persistence policy, init callback, and coalescing group. Omit name for root-level topics. name= accepts a callable for multi-device registration. |
| app.add_command(name, handler, ...) | Imperative counterpart to @app.command. name accepts a string or callable (same as decorator form). |
| app.add_device(name, handler, ...) | Imperative counterpart to @app.device. name accepts a string or callable (same as decorator form). |
| app.add_telemetry(name, handler, ...) | Imperative counterpart to @app.telemetry. name accepts a string or callable (same as decorator form). |
| App(lifespan=fn) | Register a lifespan context manager. |
| App(store=StoreBackend()) | Enable device persistence (required for persist= on telemetry). |
| App(adapters={Port: Impl, ...}) | Declarative adapter dict; an alternative to app.adapter(). |
| @app.on_configure | Register a lifecycle hook that runs after settings and adapters are ready (before devices are wired). Receives dependencies via injection. See Multi-Device Registration. |
| app.adapter(Port, Impl) | Bind a Protocol port to a concrete adapter. |
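The relationship between a decorator and its imperative counterpart can be sketched with a toy registry. `MiniApp` and its `registry` attribute are hypothetical stand-ins written for this illustration; they mirror the shape of the API in the table but say nothing about how cosalette stores registrations internally.

```python
from collections.abc import Callable
from typing import Any

Handler = Callable[..., Any]


class MiniApp:
    """Hypothetical registry for illustration; not cosalette's App."""

    def __init__(self) -> None:
        # Maps device name -> (handler, polling interval in seconds).
        self.registry: dict[str, tuple[Handler, float]] = {}

    def add_telemetry(self, name: str, handler: Handler, *, interval: float) -> None:
        # Imperative form: record the handler under its device name.
        self.registry[name] = (handler, interval)

    def telemetry(self, name: str, *, interval: float) -> Callable[[Handler], Handler]:
        # Decorator form: delegate to the imperative form and
        # return the function unchanged so it stays directly callable.
        def decorator(fn: Handler) -> Handler:
            self.add_telemetry(name, fn, interval=interval)
            return fn

        return decorator


app = MiniApp()


@app.telemetry("temp", interval=60)
async def temp() -> dict[str, object]:
    return {"celsius": 21.5}


async def humidity() -> dict[str, object]:
    return {"percent": 40.0}


app.add_telemetry("humidity", humidity, interval=30)
```

Both calls run at import time and only record the handler; nothing is executed until an orchestrator later walks the registry.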
No base classes
Device functions are plain async def coroutines. There is no
BaseDevice to inherit from — handlers declare only the parameters
they need via type annotations, and the framework injects them
automatically.
Context Injection
The framework uses signature-based injection — device handlers declare only the parameters they need via type annotations, and the framework provides them automatically.
DeviceContext is injected into @app.command, @app.device, and @app.telemetry
functions when they declare a ctx: DeviceContext parameter. It provides
device-scoped MQTT publishing, command registration, shutdown-aware sleep, and
adapter resolution.
For @app.command handlers, dependencies are injected by type annotation:
```python
@app.command("relay")
async def handle_relay(
    topic: str, payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    gpio = ctx.adapter(GpioPort)  # resolve the adapter bound to GpioPort
    return {"on": gpio.read()}
```
For @app.device functions, the context is the sole parameter, as in the blind example in The FastAPI Analogy.

Handlers that don't need context can omit all parameters. This is common for simple telemetry devices.

Handlers can also request specific injectable types: Settings, logging.Logger,
ClockPort, asyncio.Event, or adapter port types.
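The "signature → plan → resolve" idea can be sketched with the standard library alone. This is an illustrative approximation, not cosalette's injection engine: `build_plan`, `resolve`, and the `providers` mapping are hypothetical names, and the sketch assumes every handler parameter carries a type annotation.

```python
import asyncio
import inspect
import logging
from collections.abc import Callable
from typing import Any, get_type_hints


def build_plan(fn: Callable[..., Any]) -> dict[str, type]:
    """Map each parameter name to its annotated type (done once, at registration)."""
    hints = get_type_hints(fn)
    return {name: hints[name] for name in inspect.signature(fn).parameters}


def resolve(plan: dict[str, type], providers: dict[type, Any]) -> dict[str, Any]:
    """Look up a provider instance for every planned parameter (done at call time)."""
    return {name: providers[tp] for name, tp in plan.items()}


async def temp(logger: logging.Logger) -> dict[str, object]:
    # Declares only what it needs: a logger.
    logger.debug("reading sensor")
    return {"celsius": 21.5}


async def heartbeat() -> dict[str, object]:
    # Declares nothing, so nothing is injected.
    return {"ok": True}


# Hypothetical provider table keyed by type.
providers: dict[type, Any] = {logging.Logger: logging.getLogger("demo")}

plan = build_plan(temp)
result = asyncio.run(temp(**resolve(plan, providers)))
empty = asyncio.run(heartbeat(**resolve(build_plan(heartbeat), providers)))
```

Building the plan once and resolving per call keeps the reflection cost out of the hot path, which is one plausible reason to separate the two steps.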
The lifespan function receives its own injected context. It provides settings and adapter resolution but not device-scoped features (no publish, no sleep, no on_command).
Four-Phase Orchestration
The App._run_async() method orchestrates the full application lifecycle in
four sequential phases:
| Phase | What happens |
|---|---|
| Bootstrap | Load settings, configure logging, resolve adapters, run configure hooks, expand name specs, resolve intervals, connect MQTT |
| Wire | Install signal handlers, publish availability, build contexts, wire router |
| Run | Execute lifespan startup, launch device tasks, await shutdown_event.wait() |
| Teardown | Execute lifespan teardown, cancel tasks, publish offline, disconnect MQTT |
Each phase is detailed in the Application Lifecycle concept page.
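The overall shape of the four phases can be sketched with asyncio and an async context manager standing in for the lifespan. This is not the body of `_run_async()`; the function names, the `events` log, and in particular the ordering of steps inside teardown are assumptions of the sketch.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan() -> AsyncIterator[None]:
    events.append("lifespan startup")
    try:
        yield
    finally:
        events.append("lifespan teardown")


async def device() -> None:
    events.append("device started")
    while True:  # a long-running device loops until it is cancelled
        await asyncio.sleep(3600)


async def run(shutdown: asyncio.Event) -> None:
    events.append("bootstrap")  # settings, logging, adapters, MQTT connect
    events.append("wire")  # availability, contexts, router
    async with lifespan():  # the run phase begins with lifespan startup
        task = asyncio.create_task(device())
        try:
            await shutdown.wait()
        finally:
            task.cancel()
            # Collect the CancelledError instead of letting it propagate.
            await asyncio.gather(task, return_exceptions=True)
            events.append("tasks cancelled")
    events.append("offline published, disconnected")


async def main() -> None:
    shutdown = asyncio.Event()
    runner = asyncio.create_task(run(shutdown))
    await asyncio.sleep(0.01)  # give the device task time to start
    shutdown.set()  # stands in for a termination signal
    await runner


asyncio.run(main())
```

Passing the shutdown event in as a parameter, rather than creating it inside `run`, is what lets a caller end the run phase on demand.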
Test Seams
_run_async() accepts four optional keyword arguments specifically designed
as test seams — injection points that let tests bypass real infrastructure:
```python
await app._run_async(
    settings=make_settings(),        # skip env/dotenv loading
    shutdown_event=asyncio.Event(),  # manual shutdown control
    mqtt=MockMqttClient(),           # in-memory MQTT double
    clock=FakeClock(),               # deterministic time
)
```
This design means integration tests run entirely in-process with no broker, no real clock, and no signal handlers. See Testing for the full testing strategy.
Design principle — Dependency Injection over Service Locator
The test-seam parameters follow constructor/method injection rather than a global service locator. Each dependency is explicit and visible at the call site, making tests self-documenting.
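A deterministic clock double and a manually controlled shutdown event might look like the sketch below. The `Clock` protocol with a single `now()` method is an assumption made for illustration; cosalette's actual ClockPort and FakeClock may expose a different interface.

```python
import asyncio
from typing import Protocol


class Clock(Protocol):
    """Hypothetical clock port; the real ClockPort may differ."""

    def now(self) -> float: ...


class FakeClock:
    """Deterministic double: time moves only when the test advances it."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    def advance(self, seconds: float) -> None:
        self._now += seconds


async def uptime_reporter(
    clock: Clock, shutdown: asyncio.Event, out: list[float]
) -> None:
    # Dependencies arrive as explicit parameters, not through a global lookup.
    started = clock.now()
    await shutdown.wait()
    out.append(clock.now() - started)


async def scenario() -> list[float]:
    clock = FakeClock()
    shutdown = asyncio.Event()
    out: list[float] = []
    task = asyncio.create_task(uptime_reporter(clock, shutdown, out))
    await asyncio.sleep(0)  # let the reporter record its start time
    clock.advance(90.0)  # ninety "seconds" pass without real waiting
    shutdown.set()  # manual shutdown control
    await task
    return out


uptimes = asyncio.run(scenario())
```

The test finishes in milliseconds while observing ninety seconds of simulated uptime, which is the practical payoff of injecting the clock.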
Why a Composition Root?
The composition root pattern (as described in ADR-001) solves several problems specific to IoT bridge daemons:
- Single wiring point — all adapters, devices, and lifespan are assembled in one place, making the dependency graph explicit.
- Testability — inject doubles at the root without modifying device code.
- Dry-run mode — swap adapter implementations globally by setting one flag.
- Discoverability — reading app.py reveals the full application topology.
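The dry-run point can be made concrete with a small sketch of a registry that holds both a real and a dry-run implementation per port and picks between them with one flag. `AdapterRegistry` and `Binding` are hypothetical; the port and adapter class bodies are invented here, and only their names echo the earlier `app.adapter(GpioPort, RpiGpioAdapter, dry_run=MockGpio)` example.

```python
from dataclasses import dataclass
from typing import Any, Optional, Protocol


class GpioPort(Protocol):
    def read(self) -> bool: ...


class RpiGpioAdapter:
    def read(self) -> bool:
        raise RuntimeError("needs real hardware")


class MockGpio:
    def read(self) -> bool:
        return False


@dataclass
class Binding:
    impl: type
    dry_run: Optional[type] = None


class AdapterRegistry:
    """Hypothetical registry; not cosalette's implementation."""

    def __init__(self, *, dry_run: bool = False) -> None:
        self._dry_run = dry_run
        self._bindings: dict[type, Binding] = {}

    def adapter(self, port: type, impl: type, *, dry_run: Optional[type] = None) -> None:
        self._bindings[port] = Binding(impl, dry_run)

    def resolve(self, port: type) -> Any:
        binding = self._bindings[port]
        # One flag selects the implementation for every port at once;
        # ports without a dry-run variant fall back to the real adapter.
        use_dry = self._dry_run and binding.dry_run is not None
        chosen = binding.dry_run if use_dry else binding.impl
        return chosen()


live = AdapterRegistry(dry_run=False)
dry = AdapterRegistry(dry_run=True)
for registry in (live, dry):
    registry.adapter(GpioPort, RpiGpioAdapter, dry_run=MockGpio)
```

Device code that calls `resolve(GpioPort)` is identical in both modes; only the registry's flag changes.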
See Also
- Application Lifecycle — detailed phase-by-phase walkthrough
- Hexagonal Architecture — ports, adapters, and the dependency rule
- Device Archetypes — the three first-class device types
- Testing — test seams and the AppHarness
- ADR-001 — Framework Architecture Style
- ADR-005 — CLI Framework
- ADR-006 — Hexagonal Architecture
Architecture Decision Records (ADRs)
All major architectural decisions are documented as Architecture Decision Records.
| ADR | Title | Status | Date |
|---|---|---|---|
| ADR-001 | Framework Architecture Style | Accepted | 2026-02-14 |
| ADR-002 | MQTT Topic Conventions | Accepted | 2026-02-14 |
| ADR-003 | Configuration System | Accepted | 2026-02-14 |
| ADR-004 | Logging Strategy | Accepted | 2026-02-14 |
| ADR-005 | CLI Framework | Accepted | 2026-02-14 |
| ADR-006 | Hexagonal Architecture (Ports & Adapters) | Accepted | 2026-02-14 |
| ADR-007 | Testing Strategy | Accepted | 2026-02-14 |
| ADR-008 | Packaging and Distribution | Accepted | 2026-02-14 |
| ADR-009 | Python Version and Dependencies | Accepted | 2026-02-14 |
| ADR-010 | Device Archetypes | Accepted | 2026-02-14 |
| ADR-011 | Error Handling and Publishing | Accepted | 2026-02-14 |
| ADR-012 | Health and Availability Reporting | Accepted | 2026-02-14 |
| ADR-013 | Telemetry Publish Strategies | Accepted | 2026-02-22 |
| ADR-014 | Signal Filters | Accepted | 2026-02-22 |
| ADR-015 | Persistence | Accepted | 2026-02-25 |
| ADR-016 | Adapter Lifecycle Protocol | Accepted | 2026-02-26 |
| ADR-017 | SBOM Generation | Accepted | 2026-02-27 |
| ADR-018 | Telemetry Coalescing Groups | Accepted | 2026-03-03 |