import os import paho.mqtt.client as mqtt import json import time from dotenv import load_dotenv # Load the .env file load_dotenv() # Define the MQTT server details broker = os.environ.get("broker") port = int(os.environ.get("port")) # MQTT username and password username = os.environ.get("username") password = os.environ.get("password") def create_client() -> mqtt.Client: """Create an MQTT client and connect it to the broker Returns: mqtt.Client: Connected MQTT client instance """ # Create a new MQTT client instance client = mqtt.Client() # Set username and password client.username_pw_set(username, password) # Connect to the MQTT broker client.connect(broker, port, 60) return client def create_config(client: mqtt.Client) -> None: """Create Home Assistant discovery topics Args: client (mqtt.Client): MQTT Client """ # Device-specific information for multiple sensors node_id = "floppy_player" # Unique device ID # Define discovery and state topics for each sensor discovery_topic_disc = f"homeassistant/sensor/floppy_player/current_disc/config" discovery_topic_disc_type = ( f"homeassistant/sensor/floppy_player/current_disc_type/config" ) discovery_topic_disc_id = ( f"homeassistant/sensor/floppy_player/current_disc_id/config" ) current_disc_state_topic_disc = ( f"homeassistant/sensor/floppy_player/current_disc/state" ) current_disc_type_state_topic_disc = ( f"homeassistant/sensor/floppy_player/current_disc_type/state" ) current_disc_id_state_topic_disc = ( f"homeassistant/sensor/floppy_player/current_disc_id/state" ) discovery_topic_status = f"homeassistant/sensor/floppy_player/status/config" state_topic_status = f"homeassistant/sensor/floppy_player/status/state" # Sensor 1: current_disc (a text-based sensor) current_disc_config = { "name": "Current Disc", "state_topic": current_disc_state_topic_disc, "value_template": "{{ value }}", # Textual value "unique_id": f"{node_id}_current_disc", "device": { "identifiers": [node_id], "name": "Floppy Player", "model": "v1", "manufacturer": "Karl", }, } current_disc_type_state_config = { "name": "Current Disc Type", "state_topic": current_disc_type_state_topic_disc, "value_template": "{{ value }}", # Textual value "unique_id": f"{node_id}_current_disc_type", "device": { "identifiers": [node_id], "name": "Floppy Player", "model": "v1", "manufacturer": "Karl", }, } current_disc_id_state_config = { "name": "Current Disc Id", "state_topic": current_disc_id_state_topic_disc, "value_template": "{{ value }}", # Textual value "unique_id": f"{node_id}_current_disc_id", "device": { "identifiers": [node_id], "name": "Floppy Player", "model": "v1", "manufacturer": "Karl", }, } # Sensor 2: status (another text-based sensor) status_config = { "name": "Device Status", "state_topic": state_topic_status, "value_template": "{{ value }}", # Textual value "unique_id": f"{node_id}_status", "device": { "identifiers": [node_id], "name": "Floppy Player", "model": "v1", "manufacturer": "Karl", }, } client.publish(discovery_topic_disc, json.dumps(current_disc_config), retain=True) client.publish( discovery_topic_disc_type, json.dumps(current_disc_type_state_config), retain=True, ) client.publish( discovery_topic_disc_id, json.dumps(current_disc_id_state_config), retain=True ) client.publish(discovery_topic_status, json.dumps(status_config), retain=True) def check_current_disc(client: mqtt.Client) -> dict: """Check the current loaded disc, disc type, and disc ID by subscribing to the retained messages on the state topics. Args: client (mqtt.Client): MQTT Client Returns: dict: A dictionary containing the current disc, disc type, and disc ID. """ def on_message(client, userdata, message): """Callback function to handle received MQTT messages.""" topic = message.topic userdata[topic] = message.payload.decode() # Create a dictionary to store the messages userdata = { "homeassistant/sensor/floppy_player/current_disc/state": None, "homeassistant/sensor/floppy_player/current_disc_type/state": None, "homeassistant/sensor/floppy_player/current_disc_id/state": None, } # Set the user data and the on_message callback client.user_data_set(userdata) client.on_message = on_message # Subscribe to the topics current_disc_state_topic_disc = ( "homeassistant/sensor/floppy_player/current_disc/state" ) current_disc_type_state_topic_disc = ( "homeassistant/sensor/floppy_player/current_disc_type/state" ) current_disc_id_state_topic_disc = ( "homeassistant/sensor/floppy_player/current_disc_id/state" ) topics = [ current_disc_state_topic_disc, current_disc_type_state_topic_disc, current_disc_id_state_topic_disc, ] for topic in topics: client.subscribe(topic) # Run the loop manually until we receive all messages or timeout timeout = 5 # Timeout after 5 seconds while any(value is None for value in userdata.values()) and timeout > 0: client.loop(timeout=0.1) # Process network events for a short time timeout -= 0.1 # Check if we received all messages if any(value is None for value in userdata.values()): raise TimeoutError("Some retained messages were not found.") # Return the relevant messages return { "name": userdata[current_disc_state_topic_disc], "type": userdata[current_disc_type_state_topic_disc], "id": userdata[current_disc_id_state_topic_disc], } def check_current_status(client: mqtt.Client) -> str: """Check the current player status by subscribing to the retained message on the state topic. Args: client (mqtt.Client): MQTT Client Returns: str: Current object at current_disc/state """ def on_message(client, userdata, message): """Callback function to handle received MQTT messages.""" userdata['message'] = message.payload.decode() # Create a dictionary to store the message userdata = {'message': None} # Set the user data and the on_message callback client.user_data_set(userdata) client.on_message = on_message # Subscribe to the topic client.subscribe("homeassistant/sensor/floppy_player/status/state") # Run the loop manually until we receive a message timeout = 5 # Timeout after 5 seconds while userdata['message'] is None and timeout > 0: client.loop(timeout=0.1) # Process network events for a short time timeout -= 0.1 # Check if we received the message if userdata['message'] is None: raise TimeoutError("No retained message was found.") return userdata['message'] def update_disc(client: mqtt.Client, disc_message: list) -> None: """Update current disc. Args: client (mqtt.Client): MQTT Client disc_message (dict): Current disc information """ # Publish disc name with QoS 1 client.publish( "homeassistant/sensor/floppy_player/current_disc/state", disc_message[1], qos=1, retain=True ) time.sleep(0.1) # Small delay to ensure messages are sent in order # Publish disc type with QoS 1 client.publish( "homeassistant/sensor/floppy_player/current_disc_type/state", disc_message[2], qos=1, retain=True ) time.sleep(0.1) # Publish disc ID with QoS 1 client.publish( "homeassistant/sensor/floppy_player/current_disc_id/state", disc_message[3], qos=1, retain=True ) print(f"Published current disc with QoS 1: {disc_message}") def control_player(client: mqtt.Client, state: str) -> None: """Control the player Args: client (mqtt.Client): MQTT Client state (str): Player State """ client.publish( "homeassistant/sensor/floppy_player/status/state", state, retain=True ) print(f"Published status: {state}")