Skip to content

Build a Complete IoT Bridge

This capstone guide combines everything from the previous guides into a complete, production-ready gas2mqtt application. You'll build a gas meter bridge daemon with telemetry polling, valve commands, hardware abstraction, lifecycle management, custom error types, and a full test suite.

Prerequisites

This guide assumes you've completed the Quickstart and are familiar with the individual guides:

1. Project Structure

gas2mqtt/
├── pyproject.toml
├── .env
├── src/
│   └── gas2mqtt/
│       ├── __init__.py
│       ├── app.py            # App assembly + devices
│       ├── settings.py       # Custom settings
│       ├── ports.py          # Protocol ports
│       ├── adapters.py       # Hardware adapters
│       └── errors.py         # Domain exceptions
└── tests/
    ├── conftest.py
    ├── unit/
    │   ├── test_counter.py
    │   ├── test_valve.py
    │   └── test_errors.py
    └── integration/
        └── test_app.py

Each file has a single responsibility — this keeps the codebase navigable and testable as the project grows.

2. Custom Settings

Define app-specific configuration fields, inheriting MQTT and logging settings from the framework:

src/gas2mqtt/settings.py
"""Configuration for gas2mqtt."""

from __future__ import annotations

from pydantic import Field, SecretStr, field_validator
from pydantic_settings import SettingsConfigDict

import cosalette


class Gas2MqttSettings(cosalette.Settings):
    """Gas meter bridge configuration.

    Environment variables use the ``GAS2MQTT_`` prefix.
    Nested models use ``__`` as delimiter:
    ``GAS2MQTT_MQTT__HOST=broker.local``.
    """

    model_config = SettingsConfigDict(
        env_prefix="GAS2MQTT_",
        env_nested_delimiter="__",
        env_file=".env",
        env_file_encoding="utf-8",
    )

    # Hardware
    serial_port: str = Field(
        default="/dev/ttyUSB0",
        description="Serial port for the gas meter sensor.",
    )
    baud_rate: int = Field(
        default=9600,
        description="Serial baud rate.",
    )

    # Polling
    counter_interval: int = Field(
        default=60,
        ge=1,
        description="Impulse counter polling interval in seconds.",
    )

    @field_validator("serial_port")
    @classmethod
    def serial_port_must_be_device(cls, v: str) -> str:
        """Validate that serial_port looks like a device path."""
        if not v.startswith("/dev/"):
            msg = f"serial_port must be a /dev/ path, got: {v!r}"
            raise ValueError(msg)
        return v

Why subclass Settings?

The base cosalette.Settings includes mqtt and logging sub-models. By subclassing, your app inherits broker connection and logging config for free — you only add the fields unique to gas2mqtt. See Configuration for the full guide.

3. Protocol Port

Define the hardware abstraction as a PEP 544 Protocol:

src/gas2mqtt/ports.py
"""Protocol ports for gas2mqtt hardware abstraction."""

from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class GasMeterPort(Protocol):
    """Hardware abstraction for gas meter sensors.

    Implementations provide impulse counting and temperature
    reading over the protocol boundary.
    """

    def connect(self, port: str, baud_rate: int = 9600) -> None:
        """Open connection to the sensor."""
        ...

    def read_impulses(self) -> int:
        """Read the current impulse count."""
        ...

    def read_temperature(self) -> float:
        """Read the sensor's temperature reading in Celsius."""
        ...

    def close(self) -> None:
        """Close the hardware connection."""
        ...

The port defines what your code needs. The adapters in the next section define how to provide it.

4. Real Adapter

The production adapter communicates over a serial port:

src/gas2mqtt/adapters.py
"""Hardware adapter implementations for gas2mqtt."""

from __future__ import annotations


class SerialGasMeter:
    """Real gas meter adapter communicating over a serial port.

    Uses ``pyserial`` for UART communication. Imported lazily by
    the framework via ``"gas2mqtt.adapters:SerialGasMeter"`` so
    that ``pyserial`` doesn't need to be installed on dev machines.
    """

    def __init__(self) -> None:
        self._conn = None

    def connect(self, port: str, baud_rate: int = 9600) -> None:
        """Open the serial connection."""
        import serial  # (1)!

        self._conn = serial.Serial(port, baud_rate, timeout=5)

    def read_impulses(self) -> int:
        """Read impulse count from the meter."""
        assert self._conn is not None, "Call connect() first"
        self._conn.write(b"READ_IMPULSES\n")
        response = self._conn.readline().decode().strip()
        return int(response)

    def read_temperature(self) -> float:
        """Read temperature from the meter's built-in sensor."""
        assert self._conn is not None, "Call connect() first"
        self._conn.write(b"READ_TEMP\n")
        response = self._conn.readline().decode().strip()
        return float(response)

    def close(self) -> None:
        """Close the serial connection."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None
  1. pyserial is imported inside the method, not at module level. This is the hexagonal lazy-import pattern (ADR-006) — the module can be imported on machines without pyserial installed.

5. Mock Adapter

A fake implementation for --dry-run mode and testing:

src/gas2mqtt/adapters.py (continued)
class FakeGasMeter:
    """Mock gas meter for dry-run mode and testing.

    Returns incrementing impulse counts and a fixed temperature.
    Requires no hardware or external libraries.
    """

    def __init__(self) -> None:
        self._impulses = 0
        self._connected = False

    def connect(self, port: str, baud_rate: int = 9600) -> None:
        self._connected = True

    def read_impulses(self) -> int:
        self._impulses += 1
        return self._impulses

    def read_temperature(self) -> float:
        return 21.5

    def close(self) -> None:
        self._connected = False

Fake vs Stub vs Mock

FakeGasMeter is a fake — it has working logic (incrementing counter) but no real hardware dependency. Fakes are great for dry-run mode because they produce realistic-looking data. In unit tests, you might use simpler stubs with fixed return values.

6. Telemetry Device

The impulse counter polls the gas meter sensor at a fixed interval:

src/gas2mqtt/app.py (counter device)
@app.telemetry("counter", interval=60)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read gas meter impulses and temperature.

    The framework calls this every 60 seconds. The returned dict
    is published as JSON to ``gas2mqtt/counter/state``.
    """
    meter = ctx.adapter(GasMeterPort)

    impulses = meter.read_impulses()
    temperature = meter.read_temperature()

    if impulses < 0:
        raise InvalidReadingError(f"Negative impulse count: {impulses}")

    return {
        "impulses": impulses,
        "temperature_celsius": temperature,
        "unit": "m³",
    }

This is the return-dict contract in action: your function reads the sensor and returns data. The framework handles JSON serialisation, MQTT publication, error catching, and the timing loop.

7. Command Device

The valve device receives open/close commands via MQTT and publishes state. The init= parameter creates a state object once, eliminating the need for module-level globals:

src/gas2mqtt/app.py (valve device)
from dataclasses import dataclass


@dataclass
class ValveState:
    """Tracks valve position across commands."""

    position: str = "closed"


def make_valve_state() -> ValveState:
    return ValveState()


@app.command("valve", init=make_valve_state)  # (1)!
async def handle_valve(
    payload: str, state: ValveState
) -> dict[str, object]:
    """Control the gas valve via MQTT commands.

    Subscribes to ``gas2mqtt/valve/set`` for inbound commands.
    Returns state dict — the framework publishes to
    ``gas2mqtt/valve/state``.
    """
    match payload:
        case "open":
            state.position = "open"
        case "close":
            state.position = "closed"
        case "toggle":
            state.position = (
                "open" if state.position == "closed" else "closed"
            )
        case _:
            raise ValueError(
                f"Unknown command: {payload!r}. Valid: open, close, toggle"
            )

    return {"state": state.position}
  1. init=make_valve_state runs once at startup. The ValveState instance is reused for every command — no global, no nonlocal.

Compare this to the telemetry device above: @app.command handlers are even simpler — they receive a command and return state. No main loop, no closures, no nonlocal. The init= parameter handles state setup cleanly.

8. Lifespan

Initialise the serial connection at startup, close it at shutdown using the lifespan context manager:

src/gas2mqtt/app.py (lifespan)
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(ctx: cosalette.AppContext) -> AsyncIterator[None]:
    """Open serial connection before devices start, close after."""
    meter = ctx.adapter(GasMeterPort)
    settings = ctx.settings
    assert isinstance(settings, Gas2MqttSettings)
    meter.connect(settings.serial_port, settings.baud_rate)
    yield  # (1)!
    meter.close()
  1. Everything before yield runs before devices start. Everything after yield runs after devices stop. The yield is where the application's device phase executes.

AppContext — limited API

The lifespan receives AppContext, which has only .settings and .adapter(). There is NO publish_state(), sleep(), or on_command — those are DeviceContext-only. See Lifespan for details.

9. Custom Error Types

Define domain exceptions and the error type map:

src/gas2mqtt/errors.py
"""Domain exceptions for gas2mqtt."""


class SensorTimeoutError(Exception):
    """Gas meter sensor didn't respond within the timeout period."""


class InvalidReadingError(Exception):
    """Sensor returned a reading outside valid physical bounds."""


class ConnectionLostError(Exception):
    """Serial connection to the gas meter was lost."""


error_type_map: dict[type[Exception], str] = {
    SensorTimeoutError: "sensor_timeout",
    InvalidReadingError: "invalid_reading",
    ConnectionLostError: "connection_lost",
}

When counter raises InvalidReadingError("Negative impulse count: -3"), the framework's error isolation catches it and publishes:

gas2mqtt/counter/error
{
    "error_type": "error",
    "message": "Negative impulse count: -3",
    "device": "counter",
    "timestamp": "2026-02-18T10:30:00+00:00",
    "details": {}
}

The framework uses the generic "error" type for all auto-caught exceptions. To get domain-specific types like "invalid_reading", use build_error_payload() manually — see Custom Error Types for the full guide.

10. App Assembly

Wire everything together in app.py:

src/gas2mqtt/app.py
"""gas2mqtt — Gas meter IoT-to-MQTT bridge.

A complete cosalette application with telemetry polling,
command control, hardware abstraction, and lifespan management.
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass

import cosalette
from cosalette import Every, OnChange

from gas2mqtt.adapters import FakeGasMeter
from gas2mqtt.errors import InvalidReadingError
from gas2mqtt.ports import GasMeterPort
from gas2mqtt.settings import Gas2MqttSettings


# --- Lifespan ---


@asynccontextmanager
async def lifespan(ctx: cosalette.AppContext) -> AsyncIterator[None]:
    """Open serial connection before devices start, close after."""
    meter = ctx.adapter(GasMeterPort)
    settings = ctx.settings
    assert isinstance(settings, Gas2MqttSettings)
    meter.connect(settings.serial_port, settings.baud_rate)
    yield
    meter.close()


# --- App construction ---

app = cosalette.App(
    name="gas2mqtt",
    version="1.0.0",
    settings_class=Gas2MqttSettings,
    lifespan=lifespan,
)

# --- Adapter registration ---

app.adapter(
    GasMeterPort,
    "gas2mqtt.adapters:SerialGasMeter",  # (1)!
    dry_run=FakeGasMeter,  # (2)!
)


# --- Telemetry device ---


@app.telemetry(
    "counter",
    interval=60,
    publish=OnChange(threshold={"impulses": 1}) | Every(seconds=300),  # (3)!
)
async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
    """Read gas meter impulses and temperature."""
    meter = ctx.adapter(GasMeterPort)

    impulses = meter.read_impulses()
    temperature = meter.read_temperature()

    if impulses < 0:
        raise InvalidReadingError(f"Negative impulse count: {impulses}")

    return {
        "impulses": impulses,
        "temperature_celsius": temperature,
        "unit": "m³",
    }


# --- Command device ---


@dataclass
class ValveState:
    """Tracks valve position across commands."""

    position: str = "closed"


def make_valve_state() -> ValveState:
    return ValveState()


@app.command("valve", init=make_valve_state)
async def handle_valve(
    payload: str, state: ValveState
) -> dict[str, object]:
    """Control the gas valve via MQTT commands."""
    match payload:
        case "open":
            state.position = "open"
        case "close":
            state.position = "closed"
        case "toggle":
            state.position = (
                "open" if state.position == "closed" else "closed"
            )
        case _:
            raise ValueError(
                f"Unknown command: {payload!r}. "
                f"Valid: open, close, toggle"
            )

    return {"state": state.position}


# --- Entry point ---


app.run()
  1. SerialGasMeter is imported lazily — pyserial doesn't need to be installed on dev machines or in CI. The framework imports it at startup only in production.
  2. FakeGasMeter is used when running gas2mqtt --dry-run. It returns simulated data without any hardware.
  3. OnChange(threshold={"impulses": 1}) suppresses publishes when the impulse count hasn't changed by more than 1. Every(seconds=300) guarantees a heartbeat publish every 5 minutes regardless.

11. Test Suite

Test Configuration

tests/conftest.py
"""Shared pytest configuration for gas2mqtt tests."""

pytest_plugins = ["cosalette.testing._plugin"]

Unit Tests: Counter

tests/unit/test_counter.py
"""Unit tests for the counter telemetry device.

Test Techniques Used:
- Specification-based: Return-dict contract verification.
- Error Guessing: Invalid reading detection.
- Boundary Value Analysis: Edge case at impulses = 0.
"""

from __future__ import annotations

import pytest

from gas2mqtt.adapters import FakeGasMeter
from gas2mqtt.errors import InvalidReadingError
from gas2mqtt.ports import GasMeterPort


class StubGasMeter:
    """Stub with configurable return values."""

    def __init__(self, impulses: int = 42, temperature: float = 21.5) -> None:
        self.impulses = impulses
        self.temperature = temperature

    def connect(self, port: str, baud_rate: int = 9600) -> None:
        pass

    def read_impulses(self) -> int:
        return self.impulses

    def read_temperature(self) -> float:
        return self.temperature

    def close(self) -> None:
        pass


@pytest.mark.asyncio
async def test_counter_returns_impulse_dict(device_context):
    """Counter returns dict with impulses, temperature, and unit."""
    device_context._adapters[GasMeterPort] = StubGasMeter(
        impulses=100, temperature=22.0
    )

    from gas2mqtt.app import counter

    result = await counter(device_context)

    assert result == {
        "impulses": 100,
        "temperature_celsius": 22.0,
        "unit": "m³",
    }


@pytest.mark.asyncio
async def test_counter_rejects_negative_impulses(device_context):
    """Negative impulse count raises InvalidReadingError."""
    device_context._adapters[GasMeterPort] = StubGasMeter(impulses=-1)

    from gas2mqtt.app import counter

    with pytest.raises(InvalidReadingError, match="Negative impulse count"):
        await counter(device_context)


@pytest.mark.asyncio
async def test_counter_accepts_zero_impulses(device_context):
    """Zero is a valid impulse count (boundary value)."""
    device_context._adapters[GasMeterPort] = StubGasMeter(impulses=0)

    from gas2mqtt.app import counter

    result = await counter(device_context)
    assert result["impulses"] == 0

Unit Tests: Valve

tests/unit/test_valve.py
"""Unit tests for the valve command device.

Test Techniques Used:
- Decision Table: Command × expected state.
- Error Guessing: Invalid command handling.

Note: @app.command handlers are plain async functions, which
makes them trivially testable — no device loop, no closures.
The init= state object is created directly in each test.
"""

import pytest

from gas2mqtt.app import ValveState


@pytest.mark.asyncio
async def test_valve_open_command():
    """'open' command returns state dict with 'open'."""
    from gas2mqtt.app import handle_valve

    state = ValveState()
    result = await handle_valve(payload="open", state=state)
    assert result == {"state": "open"}


@pytest.mark.asyncio
async def test_valve_close_command():
    """'close' command returns state dict with 'closed'."""
    from gas2mqtt.app import handle_valve

    state = ValveState()
    result = await handle_valve(payload="close", state=state)
    assert result == {"state": "closed"}


@pytest.mark.asyncio
async def test_valve_toggle_command():
    """'toggle' flips state from closed to open."""
    from gas2mqtt.app import handle_valve

    state = ValveState()
    result = await handle_valve(payload="toggle", state=state)
    assert result == {"state": "open"}


@pytest.mark.asyncio
async def test_valve_rejects_invalid_command():
    """Unknown commands raise ValueError."""
    from gas2mqtt.app import handle_valve

    state = ValveState()
    with pytest.raises(ValueError, match="Unknown command"):
        await handle_valve(payload="blink", state=state)

Because @app.command handlers are standalone functions, unit testing is as simple as calling the function directly with the arguments you want to test. With init=, you create the state object directly in each test — no fixtures or mocking needed.

Unit Tests: Error Types

tests/unit/test_errors.py
"""Unit tests for gas2mqtt error type map.

Test Techniques Used:
- Decision Table: Exception class → error_type string mapping.
"""

from cosalette import build_error_payload
from gas2mqtt.errors import (
    InvalidReadingError,
    SensorTimeoutError,
    error_type_map,
)


def test_sensor_timeout_maps_correctly():
    """SensorTimeoutError → 'sensor_timeout'."""
    payload = build_error_payload(
        SensorTimeoutError("timed out"),
        error_type_map=error_type_map,
        device="counter",
    )
    assert payload.error_type == "sensor_timeout"


def test_invalid_reading_maps_correctly():
    """InvalidReadingError → 'invalid_reading'."""
    payload = build_error_payload(
        InvalidReadingError("bad value"),
        error_type_map=error_type_map,
        device="counter",
    )
    assert payload.error_type == "invalid_reading"


def test_unmapped_exception_falls_back_to_error():
    """Unmapped exceptions get default 'error' type."""
    payload = build_error_payload(
        RuntimeError("unexpected"),
        error_type_map=error_type_map,
    )
    assert payload.error_type == "error"

Integration Test: Full Lifecycle

tests/integration/test_app.py
"""Integration tests for the gas2mqtt application.

Test Techniques Used:
- State Transition Testing: Full app lifecycle.
"""

import asyncio

import pytest
import cosalette
from cosalette.testing import AppHarness

from gas2mqtt.adapters import FakeGasMeter
from gas2mqtt.ports import GasMeterPort


@pytest.mark.asyncio
async def test_full_lifecycle_publishes_telemetry():
    """Full app lifecycle: startup → telemetry → shutdown."""
    # Arrange
    harness = AppHarness.create(name="gas2mqtt")
    harness.app.adapter(GasMeterPort, FakeGasMeter)

    @harness.app.telemetry("counter", interval=1)
    async def counter(ctx: cosalette.DeviceContext) -> dict[str, object]:
        meter = ctx.adapter(GasMeterPort)
        return {"impulses": meter.read_impulses()}

    # Act — auto-shutdown after brief run
    async def shutdown_after_delay():
        await asyncio.sleep(0.1)
        harness.trigger_shutdown()

    asyncio.create_task(shutdown_after_delay())
    await harness.run()

    # Assert
    messages = harness.mqtt.get_messages_for("gas2mqtt/counter/state")
    assert len(messages) >= 1


@pytest.mark.asyncio
async def test_valve_command_publishes_state():
    """Valve command handler publishes state on command."""
    # Arrange
    harness = AppHarness.create(name="gas2mqtt")

    @harness.app.command("valve")
    async def handle_valve(
        payload: str, ctx: cosalette.DeviceContext
    ) -> dict[str, object]:
        return {"state": payload}

    # Act
    async def shutdown_after_delay():
        await asyncio.sleep(0.1)
        harness.trigger_shutdown()

    asyncio.create_task(shutdown_after_delay())
    await harness.run()

    # Assert — command handlers don't publish on startup, so we check
    # that the device registered successfully
    assert "valve" in [c.name for c in harness.app._commands]

12. Running the Application

With a .env File

.env
# MQTT broker
GAS2MQTT_MQTT__HOST=broker.local
GAS2MQTT_MQTT__PORT=1883
GAS2MQTT_MQTT__USERNAME=gas2mqtt
GAS2MQTT_MQTT__PASSWORD=s3cret

# Logging
GAS2MQTT_LOGGING__LEVEL=INFO
GAS2MQTT_LOGGING__FORMAT=json

# App settings
GAS2MQTT_SERIAL_PORT=/dev/ttyUSB0
GAS2MQTT_BAUD_RATE=9600
GAS2MQTT_COUNTER_INTERVAL=60

Production

# Run normally
uv run gas2mqtt

# Override log level
uv run gas2mqtt --log-level DEBUG --log-format text

# Use a custom .env file
uv run gas2mqtt --env-file /etc/gas2mqtt/.env

Dry-Run Mode

# Uses FakeGasMeter instead of SerialGasMeter
uv run gas2mqtt --dry-run

Dry-run mode resolves FakeGasMeter for GasMeterPort, so the app runs without hardware. This is useful for development, CI testing, and demo setups.

Docker Deployment

Dockerfile
FROM python:3.14-slim

WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN pip install uv==0.6.6 && uv sync --frozen

COPY src/ src/
COPY .env .env

CMD ["uv", "run", "gas2mqtt"]
docker-compose.yml
services:
  gas2mqtt:
    build: .
    restart: unless-stopped
    devices:
      - /dev/ttyUSB0:/dev/ttyUSB0  # Pass through serial device
    environment:
      GAS2MQTT_MQTT__HOST: mosquitto
      GAS2MQTT_LOGGING__FORMAT: json
    depends_on:
      - mosquitto

  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - mosquitto-data:/mosquitto/data
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf

volumes:
  mosquitto-data:

Production checklist

  • Use json log format for container log aggregators
  • Set QoS 1 (default) for at-least-once delivery
  • Configure MQTT authentication
  • Mount the serial device into the container
  • Set restart: unless-stopped for daemon resilience
  • Monitor the gas2mqtt/status topic for LWT availability

Summary

Here's what each piece does and how they connect:

┌────────────────────────────────────────────────┐
│                  gas2mqtt App                  │
├───────────────┬────────────────────────────────┤
│ Settings      │ Gas2MqttSettings               │
│               │   serial_port, baud_rate, etc. │
├───────────────┼────────────────────────────────┤
│ Port          │ GasMeterPort (Protocol)        │
├───────────────┼────────────────────────────────┤
│ Adapters      │ SerialGasMeter (real)          │
│               │ FakeGasMeter   (dry-run)       │
├───────────────┼────────────────────────────────┤
│ Lifespan      │ lifespan (asynccontextmanager) │
├───────────────┼────────────────────────────────┤
│ Devices       │ counter  (telemetry, 60s)      │
│               │ valve    (command, open/close) │
├───────────────┼────────────────────────────────┤
│ Error Types   │ SensorTimeoutError             │
│               │ InvalidReadingError            │
│               │ ConnectionLostError            │
├───────────────┼────────────────────────────────┤
│ MQTT Topics   │ gas2mqtt/counter/state         │
│               │ gas2mqtt/valve/state           │
│               │ gas2mqtt/valve/set             │
│               │ gas2mqtt/error                 │
│               │ gas2mqtt/{device}/error        │
│               │ gas2mqtt/status                │
└───────────────┴────────────────────────────────┘

See Also