106 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			106 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import requests
 | |
| import json
 | |
| from datetime import datetime
 | |
| from typing import List, Dict, Any
 | |
| 
 | |
| 
 | |
def get_urls(base_url: str, auth: str, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Retrieve user account streams from the API.

    Sends a GET request to ``{base_url}/getUserAccounts/streams`` with an
    ``Authorization: Basic {auth}`` header and returns the decoded JSON body.

    Args:
        base_url: The base URL of the API; the endpoint path is appended as-is.
        auth: The value placed after ``Basic `` in the Authorization header.
        timeout: Seconds to wait on the server before giving up; passed
            straight through to ``requests.get``.

    Returns:
        The decoded JSON response body (annotated as a list of dicts).

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout`` seconds.
    """
    url = f"{base_url}/getUserAccounts/streams"
    headers = {"Authorization": f"Basic {auth}"}
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # server, which would hang the caller.
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
 | |
| 
 | |
| 
 | |
def get_user_accounts(base_url: str, auth: str, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """Retrieve user accounts from the API and add a rendered expiry date.

    Sends a GET request to ``{base_url}/getUserAccounts`` with an
    ``Authorization: Basic {auth}`` header. Each returned account gets an
    extra ``'expiaryDate_rendered'`` key holding its ``'expiaryDate'``
    timestamp formatted as ``DD/MM/YYYY`` in UTC.

    Args:
        base_url: The base URL of the API; the endpoint path is appended as-is.
        auth: The value placed after ``Basic `` in the Authorization header.
        timeout: Seconds to wait on the server before giving up; passed
            straight through to ``requests.get``.

    Returns:
        The decoded list of accounts, each with 'expiaryDate_rendered' added.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout`` seconds.
        KeyError: If an account has no ``'expiaryDate'`` entry.
    """
    # Imported locally: the module only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    url = f"{base_url}/getUserAccounts"
    headers = {"Authorization": f"Basic {auth}"}
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # server, which would hang the caller.
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    accounts = response.json()

    for account in accounts:
        # datetime.utcfromtimestamp() is deprecated since Python 3.12; an
        # aware UTC datetime renders the same calendar date.
        expiry = datetime.fromtimestamp(account["expiaryDate"], tz=timezone.utc)
        account["expiaryDate_rendered"] = expiry.strftime("%d/%m/%Y")

    return accounts
 | |
| 
 | |
| 
 | |
def delete_user_account(
    base_url: str, auth: str, stream: str, username: str, timeout: float = 30.0
) -> bool:
    """Delete a user account via the API.

    Sends a form-encoded POST to ``{base_url}/deleteAccount`` with the fields
    ``stream`` and ``user`` and an ``Authorization: Basic {auth}`` header.

    Args:
        base_url: The base URL of the API; the endpoint path is appended as-is.
        auth: The value placed after ``Basic `` in the Authorization header.
        stream: The stream associated with the account.
        username: The username of the account to delete.
        timeout: Seconds to wait on the server before giving up; passed
            straight through to ``requests.post``.

    Returns:
        True if the response body contains the text ``"Deleted"``,
        False otherwise.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
        requests.exceptions.Timeout: If the server does not answer within
            ``timeout`` seconds.
    """
    url = f"{base_url}/deleteAccount"
    payload = {"stream": stream, "user": username}
    headers = {"Authorization": f"Basic {auth}"}
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # server, which would hang the caller.
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    response.raise_for_status()
    # Success is detected from the response text, not from the status code.
    return "Deleted" in response.text
 | |
| 
 | |
| 
 | |
def add_user_account(
    base_url: str,
    auth: str,
    username: str,
    password: str,
    stream: str,
    timeout: float = 30.0,
) -> bool:
    """Add a user account via the API.

    Sends a form-encoded POST to ``{base_url}/addAccount`` with the fields
    ``username``, ``password`` and ``stream`` and an
    ``Authorization: Basic {auth}`` header.

    Args:
        base_url: The base URL of the API; the endpoint path is appended as-is.
        auth: The value placed after ``Basic `` in the Authorization header.
        username: The username of the new account.
        password: The password for the new account.
        stream: The stream to associate with the new account.
        timeout: Seconds to wait on the server before giving up; passed
            straight through to ``requests.post``.

    Returns:
        True only if the response status code is exactly 200; any other
        status yields False (no exception is raised for HTTP error statuses).

    Raises:
        requests.exceptions.RequestException: On transport-level failures
            (connection errors, timeouts); these are not converted to False.
    """
    url = f"{base_url}/addAccount"
    payload = {"username": username, "password": password, "stream": stream}
    headers = {"Authorization": f"Basic {auth}"}
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # server, which would hang the caller.
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    return response.status_code == 200
 | |
| 
 | |
| 
 | |
def get_stream_names(base_url: str, auth: str, timeout: float = 30.0) -> List[str]:
    """Retrieve the list of stream names from the API (best effort).

    Sends a GET request to ``{base_url}/getStreamNames`` with an
    ``Authorization: Basic {auth}`` header. Request failures and undecodable
    response bodies are logged and reported as an empty list, not raised.

    Args:
        base_url: The base URL of the API; the endpoint path is appended as-is.
        auth: The value placed after ``Basic `` in the Authorization header.
        timeout: Seconds to wait on the server before giving up; passed
            straight through to ``requests.get``.

    Returns:
        The decoded JSON response body (annotated as a list of stream names),
        or an empty list if the request fails, times out, returns an HTTP
        error status, or the body is not valid JSON.
    """
    # Imported locally: the module does not import ``logging`` at top level.
    import logging

    url = f"{base_url}/getStreamNames"
    headers = {"Authorization": f"Basic {auth}"}
    try:
        # Without an explicit timeout, requests waits indefinitely on a
        # stalled server, which would hang the caller.
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and also decode errors from
        # other JSON backends (e.g. simplejson on older requests versions).
        # The failure is logged so the empty result can be told apart from a
        # real "no streams" answer.
        logging.getLogger(__name__).warning(
            "Could not fetch stream names from %s: %s", url, exc
        )
        return []
 |