Skip to content

Adapters

Hardware adapter implementations for the MagnetometerPort protocol.

QMC5883L (Production)

gas2mqtt.adapters.qmc5883l

QMC5883L magnetometer adapter — production I2C implementation.

Reads from the QMC5883L 3-axis digital magnetometer via the I2C bus using smbus2. The QMC5883L is a multi-chip module containing a 3-axis magnetic sensor with on-die temperature compensation.

I2C address: 0x0D (default for QMC5883L) Data register: 0x00 (9 bytes) Bytes 0-1: X axis (little-endian signed 16-bit) Bytes 2-3: Y axis (little-endian signed 16-bit) Bytes 4-5: Z axis (little-endian signed 16-bit) Byte 6: Status register Bytes 7-8: Temperature (little-endian signed 16-bit)

Configuration

Register 0x09: Control Register 1 Register 0x0B: SET/RESET Period Register

Note: Despite some legacy references to "HMC5883", the actual hardware is a QMC5883L — a lower-cost successor with a different register map.

Qmc5883lAdapter

Qmc5883lAdapter(settings: Gas2MqttSettings)

Production adapter for the QMC5883L magnetometer over I2C.

Implements MagnetometerPort protocol. Reads magnetic field (X, Y, Z) and temperature from the QMC5883L via smbus2.

The QMC5883L data output register layout (starting at 0x00): Bytes 0-1: X axis (little-endian signed 16) Bytes 2-3: Y axis (little-endian signed 16) Bytes 4-5: Z axis (little-endian signed 16) Byte 6: Status register Bytes 7-8: Temperature (little-endian signed 16)

Source code in packages/src/gas2mqtt/adapters/qmc5883l.py
def __init__(self, settings: Gas2MqttSettings) -> None:
    self._bus_number = settings.i2c_bus
    self._address = settings.i2c_address
    self._bus: smbus2.SMBus | None = None

initialize

initialize() -> None

Open I2C bus and configure the QMC5883L.

Control Register 1 (0x09): 0b11010001 - Continuous measurement mode (bits 0-1: 01) - Output data rate: 10 Hz (bits 2-3: 00) - Full scale range: 8 Gauss (bits 4-5: 01) - Over sample ratio: 64 (bits 6-7: 11)

SET/RESET Period Register (0x0B): 0b00000001 - Recommended value per datasheet.

Source code in packages/src/gas2mqtt/adapters/qmc5883l.py
def initialize(self) -> None:
    """Open I2C bus and configure the QMC5883L.

    Control Register 1 (0x09): 0b11010001
        - Continuous measurement mode (bits 0-1: 01)
        - Output data rate: 10 Hz (bits 2-3: 00)
        - Full scale range: 8 Gauss (bits 4-5: 01)
        - Over sample ratio: 64 (bits 6-7: 11)

    SET/RESET Period Register (0x0B): 0b00000001
        - Recommended value per datasheet.
    """
    self._bus = smbus2.SMBus(self._bus_number)
    self._bus.write_byte_data(self._address, 0x09, 0b11010001)
    self._bus.write_byte_data(self._address, 0x0B, 0b00000001)
    logger.info(
        "QMC5883L initialized on bus %d at address 0x%02X",
        self._bus_number,
        self._address,
    )

read

read() -> MagneticReading

Read magnetic field (X, Y, Z) and temperature from the QMC5883L.

Reads 9 bytes from register 0x00: X, Y, Z axes (little-endian) plus status byte and temperature (little-endian).

Returns:

Type Description
MagneticReading

MagneticReading with bx, by, bz, and temperature_raw.

Raises:

Type Description
IOError

If I2C communication fails.

RuntimeError

If the bus has not been initialized.

Source code in packages/src/gas2mqtt/adapters/qmc5883l.py
def read(self) -> MagneticReading:
    """Read magnetic field (X, Y, Z) and temperature from the QMC5883L.

    Reads 9 bytes from register 0x00: X, Y, Z axes (little-endian)
    plus status byte and temperature (little-endian).

    Returns:
        MagneticReading with bx, by, bz, and temperature_raw.

    Raises:
        IOError: If I2C communication fails.
        RuntimeError: If the bus has not been initialized.
    """
    if self._bus is None:
        msg = "QMC5883L not initialized — call initialize() first"
        raise RuntimeError(msg)

    # Read 9 bytes starting at register 0x00
    data = self._bus.read_i2c_block_data(self._address, 0x00, 9)

    bx = _to_signed_16_le(data[0], data[1])
    by = _to_signed_16_le(data[2], data[3])
    bz = _to_signed_16_le(data[4], data[5])
    # Byte 6 is status — skip
    temperature_raw = _to_signed_16_le(data[7], data[8])

    return MagneticReading(bx=bx, by=by, bz=bz, temperature_raw=temperature_raw)

close

close() -> None

Close the I2C bus connection.

Source code in packages/src/gas2mqtt/adapters/qmc5883l.py
def close(self) -> None:
    """Close the I2C bus connection."""
    if self._bus is not None:
        self._bus.close()
        self._bus = None
        logger.info("QMC5883L I2C bus closed")

__aenter__ async

__aenter__() -> Self

Enter async context: initialize the sensor.

Enables cosalette 0.1.5 adapter lifecycle management.

Source code in packages/src/gas2mqtt/adapters/qmc5883l.py
async def __aenter__(self) -> Self:
    """Enter async context: initialize the sensor.

    Enables cosalette 0.1.5 adapter lifecycle management.
    """
    self.initialize()
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit async context: close the I2C bus.

Source code in packages/src/gas2mqtt/adapters/qmc5883l.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit async context: close the I2C bus."""
    self.close()

Fake (Test/Dry-Run)

gas2mqtt.adapters.fake

Fake magnetometer adapter for testing and dry-run mode.

Provides deterministic readings for unit tests and --dry-run operation. The bz value can be controlled to simulate trigger events.

FakeMagnetometer

FakeMagnetometer()

Test double for MagnetometerPort.

Returns configurable, deterministic readings. Useful for: - Unit tests (set specific bz values to test Schmitt trigger) - Dry-run mode (returns safe default values)

Attributes:

Name Type Description
bx int

Configurable X-axis value (default 0).

by int

Configurable Y-axis value (default 0).

bz int

Configurable Z-axis value (default 0).

temperature_raw int

Configurable raw temperature value (default 0).

initialized bool

Whether initialize() has been called.

closed bool

Whether close() has been called.

Source code in packages/src/gas2mqtt/adapters/fake.py
def __init__(self) -> None:
    self.bx: int = 0
    self.by: int = 0
    self.bz: int = 0
    self.temperature_raw: int = 0
    self.initialized: bool = False
    self.closed: bool = False

initialize

initialize() -> None

Mark the fake sensor as initialized.

Source code in packages/src/gas2mqtt/adapters/fake.py
def initialize(self) -> None:
    """Mark the fake sensor as initialized."""
    self.initialized = True

read

read() -> MagneticReading

Return a MagneticReading with the current configured values.

Source code in packages/src/gas2mqtt/adapters/fake.py
def read(self) -> MagneticReading:
    """Return a MagneticReading with the current configured values."""
    return MagneticReading(
        bx=self.bx,
        by=self.by,
        bz=self.bz,
        temperature_raw=self.temperature_raw,
    )

close

close() -> None

Mark the fake sensor as closed.

Source code in packages/src/gas2mqtt/adapters/fake.py
def close(self) -> None:
    """Mark the fake sensor as closed."""
    self.closed = True

__aenter__ async

__aenter__() -> Self

Enter async context: initialize the fake sensor.

Source code in packages/src/gas2mqtt/adapters/fake.py
async def __aenter__(self) -> Self:
    """Enter async context: initialize the fake sensor."""
    self.initialize()
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit async context: close the fake sensor.

Source code in packages/src/gas2mqtt/adapters/fake.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit async context: close the fake sensor."""
    self.close()