import requests from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Optional, Dict, Any, Tuple from flask import request, jsonify, Response from ktvmanager.lib.get_urls import get_latest_urls_from_dns def build_url(stream_url: str, username: str, password: str) -> str: """Builds the player API URL for a given stream URL and credentials. Args: stream_url: The base URL of the streaming server. username: The username for the account. password: The password for the account. Returns: The fully constructed player API URL. """ return f"{stream_url}/player_api.php?username={username}&password={password}" def check_url(url: str) -> Optional[Dict[str, Any]]: """Checks if a given URL is a valid and authenticated streaming service endpoint. Args: url: The URL to check. Returns: A dictionary containing the JSON response from the server if the account is valid, otherwise None. """ try: response = requests.get(url, timeout=5) response.raise_for_status() data = response.json() if data.get("user_info", {}).get("auth"): return data except (requests.exceptions.RequestException, ValueError): # Return None for any request/parsing error return None return None def single_account_check( account_data: Dict[str, str], stream_urls: List[str] ) -> Optional[Dict[str, Any]]: """Checks a single account against a list of stream URLs concurrently. Args: account_data: A dictionary containing the 'username' and 'password'. stream_urls: A list of stream URLs to check against. Returns: A dictionary containing the valid URL and the server's response data if a valid URL is found, otherwise None. """ if not stream_urls: return None with ThreadPoolExecutor(max_workers=min(10, len(stream_urls))) as executor: future_to_url = { executor.submit( check_url, build_url( stream_url, account_data["username"], account_data["password"] ), ): stream_url for stream_url in stream_urls } for future in as_completed(future_to_url): result = future.result() if result: return {"url": future_to_url[future], "data": result} return None def validate_account() -> Tuple[Response, int]: """Validates account credentials provided in a JSON request. Returns: A Flask JSON response tuple containing a success or error message and an HTTP status code. """ data = request.get_json() username = data.get("username") password = data.get("password") expiry_date = data.get("expiry_date") stream = data.get("stream") stream_url = data.get("streamURL") if not all([username, password]): return jsonify({"message": "Missing required fields"}), 400 stream_urls = get_latest_urls_from_dns() account_data = {"username": username, "password": password} result = single_account_check(account_data, stream_urls) if result: if result["url"] != stream_url: from ktvmanager.lib.database import update_stream_url update_stream_url(result["url"], stream_url) return ( jsonify({"message": "Account is valid and updated", "data": result}), 200, ) if ( expiry_date and stream and int(result["data"]["user_info"]["exp_date"]) != expiry_date ): from ktvmanager.lib.database import update_expiry_date update_expiry_date( username, stream, result["data"]["user_info"]["exp_date"] ) return ( jsonify({"message": "Account is valid and updated", "data": result}), 200, ) if ( int(result.get("data", {}).get("user_info", {}).get("max_connections")) and data.get("max_connections") and int(result["data"]["user_info"]["max_connections"]) != data.get("max_connections") ): from ktvmanager.lib.database import update_max_connections update_max_connections( username, stream, int(result["data"]["user_info"]["max_connections"]) ) return ( jsonify({"message": "Account is valid and updated", "data": result}), 200, ) return jsonify({"message": "Account is valid", "data": result}), 200 else: return jsonify({"message": "Account is invalid"}), 401