ADR-042: Streaming Protocol: StreamablePort and Stream[T]¶
Status¶
Accepted Date: 2026-04-26
Context¶
IoT hardware devices — BLE peripherals, serial adapters, HID sensors — deliver data via push callbacks rather than the pull-oriented async generator or polling loop that @app.telemetry assumes. There is no existing primitive in cosalette for bridging sync hardware callbacks into async iteration. Adapters working with such devices must currently roll their own Queue+Event patterns, duplicating boilerplate and risking inconsistent shutdown semantics.
Decision¶
Add StreamablePort[T_co] (runtime-checkable Protocol) and Stream[T] (concrete AsyncIterator backed by asyncio.Queue + asyncio.Event) to packages/src/cosalette/_stream.py. StreamablePort defines the five-method lifecycle contract (open, close, start_scan, stop_scan, register_callback). Stream bridges sync push callbacks into async for loops by racing asyncio.Queue.get() against a shutdown asyncio.Event in anext using asyncio.wait with FIRST_COMPLETED — no timeout polling, no busy-wait. Stream accepts a maxsize parameter (0 = unbounded, matching asyncio.Queue semantics) and a thread_safe flag; when thread_safe=True, put() captures the running event loop at construction and uses call_soon_threadsafe to marshal items from non-event-loop threads. The shutdown task is created once on the first iteration and reused across subsequent calls to halve per-iteration Task allocations. Both types are exported from the top-level cosalette namespace. This file uses PEP 695 type-parameter syntax (class Foo[T]) consistent with its existing use elsewhere in the codebase (e.g. DeviceContext.adapter[T]).
stream: Stream[SensorReading] = Stream()
port.register_callback(stream.put)
port.open()
port.start_scan()
try:
async for reading in stream:
await ctx.publish_state({"reading": reading})
finally:
port.stop_scan()
port.close()
Decision Drivers¶
- Hardware push adapters (BLE, serial, HID) cannot use async generators or polling — they fire callbacks on their own schedule
- Shutdown must be deterministic and latency-free: a shutdown event race eliminates polling delays and busy-waits
- The bridge pattern (Queue + Event) should be a first-class framework primitive, not repeated per-adapter boilerplate
- Protocol covariance (T_co) allows port adapters of subtypes to satisfy the StreamablePort[BaseType] contract
Considered Options¶
Option 1: asyncio.Queue + asyncio.Event race (chosen)¶
Stream[T] holds an asyncio.Queue[T] and asyncio.Event. anext creates two tasks (queue.get and event.wait) and races them with asyncio.wait(FIRST_COMPLETED). Shutdown wins over queued data, cancels the pending task, and raises StopAsyncIteration.
- Advantages: Zero polling latency: shutdown is detected in the same event loop tick; No timeout parameter needed — no tuning required; Fully deterministic in tests: advance queue or set event to control iterator; put() is sync and safe to call from hardware callbacks in the same event-loop thread; thread_safe=True enables cross-thread use via call_soon_threadsafe
- Disadvantages: Creates two asyncio Tasks per anext call — minor overhead compared to timeout polling; Task cancellation requires careful handling to avoid resource leaks
Option 2: asyncio.wait_for with timeout polling¶
Stream.anext calls asyncio.wait_for(queue.get(), timeout=0.1) in a loop, checking a shutdown flag on TimeoutError.
- Advantages: Simple implementation with no task management; Familiar pattern to developers who know asyncio.wait_for
- Disadvantages: Up to 100 ms shutdown latency by default — unacceptable for responsive shutdown; Timeout value must be tuned per deployment — adds configuration burden; Busy-waits during idle periods, consuming event loop ticks
Option 3: Async generator adapter function¶
Provide an async generator function stream_from_port(port) that internally creates a Queue, registers the callback, and yields from the queue with event-based shutdown.
- Advantages: Async generator syntax is familiar and idiomatic; No class instantiation required — functional style
- Disadvantages: Cannot separate put() from the iteration context — callback registration is coupled to the generator lifetime; Harder to test in isolation: no Stream.put() to call directly in tests; Does not cleanly support reuse of the same stream across multiple consumers
Decision Matrix¶
| Criterion | asyncio.Queue + asyncio.Event race (chosen) | asyncio.wait_for with timeout polling | Async generator adapter function |
|---|---|---|---|
| Shutdown latency | 5 | 2 | 4 |
| Test simplicity | 5 | 3 | 3 |
| Thread safety of put() | 5 | 5 | 3 |
| No tuning parameters required | 5 | 1 | 5 |
Scale: 1 (poor) to 5 (excellent)
Consequences¶
Positive¶
- Adapter authors get a zero-boilerplate bridge: create Stream(), call register_callback(stream.put), iterate with async for
- Shutdown semantics are explicit and consistent across all streaming adapters
- StreamablePort Protocol enables isinstance checks and DI injection for port adapters
- Sets the reference implementation pattern for PEP 695 type-parameter syntax in this codebase
Negative¶
- Creates one asyncio Task per anext invocation (the shutdown task is created once and reused; only the queue task is allocated per call)
- Items remaining in the queue at shutdown are silently dropped — callers must handle any required drain before calling shutdown()
2026-04-26