# app.py from flask import (Flask, render_template, request, redirect, url_for, session, send_file, jsonify, send_from_directory, Response) from flask_caching import Cache import requests.auth import os import base64 from typing import Dict, Any, Tuple, Union import sys import redis import json import mysql.connector import re from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired from lib.reqs import (get_urls, get_user_accounts, add_user_account, delete_user_account, get_stream_names) from config import DevelopmentConfig, ProductionConfig os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" app = Flask(__name__) if os.environ.get("FLASK_ENV") == "production": app.config.from_object(ProductionConfig) else: app.config.from_object(DevelopmentConfig) # Check for Redis availability and configure cache redis_url = app.config["REDIS_URL"] cache_config = {"CACHE_TYPE": "redis", "CACHE_REDIS_URL": redis_url} try: # Use a short timeout to prevent hanging r = redis.from_url(redis_url, socket_connect_timeout=1) r.ping() except redis.exceptions.ConnectionError as e: print( f"WARNING: Redis connection failed: {e}. Falling back to SimpleCache. " "This is not recommended for production with multiple workers.", file=sys.stderr, ) cache_config = {"CACHE_TYPE": "SimpleCache"} cache = Cache(app, config=cache_config) app.config["OCR_ENABLED"] = False app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"] app.config['SESSION_COOKIE_HTTPONLY'] = True app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year def get_version() -> str: """Retrieves the application version from the VERSION file. Returns: The version string, or 'dev' if the file is not found. """ try: with open('VERSION', 'r') as f: return f.read().strip() except FileNotFoundError: return 'dev' @app.context_processor def inject_version() -> Dict[str, str]: """Injects the version into all templates.""" return dict(version=get_version(), config=app.config, session=session) def make_cache_key(*args, **kwargs): """Generate a cache key based on the user's session and request path.""" username = session.get('username', 'anonymous') path = request.path return f"view/{username}/{path}" @app.before_request def make_session_permanent() -> None: """Makes the user session permanent.""" session.permanent = True @app.route('/site.webmanifest') def serve_manifest() -> Response: """Serves the site manifest file.""" return send_from_directory( os.path.join(app.root_path, 'static'), 'site.webmanifest', mimetype='application/manifest+json' ) @app.route("/favicon.ico") def favicon() -> Response: """Serves the favicon.""" return send_from_directory( os.path.join(app.root_path, "static"), "favicon.ico", mimetype="image/vnd.microsoft.icon", ) @app.route("/") def index() -> Union[Response, str]: """Renders the index page or redirects to home if logged in.""" if session.get("logged_in"): return redirect(url_for("home")) return render_template("index.html") @app.route('/vapid-public-key', methods=['GET']) def proxy_vapid_public_key(): """Proxies the request for the VAPID public key to the backend.""" backend_url = f"{app.config['BACKEND_URL']}/vapid-public-key" try: response = requests.get(backend_url) return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 502 @app.route('/save-subscription', methods=['POST']) def proxy_save_subscription(): """Proxies the request to save a push subscription to the backend.""" if not session.get("logged_in"): return jsonify({'error': 'Unauthorized'}), 401 backend_url = f"{app.config['BACKEND_URL']}/save-subscription" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) try: response = requests.post( backend_url, auth=requests.auth.HTTPBasicAuth(username, password), json=request.get_json() ) return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 502 @app.route('/send-test-notification', methods=['POST']) def send_test_notification(): """Proxies the request to send a test notification to the backend.""" if not session.get("logged_in"): return jsonify({'error': 'Unauthorized'}), 401 backend_url = f"{app.config['BACKEND_URL']}/send-test-notification" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) try: response = requests.post( backend_url, auth=requests.auth.HTTPBasicAuth(username, password), json={} ) return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 502 @app.route("/home") @cache.cached(timeout=60, key_prefix=make_cache_key) def home() -> str: """Renders the home page with account statistics.""" if session.get("logged_in"): base_url = app.config["BACKEND_URL"] all_accounts = get_user_accounts(base_url, session["auth_credentials"]) return render_template( "home.html", username=session["username"], accounts=len(all_accounts), current_month_accounts=filter_accounts_next_30_days(all_accounts), expired_accounts=filter_accounts_expired(all_accounts), ) return render_template("index.html") @app.route("/login", methods=["POST"]) def login() -> Union[Response, str]: """Handles user login.""" username = request.form["username"] password = request.form["password"] credentials = f"{username}:{password}" encoded_credentials = base64.b64encode(credentials.encode()).decode() base_url = app.config["BACKEND_URL"] login_url = f"{base_url}/Login" try: response = requests.get( login_url, auth=requests.auth.HTTPBasicAuth(username, password) ) response.raise_for_status() response_data = response.json() if response_data.get("auth") == "Success": session["logged_in"] = True session["username"] = response_data.get("username", username) session["user_id"] = response_data.get("user_id") session["auth_credentials"] = encoded_credentials next_url = request.args.get("next") if next_url: return redirect(next_url) return redirect(url_for("home", loggedin=True)) except requests.exceptions.RequestException: pass # Fall through to error error = "Invalid username or password. Please try again." return render_template("index.html", error=error) @app.route("/urls", methods=["GET"]) @cache.cached(timeout=300, key_prefix=make_cache_key) def urls() -> Union[Response, str]: """Renders the URLs page.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BACKEND_URL"] return render_template( "urls.html", urls=get_urls(base_url, session["auth_credentials"]) ) @app.route("/accounts", methods=["GET"]) @cache.cached(timeout=60, key_prefix=make_cache_key) def user_accounts() -> Union[Response, str]: """Renders the user accounts page.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BACKEND_URL"] user_accounts_data = get_user_accounts(base_url, session["auth_credentials"]) return render_template( "user_accounts.html", username=session["username"], user_accounts=user_accounts_data, auth=session["auth_credentials"], ) @app.route("/share", methods=["GET"]) def share() -> Response: """Handles shared text from PWA.""" if not session.get("logged_in"): return redirect(url_for("index", next=request.url)) shared_text = request.args.get("text") return redirect(url_for("add_account", shared_text=shared_text)) @app.route("/accounts/add", methods=["GET", "POST"]) def add_account() -> Union[Response, str]: """Handles adding a new user account.""" if not session.get("logged_in"): return redirect(url_for("index", next=request.url)) base_url = app.config["BACKEND_URL"] shared_text = request.args.get('shared_text') if request.method == "POST": username = request.form["username"] password = request.form["password"] stream = request.form["stream"] if add_user_account( base_url, session["auth_credentials"], username, password, stream ): cache.delete_memoized(user_accounts, key_prefix=make_cache_key) return redirect(url_for("user_accounts")) return render_template( "add_account.html", text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text ) @app.route("/accounts/delete", methods=["POST"]) def delete_account() -> Response: """Handles deleting a user account.""" stream = request.form.get("stream") username = request.form.get("username") base_url = app.config["BACKEND_URL"] delete_user_account(base_url, session["auth_credentials"], stream, username) cache.delete_memoized(user_accounts, key_prefix=make_cache_key) return redirect(url_for("user_accounts")) @app.route("/validateAccount", methods=["POST"]) def validate_account() -> Tuple[Response, int]: """Forwards account validation requests to the backend.""" base_url = app.config["BACKEND_URL"] validate_url = f"{base_url}/validateAccount" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) try: response = requests.post( validate_url, auth=requests.auth.HTTPBasicAuth(username, password), json=request.get_json() ) response.raise_for_status() response_data = response.json() if response_data.get("message") == "Account is valid and updated": cache.delete_memoized(user_accounts, key_prefix=make_cache_key) return jsonify(response_data), response.status_code except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 500 @app.route("/get_stream_names", methods=["GET"]) def stream_names() -> Union[Response, str]: """Fetches and returns stream names as JSON.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BACKEND_URL"] return jsonify(get_stream_names(base_url, session["auth_credentials"])) @app.route('/config') def config(): """Handles access to the configuration page.""" if session.get('user_id') and int(session.get('user_id')) == 1: return redirect(url_for('config_dashboard')) return redirect(url_for('home')) @app.route('/config/dashboard') def config_dashboard(): """Renders the configuration dashboard.""" if not session.get('user_id') or int(session.get('user_id')) != 1: return redirect(url_for('home')) return render_template('config_dashboard.html') @app.route('/check-expiring-accounts', methods=['POST']) def check_expiring_accounts(): """Proxies the request to check for expiring accounts to the backend.""" if not session.get('user_id') or int(session.get('user_id')) != 1: return jsonify({'error': 'Unauthorized'}), 401 backend_url = f"{app.config['BACKEND_URL']}/check-expiry" try: response = requests.post(backend_url) return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 502 @app.route('/dns', methods=['GET', 'POST', 'DELETE']) def proxy_dns(): """Proxies DNS management requests to the backend.""" if not session.get('user_id') or int(session.get('user_id')) != 1: return jsonify({'error': 'Unauthorized'}), 401 backend_url = f"{app.config['BACKEND_URL']}/dns" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) auth = requests.auth.HTTPBasicAuth(username, password) try: if request.method == 'GET': response = requests.get(backend_url, auth=auth) elif request.method == 'POST': response = requests.post(backend_url, auth=auth, json=request.get_json()) if response.ok: cache.clear() elif request.method == 'DELETE': response = requests.delete(backend_url, auth=auth, json=request.get_json()) if response.ok: cache.clear() return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 502 @app.route('/extra_urls', methods=['GET', 'POST', 'DELETE']) def proxy_extra_urls(): """Proxies extra URL management requests to the backend.""" if not session.get('user_id') or int(session.get('user_id')) != 1: return jsonify({'error': 'Unauthorized'}), 401 backend_url = f"{app.config['BACKEND_URL']}/extra_urls" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) auth = requests.auth.HTTPBasicAuth(username, password) try: if request.method == 'GET': response = requests.get(backend_url, auth=auth) elif request.method == 'POST': response = requests.post(backend_url, auth=auth, json=request.get_json()) if response.ok: cache.clear() elif request.method == 'DELETE': response = requests.delete(backend_url, auth=auth, json=request.get_json()) if response.ok: cache.clear() return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 502 class NginxProxyManager: def __init__(self, host, email, password): self.host = host self.email = email self.password = password self.token = None def login(self): url = f"{self.host}/api/tokens" payload = { "identity": self.email, "secret": self.password } headers = { "Content-Type": "application/json" } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: self.token = response.json()["token"] print("Login successful.") else: print(f"Failed to login: {response.text}") exit(1) def get_proxy_host(self, host_id): if not self.token: self.login() url = f"{self.host}/api/nginx/proxy-hosts/{host_id}" headers = { "Authorization": f"Bearer {self.token}" } response = requests.get(url, headers=headers) if response.status_code == 200: return response.json() else: print(f"Failed to get proxy host {host_id}: {response.text}") return None def update_proxy_host_config(self, host_id, config): if not self.token: self.login() url = f"{self.host}/api/nginx/proxy-hosts/{host_id}" original_host_data = self.get_proxy_host(host_id) if not original_host_data: return # Construct a new payload with only the allowed fields for an update update_payload = { "domain_names": original_host_data.get("domain_names", []), "forward_scheme": original_host_data.get("forward_scheme", "http"), "forward_host": original_host_data.get("forward_host"), "forward_port": original_host_data.get("forward_port"), "access_list_id": original_host_data.get("access_list_id", 0), "certificate_id": original_host_data.get("certificate_id", 0), "ssl_forced": original_host_data.get("ssl_forced", False), "hsts_enabled": original_host_data.get("hsts_enabled", False), "hsts_subdomains": original_host_data.get("hsts_subdomains", False), "http2_support": original_host_data.get("http2_support", False), "block_exploits": original_host_data.get("block_exploits", False), "caching_enabled": original_host_data.get("caching_enabled", False), "allow_websocket_upgrade": original_host_data.get("allow_websocket_upgrade", False), "advanced_config": config, # The updated advanced config "meta": original_host_data.get("meta", {}), "locations": original_host_data.get("locations", []), } headers = { "Authorization": f"Bearer {self.token}", "Content-Type": "application/json" } response = requests.put(url, headers=headers, data=json.dumps(update_payload)) if response.status_code == 200: print(f"Successfully updated proxy host {host_id}") else: print(f"Failed to update proxy host {host_id}: {response.text}") def update_config_with_streams(config, streams): # Get all stream names from the database db_stream_names = {stream['streamName'] for stream in streams} # Find all location blocks in the config location_blocks = re.findall(r'location ~ \^/(\w+)\(\.\*\)\$ \{[^}]+\}', config) # Remove location blocks that are not in the database for stream_name in location_blocks: if stream_name not in db_stream_names: print(f"Removing location block for stream: {stream_name}") pattern = re.compile(f'location ~ \\^/{re.escape(stream_name)}\\(\\.\\*\\)\\$ {{[^}}]+}}\\s*', re.DOTALL) config = pattern.sub('', config) # Update existing stream URLs for stream in streams: stream_name = stream['streamName'] stream_url = stream['streamURL'] if stream_url: # Ensure there is a URL to update to # Use a more specific regex to avoid replacing parts of other URLs pattern = re.compile(f'(location ~ \\^/{re.escape(stream_name)}\\(\\.\\*\\)\\$ {{\\s*return 302 )([^;]+)(;\\s*}})') config = pattern.sub(f'\\1{stream_url}/$1$is_args$args\\3', config) return config @app.route('/update_host_9_config', methods=['POST']) def update_host_9_config(): if not session.get('user_id') or int(session.get('user_id')) != 1: return jsonify({'error': 'Unauthorized'}), 401 npm = NginxProxyManager(app.config['NPM_HOST'], app.config['NPM_EMAIL'], app.config['NPM_PASSWORD']) npm.login() host = npm.get_proxy_host(9) if host: current_config = host.get('advanced_config', '') # Fetch streams from the backend API backend_url = f"{app.config['BACKEND_URL']}/get_all_stream_urls" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) auth = requests.auth.HTTPBasicAuth(username, password) try: response = requests.get(backend_url, auth=auth) response.raise_for_status() streams = response.json() except requests.exceptions.RequestException as e: return jsonify({"error": f"Failed to fetch streams from backend: {e}"}), 502 if streams: new_config = update_config_with_streams(current_config, streams) npm.update_proxy_host_config(9, new_config) return jsonify({'message': 'Config updated successfully'}), 200 return jsonify({'error': 'Failed to update config'}), 500 if __name__ == "__main__": app.run( debug=app.config["DEBUG"], host=app.config["HOST"], port=app.config["PORT"] )