108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
import json
from datetime import datetime, timezone
from typing import List, Dict, Any

import requests
|
|
|
|
|
|
def get_urls(base_url: str, auth: str) -> List[Dict[str, Any]]:
    """Fetch the stream listing served at ``/getUserAccounts/streams``.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            to it verbatim.
        auth (str): Credential string sent as ``Authorization: Basic <auth>``.

    Returns:
        List[Dict[str, Any]]: The JSON-decoded response body.
    """
    endpoint = f"{base_url}/getUserAccounts/streams"
    auth_headers = {"Authorization": f"Basic {auth}"}

    # The GET carries an empty data payload, exactly as the API client always did.
    reply = requests.request("GET", endpoint, headers=auth_headers, data={})
    return json.loads(reply.text)
|
|
|
|
|
|
def get_user_accounts(base_url: str, auth: str) -> List[Dict[str, Any]]:
    """Retrieve user accounts and attach a rendered expiry date to each one.

    Each account returned by ``<base_url>/getUserAccounts`` gains an
    ``expiaryDate_rendered`` key holding its ``expiaryDate`` (treated as a
    POSIX timestamp in seconds) formatted as ``DD/MM/YYYY`` in UTC.

    Args:
        base_url (str): The base URL of the API.
        auth (str): The authorization token, sent as ``Basic <auth>``.

    Returns:
        List[Dict[str, Any]]: The decoded accounts, each with the extra
        ``expiaryDate_rendered`` entry.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If an account has no ``expiaryDate`` entry.
    """
    url = f"{base_url}/getUserAccounts"
    headers = {"Authorization": f"Basic {auth}"}

    response = requests.request("GET", url, headers=headers, data={})
    accounts = json.loads(response.text)

    for account in accounts:
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; a
        # timezone-aware UTC datetime renders to the same date string.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")

    return accounts
|
|
|
|
|
|
def delete_user_account(base_url: str, auth: str, stream: str, username: str) -> bool:
    """Ask the API to remove one user account from a stream.

    Args:
        base_url (str): Root URL of the API; ``/deleteAccount`` is appended.
        auth (str): Credential string sent as ``Authorization: Basic <auth>``.
        stream (str): Stream the account belongs to (sent as ``stream``).
        username (str): Account to remove (sent as ``user``).

    Returns:
        bool: True when the response body contains the text ``Deleted``,
        False otherwise.
    """
    form_fields = {"stream": stream, "user": username}
    reply = requests.request(
        "POST",
        f"{base_url}/deleteAccount",
        headers={"Authorization": f"Basic {auth}"},
        data=form_fields,
    )
    # Success is judged solely by the marker text in the response body.
    return "Deleted" in reply.text
|
|
|
|
|
|
def add_user_account(
    base_url: str, auth: str, username: str, password: str, stream: str
) -> bool:
    """Ask the API to create a user account on a stream.

    Args:
        base_url (str): Root URL of the API; ``/addAccount`` is appended.
        auth (str): Credential string sent as ``Authorization: Basic <auth>``.
        username (str): Name of the account to create.
        password (str): Password for the new account.
        stream (str): Stream the account is attached to.

    Returns:
        bool: True when the response body contains the text
        ``Added successfully``, False otherwise.
    """
    form_fields = {"username": username, "password": password, "stream": stream}
    reply = requests.request(
        "POST",
        f"{base_url}/addAccount",
        headers={"Authorization": f"Basic {auth}"},
        data=form_fields,
    )
    # Success is judged solely by the marker text in the response body.
    return "Added successfully" in reply.text
|
|
|
|
|
|
def get_user_accounts_count(base_url: str, auth: str) -> int:
    """Return the ``count`` field reported by ``/getUserAccounts/count``.

    Args:
        base_url (str): Root URL of the API; the endpoint path is appended
            to it verbatim.
        auth (str): Credential string sent as ``Authorization: Basic <auth>``.

    Returns:
        int: The value stored under ``count`` in the JSON response.
    """
    endpoint = f"{base_url}/getUserAccounts/count"
    auth_headers = {"Authorization": f"Basic {auth}"}

    reply = requests.request("GET", endpoint, headers=auth_headers, data={})
    return json.loads(reply.text)['count']
|