BLE Sensoren mit Micropython (Daum Ergometer als Sensor)

Mein Vermieter Hans fragte mich neulich, ob man irgendwie die Daten seines Ergometers (ein altes Daum ergo_bike) in seinen neuen Fahrradcomputer (einen Garmin Edge 820) bekommen kann,
da er in seinem Fahrradcomputer gerne seine gesamten Fitnessdaten für Rennrad und Ergometer hätte.

Bei Daum bekommt man für das ergo_bike eine schöne Schnittstellenbeschreibung:

Man kann erkennen, dass die sog. Run_Daten alles enthalten was man so braucht – Leistung, Trittfrequenz, Geschwindigkeit und Puls

Auf der Rückseite des Cockpits findet man den zugehörigen Stecker (oben):

Der Stecker sieht zwar wie RS-232 aus, hat aber etwas andere Pegel (Daum verwendet statt eines MAX232 zwei Optokoppler für RX und TX) – deswegen ein Pull-up an Pin 2 des MAX232.

GND und VCC hab ich der Übersichtlichkeit halber nicht eingezeichnet (und natürlich weil ich faul bin 🙂 )

Das war schon die Hardware – du wirst dich wundern warum 2 ESP32. Leider hat der Fahrradcomputer mehrere Sensoren pro Gerät nicht verstanden und es ging nicht anders 🙁

Nach etwas Micropython hats dann funktioniert:

Code Power-Sensor (für Leistung, Geschwindigkeit und Trittfrequenz/Kadenz)

(oberer ESP32)

Tut mir leid, ist etwas schnell und schlampig geschrieben aber erfüllt seinen Zweck ganz gut

Du brauchst Micropython und die Datei https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_advertising.py aus den Micropython-Beispielen auf deinem Device. Natürlich hab ich mich auch großzügig an den Beispielen dort bedient – ist doch klar 🙂

Das Protokoll zwischen Fahrradcomputern und Sensoren ist am besten beschrieben unter https://www.bluetooth.com/specifications/specs/gatt-specification-supplement-5/
Dort sind natürlich auch viele, viele andere Typen von Sensoren beschrieben (z. B. Temperatur, Luftfeuchte …) – man findet für fast alles etwas. Vielleicht kannst du hier etwas für dein Sensorprojekt brauchen – es ist sehr leicht, BLE-Sensoren zu implementieren (besonders mit Micropython, finde ich 🙂).

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const

from machine import UART

# Serial link to the ergometer cockpit on UART 2: 9600 baud, 8 data bits,
# no parity, 1 stop bit.
uart = UART(2, 9600)
uart.init(9600, bits=8, parity=None, stop=1)

# Event codes passed to the BLE IRQ handler.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)

# Characteristic property flags used when registering GATT services.
_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)

# 0x1818 is the Bluetooth SIG "Cycling Power" service.
_UUID_POWER = bluetooth.UUID(0x1818)

# 0x2A63: Cycling Power Measurement (the value that gets notified).
_POWER_CHAR = (
    bluetooth.UUID(0x2A63),
    _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)
# 0x2A65: Cycling Power Feature (bit field of supported optional data).
_POWER_FEAT_CHAR = (
    bluetooth.UUID(0x2A65),
    _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)
# 0x2A5D: Sensor Location.
_POWER_LOC_CHAR = (
    bluetooth.UUID(0x2A5D),
    _FLAG_READ | _FLAG_NOTIFY,
)
# Service definition: the order of the characteristics here fixes the order
# of the value handles returned by gatts_register_services().
_SERVICE_POWER = (
    _UUID_POWER,
    (_POWER_CHAR,_POWER_FEAT_CHAR,_POWER_LOC_CHAR,),
)

# Tuple of all services registered by this device.
SERVICES = (_SERVICE_POWER,)

class BLEErgo:
    """BLE peripheral that presents the ergometer as a Cycling Power sensor.

    The object registers the Cycling Power service, starts advertising and
    keeps the cumulative wheel/crank counters that the Cycling Power
    Measurement characteristic reports.

    Args:
        ble: A ``bluetooth.BLE`` instance (activated by the constructor).
        name: Device name placed in the advertising payload.
    """

    # Field widths of the measurement packet; counters roll over at these
    # limits, which is the roll-over the receiving side expects for
    # fixed-width fields.
    _MASK16 = 0xFFFF
    _MASK32 = 0xFFFFFFFF

    def __init__(self, ble, name="HansErgoPower"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handlePower, self._handlePowerFeat, self._handlePowerLoc),) = (
            self._ble.gatts_register_services(SERVICES)
        )
        self._connections = set()
        self._payload = advertising_payload(name=name, services=[_UUID_POWER])

        # Cumulative counters reported in every measurement packet.
        self.lastCrankRev = 0
        self.lastWheelRev = 0
        self.lastCrankEvent = 0
        self.lastWheelEvent = 0

        self._advertise()

        # Feature bit field 0x0C: wheel and crank revolution data supported
        # (i.e. speed and cadence are available).
        self._ble.gatts_write(self._handlePowerFeat, bytes((0x0C, 0x00, 0x00, 0x00)))

        # Sensor location 6 = right crank.
        self._ble.gatts_write(self._handlePowerLoc, bytes((6,)))

    def _irq(self, event, data):
        """Track connected centrals; restart advertising after a disconnect.

        Indication acknowledgements (``_IRQ_GATTS_INDICATE_DONE``) need no
        action and are ignored.
        """
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard(): a disconnect for a handle we never recorded must not
            # raise inside the IRQ handler.
            self._connections.discard(conn_handle)
            self._advertise()

    def setPower(self, pwr, rpm, speed, notify=False, indicate=False):
        """Publish one Cycling Power Measurement.

        Every call advances the wheel and crank counters by one revolution
        and moves the matching "last event" timestamps forward by the
        duration of one revolution at the given rate, so the receiver can
        derive cadence and speed from the deltas.

        Args:
            pwr: Instantaneous power; encoded as a signed 16-bit value and
                clamped to that range.
            rpm: Cadence in revolutions per minute. Values <= 0 leave the
                crank counters untouched (no new crank revolution).
            speed: Speed value used as ``15500 / speed`` wheel-event ticks
                per revolution (presumably km/h with a ~2.1 m wheel and
                1/2048 s ticks -- confirm). Values <= 0 leave the wheel
                counters untouched.
            notify: Send a notification to every connected central.
            indicate: Send an indication to every connected central.
        """
        if speed > 0:
            wheel_ticks = int(15500 / speed)
            self.lastWheelRev = (int(self.lastWheelRev) + 1) & self._MASK32
            self.lastWheelEvent = (int(self.lastWheelEvent) + wheel_ticks) & self._MASK16
        else:
            wheel_ticks = 0

        if rpm > 0:
            # Crank event time is counted in 1/1024 s ticks.
            crank_ticks = int(60 / rpm * 1024)
            self.lastCrankRev = (int(self.lastCrankRev) + 1) & self._MASK16
            self.lastCrankEvent = (int(self.lastCrankEvent) + crank_ticks) & self._MASK16

        print(wheel_ticks)

        # Two's-complement encoding of the signed 16-bit power value.
        power_raw = max(-32768, min(32767, int(pwr))) & self._MASK16

        data = bytearray((0x30, 0x00))  # flags: wheel and crank rev data present
        data += power_raw.to_bytes(2, 'little')
        data += self.lastWheelRev.to_bytes(4, 'little')
        data += self.lastWheelEvent.to_bytes(2, 'little')
        data += self.lastCrankRev.to_bytes(2, 'little')
        data += self.lastCrankEvent.to_bytes(2, 'little')
        # Five trailing zero bytes kept from the original packet layout; they
        # are not announced by the flags -- TODO confirm whether any receiver
        # depends on them.
        data += bytes(5)

        self._ble.gatts_write(self._handlePower, data)
        if notify or indicate:
            for conn_handle in self._connections:
                if notify:
                    self._ble.gatts_notify(conn_handle, self._handlePower)
                if indicate:
                    self._ble.gatts_indicate(conn_handle, self._handlePower)

    def _advertise(self, interval_us=100000):
        """Start advertising the stored payload at the given interval."""
        self._ble.gap_advertise(interval_us, adv_data=self._payload)



def app():
    """Poll the ergometer once per second and publish the readings via BLE.

    Each cycle writes a two-byte request to the UART, waits briefly, collects
    whatever the ergometer answered and, if at least 15 bytes arrived, takes
    power (byte 5, multiplied by 5), cadence (byte 6) and speed (byte 7) from
    the reply. Shorter replies make the loop publish the fixed placeholder
    values 42/42/42 instead. This function never returns.
    """
    ble = bluetooth.BLE()
    ergo = BLEErgo(ble)

    # Request frame 0x40 0x00 -- presumably the "Run_Daten" query from Daum's
    # interface description; confirm against that document.
    request = b"\x40\x00"

    while True:
        uart.write(request)
        time.sleep_ms(10)

        dat = bytearray()
        while uart.any():
            dat += uart.read()

        if len(dat) > 14:
            # Zero cadence/speed is replaced by 1 because setPower() divides
            # by both values.
            speed = dat[7] or 1
            cadence = dat[6] or 1
            power = dat[5] * 5

            print("Leistung: " + str(power))
            print("Geschwindigkeit: " + str(speed))
            print("Kadenz: " + str(cadence))

            ergo.setPower(power, cadence, speed, notify=True, indicate=False)
        else:
            # NOTE(review): placeholder data is broadcast whenever no complete
            # reply was received -- looks like a testing aid; confirm it is
            # wanted in normal operation.
            ergo.setPower(42, 42, 42, notify=True, indicate=False)

        time.sleep_ms(1000)


if __name__ == "__main__":
    app()

Code Speed-Sensor:

(unterer ESP32)

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const

from machine import UART

# Serial link to the ergometer cockpit on UART 2: 9600 baud, 8 data bits,
# no parity, 1 stop bit.
uart = UART(2, 9600)
uart.init(9600, bits=8, parity=None, stop=1)

# Event codes passed to the BLE IRQ handler.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)

# Characteristic property flags used when registering GATT services.
_FLAG_WRITE = const(0x0001)
_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)

# 0x1816 is the Bluetooth SIG "Cycling Speed and Cadence" service.
_UUID_SPEED = bluetooth.UUID(0x1816)

# 0x2A5B: CSC Measurement (the value that gets notified).
_SPEED_CHAR = (
    bluetooth.UUID(0x2A5B),
    _FLAG_NOTIFY,
)
# 0x2A5C: CSC Feature (bit field of supported data).
_SPEED_FEAT_CHAR = (
    bluetooth.UUID(0x2A5C),
    _FLAG_READ,
)
# 0x2A5D: Sensor Location.
_SPEED_LOC_CHAR = (
    bluetooth.UUID(0x2A5D),
    _FLAG_READ,
)
# 0x2A55: SC Control Point. (The original listing defined this tuple twice
# with identical content; one definition is sufficient.)
_SPEED_CONTROL_CHAR = (
    bluetooth.UUID(0x2A55),
    _FLAG_WRITE | _FLAG_INDICATE,
)

# Service definition: the order of the characteristics here fixes the order
# of the value handles returned by gatts_register_services().
_SERVICE_SPEED = (
    _UUID_SPEED,
    (_SPEED_CHAR, _SPEED_FEAT_CHAR, _SPEED_LOC_CHAR, _SPEED_CONTROL_CHAR),
)

# Tuple of all services registered by this device.
SERVICES = (_SERVICE_SPEED,)

class BLEErgo:
    """BLE peripheral that presents the ergometer as a cycling speed sensor.

    The object registers the Cycling Speed and Cadence service, starts
    advertising and keeps the cumulative wheel counters reported by the
    CSC Measurement characteristic.

    Args:
        ble: A ``bluetooth.BLE`` instance (activated by the constructor).
        name: Device name placed in the advertising payload.
    """

    # Field widths of the measurement packet; counters roll over at these
    # limits, which is the roll-over the receiving side expects for
    # fixed-width fields.
    _MASK16 = 0xFFFF
    _MASK32 = 0xFFFFFFFF

    def __init__(self, ble, name="HansErgoSpeed"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handleSpeed, self._handleSpeedFeat, self._handleSpeedLoc,
          self._handleSpeedControl),) = self._ble.gatts_register_services(SERVICES)
        self._connections = set()
        self._payload = advertising_payload(name=name, services=[_UUID_SPEED])

        # Cumulative counters reported in every measurement packet.
        self.lastWheelEvent = 0
        self.lastWheelRev = 0

        self._advertise()

        # Feature bit field 0x0001: wheel revolution data (speed) supported.
        self._ble.gatts_write(self._handleSpeedFeat, bytes((0x01, 0x00)))

        # Sensor location 12 = rear wheel.
        self._ble.gatts_write(self._handleSpeedLoc, bytes((12,)))

        # Initial control point value, taken over unchanged from the original
        # listing -- TODO confirm its meaning for the receiver.
        self._ble.gatts_write(self._handleSpeedControl, bytes((0x02, 0x00)))

    def _irq(self, event, data):
        """Track connected centrals; restart advertising after a disconnect.

        Indication acknowledgements (``_IRQ_GATTS_INDICATE_DONE``) need no
        action and are ignored.
        """
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard(): a disconnect for a handle we never recorded must not
            # raise inside the IRQ handler.
            self._connections.discard(conn_handle)
            self._advertise()

    def setSpeed(self, speed, notify=False, indicate=False):
        """Publish one CSC Measurement carrying wheel revolution data.

        Every call advances the wheel counter by one revolution and moves the
        "last wheel event" timestamp forward by the duration of one
        revolution at the given speed, so the receiver can derive the speed
        from the deltas.

        Args:
            speed: Speed value used as ``15500 / speed / 2`` event ticks per
                revolution (presumably km/h with a ~2.1 m wheel and 1/1024 s
                ticks -- confirm). Values <= 0 leave the counters untouched.
            notify: Send a notification to every connected central.
            indicate: Send an indication to every connected central.
        """
        if speed > 0:
            self.lastWheelRev = (int(self.lastWheelRev) + 1) & self._MASK32
            self.lastWheelEvent = (
                int(self.lastWheelEvent) + int(15500 / speed / 2)
            ) & self._MASK16

        data = bytearray((0x01,))  # flags: wheel revolution data present
        data += self.lastWheelRev.to_bytes(4, 'little')
        data += self.lastWheelEvent.to_bytes(2, 'little')

        self._ble.gatts_write(self._handleSpeed, data)
        if notify or indicate:
            for conn_handle in self._connections:
                if notify:
                    self._ble.gatts_notify(conn_handle, self._handleSpeed)
                if indicate:
                    self._ble.gatts_indicate(conn_handle, self._handleSpeed)

    def _advertise(self, interval_us=100000):
        """Start advertising the stored payload at the given interval."""
        self._ble.gap_advertise(interval_us, adv_data=self._payload)



def app():
    """Listen on the UART and publish the ergometer speed via BLE.

    This loop only reads from the UART; it never sends a request itself.
    Received bytes are accumulated until more than 14 are buffered, then the
    speed is taken from byte 7, the buffer is cleared and the value is
    published. This function never returns.
    """
    ble = bluetooth.BLE()
    ergo = BLEErgo(ble)
    dat = bytearray()

    while True:
        while uart.any():
            dat += uart.read()

        if len(dat) > 14:
            # Zero speed is replaced by 1 because setSpeed() divides by it.
            speed = dat[7] or 1
            dat = bytearray()

            print("Geschwindigkeit: " + str(speed))

            ergo.setSpeed(speed, notify=True, indicate=False)
            time.sleep_ms(100)
        else:
            # Nothing complete yet: keep the partial buffer and try again
            # after a longer pause.
            time.sleep_ms(1000)


if __name__ == "__main__":
    app()