123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
import json
from datetime import datetime, timezone
from typing import Any, Dict, List

import requests
def get_urls(
    base_url: str, auth: str, *, timeout: float = 30.0
) -> List[Dict[str, Any]]:
    """Fetch the JSON document served at ``<base_url>/getUserAccounts/streams``.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            with a leading slash, so pass it without a trailing one.
        auth (str): Credential string sent verbatim after ``Basic `` in the
            ``Authorization`` header.
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        List[Dict[str, Any]]: The decoded JSON body of the response.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    url = f"{base_url}/getUserAccounts/streams"
    headers = {"Authorization": f"Basic {auth}"}
    # requests waits indefinitely unless a timeout is given, so a stalled
    # server would otherwise hang the caller forever.
    response = requests.request("GET", url, headers=headers, timeout=timeout)
    return json.loads(response.text)
def get_user_accounts(
    base_url: str, auth: str, *, timeout: float = 30.0
) -> List[Dict[str, Any]]:
    """Fetch the accounts from ``<base_url>/getUserAccounts`` and add a readable expiry.

    Each account in the decoded response gets an extra
    ``"expiaryDate_rendered"`` entry: its ``"expiaryDate"`` value interpreted
    as a POSIX timestamp and formatted in UTC as ``DD/MM/YYYY``.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            with a leading slash, so pass it without a trailing one.
        auth (str): Credential string sent verbatim after ``Basic `` in the
            ``Authorization`` header.
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        List[Dict[str, Any]]: The decoded accounts, each with the added
        ``"expiaryDate_rendered"`` string.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If an account has no ``"expiaryDate"`` entry.
    """
    url = f"{base_url}/getUserAccounts"
    headers = {"Authorization": f"Basic {auth}"}
    # requests waits indefinitely unless a timeout is given, so a stalled
    # server would otherwise hang the caller forever.
    response = requests.request("GET", url, headers=headers, timeout=timeout)
    accounts = json.loads(response.text)
    for account in accounts:
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; an
        # aware UTC datetime renders the same calendar date.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")
    return accounts
def delete_user_account(
    base_url: str, auth: str, stream: str, username: str, *, timeout: float = 30.0
) -> bool:
    """Ask ``<base_url>/deleteAccount`` to remove an account from a stream.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            with a leading slash, so pass it without a trailing one.
        auth (str): Credential string sent verbatim after ``Basic `` in the
            ``Authorization`` header.
        stream (str): Stream name, sent as the ``stream`` form field.
        username (str): Account name, sent as the ``user`` form field.
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        bool: True if the response body contains the text ``Deleted``,
        False otherwise.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{base_url}/deleteAccount"
    # NOTE(review): the account name is sent under "user" here, while the
    # addAccount request sends it under "username" - confirm against the API.
    payload = {"stream": stream, "user": username}
    headers = {"Authorization": f"Basic {auth}"}
    # requests waits indefinitely unless a timeout is given, so a stalled
    # server would otherwise hang the caller forever.
    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=timeout
    )
    # Success is detected purely from the response text, not the status code.
    return "Deleted" in response.text
def add_user_account(
    base_url: str,
    auth: str,
    username: str,
    password: str,
    stream: str,
    *,
    timeout: float = 30.0,
) -> bool:
    """Ask ``<base_url>/addAccount`` to create an account on a stream.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            with a leading slash, so pass it without a trailing one.
        auth (str): Credential string sent verbatim after ``Basic `` in the
            ``Authorization`` header.
        username (str): Account name, sent as the ``username`` form field.
        password (str): Account password, sent as the ``password`` form field.
        stream (str): Stream name, sent as the ``stream`` form field.
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        bool: True if the response body contains the text
        ``Added successfully``, False otherwise.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{base_url}/addAccount"
    payload = {"username": username, "password": password, "stream": stream}
    headers = {"Authorization": f"Basic {auth}"}
    # requests waits indefinitely unless a timeout is given, so a stalled
    # server would otherwise hang the caller forever.
    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=timeout
    )
    # Success is detected purely from the response text, not the status code.
    return "Added successfully" in response.text
def get_user_accounts_count(
    base_url: str, auth: str, *, timeout: float = 30.0
) -> int:
    """Fetch ``<base_url>/getUserAccounts/count`` and return its ``count`` value.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            with a leading slash, so pass it without a trailing one.
        auth (str): Credential string sent verbatim after ``Basic `` in the
            ``Authorization`` header.
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        int: The value stored under the ``count`` key of the decoded JSON body.

    Raises:
        requests.exceptions.Timeout: If the server does not answer in time.
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If the decoded body has no ``count`` key.
    """
    url = f"{base_url}/getUserAccounts/count"
    headers = {"Authorization": f"Basic {auth}"}
    # requests waits indefinitely unless a timeout is given, so a stalled
    # server would otherwise hang the caller forever.
    response = requests.request("GET", url, headers=headers, timeout=timeout)
    body = json.loads(response.text)
    return body["count"]
|