KTVManager_UI/lib/reqs.py
2025-07-15 15:44:19 +01:00

106 lines
3.4 KiB
Python

import requests
import json
from datetime import datetime
from typing import List, Dict, Any
def get_urls(base_url: str, auth: str, *, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Retrieve user account streams from the API.

    Sends a GET request to ``{base_url}/getUserAccounts/streams`` with a
    ``Basic`` Authorization header and returns the decoded JSON body.

    Args:
        base_url: The base URL of the API (the endpoint path is appended to it).
        auth: The value sent after ``"Basic "`` in the Authorization header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block forever on an unresponsive server.

    Returns:
        The decoded JSON response (expected to be a list of user account
        streams).

    Raises:
        requests.exceptions.HTTPError: If the server answers with a 4xx/5xx
            status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = f"{base_url}/getUserAccounts/streams"
    headers = {"Authorization": f"Basic {auth}"}
    # An explicit timeout keeps a stalled server from hanging the caller.
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
def get_user_accounts(base_url: str, auth: str, *, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Retrieve user accounts from the API and add a rendered expiry date.

    Sends a GET request to ``{base_url}/getUserAccounts`` and, for every
    account in the response, stores a ``DD/MM/YYYY`` rendering (in UTC) of
    its ``expiaryDate`` timestamp under the ``expiaryDate_rendered`` key.

    Args:
        base_url: The base URL of the API (the endpoint path is appended to it).
        auth: The value sent after ``"Basic "`` in the Authorization header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block forever on an unresponsive server.

    Returns:
        A list of user accounts with 'expiaryDate_rendered' added.

    Raises:
        requests.exceptions.HTTPError: If the server answers with a 4xx/5xx
            status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
        KeyError: If an account has no ``expiaryDate`` entry.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    url = f"{base_url}/getUserAccounts"
    headers = {"Authorization": f"Basic {auth}"}
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    accounts = response.json()
    for account in accounts:
        # ``datetime.utcfromtimestamp`` is deprecated since Python 3.12; the
        # timezone-aware call below yields the same UTC calendar date.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")
    return accounts
def delete_user_account(
    base_url: str, auth: str, stream: str, username: str, *, timeout: float = 30.0
) -> bool:
    """Delete a user account via the API.

    Sends a form-encoded POST to ``{base_url}/deleteAccount`` with the fields
    ``stream`` and ``user``.

    Args:
        base_url: The base URL of the API (the endpoint path is appended to it).
        auth: The value sent after ``"Basic "`` in the Authorization header.
        stream: The stream associated with the account.
        username: The username of the account to delete.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block forever on an unresponsive server.

    Returns:
        True if the response body contains the text ``"Deleted"``, False
        otherwise.

    Raises:
        requests.exceptions.HTTPError: If the server answers with a 4xx/5xx
            status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = f"{base_url}/deleteAccount"
    payload = {"stream": stream, "user": username}
    headers = {"Authorization": f"Basic {auth}"}
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    response.raise_for_status()
    # Success is signalled by the word "Deleted" in the response body.
    return "Deleted" in response.text
def add_user_account(
    base_url: str,
    auth: str,
    username: str,
    password: str,
    stream: str,
    *,
    timeout: float = 30.0,
) -> bool:
    """Add a user account via the API.

    Sends a form-encoded POST to ``{base_url}/addAccount`` with the fields
    ``username``, ``password`` and ``stream``.

    Args:
        base_url: The base URL of the API (the endpoint path is appended to it).
        auth: The value sent after ``"Basic "`` in the Authorization header.
        username: The username of the new account.
        password: The password for the new account.
        stream: The stream to associate with the new account.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block forever on an unresponsive server.

    Returns:
        True if the server answered with HTTP 200, False for any other status.

    Raises:
        requests.exceptions.RequestException: If the request itself fails
            (connection error, timeout, ...). HTTP error statuses are not
            raised; they are reported as ``False``.
    """
    url = f"{base_url}/addAccount"
    payload = {"username": username, "password": password, "stream": stream}
    headers = {"Authorization": f"Basic {auth}"}
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    # Deliberately no raise_for_status(): non-200 answers map to False.
    return response.status_code == 200
def get_stream_names(base_url: str, auth: str, *, timeout: float = 30.0) -> List[str]:
    """Retrieve a list of stream names from the API.

    Sends a GET request to ``{base_url}/getStreamNames``. This is a
    best-effort call: request failures and undecodable responses are turned
    into an empty list instead of being raised.

    Args:
        base_url: The base URL of the API (the endpoint path is appended to it).
        auth: The value sent after ``"Basic "`` in the Authorization header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` may block forever on an unresponsive server.

    Returns:
        The decoded JSON response (expected to be a list of stream names), or
        an empty list if the request fails, times out, returns an error
        status, or the body is not valid JSON.
    """
    url = f"{base_url}/getStreamNames"
    headers = {"Authorization": f"Basic {auth}"}
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError):
        # ValueError covers json.JSONDecodeError as well as the decode errors
        # raised by older requests versions or a simplejson backend, which do
        # not derive from json.JSONDecodeError.
        return []