import base64
import json
import re
from typing import Tuple, Union

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from flask import Blueprint, jsonify, Response, request, current_app
from pywebpush import webpush, WebPushException

from ktvmanager.lib.database import (
    get_user_accounts,
    get_stream_names,
    single_check,
    add_account,
    delete_account,
    get_user_id_from_username,
    save_push_subscription,
    get_push_subscriptions,
)
from ktvmanager.lib.get_urls import get_latest_urls_from_dns
from ktvmanager.lib.auth import requires_basic_auth, check_login
from ktvmanager.lib.checker import validate_account

api_blueprint = Blueprint("api", __name__)

# A view may return either a bare response or a (response, status code) pair.
_RouteResponse = Union[Response, Tuple[Response, int]]


@api_blueprint.route("/getUserAccounts")
@requires_basic_auth
def get_user_accounts_route(username: str, password: str) -> _RouteResponse:
    """Retrieves all accounts associated with a user.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        Whatever ``get_user_accounts`` returns for the resolved user id, or a
        JSON "User not found" message with status 404 when the username does
        not resolve to a user id.
    """
    user_id = get_user_id_from_username(username)
    if user_id:
        return get_user_accounts(user_id)
    return jsonify({"message": "User not found"}), 404


@api_blueprint.route("/getStreamNames")
@requires_basic_auth
def get_stream_names_route(username: str, password: str) -> Response:
    """Retrieves all stream names.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        The result of ``get_stream_names()``, unchanged.
    """
    return get_stream_names()


@api_blueprint.route("/getUserAccounts/streams")
@requires_basic_auth
def get_user_accounts_streams_route(username: str, password: str) -> Response:
    """Retrieves the latest stream URLs from DNS.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        A Flask JSON response wrapping the value of
        ``get_latest_urls_from_dns()``.
    """
    return jsonify(get_latest_urls_from_dns())


@api_blueprint.route("/singleCheck", methods=["POST"])
@requires_basic_auth
def single_check_route(username: str, password: str) -> Response:
    """Performs a single account check.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        The result of ``single_check()``, unchanged.
    """
    return single_check()


@api_blueprint.route("/addAccount", methods=["POST"])
@requires_basic_auth
def add_account_route(username: str, password: str) -> _RouteResponse:
    """Adds a new account for the user.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        The result of ``add_account`` for the resolved user id, or a JSON
        "User not found" message with status 404 when the username does not
        resolve to a user id.
    """
    user_id = get_user_id_from_username(username)
    # Same guard as the other user-scoped routes: never hand a missing user id
    # to the database helper.
    if not user_id:
        return jsonify({"message": "User not found"}), 404
    return add_account(user_id)


@api_blueprint.route("/deleteAccount", methods=["POST"])
@requires_basic_auth
def delete_account_route(username: str, password: str) -> _RouteResponse:
    """Deletes an account for the user.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        The result of ``delete_account`` for the resolved user id, or a JSON
        "User not found" message with status 404 when the username does not
        resolve to a user id.
    """
    user_id = get_user_id_from_username(username)
    # Same guard as the other user-scoped routes: never hand a missing user id
    # to the database helper.
    if not user_id:
        return jsonify({"message": "User not found"}), 404
    return delete_account(user_id)


@api_blueprint.route("/validateAccount", methods=["POST"])
@requires_basic_auth
def validate_account_route(username: str, password: str) -> Tuple[Response, int]:
    """Validates an account.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication).

    Returns:
        The result of ``validate_account()``, unchanged (annotated as a
        response / status code pair).
    """
    return validate_account()


@api_blueprint.route("/Login")
@requires_basic_auth
def login_route(username: str, password: str) -> Response:
    """Logs a user in.

    Args:
        username: The username of the user.
        password: The password of the user.

    Returns:
        The result of ``check_login(username, password)``, unchanged.
    """
    return check_login(username, password)


@api_blueprint.route("/vapid-public-key", methods=["GET"])
def vapid_public_key() -> _RouteResponse:
    """Provides the VAPID public key in the correct format.

    Reads the PEM key from the ``VAPID_PUBLIC_KEY`` app config entry, converts
    it to the raw uncompressed EC point and returns it URL-safe base64 encoded
    without padding.

    Returns:
        ``{"public_key": <encoded key>}`` on success, or a JSON error with
        status 500 when the key is missing, is not valid PEM, or is not an
        elliptic-curve key.
    """
    try:
        # Looked up inside the try so a missing config entry yields the same
        # JSON 500 as a malformed key instead of an unhandled KeyError.
        pem_key = current_app.config["VAPID_PUBLIC_KEY"]
        public_key = serialization.load_pem_public_key(pem_key.encode("utf-8"))
        if not isinstance(public_key, ec.EllipticCurvePublicKey):
            raise TypeError("VAPID public key is not an Elliptic Curve key")

        # Get the raw, uncompressed public key bytes (65 bytes for P-256)
        raw_key = public_key.public_bytes(
            encoding=serialization.Encoding.X962,
            format=serialization.PublicFormat.UncompressedPoint,
        )

        # URL-safe base64 encode the raw key, dropping the '=' padding
        url_safe_key = base64.urlsafe_b64encode(raw_key).rstrip(b"=").decode("utf-8")
        return jsonify({"public_key": url_safe_key})
    except (KeyError, ValueError, TypeError, AttributeError) as e:
        current_app.logger.error(f"Error processing VAPID public key: {e}")
        return jsonify({"error": "Could not process VAPID public key"}), 500


@api_blueprint.route("/save-subscription", methods=["POST"])
@requires_basic_auth
def save_subscription(username: str, password: str) -> _RouteResponse:
    """Saves a push notification subscription.

    The request's JSON body is serialized with ``json.dumps`` and passed to
    ``save_push_subscription`` together with the resolved user id.

    Args:
        username: The username of the user.
        password: The password of the user.

    Returns:
        A JSON confirmation message, a 404 JSON message when the username does
        not resolve to a user id, or a 400 JSON message when the body is empty.
    """
    user_id = get_user_id_from_username(username)
    if not user_id:
        return jsonify({"message": "User not found"}), 404

    subscription_data = request.get_json()
    if not subscription_data:
        return jsonify({"message": "No subscription data provided"}), 400

    save_push_subscription(user_id, json.dumps(subscription_data))
    return jsonify({"message": "Subscription saved."})


def send_notification(subscription_info, message_body) -> bool:
    """Sends one web push message using the app's VAPID configuration.

    Requires an active Flask application context, since the VAPID private key
    and claim e-mail are read from ``current_app.config``.

    Args:
        subscription_info: The subscription object handed to ``webpush``.
        message_body: The payload handed to ``webpush`` as ``data``.

    Returns:
        True when ``webpush`` completed without raising ``WebPushException``,
        False when it raised one (the error is logged, not re-raised). Any
        other exception propagates to the caller.
    """
    try:
        webpush(
            subscription_info=subscription_info,
            data=message_body,
            vapid_private_key=current_app.config["VAPID_PRIVATE_KEY"],
            vapid_claims={"sub": current_app.config["VAPID_CLAIM_EMAIL"]},
        )
    except WebPushException as ex:
        current_app.logger.error("Web push error: %s", ex)
        # Compare against None explicitly: an HTTP response object from
        # `requests` evaluates as falsy for 4xx/5xx statuses, so a plain
        # truthiness test would never let the 410 branch run.
        response = ex.response
        if response is not None and response.status_code == 410:
            current_app.logger.warning(
                "Subscription is no longer valid, removing from DB."
            )
            # TODO: add logic to remove the subscription from the database
        return False
    return True


@api_blueprint.route("/send-expiry-notifications", methods=["POST"])
@requires_basic_auth
def send_expiry_notifications_route(username: str, password: str) -> Response:
    """Triggers the sending of expiry notifications.

    Args:
        username: The username of the user.
        password: The password of the user.

    Returns:
        A JSON confirmation message once ``send_expiry_notifications()`` has
        returned.
    """
    # Imported at call time rather than at module load
    # (presumably to avoid a circular import - confirm before hoisting).
    from ktvmanager.account_checker import send_expiry_notifications

    send_expiry_notifications()
    return jsonify({"message": "Expiry notifications sent."})


@api_blueprint.route("/send-test-notification", methods=["POST"])
@requires_basic_auth
def send_test_notification_route(username: str, password: str) -> _RouteResponse:
    """Sends a test push notification to all users.

    The optional JSON body may carry a ``message`` field; "Ktv Test" is used
    when the body is absent, is not a JSON object, or has no such field.

    Args:
        username: The username of the user.
        password: The password of the user.

    Returns:
        A JSON summary with ``sent`` and ``failed`` counts, a 500 JSON error
        when the subscriptions cannot be loaded, or a 404 JSON message when
        there are no subscriptions.
    """
    data = request.get_json(silent=True)
    # Only a JSON object can carry "message"; a list/string body falls back to
    # the default instead of raising AttributeError on .get().
    message = data.get("message", "Ktv Test") if isinstance(data, dict) else "Ktv Test"

    try:
        subscriptions = get_push_subscriptions()  # Get all subscriptions
    except Exception as e:
        current_app.logger.error(f"Error getting push subscriptions: {e}")
        return (
            jsonify(
                {"error": "Could not retrieve push subscriptions from the database."}
            ),
            500,
        )

    if not subscriptions:
        return jsonify({"message": "No push subscriptions found."}), 404

    message_body = json.dumps({"title": "KTVManager", "body": message})

    success_count = 0
    failure_count = 0
    for sub in subscriptions:
        try:
            delivered = send_notification(
                json.loads(sub["subscription_json"]), message_body
            )
        except Exception as e:
            current_app.logger.error(
                f"Error sending notification to subscription ID {sub.get('id', 'N/A')}: {e}"
            )
            delivered = False

        # send_notification reports push failures via its return value, so a
        # rejected push is counted as failed rather than sent.
        if delivered:
            success_count += 1
        else:
            failure_count += 1

    return jsonify(
        {
            "message": "Test notification sending process completed.",
            "sent": success_count,
            "failed": failure_count,
        }
    )