325 lines
11 KiB
Python
325 lines
11 KiB
Python
from __future__ import print_function
|
|
|
|
import threading
|
|
import logging
|
|
from collections import defaultdict
|
|
from binascii import hexlify
|
|
from uuid import UUID
|
|
|
|
from . import exceptions
|
|
|
|
try:
|
|
string_type = basestring
|
|
except NameError:
|
|
string_type = str
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class BLEDevice(object):
|
|
"""
|
|
An BLE device connection instance, returned by one of the BLEBackend
|
|
implementations. This class is not meant to be instantiated directly - use
|
|
BLEBackend.connect() to create one.
|
|
"""
|
|
def __init__(self, address):
|
|
"""
|
|
Initialize.
|
|
|
|
address -- the BLE address (aka MAC address) of the device as a string.
|
|
"""
|
|
self._address = address
|
|
self._characteristics = {}
|
|
self._callbacks = defaultdict(set)
|
|
self._subscribed_handlers = {}
|
|
self._subscribed_uuids = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def bond(self, permanent=False):
|
|
"""
|
|
Create a new bond or use an existing bond with the device and make the
|
|
current connection bonded and encrypted.
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def get_rssi(self):
|
|
"""
|
|
Get the receiver signal strength indicator (RSSI) value from the BLE
|
|
device.
|
|
|
|
Returns the RSSI value in dBm on success.
|
|
Returns None on failure.
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def char_read(self, uuid):
|
|
"""
|
|
Reads a Characteristic by UUID.
|
|
|
|
uuid -- UUID of Characteristic to read as a string.
|
|
|
|
Returns a bytearray containing the characteristic value on success.
|
|
|
|
Example:
|
|
my_ble_device.char_read('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b')
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def char_read_handle(self, uuid):
|
|
"""
|
|
Reads a Characteristic by handle.
|
|
|
|
handle -- handle of Characteristic to read.
|
|
|
|
Returns a bytearray containing the characteristic value on success.
|
|
|
|
Example:
|
|
my_ble_device.char_read_handle(5)
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def char_read_long(self, uuid):
|
|
"""
|
|
Reads a Characteristic by UUID.
|
|
|
|
uuid -- UUID of Characteristic to read as a string.
|
|
|
|
Returns a bytearray containing the characteristic value on success.
|
|
|
|
Example:
|
|
my_ble_device.char_read('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b')
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def char_read_long_handle(self, handle):
|
|
"""
|
|
Reads a Characteristic longer than one read by handle.
|
|
|
|
handle -- handle of Characteristic to read.
|
|
|
|
Returns a bytearray containing the characteristic value on success.
|
|
|
|
Example:
|
|
my_ble_device.char_read_long_handle(5)
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def char_write(self, uuid, value, wait_for_response=True):
|
|
"""
|
|
Writes a value to a given characteristic UUID.
|
|
|
|
uuid -- the UUID of the characteristic to write to.
|
|
value -- a bytearray to write to the characteristic.
|
|
wait_for_response -- wait for response after writing. A GATT "command"
|
|
is used when not waiting for a response. The remote host will not
|
|
acknowledge the write.
|
|
|
|
Example:
|
|
my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b',
|
|
bytearray([0x00, 0xFF]))
|
|
"""
|
|
return self.char_write_handle(self.get_handle(uuid), value,
|
|
wait_for_response=wait_for_response)
|
|
|
|
def char_write_handle(self, handle, value, wait_for_response=True):
|
|
"""
|
|
Writes a value to a given characteristic handle. This can be used to
|
|
write to the characteristic config handle for a primary characteristic.
|
|
|
|
hande -- the handle to write to.
|
|
value -- a bytearray to write to the characteristic.
|
|
wait_for_response -- wait for response after writing.
|
|
|
|
Example:
|
|
my_ble_device.char_write_handle(42, bytearray([0x00, 0xFF]))
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def char_write_long(self, uuid, value, wait_for_response=False):
|
|
"""
|
|
Writes a value to a given characteristic UUID.
|
|
|
|
uuid -- the UUID of the characteristic to write to.
|
|
value -- a bytearray to write to the characteristic.
|
|
wait_for_response -- wait for response after writing.
|
|
|
|
Example:
|
|
my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b',
|
|
bytearray([0x00, 0xFF]))
|
|
"""
|
|
return self.char_write_long_handle(self.get_handle(uuid), value,
|
|
wait_for_response=wait_for_response)
|
|
|
|
def char_write_long_handle(self, handle, value, wait_for_response=False):
|
|
"""
|
|
Writes a value to a given characteristic handle. This can be used to
|
|
write to the characteristic config handle for a primary characteristic.
|
|
|
|
hande -- the handle to write to.
|
|
value -- a bytearray to write to the characteristic.
|
|
wait_for_response -- wait for response after writing.
|
|
|
|
Example:
|
|
my_ble_device.char_write(42,
|
|
bytearray([0x00, 0xFF]))
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def disconnect(self):
|
|
"""
|
|
Disconnect from the device. This instance of BLEDevice cannot be used
|
|
after calling this method, you must call BLEBackend.connect() again to
|
|
get a fresh connection.
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def _notification_handles(self, uuid):
|
|
# Expect notifications on the value handle...
|
|
value_handle = self.get_handle(uuid)
|
|
|
|
# but write to the characteristic config to enable notifications
|
|
# TODO with the BGAPI backend we can be smarter and fetch the actual
|
|
# characteristic config handle - we can also do that with gattool if we
|
|
# use the 'desc' command, so we'll need to change the "get_handle" API
|
|
# to be able to get the value or characteristic config handle.
|
|
characteristic_config_handle = value_handle + 1
|
|
|
|
return value_handle, characteristic_config_handle
|
|
|
|
def subscribe(self, uuid, callback=None, indication=False,
|
|
wait_for_response=True):
|
|
|
|
"""
|
|
Enable notifications or indications for a characteristic and register a
|
|
callback function to be called whenever a new value arrives.
|
|
|
|
uuid -- UUID as a string of the characteristic to subscribe.
|
|
callback -- function to be called when a notification/indication is
|
|
received on this characteristic.
|
|
indication -- use indications (where each notificaiton is ACKd). This is
|
|
more reliable, but slower.
|
|
wait_for_response -- wait for response after subscription.
|
|
|
|
"""
|
|
|
|
value_handle, characteristic_config_handle = (
|
|
self._notification_handles(uuid)
|
|
)
|
|
|
|
properties = bytearray([
|
|
0x2 if indication else 0x1,
|
|
0x0
|
|
])
|
|
|
|
with self._lock:
|
|
if callback is not None:
|
|
self._callbacks[value_handle].add(callback)
|
|
|
|
if self._subscribed_handlers.get(value_handle, None) != properties:
|
|
self.char_write_handle(
|
|
characteristic_config_handle,
|
|
properties,
|
|
wait_for_response=wait_for_response
|
|
)
|
|
log.info("Subscribed to uuid=%s", uuid)
|
|
self._subscribed_handlers[value_handle] = properties
|
|
self._subscribed_uuids[uuid] = indication
|
|
else:
|
|
log.debug("Already subscribed to uuid=%s", uuid)
|
|
|
|
def unsubscribe(self, uuid, wait_for_response=True):
|
|
"""
|
|
Disable notification for a charecteristic and de-register the callback.
|
|
"""
|
|
value_handle, characteristic_config_handle = (
|
|
self._notification_handles(uuid)
|
|
)
|
|
|
|
properties = bytearray([0x0, 0x0])
|
|
|
|
with self._lock:
|
|
if uuid in self._subscribed_uuids:
|
|
del(self._subscribed_uuids[uuid])
|
|
if value_handle in self._callbacks:
|
|
del(self._callbacks[value_handle])
|
|
if value_handle in self._subscribed_handlers:
|
|
del(self._subscribed_handlers[value_handle])
|
|
self.char_write_handle(
|
|
characteristic_config_handle,
|
|
properties,
|
|
wait_for_response=wait_for_response
|
|
)
|
|
log.info("Unsubscribed from uuid=%s", uuid)
|
|
else:
|
|
log.debug("Already unsubscribed from uuid=%s", uuid)
|
|
|
|
def get_handle(self, char_uuid):
|
|
"""
|
|
Look up and return the handle for an attribute by its UUID.
|
|
:param char_uuid: The UUID of the characteristic.
|
|
:type uuid: str
|
|
:return: None if the UUID was not found.
|
|
"""
|
|
if isinstance(char_uuid, string_type):
|
|
char_uuid = UUID(char_uuid)
|
|
log.debug("Looking up handle for characteristic %s", char_uuid)
|
|
if char_uuid not in self._characteristics:
|
|
self._characteristics = self.discover_characteristics()
|
|
|
|
characteristic = self._characteristics.get(char_uuid)
|
|
if characteristic is None:
|
|
message = "No characteristic found matching %s" % char_uuid
|
|
log.warn(message)
|
|
raise exceptions.BLEError(message)
|
|
|
|
# TODO support filtering by descriptor UUID, or maybe return the whole
|
|
# Characteristic object
|
|
log.debug("Found %s" % characteristic)
|
|
return characteristic.handle
|
|
|
|
def receive_notification(self, handle, value):
|
|
"""
|
|
Receive a notification from the connected device and propagate the value
|
|
to all registered callbacks.
|
|
"""
|
|
|
|
log.info('Received notification on handle=0x%x, value=0x%s',
|
|
handle, hexlify(value))
|
|
with self._lock:
|
|
if handle in self._callbacks:
|
|
for callback in self._callbacks[handle]:
|
|
callback(handle, value)
|
|
|
|
def exchange_mtu(self, mtu):
|
|
"""
|
|
ATT exchange Maximum Transmission Unit.
|
|
:param mtu: New MTU-value
|
|
:return: New MTU, as recognized by server.
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def resubscribe_all(self):
|
|
"""
|
|
Reenable all notifications and indications for uuids that were
|
|
previously subscribed to.
|
|
This has to be called after a connection loss and subsequent reconnect.
|
|
"""
|
|
|
|
for uuid in self._subscribed_uuids:
|
|
value_handle, characteristic_config_handle = (
|
|
self._notification_handles(uuid)
|
|
)
|
|
|
|
properties = bytearray([
|
|
0x2 if self._subscribed_uuids[uuid] else 0x1,
|
|
0x0
|
|
])
|
|
|
|
with self._lock:
|
|
self.char_write_handle(
|
|
characteristic_config_handle,
|
|
properties,
|
|
wait_for_response=True
|
|
)
|
|
log.info("Resubscribed to uuid=%s", uuid)
|