2025-07-15 15:42:47 +01:00

59 lines
1.9 KiB
Python

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from flask import request, jsonify
from ktvmanager.lib.get_urls import get_latest_urls_from_dns
def build_url(stream_url, username, password):
    """Build the ``player_api.php`` URL for one account on one stream host.

    Args:
        stream_url: Base URL of the stream host; used verbatim as the prefix.
        username: Value for the ``username`` query parameter.
        password: Value for the ``password`` query parameter.

    Returns:
        The full URL as a string, with both credentials percent-encoded.
    """
    # Imported locally so this function does not rely on the file-level imports.
    from urllib.parse import urlencode

    # Encode the credentials so characters such as "&", "=", "#" or spaces
    # cannot break the query string or smuggle in extra parameters.
    query = urlencode([("username", username), ("password", password)])
    return f"{stream_url}/player_api.php?{query}"
def check_url(url):
    """Request *url* and return its JSON payload if it reports an authenticated user.

    Args:
        url: Fully built URL to request.

    Returns:
        The decoded JSON object when its ``user_info.auth`` value is truthy.
        ``None`` when the request fails, the server answers with an HTTP
        error status, the body is not valid JSON, the payload does not have
        the expected shape, or ``auth`` is falsy.
    """
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError):
        # Any transport, HTTP-status or JSON-decoding problem means "not valid".
        return None

    # Valid JSON is not necessarily an object, and "user_info" may be null or
    # some other non-dict value; treat those as a failed check instead of
    # letting AttributeError escape to the caller.
    if not isinstance(data, dict):
        return None
    user_info = data.get("user_info")
    if not isinstance(user_info, dict):
        return None

    return data if user_info.get("auth") else None
def single_account_check(account_data, stream_urls):
    """Probe every stream URL concurrently and return the first that authenticates.

    Args:
        account_data: Mapping with ``'username'`` and ``'password'`` keys.
        stream_urls: Sized iterable of base stream URLs to try.

    Returns:
        ``{"url": <stream_url>, "data": <payload>}`` for the first probe whose
        result is truthy, or ``None`` when ``stream_urls`` is empty or no
        probe succeeds.

    Raises:
        KeyError: If ``account_data`` lacks ``'username'`` or ``'password'``.
        Exception: Whatever a probe raises is re-raised by ``future.result()``.
    """
    if not stream_urls:
        return None

    # Read the credentials before creating the pool so a missing key cannot
    # leave a half-initialised executor behind.
    username = account_data['username']
    password = account_data['password']

    executor = ThreadPoolExecutor(max_workers=min(10, len(stream_urls)))
    future_to_url = {}
    try:
        for stream_url in stream_urls:
            future = executor.submit(
                check_url,
                build_url(stream_url, username, password),
            )
            future_to_url[future] = stream_url

        for future in as_completed(future_to_url):
            result = future.result()
            if result:
                # Found a valid URL, stop checking others.
                return {"url": future_to_url[future], "data": result}
        return None
    finally:
        # Drop probes that have not started yet; cancel() is a no-op on
        # futures that are already running or finished.
        for pending in future_to_url:
            pending.cancel()
        # Don't wait for in-flight probes to finish.
        executor.shutdown(wait=False)
def validate_account():
    """Validate the credentials in the JSON request body against the stream hosts.

    Reads ``username`` and ``password`` from the request JSON, fetches the
    current stream URLs with ``get_latest_urls_from_dns()`` and checks the
    account with ``single_account_check()``.

    Returns:
        A JSON response with a ``message`` field: status 400 when the body is
        not a JSON object or a field is missing/empty, the default status
        when the account is valid, and 401 otherwise.
    """
    data = request.get_json()
    # The parsed body can be None or any non-object JSON value (list, string,
    # number); answer 400 instead of failing on ``data.get``.
    if not isinstance(data, dict):
        return jsonify({"message": "Missing required fields"}), 400

    username = data.get("username")
    password = data.get("password")
    if not (username and password):
        return jsonify({"message": "Missing required fields"}), 400

    stream_urls = get_latest_urls_from_dns()
    account_data = {"username": username, "password": password}
    result = single_account_check(account_data, stream_urls)

    if result:
        return jsonify({"message": "Account is valid"})
    return jsonify({"message": "Account is invalid"}), 401