feat(app): restructure as a modern Flask API

This commit refactors the entire backend application into a more structured and maintainable Flask project. It introduces an application factory pattern, consolidates routes into a blueprint, and implements a robust authentication and database layer.

- Introduces a Flask application factory (`create_app` in `main.py`) for better organization and testability.
- Consolidates all API routes into a single blueprint (`routes/api.py`) for modularity.
- Implements a new basic authentication system using a decorator (`@requires_basic_auth`) to secure all endpoints.
- Refactors the database access layer with standardized query execution and connection handling.
- Adds new modules for core logic, including an account checker (`checker.py`) and user retrieval (`get_users.py`).
- Updates the VSCode launch configuration to support the new Flask application structure.

BREAKING CHANGE: The application has been completely restructured. The old `server.py` entry point is removed. The application should now be run via the app factory in `main.py`. All API endpoints now require basic authentication.
This commit is contained in:
Karl 2025-07-13 19:40:04 +01:00
parent 445cdc834a
commit 4352004ed3
15 changed files with 212 additions and 199 deletions

23
.vscode/launch.json vendored
View File

@ -1,16 +1,25 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"name": "Python: Flask",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false
"module": "flask",
"env": {
"FLASK_APP": "ktvmanager.main:create_app()",
"FLASK_DEBUG": "1"
},
"args": [
"run",
"--no-debugger",
"--no-reload",
"--host=0.0.0.0",
"--port=5001"
],
"jinja": true,
"justMyCode": true,
"python": "${workspaceFolder}/.venv/bin/python"
}
]
}

View File

@ -1,4 +1,2 @@
http://apppanel.co.uk/panel/capo/smarters/api//home.php?action=dns
http://chopzappz.xyz/v3advert/crazy/api/dns.php
http://razzlertv.xyz/customers/scotslad/virgin/api/dns.php
http://razzlertv.xyz/customers/scotslad/skyup/api/dns.php
https://shadowappz.win/1/kuga/smarters405/api/dns.php

19
ktvmanager/lib/auth.py Normal file
View File

@ -0,0 +1,19 @@
from functools import wraps
from flask import request, jsonify, Blueprint
from ktvmanager.lib.get_users import get_users
auth_blueprint = Blueprint("auth", __name__)
def check_auth(username, password):
users = get_users()
stored_password = users.get(username)
return stored_password == password
def requires_basic_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return jsonify({"message": "Could not verify"}), 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}
return f(auth.username, *args, **kwargs)
return decorated

31
ktvmanager/lib/checker.py Normal file
View File

@ -0,0 +1,31 @@
import requests
from requests_tor import RequestsTor
def build_url(stream_url, username, password):
return f"{stream_url}/player_api.php?username={username}&password={password}"
def check_url(url):
try:
tr = RequestsTor()
response = tr.get(url, timeout=5)
response.raise_for_status()
if response.json().get("user_info", {}).get("auth"):
return response.json()
except requests.exceptions.RequestException as e:
if e.response and e.response.status_code == 403:
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
if response.json().get("user_info", {}).get("auth"):
return response.json()
except requests.exceptions.RequestException:
return None
return None
def single_account_check(account_data, stream_urls):
for stream_url in stream_urls:
url = build_url(stream_url, account_data['username'], account_data['password'])
result = check_url(url)
if result:
return {"url": stream_url, "data": result}
return None

View File

@ -35,7 +35,7 @@ def find_url_for_user_details(username, password):
tm2 = time.perf_counter()
print(f'Total time elapsed: {tm2-tm1:0.2f} seconds')
find_url_for_user_details('Karl061122', 'gkuEDWzxHD')
find_url_for_user_details('Karl130623', 'emYZWPs')
find_url_for_user_details('Karlos2306', 'Gg58Wg8MB9')
find_url_for_user_details('Maxine2306', 'EszFDNNcb2')
# find_url_for_user_details('Karl061122', 'gkuEDWzxHD')
# find_url_for_user_details('Karl130623', 'emYZWPs')
# find_url_for_user_details('Karlos2306', 'Gg58Wg8MB9')
# find_url_for_user_details('Maxine2306', 'EszFDNNcb2')

View File

@ -1,46 +1,77 @@
import os
from dotenv import load_dotenv
import mysql.connector
from mysql.connector import connection
from dotenv import load_dotenv
from flask import jsonify, request
from ktvmanager.lib.checker import single_account_check
from ktvmanager.lib.encryption import decrypt_password
load_dotenv()
def create_connection_to_database() -> connection:
"""_summary_
Returns:
connection: _description_
"""
username = os.getenv("DBUSER")
password = os.getenv("DBPASS")
server = os.getenv("DBHOST")
database = os.getenv("DATABASE")
port = os.getenv("DBPORT")
mydb = mysql.connector.connect(
host=server, user=username, password=password, database=database, port=port
def _create_connection():
return mysql.connector.connect(
host=os.getenv("DBHOST"),
user=os.getenv("DBUSER"),
password=os.getenv("DBPASS"),
database=os.getenv("DATABASE"),
port=os.getenv("DBPORT")
)
return mydb
def _execute_query(query, params=None):
conn = _create_connection()
cursor = conn.cursor(dictionary=True)
try:
cursor.execute(query, params)
if query.strip().upper().startswith("SELECT"):
result = cursor.fetchall()
else:
conn.commit()
result = {"affected_rows": cursor.rowcount}
return result
finally:
cursor.close()
conn.close()
def return_data_from_database(query) -> None:
connection = create_connection_to_database()
cursor = connection.cursor()
cursor.execute(query)
data = cursor.fetchall()
return data
def get_user_id_from_username(username):
query = "SELECT id FROM users WHERE username = %s"
result = _execute_query(query, (username,))
if result:
return result[0]['id']
return None
def get_user_accounts(user_id):
query = "SELECT * FROM userAccounts WHERE userID = %s"
accounts = _execute_query(query, (user_id,))
for account in accounts:
account['password'] = decrypt_password(account['password'])
return jsonify(accounts)
def getUserAccounts(user) -> None:
"""_summary_
def get_stream_names():
query = "SELECT streamName FROM streams"
results = _execute_query(query)
stream_names = [row['streamName'] for row in results]
return jsonify(stream_names)
Args:
user (_type_): _description_
"""
query = "SELECT userAccounts.username, userAccounts.stream, userAccounts.streamURL, userAccounts.expiaryDate, userAccounts.password FROM users INNER JOIN userAccounts ON users.id = userAccounts.userID WHERE users.id = '1'"
a = return_data_from_database(query)
print(a)
def single_check():
data = request.get_json()
# This is a placeholder for getting stream URLs. In a real application,
# this would likely come from a database query or a configuration file.
stream_urls = ["http://example.com", "http://example.org"]
result = single_account_check(data, stream_urls)
if result:
# Here you would typically update the database with the new information
return jsonify(result)
return jsonify({"message": "All checks failed"}), 400
def add_account():
data = request.get_json()
query = "INSERT INTO userAccounts (username, stream, streamURL, expiaryDate, password, userID) VALUES (%s, %s, %s, %s, %s, %s)"
params = (data['username'], data['stream'], data['streamURL'], data['expiaryDate'], data['password'], data['userID'])
result = _execute_query(query, params)
return jsonify(result)
getUserAccounts("1")
def delete_account():
data = request.get_json()
query = "DELETE FROM userAccounts WHERE id = %s"
params = (data['id'],)
result = _execute_query(query, params)
return jsonify(result)

View File

@ -23,14 +23,16 @@ def get_latest_urls_from_dns():
content = json.loads(data.text)
except Exception:
pass
list_of_urls = content['su'].split(',')
try:
list_of_urls = content['su'].split(',')
except KeyError:
list_of_urls = content['fu'].split(',')
for url in list_of_urls:
complete_list_of_urls.append(url)
complete_list_of_urls = list(set(complete_list_of_urls))
# print(complete_list_of_urls)
for url in extra_urls:
complete_list_of_urls.append(url)
return complete_list_of_urls
return list(dict.fromkeys(complete_list_of_urls))
def generate_urls_for_user(username, password):
new_urls = []

View File

@ -0,0 +1,7 @@
from ktvmanager.lib.database import _execute_query
def get_users():
query = "SELECT userName, password FROM users"
results = _execute_query(query)
users = {user['userName']: user['password'] for user in results}
return users

23
ktvmanager/main.py Normal file
View File

@ -0,0 +1,23 @@
from flask import Flask, jsonify
from routes.api import api_blueprint
def create_app():
app = Flask(__name__)
# Register blueprints
app.register_blueprint(api_blueprint)
# Error handlers
@app.errorhandler(404)
def not_found(error):
return jsonify({"error": "Not found"}), 404
@app.errorhandler(500)
def server_error(error):
return jsonify({"error": "Server error"}), 500
return app
if __name__ == "__main__":
app = create_app()
app.run(debug=True, host='0.0.0.0', port=5001)

View File

@ -1,35 +0,0 @@
from flask import Flask
from routes.index import index_blueprint
from routes.get_user_accounts import get_user_accounts_blueprint
from routes.single_check import single_check_blueprint
from routes.add_account import add_account_blueprint
from routes.delete_account import delete_account_blueprint
from routes.login import login_blueprint
from auth import requires_auth
from lib.get_users import get_users
app = Flask(__name__)
users = get_users()
# Routes without auth
app.register_blueprint(index_blueprint)
# Routes with basic auth
app.register_blueprint(login_blueprint, url_prefix="/login")
app.register_blueprint(get_user_accounts_blueprint, url_prefix="/getUserAccounts")
app.register_blueprint(single_check_blueprint, url_prefix="/singleCheck")
app.register_blueprint(add_account_blueprint, url_prefix="/addAccount")
app.register_blueprint(delete_account_blueprint, url_prefix="/deleteAccount")
# 404 handler
@app.errorhandler(404)
def not_found(error):
return {"error": "Not found"}, 404
# 500 error handler
@app.errorhandler(500)
def server_error(error):
return {"error": "Server error"}, 500
if __name__ == "__main__":
app.run(debug=True)

View File

@ -1,90 +0,0 @@
import mysql.connector
def get_connection():
return mysql.connector.connect(
host='localhost',
user='your_mysql_user',
password='your_mysql_password',
database='your_database'
)
def get_user_accounts(user_id):
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT
userAccounts.username,
userAccounts.stream,
userAccounts.streamURL,
userAccounts.expiaryDate,
userAccounts.password
FROM users
INNER JOIN userAccounts ON users.id = userAccounts.userID
WHERE users.id = %s
""", (user_id,))
data = cursor.fetchall()
cursor.close()
conn.close()
return data if data else 'User Not Found'
def get_user_accounts_check(user_id):
return get_user_accounts(user_id)
def get_all_user_accounts():
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT
userAccounts.username,
userAccounts.password,
userAccounts.expiaryDate,
userAccounts.stream
FROM userAccounts
WHERE userAccounts.expiaryDate != '0'
ORDER BY userAccounts.id DESC
""")
data = cursor.fetchall()
cursor.close()
conn.close()
return data if data else 'User Not Found'
def get_all_unique_accounts():
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT DISTINCT userAccounts.stream, userAccounts.username,
userAccounts.password, userAccounts.stream
FROM userAccounts
WHERE userAccounts.expiaryDate != '0'
GROUP BY userAccounts.stream
""")
data = cursor.fetchall()
cursor.close()
conn.close()
return data if data else 'User Not Found'
def get_user_unique_accounts(user_id):
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT DISTINCT userAccounts.streamURL, userAccounts.username,
userAccounts.password, userAccounts.stream, userAccounts.streamURL
FROM userAccounts
WHERE userAccounts.expiaryDate != '0'
AND userAccounts.userId = %s
GROUP BY userAccounts.stream
ORDER BY userAccounts.stream ASC
""", (user_id,))
data = cursor.fetchall()
cursor.close()
conn.close()
return data if data else 'User Not Found'
def get_user_id(username):
conn = get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT id FROM users WHERE userName = %s", (username,))
result = cursor.fetchone()
cursor.close()
conn.close()
return result["id"] if result else 'User Not Found'

View File

@ -14,3 +14,6 @@ requests==2.32.3
requests-tor @ git+https://github.com/karl0ss/requests_tor.git@ae1e85abb3bb2c25f431bd031c6d881986c626f6
stem==1.8.2
urllib3==2.4.0
Flask
PyJWT

39
routes/api.py Normal file
View File

@ -0,0 +1,39 @@
from flask import Blueprint, jsonify
from ktvmanager.lib.database import get_user_accounts, get_stream_names, single_check, add_account, delete_account, get_user_id_from_username
from ktvmanager.lib.get_urls import get_latest_urls_from_dns
from ktvmanager.lib.auth import requires_basic_auth
api_blueprint = Blueprint("api", __name__)
@api_blueprint.route("/getUserAccounts")
@requires_basic_auth
def get_user_accounts_route(username):
user_id = get_user_id_from_username(username)
if user_id:
return get_user_accounts(user_id)
return jsonify({"message": "User not found"}), 404
@api_blueprint.route("/getStreamNames")
@requires_basic_auth
def get_stream_names_route(username):
return get_stream_names()
@api_blueprint.route("/getUserAccounts/streams")
@requires_basic_auth
def get_user_accounts_streams_route(username):
return jsonify(get_latest_urls_from_dns())
@api_blueprint.route("/singleCheck", methods=["POST"])
@requires_basic_auth
def single_check_route(username):
return single_check()
@api_blueprint.route("/addAccount", methods=["POST"])
@requires_basic_auth
def add_account_route(username):
return add_account()
@api_blueprint.route("/deleteAccount", methods=["POST"])
@requires_basic_auth
def delete_account_route(username):
return delete_account()

View File

@ -1,24 +0,0 @@
from functools import wraps
from flask import request, Response
from lib.get_users import get_users
users = get_users()
def check_auth(username, password):
return users.get(username) == password
def authenticate():
return Response(
'Could not verify your access.\n',
401,
{'WWW-Authenticate': 'Basic realm="Login Required"'}
)
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated

View File