RuuviTag Sensor Python Package with Bleak

I found some issues in ruuvitag-sensor · PyPI and I am trying to rewrite it.

At the initial testing it gives the same results as the app in my phone.

I am totally ignorant in bluetooth and I do not know it this is very stupid in principle.

Any comments?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jun  7 15:54:58 2022

@author: heikki

This is an early experiment on how to read https://ruuvi.com/ruuvitag/
sensor data.

I first tried https://pypi.org/project/ruuvitag-sensor/,
but it seemed to require

    bluez-deprecated - Bluez tools that upstream considers obsolete

    This package contains tools from the bluez package that are only built if
    the "--enable-deprecated" switch is used.
    These are considered obsolete by the upstream developers and might
    contain serious issues, even security bugs. Use at your own risk.
    Note that this package will go away before end of 2020,
    change your code to use the modern tools instead.

It asked for superuser password, which I did not like. And it did not work.

So, totally ignorant on bluetooth protocols, I began to study,
how bluetooth works — mainly by trial-and-error.
So, the result may be stupid and anyhow far from production ready —
Just tested a couple of times ;-)


After heavy googling I found 'bleak' and simple examples how it works.

"""

import asyncio
import struct
import math
from bleak import BleakScanner  # , BleakClient


# The following functions I copied from a file decoder.py somewhere in
# ruuvi web pages. Could not find again.

def _get_temperature(data):
    """Return temperature in celsius"""
    if data[1] == -32768:
        return None

    return round(data[1] / 200, 2)


def _get_humidity(data):
    """Return humidity %"""
    if data[2] == 65535:
        return None

    return round(data[2] / 400, 2)


def _get_pressure(data):
    """Return air pressure hPa"""
    if data[3] == 0xFFFF:
        return None

    return round((data[3] + 50000) / 100, 2)


def _get_acceleration(data):
    """Return acceleration mG"""
    if (data[4] == -32768 or data[5] == -32768 or data[6] == -32768):
        return (None, None, None)

    return data[4:7]


def _get_powerinfo(data):
    """Return battery voltage and tx power"""
    battery_voltage = data[7] >> 5
    tx_power = data[7] & 0x001F

    return (battery_voltage, tx_power)


def _get_battery(data):
    """Return battery mV"""
    battery_voltage = _get_powerinfo(data)[0]
    if battery_voltage == 0b11111111111:
        return None

    return battery_voltage + 1600


def _get_txpower(data):
    """Return transmit power"""
    tx_power = _get_powerinfo(data)[1]
    if tx_power == 0b11111:
        return None

    return -40 + (tx_power * 2)


def _get_movementcounter(data):
    return data[8]


def _get_measurementsequencenumber(data):
    return data[9]


def _get_mac(data):
    return ''.join('{:02x}'.format(x) for x in data[10:])


def encode(sensor_data):
    print('\nencoding:')
    measurements = struct.unpack('>BhHHhhhHBH6B', sensor_data)
    # in the original ruuvi 'decoder.py':
    # struct.unpack('>BhHHhhhHBH6B', bytearray.fromhex(data[:48]))
    acc_x, acc_y, acc_z = _get_acceleration(measurements)
    return {
        'data_format': 5,
        'humidity': _get_humidity(measurements),
        'temperature': _get_temperature(measurements),
        'pressure': _get_pressure(measurements),
        'acceleration': math.sqrt(
            acc_x * acc_x + acc_y * acc_y + acc_z * acc_z),
        'acceleration_x': acc_x,
        'acceleration_y': acc_y,
        'acceleration_z': acc_z,
        'tx_power': _get_txpower(measurements),
        'battery': _get_battery(measurements),
        'movement_counter': _get_movementcounter(measurements),
        'measurement_sequence_number': _get_measurementsequencenumber(
            measurements),
        'mac': _get_mac(measurements)
    }


async def main():
    """
    From here begins my code

    I need the sensor data maybe every 10 minutes,
    so performance will be no issue. I just make this into a function
    to be called every 10 minutes. Not forgetting 'try:' 'except:'
    error handling

    It turned out that bleak discover catches the information I need.

    """
    devices = await BleakScanner.discover()
    for d in devices:
        # print(d.details)  # to see what there
        props = d.details['props']
        if 'Name' in props.keys():
            # there will of course be a list of sensor names
            if props['Name'] == 'Ruuvi B599':
                # myDevice = d
                # print(props)
                sensor_data = props['ManufacturerData'][1177]
                print(sensor_data)
                res = encode(sensor_data)
                print(res)


asyncio.run(main())


# Also 'BleakClient' is worth studying. I think I need it if I want to control
# bluetooth thermostats. Might be difficult :-[

# address = str(myDevice.details['props']['Address'])
# async with BleakClient(address) as client:
#     services = await client.get_services()
#     print("Services:")
#     for service in services:
#         print(service)
    # for handle in services.characteristics:
    #     print(handle, services.get_characteristic(handle))

Bleak connection quick test
bleak connection

Hi! Thanks for bringing Bleak to my attention. I wasn’t aware that there is well functioning BLE package for Python. I added support for it to ruuvitag-sensor · PyPI

It is not thoroughly tested and Bleak package must be manually added. Guide from README

Implementation uses BleakScanner with a callback.