144 lines
4.7 KiB
Python

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Optional, Dict, Any, Tuple
from flask import request, jsonify, Response
from ktvmanager.lib.get_urls import get_latest_urls_from_dns
def build_url(stream_url: str, username: str, password: str) -> str:
"""Builds the player API URL for a given stream URL and credentials.
Args:
stream_url: The base URL of the streaming server.
username: The username for the account.
password: The password for the account.
Returns:
The fully constructed player API URL.
"""
return f"{stream_url}/player_api.php?username={username}&password={password}"
def check_url(url: str) -> Optional[Dict[str, Any]]:
"""Checks if a given URL is a valid and authenticated streaming service endpoint.
Args:
url: The URL to check.
Returns:
A dictionary containing the JSON response from the server if the account is
valid, otherwise None.
"""
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
data = response.json()
if data.get("user_info", {}).get("auth"):
return data
except (requests.exceptions.RequestException, ValueError):
# Return None for any request/parsing error
return None
return None
def single_account_check(
account_data: Dict[str, str], stream_urls: List[str]
) -> Optional[Dict[str, Any]]:
"""Checks a single account against a list of stream URLs concurrently.
Args:
account_data: A dictionary containing the 'username' and 'password'.
stream_urls: A list of stream URLs to check against.
Returns:
A dictionary containing the valid URL and the server's response data
if a valid URL is found, otherwise None.
"""
if not stream_urls:
return None
with ThreadPoolExecutor(max_workers=min(10, len(stream_urls))) as executor:
future_to_url = {
executor.submit(
check_url,
build_url(
stream_url, account_data["username"], account_data["password"]
),
): stream_url
for stream_url in stream_urls
}
for future in as_completed(future_to_url):
result = future.result()
if result:
return {"url": future_to_url[future], "data": result}
return None
def validate_account() -> Tuple[Response, int]:
"""Validates account credentials provided in a JSON request.
Returns:
A Flask JSON response tuple containing a success or error message
and an HTTP status code.
"""
data = request.get_json()
username = data.get("username")
password = data.get("password")
expiry_date = data.get("expiry_date")
stream = data.get("stream")
stream_url = data.get("streamURL")
if not all([username, password]):
return jsonify({"message": "Missing required fields"}), 400
stream_urls = get_latest_urls_from_dns()
account_data = {"username": username, "password": password}
result = single_account_check(account_data, stream_urls)
if result:
if result["url"] != stream_url:
from ktvmanager.lib.database import update_stream_url
update_stream_url(result["url"], stream_url)
return (
jsonify({"message": "Account is valid and updated", "data": result}),
200,
)
if (
expiry_date
and stream
and int(result["data"]["user_info"]["exp_date"]) != expiry_date
):
from ktvmanager.lib.database import update_expiry_date
update_expiry_date(
username, stream, result["data"]["user_info"]["exp_date"]
)
return (
jsonify({"message": "Account is valid and updated", "data": result}),
200,
)
if (
int(result.get("data", {}).get("user_info", {}).get("max_connections"))
and data.get("max_connections")
and int(result["data"]["user_info"]["max_connections"])
!= data.get("max_connections")
):
from ktvmanager.lib.database import update_max_connections
update_max_connections(
username, stream, int(result["data"]["user_info"]["max_connections"])
)
return (
jsonify({"message": "Account is valid and updated", "data": result}),
200,
)
return jsonify({"message": "Account is valid", "data": result}), 200
else:
return jsonify({"message": "Account is invalid"}), 401