111 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			111 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import requests
 | |
| import json
 | |
| from datetime import datetime
 | |
| from typing import List, Dict, Any
 | |
| 
 | |
| 
 | |
def get_urls(
    base_url: str, auth: str, *, timeout: float = 30.0
) -> List[Dict[str, Any]]:
    """Retrieve user account streams from the specified base URL.

    Sends a GET request to ``{base_url}/getUserAccounts/streams`` and decodes
    the response body as JSON. The HTTP status code is not inspected.

    Args:
        base_url (str): The base URL of the API; the endpoint path is
            appended to it.
        auth (str): The credential inserted as-is after ``Basic `` in the
            Authorization header (this function does not encode it).
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        List[Dict[str, Any]]: The decoded JSON response body.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = f"{base_url}/getUserAccounts/streams"
    headers = {"Authorization": f"Basic {auth}"}

    # Without an explicit timeout, requests waits indefinitely on a server
    # that accepts the connection but never answers.
    response = requests.request("GET", url, headers=headers, timeout=timeout)
    return json.loads(response.text)
 | |
| 
 | |
| 
 | |
def get_user_accounts(
    base_url: str, auth: str, *, timeout: float = 30.0
) -> List[Dict[str, Any]]:
    """Retrieve user accounts from the specified base URL.

    Sends a GET request to ``{base_url}/getUserAccounts``, decodes the
    response body as JSON, and adds an ``expiaryDate_rendered`` entry to each
    account: the account's ``expiaryDate`` (interpreted as a POSIX timestamp)
    formatted as ``DD/MM/YYYY`` in UTC.

    Args:
        base_url (str): The base URL of the API; the endpoint path is
            appended to it.
        auth (str): The credential inserted as-is after ``Basic `` in the
            Authorization header (this function does not encode it).
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        List[Dict[str, Any]]: The decoded accounts, each with an added
        ``expiaryDate_rendered`` string.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
        KeyError: If an account has no ``expiaryDate`` entry.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    # Imported locally so this function does not depend on a change to the
    # module-level imports (the module imports only ``datetime`` itself).
    from datetime import timezone

    url = f"{base_url}/getUserAccounts"
    headers = {"Authorization": f"Basic {auth}"}

    # Without an explicit timeout, requests waits indefinitely on a server
    # that accepts the connection but never answers.
    response = requests.request("GET", url, headers=headers, timeout=timeout)
    accounts = json.loads(response.text)

    for account in accounts:
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; an
        # aware UTC datetime renders to the same calendar date.
        # NOTE: "expiaryDate" is the key spelling used by the API payload.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")

    return accounts
 | |
| 
 | |
| 
 | |
def delete_user_account(
    base_url: str, auth: str, stream: str, username: str, *, timeout: float = 30.0
) -> bool:
    """Delete a user account from the specified base URL.

    Sends a form-encoded POST to ``{base_url}/deleteAccount`` with the fields
    ``stream`` and ``user``. Success is judged by the response body, not by
    the HTTP status code.

    Args:
        base_url (str): The base URL of the API; the endpoint path is
            appended to it.
        auth (str): The credential inserted as-is after ``Basic `` in the
            Authorization header (this function does not encode it).
        stream (str): The name of the stream associated with the user account.
        username (str): The username of the account to delete (sent as the
            ``user`` form field).
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        bool: True if the response body contains the text ``Deleted``,
        False otherwise.

    Raises:
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = f"{base_url}/deleteAccount"
    payload = {"stream": stream, "user": username}
    headers = {"Authorization": f"Basic {auth}"}

    # Without an explicit timeout, requests waits indefinitely on a server
    # that accepts the connection but never answers.
    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=timeout
    )
    return "Deleted" in response.text
 | |
| 
 | |
| 
 | |
def add_user_account(
    base_url: str,
    auth: str,
    username: str,
    password: str,
    stream: str,
    *,
    timeout: float = 30.0,
) -> bool:
    """Add a user account to the specified base URL.

    Sends a form-encoded POST to ``{base_url}/addAccount`` with the fields
    ``username``, ``password`` and ``stream``. Success is judged by the HTTP
    status code only; the response body is not inspected.

    Args:
        base_url (str): The base URL of the API; the endpoint path is
            appended to it.
        auth (str): The credential inserted as-is after ``Basic `` in the
            Authorization header (this function does not encode it).
        username (str): The username of the account to add.
        password (str): The password of the account to add.
        stream (str): The name of the stream associated with the user account.
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        bool: True if the server answered with HTTP status 200, False
        otherwise.

    Raises:
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = f"{base_url}/addAccount"
    payload = {"username": username, "password": password, "stream": stream}
    headers = {"Authorization": f"Basic {auth}"}

    # Without an explicit timeout, requests waits indefinitely on a server
    # that accepts the connection but never answers.
    response = requests.request(
        "POST", url, headers=headers, data=payload, timeout=timeout
    )
    return response.status_code == 200
 | |
| 
 | |
| 
 | |
def get_stream_names(base_url: str, auth: str, *, timeout: float = 30.0) -> List[str]:
    """Get a list of stream names from the API.

    Sends a GET request to ``{base_url}/getStreamNames`` and decodes the
    response body as JSON. A non-200 status, an empty body, or a body that is
    not valid JSON all yield an empty list instead of an exception.

    Args:
        base_url (str): The base URL of the API; the endpoint path is
            appended to it.
        auth (str): The credential inserted as-is after ``Basic `` in the
            Authorization header (this function does not encode it).
        timeout (float): Seconds to wait for the server before giving up.
            Keyword-only; defaults to 30 seconds.

    Returns:
        List[str]: The decoded JSON response body, or an empty list when the
        response is unusable.

    Raises:
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = f"{base_url}/getStreamNames"
    headers = {"Authorization": f"Basic {auth}"}

    # Without an explicit timeout, requests waits indefinitely on a server
    # that accepts the connection but never answers.
    response = requests.request("GET", url, headers=headers, timeout=timeout)

    # Guard clause: anything other than a 200 with a non-empty body is
    # treated as "no streams".
    if response.status_code != 200 or not response.text:
        return []

    try:
        return json.loads(response.text)
    except json.JSONDecodeError:
        # Malformed JSON is deliberately mapped to an empty result.
        return []
 |