KTVManager_UI/lib/reqs.py

111 lines
3.7 KiB
Python
Raw Permalink Normal View History

2025-05-09 16:31:14 +01:00
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

import requests
def get_urls(base_url: str, auth: str) -> List[Dict[str, Any]]:
    """Fetch the user account streams exposed by the API.

    Issues a GET request to ``{base_url}/getUserAccounts/streams`` and
    decodes the response body as JSON. The HTTP status code is not
    inspected.

    Args:
        base_url (str): The base URL of the API.
        auth (str): The credential string sent after ``Basic `` in the
            Authorization header.

    Returns:
        List[Dict[str, Any]]: The decoded JSON body of the response.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    endpoint = f"{base_url}/getUserAccounts/streams"
    response = requests.request(
        "GET",
        endpoint,
        headers={"Authorization": f"Basic {auth}"},
        data={},
    )
    body = response.text
    return json.loads(body)
def get_user_accounts(base_url: str, auth: str) -> List[Dict[str, Any]]:
    """Retrieve user accounts and attach a human-readable expiry date.

    Issues a GET request to ``{base_url}/getUserAccounts``, decodes the
    response body as JSON and, for every account in it, adds an
    ``expiaryDate_rendered`` entry holding the account's ``expiaryDate``
    formatted as ``DD/MM/YYYY``. The HTTP status code is not inspected.

    Args:
        base_url (str): The base URL of the API.
        auth (str): The credential string sent after ``Basic `` in the
            Authorization header.

    Returns:
        List[Dict[str, Any]]: The decoded accounts, each extended with the
        ``expiaryDate_rendered`` key.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If an account has no ``expiaryDate`` entry.
    """
    url = f"{base_url}/getUserAccounts"
    headers = {"Authorization": f"Basic {auth}"}
    response = requests.request("GET", url, headers=headers, data={})
    accounts = json.loads(response.text)
    for account in accounts:
        # "expiaryDate" (API spelling) is read as a Unix timestamp in seconds
        # and rendered in UTC. An aware datetime replaces the deprecated
        # datetime.utcfromtimestamp(); the formatted date is identical.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")
    return accounts
def delete_user_account(base_url: str, auth: str, stream: str, username: str) -> bool:
    """Ask the API to remove a user account from a stream.

    Issues a POST request to ``{base_url}/deleteAccount`` carrying the
    stream and user names as request data, then looks for the word
    ``Deleted`` in the response body.

    Args:
        base_url (str): The base URL of the API.
        auth (str): The credential string sent after ``Basic `` in the
            Authorization header.
        stream (str): The name of the stream associated with the account.
        username (str): The username of the account to delete.

    Returns:
        bool: True if the response text contains ``Deleted``, False
        otherwise.
    """
    endpoint = f"{base_url}/deleteAccount"
    form = {"stream": stream, "user": username}
    response = requests.request(
        "POST",
        endpoint,
        headers={"Authorization": f"Basic {auth}"},
        data=form,
    )
    reply = response.text
    return "Deleted" in reply
def add_user_account(base_url: str, auth: str, username: str, password: str, stream: str) -> bool:
    """Ask the API to create a user account on a stream.

    Issues a POST request to ``{base_url}/addAccount`` carrying the
    username, password and stream name as request data.

    Args:
        base_url (str): The base URL of the API.
        auth (str): The credential string sent after ``Basic `` in the
            Authorization header.
        username (str): The username of the account to add.
        password (str): The password of the account to add.
        stream (str): The name of the stream associated with the account.

    Returns:
        bool: True if the API answered with HTTP status 200, False
        otherwise.
    """
    endpoint = f"{base_url}/addAccount"
    form = {"username": username, "password": password, "stream": stream}
    response = requests.request(
        "POST",
        endpoint,
        headers={"Authorization": f"Basic {auth}"},
        data=form,
    )
    succeeded = response.status_code == 200
    return succeeded
2025-05-09 16:31:14 +01:00
2025-07-05 10:57:19 +01:00
def get_stream_names(base_url: str, auth: str) -> List[str]:
    """Fetch the stream names known to the API.

    Issues a GET request to ``{base_url}/getStreamNames``. Any response
    that is not HTTP 200, has an empty body, or whose body is not valid
    JSON yields an empty list instead of an error.

    Args:
        base_url (str): The base URL of the API.
        auth (str): The credential string sent after ``Basic `` in the
            Authorization header.

    Returns:
        List[str]: The decoded JSON body on success, otherwise ``[]``.
    """
    endpoint = f"{base_url}/getStreamNames"
    response = requests.request(
        "GET",
        endpoint,
        headers={"Authorization": f"Basic {auth}"},
        data={},
    )

    # Guard clauses: anything other than a non-empty 200 response is "no streams".
    if response.status_code != 200:
        return []
    body = response.text
    if not body:
        return []

    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return []