working direct HA functionality

This commit is contained in:
Karl Hudgell 2024-10-04 16:35:45 +01:00
parent 23a74569a2
commit 7c75422dc4
2 changed files with 150 additions and 92 deletions

View File

@ -1,65 +1,94 @@
import os
import RPi.GPIO as GPIO
import time
from lib.mqtt import create_client, control_player, create_config, check_current_status, check_current_disc, update_disc
from lib.home_assistant import load_disc, pause_media, play_media_again, set_shuffle, next_track
from dotenv import load_dotenv
client = create_client()
load_dotenv()
create_config(client)
use_mqtt = os.environ["use_mqtt"].lower() == "true"
# Set up GPIO pins for each button
if use_mqtt:
client = create_client()
create_config(client)
# Debug: Print environment variable
print(f"Using MQTT: {use_mqtt}")
# GPIO button pin setup
BUTTON_PINS = [17, 18, 27, 22]
GPIO.setmode(GPIO.BCM) # Use BCM numbering
# Set up each button pin as input with internal pull-up resistors
for pin in BUTTON_PINS:
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# Callback functions to handle button presses
def button_callback_1(channel):
client = create_client()
if check_current_status(client) == "PLAY":
print("Button 1 pressed! Pausing playback.")
control_player(client, "PAUSE")
else:
print("Button 1 pressed! Playing.")
control_player(client, "PLAY")
def button_callback_2(channel):
print("Button 2 pressed!")
client = create_client()
control_player(client, "SKIP")
control_player(client, "PLAY")
def button_callback_3(channel):
start_time = time.time()
client = create_client()
disc_data = check_current_disc(client)
# Wait until the button is released or held for more than 3 seconds
while GPIO.input(BUTTON_PINS[2]) == GPIO.LOW: # Button is pressed (active low)
if time.time() - start_time >= 3: # Check if 3 seconds have passed
print("Button 3 held for 3 seconds! Sending RADIO2.")
update_disc(client, ["ytmusic",disc_data['name'],disc_data['type'],"RESET"])
# Button Handlers
def handle_button_1():
if use_mqtt:
if check_current_status(client) == "PLAY":
print("Pausing playback via MQTT.")
control_player(client, "PAUSE")
else:
print("Playing media via MQTT.")
control_player(client, "PLAY")
update_disc(client, ["ytmusic",disc_data['name'],disc_data['type'],disc_data['id']])
return # Exit after sending RADIO2
# If button was not held for 3 seconds, send RADIO
print("Button 3 pressed briefly! Sending RADIO.")
control_player(client, "RADIO")
else:
print("Pausing/Playing via Home Assistant.")
if check_current_status(client) == "PLAY":
pause_media()
else:
play_media_again()
def button_callback_4(channel):
print("Button 4 pressed!")
client = create_client()
control_player(client, "SHUFFLE")
control_player(client, "PLAY")
def handle_button_2():
print("Skipping to the next track.")
if use_mqtt:
control_player(client, "SKIP")
control_player(client, "PLAY")
else:
next_track()
def handle_button_3():
start_time = time.time()
if use_mqtt:
disc_data = check_current_disc(client)
# Wait until the button is released or held for more than 3 seconds
while GPIO.input(BUTTON_PINS[2]) == GPIO.LOW:
if time.time() - start_time >= 3:
print("Button 3 held for 3 seconds! Resetting to RADIO2 via MQTT.")
update_disc(client, ["ytmusic", disc_data['name'], disc_data['type'], "RESET"])
control_player(client, "PLAY")
update_disc(client, ["ytmusic", disc_data['name'], disc_data['type'], disc_data['id']])
return # Exit after sending RADIO2
print("Button 3 pressed briefly! Sending RADIO via MQTT.")
control_player(client, "RADIO")
else:
print("Button 3 functionality not implemented for Home Assistant.")
def handle_button_4():
print("Toggling shuffle mode.")
if use_mqtt:
control_player(client, "SHUFFLE")
control_player(client, "PLAY")
else:
set_shuffle()
# GPIO Callback Handlers
def button_callback(channel):
if channel == BUTTON_PINS[0]:
handle_button_1()
elif channel == BUTTON_PINS[1]:
handle_button_2()
elif channel == BUTTON_PINS[2]:
handle_button_3()
elif channel == BUTTON_PINS[3]:
handle_button_4()
# Add event detection for each button press
GPIO.add_event_detect(BUTTON_PINS[0], GPIO.FALLING, callback=button_callback_1, bouncetime=200)
GPIO.add_event_detect(BUTTON_PINS[1], GPIO.FALLING, callback=button_callback_2, bouncetime=200)
GPIO.add_event_detect(BUTTON_PINS[2], GPIO.FALLING, callback=button_callback_3, bouncetime=200)
GPIO.add_event_detect(BUTTON_PINS[3], GPIO.FALLING, callback=button_callback_3, bouncetime=200)
for pin in BUTTON_PINS:
GPIO.add_event_detect(pin, GPIO.FALLING, callback=button_callback, bouncetime=200)
try:
# Keep the script running to detect button presses

125
main.py
View File

@ -1,59 +1,88 @@
import sys
from lib.mqtt import create_client, update_disc, control_player, create_config, check_current_disc
from lib.home_assistant import load_disc, pause_media,play_media_again,play_radio,stop_media
from dotenv import load_dotenv
import os
from lib.mqtt import (
create_client,
update_disc,
control_player,
create_config,
check_current_disc,
)
from lib.home_assistant import load_disc, pause_media, play_media_again
from dotenv import load_dotenv
load_dotenv()
client = create_client()
use_mqtt = os.environ["use_mqtt"].lower() == "true"
if use_mqtt:
client = create_client()
create_config(client)
create_config(client)
# Debug: Print environment variable
print(f"Using MQTT: {use_mqtt}")
# Helper function to handle disc operations
def handle_disc(disc_data):
print(f"Handling disc: {disc_data}")
disc_object = {"name": disc_data[1], "type": disc_data[2], "id": disc_data[3]}
if use_mqtt:
if disc_object == check_current_disc(client):
print("Disc is the same as the last one, playing.")
control_player(client, "PLAY")
if use_mqtt:
print("Updating disc via MQTT and playing.")
control_player(client, "PLAY")
update_disc(client, disc_data)
else:
print(f"Loading {disc_object['name']} via Home Assistant.")
load_disc(disc_object)
# Command handlers for media control
def handle_play():
print("Handling PLAY command.")
if use_mqtt:
control_player(client, "PLAY")
else:
play_media_again()
def handle_pause():
print("Handling PAUSE command.")
if use_mqtt:
control_player(client, "PAUSE")
else:
pause_media()
# Dictionary for command dispatching
command_handlers = {"EJECT": handle_pause, "PLAY": handle_play, "PAUSE": handle_pause}
# Main function to process input
def process_input(input_str):
print(f"Processing input: {input_str}")
if ":" in input_str:
# Handle disc input
disc_data = input_str.split(":")
if disc_data[0] == "ytmusic":
handle_disc(disc_data)
else:
print(f"Unknown disc type: {disc_data[0]}")
else:
# Handle control commands
command_handler = command_handlers.get(input_str)
if command_handler:
command_handler()
else:
print(f"Can't process {input_str}")
use_mqtt = os.environ['use_mqtt'].lower() == 'true'
# Check if any arguments are passed
if len(sys.argv) > 1:
# Get the JSON argument from the command line
input_str = sys.argv[1]
# Convert the JSON string into a dictionary
disc_data = input_str.split(":")
if disc_data[0] == "ytmusic":
disc_object = {
"name":disc_data[1],
"type":disc_data[2],
"id":disc_data[3]
}
# Check if disc is the same as last inserted
if (disc_object == check_current_disc(client)):
# If disc is the same, PLAY
control_player(client, "PLAY")
else:
# Pass the parsed data to the update_disc function
if use_mqtt:
control_player(client, "PLAY")
update_disc(client, disc_data)
else:
load_disc(disc_object)
else:
if input_str == "EJECT":
# PAUSE the current playing item
if use_mqtt:
control_player(client, "PAUSE")
else:
pause_media()
elif input_str == "PLAY":
# PLAY the current playing item
if use_mqtt:
control_player(client, "PLAY")
else:
play_media_again()
elif input_str == "PAUSE":
# PAUSE the current playing item
if use_mqtt:
control_player(client, "PAUSE")
else:
pause_media()
else:
print(f"Can't process {input_str}")
print(f"Received argument: {input_str}")
process_input(input_str)
else:
print("No arguments passed.")