Streaming¶
Streaming is the push-to-pull bridge for hardware devices that deliver data via callbacks rather than waiting to be polled. Examples: BLE characteristic notifications, serial port events, HID input reports, USB bulk transfers.
Unlike @app.telemetry — which owns a poll
loop and publishes on a schedule — streaming adapters receive items whenever the
hardware fires them. The framework provides three primitives to bridge this
callback-based world into idiomatic async for iteration:
StreamablePort[T_co]— async port Protocol for hardware adapters (open/close/scan are awaitable)Stream[T]— the async iterator that converts push callbacks into pull iteration
As established in ADR-042 and
extended in ADR-045, these
primitives live at the hexagonal boundary (ADR-006): the adapter layer
implements the port contract; the domain handler iterates a Stream.
The StreamablePort Protocol¶
All streamable hardware adapters implement StreamablePort[T_co]:
class StreamablePort[T_co](Protocol):
async def open(self) -> None: ...
async def close(self) -> None: ...
async def start_scan(self) -> None: ...
async def stop_scan(self) -> None: ...
def register_callback(self, cb: Callable[[T_co], None]) -> None: ...
The five methods define a hardware lifecycle: connect, optionally begin a
scan phase, register one or more callbacks to receive items, stop scanning,
and disconnect. Lifecycle methods are awaitable — the stream runner calls
them with await. register_callback is synchronous because hardware
callbacks always fire sync; Stream handles the async boundary.
T_co is covariant — a StreamablePort[Sensor] satisfies
StreamablePort[BaseSensor].
Register with app.adapter() using the protocol as the key:
Stream[T] — the async bridge¶
Stream[T] converts sync callbacks into an AsyncIterator[T]:
| Method | Role |
|---|---|
put(item) |
Push end — called by the hardware callback (sync, never blocks) |
shutdown() |
Signal the iterator to stop (idempotent) |
async for item in stream |
Pull end — consumes items as they arrive |
__anext__ races queue.get() against a shutdown asyncio.Event using
asyncio.wait(FIRST_COMPLETED). There is no timeout polling: shutdown latency
is zero, and idle iteration blocks cleanly on the queue.
Typical pattern¶
import cosalette
from cosalette import Stream, StreamablePort
app = cosalette.App(name="sensor-bridge", version="1.0.0")
@app.device("ble-sensor")
async def ble_handler(
ctx: cosalette.DeviceContext,
port: BlePort, # implements StreamablePort[SensorReading]
):
stream: Stream[SensorReading] = Stream()
port.register_callback(stream.put)
await port.open()
await port.start_scan()
try:
async for reading in stream:
if ctx.shutdown_requested:
stream.shutdown()
continue
await ctx.publish_state({"reading": reading})
yield # reaction boundary
finally:
await port.stop_scan()
await port.close()
Push vs pull¶
Pull (@app.telemetry) |
Push (streaming) | |
|---|---|---|
| Data source | Polled on a schedule | Fires on hardware events |
| Timing control | Framework owns the interval | Hardware owns the schedule |
| MQTT integration | @app.telemetry decorator |
Manual via ctx.publish_state() |
| Shutdown | ctx.shutdown_requested |
stream.shutdown() |
When to use @app.stream¶
@app.stream is the managed-lifecycle alternative to the manual @app.device
pattern above. The framework owns the port lifecycle; the handler iterates a
Stream[T] and may inject DeviceContext, DeviceStore, and the concrete
adapter alongside it:
import cosalette
from cosalette import DeviceStore, Stream
app = cosalette.App(name="sensor-bridge", version="1.0.0", store=store_backend)
@app.stream("ble-sensor")
async def ble_handler(
stream: Stream[SensorReading],
ctx: cosalette.DeviceContext, # optional — inject to publish to MQTT
store: DeviceStore, # optional — inject to read/write persistent state
):
registry.restore_from(store)
async for reading in stream:
result = registry.record(reading)
if result.is_new:
await ctx.publish_state({"sensor": result.name, "value": reading.value})
store["last_reading"] = reading.value
yield # reaction boundary
The handler must declare exactly one Stream[T] parameter. Declaring
StreamablePort[T] directly is not permitted —
the framework manages the port and injects only the stream.
DeviceContext is always available for injection. DeviceStore requires the
app to be configured with a store backend (App(store=...)); without one,
declaring DeviceStore causes a TypeError when the handler starts — the
production stream runner logs the error and exits the task; AppHarness.inject_stream
raises it directly to the test.
Before invoking the handler, the framework:
- Locates the registered
StreamablePort[T]adapter. - Creates a
Stream[T]instance. - Opens the port:
await port.open(),port.register_callback(stream.put), andawait port.start_scan().
On shutdown, after the handler exits, the framework calls stream.shutdown(),
then await port.stop_scan() and await port.close(). The store is saved before exit.
Concrete adapter injection¶
The framework injects a capability-limited proxy under the concrete adapter type so handlers can call non-lifecycle methods on it directly:
class SerialPort(StreamablePort[Frame]):
async def open(self) -> None: ...
async def close(self) -> None: ...
async def start_scan(self) -> None: ...
async def stop_scan(self) -> None: ...
def register_callback(self, cb: Callable[[Frame], None]) -> None: ...
def set_led(self, on: bool) -> None: ... # device-specific, not a lifecycle method
app.adapter(StreamablePort[Frame], lambda: SerialPort("/dev/ttyUSB0"))
@app.stream("serial-receiver")
async def handle_frames(stream: Stream[Frame], port: SerialPort):
async for frame in stream:
await process(frame)
port.set_led(True) # OK — non-lifecycle method
yield
Lifecycle methods raise AttributeError on the injected adapter
Production run_stream() injects a capability-limited proxy under the
concrete type — not the raw adapter. Non-lifecycle attributes and methods
forward transparently, but calling open(), close(), start_scan(), or
stop_scan() raises AttributeError because lifecycle belongs exclusively
to the framework.
AppHarness.inject_stream() is a test-only shortcut that bypasses
production lifecycle management; it may inject raw test instances without
this restriction.
Manual wiring vs @app.stream¶
@app.device (manual) |
@app.stream (managed) |
|
|---|---|---|
| Port lifecycle | Handler calls open() / close() |
Framework manages |
| Async lifecycle | Manual await calls |
Handled automatically — lifecycle always awaited |
| Shutdown signal | stream.shutdown() in your loop |
Framework signals before cleanup |
| What handler receives | DeviceContext + port |
Stream[T] + optional DeviceContext, DeviceStore, concrete adapter |
| Persistent state | Manual store wiring | DeviceStore injection when App(store=...) is configured |
| Best for | Custom error handling, multiple ports, inbound MQTT commands |
Single-stream, callback-only devices |
Use @app.device with manual wiring when you need fine-grained port error
handling, multiple concurrent streams, or combined stream + MQTT command
support. Use @app.stream when a single callback stream is the primary data
path and you want framework-managed lifecycle, DI, and persistence.
See the Using @app.stream guide for step-by-step setup and testing patterns.