Application Lifecycle¶
The cosalette lifecycle is a four-phase orchestration that transforms a collection of decorated functions into a running MQTT daemon. Each phase has clear responsibilities and well-defined boundaries.
Phase Overview¶
sequenceDiagram
participant CLI as CLI (Typer)
participant App
participant MQTT as MQTT Client
participant Devices
participant Health as HealthReporter
Note over CLI,Health: Phase 1 — Bootstrap
CLI->>App: parse args, load Settings
App->>App: configure_logging()
App->>App: resolve_adapters()
App->>App: run on_configure hooks
App->>App: expand name specs (dict/list → concrete)
App->>App: resolve intervals
App->>MQTT: MqttClient(settings, will=LWT)
App->>Health: HealthReporter(mqtt, clock)
App->>MQTT: mqtt.start()
App->>App: enter lifecycle adapters (AsyncExitStack)
Note over CLI,Health: Phase 2 — Wire
App->>App: install signal handlers (SIGTERM, SIGINT)
App->>Health: publish_device_available() × N
App->>App: build DeviceContexts
App->>App: wire TopicRouter (@app.device + @app.command)
App->>MQTT: subscribe to {prefix}/{device}/set × N
App->>MQTT: on_message(router.route)
Note over CLI,Health: Phase 3 — Run
App->>App: enter lifespan (startup)
App->>App: startup health checks (HealthCheckable adapters)
App->>Devices: create_task() × N
App->>App: create_task(health_check_loop + auto-restart)
App->>App: await shutdown_event.wait()
Note over CLI,Health: Phase 4 — Teardown
App->>Devices: cancel device tasks
App->>App: cancel health check task
App->>App: exit lifespan (shutdown)
App->>App: exit lifecycle adapters (LIFO)
App->>Health: shutdown() → publish offline × N
App->>MQTT: mqtt.stop()
Phase 1 — Bootstrap¶
Bootstrap prepares all infrastructure before any device code runs:
- Settings —
Settingsinstance is created from environment variables,.envfile, and CLI overrides (see Configuration) - Logging —
configure_logging()clears existing handlers, installs JSON or text formatter on stderr (+ optional rotating file handler) - Adapters —
_resolve_adapters()instantiates all registered adapters, choosing dry-run variants when--dry-runis active - Configure hooks —
@app.on_configurehooks run in registration order, with access to resolved settings and adapters via dependency injection - Name expansion — dict-name and list-name callables are evaluated,
expanding
name=lambda s: {...}into concrete device registrations - Interval resolution — callable
interval=values are resolved to concrete floats using per-device config - Clock —
SystemClock()(or injectedFakeClockin tests) - MQTT client —
MqttClient(settings.mqtt, will=build_will_config(prefix))with the LWT pre-configured for crash detection - Services —
HealthReporterandErrorPublisherare created with references to the MQTT port and clock - Connect —
mqtt.start()begins the background connection loop - Adapter lifecycle — adapters implementing
__aenter__/__aexit__are entered viaAsyncExitStack(see ADR-016). Non-lifecycle adapters pass through unchanged
# Simplified bootstrap (see _app.py and _wiring.py for full signatures)
resolved_settings = settings or self._settings_class()
configure_logging(resolved_settings.logging, service=self._name, version=self._version)
resolved_adapters = self._resolve_adapters(resolved_settings)
resolved_clock = clock or SystemClock()
prefix = resolved_settings.mqtt.prefix or self._name
mqtt = _wiring.create_mqtt(mqtt, resolved_settings, prefix, self._name)
health_reporter, error_publisher = _wiring.create_services(mqtt, prefix, self._version, resolved_clock)
if isinstance(mqtt, MqttLifecycle):
await mqtt.start()
LWT registration
The Last Will and Testament is set during MQTT connection, not after.
build_will_config() creates a WillConfig that the MqttClient passes
to aiomqtt's Client constructor. If the client crashes, the broker
publishes "offline" to {prefix}/status automatically.
Phase 2 — Wire¶
The Wire phase connects the device graph to the running infrastructure:
- Signal handlers —
SIGTERMandSIGINTboth callshutdown_event.set(). This handles bothCtrl+Cduring development anddocker stopin production. - Device availability —
publish_device_available()sends"online"to{prefix}/{device}/availabilityfor every registered device - DeviceContexts — one
DeviceContextper device, pre-configured with the device name, MQTT port, settings, adapters, clock, and shutdown event - TopicRouter — command handler proxies are registered for each
@app.devicefunction (via@ctx.on_command) and each@app.commandhandler; the router maps{prefix}/{device}/setto handlers - Subscriptions — MQTT subscriptions for all command topics
- Message wiring —
mqtt.on_message(router.route)connects inbound messages to the router
Phase 3 — Run¶
The run phase is where device code executes:
- AppContext — created with settings and resolved adapters
- Enter lifespan — the lifespan context manager's startup code runs
(everything before
yield), receiving theAppContext - Startup health checks — adapters implementing
HealthCheckablereceive a single probe. Failed adapters start with their devices marked"offline", but device tasks still launch (health checks are informational, not blocking). See Adapter Health Checks - Device tasks — each device becomes an
asyncio.Task: @app.device→ runs the coroutine directly@app.telemetry→TelemetryRunnerpolling loop (withctx.sleep)@app.command→ no task created — handlers are dispatched per-message by theTopicRouter, not as long-running tasks- Health check task — a single
asyncio.TaskrunsHealthCheckRunner, probing allHealthCheckableadapters everyhealth_check_intervalseconds. Whenrestart_after_failures > 0, the runner also triggers auto-restart for adapters that exceed the failure threshold. Sethealth_check_interval=NoneonApp()to disable health checks entirely. - Block —
await shutdown_event.wait()suspends the orchestrator until a shutdown signal arrives
# Phase 3 internals — simplified (see _wiring.py for full signatures)
app_context = AppContext(settings=resolved_settings, adapters=resolved_adapters)
async with lifespan(app_context):
await health_check_runner.run_startup_checks()
device_tasks = start_device_tasks(...) # devices, telemetry, contexts, etc.
health_check_task = start_health_check_task(health_check_runner)
await shutdown_event.wait()
Lifespan Execution Windows¶
| Phase | Runs after | Runs before |
|---|---|---|
| Adapter lifecycle | MQTT connected | Lifespan enter |
| Lifespan enter | Adapter lifecycle + subscribed | Startup health checks |
| Startup health checks | Lifespan enter | Device tasks started |
| Lifespan exit | Device + health tasks cancelled | Adapter lifecycle exit |
| Adapter cleanup | Lifespan exit | MQTT disconnected |
This ordering is intentional: lifecycle adapters are entered before the lifespan so startup code can use already-initialised adapters. Adapter cleanup runs after lifespan teardown so shutdown code can still access adapter resources.
Restartable adapters
Adapters eligible for auto-restart
are managed outside the AsyncExitStack so they can be individually
exited and re-entered during Phase 3. Non-restartable lifecycle adapters
remain in the stack for conventional LIFO cleanup.
Error Handling in Lifespan¶
If the lifespan's startup code (before yield) raises an exception, the
application aborts — no device tasks are started. If the shutdown code
(after yield) raises, the exception is logged and shutdown continues.
@asynccontextmanager
async def lifespan(ctx: cosalette.AppContext) -> AsyncIterator[None]:
# Startup — critical failure aborts the app
meter = ctx.adapter(GasMeterPort)
meter.connect(ctx.settings.serial_port)
yield
# Shutdown — errors are logged but don’t prevent MQTT disconnect
meter.close()
try/finally for guaranteed cleanup
Wrap yield in try/finally when managing multiple resources to ensure
all cleanup runs, even if device shutdown raises.
Phase 4 — Teardown¶
Teardown runs in reverse order to bootstrap:
- Cancel device tasks — all device
asyncio.Tasks are cancelled;asyncio.gatherwaits for graceful completion - Cancel health check task — the
HealthCheckRunnertask is cancelled, so no more probes fire during the rest of teardown - Exit lifespan — the lifespan context manager's shutdown code runs
(everything after
yield) - Exit lifecycle adapters —
AsyncExitStackexits all lifecycle adapters in LIFO order; if an adapter__aexit__raises, the exception propagates after all exits complete - Health offline —
HealthReporter.shutdown()publishes"offline"to each device's availability topic and to{prefix}/status - MQTT disconnect —
mqtt.stop()cancels the connection loop
# Phase 4 internals (simplified)
# Lifecycle adapters wrap the entire lifespan + device phase.
# Health/MQTT cleanup is in `finally` so it always runs, even
# when an adapter __aenter__/__aexit__ raises.
#
# try:
# async with adapter_stack:
# async with self._lifespan(app_context):
# ... devices run ...
# finally:
# health_reporter.shutdown()
# mqtt.stop()
await cancel_tasks(device_tasks)
if health_check_task is not None:
health_check_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await health_check_task
# lifespan __aexit__ runs here (code after yield)
# adapter_stack __aexit__ runs here (LIFO adapter cleanup)
# finally block always runs:
await health_reporter.shutdown()
if isinstance(mqtt, MqttLifecycle):
await mqtt.stop()
Signal Handling¶
The framework installs handlers for both SIGTERM and SIGINT:
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, event.set)
| Signal | Source | Effect |
|---|---|---|
SIGINT |
Ctrl+C in terminal |
Sets shutdown_event |
SIGTERM |
docker stop, systemd |
Sets shutdown_event |
Both signals trigger the same graceful shutdown path. No special handling is needed for different deployment environments.
Graceful Shutdown Pattern¶
Device code cooperates with shutdown via the ctx.shutdown_requested +
ctx.sleep() pattern:
@app.device("sensor")
async def sensor(ctx: cosalette.DeviceContext) -> None:
while not ctx.shutdown_requested:
data = await read_sensor()
await ctx.publish_state(data)
await ctx.sleep(30) # returns early on shutdown
# cleanup code runs here
ctx.sleep() internally races an asyncio.sleep against the shutdown event.
When shutdown is signalled, sleep returns immediately (without raising), and
the while loop exits naturally.
CLI Orchestration¶
The full path from command line to async lifecycle:
$ myapp --log-level DEBUG
│
├── Typer parses CLI flags
├── loads Settings from env + .env
├── applies CLI overrides (--log-level, --log-format)
└── asyncio.run(app._run_async(settings=...))
│
├── Phase 1: Bootstrap
├── Phase 2: Wire
├── Phase 3: Run (enter lifespan → devices → exit lifespan)
└── Phase 4: Teardown
See Also¶
- Architecture — composition root and test seams
- Device Archetypes — how devices execute in Phase 3
- Health & Availability — availability publishing in Phases 2 and 4
- Error Handling — error isolation during device execution
- ADR-001 — Framework Architecture Style
- ADR-005 — CLI Framework
- ADR-028 — Adapter Health Check Protocol
- ADR-016 — Adapter Lifecycle Protocol