2021-10-25 20:04:55 +01:00

215 lines
8.3 KiB
Python

import logging
import time
from pygatt import BLEDevice, exceptions
from . import constants
from .bgapi import BGAPIError
from .error_codes import ErrorCode
from .packets import BGAPICommandPacketBuilder as CommandBuilder
from .bglib import EventPacketType, ResponsePacketType
log = logging.getLogger(__name__)
def connection_required(func):
"""Raise an exception if the device is not connected before calling the
actual function.
"""
def wrapper(self, *args, **kwargs):
if self._handle is None:
raise exceptions.NotConnectedError()
return func(self, *args, **kwargs)
return wrapper
class BGAPIBLEDevice(BLEDevice):
def __init__(self, address, handle, backend):
super(BGAPIBLEDevice, self).__init__(address)
self._handle = handle
self._backend = backend
@connection_required
def bond(self, permanent=False):
"""
Create a bond and encrypted connection with the device.
"""
# Set to bondable mode so bonds are store permanently
if permanent:
self._backend.set_bondable(True)
log.debug("Bonding to %s", self._address)
self._backend.send_command(
CommandBuilder.sm_encrypt_start(
self._handle, constants.bonding['create_bonding']))
self._backend.expect(ResponsePacketType.sm_encrypt_start)
packet_type, response = self._backend.expect_any(
[EventPacketType.connection_status,
EventPacketType.sm_bonding_fail])
if packet_type == EventPacketType.sm_bonding_fail:
raise BGAPIError("Bonding failed")
log.info("Bonded to %s", self._address)
@connection_required
def get_rssi(self):
"""
Get the receiver signal strength indicator (RSSI) value from the device.
Returns the RSSI as in integer in dBm.
"""
# The BGAPI has some strange behavior where it will return 25 for
# the RSSI value sometimes... Try a maximum of 3 times.
for i in range(0, 3):
self._backend.send_command(
CommandBuilder.connection_get_rssi(self._handle))
_, response = self._backend.expect(
ResponsePacketType.connection_get_rssi)
rssi = response['rssi']
if rssi != 25:
return rssi
time.sleep(0.1)
raise BGAPIError("get rssi failed")
@connection_required
def char_read(self, uuid, timeout=None):
return self.char_read_handle(self.get_handle(uuid), timeout=timeout)
@connection_required
def char_read_handle(self, handle, timeout=None):
log.info("Reading characteristic at handle %d", handle)
self._backend.send_command(
CommandBuilder.attclient_read_by_handle(
self._handle, handle))
self._backend.expect(ResponsePacketType.attclient_read_by_handle)
success = False
while not success:
matched_packet_type, response = self._backend.expect_any(
[EventPacketType.attclient_attribute_value,
EventPacketType.attclient_procedure_completed],
timeout=timeout)
# TODO why not just expect *only* the attribute value response,
# then it would time out and raise an exception if allwe got was
# the 'procedure completed' response?
if matched_packet_type != EventPacketType.attclient_attribute_value:
raise BGAPIError("Unable to read characteristic")
if response['atthandle'] == handle:
# Otherwise we received a response from a wrong handle (e.g.
# from a notification) so we keep trying to wait for the
# correct one
success = True
return bytearray(response['value'])
@connection_required
def char_read_long(self, uuid, timeout=None):
return self.char_read_long_handle(self.get_handle(uuid),
timeout=timeout)
@connection_required
def char_read_long_handle(self, handle, timeout=None):
log.info("Reading long characteristic at handle %d", handle)
self._backend.send_command(
CommandBuilder.attclient_read_long(
self._handle, handle))
self._backend.expect(ResponsePacketType.attclient_read_long)
success = False
response = b""
while not success:
matched_packet_type, chunk = self._backend.expect_any(
[EventPacketType.attclient_attribute_value,
EventPacketType.attclient_procedure_completed],
timeout=timeout)
if (matched_packet_type ==
EventPacketType.attclient_attribute_value):
if chunk['atthandle'] == handle:
# Concatenate the data
response += chunk["value"]
elif (matched_packet_type ==
EventPacketType.attclient_procedure_completed):
if chunk['chrhandle'] == handle:
success = True
return bytearray(response)
@connection_required
def char_write_handle(self, char_handle, value, wait_for_response=True):
while True:
value_list = [b for b in value]
# An "attribute write" is always acknowledged by the remote host.
if wait_for_response:
self._backend.send_command(
CommandBuilder.attclient_attribute_write(
self._handle, char_handle, value_list))
self._backend.expect(
ResponsePacketType.attclient_attribute_write)
packet_type, response = self._backend.expect(
EventPacketType.attclient_procedure_completed,
# According to the BLE spec, the device has 30 seconds to
# repsonse to the attribute write.
timeout=30)
# A "command" write is unacknowledged - don't wait for a response.
else:
self._backend.send_command(
CommandBuilder.attclient_write_command(
self._handle, char_handle, value_list))
packet_type, response = self._backend.expect(
ResponsePacketType.attclient_write_command)
if (response['result'] !=
ErrorCode.insufficient_authentication.value):
# Continue to retry until we are bonded
break
# ASC - adapted from
# https://raw.githubusercontent.com/mjbrown/bgapi/master/bgapi/module.py
# - reliable_write_by_handle
@connection_required
def char_write_long_handle(self,
char_handle,
value,
wait_for_response=False):
maxv = 18
for i in range(int(((len(value)-1) / maxv)+1)):
chunk = value[maxv*i:min(maxv*(i+1), len(value))]
value_list = [b for b in chunk]
self._backend.send_command(
CommandBuilder.attclient_prepare_write(
self._handle, char_handle, maxv*i, value_list))
packet_type, response = self._backend.expect(
ResponsePacketType.attclient_prepare_write)
packet_type, response = self._backend.expect(
EventPacketType.attclient_procedure_completed)
time.sleep(0.1)
time.sleep(0.1)
self._backend.send_command(
CommandBuilder.attclient_execute_write(
self._handle, 1)) # 1 = commit, 0 = cancel
self._backend.expect(ResponsePacketType.attclient_execute_write)
packet_type, response = self._backend.expect(
EventPacketType.attclient_procedure_completed)
time.sleep(0.1)
@connection_required
def disconnect(self):
log.debug("Disconnecting from %s", self._address)
self._backend.send_command(
CommandBuilder.connection_disconnect(self._handle))
self._backend.expect(ResponsePacketType.connection_disconnect)
log.info("Disconnected from %s", self._address)
self._handle = None
@connection_required
def discover_characteristics(self):
self._characteristics = self._backend.discover_characteristics(
self._handle)
return self._characteristics