Testing Utilities
Reference for the cosalette.testing package — test doubles, factories, and
pytest fixtures for testing cosalette applications.
Test Harness
cosalette.testing.AppHarness
dataclass
AppHarness(
app: App,
mqtt: MockMqttClient,
clock: FakeClock,
settings: Settings,
shutdown_event: Event,
run_periodic: bool = False,
)
Test harness wrapping App with pre-configured test doubles.
Provides unified access to App, MockMqttClient, FakeClock, Settings, and a shutdown Event — eliminating boilerplate in integration-style tests.
Usage::

    harness = AppHarness.create()

    @harness.app.device("sensor")
    async def sensor(ctx):
        ...

    # Run with auto-shutdown after device_called event:
    await harness.run()

See Also
ADR-007 for testing strategy decisions.
create
classmethod
create(
*,
name: str = "testapp",
version: str = "1.0.0",
dry_run: bool = False,
lifespan: LifespanFunc | None = None,
store: Store | None = None,
run_periodic: bool = False,
**settings_overrides: Any,
) -> Self
Create a harness with fresh test doubles.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | App name. | `'testapp'` |
| `version` | `str` | App version. | `'1.0.0'` |
| `dry_run` | `bool` | When True, forward to App for dry-run adapter variants. | `False` |
| `lifespan` | `LifespanFunc \| None` | Optional lifespan context manager forwarded to :class: | `None` |
| `store` | `Store \| None` | Optional :class: | `None` |
| `run_periodic` | `bool` | When True, periodic tasks will be started; when False, they will be suppressed for testing. | `False` |
| `**settings_overrides` | `Any` | Forwarded to :func: | `{}` |

Returns:
| Type | Description |
|---|---|
| `Self` | A fully wired :class: |
Source code in packages/src/cosalette/testing/_harness.py
run
async
Run _run_async with the harness's test doubles.
Source code in packages/src/cosalette/testing/_harness.py
trigger_shutdown
inject_stream
async
inject_stream(
name: str,
*items: Any,
shutdown: bool = True,
ctx: DeviceContext | None = None,
store: Store | None = None,
providers: dict[type, Any] | None = None,
adapters: dict[type, object] | None = None,
) -> None
Push items into a named stream handler for testing.
Finds the registered @app.stream handler by name, creates a Stream, pushes the provided items, optionally signals shutdown, and runs the handler directly (bypassing adapter lifecycle).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Stream handler name as registered with @app.stream. | required |
| `*items` | `Any` | Items to push into the stream. | `()` |
| `shutdown` | `bool` | When True (default), call stream.shutdown() after all items are pushed so the handler's async for loop terminates. | `True` |
| `ctx` | `DeviceContext \| None` | Optional :class: | `None` |
| `store` | `Store \| None` | Optional :class: | `None` |
| `providers` | `dict[type, Any] \| None` | Extra DI providers merged into the provider map with the highest priority (override everything else). | `None` |
| `adapters` | `dict[type, object] \| None` | Concrete adapter instances injected by their concrete type into both the DI provider map and the :class: | `None` |

Note
When ctx is supplied it replaces the entire :class:DeviceContext
— harness doubles (mqtt, clock, shutdown_event) are not merged in.
adapters are added to the DI providers map but not injected
into the explicitly supplied ctx. If you need both a custom
context and adapter injection, build the context with the adapters
you need and pass both ctx and adapters.
Raises:
| Type | Description |
|---|---|
| `ValueError` | If no stream handler with name is registered. |
| `TypeError` | If a required dependency (e.g. :class: |

Source code in packages/src/cosalette/testing/_harness.py
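
The push-then-shutdown flow described for inject_stream can be illustrated with a small standard-library sketch. Everything here is hypothetical: `SketchStream`, `inject`, and the handler are stand-ins written for illustration and are not cosalette's Stream or harness code. The sketch assumes a queue-backed stream whose shutdown is signalled with a sentinel so that the handler's `async for` loop ends.

```python
import asyncio

_DONE = object()  # hypothetical sentinel marking end of stream


class SketchStream:
    """Illustrative queue-backed async iterator (not cosalette's Stream)."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()

    def push(self, item) -> None:
        self._queue.put_nowait(item)

    def shutdown(self) -> None:
        # Enqueue the sentinel after the items so iteration stops cleanly.
        self._queue.put_nowait(_DONE)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def inject(handler, *items, shutdown: bool = True) -> None:
    """Push items, optionally signal shutdown, then run the handler directly."""
    stream = SketchStream()
    for item in items:
        stream.push(item)
    if shutdown:
        stream.shutdown()
    await handler(stream)


async def demo() -> list:
    seen = []

    async def handler(stream) -> None:
        async for reading in stream:
            seen.append(reading * 2)

    await inject(handler, 1, 2, 3)
    return seen


seen = asyncio.run(demo())
```

With `shutdown=False` in this sketch the handler would wait forever for more items, which suggests why the documented default is True.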
override_state
Override a @app.state factory with a pre-built test double.
Bypasses the factory entirely; instance is injected directly
into the DI container at bootstrap. Call before :meth:run.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `state_type` | `type` | The type returned by the factory (the DI key). | required |
| `instance` | `Any` | The test double to inject. | required |

Raises:
| Type | Description |
|---|---|
| `TypeError` | If instance is not an instance of state_type. |

Source code in packages/src/cosalette/testing/_harness.py
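
As a rough illustration of the override behaviour described above (factory bypassed, type checked on registration), here is a minimal sketch. `SketchContainer`, `resolve`, and `Counter` are hypothetical names used only for this example; the real DI container is not shown in this reference and may work differently.

```python
from typing import Any, Callable


class SketchContainer:
    """Hypothetical DI container stand-in (not cosalette's implementation)."""

    def __init__(self) -> None:
        self._overrides: dict[type, Any] = {}

    def override_state(self, state_type: type, instance: Any) -> None:
        # Reject doubles that do not match the DI key they are registered under.
        if not isinstance(instance, state_type):
            raise TypeError(
                f"{instance!r} is not an instance of {state_type.__name__}"
            )
        self._overrides[state_type] = instance

    def resolve(self, state_type: type, factory: Callable[[], Any]) -> Any:
        # An override wins; the factory is only called when none is registered.
        if state_type in self._overrides:
            return self._overrides[state_type]
        return factory()


class Counter:
    def __init__(self, value: int = 0) -> None:
        self.value = value


container = SketchContainer()
double = Counter(value=41)
container.override_state(Counter, double)
resolved = container.resolve(Counter, Counter)
```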
tick_periodic
async
Invoke one cycle of the named periodic handler (bypasses interval).
Directly calls the handler's function with injected arguments — skips the asyncio sleep so you can test the handler logic without waiting for the interval.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | The periodic task name as registered with | required |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If no periodic task with name exists. |

Source code in packages/src/cosalette/testing/_harness.py
published
Return a snapshot of all MQTT messages published so far.
Returns:
| Type | Description |
|---|---|
| `list[tuple[str, str, bool, int]]` | Snapshot list of … is a copy; mutating the returned list does not affect the … :class: |

Source code in packages/src/cosalette/testing/_harness.py
messages_for
Return all messages published to topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | MQTT topic filter (exact match only). | required |

Returns:
| Type | Description |
|---|---|
| `list[tuple[str, bool, int]]` | List of |

Source code in packages/src/cosalette/testing/_harness.py
last_published
Return the most recent MQTT publish, or None if no publishes.
Returns:
| Type | Description |
|---|---|
| `tuple[str, str, bool, int] \| None` | |

Source code in packages/src/cosalette/testing/_harness.py
assert_published
Assert that topic has published messages matching criteria.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | MQTT topic to check (exact match). | required |
| `contains` | `str \| None` | Optional substring that must appear in at least one payload for topic. | `None` |
| `count` | `int \| None` | Optional exact number of messages that must have been published to topic. | `None` |

Raises:
| Type | Description |
|---|---|
| `AssertionError` | If no messages for topic, or if contains is not found in any payload, or if message count doesn't match count. |

Source code in packages/src/cosalette/testing/_harness.py
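
The three checks listed above (any message on the topic, substring in at least one payload, exact count) can be expressed over a plain list of recorded publishes. The helper below is a hypothetical sketch, not the harness method itself, and it assumes each record is a `(topic, payload, retain, qos)` tuple, which this page implies but does not state outright.

```python
from typing import Optional


def assert_published_sketch(
    published: list,
    topic: str,
    *,
    contains: Optional[str] = None,
    count: Optional[int] = None,
) -> None:
    """Illustrative re-implementation of the documented assertion rules."""
    # Exact topic match only; no MQTT wildcard handling.
    payloads = [payload for t, payload, _retain, _qos in published if t == topic]
    if not payloads:
        raise AssertionError(f"no messages published to {topic!r}")
    if contains is not None and not any(contains in p for p in payloads):
        raise AssertionError(f"{contains!r} not found in any payload for {topic!r}")
    if count is not None and len(payloads) != count:
        raise AssertionError(
            f"expected {count} messages for {topic!r}, got {len(payloads)}"
        )


# Hypothetical recorded publishes used for the demonstration.
log = [
    ("app/sensor/state", '{"temp": 21}', False, 1),
    ("app/sensor/state", '{"temp": 22}', False, 1),
    ("app/valve/state", "open", True, 1),
]
assert_published_sketch(log, "app/sensor/state", contains="22", count=2)
```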
inject_command
async
Simulate an inbound MQTT command to device.
Delivers a message to {topic_prefix}/{device}/set (or
{topic_prefix}/set for root commands) via the
:class:MockMqttClient, triggering registered command callbacks.
This is an MQTT-delivery helper — the app must be running and callbacks must be registered for the command to be processed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `device` | `str \| None` | Device name as registered with | required |
| `payload` | `str` | MQTT payload string. | required |
| `topic` | `str \| None` | Optional explicit topic override. When | `None` |

See Also
:meth:call_command for direct command handler invocation without
requiring the app to be running.
Source code in packages/src/cosalette/testing/_harness.py
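
The two topic shapes mentioned above can be sketched as a small helper. `command_topic` is a hypothetical function written for illustration, and it assumes that passing `None` as the device selects the root command topic; the reference does not spell out how root commands are selected.

```python
from typing import Optional


def command_topic(topic_prefix: str, device: Optional[str]) -> str:
    """Build the command topic for a device, or the root topic when device is None."""
    if device is None:
        # Assumed meaning of None: a root-level command.
        return f"{topic_prefix}/set"
    return f"{topic_prefix}/{device}/set"


device_topic = command_topic("testapp", "valve")
root_topic = command_topic("testapp", None)
```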
call_command
async
Directly invoke a registered @app.command handler.
Resolves the handler by name, injects dependencies, calls it with
the deserialized payload, and publishes any returned state to
harness.mqtt — mirroring production execution without requiring
the app to be running.
Supports production request binding including typed Pydantic payloads
(Annotated[Model, Payload()]), payload/topic/message
parameters, DeviceContext, and simple DI providers available to
CommandRunner. Does NOT run adapter lifecycle, state factory
lifecycle, or reactors.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Command handler name as registered with | required |
| `payload` | `str \| dict[str, object]` | MQTT payload: either a JSON string or a dict that will be serialized to JSON. | required |
| `topic` | `str \| None` | Optional MQTT topic string. When | `None` |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If no command handler with name is registered. |
| `Exception` | Any exception raised by the handler is propagated. |

Note
For tests requiring adapter lifecycle, state factory lifecycle,
or reactor dispatch, use :meth:inject_command with the app
running. init= command callbacks are NOT run; handlers that
cache init results will receive None for those
dependencies. Reactor dispatch is disabled; if the handler
triggers side-effects via reactors, use :meth:inject_command
with the app running instead.
See Also
:meth:inject_command for MQTT-delivery simulation requiring the
app to be running.
Source code in packages/src/cosalette/testing/_harness.py
advance_time
async
Advance test clock by seconds, yielding to event loop.
Convenience wrapper over await harness.clock.sleep(seconds).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `seconds` | `float` | Time delta to advance. | required |

Source code in packages/src/cosalette/testing/_harness.py
run_stream
async
Run a stream handler's full lifecycle (open → scan → close).
Constructs a minimal :class:_StreamRegistration from func, then
calls :func:run_stream with the provided adapters. Useful for
testing stream handler behaviour without wiring a full app.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Any` | The async-generator stream handler to run. | required |
| `adapters` | `dict[type, Any]` | Resolved adapter map keyed by port type (e.g. | required |
| `shutdown` | `Event \| None` | Optional :class: | `None` |

Source code in packages/src/cosalette/testing/_harness.py
Clock
cosalette.testing.FakeClock
dataclass
Test double for ClockPort.
Attributes:
| Name | Type | Description |
|---|---|---|
| `_time` | `float` | The current "now" value returned by |

Example::

    clock = FakeClock(42.0)
    assert clock.now() == 42.0
    clock._time = 99.0
    assert clock.now() == 99.0

now
sleep
async
Advance virtual time by seconds with no real delay.
Allows tests to exercise sleep-dependent code paths
without wall-clock waiting. The asyncio.sleep(0)
yields to the event loop so concurrent tasks interleave
correctly.
Source code in packages/src/cosalette/testing/_clock.py
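
The virtual-time idea described above can be sketched with the standard library alone. This is an illustrative stand-in, not cosalette's FakeClock: `SketchClock`, `worker`, and `demo` are hypothetical names. The sketch assumes each `sleep` call simply adds its argument to the stored time and then yields once with `asyncio.sleep(0)`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchClock:
    """Hypothetical fake clock (not cosalette's FakeClock)."""

    _time: float = 0.0

    def now(self) -> float:
        return self._time

    async def sleep(self, seconds: float) -> None:
        # Advance virtual time immediately, then yield once so that
        # other tasks on the event loop get a chance to run.
        self._time += seconds
        await asyncio.sleep(0)


async def demo() -> tuple:
    clock = SketchClock()
    order = []

    async def worker(label: str, delay: float) -> None:
        order.append(f"{label}:start")
        await clock.sleep(delay)
        order.append(f"{label}:done")

    await asyncio.gather(worker("a", 30.0), worker("b", 60.0))
    return clock.now(), order


elapsed, order = asyncio.run(demo())
```

In this sketch the two sleeps accumulate (90 virtual seconds in total) because each call advances the shared clock; no wall-clock time passes, and both workers start before either finishes.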
MQTT Test Doubles
cosalette.testing.MockMqttClient
dataclass
MockMqttClient(
published: list[tuple[str, str, bool, int]] = list(),
subscriptions: list[str] = list(),
raise_on_publish: Exception | None = None,
)
In-memory test double that records MQTT interactions.
Records publishes and subscriptions for assertion. Supports
callback registration and simulated message delivery via
deliver().
publish
async
publish(
topic: str,
payload: str | dict[str, Any],
*,
retain: bool = False,
qos: int = 1,
) -> None
Record a publish call, or raise if raise_on_publish is set.
Source code in packages/src/cosalette/_mqtt/__init__.py
subscribe
async
on_message
on_message(callback: MessageCallback) -> None
deliver
async
reset
Clear all recorded data, callbacks, and failure injection.
get_messages_for
Return (payload, retain, qos) tuples for topic.
Source code in packages/src/cosalette/_mqtt/__init__.py
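
To make the recording behaviour concrete, here is a minimal stand-in that mirrors the documented `publish` signature and the `(payload, retain, qos)` shape returned by `get_messages_for`. `SketchMqtt` is hypothetical and is not the real MockMqttClient. Two details are assumptions: that records are stored as `(topic, payload, retain, qos)` tuples, and that dict payloads are stored as JSON text.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Any, Optional, Union


@dataclass
class SketchMqtt:
    """Hypothetical recording double (not cosalette's MockMqttClient)."""

    published: list = field(default_factory=list)
    raise_on_publish: Optional[Exception] = None

    async def publish(
        self,
        topic: str,
        payload: Union[str, dict],
        *,
        retain: bool = False,
        qos: int = 1,
    ) -> None:
        # Failure injection: raise instead of recording when configured.
        if self.raise_on_publish is not None:
            raise self.raise_on_publish
        # Assumption: dict payloads are serialised to JSON before recording.
        text = payload if isinstance(payload, str) else json.dumps(payload)
        self.published.append((topic, text, retain, qos))

    def get_messages_for(self, topic: str) -> list:
        return [(p, r, q) for t, p, r, q in self.published if t == topic]


async def demo() -> SketchMqtt:
    mqtt = SketchMqtt()
    await mqtt.publish("app/sensor/state", {"temp": 21})
    await mqtt.publish("app/sensor/state", "ok", retain=True, qos=0)
    await mqtt.publish("app/other/state", "x")
    return mqtt


mqtt = asyncio.run(demo())
messages = mqtt.get_messages_for("app/sensor/state")
```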
cosalette.testing.NullMqttClient
dataclass
¶
Silent no-op MQTT adapter.
Every method is a no-op that logs at DEBUG level. Useful as a default when MQTT is not configured.
Settings Factory
cosalette.testing.make_settings
make_settings(**overrides: Any) -> Settings
Create a Settings instance with sensible test defaults.
Instantiates an :class:_IsolatedSettings subclass whose only
configuration source is init_settings. This means the
factory ignores os.environ, .env files, and secret
directories — tests see only model defaults plus any explicit
overrides.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `**overrides` | `Any` | Keyword arguments forwarded to the | `{}` |

Returns:
| Type | Description |
|---|---|
| `Settings` | A fully initialised :class: |

Example::

    settings = make_settings()
    assert settings.mqtt.host == "localhost"

    from cosalette._settings import MqttSettings
    custom = make_settings(mqtt=MqttSettings(host="broker.test"))
    assert custom.mqtt.host == "broker.test"

Source code in packages/src/cosalette/testing/_settings.py
Pytest Fixtures
The cosalette.testing package registers a pytest plugin via the pytest11 entry point. The fixtures below are available automatically when cosalette is installed:
| Fixture | Type | Description |
|---|---|---|
| `mock_mqtt` | `MockMqttClient` | In-memory MQTT client for capturing published messages |
| `fake_clock` | `FakeClock` | Deterministic clock starting at 0.0 |
| `device_context` | `DeviceContext` | Pre-wired context with mock_mqtt and fake_clock |

All fixtures are function-scoped. Request them by name as test function arguments; no explicit import is needed.
MemoryStore
cosalette.MemoryStore
In-memory store backed by a plain dict.
Both load and save deep-copy data so that callers cannot
mutate internal state by accident. Designed for tests — mirrors
the FakeStorage pattern from gas2mqtt.
Parameters
initial: Optional seed data. The mapping is deep-copied on construction.
Source code in packages/src/cosalette/_persistence/_stores.py
load
Return a deep copy of the stored dict, or None.
MemoryStore is the recommended test double for persistence. It stores
data in an in-memory dictionary, avoiding filesystem access in tests.
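
The deep-copy behaviour is what keeps test data isolated, and a small sketch shows why it matters. `SketchStore` is a hypothetical stand-in, not cosalette's MemoryStore. Its keyless `load()` and `save(data)` signatures are assumptions made for brevity, since the real signatures are not listed in this reference.

```python
import copy
from typing import Any, Optional


class SketchStore:
    """Hypothetical in-memory store that copies on every boundary."""

    def __init__(self, initial: Optional[dict] = None) -> None:
        # Copy the seed so later changes by the caller do not leak in.
        self._data = copy.deepcopy(initial)

    def load(self) -> Optional[dict]:
        # Hand out a copy so callers cannot mutate internal state.
        return copy.deepcopy(self._data)

    def save(self, data: dict) -> None:
        self._data = copy.deepcopy(data)


seed = {"counter": {"value": 1}}
store = SketchStore(seed)

seed["counter"]["value"] = 99      # mutate the seed after construction
loaded = store.load()
loaded["counter"]["value"] = 5     # mutate a loaded copy

current = store.load()
empty = SketchStore().load()
```

Neither mutation reaches the stored data in this sketch, so a test can compare `store.load()` against expected values without worrying about aliasing.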