I found some issues in ruuvitag-sensor (https://pypi.org/project/ruuvitag-sensor/) and I am trying to rewrite it.
In initial testing it gives the same results as the app on my phone.
I know very little about Bluetooth, so I cannot tell whether this approach is wrong in principle.
Any comments?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jun 7 15:54:58 2022
@author: heikki

This is an early experiment in reading https://ruuvi.com/ruuvitag/
sensor data.

I first tried https://pypi.org/project/ruuvitag-sensor/,
but it seemed to require the bluez-deprecated package, which is described as:

    bluez-deprecated - Bluez tools that upstream considers obsolete
    This package contains tools from the bluez package that are only built if
    the "--enable-deprecated" switch is used.
    These are considered obsolete by the upstream developers and might
    contain serious issues, even security bugs. Use at your own risk.
    Note that this package will go away before end of 2020,
    change your code to use the modern tools instead.

It also asked for the superuser password, which I did not like. And it did not work.
So, knowing nothing about Bluetooth protocols, I began to study
how Bluetooth works, mainly by trial and error.
The result may therefore be naive, and it is far from production ready:
I have only tested it a couple of times ;-)
After heavy googling I found 'bleak' and simple examples of how it works.
"""
import asyncio
import math
import struct

from bleak import BleakScanner  # , BleakClient


# I copied the following functions from a file decoder.py somewhere on the
# Ruuvi web pages. I could not find it again.
# 'data' is the tuple produced by struct.unpack() in encode() below.
def _get_temperature(data):
    """Return temperature in celsius"""
    if data[1] == -32768:
        return None
    return round(data[1] / 200, 2)


def _get_humidity(data):
    """Return humidity %"""
    if data[2] == 65535:
        return None
    return round(data[2] / 400, 2)


def _get_pressure(data):
    """Return air pressure hPa"""
    if data[3] == 0xFFFF:
        return None
    return round((data[3] + 50000) / 100, 2)


def _get_acceleration(data):
    """Return acceleration mG"""
    if (data[4] == -32768 or data[5] == -32768 or data[6] == -32768):
        return (None, None, None)
    return data[4:7]


def _get_powerinfo(data):
    """Return battery voltage and tx power"""
    battery_voltage = data[7] >> 5
    tx_power = data[7] & 0x001F
    return (battery_voltage, tx_power)


def _get_battery(data):
    """Return battery mV"""
    battery_voltage = _get_powerinfo(data)[0]
    if battery_voltage == 0b11111111111:
        return None
    return battery_voltage + 1600


def _get_txpower(data):
    """Return transmit power"""
    tx_power = _get_powerinfo(data)[1]
    if tx_power == 0b11111:
        return None
    return -40 + (tx_power * 2)


def _get_movementcounter(data):
    return data[8]


def _get_measurementsequencenumber(data):
    return data[9]


def _get_mac(data):
    return ''.join('{:02x}'.format(x) for x in data[10:])


def encode(sensor_data):
    """Decode a 24-byte payload into a dict (despite the name, this decodes)."""
    print('\ndecoding:')
    measurements = struct.unpack('>BhHHhhhHBH6B', sensor_data)
    # in the original ruuvi 'decoder.py':
    # struct.unpack('>BhHHhhhHBH6B', bytearray.fromhex(data[:48]))
    acc_x, acc_y, acc_z = _get_acceleration(measurements)
    # _get_acceleration() returns (None, None, None) for invalid readings,
    # so only compute the magnitude when the axes are present.
    if acc_x is None:
        acceleration = None
    else:
        acceleration = math.sqrt(
            acc_x * acc_x + acc_y * acc_y + acc_z * acc_z)
    return {
        'data_format': 5,
        'humidity': _get_humidity(measurements),
        'temperature': _get_temperature(measurements),
        'pressure': _get_pressure(measurements),
        'acceleration': acceleration,
        'acceleration_x': acc_x,
        'acceleration_y': acc_y,
        'acceleration_z': acc_z,
        'tx_power': _get_txpower(measurements),
        'battery': _get_battery(measurements),
        'movement_counter': _get_movementcounter(measurements),
        'measurement_sequence_number': _get_measurementsequencenumber(
            measurements),
        'mac': _get_mac(measurements)
    }


async def main():
    """
    My own code begins here.

    I need the sensor data maybe every 10 minutes,
    so performance will not be an issue. I will just turn this into a function
    that is called every 10 minutes, not forgetting 'try:' / 'except:'
    error handling.
    It turned out that bleak's discover() catches the information I need.
    """
    devices = await BleakScanner.discover()
    for d in devices:
        # print(d.details)  # to see what is there
        # Note: the layout of d.details depends on the bleak backend;
        # the 'props' dict used here comes from the BlueZ backend on Linux.
        props = d.details['props']
        if 'Name' in props:
            # there will of course be a list of sensor names
            if props['Name'] == 'Ruuvi B599':
                # myDevice = d
                # print(props)
                # 1177 (0x0499) is the manufacturer ID key of the Ruuvi payload
                sensor_data = props['ManufacturerData'][1177]
                print(sensor_data)
                res = encode(sensor_data)
                print(res)


asyncio.run(main())

# Also 'BleakClient' is worth studying. I think I need it if I want to control
# Bluetooth thermostats. Might be difficult :-[
# address = str(myDevice.details['props']['Address'])
# async with BleakClient(address) as client:
#     services = await client.get_services()
#     print("Services:")
#     for service in services:
#         print(service)
#     for handle in services.characteristics:
#         print(handle, services.get_characteristic(handle))
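
One way to check the decoding without a tag in range is to feed it a fixed payload. The sketch below is only an illustration, not the library's code: it assumes the 24-byte data format 5 layout implied by the struct format string in the script, `decode_format5` is a made-up helper name, and the sample bytes are the "valid data" test vector as I remember it from Ruuvi's format documentation, so please verify them against the official page before relying on them.

```python
import math
import struct

# Assumed big-endian layout of a 24-byte data format 5 payload:
# format, temperature, humidity, pressure, acc x/y/z, power info,
# movement counter, sequence number, 6 MAC bytes.
FORMAT5 = struct.Struct('>BhHHhhhHBH6B')

# Sample payload (assumption: matches Ruuvi's published "valid data" vector).
SAMPLE = bytes.fromhex('0512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F')


def decode_format5(payload):
    """Hypothetical helper: decode one format 5 payload into a dict."""
    if len(payload) != FORMAT5.size or payload[0] != 5:
        raise ValueError('not a 24-byte data format 5 payload')
    _, temp, hum, pres, ax, ay, az, power, moves, seq, *mac = \
        FORMAT5.unpack(payload)
    # Power info packs 11 bits of battery voltage and 5 bits of tx power.
    battery_raw, tx_raw = power >> 5, power & 0x1F
    acc_ok = -32768 not in (ax, ay, az)
    return {
        'temperature': None if temp == -32768 else round(temp / 200, 2),
        'humidity': None if hum == 0xFFFF else round(hum / 400, 2),
        'pressure': None if pres == 0xFFFF else round((pres + 50000) / 100, 2),
        'acceleration_xyz': (ax, ay, az) if acc_ok else (None, None, None),
        'acceleration': math.sqrt(ax * ax + ay * ay + az * az) if acc_ok else None,
        'battery': None if battery_raw == 0x7FF else battery_raw + 1600,
        'tx_power': None if tx_raw == 0x1F else -40 + tx_raw * 2,
        'movement_counter': moves,
        'measurement_sequence_number': seq,
        'mac': ''.join('{:02x}'.format(b) for b in mac),
    }


if __name__ == '__main__':
    print(decode_format5(SAMPLE))
```

The length and format-byte check is there because `struct.unpack` raises `struct.error` on anything that is not exactly 24 bytes, and tags may also broadcast other data formats.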