This commit introduces a complete integration for Mirabox StreamDock devices with Home Assistant, allowing users to control entities and monitor states directly from the hardware.

Key features include:
- Support for multiple Mirabox models: N1, N3, N4, N4Pro, XL, M3, M18, K1Pro, and various 293 variants.
- Home Assistant WebSocket integration for real-time entity updates and service execution.
- Dynamic LCD key rendering with support for custom icons, labels, and entity-state-aware colors.
- Input handling for physical buttons and rotary encoders (knobs).
- Multi-page navigation support with configurable cycle keys and knobs.
- Cross-platform transport layer using a custom HID library.
- Configuration system using YAML with page-based layouts.
- Linux udev rules for non-root USB access.
423 lines
14 KiB
Python
423 lines
14 KiB
Python
import asyncio
|
|
import io
|
|
import logging
|
|
import sys
|
|
import signal
|
|
|
|
from homeassistant_api import AsyncWebsocketClient, State
|
|
from StreamDock.DeviceManager import DeviceManager
|
|
from StreamDock.ImageHelpers.PILHelper import to_native_key_format, to_native_touchscreen_format
|
|
from StreamDock.InputTypes import EventType, Direction, KnobId
|
|
from config import load_config
|
|
from key_renderer import render_key_image, render_background
|
|
|
|
# Configure the root logger once, at import time: INFO and above, with a
# compact time-of-day stamp (no date) in front of every record.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Module-wide logger shared by every function and method in this file.
log = logging.getLogger("streamdock-ha")
|
|
|
|
|
|
def _parse_button_ref(ref):
|
|
if not ref:
|
|
return None
|
|
ref = str(ref).lower().strip()
|
|
mapping = {
|
|
"button_7": 7, "key_7": 7,
|
|
"button_8": 8, "key_8": 8,
|
|
"button_9": 9, "key_9": 9,
|
|
}
|
|
return mapping.get(ref, int(ref) if ref.isdigit() else None)
|
|
|
|
|
|
def _parse_knob_ref(ref):
|
|
if not ref:
|
|
return None
|
|
ref = str(ref).lower().strip().replace("-", "_")
|
|
mapping = {
|
|
"knob_1": KnobId.KNOB_1, "knob1": KnobId.KNOB_1,
|
|
"knob_2": KnobId.KNOB_2, "knob2": KnobId.KNOB_2,
|
|
"knob_3": KnobId.KNOB_3, "knob3": KnobId.KNOB_3,
|
|
"knob_4": KnobId.KNOB_4, "knob4": KnobId.KNOB_4,
|
|
"left": KnobId.KNOB_1, "bottom_left": KnobId.KNOB_1,
|
|
"right": KnobId.KNOB_2, "bottom_right": KnobId.KNOB_2,
|
|
"top": KnobId.KNOB_3,
|
|
}
|
|
if ref in mapping:
|
|
return mapping[ref]
|
|
for knob in KnobId:
|
|
if knob.value == ref:
|
|
return knob
|
|
return None
|
|
|
|
|
|
class StreamDockHA:
    """Bridge between one StreamDock device and Home Assistant.

    The configuration describes a list of pages, each with per-key and
    per-knob settings. The active page is rendered onto the device, button
    and knob events are turned into Home Assistant service calls, and
    ``state_changed`` events from Home Assistant re-render the affected keys.
    """

    def __init__(self, config_path=None):
        """Load the configuration and initialise runtime state.

        Args:
            config_path: Forwarded unchanged to ``load_config``.
                NOTE(review): what ``None`` resolves to is decided by
                ``load_config``, which is not visible here.

        Raises:
            KeyError: If ``homeassistant.url`` or ``homeassistant.token`` is
                missing from the loaded configuration.
        """
        self.cfg = load_config(config_path)
        self.ha_url = self.cfg["homeassistant"]["url"]
        self.ha_token = self.cfg["homeassistant"]["token"]
        # "or" rather than a .get() default: an empty YAML section loads as
        # None, which would otherwise break len()/.get() further down.
        self.pages = self.cfg.get("pages") or []
        self.active_page_idx = 0

        nav = self.cfg.get("navigation") or {}
        self.page_cycle_key = _parse_button_ref(nav.get("page_cycle", "button_8"))
        self.nav_knob_left = _parse_knob_ref(nav.get("knob_left"))
        self.nav_knob_right = _parse_knob_ref(nav.get("knob_right"))
        self.nav_knob_press_left = _parse_knob_ref(nav.get("knob_press_left"))
        self.nav_knob_press_right = _parse_knob_ref(nav.get("knob_press_right"))

        self.device = None
        self.ha_client = None
        # entity_id -> last known state string
        self.entity_states = {}
        # One dict per page, index-aligned with self.pages.
        self._all_key_configs = []
        self._all_knob_configs = []
        self._running = False

    def _build_all_configs(self):
        """Normalise every page's ``keys`` and ``knobs`` sections.

        Key ids given as strings are converted with ``int()``; knob
        references are resolved with ``_parse_knob_ref`` and unrecognised
        ones are dropped. The results replace ``_all_key_configs`` and
        ``_all_knob_configs``.

        Raises:
            ValueError: If a string key id is not a valid integer.
        """
        self._all_key_configs = []
        self._all_knob_configs = []
        for page in self.pages:
            page_keys = {}
            # An empty "keys:" / "knobs:" section loads as None.
            for key_str, cfg in (page.get("keys") or {}).items():
                key_id = int(key_str) if isinstance(key_str, str) else key_str
                page_keys[key_id] = cfg
            self._all_key_configs.append(page_keys)

            page_knobs = {}
            for knob_ref, knob_cfg in (page.get("knobs") or {}).items():
                knob_id = _parse_knob_ref(knob_ref)
                # Compare against None: an enum member must not be dropped
                # just because it happens to be falsy.
                if knob_id is not None:
                    page_knobs[knob_id] = knob_cfg
            self._all_knob_configs.append(page_knobs)

    @property
    def active_keys(self):
        """Key configs of the active page (empty dict before they are built)."""
        if not self._all_key_configs:
            return {}
        return self._all_key_configs[self.active_page_idx]

    @property
    def active_knobs(self):
        """Knob configs of the active page (empty dict before they are built)."""
        if not self._all_knob_configs:
            return {}
        return self._all_knob_configs[self.active_page_idx]

    def _all_entities(self):
        """Return the set of entity ids referenced by any key or knob on any page."""
        entities = set()
        for page_keys in self._all_key_configs:
            for cfg in page_keys.values():
                entity = cfg.get("entity")
                if entity:
                    entities.add(entity)
        for page_knobs in self._all_knob_configs:
            for knob_cfg in page_knobs.values():
                for action_key in ("press", "rotate_left", "rotate_right"):
                    action = knob_cfg.get(action_key, {})
                    if isinstance(action, dict):
                        entity = action.get("entity")
                        if entity:
                            entities.add(entity)
                # A knob may also name an entity directly (used as the press
                # fallback in _handle_knob_press).
                entity = knob_cfg.get("entity")
                if entity:
                    entities.add(entity)
        return entities

    async def _fetch_initial_states(self):
        """Seed ``entity_states`` with the current state of every configured entity.

        Failures are logged and otherwise ignored, so entities simply stay
        unknown until a state change arrives.
        """
        if not self.ha_client:
            return
        entities = self._all_entities()
        if not entities:
            return
        try:
            states = await self.ha_client.get_states()
            fetched = 0
            for state in states:
                if state.entity_id in entities:
                    self.entity_states[state.entity_id] = state.state
                    fetched += 1
            # Report what was actually found, not how many were asked for.
            log.info(f"Fetched states for {fetched} entities")
        except Exception as e:
            log.error(f"Failed to fetch initial states: {e}")

    async def _render_page(self):
        """Draw the background and every key of the active page, then refresh."""
        if not self.device:
            return
        await self._render_background()
        for key_id in self.active_keys:
            await self._update_key_image(key_id)
        self.device.refresh()
        log.info(f"Page rendered: {self.pages[self.active_page_idx].get('name', '')}")

    async def _render_background(self):
        """Render the active page's background and send it to the device as JPEG."""
        if not self.device:
            return
        active_name = self.pages[self.active_page_idx].get("name", "")
        total = len(self.pages)
        bg = render_background(
            self.pages, active_name,
            page_num=self.active_page_idx + 1,
            total_pages=total,
        )
        bg_processed = to_native_touchscreen_format(self.device, bg)
        buf = io.BytesIO()
        bg_processed.save(buf, format="JPEG", quality=85)
        self.device.transport.set_background_image_stream(buf.getvalue())

    async def _update_key_image(self, key_id):
        """Render one key of the active page and send it to the device.

        Keys bound to an entity are drawn with that entity's last known
        state (``"unknown"`` if none has been seen); keys without an entity
        are drawn as ``"off"``. Rendering or transfer errors are logged and
        swallowed so one bad key cannot break the rest of the page.

        Args:
            key_id: Key number on the active page; unconfigured keys are ignored.
        """
        if not self.device:
            return
        cfg = self.active_keys.get(key_id)
        if not cfg:
            return
        entity = cfg.get("entity")
        state = self.entity_states.get(entity, "unknown") if entity else "off"
        try:
            img = render_key_image(cfg, state, size=64)
            img_processed = to_native_key_format(self.device, img)
            buf = io.BytesIO()
            img_processed.save(buf, format="JPEG", quality=90)
            jpeg_data = buf.getvalue()
            self.device.transport.set_key_image_stream(jpeg_data, key_id)
        except Exception as e:
            log.error(f"Failed to update key {key_id}: {e}")

    async def _cycle_page(self, direction=1):
        """Switch the active page, wrapping around at either end.

        Args:
            direction: Page offset to apply (``1`` = next, ``-1`` = previous).
        """
        if len(self.pages) <= 1:
            return
        old_idx = self.active_page_idx
        self.active_page_idx = (self.active_page_idx + direction) % len(self.pages)
        if self.active_page_idx == old_idx:
            return
        page_name = self.pages[self.active_page_idx].get("name", "")
        log.info(f"Page: {page_name} ({self.active_page_idx + 1}/{len(self.pages)})")
        # Wipe the old page's icons so keys unused on the new page go blank.
        self.device.clearAllIcon()
        await self._render_page()

    async def _handle_event(self, device, event):
        """Dispatch one input event from the device.

        Knob events go to their own handlers. Button events are acted on
        only when ``event.state == 1`` (presumably "pressed" — TODO confirm):
        the page-cycle key switches page, any other configured key calls its
        service.

        Args:
            device: The device that produced the event (unused).
            event: The input event.
        """
        if event.event_type == EventType.KNOB_ROTATE:
            await self._handle_knob_rotate(event)
            return

        if event.event_type == EventType.KNOB_PRESS:
            await self._handle_knob_press(event)
            return

        if event.event_type != EventType.BUTTON:
            return
        if event.state != 1:
            return

        key_id = event.key.value if event.key else None
        if key_id is None:
            return

        if key_id == self.page_cycle_key:
            await self._cycle_page(1)
            return

        cfg = self.active_keys.get(key_id)
        if not cfg:
            return
        await self._call_service_from_cfg(cfg, f"Key {key_id}")

    async def _handle_knob_rotate(self, event):
        """Handle a knob rotation: page navigation first, then the page's knob action."""
        knob_id = event.knob_id
        if knob_id is None:
            return

        is_right = event.direction == Direction.RIGHT

        # Navigation knobs take priority over per-page knob actions.
        if is_right and knob_id == self.nav_knob_right:
            await self._cycle_page(1)
            return
        if not is_right and knob_id == self.nav_knob_left:
            await self._cycle_page(-1)
            return

        knob_cfg = self.active_knobs.get(knob_id)
        if not knob_cfg:
            return

        action_key = "rotate_right" if is_right else "rotate_left"
        action = knob_cfg.get(action_key)
        if not action:
            return
        await self._call_service_from_cfg(action, f"Knob {knob_id.value} {action_key}")

    async def _handle_knob_press(self, event):
        """Handle a knob press: page navigation first, then the page's knob action.

        Without an explicit ``press`` action, the knob's own top-level
        ``entity``/``service`` pair is used if both are present.
        """
        knob_id = event.knob_id
        if knob_id is None:
            return

        if event.state != 1:
            return

        if knob_id == self.nav_knob_press_left:
            await self._cycle_page(-1)
            return
        if knob_id == self.nav_knob_press_right:
            await self._cycle_page(1)
            return

        knob_cfg = self.active_knobs.get(knob_id)
        if not knob_cfg:
            return

        press_action = knob_cfg.get("press")
        if not press_action:
            entity = knob_cfg.get("entity")
            service = knob_cfg.get("service")
            if entity and service:
                await self._call_service(entity, service, f"Knob {knob_id.value} press")
            return
        await self._call_service_from_cfg(press_action, f"Knob {knob_id.value} press")

    async def _call_service_from_cfg(self, cfg, label=""):
        """Call the service described by an ``{"entity": ..., "service": ...}`` dict.

        Anything that is not a dict, or lacks either field, is ignored.

        Args:
            cfg: The action configuration.
            label: Prefix used in the log line.
        """
        if not isinstance(cfg, dict):
            return
        entity = cfg.get("entity")
        service = cfg.get("service")
        if not entity or not service:
            return
        await self._call_service(entity, service, label)

    async def _call_service(self, entity, service, label=""):
        """Call ``<domain>.<service>`` on ``entity``; the domain is the entity id's prefix.

        Failures are logged, never raised.

        Args:
            entity: Entity id such as ``"light.kitchen"``.
            service: Service name within the entity's domain.
            label: Prefix used in the log line.
        """
        domain = entity.split(".")[0]
        # Input callbacks are registered before the HA connection exists, so
        # an early key press can arrive while there is no client yet.
        if not self.ha_client:
            log.error(f"Failed to call service {domain}.{service}: not connected to Home Assistant")
            return
        log.info(f"{label}: calling {domain}.{service} on {entity}")
        try:
            await self.ha_client.trigger_service(domain, service, entity_id=entity)
        except Exception as e:
            log.error(f"Failed to call service {domain}.{service}: {e}")

    async def _listen_state_changes(self):
        """Consume ``state_changed`` events and keep ``entity_states`` current.

        Runs until the event stream ends, ``_running`` becomes false, or the
        task is cancelled. A malformed event is logged and skipped; an error
        in the stream itself is logged and ends the listener.
        """
        if not self.ha_client:
            return
        log.info("Listening for HA state changes...")
        try:
            async with self.ha_client.listen_events("state_changed") as events:
                async for event in events:
                    if not self._running:
                        break
                    try:
                        data = getattr(event, "data", {})
                        entity_id = data.get("entity_id", "")
                        new_state = data.get("new_state")
                        if not entity_id or not new_state:
                            continue
                        # The payload may be a State object, a plain dict or
                        # something else; normalise to the state string.
                        if isinstance(new_state, State):
                            state_val = new_state.state
                        elif isinstance(new_state, dict):
                            state_val = new_state.get("state", "")
                        else:
                            state_val = str(new_state)
                        old_val = self.entity_states.get(entity_id)
                        self.entity_states[entity_id] = state_val
                        if old_val != state_val:
                            log.info(f"State changed: {entity_id} = {state_val}")
                            await self._update_visible_keys(entity_id)
                    except Exception as e:
                        log.error(f"Error processing state change: {e}")
        except Exception as e:
            log.error(f"HA event listener error: {e}")

    async def _update_visible_keys(self, entity_id):
        """Re-render every key on the active page that is bound to ``entity_id``.

        The device is refreshed once afterwards, and only if at least one
        key was redrawn — most state changes concern entities that are not
        on screen and must not cause device traffic.
        """
        matching = [
            key_id
            for key_id, cfg in self.active_keys.items()
            if cfg.get("entity") == entity_id
        ]
        if not matching:
            return
        for key_id in matching:
            await self._update_key_image(key_id)
        if self.device:
            self.device.refresh()

    async def _setup_device(self):
        """Open and initialise the first enumerated device.

        Returns:
            ``True`` on success, ``False`` if no device was found.
        """
        manager = DeviceManager()
        devices = manager.enumerate()
        if not devices:
            log.error("No StreamDock device found")
            return False

        self.device = devices[0]
        log.info(
            f"Found {type(self.device).__name__} at {self.device.path} (serial: {self.device.serial_number})"
        )
        self.device.open()
        self.device.init()
        self.device.set_key_callback(self._make_sync_callback())
        return True

    def _make_sync_callback(self):
        """Build the plain (non-async) callback handed to the device.

        The callback schedules ``_handle_event`` on the event loop that is
        running when this method is called, so it must be called from inside
        that loop. ``run_coroutine_threadsafe`` is used because the device
        library presumably invokes the callback from its own reader thread
        (TODO confirm).
        """
        loop = asyncio.get_running_loop()

        def report_failure(future):
            # Nobody awaits these futures, so an exception would otherwise
            # disappear without a trace.
            if future.cancelled():
                return
            exc = future.exception()
            if exc is not None:
                log.error(f"Unhandled error in event handler: {exc}")

        def callback(device, event):
            future = asyncio.run_coroutine_threadsafe(self._handle_event(device, event), loop)
            future.add_done_callback(report_failure)

        return callback

    async def run(self):
        """Run the bridge until ``shutdown()`` is called or the task is cancelled.

        Opens the device, connects to Home Assistant, renders the first page
        and starts the state listener, then idles. The device is closed on
        every exit path once it has been opened, including a failed
        connection or a failed first render.
        """
        if not self.pages:
            log.error("No pages configured in config.yaml")
            return

        if not await self._setup_device():
            return

        try:
            self._build_all_configs()

            async with AsyncWebsocketClient(self.ha_url, self.ha_token) as client:
                self.ha_client = client
                log.info("Connected to Home Assistant")
                self._running = True

                await self._fetch_initial_states()
                await self._render_page()

                listen_task = asyncio.create_task(self._listen_state_changes())

                nav_info = []
                if self.page_cycle_key is not None:
                    nav_info.append(f"Key {self.page_cycle_key}=next page")
                if self.nav_knob_left is not None or self.nav_knob_right is not None:
                    nav_info.append("knob rotation=prev/next page")
                log.info(f"StreamDock-HA running ({len(self.pages)} pages). {'; '.join(nav_info)}. Ctrl+C to exit.")
                try:
                    # Poll the flag so shutdown() can stop us from a signal handler.
                    while self._running:
                        await asyncio.sleep(1)
                except (KeyboardInterrupt, asyncio.CancelledError):
                    pass
                finally:
                    self._running = False
                    listen_task.cancel()
                    try:
                        await listen_task
                    except asyncio.CancelledError:
                        pass
        finally:
            self._running = False
            self.ha_client = None
            if self.device:
                self.device.close()
            log.info("Shutdown complete")

    def shutdown(self):
        """Ask ``run()`` to stop; it exits at its next once-per-second poll."""
        self._running = False
|
|
|
|
|
|
def main():
    """Command-line entry point: run the bridge until it is interrupted.

    Recognises ``-c PATH`` / ``--config PATH`` anywhere on the command line
    (the last occurrence wins); every other argument is ignored.
    """
    config_path = None
    for i, arg in enumerate(sys.argv):
        if arg in ("-c", "--config") and i + 1 < len(sys.argv):
            config_path = sys.argv[i + 1]

    app = StreamDockHA(config_path)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, app.shutdown)
        except NotImplementedError:
            # Event loops on Windows do not support signal handlers; Ctrl+C
            # then surfaces as KeyboardInterrupt, handled below.
            pass

    # Keep a handle on the task so an interrupted run can be finished
    # rather than started a second time.
    task = loop.create_task(app.run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        app.shutdown()
        if not task.done():
            # Cancel and resume the SAME task so its cleanup code (listener
            # teardown, device close) gets to run.
            task.cancel()
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
    finally:
        loop.close()
|
|
|
|
|
|
# Start the bridge only when this file is executed as a script; importing it
# as a module must not touch the device or the network.
if __name__ == "__main__":
    main()