stream_dock_linux/streamdock_ha.py

423 lines
14 KiB
Python
Raw Permalink Normal View History

import asyncio
import io
import logging
import sys
import signal
from homeassistant_api import AsyncWebsocketClient, State
from StreamDock.DeviceManager import DeviceManager
from StreamDock.ImageHelpers.PILHelper import to_native_key_format, to_native_touchscreen_format
from StreamDock.InputTypes import EventType, Direction, KnobId
from config import load_config
from key_renderer import render_key_image, render_background
# Configure the root logger once at import time: INFO level, with a short
# wall-clock timestamp followed by the level and logger name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Module-wide logger used by every function and method in this file.
log = logging.getLogger("streamdock-ha")
def _parse_button_ref(ref):
if not ref:
return None
ref = str(ref).lower().strip()
mapping = {
"button_7": 7, "key_7": 7,
"button_8": 8, "key_8": 8,
"button_9": 9, "key_9": 9,
}
return mapping.get(ref, int(ref) if ref.isdigit() else None)
def _parse_knob_ref(ref):
if not ref:
return None
ref = str(ref).lower().strip().replace("-", "_")
mapping = {
"knob_1": KnobId.KNOB_1, "knob1": KnobId.KNOB_1,
"knob_2": KnobId.KNOB_2, "knob2": KnobId.KNOB_2,
"knob_3": KnobId.KNOB_3, "knob3": KnobId.KNOB_3,
"knob_4": KnobId.KNOB_4, "knob4": KnobId.KNOB_4,
"left": KnobId.KNOB_1, "bottom_left": KnobId.KNOB_1,
"right": KnobId.KNOB_2, "bottom_right": KnobId.KNOB_2,
"top": KnobId.KNOB_3,
}
if ref in mapping:
return mapping[ref]
for knob in KnobId:
if knob.value == ref:
return knob
return None
class StreamDockHA:
    """Drive a StreamDock device from Home Assistant entity state.

    The configuration supplies a list of pages; each page maps key ids and
    knobs to Home Assistant entities and services. Key images are rendered
    from the cached entity state, device input events are translated into
    service calls, and a configurable key or knob switches between pages.
    """

    def __init__(self, config_path=None):
        """Load the configuration and initialise runtime state.

        Args:
            config_path: Passed straight to ``load_config``; ``None`` leaves
                the choice of file to that helper.

        The loaded configuration must provide ``homeassistant.url`` and
        ``homeassistant.token``. No device or network access happens here.
        """
        self.cfg = load_config(config_path)
        self.ha_url = self.cfg["homeassistant"]["url"]
        self.ha_token = self.cfg["homeassistant"]["token"]
        # ``or`` also covers a section that is present but empty (None).
        self.pages = self.cfg.get("pages") or []
        self.active_page_idx = 0

        nav = self.cfg.get("navigation") or {}
        self.page_cycle_key = _parse_button_ref(nav.get("page_cycle", "button_8"))
        self.nav_knob_left = _parse_knob_ref(nav.get("knob_left"))
        self.nav_knob_right = _parse_knob_ref(nav.get("knob_right"))
        self.nav_knob_press_left = _parse_knob_ref(nav.get("knob_press_left"))
        self.nav_knob_press_right = _parse_knob_ref(nav.get("knob_press_right"))

        self.device = None
        self.ha_client = None
        # entity_id -> last known state string.
        self.entity_states = {}
        # One dict per page, index-aligned with self.pages.
        self._all_key_configs = []
        self._all_knob_configs = []
        self._running = False

    def _build_all_configs(self):
        """Normalise every page's ``keys`` and ``knobs`` sections.

        Fills ``_all_key_configs`` (key id -> config) and
        ``_all_knob_configs`` (``KnobId`` -> config) with one dict per page.
        Entries whose id cannot be parsed, or whose config is not a mapping,
        are skipped with a warning instead of aborting start-up.
        """
        self._all_key_configs = []
        self._all_knob_configs = []

        for page in self.pages:
            page_keys = {}
            # ``or {}`` tolerates a section that is present but set to null.
            for raw_key, key_cfg in (page.get("keys") or {}).items():
                if isinstance(raw_key, str):
                    try:
                        key_id = int(raw_key)
                    except ValueError:
                        log.warning(f"Ignoring key with non-numeric id {raw_key!r}")
                        continue
                else:
                    key_id = raw_key
                if not isinstance(key_cfg, dict):
                    log.warning(f"Ignoring key {key_id}: config is not a mapping")
                    continue
                page_keys[key_id] = key_cfg
            self._all_key_configs.append(page_keys)

            page_knobs = {}
            for knob_ref, knob_cfg in (page.get("knobs") or {}).items():
                knob_id = _parse_knob_ref(knob_ref)
                if not knob_id:
                    continue
                if not isinstance(knob_cfg, dict):
                    log.warning(f"Ignoring knob {knob_ref!r}: config is not a mapping")
                    continue
                page_knobs[knob_id] = knob_cfg
            self._all_knob_configs.append(page_knobs)

    @property
    def active_keys(self):
        """Key configs of the active page (empty before configs are built)."""
        if not self._all_key_configs:
            return {}
        return self._all_key_configs[self.active_page_idx]

    @property
    def active_knobs(self):
        """Knob configs of the active page (empty before configs are built)."""
        if not self._all_knob_configs:
            return {}
        return self._all_knob_configs[self.active_page_idx]

    def _all_entities(self):
        """Return the set of entity ids referenced by any key or knob."""
        entities = set()

        for page_keys in self._all_key_configs:
            for cfg in page_keys.values():
                entity = cfg.get("entity")
                if entity:
                    entities.add(entity)

        for page_knobs in self._all_knob_configs:
            for knob_cfg in page_knobs.values():
                # Each action may carry its own entity ...
                for action_key in ("press", "rotate_left", "rotate_right"):
                    action = knob_cfg.get(action_key, {})
                    if isinstance(action, dict):
                        entity = action.get("entity")
                        if entity:
                            entities.add(entity)
                # ... and the knob itself may name one as well.
                entity = knob_cfg.get("entity")
                if entity:
                    entities.add(entity)

        return entities

    async def _fetch_initial_states(self):
        """Seed ``entity_states`` with the current state of tracked entities.

        Best effort: any failure is logged and the cache is left as it was.
        """
        if not self.ha_client:
            return
        entities = self._all_entities()
        if not entities:
            return
        try:
            states = await self.ha_client.get_states()
            fetched = 0
            for state in states:
                if state.entity_id in entities:
                    self.entity_states[state.entity_id] = state.state
                    fetched += 1
            # Report what was actually found, not merely what was requested.
            log.info(f"Fetched states for {fetched} of {len(entities)} entities")
        except Exception as e:
            log.error(f"Failed to fetch initial states: {e}")

    async def _render_page(self):
        """Draw the background and every key of the active page."""
        if not self.device:
            return
        await self._render_background()
        # Snapshot the ids so the loop is independent of later dict changes.
        for key_id in list(self.active_keys):
            await self._update_key_image(key_id)
        self.device.refresh()
        log.info(f"Page rendered: {self.pages[self.active_page_idx].get('name', '')}")

    async def _render_background(self):
        """Render the background for the active page and send it as JPEG."""
        if not self.device:
            return
        active_name = self.pages[self.active_page_idx].get("name", "")
        total = len(self.pages)
        bg = render_background(
            self.pages,
            active_name,
            page_num=self.active_page_idx + 1,
            total_pages=total,
        )
        bg_processed = to_native_touchscreen_format(self.device, bg)
        buf = io.BytesIO()
        bg_processed.save(buf, format="JPEG", quality=85)
        self.device.transport.set_background_image_stream(buf.getvalue())

    async def _update_key_image(self, key_id):
        """Render one key of the active page from its cached entity state.

        Keys without an entity are rendered as ``"off"``; keys whose entity
        has no cached state are rendered as ``"unknown"``. Rendering and
        transfer errors are logged, never raised.
        """
        if not self.device:
            return
        cfg = self.active_keys.get(key_id)
        if not cfg:
            return
        entity = cfg.get("entity")
        state = self.entity_states.get(entity, "unknown") if entity else "off"
        try:
            img = render_key_image(cfg, state, size=64)
            img_processed = to_native_key_format(self.device, img)
            buf = io.BytesIO()
            img_processed.save(buf, format="JPEG", quality=90)
            self.device.transport.set_key_image_stream(buf.getvalue(), key_id)
        except Exception as e:
            log.error(f"Failed to update key {key_id}: {e}")

    async def _cycle_page(self, direction=1):
        """Switch pages by ``direction`` steps, wrapping around, and redraw.

        Does nothing when there is at most one page or when the step lands on
        the page that is already active.
        """
        if len(self.pages) <= 1:
            return
        old_idx = self.active_page_idx
        self.active_page_idx = (self.active_page_idx + direction) % len(self.pages)
        if self.active_page_idx == old_idx:
            return
        page_name = self.pages[self.active_page_idx].get("name", "")
        log.info(f"Page: {page_name} ({self.active_page_idx + 1}/{len(self.pages)})")
        if self.device:
            # Clear first so keys unused on the new page do not keep old icons.
            self.device.clearAllIcon()
        await self._render_page()

    async def _handle_event(self, device, event):
        """Dispatch a device input event.

        Knob events go to the knob handlers. Button events are acted on only
        when ``event.state == 1``: the page-cycle key advances the page, any
        other configured key triggers its service.
        """
        if event.event_type == EventType.KNOB_ROTATE:
            await self._handle_knob_rotate(event)
            return
        if event.event_type == EventType.KNOB_PRESS:
            await self._handle_knob_press(event)
            return
        if event.event_type != EventType.BUTTON:
            return
        if event.state != 1:
            return
        key_id = event.key.value if event.key else None
        if key_id is None:
            return
        if key_id == self.page_cycle_key:
            await self._cycle_page(1)
            return
        cfg = self.active_keys.get(key_id)
        if not cfg:
            return
        await self._call_service_from_cfg(cfg, f"Key {key_id}")

    async def _handle_knob_rotate(self, event):
        """Handle a knob rotation: page navigation first, then the knob action."""
        knob_id = event.knob_id
        if not knob_id:
            return
        is_right = event.direction == Direction.RIGHT
        # Navigation knobs take precedence over per-page knob actions.
        if is_right and knob_id == self.nav_knob_right:
            await self._cycle_page(1)
            return
        if not is_right and knob_id == self.nav_knob_left:
            await self._cycle_page(-1)
            return
        knob_cfg = self.active_knobs.get(knob_id)
        if not knob_cfg:
            return
        action_key = "rotate_right" if is_right else "rotate_left"
        action = knob_cfg.get(action_key)
        if not action:
            return
        await self._call_service_from_cfg(action, f"Knob {knob_id.value} {action_key}")

    async def _handle_knob_press(self, event):
        """Handle a knob press (``event.state == 1`` only).

        Navigation knobs switch pages. Otherwise the knob's ``press`` action
        is used, falling back to the knob-level ``entity``/``service`` pair
        when no ``press`` action is configured.
        """
        knob_id = event.knob_id
        if not knob_id:
            return
        if event.state != 1:
            return
        if knob_id == self.nav_knob_press_left:
            await self._cycle_page(-1)
            return
        if knob_id == self.nav_knob_press_right:
            await self._cycle_page(1)
            return
        knob_cfg = self.active_knobs.get(knob_id)
        if not knob_cfg:
            return
        press_action = knob_cfg.get("press")
        if not press_action:
            entity = knob_cfg.get("entity")
            service = knob_cfg.get("service")
            if entity and service:
                await self._call_service(entity, service, f"Knob {knob_id.value} press")
            return
        await self._call_service_from_cfg(press_action, f"Knob {knob_id.value} press")

    async def _call_service_from_cfg(self, cfg, label=""):
        """Call the service described by ``cfg`` if it is complete.

        ``cfg`` must be a dict with non-empty ``entity`` and ``service``
        values; anything else is ignored silently.
        """
        if not isinstance(cfg, dict):
            return
        entity = cfg.get("entity")
        service = cfg.get("service")
        if not entity or not service:
            return
        await self._call_service(entity, service, label)

    async def _call_service(self, entity, service, label=""):
        """Trigger ``<domain>.<service>`` on ``entity``; failures are logged.

        The domain is the part of the entity id before the first dot.
        """
        domain = entity.split(".")[0]
        if not self.ha_client:
            log.warning(f"{label}: not connected, skipping {domain}.{service} on {entity}")
            return
        log.info(f"{label}: calling {domain}.{service} on {entity}")
        try:
            await self.ha_client.trigger_service(domain, service, entity_id=entity)
        except Exception as e:
            log.error(f"Failed to call service {domain}.{service}: {e}")

    async def _listen_state_changes(self):
        """Consume ``state_changed`` events and refresh the affected keys.

        Runs until ``_running`` becomes false or the task is cancelled. An
        error while processing one event is logged and the loop continues; an
        error in the listener itself is logged and ends the loop.
        """
        if not self.ha_client:
            return
        log.info("Listening for HA state changes...")
        try:
            async with self.ha_client.listen_events("state_changed") as events:
                async for event in events:
                    if not self._running:
                        break
                    try:
                        data = getattr(event, "data", None) or {}
                        entity_id = data.get("entity_id", "")
                        new_state = data.get("new_state")
                        if not entity_id or not new_state:
                            continue
                        # The payload may arrive as a State object or a dict.
                        if isinstance(new_state, State):
                            state_val = new_state.state
                        elif isinstance(new_state, dict):
                            state_val = new_state.get("state", "")
                        else:
                            state_val = str(new_state)
                        old_val = self.entity_states.get(entity_id)
                        self.entity_states[entity_id] = state_val
                        if old_val != state_val:
                            log.info(f"State changed: {entity_id} = {state_val}")
                        await self._update_visible_keys(entity_id)
                    except Exception as e:
                        log.error(f"Error processing state change: {e}")
        except Exception as e:
            log.error(f"HA event listener error: {e}")

    async def _update_visible_keys(self, entity_id):
        """Redraw the keys on the active page that are bound to ``entity_id``."""
        if not self.device:
            return
        redrawn = False
        for key_id, cfg in list(self.active_keys.items()):
            if cfg.get("entity") == entity_id:
                await self._update_key_image(key_id)
                redrawn = True
        # Only touch the device when a visible key actually changed; state
        # events arrive for every entity, not just the configured ones.
        if redrawn:
            self.device.refresh()

    async def _setup_device(self):
        """Open the first enumerated device and install the input callback.

        Returns:
            ``True`` when a device was opened, ``False`` when none was found.
        """
        manager = DeviceManager()
        devices = manager.enumerate()
        if not devices:
            log.error("No StreamDock device found")
            return False
        self.device = devices[0]
        log.info(
            f"Found {type(self.device).__name__} at {self.device.path} (serial: {self.device.serial_number})"
        )
        self.device.open()
        self.device.init()
        self.device.set_key_callback(self._make_sync_callback())
        return True

    def _make_sync_callback(self):
        """Build a plain-function callback that forwards events to the loop.

        The callback schedules ``_handle_event`` on the event loop captured
        here via ``run_coroutine_threadsafe``, so it can be invoked from
        outside the loop's thread. Exceptions raised by the handler are
        logged instead of being dropped with the unobserved future.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not inside a running loop: keep the previous lookup as fallback.
            loop = asyncio.get_event_loop()

        def _log_failure(future):
            if future.cancelled():
                return
            exc = future.exception()
            if exc is not None:
                log.error(f"Unhandled error in event handler: {exc}")

        def callback(device, event):
            future = asyncio.run_coroutine_threadsafe(
                self._handle_event(device, event), loop
            )
            future.add_done_callback(_log_failure)

        return callback

    async def run(self):
        """Run the application until ``shutdown`` is called or it is cancelled.

        Opens the device, connects to Home Assistant, renders the first page
        and then idles while the state listener and the device callback do
        the work. The device is closed on every exit path once it has been
        opened, including a failed Home Assistant connection.
        """
        if not self.pages:
            log.error("No pages configured in config.yaml")
            return
        if not await self._setup_device():
            return
        try:
            self._build_all_configs()
            async with AsyncWebsocketClient(self.ha_url, self.ha_token) as client:
                self.ha_client = client
                log.info("Connected to Home Assistant")
                self._running = True
                await self._fetch_initial_states()
                await self._render_page()
                listen_task = asyncio.create_task(self._listen_state_changes())

                nav_info = []
                if self.page_cycle_key:
                    nav_info.append(f"Key {self.page_cycle_key}=next page")
                if self.nav_knob_left or self.nav_knob_right:
                    nav_info.append("knob rotation=prev/next page")
                log.info(f"StreamDock-HA running ({len(self.pages)} pages). {'; '.join(nav_info)}. Ctrl+C to exit.")

                try:
                    # Poll the flag so shutdown() takes effect within a second.
                    while self._running:
                        await asyncio.sleep(1)
                except (KeyboardInterrupt, asyncio.CancelledError):
                    pass
                finally:
                    self._running = False
                    listen_task.cancel()
                    try:
                        await listen_task
                    except asyncio.CancelledError:
                        pass
        finally:
            # Reached on normal shutdown and on any failure after the device
            # was opened, so the handle is never leaked.
            self._running = False
            if self.device:
                self.device.close()
            log.info("Shutdown complete")

    def shutdown(self):
        """Ask ``run`` to stop; it notices on its next poll of the flag."""
        self._running = False
def main():
    """Command-line entry point.

    Reads an optional ``-c``/``--config <path>`` argument, builds the
    application, installs SIGINT/SIGTERM handlers that request a graceful
    shutdown, and runs the application on a fresh event loop until it exits.
    """
    config_path = None
    argv = sys.argv
    for i, arg in enumerate(argv):
        if arg in ("-c", "--config") and i + 1 < len(argv):
            config_path = argv[i + 1]

    app = StreamDockHA(config_path)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, app.shutdown)

    try:
        loop.run_until_complete(app.run())
    except KeyboardInterrupt:
        # Do NOT start app.run() again here: that would restart the whole
        # application. Cancel whatever is still pending and let it unwind so
        # its cleanup (closing the device and the connection) can run.
        app.shutdown()
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
    finally:
        loop.close()
# Start the application only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()