Testing Utilities

Reference for the cosalette.testing package — test doubles, factories, and pytest fixtures for testing cosalette applications.

Test Harness

cosalette.testing.AppHarness dataclass

AppHarness(
    app: App,
    mqtt: MockMqttClient,
    clock: FakeClock,
    settings: Settings,
    shutdown_event: Event,
    run_periodic: bool = False,
)

Test harness wrapping App with pre-configured test doubles.

Provides unified access to App, MockMqttClient, FakeClock, Settings, and a shutdown Event — eliminating boilerplate in integration-style tests.

Usage::

harness = AppHarness.create()

@harness.app.device("sensor")
async def sensor(ctx):
    ...

# Run with auto-shutdown after device_called event:
await harness.run()
See Also

ADR-007 for testing strategy decisions.

create classmethod

create(
    *,
    name: str = "testapp",
    version: str = "1.0.0",
    dry_run: bool = False,
    lifespan: LifespanFunc | None = None,
    store: Store | None = None,
    run_periodic: bool = False,
    **settings_overrides: Any,
) -> Self

Create a harness with fresh test doubles.

Parameters:

Name Type Description Default
name str

App name.

'testapp'
version str

App version.

'1.0.0'
dry_run bool

Forwarded to App. When True, dry-run adapter variants are used.

False
lifespan LifespanFunc | None

Optional lifespan context manager forwarded to :class:App.

None
store Store | None

Optional :class:Store backend for device persistence.

None
run_periodic bool

When True, periodic tasks will be started; when False, they will be suppressed for testing.

False
**settings_overrides Any

Forwarded to :func:make_settings.

{}

Returns:

Type Description
Self

A fully wired :class:AppHarness ready for test use.

Source code in packages/src/cosalette/testing/_harness.py
@classmethod
def create(
    cls,
    *,
    name: str = "testapp",
    version: str = "1.0.0",
    dry_run: bool = False,
    lifespan: LifespanFunc | None = None,
    store: Store | None = None,
    run_periodic: bool = False,
    **settings_overrides: Any,
) -> Self:
    """Create a harness with fresh test doubles.

    Args:
        name: App name.
        version: App version.
        dry_run: When True, forward to App for dry-run adapter variants.
        lifespan: Optional lifespan context manager forwarded to
            :class:`App`.
        store: Optional :class:`Store` backend for device persistence.
        run_periodic: When True, periodic tasks will be started; when False,
            they will be suppressed for testing.
        **settings_overrides: Forwarded to :func:`make_settings`.

    Returns:
        A fully wired :class:`AppHarness` ready for test use.
    """
    return cls(
        app=App(
            name=name,
            version=version,
            dry_run=dry_run,
            lifespan=lifespan,
            store=store,
        ),
        mqtt=MockMqttClient(),
        clock=FakeClock(),
        settings=make_settings(**settings_overrides),
        shutdown_event=asyncio.Event(),
        run_periodic=run_periodic,
    )

run async

run() -> None

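Run _run_async with the harness's test doubles.

Registered periodic tasks are suppressed unless run_periodic is True. Stream handlers are always suppressed; use inject_stream to exercise them. Both registration lists are restored when the run finishes, even if it raises.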

Source code in packages/src/cosalette/testing/_harness.py
async def run(self) -> None:
    """Run ``_run_async`` with the harness's test doubles."""
    periodic_backup = list(self.app._periodic)
    streams_backup = list(self.app._streams)
    if not self.run_periodic:
        self.app._periodic = []
    # Always suppress streams in harness.run() — use inject_stream() instead
    self.app._streams = []
    try:
        await self.app._run_async(
            settings=self.settings,
            shutdown_event=self.shutdown_event,
            mqtt=self.mqtt,
            clock=self.clock,
        )
    finally:
        self.app._periodic = periodic_backup
        self.app._streams = streams_backup

trigger_shutdown

trigger_shutdown() -> None

Signal the shutdown event.

Source code in packages/src/cosalette/testing/_harness.py
def trigger_shutdown(self) -> None:
    """Signal the shutdown event."""
    self.shutdown_event.set()

inject_stream async

inject_stream(
    name: str,
    *items: Any,
    shutdown: bool = True,
    ctx: DeviceContext | None = None,
    store: Store | None = None,
    providers: dict[type, Any] | None = None,
    adapters: dict[type, object] | None = None,
) -> None

Push items into a named stream handler for testing.

Finds the registered @app.stream handler by name, creates a Stream, pushes the provided items, optionally signals shutdown, and runs the handler directly (bypassing adapter lifecycle).

Parameters:

Name Type Description Default
name str

Stream handler name as registered with @app.stream.

required
*items Any

Items to push into the stream.

()
shutdown bool

When True (default), call stream.shutdown() after all items are pushed so the handler's async for loop terminates.

True
ctx DeviceContext | None

Optional :class:DeviceContext override. When None (default), a context is constructed from the harness doubles (mqtt, settings, clock, shutdown_event) so handlers can call ctx.publish_state etc. and assertions use harness.mqtt.published.

None
store Store | None

Optional :class:Store backend for persistence. When None, falls back to app._store if configured. The harness creates a :class:DeviceStore keyed by name, loads it before the handler runs, and saves it afterward.

None
providers dict[type, Any] | None

Extra DI providers merged into the provider map with the highest priority (override everything else).

None
adapters dict[type, object] | None

Concrete adapter instances injected by their concrete type into both the DI provider map and the :class:DeviceContext adapters dict. Allows stream handlers to access adapters for non-lifecycle operations without running the hardware lifecycle (open/start_scan).

None
Note

When ctx is supplied it replaces the entire :class:DeviceContext — harness doubles (mqtt, clock, shutdown_event) are not merged in. adapters are added to the DI providers map but not injected into the explicitly supplied ctx. If you need both a custom context and adapter injection, build the context with the adapters you need and pass both ctx and adapters.

Raises:

Type Description
ValueError

If no stream handler with name is registered.

TypeError

If a required dependency (e.g. :class:DeviceStore) cannot be resolved from the provider map.

Source code in packages/src/cosalette/testing/_harness.py
async def inject_stream(
    self,
    name: str,
    *items: Any,
    shutdown: bool = True,
    ctx: DeviceContext | None = None,
    store: Store | None = None,
    providers: dict[type, Any] | None = None,
    adapters: dict[type, object] | None = None,
) -> None:
    """Push items into a named stream handler for testing.

    Finds the registered @app.stream handler by name, creates a Stream,
    pushes the provided items, optionally signals shutdown, and runs the
    handler directly (bypassing adapter lifecycle).

    Args:
        name: Stream handler name as registered with @app.stream.
        *items: Items to push into the stream.
        shutdown: When True (default), call stream.shutdown() after all
            items are pushed so the handler's async for loop terminates.
        ctx: Optional :class:`DeviceContext` override.  When ``None``
            (default), a context is constructed from the harness doubles
            (mqtt, settings, clock, shutdown_event) so handlers can call
            ``ctx.publish_state`` etc. and assertions use
            ``harness.mqtt.published``.
        store: Optional :class:`Store` backend for persistence.  When
            ``None``, falls back to ``app._store`` if configured.  The
            harness creates a :class:`DeviceStore` keyed by *name*,
            loads it before the handler runs, and saves it afterward.
        providers: Extra DI providers merged into the provider map with
            the highest priority (override everything else).
        adapters: Concrete adapter instances injected by their concrete
            type into both the DI provider map and the
            :class:`DeviceContext` adapters dict.  Allows stream handlers
            to access adapters for non-lifecycle operations without
            running the hardware lifecycle (open/start_scan).

    Note:
        When *ctx* is supplied it replaces the entire :class:`DeviceContext`
        — harness doubles (mqtt, clock, shutdown_event) are not merged in.
        *adapters* are added to the DI providers map but **not** injected
        into the explicitly supplied *ctx*.  If you need both a custom
        context and adapter injection, build the context with the adapters
        you need and pass both *ctx* and *adapters*.

    Raises:
        ValueError: If no stream handler with *name* is registered.
        TypeError: If a required dependency (e.g. :class:`DeviceStore`)
            cannot be resolved from the provider map.
    """
    try:
        reg = next(r for r in self.app._streams if r.name == name)
    except StopIteration:
        msg = f"No stream handler named '{name}' found"
        raise ValueError(msg) from None

    stream: Stream[Any] = Stream()
    for item in items:
        stream.put(item)

    if shutdown:
        asyncio.create_task(
            _stream_auto_shutdown(stream), name=f"inject-shutdown:{name}"
        )

    resolved_adapters: dict[type, object] = dict(adapters) if adapters else {}
    ctx = self._make_stream_ctx(name, reg, resolved_adapters, ctx)
    device_store = await self._make_device_store(name, store, providers)
    base_providers = self._build_inject_providers(
        name, ctx, resolved_adapters, device_store, providers
    )

    try:
        await _run_stream_handler(reg, stream, base_providers, self.app._reactors)
    finally:
        # Retrieve the DeviceStore from final providers — _make_device_store
        # returns None when a pre-supplied store was passed via providers,
        # but in that case base_providers.get(DeviceStore) still returns it.
        await async_save_store_on_shutdown(base_providers.get(DeviceStore), name)
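The role of the shutdown flag can be illustrated without cosalette installed. In the sketch below, `SketchStream` is a hypothetical stand-in built on `asyncio.Queue` with a sentinel; it is an assumption about how a put/shutdown/`async for` stream could behave, not the library's `Stream` implementation. It shows why shutdown has to be signalled for the handler's `async for` loop to end.

```python
import asyncio

_DONE = object()  # sentinel marking end of stream (assumption of this sketch)


class SketchStream:
    """Hypothetical stand-in for a put/shutdown/async-for stream."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def put(self, item) -> None:
        self._queue.put_nowait(item)

    def shutdown(self) -> None:
        # Queue the sentinel so iteration stops after pending items.
        self._queue.put_nowait(_DONE)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _DONE:
            raise StopAsyncIteration
        return item


async def handler(stream: SketchStream, seen: list) -> None:
    """Hypothetical stream handler: consume items until shutdown."""
    async for item in stream:
        seen.append(item)


async def demo() -> list:
    stream = SketchStream()
    for item in (1, 2, 3):
        stream.put(item)
    stream.shutdown()  # without this, handler() would wait forever
    seen: list = []
    await handler(stream, seen)
    return seen


seen = asyncio.run(demo())
```

With `shutdown=False`, the equivalent of the `stream.shutdown()` call is skipped, so the test itself must arrange for the handler to stop.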

override_state

override_state(state_type: type, instance: Any) -> None

Override a @app.state factory with a pre-built test double.

Bypasses the factory entirely; instance is injected directly into the DI container at bootstrap. Call before :meth:run.

Parameters:

Name Type Description Default
state_type type

The type returned by the factory (the DI key).

required
instance Any

The test double to inject.

required

Raises:

Type Description
TypeError

If instance is not an instance of state_type.

Source code in packages/src/cosalette/testing/_harness.py
def override_state(self, state_type: type, instance: Any) -> None:
    """Override a @app.state factory with a pre-built test double.

    Bypasses the factory entirely; *instance* is injected directly
    into the DI container at bootstrap.  Call before :meth:`run`.

    Args:
        state_type: The type returned by the factory (the DI key).
        instance: The test double to inject.

    Raises:
        TypeError: If *instance* is not an instance of *state_type*.
    """
    if not isinstance(instance, state_type):
        raise TypeError(
            f"override_state: expected an instance of {state_type.__name__!r}, "
            f"got {type(instance).__name__!r}"
        )
    self.app._state_overrides[state_type] = instance
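Because of the `isinstance` check, a test double has to be an instance of the state type (typically a subclass). The sketch below reproduces only that check against a plain dict; `Counter`, `FakeCounter`, and `Unrelated` are hypothetical classes, and the module-level `overrides` dict stands in for the app's override map.

```python
from typing import Any


class Counter:
    """Hypothetical state type produced by an @app.state factory."""

    def __init__(self) -> None:
        self.value = 0


class FakeCounter(Counter):
    """Test double; subclassing Counter satisfies the isinstance check."""


class Unrelated:
    """Duck-typed look-alike that does not subclass Counter."""

    value = 0


overrides: dict[type, Any] = {}  # stand-in for the app's override map


def override_state(state_type: type, instance: Any) -> None:
    # Same guard as the source above: reject instances of the wrong type.
    if not isinstance(instance, state_type):
        raise TypeError(
            f"expected an instance of {state_type.__name__!r}, "
            f"got {type(instance).__name__!r}"
        )
    overrides[state_type] = instance


override_state(Counter, FakeCounter())

try:
    override_state(Counter, Unrelated())
    rejected = False
except TypeError:
    rejected = True
```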

tick_periodic async

tick_periodic(name: str) -> None

Invoke one cycle of the named periodic handler (bypasses interval).

Directly calls the handler's function with injected arguments — skips the asyncio sleep so you can test the handler logic without waiting for the interval.

Parameters:

Name Type Description Default
name str

The periodic task name as registered with @app.periodic.

required

Raises:

Type Description
ValueError

If no periodic task with name exists.

Source code in packages/src/cosalette/testing/_harness.py
async def tick_periodic(self, name: str) -> None:
    """Invoke one cycle of the named periodic handler (bypasses interval).

    Directly calls the handler's function with injected arguments —
    skips the asyncio sleep so you can test the handler logic
    without waiting for the interval.

    Args:
        name: The periodic task name as registered with ``@app.periodic``.

    Raises:
        ValueError: if no periodic task with *name* exists.
    """
    from cosalette._injection import resolve_request_kwargs

    try:
        reg = next(r for r in self.app._periodic if r.name == name)
    except StopIteration:
        msg = f"No periodic task named '{name}' found"
        raise ValueError(msg) from None

    # Build a provider map matching production _build_periodic_providers:
    # settings under every Settings base class, clock, logger, state overrides
    providers: dict[type, Any] = {}
    settings = self.settings
    for cls in type(settings).__mro__:
        if isinstance(cls, type) and issubclass(cls, Settings):
            providers[cls] = settings
    providers[ClockPort] = self.clock
    providers[logging.Logger] = logging.getLogger(f"cosalette.periodic.{name}")
    providers.update(self.app._state_overrides)
    kwargs = resolve_request_kwargs(reg.injection_plan, providers)
    await reg.func(**kwargs)
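The MRO loop above registers the same settings object under every Settings class in its inheritance chain, so a handler can ask for either the base type or the app-specific subclass. A minimal sketch of that step follows; the `Settings` class here is a local stand-in and `MyAppSettings` is hypothetical.

```python
from typing import Any


class Settings:
    """Local stand-in for cosalette's Settings base class."""


class MyAppSettings(Settings):
    """Hypothetical app-specific settings subclass."""


def settings_providers(settings: Settings) -> dict[type, Any]:
    """Key the instance under each Settings class in its MRO."""
    providers: dict[type, Any] = {}
    for cls in type(settings).__mro__:
        # `object` is in the MRO too, but it is not a Settings subclass.
        if isinstance(cls, type) and issubclass(cls, Settings):
            providers[cls] = settings
    return providers


settings = MyAppSettings()
providers = settings_providers(settings)
```

A handler annotated with `Settings` and one annotated with `MyAppSettings` would both resolve to the same instance under this scheme.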

published

published() -> list[tuple[str, str, bool, int]]

Return a snapshot of all MQTT messages published so far.

Returns:

Type Description
list[tuple[str, str, bool, int]]

Snapshot list of (topic, payload, retain, qos) tuples. This is a copy; mutating the returned list does not affect the :class:MockMqttClient internal state.

Source code in packages/src/cosalette/testing/_harness.py
def published(self) -> list[tuple[str, str, bool, int]]:
    """Return a snapshot of all MQTT messages published so far.

    Returns:
        Snapshot list of ``(topic, payload, retain, qos)`` tuples. This
        is a copy — mutating the returned list does not affect the
        :class:`MockMqttClient` internal state.
    """
    return list(self.mqtt.published)

messages_for

messages_for(topic: str) -> list[tuple[str, bool, int]]

Return all messages published to topic.

Parameters:

Name Type Description Default
topic str

MQTT topic (exact match only; wildcards are not expanded).

required

Returns:

Type Description
list[tuple[str, bool, int]]

List of (payload, retain, qos) tuples for the given topic.

Source code in packages/src/cosalette/testing/_harness.py
def messages_for(self, topic: str) -> list[tuple[str, bool, int]]:
    """Return all messages published to *topic*.

    Args:
        topic: MQTT topic filter (exact match only).

    Returns:
        List of ``(payload, retain, qos)`` tuples for the given *topic*.
    """
    return self.mqtt.get_messages_for(topic)

last_published

last_published() -> tuple[str, str, bool, int] | None

Return the most recent MQTT publish, or None if no publishes.

Returns:

Type Description
tuple[str, str, bool, int] | None

(topic, payload, retain, qos) tuple or None.

Source code in packages/src/cosalette/testing/_harness.py
def last_published(self) -> tuple[str, str, bool, int] | None:
    """Return the most recent MQTT publish, or ``None`` if no publishes.

    Returns:
        ``(topic, payload, retain, qos)`` tuple or ``None``.
    """
    return self.mqtt.published[-1] if self.mqtt.published else None

assert_published

assert_published(
    topic: str,
    *,
    contains: str | None = None,
    count: int | None = None,
) -> None

Assert that topic has published messages matching criteria.

Parameters:

Name Type Description Default
topic str

MQTT topic to check (exact match).

required
contains str | None

Optional substring that must appear in at least one payload for topic.

None
count int | None

Optional exact number of messages that must have been published to topic.

None

Raises:

Type Description
AssertionError

If no messages for topic, or if contains is not found in any payload, or if message count doesn't match count.

Source code in packages/src/cosalette/testing/_harness.py
def assert_published(
    self,
    topic: str,
    *,
    contains: str | None = None,
    count: int | None = None,
) -> None:
    """Assert that *topic* has published messages matching criteria.

    Args:
        topic: MQTT topic to check (exact match).
        contains: Optional substring that must appear in at least one
            payload for *topic*.
        count: Optional exact number of messages that must have been
            published to *topic*.

    Raises:
        AssertionError: If no messages for *topic*, or if *contains* is
            not found in any payload, or if message count doesn't match
            *count*.
    """
    messages = self.messages_for(topic)
    if not messages:
        raise AssertionError(f"No messages published to {topic!r}")
    if count is not None and len(messages) != count:
        raise AssertionError(
            f"Expected {count} message(s) to {topic!r}, got {len(messages)}"
        )
    if contains is not None and not any(
        contains in payload for payload, _, _ in messages
    ):
        raise AssertionError(f"No message to {topic!r} contains {contains!r}")
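The order of the three checks matters: the empty check runs first, so `count=0` can never pass. The sketch below applies the same checks to a plain list of `(payload, retain, qos)` tuples so it runs without cosalette; `check_published`, `passes`, and the topic name are hypothetical helpers for illustration.

```python
def check_published(messages, topic, *, contains=None, count=None):
    """Apply the same three checks, in the same order, to a plain list."""
    if not messages:
        raise AssertionError(f"No messages published to {topic!r}")
    if count is not None and len(messages) != count:
        raise AssertionError(
            f"Expected {count} message(s) to {topic!r}, got {len(messages)}"
        )
    if contains is not None and not any(
        contains in payload for payload, _, _ in messages
    ):
        raise AssertionError(f"No message to {topic!r} contains {contains!r}")


def passes(messages, topic, **criteria) -> bool:
    """Return True when the checks pass, False when they raise."""
    try:
        check_published(messages, topic, **criteria)
    except AssertionError:
        return False
    return True


TOPIC = "testapp/sensor/state"  # hypothetical topic
messages = [('{"temp": 21}', False, 1), ('{"temp": 22}', False, 1)]

any_message = passes(messages, TOPIC)
exact_count = passes(messages, TOPIC, count=2)
wrong_count = passes(messages, TOPIC, count=1)
substring = passes(messages, TOPIC, contains='"temp": 22')
zero_on_empty = passes([], TOPIC, count=0)  # empty check fires first
```

To assert that nothing was published to a topic, compare `harness.messages_for(topic)` with an empty list instead.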

inject_command async

inject_command(
    device: str | None,
    payload: str,
    *,
    topic: str | None = None,
) -> None

Simulate an inbound MQTT command to device.

Delivers a message to {topic_prefix}/{device}/set (or {topic_prefix}/set for root commands) via the :class:MockMqttClient, triggering registered command callbacks.

This is an MQTT-delivery helper — the app must be running and callbacks must be registered for the command to be processed.

Parameters:

Name Type Description Default
device str | None

Device name as registered with @app.command, or None for root commands (matching @app.command(None) registration semantics). None constructs the topic as {prefix}/set; any non-empty string constructs {prefix}/{device}/set.

required
payload str

MQTT payload string.

required
topic str | None

Optional explicit topic override. When None (default), the topic is constructed from device.

None
See Also

:meth:call_command for direct command handler invocation without requiring the app to be running.

Source code in packages/src/cosalette/testing/_harness.py
async def inject_command(
    self, device: str | None, payload: str, *, topic: str | None = None
) -> None:
    """Simulate an inbound MQTT command to *device*.

    Delivers a message to ``{topic_prefix}/{device}/set`` (or
    ``{topic_prefix}/set`` for root commands) via the
    :class:`MockMqttClient`, triggering registered command callbacks.

    This is an MQTT-delivery helper — the app must be running and
    callbacks must be registered for the command to be processed.

    Args:
        device: Device name as registered with ``@app.command``, or
            ``None`` for root commands (matching ``@app.command(None)``
            registration semantics). ``None`` constructs the topic as
            ``{prefix}/set``; any non-empty string constructs
            ``{prefix}/{device}/set``.
        payload: MQTT payload string.
        topic: Optional explicit topic override. When ``None`` (default),
            the topic is constructed from *device*.

    See Also:
        :meth:`call_command` for direct command handler invocation without
        requiring the app to be running.
    """
    if topic is None:
        topic_prefix = self.settings.mqtt.topic_prefix or self.app._name
        topic = f"{topic_prefix}/{device}/set" if device else f"{topic_prefix}/set"
    await self.mqtt.deliver(topic, payload)
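The default topic is chosen by the truthiness of `device`, so an empty string behaves like `None`. A small sketch of that branch follows; `command_topic` is a hypothetical helper that copies the expression from the source above.

```python
from typing import Optional


def command_topic(topic_prefix: str, device: Optional[str]) -> str:
    """Same branching as the default-topic logic in inject_command."""
    return f"{topic_prefix}/{device}/set" if device else f"{topic_prefix}/set"


device_topic = command_topic("testapp", "lamp")
root_topic = command_topic("testapp", None)
empty_topic = command_topic("testapp", "")  # falsy, so treated as root
```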

call_command async

call_command(
    name: str,
    payload: str | dict[str, object],
    *,
    topic: str | None = None,
) -> None

Directly invoke a registered @app.command handler.

Resolves the handler by name, injects dependencies, calls it with the deserialized payload, and publishes any returned state to harness.mqtt — mirroring production execution without requiring the app to be running.

Supports production request binding including typed Pydantic payloads (Annotated[Model, Payload()]), payload/topic/message parameters, DeviceContext, and simple DI providers available to CommandRunner. Does NOT run adapter lifecycle, state factory lifecycle, or reactors.

Parameters:

Name Type Description Default
name str

Command handler name as registered with @app.command. Supports router-prefixed names like "router/sub". For root commands registered with @app.command(None), pass the function name.

required
payload str | dict[str, object]

MQTT payload — either a JSON string or a dict that will be serialized to JSON.

required
topic str | None

Optional MQTT topic string. When None (default), constructs {prefix}/{name}/set or {prefix}/set for root commands.

None

Raises:

Type Description
ValueError

If no command handler with name is registered.

Exception

Any exception raised by the handler is propagated.

Note

init= command callbacks are NOT run; handlers that cache init results will receive None for those dependencies. Reactor dispatch is disabled. For tests that need adapter lifecycle, state factory lifecycle, or reactor side-effects, use :meth:inject_command with the app running.

See Also

:meth:inject_command for MQTT-delivery simulation requiring the app to be running.

Source code in packages/src/cosalette/testing/_harness.py
async def call_command(
    self,
    name: str,
    payload: str | dict[str, object],
    *,
    topic: str | None = None,
) -> None:
    """Directly invoke a registered ``@app.command`` handler.

    Resolves the handler by *name*, injects dependencies, calls it with
    the deserialized *payload*, and publishes any returned state to
    ``harness.mqtt`` — mirroring production execution without requiring
    the app to be running.

    Supports production request binding including typed Pydantic payloads
    (``Annotated[Model, Payload()]``), ``payload``/``topic``/``message``
    parameters, ``DeviceContext``, and simple DI providers available to
    ``CommandRunner``. Does NOT run adapter lifecycle, state factory
    lifecycle, or reactors.

    Args:
        name: Command handler name as registered with ``@app.command``.
            Supports router-prefixed names like ``"router/sub"``. For
            root commands registered with ``@app.command(None)``, pass
            the function name.
        payload: MQTT payload — either a JSON string or a dict that will
            be serialized to JSON.
        topic: Optional MQTT topic string. When ``None`` (default),
            constructs ``{prefix}/{name}/set`` or ``{prefix}/set`` for
            root commands.

    Raises:
        ValueError: If no command handler with *name* is registered.
        Exception: Any exception raised by the handler is propagated.

    Note:
        For tests requiring adapter lifecycle, state factory lifecycle,
        or reactor dispatch, use :meth:`inject_command` with the app
        running. ``init=`` command callbacks are NOT run; handlers that
        cache ``init`` results will receive ``None`` for those
        dependencies. Reactor dispatch is disabled; if the handler
        triggers side-effects via reactors, use :meth:`inject_command`
        with the app running instead.

    See Also:
        :meth:`inject_command` for MQTT-delivery simulation requiring the
        app to be running.
    """
    topic_prefix = self.settings.mqtt.topic_prefix or self.app._name

    # Find the command registration
    try:
        reg = next(r for r in self.app._commands if r.name == name)
    except StopIteration:
        msg = f"No command handler named '{name}' found"
        raise ValueError(msg) from None

    # Construct topic if not provided
    if topic is None:
        if reg.is_root:
            topic = f"{topic_prefix}/set"
        else:
            topic = f"{topic_prefix}/{name}/set"

    # Serialize payload using the project's JSON backend (orjson) for
    # consistency with production encoding behaviour.
    payload_str = _json_dumps(payload) if isinstance(payload, dict) else payload

    # Build DeviceContext for command execution
    ctx = DeviceContext(
        name=name,
        settings=self.settings,
        mqtt=self.mqtt,
        topic_prefix=topic_prefix,
        shutdown_event=self.shutdown_event,
        adapters={},
        clock=self.clock,
        is_root=reg.is_root,
    )

    # Create CommandRunner and execute (reactors=None skips reactor dispatch)
    cmd_runner = CommandRunner(store=self.app._store)
    error_publisher = ErrorPublisher(
        mqtt=self.mqtt,
        topic_prefix=topic_prefix,
    )

    await cmd_runner.run_command(
        reg=reg,
        ctx=ctx,
        topic=topic,
        payload=payload_str,
        error_publisher=error_publisher,
        reactors=None,
    )

advance_time async

advance_time(seconds: float) -> None

Advance test clock by seconds, yielding to event loop.

Convenience wrapper over await harness.clock.sleep(seconds).

Parameters:

Name Type Description Default
seconds float

Time delta to advance.

required
Source code in packages/src/cosalette/testing/_harness.py
async def advance_time(self, seconds: float) -> None:
    """Advance test clock by *seconds*, yielding to event loop.

    Convenience wrapper over ``await harness.clock.sleep(seconds)``.

    Args:
        seconds: Time delta to advance.
    """
    await self.clock.sleep(seconds)

run_stream async

run_stream(
    func: Any,
    adapters: dict[type, Any],
    *,
    shutdown: Event | None = None,
) -> None

Run a stream handler's full lifecycle (open → scan → close).

Constructs a minimal :class:_StreamRegistration from func, then calls :func:run_stream with the provided adapters. Useful for testing stream handler behaviour without wiring a full app.

Parameters:

Name Type Description Default
func Any

The async-generator stream handler to run.

required
adapters dict[type, Any]

Resolved adapter map keyed by port type (e.g. {StreamablePort[Item]: my_port_instance}).

required
shutdown Event | None

Optional :class:asyncio.Event to trigger graceful shutdown. Defaults to :attr:shutdown_event.

None
Source code in packages/src/cosalette/testing/_harness.py
async def run_stream(
    self,
    func: Any,
    adapters: dict[type, Any],
    *,
    shutdown: asyncio.Event | None = None,
) -> None:
    """Run a stream handler's full lifecycle (open → scan → close).

    Constructs a minimal :class:`_StreamRegistration` from *func*, then
    calls :func:`run_stream` with the provided *adapters*.  Useful for
    testing stream handler behaviour without wiring a full app.

    Args:
        func: The async-generator stream handler to run.
        adapters: Resolved adapter map keyed by port type
            (e.g. ``{StreamablePort[Item]: my_port_instance}``).
        shutdown: Optional :class:`asyncio.Event` to trigger graceful
            shutdown.  Defaults to :attr:`shutdown_event`.
    """
    from cosalette._injection import build_injection_plan
    from cosalette._registration import _StreamRegistration
    from cosalette._runners._stream_runner import run_stream as _run_stream

    plan = build_injection_plan(func)
    reg = _StreamRegistration(
        name="test_stream",
        func=func,
        injection_plan=plan,
        enabled_spec=True,
        summary=None,
        behavior=None,
        effects=None,
    )
    await _run_stream(
        reg, adapters, {}, shutdown if shutdown is not None else self.shutdown_event
    )

Clock

cosalette.testing.FakeClock dataclass

FakeClock(_time: float = 0.0)

Test double for ClockPort.

Attributes:

Name Type Description
_time float

The current "now" value returned by now(). Set directly or via the constructor to control time in tests.

Example::

clock = FakeClock(42.0)
assert clock.now() == 42.0
clock._time = 99.0
assert clock.now() == 99.0

now

now() -> float

Return the manually set time value.

Source code in packages/src/cosalette/testing/_clock.py
def now(self) -> float:
    """Return the manually set time value."""
    return self._time

sleep async

sleep(seconds: float) -> None

Advance virtual time by seconds with no real delay.

Allows tests to exercise sleep-dependent code paths without wall-clock waiting. The asyncio.sleep(0) yields to the event loop so concurrent tasks interleave correctly.

Source code in packages/src/cosalette/testing/_clock.py
async def sleep(self, seconds: float) -> None:
    """Advance virtual time by *seconds* with no real delay.

    Allows tests to exercise sleep-dependent code paths
    without wall-clock waiting.  The ``asyncio.sleep(0)``
    yields to the event loop so concurrent tasks interleave
    correctly.
    """
    await asyncio.sleep(0)
    if seconds > 0:
        self._time += seconds
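A minimal sketch of virtual time in a test. `SketchClock` is a local stand-in that copies the two methods shown above so the example runs without cosalette installed, and `wait_then_stamp` is a hypothetical function under test.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchClock:
    """Local stand-in mirroring the FakeClock source shown above."""

    _time: float = 0.0

    def now(self) -> float:
        return self._time

    async def sleep(self, seconds: float) -> None:
        # Yield to the event loop, then advance virtual time only.
        await asyncio.sleep(0)
        if seconds > 0:
            self._time += seconds


async def wait_then_stamp(clock: SketchClock, delay: float) -> float:
    """Hypothetical code under test: sleep, then report the time."""
    await clock.sleep(delay)
    return clock.now()


async def demo() -> float:
    clock = SketchClock(10.0)
    # A one-hour sleep returns without any wall-clock wait.
    return await wait_then_stamp(clock, 3600.0)


stamp = asyncio.run(demo())
```

Non-positive durations still yield to the event loop but leave the clock unchanged.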

MQTT Test Doubles

cosalette.testing.MockMqttClient dataclass

MockMqttClient(
    published: list[tuple[str, str, bool, int]] = list(),
    subscriptions: list[str] = list(),
    raise_on_publish: Exception | None = None,
)

In-memory test double that records MQTT interactions.

Records publishes and subscriptions for assertion. Supports callback registration and simulated message delivery via deliver().

publish_count property

publish_count: int

Number of recorded publishes.

subscribe_count property

subscribe_count: int

Number of recorded subscriptions.

publish async

publish(
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Record a publish call, or raise if raise_on_publish is set.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def publish(
    self,
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None:
    """Record a publish call, or raise if ``raise_on_publish`` is set."""
    if self.raise_on_publish is not None:
        raise self.raise_on_publish
    if isinstance(payload, dict):
        from cosalette._json import dumps

        payload = dumps(payload)
    self.published.append((topic, payload, retain, qos))

subscribe async

subscribe(topic: str) -> None

Record a subscribe call.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def subscribe(self, topic: str) -> None:
    """Record a subscribe call."""
    self.subscriptions.append(topic)

on_message

on_message(callback: MessageCallback) -> None

Register an inbound-message callback.

Source code in packages/src/cosalette/_mqtt/__init__.py
def on_message(self, callback: MessageCallback) -> None:
    """Register an inbound-message callback."""
    self._callbacks.append(callback)

deliver async

deliver(topic: str, payload: str) -> None

Simulate an inbound message by invoking all callbacks.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def deliver(self, topic: str, payload: str) -> None:
    """Simulate an inbound message by invoking all callbacks."""
    for cb in self._callbacks:
        await cb(topic, payload)
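Callback registration and delivery can be sketched in isolation. `SketchInbound` is a local stand-in that copies `on_message` and `deliver` from the source above. The `MessageCallback` alias is an assumption inferred from the `await cb(topic, payload)` call, and the topic is hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

# Assumed shape of a message callback: async (topic, payload) -> None.
MessageCallback = Callable[[str, str], Awaitable[None]]


class SketchInbound:
    """Local stand-in for the callback half of MockMqttClient."""

    def __init__(self) -> None:
        self._callbacks: list[MessageCallback] = []

    def on_message(self, callback: MessageCallback) -> None:
        self._callbacks.append(callback)

    async def deliver(self, topic: str, payload: str) -> None:
        # Every registered callback sees every delivered message.
        for cb in self._callbacks:
            await cb(topic, payload)


async def demo() -> list[tuple[str, str]]:
    client = SketchInbound()
    seen: list[tuple[str, str]] = []

    async def record(topic: str, payload: str) -> None:
        seen.append((topic, payload))

    client.on_message(record)
    await client.deliver("testapp/lamp/set", "on")
    return seen


received = asyncio.run(demo())
```

Delivery does no topic matching in this sketch, mirroring the loop above; any filtering is left to the callbacks themselves.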

reset

reset() -> None

Clear all recorded data, callbacks, and failure injection.

Source code in packages/src/cosalette/_mqtt/__init__.py
def reset(self) -> None:
    """Clear all recorded data, callbacks, and failure injection."""
    self.published.clear()
    self.subscriptions.clear()
    self._callbacks.clear()
    self.raise_on_publish = None

get_messages_for

get_messages_for(topic: str) -> list[tuple[str, bool, int]]

Return (payload, retain, qos) tuples for topic.

Source code in packages/src/cosalette/_mqtt/__init__.py
def get_messages_for(
    self,
    topic: str,
) -> list[tuple[str, bool, int]]:
    """Return ``(payload, retain, qos)`` tuples for *topic*."""
    return [
        (payload, retain, qos)
        for t, payload, retain, qos in self.published
        if t == topic
    ]
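
A minimal sketch of how recorded publishes, get_messages_for, and raise_on_publish might be combined in a test. It uses a local stand-in (`StandInMqtt`, hypothetical) that mirrors the method bodies shown above rather than importing cosalette. Two things are assumptions: `json.dumps` substitutes for `cosalette._json.dumps`, and the `retain`/`qos` defaults are copied from the NullMqttClient signature, so the real MockMqttClient may differ.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Any, Optional, Union


@dataclass
class StandInMqtt:
    """Hypothetical stand-in mirroring publish() and get_messages_for()."""

    published: list[tuple[str, str, bool, int]] = field(default_factory=list)
    raise_on_publish: Optional[Exception] = None

    async def publish(
        self,
        topic: str,
        payload: Union[str, dict[str, Any]],
        *,
        retain: bool = False,  # assumed default
        qos: int = 1,  # assumed default
    ) -> None:
        if self.raise_on_publish is not None:
            raise self.raise_on_publish
        if isinstance(payload, dict):
            payload = json.dumps(payload)  # assumption: stands in for cosalette._json.dumps
        self.published.append((topic, payload, retain, qos))

    def get_messages_for(self, topic: str) -> list[tuple[str, bool, int]]:
        return [(p, r, q) for t, p, r, q in self.published if t == topic]


async def main() -> tuple[list[tuple[str, bool, int]], bool]:
    mqtt = StandInMqtt()
    await mqtt.publish("testapp/sensor/state", {"temp": 21.5}, retain=True)
    await mqtt.publish("testapp/other/state", "ignored")
    state = mqtt.get_messages_for("testapp/sensor/state")

    # Failure injection: every later publish raises the stored exception.
    mqtt.raise_on_publish = ConnectionError("broker down")
    try:
        await mqtt.publish("testapp/sensor/state", "never recorded")
        failed = False
    except ConnectionError:
        failed = True
    return state, failed


state, failed = asyncio.run(main())
```

Because dict payloads are serialised before being recorded, assertions compare against the JSON string, not the original dict.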

cosalette.testing.NullMqttClient dataclass

NullMqttClient()

Silent no-op MQTT adapter.

Every method is a no-op that logs at DEBUG level. Useful as a default when MQTT is not configured.

publish async

publish(
    topic: str,
    payload: str | dict[str, Any],
    *,
    retain: bool = False,
    qos: int = 1,
) -> None

Silently discard a publish request.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def publish(
    self,
    topic: str,
    payload: str | dict[str, Any],  # noqa: ARG002
    *,
    retain: bool = False,  # noqa: ARG002
    qos: int = 1,  # noqa: ARG002
) -> None:
    """Silently discard a publish request."""
    logger.debug("NullMqttClient.publish(%s) — discarded", topic)

subscribe async

subscribe(topic: str) -> None

Silently discard a subscribe request.

Source code in packages/src/cosalette/_mqtt/__init__.py
async def subscribe(self, topic: str) -> None:
    """Silently discard a subscribe request."""
    logger.debug("NullMqttClient.subscribe(%s) — discarded", topic)

Settings Factory

cosalette.testing.make_settings

make_settings(**overrides: Any) -> Settings

Create a Settings instance with sensible test defaults.

Instantiates an _IsolatedSettings subclass whose only configuration source is init_settings. The factory therefore ignores os.environ, .env files, and secret directories, so tests see only model defaults plus any explicit overrides.

Parameters:

Name Type Description Default
**overrides Any

Keyword arguments forwarded to the Settings constructor. Any field not provided falls back to the model defaults (e.g. mqtt.host="localhost").

{}

Returns:

Type Description
Settings

A fully initialised Settings instance ready for test use.

Example::

settings = make_settings()
assert settings.mqtt.host == "localhost"

from cosalette._settings import MqttSettings
custom = make_settings(mqtt=MqttSettings(host="broker.test"))
assert custom.mqtt.host == "broker.test"
Source code in packages/src/cosalette/testing/_settings.py
def make_settings(**overrides: Any) -> Settings:
    """Create a ``Settings`` instance with sensible test defaults.

    Instantiates an :class:`_IsolatedSettings` subclass whose only
    configuration source is ``init_settings``.  This means the
    factory ignores ``os.environ``, ``.env`` files, and secret
    directories — tests see only model defaults plus any explicit
    *overrides*.

    Parameters:
        **overrides: Keyword arguments forwarded to the ``Settings``
            constructor.  Any field not provided falls back to the
            model defaults (e.g. ``mqtt.host="localhost"``).

    Returns:
        A fully initialised :class:`Settings` ready for test use.

    Example::

        settings = make_settings()
        assert settings.mqtt.host == "localhost"

        from cosalette._settings import MqttSettings
        custom = make_settings(mqtt=MqttSettings(host="broker.test"))
        assert custom.mqtt.host == "broker.test"
    """
    # _env_file is a valid pydantic-settings runtime kwarg that disables
    # dotenv loading, but it isn't reflected in the generated __init__
    # signature — hence the type: ignore.
    return _IsolatedSettings(_env_file=None, **overrides)  # ty: ignore[unknown-argument]

Pytest Fixtures

The cosalette.testing package registers a pytest plugin via the pytest11 entry point. The fixtures below are available automatically when cosalette is installed:

| Fixture | Type | Description |
| --- | --- | --- |
| mock_mqtt | MockMqttClient | In-memory MQTT client for capturing published messages |
| fake_clock | FakeClock | Deterministic clock starting at 0.0 |
| device_context | DeviceContext | Pre-wired context with mock_mqtt and fake_clock |

All fixtures are function-scoped. Request them by name as test-function parameters; no explicit import is needed.

MemoryStore

cosalette.MemoryStore

MemoryStore(
    initial: dict[str, dict[str, object]] | None = None,
)

In-memory store backed by a plain dict.

Both load and save deep-copy data so that callers cannot mutate internal state by accident. Designed for tests — mirrors the FakeStorage pattern from gas2mqtt.

Parameters

initial: Optional seed data. The mapping is deep-copied on construction.

Source code in packages/src/cosalette/_persistence/_stores.py
def __init__(
    self,
    initial: dict[str, dict[str, object]] | None = None,
) -> None:
    self._data: dict[str, dict[str, object]] = (
        copy.deepcopy(initial) if initial else {}
    )

load

load(key: str) -> dict[str, object] | None

Return a deep copy of the stored dict, or None.

Source code in packages/src/cosalette/_persistence/_stores.py
def load(self, key: str) -> dict[str, object] | None:
    """Return a deep copy of the stored dict, or ``None``."""
    value = self._data.get(key)
    if value is None:
        return None
    return copy.deepcopy(value)

save

save(key: str, data: dict[str, object]) -> None

Store a deep copy of data.

Source code in packages/src/cosalette/_persistence/_stores.py
def save(self, key: str, data: dict[str, object]) -> None:
    """Store a deep copy of *data*."""
    self._data[key] = copy.deepcopy(data)

MemoryStore is the recommended test double for persistence. It stores data in an in-memory dictionary, avoiding filesystem access in tests.

from cosalette import MemoryStore
from cosalette.testing import AppHarness

backend = MemoryStore()
harness = AppHarness.create(store=backend)

# Pre-seed data
backend.save("sensor", {"count": 99})

# After test, inspect stored data
assert backend.load("sensor") == {"count": 99}
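
The deep-copy behaviour described above can be checked with a small sketch. To stay runnable without cosalette, it uses a local stand-in (`StandInMemoryStore`, hypothetical) whose method bodies are copied from the MemoryStore source shown earlier. The keys and values are invented for the example.

```python
import copy


class StandInMemoryStore:
    """Hypothetical stand-in copying the MemoryStore source shown above."""

    def __init__(self, initial=None):
        self._data = copy.deepcopy(initial) if initial else {}

    def load(self, key):
        value = self._data.get(key)
        if value is None:
            return None
        return copy.deepcopy(value)

    def save(self, key, data):
        self._data[key] = copy.deepcopy(data)


seed = {"sensor": {"readings": [1, 2]}}
store = StandInMemoryStore(seed)

# Mutating the seed after construction does not reach the store.
seed["sensor"]["readings"].append(3)

# Mutating a loaded value does not reach the store either.
loaded = store.load("sensor")
loaded["readings"].append(99)

# Mutating the caller's dict after save() leaves the stored copy untouched.
data = {"count": 1}
store.save("counter", data)
data["count"] = 2

assert store.load("sensor") == {"readings": [1, 2]}
assert store.load("counter") == {"count": 1}
assert store.load("missing") is None
```

Nested values such as the list under "readings" are isolated as well, which a shallow copy would not guarantee.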