Using @app.stream¶
@app.stream eliminates the boilerplate of opening a port, wiring a callback,
and tearing everything down on shutdown. Register a StreamablePort[T] adapter
once, write a handler that iterates a Stream[T], and the framework handles
the rest.
Prerequisites
This guide assumes you've completed the Quickstart and read the Streaming concepts page.
When to use @app.stream¶
| Need | Primitive |
|---|---|
| Callback-based hardware (BLE, serial, HID) | @app.stream |
| Poll a sensor on a fixed interval | @app.telemetry |
| Full port control, multiple streams, or inbound MQTT commands | @app.device |
@app.stream has no built-in MQTT publish schedule. Publishing inside the
handler is your responsibility — call your MQTT client or a context helper
as needed. If you want automatic periodic state publication, use
@app.telemetry instead.
Step 1 — Define a StreamablePort adapter¶
Implement the five-method StreamablePort[T] protocol in your adapter:
from collections.abc import Callable
from cosalette import StreamablePort
class ScannerPort(StreamablePort["Barcode"]):
"""USB HID barcode scanner."""
def open(self) -> None: ...
def close(self) -> None: ...
def start_scan(self) -> None: ...
def stop_scan(self) -> None: ...
def register_callback(self, cb: Callable[["Barcode"], None]) -> None: ...
The five methods map to the hardware lifecycle: connect, begin a scan phase, receive items via registered callbacks, stop scanning, and disconnect. See Streaming concepts for the full protocol definition and covariance rules.
Step 2 — Register the adapter¶
Call app.adapter() before the handler is registered:
import cosalette
from myapp.adapters import UsbScannerAdapter
from myapp.ports import ScannerPort
app = cosalette.App(name="scanner-bridge", version="1.0.0")
app.adapter(ScannerPort, lambda: UsbScannerAdapter(device="/dev/hidraw0"))
The framework matches the Stream[Barcode] parameter in the handler to the
registered StreamablePort[Barcode] by type at startup. A missing or
mismatched adapter raises a TypeError before the app runs.
Step 3 — Write the handler¶
Declare a single Stream[T] parameter and iterate:
from cosalette import Stream
from myapp.models import Barcode
@app.stream("barcode-scanner") # (1)!
async def handle_scans(stream: Stream[Barcode]) -> None:
async for barcode in stream: # (2)!
await process_barcode(barcode) # (3)!
- The name string is optional. When omitted, the function name is used.
async forblocks until the next item arrives or shutdown is signalled. The framework signals shutdown by callingstream.shutdown(), which causes the iterator to stop after draining any queued items.- Your domain logic. Publish to MQTT, write to a database, forward downstream.
What the framework manages¶
Before calling the handler the framework:
- Locates the registered
StreamablePort[T]adapter. - Creates a
Stream[T]instance. - Calls
port.open(),port.register_callback(stream.put), andport.start_scan().
On shutdown, after the handler exits:
- Calls
stream.shutdown()to drain and stop the iterator. - Calls
port.stop_scan()andport.close().
Do not declare StreamablePort[T] as a handler parameter — the framework
manages it. Other DI targets (Settings subclasses, @app.state instances,
ClockPort) may be declared alongside Stream[T] as normal.
Step 4 — Test with inject_stream¶
AppHarness.inject_stream feeds items directly into the handler's stream,
bypassing the hardware adapter entirely:
import pytest
from cosalette.testing import AppHarness
from myapp.app import app
from myapp.models import Barcode
@pytest.fixture
def harness() -> AppHarness:
return AppHarness.create(app=app)
@pytest.mark.asyncio
async def test_barcode_processed(harness: AppHarness) -> None:
barcode = Barcode(code="12345678", symbology="EAN-8")
await harness.inject_stream( # (1)!
"barcode-scanner",
barcode,
shutdown=True, # (2)!
)
assert harness.state("barcode-scanner")["last_code"] == "12345678"
inject_stream(name, *items)delivers each item to the handler's stream in order, then signals shutdown so theasync forloop exits cleanly.shutdown=Trueis the default. Passshutdown=Falseto keep the stream open for multi-batch injection tests.
Conditional registration with enabled=¶
enabled= follows the same rules as all other cosalette decorators:
# Skip at decoration time
@app.stream("scanner", enabled=False)
async def handle_scans(stream: Stream[Barcode]) -> None:
...
# Defer the decision to bootstrap — settings are resolved first
@app.stream(
"scanner",
enabled=lambda s: s.scanner_enabled,
)
async def handle_scans(stream: Stream[Barcode]) -> None:
...
A callable receives the resolved Settings instance. When it returns False
the handler is silently skipped — no adapter is opened, no task is spawned.
See
ADR-038
for the deferred enabled= design rationale.
Complete example¶
from __future__ import annotations
import cosalette
from cosalette import Stream
from myapp.adapters import UsbScannerAdapter
from myapp.models import Barcode
from myapp.ports import ScannerPort
app = cosalette.App(name="scanner-bridge", version="1.0.0")
app.adapter(ScannerPort, lambda: UsbScannerAdapter(device="/dev/hidraw0"))
@app.stream("barcode-scanner")
async def handle_scans(stream: Stream[Barcode]) -> None:
async for barcode in stream:
await process_barcode(barcode)
app.run()
See also¶
- Streaming concepts —
StreamablePort,Stream[T], and the push-to-pull bridge - Device archetypes — choosing the right decorator
- Testing — full harness reference
- ADR-042 — Streaming protocol