250 lines
9.9 KiB
Python
250 lines
9.9 KiB
Python
|
|
import platform
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
from typing import Optional
|
||
|
|
# import pywinusb.hid as hid
|
||
|
|
from .ProductIDs import USBVendorIDs, USBProductIDs, g_products
|
||
|
|
from .Transport.LibUSBHIDAPI import LibUSBHIDAPI
|
||
|
|
|
||
|
|
# Platform-specific imports
|
||
|
|
if platform.system() == "Linux":
|
||
|
|
import pyudev
|
||
|
|
elif platform.system() == "Windows":
|
||
|
|
try:
|
||
|
|
import wmi
|
||
|
|
import pythoncom
|
||
|
|
WINDOWS_SUPPORT = True
|
||
|
|
except ImportError:
|
||
|
|
print("Warning: wmi module not installed, using polling mode")
|
||
|
|
WINDOWS_SUPPORT = False
|
||
|
|
elif platform.system() == "Darwin":
|
||
|
|
# macOS specific imports can be added here if needed
|
||
|
|
pass
|
||
|
|
|
||
|
|
class DeviceManager:
|
||
|
|
streamdocks = list()
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def _get_transport(transport):
|
||
|
|
return LibUSBHIDAPI()
|
||
|
|
|
||
|
|
def __init__(self, transport=None):
|
||
|
|
self.transport = self._get_transport(transport)
|
||
|
|
|
||
|
|
def enumerate(self)->list:
|
||
|
|
# CRITICAL: Clear old list to avoid stale references
|
||
|
|
self.streamdocks.clear()
|
||
|
|
|
||
|
|
products = g_products
|
||
|
|
for vid, pid, class_type in products:
|
||
|
|
found_devices = self.transport.enumerate_devices(vendor_id = vid, product_id = pid)
|
||
|
|
# Create a dedicated LibUSBHIDAPI instance per device
|
||
|
|
# CRITICAL: Pass device info to transport for proper resource management
|
||
|
|
for d in found_devices:
|
||
|
|
# Create device_info structure from dict
|
||
|
|
device_info = LibUSBHIDAPI.create_device_info_from_dict(d)
|
||
|
|
device_transport = LibUSBHIDAPI(device_info)
|
||
|
|
self.streamdocks.append(class_type(device_transport, d))
|
||
|
|
return self.streamdocks
|
||
|
|
|
||
|
|
def listen(self):
|
||
|
|
"""
|
||
|
|
Listen for device hotplug events, cross-platform
|
||
|
|
"""
|
||
|
|
products = g_products
|
||
|
|
system = platform.system()
|
||
|
|
|
||
|
|
if system == "Linux":
|
||
|
|
self._listen_linux(products)
|
||
|
|
elif system == "Windows":
|
||
|
|
self._listen_windows(products)
|
||
|
|
elif system == "Darwin":
|
||
|
|
self._listen_macos(products)
|
||
|
|
else:
|
||
|
|
print(f"Unsupported operating system: {system}")
|
||
|
|
|
||
|
|
def _listen_linux(self, products):
|
||
|
|
"""Linux uses pyudev to listen for device events"""
|
||
|
|
context = pyudev.Context()
|
||
|
|
monitor = pyudev.Monitor.from_netlink(context)
|
||
|
|
monitor.filter_by(subsystem='usb')
|
||
|
|
|
||
|
|
for device in iter(monitor.poll, None):
|
||
|
|
self._handle_device_event(device.action, device, products)
|
||
|
|
|
||
|
|
def _listen_windows(self, products):
|
||
|
|
"""Windows uses WMI to listen for device events"""
|
||
|
|
if not WINDOWS_SUPPORT:
|
||
|
|
print("WMI unavailable, using polling mode")
|
||
|
|
self._fallback_polling(products)
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
pythoncom.CoInitialize()
|
||
|
|
c = wmi.WMI()
|
||
|
|
|
||
|
|
# Listen for device connection events
|
||
|
|
watcher = c.Win32_DeviceChangeEvent.watch_for(
|
||
|
|
EventType=2 # Device connected
|
||
|
|
)
|
||
|
|
|
||
|
|
while True:
|
||
|
|
try:
|
||
|
|
event = watcher()
|
||
|
|
if event.EventType == 2: # Device connected
|
||
|
|
self._check_new_devices_windows(products)
|
||
|
|
elif event.EventType == 3: # Device disconnected
|
||
|
|
self._check_removed_devices_windows(products)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Windows device listener error: {e}")
|
||
|
|
time.sleep(1)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Windows WMI initialization failed: {e}")
|
||
|
|
# Fall back to polling mode
|
||
|
|
self._fallback_polling(products)
|
||
|
|
finally:
|
||
|
|
pythoncom.CoUninitialize()
|
||
|
|
|
||
|
|
def _listen_macos(self, products):
|
||
|
|
"""macOS uses polling to listen for device events"""
|
||
|
|
self._fallback_polling(products)
|
||
|
|
|
||
|
|
def _fallback_polling(self, products):
|
||
|
|
"""Fall back to polling mode for systems without real-time monitoring"""
|
||
|
|
# print("Using polling mode to monitor device changes...")
|
||
|
|
current_devices = set()
|
||
|
|
|
||
|
|
# Initialize current device list
|
||
|
|
for vid, pid, _ in products:
|
||
|
|
devices = self.transport.enumerate_devices(vendor_id=vid, product_id=pid)
|
||
|
|
for device in devices:
|
||
|
|
current_devices.add(device['path'])
|
||
|
|
|
||
|
|
while True:
|
||
|
|
try:
|
||
|
|
new_devices = set()
|
||
|
|
for vid, pid, _ in products:
|
||
|
|
devices = self.transport.enumerate_devices(vendor_id=vid, product_id=pid)
|
||
|
|
for device in devices:
|
||
|
|
new_devices.add(device['path'])
|
||
|
|
|
||
|
|
# Check for newly added devices
|
||
|
|
added_devices = new_devices - current_devices
|
||
|
|
for device_path in added_devices:
|
||
|
|
print(f"[add] path: {device_path}")
|
||
|
|
self._handle_device_addition(device_path, products)
|
||
|
|
|
||
|
|
# Check for removed devices
|
||
|
|
removed_devices = current_devices - new_devices
|
||
|
|
for device_path in removed_devices:
|
||
|
|
print(f"[remove] path: {device_path}")
|
||
|
|
self._handle_device_removal(device_path)
|
||
|
|
|
||
|
|
current_devices = new_devices
|
||
|
|
time.sleep(2) # Check every 2 seconds
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Polling listener error: {e}")
|
||
|
|
time.sleep(5)
|
||
|
|
|
||
|
|
def _handle_device_event(self, action, device, products):
|
||
|
|
"""Handle device events (Linux)"""
|
||
|
|
if action not in ['add', 'remove']:
|
||
|
|
return
|
||
|
|
|
||
|
|
if action == 'remove':
|
||
|
|
for willRemoveDevice in self.streamdocks:
|
||
|
|
if device.device_path.find(willRemoveDevice.getPath()) != -1:
|
||
|
|
print("[remove] path: " + willRemoveDevice.getPath())
|
||
|
|
self.streamdocks.remove(willRemoveDevice)
|
||
|
|
break
|
||
|
|
|
||
|
|
vendor_id_str = device.get('ID_VENDOR_ID')
|
||
|
|
product_id_str = device.get('ID_MODEL_ID')
|
||
|
|
|
||
|
|
if not vendor_id_str or not product_id_str:
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
vendor_id = int(vendor_id_str, 16)
|
||
|
|
product_id = int(product_id_str, 16)
|
||
|
|
except ValueError:
|
||
|
|
return
|
||
|
|
|
||
|
|
for vid, pid, class_type in products:
|
||
|
|
if vendor_id == vid and product_id == pid:
|
||
|
|
if action == 'add':
|
||
|
|
dev_path = device.device_path.split('/')[-1] + ":1.0"
|
||
|
|
full_path = dev_path
|
||
|
|
|
||
|
|
found_devices = self.transport.enumerate_devices(vendor_id=vid, product_id=pid)
|
||
|
|
for d in found_devices:
|
||
|
|
if d['path'].endswith(full_path):
|
||
|
|
print("[add] path:", d['path'])
|
||
|
|
newDevice = class_type(self.transport, d)
|
||
|
|
self.streamdocks.append(newDevice)
|
||
|
|
newDevice.open()
|
||
|
|
# your reconnect logic like the next two line
|
||
|
|
# newDevice.set_key_image(1, "../img/tiga64.png")
|
||
|
|
# newDevice.refresh()
|
||
|
|
break
|
||
|
|
|
||
|
|
def _check_new_devices_windows(self, products):
|
||
|
|
"""Check for new devices on Windows"""
|
||
|
|
for vid, pid, class_type in products:
|
||
|
|
found_devices = self.transport.enumerate_devices(vendor_id=vid, product_id=pid)
|
||
|
|
for device_info in found_devices:
|
||
|
|
device_path = device_info['path']
|
||
|
|
# Check whether the device already exists
|
||
|
|
exists = any(device.getPath() == device_path for device in self.streamdocks)
|
||
|
|
if not exists:
|
||
|
|
print(f"[add] path: {device_path}")
|
||
|
|
newDevice = class_type(self.transport, device_info)
|
||
|
|
self.streamdocks.append(newDevice)
|
||
|
|
newDevice.open()
|
||
|
|
# your reconnect logic like the next two line
|
||
|
|
# newDevice.set_key_image(1, "../img/tiga64.png")
|
||
|
|
# newDevice.refresh()
|
||
|
|
|
||
|
|
def _check_removed_devices_windows(self, products):
|
||
|
|
"""Check for removed devices on Windows"""
|
||
|
|
current_paths = set()
|
||
|
|
for vid, pid, _ in products:
|
||
|
|
found_devices = self.transport.enumerate_devices(vendor_id=vid, product_id=pid)
|
||
|
|
for device_info in found_devices:
|
||
|
|
current_paths.add(device_info['path'])
|
||
|
|
|
||
|
|
# Remove devices that no longer exist
|
||
|
|
devices_to_remove = []
|
||
|
|
for device in self.streamdocks:
|
||
|
|
if device.getPath() not in current_paths:
|
||
|
|
devices_to_remove.append(device)
|
||
|
|
|
||
|
|
for device in devices_to_remove:
|
||
|
|
print(f"[remove] path: {device.getPath()}")
|
||
|
|
self.streamdocks.remove(device)
|
||
|
|
|
||
|
|
def _handle_device_addition(self, device_path, products):
|
||
|
|
"""Handle device addition events (polling mode)"""
|
||
|
|
for vid, pid, class_type in products:
|
||
|
|
found_devices = self.transport.enumerate_devices(vendor_id=vid, product_id=pid)
|
||
|
|
for device_info in found_devices:
|
||
|
|
if device_info['path'] == device_path:
|
||
|
|
newDevice = class_type(self.transport, device_info)
|
||
|
|
self.streamdocks.append(newDevice)
|
||
|
|
newDevice.open()
|
||
|
|
# your reconnect logic like the next two line
|
||
|
|
# newDevice.set_key_image(1, "../img/tiga64.png")
|
||
|
|
# newDevice.refresh()
|
||
|
|
break
|
||
|
|
|
||
|
|
def _handle_device_removal(self, device_path):
|
||
|
|
"""Handle device removal events (polling mode)"""
|
||
|
|
devices_to_remove = []
|
||
|
|
for device in self.streamdocks:
|
||
|
|
if device.getPath() == device_path:
|
||
|
|
devices_to_remove.append(device)
|
||
|
|
|
||
|
|
for device in devices_to_remove:
|
||
|
|
self.streamdocks.remove(device)
|
||
|
|
|