Contract-First Route Design¶
Every @app.telemetry, @app.command, and @app.device registration is also
a contract declaration — a machine-readable description of what a device
produces and what it accepts. Adding contract metadata turns main.py into an
auditable, declarative interface document that humans and AI coding assistants
can inspect without reading implementation code.
The pattern is directly analogous to FastAPI's route decorators: just as
@app.get("/items", response_model=Item, summary="List items") declares both
the route and its schema, a cosalette registration declares both the MQTT topic
wiring and the data contract.
Declaring Contract Metadata¶
All three registration decorators accept optional contract fields:
| Parameter | Type | Applies to | Description |
|---|---|---|---|
summary |
str |
telemetry, command, device | Human-readable description |
state_model |
type |
telemetry, command | Pydantic model or dataclass for state |
payload_model |
type |
command, triggerable telemetry | Expected inbound payload type |
behavior |
list[str] |
telemetry, command, device | Ordered description of what the handler does |
effects |
list[str] |
telemetry, command, device | Side effects and mutations |
summary, behavior, and effects are introspection metadata — surfaced by
the manifest and MCP tools with no runtime effect. state_model and
payload_model are also introspection metadata for documentation and tooling,
but typed handler annotations and state_model now additionally participate in
runtime validation and serialization — see Typed Payloads and Returns below.
Typed Payloads and Returns¶
Runtime type contracts let the framework parse, validate, and serialize values
automatically — no manual json.loads / json.dumps in handlers.
Imports¶
These are also re-exported from the top-level cosalette package:
import cosalette
# cosalette.Depends, cosalette.Payload, cosalette.Topic, cosalette.Message
# cosalette.PayloadValidationError, cosalette.ReturnValidationError
Typed Command Handler¶
When a parameter is annotated with a Pydantic model, the framework parses the
MQTT payload JSON into that model before calling the handler. A non-None
return is serialized using the return annotation first, then state_model as
fallback; plain dict publishes as-is; primitive / list values are wrapped as
{"value": ...}.
from __future__ import annotations
from typing import Annotated
from pydantic import BaseModel
import cosalette
from cosalette.di import Depends
from cosalette.mqtt import Payload, Topic
class ValveCommand(BaseModel):
position: int # 0–100
class ValveState(BaseModel):
position: int
flow_lpm: float
def get_audit_logger() -> AuditLogger: # synchronous dependency
return AuditLogger()
@app.command(
"valve",
summary="Open/close irrigation valve",
state_model=ValveState,
)
async def handle_valve(
cmd: Annotated[ValveCommand, Payload()], # (1)!
full_topic: Annotated[str, Topic()], # (2)!
audit: Annotated[AuditLogger, Depends(get_audit_logger)], # (3)!
) -> ValveState: # (4)!
driver = ...
await driver.set_position(cmd.position)
audit.record(full_topic, cmd)
return ValveState(position=cmd.position, flow_lpm=await driver.read_flow())
Annotated[ValveCommand, Payload()]parses MQTT payload JSON intoValveCommand. A parameter namedpayloadwith model annotation also works withoutPayload().Annotated[str, Topic()]binds the full MQTT topic string.Depends(fn)injects the result of a synchronous factory — nested deps supported.- Returning
ValveStateis serialized via Pydantic TypeAdapter / JSON-mode serialization before publishing.
Raw escape hatch — when you need the plain string:
async def handle(payload: str) -> dict[str, object]: ... # by name → always raw
async def handle(cmd: Annotated[str, Payload(raw=True)]) -> ...: ... # explicit raw
Typed Triggerable Telemetry¶
A triggerable handler can declare Annotated[Model | None, Payload()] — the
payload is parsed on triggered runs; scheduled runs bind None when the type
is optional:
from __future__ import annotations
from typing import Annotated
from pydantic import BaseModel
from cosalette.mqtt import Payload
class RefreshCommand(BaseModel):
days: int = 7
@app.telemetry(
"climate",
interval=300,
triggerable=True,
summary="Temperature and humidity from I2C sensor",
state_model=SensorReading,
)
async def climate(
cmd: Annotated[RefreshCommand | None, Payload()], # None on scheduled runs
) -> SensorReading:
days = cmd.days if cmd is not None else 7
return SensorReading(celsius=read_temp(), humidity=read_rh())
Telemetry with Full Metadata¶
from pydantic import BaseModel
import cosalette
class SensorReading(BaseModel):
celsius: float
humidity: float
class RefreshCommand(BaseModel):
days: int = 7
@app.telemetry(
"climate",
interval=cosalette.setting_ref("poll_interval"),
triggerable=True,
summary="Temperature and humidity from the I2C sensor",
state_model=SensorReading,
payload_model=RefreshCommand, # accepted on /set when triggerable
behavior=["reads I2C bus", "applies PT1 low-pass filter"],
effects=["updates HA dashboard state"],
)
async def climate(ctx: cosalette.DeviceContext) -> dict[str, object]:
sensor = ctx.adapter(ClimatePort)
return {"celsius": sensor.read_temp(), "humidity": sensor.read_rh()}
Command with Full Metadata¶
from pydantic import BaseModel
class ValveCommand(BaseModel):
position: int # 0–100
class ValveState(BaseModel):
position: int
flow_lpm: float
@app.command(
"valve",
summary="Opens or closes the irrigation valve",
payload_model=ValveCommand,
state_model=ValveState,
behavior=["validates position range", "logs to audit trail"],
effects=["mutates valve position", "triggers flow sensor update"],
)
async def handle_valve(
payload: ValveCommand, ctx: cosalette.DeviceContext
) -> dict[str, object]:
driver = ctx.adapter(ValvePort)
await driver.set_position(payload.position)
return {"position": payload.position, "flow_lpm": await driver.read_flow()}
Device with Metadata¶
@app.device supports summary, behavior, and effects. It does not accept
state_model or payload_model because device handlers manage their own
publishing loop rather than returning a typed state snapshot.
@app.device(
"receiver",
summary="Read sensor frames from serial port and publish per-sensor state",
behavior=[
"opens serial port at startup",
"reads LaCrosse protocol frames in a loop",
"publishes per-sensor state through a sub-entity per discovered sensor",
],
effects=["publishes to {name}/{sensor_id}/state for each discovered sensor"],
)
async def receiver(ctx: cosalette.DeviceContext):
port = ctx.adapter(SerialPort)
async for frame in port.read_frames():
await ctx.sub_entity(frame.sensor_id).publish_state(frame.to_state())
yield
Inspectable Settings Bindings¶
Using a raw lambda for interval hides the setting name from the manifest:
# Opaque — manifest shows "<deferred>", tooling cannot resolve the field name
@app.telemetry("sensor", interval=lambda s: s.poll_interval)
async def sensor() -> dict[str, object]: ...
setting_ref("field_name") wraps the same callable but preserves the field
name so it appears in the manifest output:
# Inspectable — manifest shows interval: poll_interval (field name)
@app.telemetry("sensor", interval=cosalette.setting_ref("poll_interval"))
async def sensor() -> dict[str, object]: ...
setting_ref also works for enabled:
@app.telemetry(
"magnetometer",
interval=cosalette.setting_ref("poll_interval"),
enabled=cosalette.setting_ref("enable_magnetometer"),
)
async def magnetometer() -> dict[str, object]: ...
The SettingRef type is exported from cosalette — use it for type annotations
if you build tooling around the registry snapshot.
The Read/Write Split Pattern¶
A telemetry registration and a command registration can share the same device
name. They use different MQTT topic suffixes (/state vs /set), and the
framework creates a shared DeviceContext for both.
This is the canonical way to model a resource with separate read and write paths:
import cosalette
app = cosalette.App(name="gas2mqtt", version="1.0.0")
@app.telemetry(
"gas_counter",
interval=cosalette.setting_ref("poll_interval"),
triggerable=True,
summary="Current gas meter impulse count",
state_model=GasCounterState,
)
async def read_gas_counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
"""Poll impulse count; also fires immediately on /set trigger."""
meter = ctx.adapter(GasMeterPort)
return {"impulses": meter.read_impulses()}
@app.command(
"gas_counter", # same name — distinct MQTT suffix
summary="Reset or adjust the impulse counter",
payload_model=GasCounterCommand,
state_model=GasCounterState,
behavior=["validates offset bounds", "writes to non-volatile storage"],
effects=["mutates persisted counter value"],
)
async def write_gas_counter(
payload: GasCounterCommand, ctx: cosalette.DeviceContext
) -> dict[str, object]:
"""Accept counter mutations — reset or offset adjustment."""
meter = ctx.adapter(GasMeterPort)
await meter.set_offset(payload.offset)
return {"impulses": meter.read_impulses()}
app.run()
Topic layout for this pair:
| Topic | Direction | Handler |
|---|---|---|
gas2mqtt/gas_counter/state |
outbound | telemetry publishes |
gas2mqtt/gas_counter/set |
inbound | command subscribes |
Triggerable vs. Read/Write Split¶
These are different patterns — do not conflate them:
| Pattern | What it does |
|---|---|
triggerable=True on @app.telemetry |
A message on /set re-fires the read handler immediately — the value returned is still produced by the telemetry function. No mutation. |
@app.telemetry + @app.command sharing a name |
The telemetry handler reads state; the command handler writes state. Different code paths, distinct contracts. |
Use triggerable=True when the client wants a fresh reading on demand.
Use the read/write split when the client wants to mutate the resource.
Viewing the Manifest¶
The cosalette manifest command prints the canonical AsyncAPI 3.0.0 contract
for an app without running it:
# JSON output — full AsyncAPI document
cosalette manifest myapp.main:app
# Human-readable table
cosalette manifest myapp.main:app --table
Both forms call app.asyncapi() under the hood. The JSON output is a complete
AsyncAPI 3.0.0 document with typed payload schemas, operations, and contract
metadata. Abbreviated example for a thermo2mqtt temperature/pressure sensor
with a read/write thermostat setpoint:
{
"asyncapi": "3.0.0",
"info": {
"title": "thermo2mqtt",
"version": "1.0.0",
"x-cosalette-contract-version": "1"
},
"channels": {
"temperatureState": {
"address": "thermo2mqtt/temperature/state",
"messages": {"message": {"payload": {"$ref": "#/components/schemas/TemperatureReading"}}},
"x-cosalette-archetype": "telemetry",
"x-cosalette-summary": "Current temperature and pressure readings"
},
"setpointCommand": {
"address": "thermo2mqtt/setpoint/set",
"messages": {"message": {"payload": {"$ref": "#/components/schemas/SetpointCommand"}}},
"x-cosalette-archetype": "command",
"x-cosalette-summary": "Update the target temperature setpoint"
}
},
"operations": { "..." : "..." },
"components": {
"schemas": {
"TemperatureReading": { "..." : "..." },
"SetpointCommand": { "..." : "..." }
}
}
}
Schema inference priority (explicit wins over annotated):
| Registration field | Wins over |
|---|---|
state_model= on decorator |
handler return-type annotation |
payload_model= on decorator |
Annotated[T, Payload()] / payload: T convention |
Module-level code runs
cosalette manifest imports the app module to resolve registrations.
Any code at module level (outside functions) runs at import time — the
same behaviour as cosalette_inspect_app in the MCP server.
MCP Integration¶
AI coding assistants that use the cosalette MCP server can call
cosalette_manifest to retrieve the same AsyncAPI document programmatically:
Both the CLI and MCP tool call app.asyncapi() — the output is identical.
Use it to answer questions like "what topics does this app subscribe to?" or
"what payload does the valve command expect?" without reading implementation code.
Typed Contracts with Router¶
All contract features work identically on Router — use typed payloads, typed returns,
summary, state_model, payload_model, behavior, and effects on router
operations:
from __future__ import annotations
from typing import Annotated
from pydantic import BaseModel
import cosalette
from cosalette.mqtt import Payload
class ValveCommand(BaseModel):
position: int # 0–100
class ValveState(BaseModel):
position: int
flow_lpm: float
router = cosalette.Router(prefix="valves", tags=["irrigation"])
@router.command(
"main",
summary="Control main irrigation valve",
payload_model=ValveCommand,
state_model=ValveState,
behavior=["validates position range 0–100", "logs to audit trail"],
effects=["mutates valve position", "triggers flow sensor update"],
)
async def handle_valve(
cmd: Annotated[ValveCommand, Payload()],
ctx: cosalette.DeviceContext,
) -> ValveState:
driver = ctx.adapter(ValvePort)
await driver.set_position(cmd.position)
return ValveState(
position=cmd.position,
flow_lpm=await driver.read_flow(),
)
import cosalette
from valves import router as valves_router
app = cosalette.App(name="home2mqtt", version="1.0.0")
app.include_router(valves_router)
The manifest output (app.asyncapi()) includes all contract metadata from router
operations, with topics prefixed correctly:
- Subscribe:
home2mqtt/valves/main/set - Publish:
home2mqtt/valves/main/state
See Router Composition for multi-module organization patterns.
See Also¶
- Router Composition — multi-module apps with typed contracts
- Telemetry Device — polling loops and publish strategies
- Command & Control Device —
@app.commandhandler patterns - Device Archetypes — choosing the right decorator
- MCP Server — AI assistant integration