Health & Availability¶
Cosalette provides three levels of health reporting — app-level heartbeats, per-device availability, and adapter health checks — backed by MQTT's Last Will and Testament (LWT) for crash detection.
Three Levels of Health¶
```mermaid
graph TB
    subgraph "App Level"
        A["LWT: broker publishes 'offline' on crash"]
        B["Heartbeat: JSON with uptime + device map"]
    end
    subgraph "Device Level"
        C["availability: 'online' / 'offline' per device"]
    end
    subgraph "Adapter Level"
        E["HealthCheckable: periodic readiness probe"]
    end
    A --> S["{prefix}/status"]
    B --> S
    C --> D["{prefix}/{device}/availability"]
    E -->|"failure toggles"| C
```
| Level | Topic | Payload | Retained | Purpose |
|---|---|---|---|---|
| App | `{prefix}/status` | JSON heartbeat | Yes | Uptime, version, fleet monitoring |
| App LWT | `{prefix}/status` | `"offline"` | Yes | Crash detection by broker |
| Device | `{prefix}/{device}/availability` | `"online"` / `"offline"` | Yes | Per-device Home Assistant integration |
| Adapter | (internal) | (toggles device availability) | — | Detect wedged adapters |
Last Will and Testament (LWT)¶
The LWT is an MQTT feature where the client tells the broker: "If I disconnect
unexpectedly, publish this message on my behalf." Cosalette uses it to
guarantee that a crash results in a visible "offline" status.
WillConfig¶
The build_will_config() function creates a WillConfig for the app's LWT:
```python
from cosalette._health import build_will_config

will = build_will_config("velux2mqtt")
# → WillConfig(
#     topic="velux2mqtt/status",
#     payload="offline",
#     qos=1,
#     retain=True,
# )
```
This WillConfig is passed to MqttClient during construction and translated
to the broker-specific LWT format at connection time.
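For illustration, here is a minimal sketch of the shape such a factory could take. This is a hypothetical reimplementation matching the documented output, not cosalette's actual source:

```python
# Hypothetical sketch of WillConfig and build_will_config — illustrative only,
# not cosalette's actual source.
from dataclasses import dataclass


@dataclass(frozen=True)
class WillConfig:
    topic: str
    payload: str
    qos: int = 1
    retain: bool = True


def build_will_config(prefix: str) -> WillConfig:
    """Build the app-level LWT: a retained "offline" on {prefix}/status."""
    return WillConfig(topic=f"{prefix}/status", payload="offline")


will = build_will_config("velux2mqtt")
# → WillConfig(topic='velux2mqtt/status', payload='offline', qos=1, retain=True)
```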
LWT is set at connection time
The LWT payload is registered during the MQTT CONNECT handshake. It cannot be changed after connection. This is an MQTT protocol constraint, not a cosalette limitation.
Crash Detection Flow¶
```mermaid
sequenceDiagram
    participant App
    participant Broker
    participant Subscriber as Monitoring Tool
    App->>Broker: CONNECT (will="offline" on velux2mqtt/status)
    App->>Broker: PUBLISH velux2mqtt/status = {"status": "online", ...}
    Note over App: App crashes / network lost
    Broker->>Subscriber: PUBLISH velux2mqtt/status = "offline" (LWT)
```
Structured Heartbeat¶
The app heartbeat is a JSON payload published to {prefix}/status:
```json
{
  "status": "online",
  "uptime_s": 3600.0,
  "version": "0.3.0",
  "devices": {
    "blind": {"status": "ok"},
    "temperature": {"status": "error"}
  }
}
```
Device status values
Device status is "ok" when the device is functioning normally. For
telemetry devices, the framework automatically sets status to "error"
when a polling cycle raises an exception, and restores it to "ok"
when the device recovers. See Error Handling for
details on error deduplication.
HeartbeatPayload Fields¶
| Field | Type | Description |
|---|---|---|
| `status` | `str` | Always `"online"` when published |
| `uptime_s` | `float` | Seconds since app start (monotonic) |
| `version` | `str` | App version string |
| `devices` | `dict[str, DeviceStatus]` | Per-device status snapshot |
Uptime Uses Monotonic Clock¶
Uptime is measured via ClockPort (backed by time.monotonic() in production):
Why monotonic for uptime?
time.monotonic() is immune to NTP adjustments and manual clock changes
(PEP 418). If the system clock jumps backward or forward, the uptime
value remains accurate. This is distinct from error timestamps, which use
wall-clock time for operator correlation — see Error Handling.
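Putting the fields together, here is a hedged sketch of how a heartbeat payload could be assembled. `build_heartbeat` is illustrative, and cosalette reads time through `ClockPort` rather than calling `time.monotonic()` directly:

```python
# Illustrative sketch of heartbeat assembly; field names match the table above.
import json
import time


def build_heartbeat(start: float, version: str, devices: dict[str, str]) -> str:
    """Serialize a heartbeat payload; `start` is a time.monotonic() reading."""
    return json.dumps(
        {
            "status": "online",
            "uptime_s": time.monotonic() - start,  # immune to wall-clock jumps
            "version": version,
            "devices": {name: {"status": s} for name, s in devices.items()},
        }
    )


payload = build_heartbeat(time.monotonic() - 3600.0, "0.3.0", {"blind": "ok"})
```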
Two Payload Formats on One Topic¶
The `{prefix}/status` topic carries two different payload formats:

- Heartbeat — a JSON object (`{"status": "online", ...}`) published periodically by the running app.
- LWT / shutdown — the plain string `"offline"`, published by the broker on crash or by the app on graceful shutdown.

Consumers can distinguish them by attempting a JSON parse: the LWT is always a plain string, while the heartbeat is always valid JSON.
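A consumer-side sketch of that distinction (`parse_status` is a hypothetical helper, not part of cosalette):

```python
import json


def parse_status(payload: str) -> tuple[str, object]:
    """Classify a {prefix}/status payload as heartbeat JSON or the LWT string."""
    try:
        return ("heartbeat", json.loads(payload))
    except json.JSONDecodeError:
        return ("lwt", payload)  # plain "offline" is not valid JSON


print(parse_status("offline")[0])               # lwt
print(parse_status('{"status": "online"}')[0])  # heartbeat
```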
Periodic heartbeat scheduling
Periodic heartbeats are built into the framework via the
heartbeat_interval parameter on App(). The HealthReporter
publishes heartbeat payloads automatically at the configured
interval.
Per-Device Availability¶
Each device gets its own availability topic, published automatically by the
HealthReporter:
| Event | Publishes | When |
|---|---|---|
| Device start | `"online"` to `{prefix}/{device}/availability` | Phase 2 (Wire) |
| Graceful shutdown | `"offline"` to `{prefix}/{device}/availability` | Phase 4 (Teardown) |
```python
# Published automatically by the framework
await health_reporter.publish_device_available("blind")
# → "online" to velux2mqtt/blind/availability (retained, QoS 1)
```
This is directly compatible with Home Assistant's MQTT availability configuration — no custom templates needed.
HealthReporter Service¶
The HealthReporter manages all health-related publications:
Key methods:
| Method | Purpose |
|---|---|
| `publish_device_available()` | Publish `"online"` + register device in tracker |
| `publish_device_unavailable()` | Publish `"offline"` + remove from tracker |
| `publish_heartbeat()` | Publish structured JSON heartbeat |
| `set_device_status()` | Update a device's status in the internal tracker |
| `shutdown()` | Publish `"offline"` for all devices + app status |
Fire-and-Forget Publishing¶
All health publications are wrapped in _safe_publish():
```python
async def _safe_publish(self, topic, payload, *, retain=True):
    try:
        await self.mqtt.publish(topic, payload, retain=retain, qos=1)
    except Exception:
        logger.exception("Failed to publish health to %s", topic)
```
Health reporting must never crash the application. A broker outage means health data is temporarily lost, but service continues.
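A runnable sketch of that pattern, with a stub client that always fails. `FailingMqtt` and `safe_publish` are hypothetical stand-ins for illustration:

```python
# Sketch of fire-and-forget health publishing; FailingMqtt stands in for a
# real MQTT client whose broker is unreachable.
import asyncio
import logging

logger = logging.getLogger("health")


class FailingMqtt:
    async def publish(self, topic: str, payload: str, *, retain: bool = True, qos: int = 1) -> None:
        raise ConnectionError("broker unreachable")


async def safe_publish(mqtt, topic: str, payload: str, *, retain: bool = True) -> None:
    try:
        await mqtt.publish(topic, payload, retain=retain, qos=1)
    except Exception:
        logger.exception("Failed to publish health to %s", topic)  # logged, not raised


asyncio.run(safe_publish(FailingMqtt(), "myapp/status", "online"))
# The ConnectionError is swallowed and logged; the caller keeps running.
```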
Graceful Shutdown Sequence¶
During Phase 4 teardown, the HealthReporter.shutdown() method publishes
offline status for everything:
```python
async def shutdown(self):
    for device in list(self._devices):
        await self._safe_publish(f"{prefix}/{device}/availability", "offline")
    await self._safe_publish(f"{prefix}/status", "offline")
    self._devices.clear()
```
This ensures that a clean shutdown results in the same "offline" state
as a crash (via LWT). Subscribers see a consistent state regardless of
how the application stopped.
Fleet Monitoring¶
Subscribe to wildcard topics to monitor multiple bridges:
```bash
# All app statuses across the fleet
mosquitto_sub -t '+/status' -v

# All device availability for one app
mosquitto_sub -t 'velux2mqtt/+/availability' -v
```
The retained nature of status and availability topics means new subscribers immediately receive the current state of every app and device.
Adapter Health Checks¶
The LWT and heartbeat mechanisms detect crashes — the app process dies and the
broker publishes "offline". But what about adapters that are running but broken?
A BLE adapter can enter a wedged state (BlueZ daemon crash, hardware reset) where
connections fail indefinitely, yet the process stays alive.
Adapter health checks (ADR-028) address this gap. Adapters implement the
HealthCheckable protocol, and the framework probes them periodically.
Implementing HealthCheckable¶
Add a single async method to any adapter:
```python
from bleak import BleakScanner


class BleAdapter:
    """BLE adapter with health check support."""

    async def connect(self) -> None: ...
    async def read(self, mac: str) -> Reading: ...

    async def health_check(self) -> bool:
        """Return True if BLE stack is responsive."""
        # Exceptions propagate to HealthCheckRunner, which treats them as failure.
        scanner = BleakScanner()
        await scanner.discover(timeout=5)
        return True
```
HealthCheckable is a @runtime_checkable Protocol — the framework detects it
via isinstance() after adapter lifecycle entry. No registration or configuration
needed.
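The detection mechanism can be sketched with plain `typing` machinery. This `HealthCheckable` mirrors the documented shape; the real definition lives in cosalette:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class HealthCheckable(Protocol):
    async def health_check(self) -> bool: ...


class ProbedAdapter:
    async def health_check(self) -> bool:
        return True


class PlainAdapter:
    pass


print(isinstance(ProbedAdapter(), HealthCheckable))  # True
print(isinstance(PlainAdapter(), HealthCheckable))   # False
```

Note that a runtime-checkable protocol's `isinstance()` check only verifies that the method exists, not its signature or return type.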
How It Works¶
```mermaid
sequenceDiagram
    participant Runner as HealthCheckRunner
    participant Adapter
    participant Reporter as HealthReporter
    participant MQTT
    Note over Runner: Startup (before device tasks)
    Runner->>Adapter: health_check()
    Adapter-->>Runner: True
    Note over Runner: Adapter healthy, devices stay online
    Note over Runner: Periodic loop (every 30s)
    Runner->>Adapter: health_check()
    Adapter-->>Runner: False (or timeout/exception)
    Runner->>Reporter: publish_device_unavailable("sensor")
    Reporter->>MQTT: "offline" → sensor/availability
    Note over Runner: Next check
    Runner->>Adapter: health_check()
    Adapter-->>Runner: True
    Runner->>Reporter: publish_device_available("sensor")
    Reporter->>MQTT: "online" → sensor/availability
```
The lifecycle in detail:

- Startup check — one probe per adapter before device tasks launch. Failed adapters start with their devices marked `"offline"`, but device tasks still start (health checks are informational, not blocking).
- Periodic loop — probes every `health_check_interval` seconds (default 30, configurable on `App()`; `None` disables entirely).
- Timeout — each probe has a timeout of `interval / 2`. A hanging `health_check()` is treated as failure without blocking other adapters.
- Availability toggle — on failure, all devices that depend on the adapter are set to `"offline"`; on recovery, they return to `"online"`.
- Telemetry continues — health checks are informational. Telemetry polling continues even when the adapter is marked unhealthy.
Adapter-to-Device Mapping¶
The framework automatically maps adapters to their dependent devices using
DI introspection. When adapter X fails a health check, only devices that
inject X via their type annotations go offline — other devices are
unaffected.
```python
app = App("myapp", health_check_interval=30.0)
app.adapter(BlePort, BleAdapter)  # implements HealthCheckable


@app.telemetry("temperature", interval=60)
async def temperature(ctx: DeviceContext, ble: BlePort) -> dict[str, object]:
    # If BleAdapter.health_check() fails, "temperature" goes offline
    return await ble.read("AA:BB:CC:DD:EE:FF")


@app.telemetry("cpu_temp", interval=60)
async def cpu_temp(ctx: DeviceContext) -> dict[str, object]:
    # No BlePort dependency — unaffected by BLE health check failures
    return {"celsius": read_cpu_temp()}
```
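The mapping itself can be sketched with standard annotation introspection. `depends_on` is a hypothetical helper; cosalette's DI layer does something equivalent internally:

```python
import typing


class BlePort: ...


async def temperature(ble: BlePort) -> dict:
    ...


async def cpu_temp() -> dict:
    ...


def depends_on(handler, port: type) -> bool:
    """True if any of the handler's annotated parameters is the given port."""
    hints = typing.get_type_hints(handler)
    hints.pop("return", None)
    return any(isinstance(t, type) and issubclass(t, port) for t in hints.values())


print(depends_on(temperature, BlePort))  # True
print(depends_on(cpu_temp, BlePort))     # False
```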
Failure Counting¶
Each adapter's health state is tracked via AdapterHealthStatus:
| Field | Type | Description |
|---|---|---|
| `healthy` | `bool` | Current health state |
| `consecutive_failures` | `int` | Failures since last success (resets to 0 on recovery) |
| `last_check` | `float` | Monotonic timestamp of last probe |
| `restart_count` | `int` | Number of restarts performed for this adapter |
| `restart_exhausted` | `bool` | `True` when `restart_count` reaches `max_restarts` |
| `last_restart` | `float` | Monotonic timestamp of last restart attempt |
| `last_healthy_since` | `float` | Monotonic timestamp of sustained health start |
The consecutive_failures counter drives the auto-restart
threshold — when it reaches restart_after_failures, a restart is triggered.
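The counter logic can be sketched as follows. Field and parameter names follow the table above, but the code is illustrative, not cosalette's source:

```python
# Sketch of the consecutive-failure counter driving the restart threshold.
from dataclasses import dataclass


@dataclass
class AdapterHealthStatus:
    healthy: bool = True
    consecutive_failures: int = 0


def record_result(status: AdapterHealthStatus, ok: bool, restart_after_failures: int = 5) -> bool:
    """Update the counter; return True when a restart should be triggered."""
    if ok:
        status.healthy = True
        status.consecutive_failures = 0  # resets on recovery
        return False
    status.healthy = False
    status.consecutive_failures += 1
    return restart_after_failures > 0 and status.consecutive_failures >= restart_after_failures
```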
Log Deduplication¶
Health check logging follows the same deduplication pattern as error handling:
| Event | Level | Example |
|---|---|---|
| First failure | WARNING | Adapter BleAdapter health check failed |
| Consecutive failure | DEBUG | Adapter BleAdapter health check failed (consecutive: 3) |
| Recovery | INFO | Adapter BleAdapter health check recovered after 3 failures |
This prevents log flooding during sustained adapter outages while still providing clear visibility into failure onset and recovery.
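The level selection from the table can be sketched as a small helper (`log_check` is hypothetical; cosalette's logging is equivalent in spirit):

```python
import logging


def log_check(logger: logging.Logger, name: str, ok: bool, consecutive: int) -> None:
    """Deduplicated health-check logging per the table above (sketch)."""
    if not ok:
        if consecutive == 1:
            logger.warning("Adapter %s health check failed", name)
        else:
            logger.debug("Adapter %s health check failed (consecutive: %d)", name, consecutive)
    elif consecutive > 0:
        logger.info("Adapter %s health check recovered after %d failures", name, consecutive)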
Complementary to crash detection
Health checks and LWT serve different failure modes:
- LWT — the process dies (crash, OOM kill, network partition)
- Health checks — the process is alive but an adapter is wedged
Together, they provide full coverage of both infrastructure and hardware failures.
Auto-Restart¶
When an adapter fails health checks repeatedly, the framework can automatically restart it — exit its async context manager, wait a cooldown, re-enter, and recreate device tasks. This handles transient hardware wedges (BLE daemon crash, serial port reset) without operator intervention.
Configuration¶
Auto-restart is controlled by four parameters on App():
| Parameter | Default | Description |
|---|---|---|
| `restart_after_failures` | `5` | Consecutive failures before triggering restart; `0` disables |
| `max_restarts` | `3` | Maximum restarts per adapter before giving up |
| `restart_cooldown` | `5.0` | Seconds to wait between exit and re-entry |
| `sustained_health_reset` | `300.0` | Seconds of sustained health to reset restart counter |
```python
app = App(
    "myapp",
    health_check_interval=30.0,
    restart_after_failures=5,      # restart after 5 consecutive failures
    max_restarts=3,                # give up after 3 restarts
    restart_cooldown=5.0,          # 5s between exit and re-enter
    sustained_health_reset=300.0,  # 5 min healthy resets counter
)
```
Restart Sequence¶
When consecutive failures reach the threshold:
```mermaid
sequenceDiagram
    participant Runner as HealthCheckRunner
    participant Wiring as _on_restart callback
    participant Adapter
    participant Devices as Device Tasks
    Runner->>Runner: consecutive_failures >= restart_after_failures
    Runner->>Wiring: on_restart_needed(adapter_type, adapter)
    Wiring->>Devices: cancel_tasks_for_adapter()
    Wiring->>Adapter: __aexit__() (best-effort)
    Note over Adapter: cooldown period (restart_cooldown)
    Wiring->>Adapter: __aenter__()
    Wiring->>Adapter: health_check() (verification)
    Wiring->>Devices: start_device_tasks_for_names()
    Wiring-->>Runner: True (success)
    Runner->>Runner: reset consecutive_failures, increment restart_count
```
On restart failure (re-entry or post-restart health check fails), the adapter
is marked restart_exhausted and its devices stay offline permanently.
Opting Out¶
By default, all adapters with HealthCheckable + lifecycle (__aenter__/__aexit__)
are eligible for auto-restart. Set restartable = False on the adapter class to
opt out:
```python
class CriticalAdapter:
    """Adapter that must not be restarted mid-session."""

    restartable = False

    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, *exc: object) -> None: ...
    async def health_check(self) -> bool: ...
```
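Eligibility can be sketched as a simple predicate. `is_restartable` is a hypothetical helper; cosalette's actual check is equivalent in spirit:

```python
def is_restartable(adapter) -> bool:
    """Restart candidates: health_check + async lifecycle, unless opted out."""
    has_lifecycle = hasattr(adapter, "__aenter__") and hasattr(adapter, "__aexit__")
    has_check = hasattr(adapter, "health_check")
    return has_lifecycle and has_check and getattr(adapter, "restartable", True)


class NormalAdapter:
    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc: object) -> None: ...

    async def health_check(self) -> bool:
        return True


class CriticalAdapter(NormalAdapter):
    restartable = False  # opt out of auto-restart


print(is_restartable(NormalAdapter()))    # True
print(is_restartable(CriticalAdapter()))  # False
```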
Sustained Health Reset¶
If an adapter stays healthy for sustained_health_reset seconds (default 5 min),
its restart_count resets to zero. This allows adapters that experience rare
transient failures to get a fresh restart budget without accumulating toward
max_restarts.
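The reset rule amounts to a one-line time comparison. This sketch uses names from the `AdapterHealthStatus` fields above; the function itself is illustrative:

```python
def reset_restart_count(
    restart_count: int,
    last_healthy_since: float,
    now: float,
    sustained_health_reset: float = 300.0,
) -> int:
    """Return the restart counter after applying the sustained-health rule."""
    if now - last_healthy_since >= sustained_health_reset:
        return 0  # fresh restart budget after sustained health
    return restart_count
```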
See Also¶
- MQTT Topics — complete topic map and retention rules
- Error Handling — structured error events (complementary to health)
- Lifecycle — when availability is published (Phases 2 and 4)
- Hexagonal Architecture — ClockPort for monotonic uptime
- ADR-012 — Health and Availability Reporting
- ADR-028 — Adapter Health Check Protocol
- ADR-029 — Adapter Auto-Restart Strategy