Persistence¶
Cosalette's persistence system lets devices save state across restarts — accumulated values, calibration offsets, last-known-good readings, or anything that shouldn't be lost on power failure.
The system has three layers:
| Layer | What it does | Set where |
|---|---|---|
| Store backend | Where bytes live (file, database, memory) | App(store=...) |
| DeviceStore | Per-device scoped dict-like interface | Injected into handlers |
| PersistPolicy | When to flush to disk | persist= decorator parameter |
Store Backends¶
A Store is a key-value storage backend. The framework ships four:
| Backend | Use case |
|---|---|
JsonFileStore(path) |
Production — single JSON file, atomic writes |
SqliteStore(path) |
Production — single SQLite file, WAL mode |
MemoryStore() |
Testing — in-memory dict |
NullStore() |
Opt-out — all operations are no-ops |
The Store protocol is simple:
class Store(Protocol):
def load(self, key: str) -> dict[str, object] | None: ...
def save(self, key: str, data: dict[str, object]) -> None: ...
You can implement your own backend (Redis, S3, etc.) by satisfying this protocol.
JsonFileStore¶
Stores all keys as top-level entries in a single JSON file. Uses atomic writes (write to temp file, then rename) to prevent corruption.
store = JsonFileStore("./data/state.json")
# All device keys stored in one file: {"sensor": {...}, "counter": {...}}
SqliteStore¶
Stores all keys in a single SQLite database with WAL mode enabled for concurrent read access.
Store Factories¶
When the store path depends on runtime settings, pass a callable factory instead of a concrete instance:
def make_store(settings: Gas2MqttSettings) -> Store:
return JsonFileStore(settings.data_dir / "state.json")
app = cosalette.App(
name="gas2mqtt",
version="1.0.0",
settings_class=Gas2MqttSettings,
store=make_store,
)
The factory is called during bootstrap — after settings and adapters are resolved but before any device handlers run. Parameters are injected via the DI system (every parameter must carry a type annotation), so the factory can request settings, adapters, or both:
def make_store(settings: Gas2MqttSettings) -> Store:
return SqliteStore(settings.db_path)
app = cosalette.App(name="gas2mqtt", store=make_store)
When to use a factory
Use a concrete Store when the path is known at import time.
Use a factory when the path comes from settings or environment variables
that are resolved at startup.
DeviceStore¶
DeviceStore is a per-device scoped wrapper around a Store backend.
It provides a familiar dict-like interface:
@app.telemetry("sensor", interval=60)
async def sensor(store: DeviceStore) -> dict[str, object]:
# Dict-like access
store["count"] = store.get("count", 0) + 1
store.setdefault("offset", 0.0)
# Check what's stored
all_data = store.to_dict()
return {"count": store["count"]}
The framework automatically:
- Creates a
DeviceStorescoped to the device name - Loads existing data before the first handler call
- Injects it via the DI system (declare
store: DeviceStore) - Saves on shutdown (safety net, regardless of policy)
Dirty Tracking¶
DeviceStore tracks whether it has been modified since the last save.
This enables the SaveOnChange policy to avoid unnecessary I/O:
store["value"] = 42 # store.dirty → True
store.save() # store.dirty → False
store.mark_dirty() # Force dirty (e.g., after mutating a nested object)
Save Policies (PersistPolicy)¶
A PersistPolicy controls when the store is saved during the
telemetry loop. Three policies ship with the framework:
SaveOnPublish¶
Save after each successful MQTT publish. The most common choice — persisted state always matches what's been broadcast.
@app.telemetry("sensor", interval=60, persist=SaveOnPublish())
async def sensor(store: DeviceStore) -> dict[str, object]:
store["count"] = store.get("count", 0) + 1
return {"count": store["count"]}
SaveOnChange¶
Save whenever the store has been modified, regardless of whether MQTT publishing occurred. Most aggressive — minimises data loss.
@app.telemetry("sensor", interval=60, persist=SaveOnChange())
async def sensor(store: DeviceStore) -> dict[str, object]:
store["count"] = store.get("count", 0) + 1
return {"count": store["count"]}
SaveOnShutdown¶
Save only on graceful shutdown. Lightest I/O — no saves during normal operation. Risk: data loss on hard crash or power loss.
Crash risk
SaveOnShutdown means all data since the last startup is lost
if the process crashes or loses power. Use only when the data
can be re-derived.
Default Behaviour¶
If you set store= on the App but don't specify persist= on a device,
the framework saves only on shutdown (equivalent to SaveOnShutdown()).
The framework always saves on shutdown regardless of policy — the
persist= parameter only controls additional saves during operation.
Composing Policies¶
Policies compose with | (OR) and & (AND), just like publish strategies:
# Save on publish OR when dirty (maximum safety)
persist = SaveOnPublish() | SaveOnChange()
# Save only when BOTH conditions are true
persist = SaveOnPublish() & SaveOnChange()
| creates an AnySavePolicy (save if any child says yes).
& creates an AllSavePolicy (save only if all children agree).
When to Use Which Policy¶
| Policy | I/O frequency | Data safety | Best for |
|---|---|---|---|
SaveOnPublish() |
Medium | Good | Most telemetry devices |
SaveOnChange() |
High | Best | Critical counters, calibration |
SaveOnShutdown() |
Minimal | Low | Derived/re-calculable data |
SaveOnPublish() \| SaveOnChange() |
High | Best | Belt-and-suspenders |
Testing with MemoryStore¶
Use MemoryStore in tests to avoid filesystem access:
from cosalette import MemoryStore, DeviceStore
from cosalette.testing import AppHarness
async def test_sensor_persists_count():
backend = MemoryStore()
harness = AppHarness.create(store=backend)
@harness.app.telemetry("sensor", interval=10)
async def sensor(store: DeviceStore) -> dict[str, object]:
store["count"] = store.get("count", 0) + 1
return {"count": store["count"]}
await harness.run()
assert backend.load("sensor") == {"count": 1}
You can also pre-seed the store to test load behaviour:
backend = MemoryStore()
backend.save("sensor", {"count": 99})
# Handler will see store["count"] == 99 on first call
Persistence and Device Handlers¶
The persist= parameter is only available on @app.telemetry, because
the framework controls the telemetry loop and knows when publishes occur.
For @app.device handlers (which own their loop), inject DeviceStore
and call store.save() manually when appropriate:
@app.device("controller")
async def controller(ctx: DeviceContext, store: DeviceStore):
while not ctx.shutdown_requested:
# ... do work ...
store["last_run"] = ctx.clock.now()
store.save() # Manual save
yield # reaction boundary
await ctx.sleep(60)
The framework still saves on shutdown via the finally block.
See Also¶
- Publish Strategies — the
publish=parameter thatpersist=mirrors - Signal Filters — another composable utility
- Testing Guide — testing with
MemoryStore - ADR-015: Persistence — architectural decision record
- ADR-037: Lazy Store Resolution — callable store factories