2025-07-15 15:45:17 +01:00

140 lines
4.1 KiB
Python

from typing import Tuple, Union

from flask import Blueprint, jsonify, Response

from ktvmanager.lib.auth import requires_basic_auth, check_login
from ktvmanager.lib.checker import validate_account
from ktvmanager.lib.database import (
    get_user_accounts,
    get_stream_names,
    single_check,
    add_account,
    delete_account,
    get_user_id_from_username,
)
from ktvmanager.lib.get_urls import get_latest_urls_from_dns
# Blueprint named "api"; every route in this module is registered on it.
api_blueprint = Blueprint(name="api", import_name=__name__)
@api_blueprint.route("/getUserAccounts")
@requires_basic_auth
def get_user_accounts_route(
    username: str, password: str
) -> Union[Response, Tuple[Response, int]]:
    """Retrieve all accounts associated with a user.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        Whatever ``get_user_accounts`` returns for the resolved user id, or
        a ``(JSON response, 404)`` tuple carrying ``"User not found"`` when
        the username does not resolve to a user id.
    """
    user_id = get_user_id_from_username(username)
    # A falsy lookup result is treated as "no such user".
    if not user_id:
        return jsonify({"message": "User not found"}), 404
    return get_user_accounts(user_id)
@api_blueprint.route("/getStreamNames")
@requires_basic_auth
def get_stream_names_route(username: str, password: str) -> Response:
    """Return the list of stream names.

    Args:
        username: The username of the user (not read by this view itself).
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        The value produced by ``get_stream_names``, passed through unchanged.
    """
    stream_names_response = get_stream_names()
    return stream_names_response
@api_blueprint.route("/getUserAccounts/streams")
@requires_basic_auth
def get_user_accounts_streams_route(username: str, password: str) -> Response:
    """Return the latest stream URLs obtained from DNS.

    Args:
        username: The username of the user (not read by this view itself).
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        A Flask JSON response wrapping the result of
        ``get_latest_urls_from_dns``.
    """
    latest_urls = get_latest_urls_from_dns()
    return jsonify(latest_urls)
@api_blueprint.route("/singleCheck", methods=["POST"])
@requires_basic_auth
def single_check_route(username: str, password: str) -> Response:
    """Run a single account check.

    Args:
        username: The username of the user (not read by this view itself).
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        The value produced by ``single_check``, passed through unchanged.
    """
    # single_check takes no arguments here; presumably it reads its input
    # from the active request -- TODO confirm against ktvmanager.lib.database.
    check_result = single_check()
    return check_result
@api_blueprint.route("/addAccount", methods=["POST"])
@requires_basic_auth
def add_account_route(
    username: str, password: str
) -> Union[Response, Tuple[Response, int]]:
    """Add a new account for the user.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        Whatever ``add_account`` returns for the resolved user id, or a
        ``(JSON response, 404)`` tuple carrying ``"User not found"`` when
        the username does not resolve to a user id.
    """
    user_id = get_user_id_from_username(username)
    # Same guard as /getUserAccounts: never hand a missing user id to the
    # database layer, report it to the client instead.
    if not user_id:
        return jsonify({"message": "User not found"}), 404
    return add_account(user_id)
@api_blueprint.route("/deleteAccount", methods=["POST"])
@requires_basic_auth
def delete_account_route(
    username: str, password: str
) -> Union[Response, Tuple[Response, int]]:
    """Delete an account for the user.

    Args:
        username: The username of the user.
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        Whatever ``delete_account`` returns for the resolved user id, or a
        ``(JSON response, 404)`` tuple carrying ``"User not found"`` when
        the username does not resolve to a user id.
    """
    user_id = get_user_id_from_username(username)
    # Same guard as /getUserAccounts: never hand a missing user id to the
    # database layer, report it to the client instead.
    if not user_id:
        return jsonify({"message": "User not found"}), 404
    return delete_account(user_id)
@api_blueprint.route("/validateAccount", methods=["POST"])
@requires_basic_auth
def validate_account_route(username: str, password: str) -> Tuple[Response, int]:
    """Validate an account.

    Args:
        username: The username of the user (not read by this view itself).
        password: The password of the user (used for authentication; not
            read by this view itself).

    Returns:
        The value produced by ``validate_account``, passed through
        unchanged; annotated as a (Flask response, HTTP status code) tuple.
    """
    # validate_account takes no arguments here; presumably it reads its
    # input from the active request -- TODO confirm against
    # ktvmanager.lib.checker.
    validation_result = validate_account()
    return validation_result
@api_blueprint.route("/Login")
@requires_basic_auth
def login_route(username: str, password: str) -> Response:
    """Log a user in.

    Args:
        username: The username of the user.
        password: The password of the user.

    Returns:
        The value produced by ``check_login(username, password)``, passed
        through unchanged.
    """
    login_result = check_login(username, password)
    return login_result