Devices

cosalette device handlers — the bridge between domain logic and the MQTT framework.

Gas Counter

gas2mqtt.devices.gas_counter

Gas counter device — stateful trigger detection and counting.

Uses an @app.device handler with a manual polling loop:

1. Reads Bz from the magnetometer at poll_interval
2. Feeds Bz into a SchmittTrigger
3. On rising edge: increments counter, optionally tracks consumption
4. Publishes state on every trigger event (not every poll)
5. Accepts inbound commands to set consumption value
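The loop described above hinges on hysteresis-based edge detection. The following is a minimal sketch of that idea, assuming the trigger goes high above level + hysteresis and low below level - hysteresis. The SketchTrigger class, its update method, and the sample values are hypothetical and are not the gas2mqtt SchmittTrigger API.

```python
from dataclasses import dataclass


@dataclass
class SketchTrigger:
    """Hypothetical Schmitt trigger; not the gas2mqtt SchmittTrigger class."""

    level: float
    hysteresis: float
    high: bool = False

    def update(self, value: float) -> bool:
        """Feed one sample; return True only on a LOW -> HIGH transition."""
        if not self.high and value > self.level + self.hysteresis:
            self.high = True
            return True
        if self.high and value < self.level - self.hysteresis:
            self.high = False
        return False


def count_ticks(samples: list[float], trigger: SketchTrigger) -> int:
    """Count rising edges in a sequence of Bz samples."""
    return sum(1 for bz in samples if trigger.update(bz))


# Made-up Bz samples: values inside the band (-5700 .. -4300) never toggle the state.
bz_samples = [-5600, -5100, -4900, -4200, -4900, -5100, -5900, -4100]
ticks = count_ticks(bz_samples, SketchTrigger(level=-5000, hysteresis=700))
```

With these samples the sketch counts two rising edges. The readings that hover near the level in between are ignored, which is the reason for using hysteresis rather than a single threshold.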

State persistence

When state_file is configured, counter and consumption values are saved after every state-publishing event and on shutdown. On startup, saved state is restored so values survive restarts. The trigger state is transient and not persisted.
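As an illustration of the save and restore cycle, here is a small sketch that stores the persistent fields in a JSON file. It does not use the cosalette DeviceStore; the save_state and load_state helpers, the file name, and the write-then-rename step are assumptions made for the example.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_state(path: Path, counter: int, consumption_m3: Optional[float]) -> None:
    """Write the persistent fields to a JSON file; trigger state is left out."""
    state: dict = {"counter": counter}
    if consumption_m3 is not None:
        state["consumption_m3"] = round(consumption_m3, 3)
    # Write to a temporary file first, then rename, so a crash mid-write
    # cannot leave a truncated state file behind (a choice of this sketch).
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(state))
    os.replace(tmp, path)


def load_state(path: Path) -> dict:
    """Return the saved state, or a fresh state if no file exists yet."""
    if not path.exists():
        return {"counter": 0}
    return json.loads(path.read_text())


state_file = Path(tempfile.mkdtemp()) / "gas_counter.json"  # hypothetical location
fresh = load_state(state_file)       # nothing saved yet
save_state(state_file, counter=42, consumption_m3=123.4567)
restored = load_state(state_file)    # what a restart would see
```

The trigger state is deliberately absent from the file, matching the note above that it is transient.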

MQTT state payload

{"counter": 42, "trigger": "CLOSED"} or, with consumption tracking, {"counter": 42, "trigger": "CLOSED", "consumption_m3": 123.456}

MQTT command payload (on gas2mqtt/gas_counter/set)

{"consumption_m3": 123.456}

COUNTER_MODULUS module-attribute

COUNTER_MODULUS = 65536

Modulus for the tick counter (wraps at 2^16).
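A short sketch of what wrapping at the modulus means in practice, assuming the counter is incremented by one per tick and reduced modulo COUNTER_MODULUS. The next_tick and ticks_between helpers are hypothetical; the second one shows how a consumer might compute a difference across a wrap.

```python
COUNTER_MODULUS = 65536  # value documented above


def next_tick(counter: int) -> int:
    """Advance the counter by one tick, wrapping back to 0 at the modulus."""
    return (counter + 1) % COUNTER_MODULUS


def ticks_between(previous: int, current: int) -> int:
    """Ticks elapsed between two readings, tolerating a single wrap-around."""
    return (current - previous) % COUNTER_MODULUS


wrapped = next_tick(65535)          # last value before the wrap
elapsed = ticks_between(65530, 4)   # readings taken on either side of a wrap
```

Under these assumptions wrapped is 0 and elapsed is 10. The difference is only unambiguous while fewer than 65536 ticks occur between two readings.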

gas_counter async

gas_counter(ctx: DeviceContext, store: DeviceStore) -> None

Gas counter device — polls magnetometer, detects ticks.

This is a long-running device coroutine intended for registration with @app.device("gas_counter"). It owns its polling loop, manages a SchmittTrigger for edge detection, and optionally tracks cumulative gas consumption.

Parameters:

Name | Type | Description | Default
ctx | DeviceContext | Per-device context injected by cosalette. Provides MQTT publishing, shutdown-aware sleep, adapter resolution, and settings access. | required
store | DeviceStore | Per-device persistent store injected by cosalette. Already loaded on entry; saved by framework on shutdown. | required

Source code in packages/src/gas2mqtt/devices/gas_counter.py
async def gas_counter(
    ctx: cosalette.DeviceContext,
    store: cosalette.DeviceStore,
) -> None:
    """Gas counter device — polls magnetometer, detects ticks.

    This is a long-running device coroutine intended for registration
    with ``@app.device("gas_counter")``. It owns its polling loop,
    manages a SchmittTrigger for edge detection, and optionally tracks
    cumulative gas consumption.

    Args:
        ctx: Per-device context injected by cosalette. Provides MQTT
            publishing, shutdown-aware sleep, adapter resolution,
            and settings access.
        store: Per-device persistent store injected by cosalette.
            Already loaded on entry; saved by framework on shutdown.
    """
    settings: Gas2MqttSettings = ctx.settings  # type: ignore[assignment]
    magnetometer = ctx.adapter(MagnetometerPort)  # type: ignore[type-abstract]
    logger = logging.getLogger(f"cosalette.{ctx.name}")

    # --- Domain object initialisation ---
    trigger = SchmittTrigger(settings.trigger_level, settings.trigger_hysteresis)

    # --- Restore persisted state ---
    counter = _restore_counter(store, logger)
    consumption = _restore_consumption(store, settings, logger)

    def _build_state() -> dict[str, object]:
        """Build the state payload dict."""
        state: dict[str, object] = {
            "counter": counter,
            "trigger": "CLOSED" if trigger.state is TriggerState.HIGH else "OPEN",
        }
        if consumption is not None:
            state["consumption_m3"] = round(consumption.consumption_m3, 3)
        return state

    @ctx.on_command
    async def handle_command(topic: str, payload: str) -> None:  # noqa: ARG001
        nonlocal consumption
        if consumption is None:
            logger.warning("Consumption command received but tracking is disabled")
            return
        data = json.loads(payload)
        if "consumption_m3" in data:
            consumption.set_consumption(float(data["consumption_m3"]))
            logger.info("Consumption set to %.3f m³", consumption.consumption_m3)
            await ctx.publish_state(_build_state())
            _save_state()

    def _save_state() -> None:
        """Persist current state to the device store."""
        state = _build_state()
        # Remove trigger — it's transient, not worth persisting
        state.pop("trigger", None)
        store.update(state)
        store.save()

    # Publish initial state
    await ctx.publish_state(_build_state())
    _save_state()

    while not ctx.shutdown_requested:
        try:
            counter, should_publish = _process_poll(
                magnetometer,
                trigger,
                counter,
                consumption,
                logger,
            )
        except OSError:
            logger.warning("I2C read error — will retry next poll cycle", exc_info=True)
            await ctx.sleep(settings.poll_interval)
            continue
        if should_publish:
            await ctx.publish_state(_build_state())
            _save_state()
        await ctx.sleep(settings.poll_interval)

Temperature

gas2mqtt.devices.temperature

Temperature telemetry — PT1-filtered, calibrated sensor readings.

Reads the raw temperature from the magnetometer's built-in sensor, applies a linear calibration (temp_scale * raw + temp_offset), then smooths via an exponentially-weighted PT1 (first-order lag) filter.
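The calibration and smoothing steps can be sketched as follows. This is an illustration only: SketchPt1 is a hypothetical stand-in for Pt1Filter, and the smoothing factor alpha = dt / (tau + dt) is one common discretisation of a first-order lag, assumed here rather than taken from the gas2mqtt source. The raw readings and coefficients are made up.

```python
from typing import Optional


class SketchPt1:
    """Hypothetical first-order lag filter; not the gas2mqtt Pt1Filter class."""

    def __init__(self, tau: float, dt: float) -> None:
        # Smoothing factor of a discretised first-order lag (assumed form).
        self.alpha = dt / (tau + dt)
        self.value: Optional[float] = None

    def update(self, sample: float) -> float:
        """Move the filtered value a fraction alpha towards the new sample."""
        if self.value is None:
            self.value = sample  # seed the filter with the first sample
        else:
            self.value += self.alpha * (sample - self.value)
        return self.value


def calibrate(raw: float, temp_scale: float, temp_offset: float) -> float:
    """Linear calibration: temp_scale * raw + temp_offset."""
    return temp_scale * raw + temp_offset


pt1 = SketchPt1(tau=30.0, dt=10.0)  # alpha = 10 / (30 + 10) = 0.25
readings = [2000, 2000, 2400]       # made-up raw sensor values
filtered = [
    pt1.update(calibrate(r, temp_scale=0.01, temp_offset=0.0)) for r in readings
]
```

In this sketch the jump from 20 °C to 24 °C in the calibrated input moves the filtered value only a quarter of the way, to about 21 °C. A larger tau relative to dt smooths more strongly.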

The OnChange publish strategy ensures readings are published only when the filtered value shifts by more than 0.05 °C, which avoids MQTT chatter caused by sensor noise.
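To make the threshold behaviour concrete, here is a minimal sketch of a change gate. It is not cosalette's OnChange implementation: the ThresholdGate class is hypothetical, and comparing against the last published value (rather than the last seen value) is an assumption of this example.

```python
from typing import Optional


class ThresholdGate:
    """Hypothetical change detector; not cosalette's OnChange implementation."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold
        self.last_published: Optional[float] = None

    def should_publish(self, value: float) -> bool:
        """True for the first value, then only when the change exceeds the threshold."""
        if (
            self.last_published is None
            or abs(value - self.last_published) > self.threshold
        ):
            self.last_published = value
            return True
        return False


gate = ThresholdGate(threshold=0.05)
decisions = [gate.should_publish(v) for v in [22.5, 22.52, 22.54, 22.6, 22.6]]
```

Here only the first reading and the jump to 22.6 pass the gate. Comparing against the last published value means slow drift still triggers a publish once it accumulates past the threshold.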

MQTT state payload

{"temperature": 22.5}

make_pt1

make_pt1(settings: Gas2MqttSettings) -> Pt1Filter

Create PT1 filter from settings for temperature smoothing.

This is the init= factory: cosalette calls it once before the handler loop and injects the returned Pt1Filter by type.

Source code in packages/src/gas2mqtt/devices/temperature.py
def make_pt1(settings: Gas2MqttSettings) -> Pt1Filter:
    """Create PT1 filter from settings for temperature smoothing.

    This is the ``init=`` factory: cosalette calls it once before the
    handler loop and injects the returned ``Pt1Filter`` by type.
    """
    return Pt1Filter(tau=settings.smoothing_tau, dt=settings.temperature_interval)

temperature async

temperature(
    magnetometer: MagnetometerPort,
    settings: Gas2MqttSettings,
    pt1: Pt1Filter,
) -> dict[str, object]

Read temperature, calibrate, filter, and return state dict.

Parameters:

Name | Type | Description | Default
magnetometer | MagnetometerPort | Sensor adapter (injected by cosalette DI). | required
settings | Gas2MqttSettings | Application settings with calibration coefficients. | required
pt1 | Pt1Filter | PT1 filter instance (created by make_pt1). | required

Returns:

Type | Description
dict[str, object] | {"temperature": <rounded float>}, published to MQTT by the framework when the OnChange strategy fires.

Source code in packages/src/gas2mqtt/devices/temperature.py
async def temperature(
    magnetometer: MagnetometerPort,
    settings: Gas2MqttSettings,
    pt1: Pt1Filter,
) -> dict[str, object]:
    """Read temperature, calibrate, filter, and return state dict.

    Args:
        magnetometer: Sensor adapter (injected by cosalette DI).
        settings: Application settings with calibration coefficients.
        pt1: PT1 filter instance (created by :func:`make_pt1`).

    Returns:
        ``{"temperature": <rounded float>}`` — published to MQTT by
        the framework when the ``OnChange`` strategy fires.
    """
    reading = magnetometer.read()
    raw_celsius = settings.temp_scale * reading.temperature_raw + settings.temp_offset
    return {"temperature": round(pt1.update(raw_celsius), 1)}

Magnetometer

gas2mqtt.devices.magnetometer

Magnetometer debug telemetry — raw 3-axis magnetic field readings.

Publishes raw Bx, By, Bz values from the QMC5883L sensor at the gas counter's poll interval. Disabled by default — enable via the enable_debug_device setting for calibration and troubleshooting.

MQTT state payload

{"bx": -1234, "by": 567, "bz": -4567}
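One possible way to use these debug readings during calibration is to record a few payloads while the meter is running and look at the range of Bz. The sketch below assumes the payload shape shown above; the summarise_bz helper and the recorded values are hypothetical, and the midpoint is only a starting point for choosing a trigger level, not a rule taken from gas2mqtt.

```python
import json


def summarise_bz(payloads: list[str]) -> dict[str, float]:
    """Summarise the Bz range seen in recorded debug payloads."""
    bz = [json.loads(p)["bz"] for p in payloads]
    low, high = min(bz), max(bz)
    return {"bz_min": low, "bz_max": high, "midpoint": (low + high) / 2}


# Made-up payloads in the documented shape.
recorded = [
    '{"bx": -1234, "by": 567, "bz": -4567}',
    '{"bx": -1200, "by": 570, "bz": -6100}',
    '{"bx": -1250, "by": 560, "bz": -3900}',
]
summary = summarise_bz(recorded)
```

For these samples the midpoint is -5000.0, halfway between the lowest and highest Bz observed.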

magnetometer async

magnetometer(
    magnetometer: MagnetometerPort,
) -> dict[str, object]

Read and return raw magnetic field values.

Parameters:

Name | Type | Description | Default
magnetometer | MagnetometerPort | Sensor adapter (injected by cosalette DI). | required

Returns:

Type | Description
dict[str, object] | {"bx": int, "by": int, "bz": int}, published to MQTT on every poll cycle.

Source code in packages/src/gas2mqtt/devices/magnetometer.py
async def magnetometer(
    magnetometer: MagnetometerPort,
) -> dict[str, object]:
    """Read and return raw magnetic field values.

    Args:
        magnetometer: Sensor adapter (injected by cosalette DI).

    Returns:
        ``{"bx": int, "by": int, "bz": int}`` — published to MQTT
        on every poll cycle.
    """
    reading = magnetometer.read()
    return {"bx": reading.bx, "by": reading.by, "bz": reading.bz}