
Architecture

Cosalette follows the composition root pattern: a single App object acts as the wiring point where all infrastructure, devices, and lifespan logic come together. The framework calls your code — not the other way around.

Architecture Overview

graph TB
    subgraph User["User Code"]
        direction LR
        dev["@app.device"]
        tel["@app.telemetry"]
        cmd["@app.command"]
        lifespan["lifespan()"]
    end

    subgraph App["App — Composition Root"]
        direction LR
        reg["Registry<br/><i>decorators + adapter()</i>"]
        inj["Injection Engine<br/><i>signature → plan → resolve</i>"]
        orch["Orchestrator<br/><i>_run_async()</i>"]
    end

    subgraph Ports["Ports — PEP 544 Protocols"]
        direction LR
        mqtt_port["MqttPort"]
        clock_port["ClockPort"]
        store_port["Store"]
        filter_port["Filter"]
        strategy["PublishStrategy"]
        persist["PersistPolicy"]
        custom_port["<i>User-defined ports</i>"]
    end

    subgraph Adapters["Adapters — Implementations"]
        direction LR
        mqtt_client["MqttClient<br/><small>aiomqtt</small>"]
        sys_clock["SystemClock"]
        stores["MemoryStore / JsonFileStore<br/>SqliteStore"]
        filters["Pt1 / Median / OneEuro<br/><small>Rust pyo3</small>"]
        strategies["Every / OnChange"]
        persists["SaveOnPublish / SaveOnChange"]
        custom_adapter["<i>User adapters</i>"]
    end

    subgraph Infra["Infrastructure"]
        direction LR
        broker["MQTT Broker"]
        fs["Filesystem"]
        hw["Hardware / Sensors"]
    end

    User -->|"register"| reg
    reg -->|"build plan"| inj
    inj -->|"inject at call time"| User
    orch -->|"resolve"| Ports

    mqtt_port -.->|"impl"| mqtt_client
    clock_port -.->|"impl"| sys_clock
    store_port -.->|"impl"| stores
    filter_port -.->|"impl"| filters
    strategy -.->|"impl"| strategies
    persist -.->|"impl"| persists
    custom_port -.->|"impl"| custom_adapter

    mqtt_client --> broker
    stores --> fs
    custom_adapter --> hw

    classDef userStyle fill:#fff8e1,stroke:#FFC105
    classDef appStyle fill:#fff3e0,stroke:#FF9100
    classDef portStyle fill:#e8eaf6,stroke:#6791E0
    classDef adapterStyle fill:#e8f5e9,stroke:#2FB170
    classDef infraStyle fill:#fce4ec,stroke:#E6695B

    class dev,tel,cmd,lifespan userStyle
    class reg,inj,orch appStyle
    class mqtt_port,clock_port,store_port,filter_port,strategy,persist,custom_port portStyle
    class mqtt_client,sys_clock,stores,filters,strategies,persists,custom_adapter adapterStyle
    class broker,fs,hw infraStyle

Dependency rule: User Code → App → Ports ← Adapters → Infrastructure. User code depends on framework APIs and port protocols — never on concrete adapter implementations.
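
As a rough illustration of the dependency rule, the sketch below types user code against a PEP 544 protocol and hands it an adapter that satisfies the protocol structurally. `GpioPort`, `FakeGpioAdapter`, and `relay_state` are hypothetical names chosen for this example, not part of cosalette.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class GpioPort(Protocol):
    """Hypothetical port: the only thing user code knows about."""

    def read(self) -> bool: ...


class FakeGpioAdapter:
    """Hypothetical adapter: matches GpioPort by shape, with no inheritance."""

    def __init__(self, level: bool) -> None:
        self._level = level

    def read(self) -> bool:
        return self._level


def relay_state(gpio: GpioPort) -> dict[str, object]:
    """User code: typed against the port, never a concrete adapter."""
    return {"on": gpio.read()}


state = relay_state(FakeGpioAdapter(level=True))
```

Because the adapter never imports or subclasses the port, swapping it for a hardware-backed implementation leaves `relay_state` untouched.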

Bootstrap Lifecycle

graph LR
    B["① Bootstrap<br/><small>settings, logging,<br/>adapters, MQTT</small>"]
    W["② Wire<br/><small>availability, contexts,<br/>router, subscriptions</small>"]
    R["③ Run<br/><small>lifespan, device tasks,<br/>heartbeat, await shutdown</small>"]
    T["④ Teardown<br/><small>cancel tasks, lifespan exit,<br/>offline, disconnect</small>"]

    B --> W --> R --> T

    classDef phase fill:#fff3e0,stroke:#FF9100
    class B,W,R,T phase

The FastAPI Analogy

If you have used FastAPI, the programming model will feel familiar:

import cosalette

app = cosalette.App(name="velux2mqtt", version="0.3.0")  # (1)!

@app.device("blind")  # (2)!
async def blind(ctx: cosalette.DeviceContext) -> None:
    ...

@app.command("valve")  # (3)!
async def handle_valve(
    topic: str, payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    return {"state": payload}

@app.telemetry("temp", interval=60)  # (4)!
async def temp(ctx: cosalette.DeviceContext) -> dict[str, object]:
    return {"celsius": 21.5}

app.adapter(GpioPort, RpiGpioAdapter, dry_run=MockGpio)  # (5)!

app.run()  # (6)!
  1. Composition root: the App is constructed once at module level.
  2. Device decorator: @app.device registers a long-running command & control coroutine (escape hatch).
  3. Command decorator: @app.command registers a per-message command handler (recommended for most command devices).
  4. Telemetry decorator: @app.telemetry registers a periodic polling function.
  5. Adapter binding: maps a Protocol port to a concrete implementation (with optional dry-run variant).
  6. Entry point: builds the CLI, parses arguments, and runs the async lifecycle.

This is Inversion of Control (IoC): the framework owns the asyncio event loop, signal handling, MQTT connection management, and task supervision. Your device functions and hooks are called back by the framework at the appropriate point in the lifecycle.

Registration API

All registration happens at import time through decorators and method calls on the App instance:

| API | Purpose |
| --- | --- |
| @app.command(name?, init=?, enabled=?) | Register a per-message command handler with optional init callback (recommended). Omit name for root-level topics. name= accepts a callable for multi-device registration. |
| @app.device(name?, init=?, enabled=?) | Register a long-running command & control coroutine with optional init callback. Omit name for root-level topics. name= accepts a callable for multi-device registration. |
| @app.telemetry(name?, interval=N, publish=?, persist=?, init=?, enabled=?, group=?) | Register a periodic telemetry device with optional publish strategy, persistence policy, init callback, and coalescing group. Omit name for root-level topics. name= accepts a callable for multi-device registration. |
| app.add_command(name, handler, ...) | Imperative counterpart to @app.command. name accepts a string or callable (same as decorator form). |
| app.add_device(name, handler, ...) | Imperative counterpart to @app.device. name accepts a string or callable (same as decorator form). |
| app.add_telemetry(name, handler, ...) | Imperative counterpart to @app.telemetry. name accepts a string or callable (same as decorator form). |
| App(lifespan=fn) | Register a lifespan context manager. |
| App(store=StoreBackend()) | Enable device persistence (required for persist= on telemetry). |
| App(adapters={Port: Impl, ...}) | Declarative adapter dict; an alternative to app.adapter(). |
| @app.on_configure | Register a lifecycle hook that runs after settings and adapters are ready (before devices are wired). Receives dependencies via injection. See Multi-Device Registration. |
| app.adapter(Port, Impl) | Bind a Protocol port to a concrete adapter. |
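
To make the relationship between the decorator and imperative forms concrete, here is a minimal sketch of a registry in which the decorator simply delegates to the `add_*` method. `MiniApp` and its `registry` attribute are hypothetical stand-ins and say nothing about how cosalette's `App` is actually implemented.

```python
from collections.abc import Callable
from typing import Any

Handler = Callable[..., Any]


class MiniApp:
    """Toy registry for illustration only; not cosalette's App."""

    def __init__(self) -> None:
        # name -> (handler, polling interval in seconds)
        self.registry: dict[str, tuple[Handler, float]] = {}

    def add_telemetry(self, name: str, handler: Handler, *, interval: float) -> None:
        # Imperative form: record the handler; nothing is called yet.
        self.registry[name] = (handler, interval)

    def telemetry(self, name: str, *, interval: float) -> Callable[[Handler], Handler]:
        # Decorator form: delegate to the imperative form and
        # return the function unchanged.
        def decorator(fn: Handler) -> Handler:
            self.add_telemetry(name, fn, interval=interval)
            return fn

        return decorator


app = MiniApp()


@app.telemetry("temp", interval=60)
def temp() -> dict[str, object]:
    return {"celsius": 21.5}


def humidity() -> dict[str, object]:
    return {"percent": 40}


app.add_telemetry("humidity", humidity, interval=30)
```

Both registrations run at import time and only store references; in this sketch, as in the model described above, calling the handlers is left to whoever owns the event loop.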

No base classes

Device functions are plain async def coroutines. There is no BaseDevice to inherit from — handlers declare only the parameters they need via type annotations, and the framework injects them automatically.

Context Injection

The framework uses signature-based injection: it inspects each handler's signature, builds an injection plan from the type annotations, and resolves the declared parameters at call time.

DeviceContext is injected into @app.command, @app.device, and @app.telemetry functions that declare a ctx: DeviceContext parameter. It provides device-scoped MQTT publishing, command registration, shutdown-aware sleep, and adapter resolution.

For @app.command handlers, dependencies are injected by type annotation:

@app.command("relay")
async def handle_relay(
    topic: str, payload: str, ctx: cosalette.DeviceContext
) -> dict[str, object]:
    gpio = ctx.adapter(GpioPort)  # resolve adapter
    return {"on": gpio.read()}

For @app.device functions, the context is the sole parameter:

@app.device("relay")
async def relay(ctx: cosalette.DeviceContext) -> None:
    gpio = ctx.adapter(GpioPort)  # resolve adapter
    while not ctx.shutdown_requested:
        await ctx.publish_state({"on": gpio.read()})
        await ctx.sleep(5)
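
One common way to build a shutdown-aware sleep like the one used in the loop above is to wait on the shutdown event with a timeout. The sketch below assumes that approach; `sleep_until_shutdown` is a hypothetical helper, and the source does not say how `ctx.sleep` is implemented.

```python
import asyncio


async def sleep_until_shutdown(shutdown: asyncio.Event, seconds: float) -> bool:
    """Sleep for up to `seconds`; return True if shutdown was requested first."""
    try:
        await asyncio.wait_for(shutdown.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        return False  # full duration elapsed, no shutdown
    return True  # woke early because the event was set


async def demo() -> tuple[bool, bool]:
    shutdown = asyncio.Event()
    # Nobody sets the event, so this call times out.
    timed_out = await sleep_until_shutdown(shutdown, 0.01)
    # Request shutdown shortly after the next sleep starts.
    asyncio.get_running_loop().call_later(0.01, shutdown.set)
    interrupted = await sleep_until_shutdown(shutdown, 5.0)
    return timed_out, interrupted


results = asyncio.run(demo())
```

The second call returns long before its five seconds are up, which is what lets a polling loop exit promptly on shutdown.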

Handlers that don't need context can omit all parameters. This is common for simple telemetry devices:

@app.telemetry("temp", interval=60)
async def temp() -> dict[str, object]:
    return {"celsius": await read_sensor()}

Handlers can request specific injectable types — Settings, logging.Logger, ClockPort, asyncio.Event, or adapter port types:

@app.telemetry("temp", interval=60)
async def temp(settings: Settings) -> dict[str, object]:
    # Only settings injected — no DeviceContext needed
    return {"celsius": 21.5, "unit": settings.unit}
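
The signature-based injection described in this section can be sketched with the standard library alone. The example below assumes a simple type-to-instance lookup table; `build_plan`, `resolve`, `providers`, and this `Settings` dataclass are hypothetical names for illustration, not cosalette internals.

```python
import inspect
import logging
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, get_type_hints


@dataclass
class Settings:
    """Hypothetical settings object."""

    unit: str = "C"


def build_plan(handler: Callable[..., Any]) -> dict[str, type]:
    """Signature -> plan: map each parameter name to its annotated type."""
    hints = get_type_hints(handler)
    plan: dict[str, type] = {}
    for name in inspect.signature(handler).parameters:
        if name not in hints:
            raise TypeError(f"parameter {name!r} has no type annotation")
        plan[name] = hints[name]
    return plan


def resolve(plan: dict[str, type], providers: dict[type, Any]) -> dict[str, Any]:
    """Plan -> resolve: look up one provider per requested type."""
    return {name: providers[tp] for name, tp in plan.items()}


def temp(settings: Settings) -> dict[str, object]:
    return {"celsius": 21.5, "unit": settings.unit}


def tick() -> dict[str, object]:
    return {"ok": True}


providers = {Settings: Settings(unit="C"), logging.Logger: logging.getLogger("demo")}
plan = build_plan(temp)
result = temp(**resolve(plan, providers))
```

Building the plan once at registration time and resolving it on each call keeps the per-call cost to a dictionary lookup per parameter; a zero-argument handler such as `tick` simply yields an empty plan.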

AppContext is injected into the lifespan function. It provides settings and adapter resolution but no device-scoped features (no publish, no sleep, no on_command).

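from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import cosalette

@asynccontextmanager
async def lifespan(ctx: cosalette.AppContext) -> AsyncIterator[None]:
    db = ctx.adapter(DatabasePort)
    await db.warm_cache()  # startup: runs before device tasks launch
    yield
    await db.close()  # teardown: runs during shutdown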
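
For readers unfamiliar with lifespan-style context managers, the following self-contained sketch shows the order in which the two halves run around a stand-in for the run phase. `run_with_lifespan` is a hypothetical runner; the real orchestration lives in `App._run_async()`.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan() -> AsyncIterator[None]:
    events.append("startup")  # code before yield runs first
    try:
        yield
    finally:
        events.append("teardown")  # runs even if the body raises


async def run_with_lifespan() -> None:
    # Hypothetical runner: enter the lifespan, do the work, exit the lifespan.
    async with lifespan():
        events.append("devices running")


asyncio.run(run_with_lifespan())
```

Wrapping the `yield` in `try`/`finally` is a design choice worth considering: without it, an exception raised inside the `async with` body skips the cleanup code after `yield`.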

Four-Phase Orchestration

The App._run_async() method orchestrates the full application lifecycle in four sequential phases:

| Phase | What happens |
| --- | --- |
| Bootstrap | Load settings, configure logging, resolve adapters, run configure hooks, expand name specs, resolve intervals, connect MQTT |
| Wire | Install signal handlers, publish availability, build contexts, wire router |
| Run | Execute lifespan startup, launch device tasks, await shutdown_event.wait() |
| Teardown | Execute lifespan teardown, cancel tasks, publish offline, disconnect MQTT |

Each phase is detailed in the Application Lifecycle concept page.
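
The four phases can be pictured as a single coroutine that blocks on a shutdown event and always reaches teardown. This is a deliberately simplified sketch with placeholder steps; `run_app`, `device`, and `log` are hypothetical and do not mirror the real `_run_async()` body.

```python
import asyncio

log: list[str] = []


async def device(stop: asyncio.Event) -> None:
    # Stand-in for a device task: loops until shutdown is requested.
    while not stop.is_set():
        await asyncio.sleep(0.01)


async def run_app(shutdown_event: asyncio.Event) -> None:
    log.append("bootstrap")  # settings, logging, adapters, MQTT
    log.append("wire")  # availability, contexts, router
    tasks = [asyncio.create_task(device(shutdown_event))]
    log.append("run")
    try:
        await shutdown_event.wait()  # block until shutdown is requested
    finally:
        for task in tasks:
            task.cancel()
        # Collect cancelled tasks without re-raising CancelledError.
        await asyncio.gather(*tasks, return_exceptions=True)
        log.append("teardown")  # offline message, disconnect


async def main() -> None:
    stop = asyncio.Event()
    # Simulate a shutdown signal arriving 50 ms after startup.
    asyncio.get_running_loop().call_later(0.05, stop.set)
    await run_app(stop)


asyncio.run(main())
```

Passing the shutdown event in as a parameter, rather than creating it inside `run_app`, is what makes the coroutine controllable from a test.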

Test Seams

_run_async() accepts four optional keyword arguments specifically designed as test seams — injection points that let tests bypass real infrastructure:

await app._run_async(
    settings=make_settings(),        # skip env/dotenv loading
    shutdown_event=asyncio.Event(),  # manual shutdown control
    mqtt=MockMqttClient(),           # in-memory MQTT double
    clock=FakeClock(),               # deterministic time
)

This design means integration tests run entirely in-process with no broker, no real clock, and no signal handlers. See Testing for the full testing strategy.

Design principle — Dependency Injection over Service Locator

The test-seam parameters follow constructor/method injection rather than a global service locator. Each dependency is explicit and visible at the call site, making tests self-documenting.
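
A keyword-only seam with a production default is one way to realize this principle. In the sketch below, `Uptime` is a hypothetical component, and the `now()` method on `ClockPort` is an assumed shape for the protocol rather than cosalette's documented interface.

```python
import time
from typing import Optional, Protocol


class ClockPort(Protocol):
    """Assumed protocol shape for this sketch."""

    def now(self) -> float: ...


class SystemClock:
    def now(self) -> float:
        return time.monotonic()


class FakeClock:
    """Deterministic double: time moves only when the test says so."""

    def __init__(self, start: float = 0.0) -> None:
        self._now = start

    def now(self) -> float:
        return self._now

    def advance(self, seconds: float) -> None:
        self._now += seconds


class Uptime:
    """Hypothetical component with a keyword-only seam for the clock."""

    def __init__(self, *, clock: Optional[ClockPort] = None) -> None:
        # Production callers pass nothing and get the real clock.
        self._clock = clock if clock is not None else SystemClock()
        self._started = self._clock.now()

    def seconds(self) -> float:
        return self._clock.now() - self._started


clock = FakeClock()
uptime = Uptime(clock=clock)
clock.advance(90.0)
elapsed = uptime.seconds()
```

The test advances time by exactly 90 seconds without sleeping, and the dependency it replaced is visible right at the constructor call.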

Why a Composition Root?

The composition root pattern (as described in ADR-001) solves several problems specific to IoT bridge daemons:

  1. Single wiring point — all adapters, devices, and lifespan are assembled in one place, making the dependency graph explicit.
  2. Testability — inject doubles at the root without modifying device code.
  3. Dry-run mode — swap adapter implementations globally by setting one flag.
  4. Discoverability — reading app.py reveals the full application topology.
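
Points 1 and 3 above can be illustrated with a toy binding table in which a single flag selects the dry-run variant of every adapter that declares one. `Root`, `wire`, and the adapter classes here are hypothetical stand-ins, not cosalette's implementation.

```python
from typing import Any, Optional, Protocol


class GpioPort(Protocol):
    def read(self) -> bool: ...


class RpiGpioAdapter:
    """Stand-in for a hardware-backed adapter."""

    def read(self) -> bool:
        raise RuntimeError("no GPIO hardware in this sketch")


class MockGpio:
    """Dry-run variant: no hardware access."""

    def read(self) -> bool:
        return False


class Root:
    """Toy composition root: the one place where ports meet implementations."""

    def __init__(self, *, dry_run: bool = False) -> None:
        self.dry_run = dry_run
        self._bindings: dict[type, tuple[type, Optional[type]]] = {}

    def adapter(self, port: type, impl: type, *, dry_run: Optional[type] = None) -> None:
        self._bindings[port] = (impl, dry_run)

    def resolve(self, port: type) -> Any:
        impl, dry = self._bindings[port]
        # One flag swaps every binding that declares a dry-run variant.
        chosen = dry if (self.dry_run and dry is not None) else impl
        return chosen()


def wire(dry_run: bool) -> Root:
    root = Root(dry_run=dry_run)
    root.adapter(GpioPort, RpiGpioAdapter, dry_run=MockGpio)
    return root
```

Calling `wire(True).resolve(GpioPort)` yields a `MockGpio`, while `wire(False)` yields the hardware adapter; code that asks for `GpioPort` does not change in either case.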


Architecture Decision Records (ADRs)

All major architectural decisions are documented as Architecture Decision Records.

| ADR | Title | Status | Date |
| --- | --- | --- | --- |
| ADR-001 | Framework Architecture Style | Accepted | 2026-02-14 |
| ADR-002 | MQTT Topic Conventions | Accepted | 2026-02-14 |
| ADR-003 | Configuration System | Accepted | 2026-02-14 |
| ADR-004 | Logging Strategy | Accepted | 2026-02-14 |
| ADR-005 | CLI Framework | Accepted | 2026-02-14 |
| ADR-006 | Hexagonal Architecture (Ports & Adapters) | Accepted | 2026-02-14 |
| ADR-007 | Testing Strategy | Accepted | 2026-02-14 |
| ADR-008 | Packaging and Distribution | Accepted | 2026-02-14 |
| ADR-009 | Python Version and Dependencies | Accepted | 2026-02-14 |
| ADR-010 | Device Archetypes | Accepted | 2026-02-14 |
| ADR-011 | Error Handling and Publishing | Accepted | 2026-02-14 |
| ADR-012 | Health and Availability Reporting | Accepted | 2026-02-14 |
| ADR-013 | Telemetry Publish Strategies | Accepted | 2026-02-22 |
| ADR-014 | Signal Filters | Accepted | 2026-02-22 |
| ADR-015 | Persistence | Accepted | 2026-02-25 |
| ADR-016 | Adapter Lifecycle Protocol | Accepted | 2026-02-26 |
| ADR-017 | SBOM Generation | Accepted | 2026-02-27 |
| ADR-018 | Telemetry Coalescing Groups | Accepted | 2026-03-03 |