Using @app.stream
@app.stream eliminates the boilerplate of opening a port, wiring a callback,
and tearing everything down on shutdown. Register a StreamablePort[T]
adapter once, write a handler that iterates a Stream[T], and the framework
handles lifecycle, DI, and persistence.
Prerequisites
This guide assumes you've completed the Quickstart and read the Streaming concepts page.
When to use @app.stream
| Need | Primitive |
|---|---|
| Callback-based hardware (BLE, serial, HID) | @app.stream |
| Poll a sensor on a fixed interval | @app.telemetry |
| Full port control, multiple streams, or inbound MQTT commands | @app.device |
@app.stream has no built-in MQTT publish schedule. Publishing inside the
handler is your responsibility via the injected DeviceContext. If you want
automatic periodic state publication on a schedule, use @app.telemetry
instead.
Step 1: Define a port adapter
Implement StreamablePort[T] for your hardware adapter. All lifecycle methods
are async; register_callback is synchronous:
from collections.abc import Callable

from cosalette import StreamablePort


class ScannerPort(StreamablePort["Barcode"]):
    """USB HID barcode scanner."""

    async def open(self) -> None: ...
    async def close(self) -> None: ...
    async def start_scan(self) -> None: ...
    async def stop_scan(self) -> None: ...
    def register_callback(self, cb: Callable[["Barcode"], None]) -> None: ...
The five methods map to the hardware lifecycle: connect, begin a scan phase, receive items via registered callbacks, stop scanning, and disconnect. See Streaming concepts for the full protocol definition and covariance rules.
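To make the five-method lifecycle concrete, here is a minimal in-memory adapter sketch. It is duck-typed rather than a StreamablePort subclass so that it runs without cosalette installed. FakeScannerPort, its emit() hook, and the calls list are hypothetical names used only for illustration, and supporting more than one callback is an assumption.

```python
import asyncio
from collections.abc import Callable


class FakeScannerPort:
    """In-memory stand-in exposing the same five methods as the port."""

    def __init__(self) -> None:
        self.calls: list[str] = []  # lifecycle calls, in the order received
        self._callbacks: list[Callable[[str], None]] = []
        self._scanning = False

    async def open(self) -> None:
        self.calls.append("open")

    async def close(self) -> None:
        self.calls.append("close")

    async def start_scan(self) -> None:
        self._scanning = True
        self.calls.append("start_scan")

    async def stop_scan(self) -> None:
        self._scanning = False
        self.calls.append("stop_scan")

    def register_callback(self, cb: Callable[[str], None]) -> None:
        # Synchronous: it only stores the callback and performs no I/O.
        self._callbacks.append(cb)
        self.calls.append("register_callback")

    def emit(self, item: str) -> None:
        """Hypothetical test hook: simulate the hardware delivering one item."""
        if self._scanning:
            for cb in self._callbacks:
                cb(item)


async def demo() -> tuple[list[str], list[str]]:
    port = FakeScannerPort()
    received: list[str] = []
    await port.open()
    port.register_callback(received.append)
    port.emit("ignored")  # not scanning yet, so nothing is delivered
    await port.start_scan()
    port.emit("12345678")
    await port.stop_scan()
    await port.close()
    return port.calls, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A double like this is mainly useful for exercising adapter-facing code; handler tests can skip the adapter entirely, as Step 4 describes.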
Step 2: Register the adapter
Call app.adapter() using the port protocol as the key:
import cosalette
from cosalette import StreamablePort
from myapp.adapters import UsbScannerAdapter, BleAdapter
from myapp.models import Barcode, SensorReading
app = cosalette.App(name="scanner-bridge", version="1.0.0")
app.adapter(StreamablePort[Barcode], lambda: UsbScannerAdapter(device="/dev/hidraw0"))
app.adapter(StreamablePort[SensorReading], lambda: BleAdapter("AA:BB:CC:DD"))
The framework matches the Stream[T] parameter in the handler to the registered
port by item type at startup.
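The matching step can be pictured with the standard typing helpers. This is a sketch of the idea only, not cosalette's actual resolution code: Stream and StreamablePort are local stand-ins so the example runs on its own, and stream_item_type, resolve_port_factory, and the registry dict are hypothetical names.

```python
from typing import Generic, TypeVar, get_args, get_origin, get_type_hints

T = TypeVar("T")


class Stream(Generic[T]):
    """Stand-in for cosalette.Stream (assumption: a generic class)."""


class StreamablePort(Generic[T]):
    """Stand-in for cosalette.StreamablePort."""


class Barcode:
    """Hypothetical item type."""


def stream_item_type(handler) -> type:
    """Return T from the handler's single Stream[T] parameter."""
    hints = get_type_hints(handler)
    hints.pop("return", None)
    found = [get_args(h)[0] for h in hints.values() if get_origin(h) is Stream]
    if len(found) != 1:
        raise TypeError(f"expected exactly one Stream[T] parameter, got {len(found)}")
    return found[0]


def resolve_port_factory(registry: dict, handler):
    """Look up the factory registered under StreamablePort[T]."""
    key = StreamablePort[stream_item_type(handler)]
    try:
        return registry[key]
    except KeyError:
        raise LookupError(f"no adapter registered for {key}") from None


async def handle_scans(stream: Stream[Barcode]):
    async for _ in stream:
        yield


# Subscripted generics compare equal and hash consistently,
# so they can serve as dictionary keys.
registry = {StreamablePort[Barcode]: lambda: "usb-scanner-adapter"}
factory = resolve_port_factory(registry, handle_scans)
```

Under these assumptions, a handler whose item type has no registered port fails at lookup time rather than when the first item arrives.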
Step 3: Write the handler
Stateless handler
Declare a single Stream[T] parameter and iterate:
from cosalette import Stream
from myapp.models import Barcode


@app.stream("barcode-scanner")  # (1)!
async def handle_scans(stream: Stream[Barcode]):
    async for barcode in stream:  # (2)!
        await process_barcode(barcode)  # (3)!
        yield  # (4)!
- The name string is optional. When omitted, the function name is used.
- async for blocks until the next item arrives or shutdown is signalled. The framework signals shutdown by calling stream.shutdown(), which causes the iterator to stop immediately; items still in the queue may be discarded.
- Your domain logic. Publish to MQTT, write to a database, forward downstream.
- yield marks the reaction boundary. Place it after processing each stream item. If any @app.react reactors are registered for mutated state, the framework drains events and runs them before the next async for iteration. Omitting yield batches all items before reactor dispatch; use this only when accumulating state across items is the intended behaviour.
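The interplay of async for, shutdown, and yield can be illustrated with a small queue-backed iterator. This is a sketch under stated assumptions, not the framework's Stream: SketchStream and demo are hypothetical names, and the driver steps the handler manually to show where control returns at each yield.

```python
import asyncio

_STOP = object()  # sentinel used only to wake a consumer that is waiting


class SketchStream:
    """Queue-backed async iterator: callbacks push, the handler pulls."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()
        self._closed = False

    def put(self, item) -> None:
        if not self._closed:
            self._queue.put_nowait(item)

    def shutdown(self) -> None:
        self._closed = True
        self._queue.put_nowait(_STOP)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._closed:
            raise StopAsyncIteration  # items still queued are dropped
        item = await self._queue.get()
        if self._closed or item is _STOP:
            raise StopAsyncIteration
        return item


async def demo():
    stream = SketchStream()
    seen = []

    async def handler(s):
        async for item in s:
            seen.append(item)  # domain logic
            yield  # reaction boundary: control returns to the driver

    stream.put("a")
    stream.put("b")
    snapshots = []
    gen = handler(stream)
    await gen.__anext__()  # runs the handler up to its first yield
    snapshots.append(list(seen))
    await gen.__anext__()
    snapshots.append(list(seen))
    stream.put("c")  # still queued when shutdown arrives
    stream.shutdown()
    async for _ in gen:  # the handler exits without ever seeing "c"
        snapshots.append(list(seen))
    return seen, snapshots


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each snapshot is taken at a point where a driver could run reactors, which is why a handler that never yields gives the driver no such point until the loop ends.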
Stateful handler: DeviceContext and DeviceStore
Declare DeviceContext to publish MQTT messages and DeviceStore to persist
state across restarts:
from collections.abc import AsyncIterator

import cosalette
from cosalette import DeviceContext, DeviceStore, Stream, StreamablePort
from myapp.adapters import BleAdapter
from myapp.models import SensorReading

# store_backend and registry are application-defined objects created elsewhere.
app = cosalette.App(name="sensor-bridge", version="1.0.0", store=store_backend)  # (1)!
app.adapter(StreamablePort[SensorReading], lambda: BleAdapter("AA:BB:CC:DD"))


@app.stream("ble-sensor")
async def handle_readings(
    stream: Stream[SensorReading],
    ctx: DeviceContext,  # (2)!
    store: DeviceStore,  # (3)!
) -> AsyncIterator[None]:
    registry.restore_from(store)  # (4)!
    async for reading in stream:
        result = registry.record(reading)
        if result.is_new:
            await ctx.publish_state({  # (5)!
                "sensor": result.name,
                "value": reading.value,
            })
        store["last_seen"] = reading.sensor_id  # (6)!
        yield  # reaction boundary
- store= is required to use DeviceStore in handlers. Without it, declaring DeviceStore causes a TypeError when the handler starts: the production stream runner logs the error and exits the task; AppHarness.inject_stream raises it directly.
- DeviceContext is always available. Use it to call ctx.publish_state(), ctx.publish(), and ctx.sleep(). To publish availability directly, use ctx.publish("availability", "online", retain=True).
- DeviceStore is a MutableMapping[str, Any] scoped to this stream by name. Load it to restore persisted values; write to it to persist new state.
- Restore domain state before the first reading arrives. This is the idiomatic pattern for stateful receivers that survive application restarts.
- Publish arbitrary state to the stream's MQTT topic. The framework manages topic construction using the stream name.
- Mark the store dirty. The framework saves the store to the backend on graceful shutdown.
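The "scoped by name, dirty on write, saved on shutdown" behaviour can be sketched with a small MutableMapping. This is an illustration only: SketchStore, its load() and save() methods, and the plain dict used as a backend are assumptions, not the DeviceStore API.

```python
from collections.abc import Iterator, MutableMapping
from typing import Any


class SketchStore(MutableMapping[str, Any]):
    """Dict-like view over one named slot of a backend, with dirty tracking."""

    def __init__(self, backend: dict, name: str) -> None:
        self._backend = backend  # hypothetical backend: {stream name: state dict}
        self._name = name
        self._data: dict[str, Any] = {}
        self.dirty = False

    def load(self) -> None:
        # Work on a copy so writes stay local until save() is called.
        self._data = dict(self._backend.get(self._name, {}))
        self.dirty = False

    def save(self) -> None:
        if self.dirty:
            self._backend[self._name] = dict(self._data)
            self.dirty = False

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value
        self.dirty = True

    def __delitem__(self, key: str) -> None:
        del self._data[key]
        self.dirty = True

    def __iter__(self) -> Iterator[str]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)


backend = {"ble-sensor": {"last_seen": 42}}
store = SketchStore(backend, "ble-sensor")
store.load()
restored = store["last_seen"]  # value persisted by a previous run
store["last_seen"] = 17  # marks the store dirty
before_save = backend["ble-sensor"]["last_seen"]
store.save()  # in the guide, the framework does this on graceful shutdown
after_save = backend["ble-sensor"]["last_seen"]
```

In this sketch a write becomes durable only at save(), which mirrors the guide's point that persistence happens on graceful shutdown rather than on every assignment.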
Concrete adapter injection
When your adapter has device-specific methods beyond the port lifecycle — for
example a set_led() call — declare the concrete type alongside Stream[T]:
from cosalette import Stream
from myapp.models import Frame  # assumed location, alongside the other models
from myapp.ports import SerialPort  # implements StreamablePort[Frame]


@app.stream("serial-receiver")
async def handle_frames(stream: Stream[Frame], port: SerialPort):
    async for frame in stream:
        await process(frame)
        port.set_led(True)  # non-lifecycle method, safe to call
        yield
Lifecycle methods raise AttributeError on the injected adapter
Production run_stream() injects a capability-limited proxy under the
concrete type — not the raw adapter. Non-lifecycle attributes and methods
forward transparently, but open(), close(), start_scan(), and
stop_scan() raise AttributeError because lifecycle belongs to the
framework.
AppHarness.inject_stream() is a test-only shortcut that bypasses
production lifecycle management; it may inject raw test instances without
this restriction.
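One way to picture a capability-limited proxy is a thin wrapper that forwards attribute access and refuses the four lifecycle names. The sketch below is an assumption about the general technique, not the framework's proxy: LifecycleGuard and SerialAdapter are hypothetical, and the sketch does not attempt to satisfy isinstance checks against the concrete type.

```python
_LIFECYCLE = frozenset({"open", "close", "start_scan", "stop_scan"})


class LifecycleGuard:
    """Forward everything except lifecycle methods to the wrapped adapter."""

    def __init__(self, adapter) -> None:
        self._adapter = adapter

    def __getattr__(self, name: str):
        # __getattr__ runs only when normal lookup fails, so _adapter itself
        # resolves through the instance dict once __init__ has run.
        if name == "_adapter":
            raise AttributeError(name)
        if name in _LIFECYCLE:
            raise AttributeError(f"{name}() is managed by the framework")
        return getattr(self._adapter, name)


class SerialAdapter:
    """Hypothetical concrete adapter with one device-specific method."""

    def __init__(self) -> None:
        self.led = False

    async def open(self) -> None: ...

    def set_led(self, on: bool) -> None:
        self.led = on


adapter = SerialAdapter()
port = LifecycleGuard(adapter)
port.set_led(True)  # forwarded to the real adapter

try:
    port.open  # lifecycle access is refused
except AttributeError as exc:
    blocked_message = str(exc)
```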
What the framework manages
Before calling the handler the framework:
- Locates the registered StreamablePort[T] adapter.
- Creates a Stream[T] instance.
- Opens the port: await port.open(), port.register_callback(stream.put), and await port.start_scan().
- Injects DeviceContext, DeviceStore (if configured), and a capability-limited proxy under the concrete adapter type into the provider map.
On shutdown, after the handler exits:
- Calls stream.shutdown() to send an immediate stop signal; any items still queued may be discarded.
- Calls port.stop_scan() and port.close() (awaited).
- Saves the DeviceStore to the backend.
Do not declare StreamablePort[T] as a handler parameter; the framework
manages the port. Settings subclasses, @app.state instances, and ClockPort
may be declared alongside Stream[T] as usual.
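The startup and shutdown order described in this section can be traced with a simplified runner. Everything here is a hypothetical sketch: run_stream_sketch, TinyStream, and RecordingPort are illustrative names, the real run_stream() is not shown in this guide, and TinyStream drains queued items before stopping (unlike the immediate stop described above) so that the demo output is deterministic.

```python
import asyncio


class TinyStream:
    """Minimal queue-backed stream; drains queued items before stopping."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()

    def put(self, item) -> None:
        self._queue.put_nowait(item)

    def shutdown(self) -> None:
        self._queue.put_nowait(None)  # None marks end of stream in this sketch

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration
        return item


class RecordingPort:
    """Hypothetical adapter that logs lifecycle calls and emits two items."""

    def __init__(self, log: list) -> None:
        self.log = log
        self._cb = None

    async def open(self) -> None:
        self.log.append("open")

    def register_callback(self, cb) -> None:
        self._cb = cb
        self.log.append("register_callback")

    async def start_scan(self) -> None:
        self.log.append("start_scan")
        for item in ("a", "b"):  # pretend the hardware fires immediately
            self._cb(item)

    async def stop_scan(self) -> None:
        self.log.append("stop_scan")

    async def close(self) -> None:
        self.log.append("close")


async def run_stream_sketch(port, handler, log: list) -> None:
    stream = TinyStream()
    await port.open()
    port.register_callback(stream.put)
    await port.start_scan()
    stream.shutdown()  # demo only: request shutdown right after the burst
    try:
        async for _ in handler(stream):
            log.append("boundary")  # where reactors would be dispatched
    finally:
        await port.stop_scan()
        await port.close()
        log.append("save_store")  # placeholder for persisting the DeviceStore


def demo() -> list:
    log: list = []

    async def handler(stream):
        async for item in stream:
            log.append(f"handle {item}")
            yield

    asyncio.run(run_stream_sketch(RecordingPort(log), handler, log))
    return log
```

The try/finally is the design point worth noting: teardown runs even if the handler raises, which is presumably why lifecycle calls are kept out of handler code.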
Step 4: Test with inject_stream
AppHarness.inject_stream feeds items directly into the handler's stream,
bypassing the hardware adapter entirely. DeviceContext, DeviceStore,
Settings, concrete adapters, and ClockPort are resolved through the same
provider map as production execution. To supply @app.state dependencies, call
harness.override_state() before inject_stream, or pass them via providers=.
Basic usage
import pytest
from cosalette import Stream, StreamablePort
from cosalette.testing import AppHarness
from myapp.adapters import UsbScannerAdapter
from myapp.models import Barcode


@pytest.mark.asyncio
async def test_barcode_processed() -> None:
    harness = AppHarness.create()  # (1)!
    harness.app.adapter(StreamablePort[Barcode], lambda: UsbScannerAdapter(device="/dev/hidraw0"))
    captured: list[Barcode] = []

    @harness.app.stream("barcode-scanner")
    async def handle_scans(stream: Stream[Barcode]):
        async for barcode in stream:
            captured.append(barcode)
            yield

    barcode = Barcode(code="12345678", symbology="EAN-8")
    await harness.inject_stream("barcode-scanner", barcode)  # (2)!
    assert captured == [barcode]
- AppHarness.create() builds a fresh App with test doubles. Register adapters and handlers on harness.app; the harness does not accept an existing app.
- inject_stream(name, *items) delivers each item to the handler's stream in order, then signals shutdown so the async for loop exits cleanly. Pass shutdown=False to keep the stream open for multi-batch tests.
Testing stateful handlers with DeviceContext and DeviceStore
For handlers that inject DeviceContext or DeviceStore, register the handler
on harness.app and assert via harness.mqtt or the store backend:
import json
from collections.abc import AsyncIterator

import pytest
from cosalette import DeviceContext, DeviceStore, MemoryStore, Stream, StreamablePort
from cosalette.testing import AppHarness
from myapp.adapters import BleAdapter
from myapp.models import SensorReading


@pytest.mark.asyncio
async def test_publishes_new_sensor() -> None:
    harness = AppHarness.create(name="sensor-bridge")
    harness.app.adapter(StreamablePort[SensorReading], lambda: BleAdapter("AA:BB:CC:DD"))

    @harness.app.stream("ble-sensor")
    async def handle_readings(stream: Stream[SensorReading], ctx: DeviceContext) -> AsyncIterator[None]:
        async for reading in stream:
            await ctx.publish_state({"sensor_id": reading.sensor_id, "value": reading.value})
            yield

    reading = SensorReading(sensor_id=17, value=22.4)
    await harness.inject_stream("ble-sensor", reading)  # (1)!
    published = harness.mqtt.get_messages_for("sensor-bridge/ble-sensor/state")
    assert len(published) == 1
    assert json.loads(published[0][0])["sensor_id"] == 17


@pytest.mark.asyncio
async def test_restores_registry_from_store() -> None:
    mem_store = MemoryStore({"ble-sensor": {"last_seen": 42}})  # (2)!
    harness = AppHarness.create(name="sensor-bridge", store=mem_store)  # (3)!
    harness.app.adapter(StreamablePort[SensorReading], lambda: BleAdapter("AA:BB:CC:DD"))

    @harness.app.stream("ble-sensor")
    async def handle_readings(stream: Stream[SensorReading], store: DeviceStore) -> AsyncIterator[None]:
        async for reading in stream:
            store["last_seen"] = reading.sensor_id
            yield

    reading = SensorReading(sensor_id=42, value=18.0)
    await harness.inject_stream("ble-sensor", reading)
    # Verify the store was updated after the handler ran
    saved = mem_store.load("ble-sensor")
    assert saved is not None
    assert saved["last_seen"] == 42
- inject_stream auto-wires a DeviceContext from harness.mqtt and harness.clock. All publishes are captured in harness.mqtt.published.
- Seed the store with pre-existing state to test restore behaviour.
- AppHarness.create(store=...) sets the store backend. inject_stream auto-creates a DeviceStore keyed by handler name, loads it before the handler runs, and saves it on exit.
Full signature
await harness.inject_stream(
    name,            # stream handler name
    *items,          # items to deliver into the stream
    shutdown=True,   # signal shutdown after items are delivered
    ctx=None,        # DeviceContext override (replaces harness default)
    store=None,      # Store backend override (replaces app._store)
    providers=None,  # extra DI providers merged last (highest precedence)
    adapters=None,   # concrete adapters injected under their own type
)
| Parameter | Default | Effect |
|---|---|---|
| shutdown | True | Auto-signal stream shutdown after delivery |
| ctx | harness default | Replace the entire DeviceContext; harness doubles not merged in |
| store | app._store | Override the store backend for this call |
| providers | {} | Merged last; highest-precedence DI overrides |
| adapters | {} | Concrete adapter instances injected by their type |
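The precedence in the table can be read as a layered dictionary merge. The sketch below assumes a type-keyed provider map and a specific layering (harness defaults, then ctx, then adapters, then providers); only "providers merged last" comes from the table, and build_provider_map and the stand-in classes are hypothetical.

```python
class DeviceContext:
    """Stand-in type used as a provider-map key."""


class ClockPort:
    """Stand-in type used as a provider-map key."""


class SerialAdapter:
    """Hypothetical concrete adapter type."""


def build_provider_map(defaults, *, ctx=None, adapters=None, providers=None):
    """Merge DI sources so that later layers override earlier ones."""
    merged = dict(defaults)
    if ctx is not None:
        merged[DeviceContext] = ctx  # replaces the harness-built context outright
    merged.update(adapters or {})  # concrete adapters, keyed by their own type
    merged.update(providers or {})  # merged last: highest precedence
    return merged


harness_defaults = {DeviceContext: "harness ctx", ClockPort: "harness clock"}

resolved = build_provider_map(
    harness_defaults,
    ctx="custom ctx",
    adapters={SerialAdapter: "serial double"},
    providers={ClockPort: "frozen clock"},
)
```

Strings stand in for real instances here so the resulting map is easy to inspect.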
Lifecycle is still bypassed
inject_stream never calls port.open(), port.start_scan(),
port.stop_scan(), or port.close(). Hardware adapters registered with
the app are not instantiated. Only the stream items and DI providers you
pass are available to the handler.
Conditional registration with enabled=
enabled= follows the same rules as all other cosalette decorators:
# Skip at decoration time
@app.stream("scanner", enabled=False)
async def handle_scans(stream: Stream[Barcode]):
    async for barcode in stream:
        ...
        yield


# Defer the decision to bootstrap; settings are resolved first
@app.stream(
    "scanner",
    enabled=lambda s: s.scanner_enabled,
)
async def handle_scans(stream: Stream[Barcode]):
    async for barcode in stream:
        ...
        yield
A callable receives the resolved Settings instance. When it returns False
the handler is silently skipped — no adapter is opened, no task is spawned.
See ADR-038 for the deferred enabled= design rationale.
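The two forms of enabled= can be modelled with a toy registry: a literal False is dropped when the decorator runs, while a callable is kept and evaluated once settings exist. SketchApp, its bootstrap() method, and the Settings dataclass below are hypothetical stand-ins for illustration, not the cosalette implementation.

```python
from dataclasses import dataclass


@dataclass
class Settings:
    """Hypothetical settings object with a single feature flag."""

    scanner_enabled: bool = True


class SketchApp:
    def __init__(self) -> None:
        self.pending = []  # (name, handler, enabled) kept until bootstrap

    def stream(self, name, enabled=True):
        def decorator(fn):
            if enabled is not False:  # literal False: skipped at decoration time
                self.pending.append((name, fn, enabled))
            return fn

        return decorator

    def bootstrap(self, settings):
        """Return the names of handlers that would be started."""
        started = []
        for name, _fn, enabled in self.pending:
            active = enabled(settings) if callable(enabled) else bool(enabled)
            if active:
                started.append(name)
        return started


app = SketchApp()


@app.stream("static-off", enabled=False)
async def never_registered(stream):
    yield


@app.stream("deferred", enabled=lambda s: s.scanner_enabled)
async def maybe_started(stream):
    yield


@app.stream("always-on")
async def always_started(stream):
    yield
```

With this model, app.bootstrap(Settings(scanner_enabled=False)) starts only the handler that has no condition attached.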
Complete example
# Stateless handler
from __future__ import annotations

import cosalette
from cosalette import Stream, StreamablePort
from myapp.adapters import UsbScannerAdapter
from myapp.models import Barcode

app = cosalette.App(name="scanner-bridge", version="1.0.0")
app.adapter(StreamablePort[Barcode], lambda: UsbScannerAdapter(device="/dev/hidraw0"))


@app.stream("barcode-scanner")
async def handle_scans(stream: Stream[Barcode]):
    async for barcode in stream:
        await process_barcode(barcode)
        yield


app.run()
# Stateful handler with DeviceContext and DeviceStore
from __future__ import annotations

from collections.abc import AsyncIterator

import cosalette
from cosalette import DeviceContext, DeviceStore, Stream, StreamablePort
from myapp.adapters import BleAdapter
from myapp.models import SensorReading

# store_backend and registry are application-defined objects created elsewhere.
app = cosalette.App(name="sensor-bridge", version="1.0.0", store=store_backend)
app.adapter(StreamablePort[SensorReading], lambda: BleAdapter("AA:BB:CC:DD"))


@app.stream("ble-sensor")
async def handle_readings(
    stream: Stream[SensorReading],
    ctx: DeviceContext,
    store: DeviceStore,
) -> AsyncIterator[None]:
    registry.restore_from(store)
    async for reading in stream:
        result = registry.record(reading)
        if result.is_new:
            await ctx.publish_state({"sensor": result.name, "value": reading.value})
        store["last_seen"] = reading.sensor_id
        yield  # reaction boundary


app.run()
See also
- Streaming concepts: StreamablePort, Stream[T], and the push-to-pull bridge
- Device archetypes: choosing the right decorator
- Testing: full harness reference
- ADR-042: Streaming protocol
- ADR-045: Stateful stream receiver semantics