# app.py from flask import (Flask, render_template, request, redirect, url_for, session, send_file, jsonify, send_from_directory, Response) from flask_caching import Cache import requests.auth import os import base64 from typing import Dict, Any, Tuple, Union from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired from lib.reqs import (get_urls, get_user_accounts, add_user_account, delete_user_account, get_stream_names) from config import DevelopmentConfig, ProductionConfig try: from paddleocr import PaddleOCR from PIL import Image import numpy as np OCR_AVAILABLE = True except ImportError: OCR_AVAILABLE = False os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" app = Flask(__name__) if os.environ.get("FLASK_ENV") == "production": app.config.from_object(ProductionConfig) else: app.config.from_object(DevelopmentConfig) cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"}) if app.config.get("OCR_ENABLED") and OCR_AVAILABLE: ocr = PaddleOCR(use_angle_cls=True, lang='en') else: app.config["OCR_ENABLED"] = False app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"] app.config['SESSION_COOKIE_HTTPONLY'] = True app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year cache.clear() def get_version() -> str: """Retrieves the application version from the VERSION file. Returns: The version string, or 'dev' if the file is not found. """ try: with open('VERSION', 'r') as f: return f.read().strip() except FileNotFoundError: return 'dev' @app.context_processor def inject_version() -> Dict[str, str]: """Injects the version into all templates.""" return dict(version=get_version()) @app.before_request def make_session_permanent() -> None: """Makes the user session permanent.""" session.permanent = True @app.route('/site.webmanifest') def serve_manifest() -> Response: """Serves the site manifest file.""" return send_from_directory( os.path.join(app.root_path, 'static'), 'site.webmanifest', mimetype='application/manifest+json' ) @app.route("/favicon.ico") def favicon() -> Response: """Serves the favicon.""" return send_from_directory( os.path.join(app.root_path, "static"), "favicon.ico", mimetype="image/vnd.microsoft.icon", ) @app.route("/") def index() -> Union[Response, str]: """Renders the index page or redirects to home if logged in.""" if session.get("logged_in"): return redirect(url_for("home")) return render_template("index.html") @app.route("/home") @cache.cached(timeout=60) def home() -> str: """Renders the home page with account statistics.""" if session.get("logged_in"): base_url = app.config["BASE_URL"] all_accounts = get_user_accounts(base_url, session["auth_credentials"]) return render_template( "home.html", username=session["username"], accounts=len(all_accounts), current_month_accounts=filter_accounts_next_30_days(all_accounts), expired_accounts=filter_accounts_expired(all_accounts), ocr_enabled=app.config.get("OCR_ENABLED"), ) return render_template("index.html") @app.route("/login", methods=["POST"]) def login() -> Union[Response, str]: """Handles user login.""" username = request.form["username"] password = request.form["password"] credentials = f"{username}:{password}" encoded_credentials = base64.b64encode(credentials.encode()).decode() base_url = app.config["BASE_URL"] login_url = f"{base_url}/Login" try: response = requests.get( login_url, auth=requests.auth.HTTPBasicAuth(username, password) ) response.raise_for_status() if response.json().get("auth") == "Success": session["logged_in"] = True session["username"] = username session["auth_credentials"] = encoded_credentials return redirect(url_for("home")) except requests.exceptions.RequestException: pass # Fall through to error error = "Invalid username or password. Please try again." return render_template("index.html", error=error) @app.route("/urls", methods=["GET"]) @cache.cached(timeout=300) def urls() -> Union[Response, str]: """Renders the URLs page.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BASE_URL"] return render_template( "urls.html", urls=get_urls(base_url, session["auth_credentials"]) ) @app.route("/accounts", methods=["GET"]) def user_accounts() -> Union[Response, str]: """Renders the user accounts page.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BASE_URL"] user_accounts_data = get_user_accounts(base_url, session["auth_credentials"]) cache.delete_memoized(user_accounts) return render_template( "user_accounts.html", username=session["username"], user_accounts=user_accounts_data, auth=session["auth_credentials"], ) @app.route("/accounts/add", methods=["GET", "POST"]) def add_account() -> Union[Response, str]: """Handles adding a new user account.""" base_url = app.config["BASE_URL"] shared_text = request.args.get('shared_text') if request.method == "POST": username = request.form["username"] password = request.form["password"] stream = request.form["stream"] if add_user_account( base_url, session["auth_credentials"], username, password, stream ): cache.clear() return redirect(url_for("user_accounts")) return render_template( "add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text ) @app.route("/accounts/delete", methods=["POST"]) def delete_account() -> Response: """Handles deleting a user account.""" stream = request.form.get("stream") username = request.form.get("username") base_url = app.config["BASE_URL"] delete_user_account(base_url, session["auth_credentials"], stream, username) return redirect(url_for("user_accounts")) @app.route("/validateAccount", methods=["POST"]) def validate_account() -> Tuple[Response, int]: """Forwards account validation requests to the backend.""" base_url = app.config["BASE_URL"] validate_url = f"{base_url}/validateAccount" credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) try: response = requests.post( validate_url, auth=requests.auth.HTTPBasicAuth(username, password), json=request.get_json() ) response.raise_for_status() return jsonify(response.json()), response.status_code except requests.exceptions.RequestException as e: return jsonify({"error": str(e)}), 500 @app.route("/get_stream_names", methods=["GET"]) def stream_names() -> Union[Response, str]: """Fetches and returns stream names as JSON.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BASE_URL"] return jsonify(get_stream_names(base_url, session["auth_credentials"])) if app.config.get("OCR_ENABLED"): @app.route('/OCRupload', methods=['POST']) def ocr_upload() -> Union[Response, str, Tuple[Response, int]]: """Handles image uploads for OCR processing.""" if 'image' not in request.files: return jsonify({"error": "No image file found"}), 400 file = request.files['image'] try: image = Image.open(file.stream) image_np = np.array(image) result = ocr.ocr(image_np) extracted_text = [line[1][0] for line in result[0]] # Basic validation if len(extracted_text) >= 4: return render_template( "add_account.html", username=extracted_text[2], password=extracted_text[3], ocr_enabled=True, text_input_enabled=app.config.get("TEXT_INPUT_ENABLED") ) else: return jsonify({"error": "Could not extract required fields from image"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 if __name__ == "__main__": app.run( debug=app.config["DEBUG"], host=app.config["HOST"], port=app.config["PORT"] )