import json
from datetime import datetime, timezone
from typing import Any, Dict, List

import requests

# Seconds to wait for the server before giving up. Without an explicit
# timeout, requests will block indefinitely on an unresponsive server.
DEFAULT_TIMEOUT = 30.0


def _auth_headers(auth: str) -> Dict[str, str]:
    """Build the ``Authorization`` header sent with every API call."""
    return {"Authorization": f"Basic {auth}"}


def get_urls(
    base_url: str, auth: str, *, timeout: float = DEFAULT_TIMEOUT
) -> List[Dict[str, Any]]:
    """Retrieve user account streams from the specified base URL.

    Sends a GET request to ``{base_url}/getUserAccounts/streams``.

    Args:
        base_url (str): The base URL of the API (no trailing slash; the
            endpoint path is appended with a leading ``/``).
        auth (str): The credential placed after ``Basic `` in the
            ``Authorization`` header.
        timeout (float): Seconds to wait for the server before
            ``requests`` raises a timeout error.

    Returns:
        List[Dict[str, Any]]: The decoded JSON response body. The HTTP
        status code is not checked.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        f"{base_url}/getUserAccounts/streams",
        headers=_auth_headers(auth),
        timeout=timeout,
    )
    return json.loads(response.text)


def get_user_accounts(
    base_url: str, auth: str, *, timeout: float = DEFAULT_TIMEOUT
) -> List[Dict[str, Any]]:
    """Retrieve user accounts from the specified base URL.

    Sends a GET request to ``{base_url}/getUserAccounts`` and adds an
    ``expiaryDate_rendered`` entry to every returned account.

    Args:
        base_url (str): The base URL of the API (no trailing slash).
        auth (str): The credential placed after ``Basic `` in the
            ``Authorization`` header.
        timeout (float): Seconds to wait for the server before
            ``requests`` raises a timeout error.

    Returns:
        List[Dict[str, Any]]: The decoded accounts, each extended with
        ``expiaryDate_rendered``: the account's ``expiaryDate`` (read as a
        POSIX timestamp in seconds) formatted in UTC as ``DD/MM/YYYY``.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If an account has no ``expiaryDate`` key.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        f"{base_url}/getUserAccounts",
        headers=_auth_headers(auth),
        timeout=timeout,
    )
    accounts = json.loads(response.text)
    for account in accounts:
        # An aware UTC datetime replaces the deprecated
        # datetime.utcfromtimestamp(); the formatted date is identical.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")
    return accounts


def delete_user_account(
    base_url: str,
    auth: str,
    stream: str,
    username: str,
    *,
    timeout: float = DEFAULT_TIMEOUT,
) -> bool:
    """Delete a user account from the specified base URL.

    Sends a form-encoded POST request to ``{base_url}/deleteAccount``.

    Args:
        base_url (str): The base URL of the API (no trailing slash).
        auth (str): The credential placed after ``Basic `` in the
            ``Authorization`` header.
        stream (str): The name of the stream associated with the user
            account.
        username (str): The username of the account to delete.
        timeout (float): Seconds to wait for the server before
            ``requests`` raises a timeout error.

    Returns:
        bool: True if the response body contains ``"Deleted"``, False
        otherwise. The HTTP status code is not checked.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.post(
        f"{base_url}/deleteAccount",
        headers=_auth_headers(auth),
        data={"stream": stream, "user": username},
        timeout=timeout,
    )
    return "Deleted" in response.text


def add_user_account(
    base_url: str,
    auth: str,
    username: str,
    password: str,
    stream: str,
    *,
    timeout: float = DEFAULT_TIMEOUT,
) -> bool:
    """Add a user account to the specified base URL.

    Sends a form-encoded POST request to ``{base_url}/addAccount``.

    Args:
        base_url (str): The base URL of the API (no trailing slash).
        auth (str): The credential placed after ``Basic `` in the
            ``Authorization`` header.
        username (str): The username of the account to add.
        password (str): The password of the account to add.
        stream (str): The name of the stream associated with the user
            account.
        timeout (float): Seconds to wait for the server before
            ``requests`` raises a timeout error.

    Returns:
        bool: True if the response body contains ``"Added successfully"``,
        False otherwise. The HTTP status code is not checked.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.post(
        f"{base_url}/addAccount",
        headers=_auth_headers(auth),
        data={"username": username, "password": password, "stream": stream},
        timeout=timeout,
    )
    return "Added successfully" in response.text


def get_user_accounts_count(
    base_url: str, auth: str, *, timeout: float = DEFAULT_TIMEOUT
) -> int:
    """Get the count of user accounts from the specified base URL.

    Sends a GET request to ``{base_url}/getUserAccounts/count``.

    Args:
        base_url (str): The base URL of the API (no trailing slash).
        auth (str): The credential placed after ``Basic `` in the
            ``Authorization`` header.
        timeout (float): Seconds to wait for the server before
            ``requests`` raises a timeout error.

    Returns:
        int: The value stored under ``count`` in the decoded JSON response.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If the response has no ``count`` key.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        f"{base_url}/getUserAccounts/count",
        headers=_auth_headers(auth),
        timeout=timeout,
    )
    return json.loads(response.text)["count"]