Skip to content

Application Lifecycle

The cosalette lifecycle is a four-phase orchestration that transforms a collection of decorated functions into a running MQTT daemon. Each phase has clear responsibilities and well-defined boundaries.

Phase Overview

sequenceDiagram
    participant CLI as CLI (Typer)
    participant App
    participant MQTT as MQTT Client
    participant Devices
    participant Health as HealthReporter

    Note over CLI,Health: Phase 1 — Bootstrap
    CLI->>App: parse args, load Settings
    App->>App: configure_logging()
    App->>App: resolve_adapters()
    App->>App: run on_configure hooks
    App->>App: expand name specs (dict/list → concrete)
    App->>App: resolve intervals
    App->>MQTT: MqttClient(settings, will=LWT)
    App->>Health: HealthReporter(mqtt, clock)
    App->>MQTT: mqtt.start()
    App->>App: enter lifecycle adapters (AsyncExitStack)

    Note over CLI,Health: Phase 2 — Wire
    App->>App: install signal handlers (SIGTERM, SIGINT)
    App->>Health: publish_device_available() × N
    App->>App: build DeviceContexts
    App->>App: wire TopicRouter (@app.device + @app.command)
    App->>MQTT: subscribe to {prefix}/{device}/set × N
    App->>MQTT: on_message(router.route)

    Note over CLI,Health: Phase 3 — Run
    App->>App: enter lifespan (startup)
    App->>App: startup health checks (HealthCheckable adapters)
    App->>Devices: create_task() × N
    App->>App: create_task(health_check_loop + auto-restart)
    App->>App: await shutdown_event.wait()

    Note over CLI,Health: Phase 4 — Teardown
    App->>Devices: cancel device tasks
    App->>App: cancel health check task
    App->>App: exit lifespan (shutdown)
    App->>App: exit lifecycle adapters (LIFO)
    App->>Health: shutdown() → publish offline × N
    App->>MQTT: mqtt.stop()

Phase 1 — Bootstrap

Bootstrap prepares all infrastructure before any device code runs:

  1. SettingsSettings instance is created from environment variables, .env file, and CLI overrides (see Configuration)
  2. Loggingconfigure_logging() clears existing handlers, installs JSON or text formatter on stderr (+ optional rotating file handler)
  3. Adapters_resolve_adapters() instantiates all registered adapters, choosing dry-run variants when --dry-run is active
  4. Configure hooks@app.on_configure hooks run in registration order, with access to resolved settings and adapters via dependency injection
  5. Name expansion — dict-name and list-name callables are evaluated, expanding name=lambda s: {...} into concrete device registrations
  6. Interval resolution — callable interval= values are resolved to concrete floats using per-device config
  7. ClockSystemClock() (or injected FakeClock in tests)
  8. MQTT clientMqttClient(settings.mqtt, will=build_will_config(prefix)) with the LWT pre-configured for crash detection
  9. ServicesHealthReporter and ErrorPublisher are created with references to the MQTT port and clock
  10. Connectmqtt.start() begins the background connection loop
  11. Adapter lifecycle — adapters implementing __aenter__/__aexit__ are entered via AsyncExitStack (see ADR-016). Non-lifecycle adapters pass through unchanged
# Simplified bootstrap (see _app.py and _wiring.py for full signatures)
resolved_settings = settings or self._settings_class()
configure_logging(resolved_settings.logging, service=self._name, version=self._version)

resolved_adapters = self._resolve_adapters(resolved_settings)
resolved_clock = clock or SystemClock()

prefix = resolved_settings.mqtt.prefix or self._name
mqtt = _wiring.create_mqtt(mqtt, resolved_settings, prefix, self._name)
health_reporter, error_publisher = _wiring.create_services(mqtt, prefix, self._version, resolved_clock)

if isinstance(mqtt, MqttLifecycle):
    await mqtt.start()

LWT registration

The Last Will and Testament is set during MQTT connection, not after. build_will_config() creates a WillConfig that the MqttClient passes to aiomqtt's Client constructor. If the client crashes, the broker publishes "offline" to {prefix}/status automatically.

Phase 2 — Wire

The Wire phase connects the device graph to the running infrastructure:

  1. Signal handlersSIGTERM and SIGINT both call shutdown_event.set(). This handles both Ctrl+C during development and docker stop in production.
  2. Device availabilitypublish_device_available() sends "online" to {prefix}/{device}/availability for every registered device
  3. DeviceContexts — one DeviceContext per device, pre-configured with the device name, MQTT port, settings, adapters, clock, and shutdown event
  4. TopicRouter — command handler proxies are registered for each @app.device function (via @ctx.on_command) and each @app.command handler; the router maps {prefix}/{device}/set to handlers
  5. Subscriptions — MQTT subscriptions for all command topics
  6. Message wiringmqtt.on_message(router.route) connects inbound messages to the router

Phase 3 — Run

The run phase is where device code executes:

  1. AppContext — created with settings and resolved adapters
  2. Enter lifespan — the lifespan context manager's startup code runs (everything before yield), receiving the AppContext
  3. Startup health checks — adapters implementing HealthCheckable receive a single probe. Failed adapters start with their devices marked "offline", but device tasks still launch (health checks are informational, not blocking). See Adapter Health Checks
  4. Device tasks — each device becomes an asyncio.Task:
  5. @app.device → runs the coroutine directly
  6. @app.telemetryTelemetryRunner polling loop (with ctx.sleep)
  7. @app.commandno task created — handlers are dispatched per-message by the TopicRouter, not as long-running tasks
  8. Health check task — a single asyncio.Task runs HealthCheckRunner, probing all HealthCheckable adapters every health_check_interval seconds. When restart_after_failures > 0, the runner also triggers auto-restart for adapters that exceed the failure threshold. Set health_check_interval=None on App() to disable health checks entirely.
  9. Blockawait shutdown_event.wait() suspends the orchestrator until a shutdown signal arrives
# Phase 3 internals — simplified (see _wiring.py for full signatures)
app_context = AppContext(settings=resolved_settings, adapters=resolved_adapters)
async with lifespan(app_context):
    await health_check_runner.run_startup_checks()
    device_tasks = start_device_tasks(...)  # devices, telemetry, contexts, etc.
    health_check_task = start_health_check_task(health_check_runner)
    await shutdown_event.wait()

Lifespan Execution Windows

Phase Runs after Runs before
Adapter lifecycle MQTT connected Lifespan enter
Lifespan enter Adapter lifecycle + subscribed Startup health checks
Startup health checks Lifespan enter Device tasks started
Lifespan exit Device + health tasks cancelled Adapter lifecycle exit
Adapter cleanup Lifespan exit MQTT disconnected

This ordering is intentional: lifecycle adapters are entered before the lifespan so startup code can use already-initialised adapters. Adapter cleanup runs after lifespan teardown so shutdown code can still access adapter resources.

Restartable adapters

Adapters eligible for auto-restart are managed outside the AsyncExitStack so they can be individually exited and re-entered during Phase 3. Non-restartable lifecycle adapters remain in the stack for conventional LIFO cleanup.

Error Handling in Lifespan

If the lifespan's startup code (before yield) raises an exception, the application aborts — no device tasks are started. If the shutdown code (after yield) raises, the exception is logged and shutdown continues.

@asynccontextmanager
async def lifespan(ctx: cosalette.AppContext) -> AsyncIterator[None]:
    # Startup — critical failure aborts the app
    meter = ctx.adapter(GasMeterPort)
    meter.connect(ctx.settings.serial_port)
    yield
    # Shutdown — errors are logged but don’t prevent MQTT disconnect
    meter.close()

try/finally for guaranteed cleanup

Wrap yield in try/finally when managing multiple resources to ensure all cleanup runs, even if device shutdown raises.

Phase 4 — Teardown

Teardown runs in reverse order to bootstrap:

  1. Cancel device tasks — all device asyncio.Tasks are cancelled; asyncio.gather waits for graceful completion
  2. Cancel health check task — the HealthCheckRunner task is cancelled, so no more probes fire during the rest of teardown
  3. Exit lifespan — the lifespan context manager's shutdown code runs (everything after yield)
  4. Exit lifecycle adaptersAsyncExitStack exits all lifecycle adapters in LIFO order; if an adapter __aexit__ raises, the exception propagates after all exits complete
  5. Health offlineHealthReporter.shutdown() publishes "offline" to each device's availability topic and to {prefix}/status
  6. MQTT disconnectmqtt.stop() cancels the connection loop
# Phase 4 internals (simplified)
# Lifecycle adapters wrap the entire lifespan + device phase.
# Health/MQTT cleanup is in `finally` so it always runs, even
# when an adapter __aenter__/__aexit__ raises.
#
# try:
#     async with adapter_stack:
#         async with self._lifespan(app_context):
#             ... devices run ...
# finally:
#     health_reporter.shutdown()
#     mqtt.stop()

await cancel_tasks(device_tasks)
if health_check_task is not None:
    health_check_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await health_check_task
# lifespan __aexit__ runs here (code after yield)
# adapter_stack __aexit__ runs here (LIFO adapter cleanup)
# finally block always runs:
await health_reporter.shutdown()

if isinstance(mqtt, MqttLifecycle):
    await mqtt.stop()

Signal Handling

The framework installs handlers for both SIGTERM and SIGINT:

loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
    loop.add_signal_handler(sig, event.set)
Signal Source Effect
SIGINT Ctrl+C in terminal Sets shutdown_event
SIGTERM docker stop, systemd Sets shutdown_event

Both signals trigger the same graceful shutdown path. No special handling is needed for different deployment environments.

Graceful Shutdown Pattern

Device code cooperates with shutdown via the ctx.shutdown_requested + ctx.sleep() pattern:

@app.device("sensor")
async def sensor(ctx: cosalette.DeviceContext) -> None:
    while not ctx.shutdown_requested:
        data = await read_sensor()
        await ctx.publish_state(data)
        await ctx.sleep(30)  # returns early on shutdown
    # cleanup code runs here

ctx.sleep() internally races an asyncio.sleep against the shutdown event. When shutdown is signalled, sleep returns immediately (without raising), and the while loop exits naturally.

CLI Orchestration

The full path from command line to async lifecycle:

$ myapp --log-level DEBUG
    ├── Typer parses CLI flags
    ├── loads Settings from env + .env
    ├── applies CLI overrides (--log-level, --log-format)
    └── asyncio.run(app._run_async(settings=...))
            ├── Phase 1: Bootstrap
            ├── Phase 2: Wire
            ├── Phase 3: Run (enter lifespan → devices → exit lifespan)
            └── Phase 4: Teardown

See Also