# app.py
"""Flask front end for managing user accounts against a backend HTTP API.

The backend base URL comes from ``app.config["BASE_URL"]``. After a successful
login the user's Basic-Auth credentials are kept in the Flask session and are
forwarded to the backend by the helpers in ``lib.reqs``.
"""
from flask import (
    Flask,
    render_template,
    request,
    redirect,
    url_for,
    session,
    send_file,
    send_from_directory,
    jsonify,
)
from flask_caching import Cache
import requests
import requests.auth
import os
import base64
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
from lib.reqs import (
    get_urls,
    get_user_accounts,
    add_user_account,
    delete_user_account,
    get_stream_names,
)
from config import DevelopmentConfig, ProductionConfig
from paddleocr import PaddleOCR
from PIL import Image
import numpy as np

# Restrict the numeric libraries to a single thread each.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

# Seconds to wait for the backend API before giving up; without a timeout a
# stalled backend would block the worker indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

app = Flask(__name__)

if os.environ.get("FLASK_ENV") == "production":
    app.config.from_object(ProductionConfig)
else:
    app.config.from_object(DevelopmentConfig)

cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})

if app.config.get("OCR_ENABLED"):
    ocr = PaddleOCR(use_angle_cls=True, lang="en")  # Adjust language if needed

app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"]
app.config["SESSION_COOKIE_HTTPONLY"] = True  # Prevent JavaScript access
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"  # Adjust for cross-site requests
app.config["PERMANENT_SESSION_LIFETIME"] = 60 * 60 * 24 * 365  # 1 year in seconds

cache.clear()  # Clears all cache entries


def _user_cache_key():
    """Return a view-cache key scoped to the request path and the current user.

    The cached pages embed per-user data, so keying on the path alone would
    serve one user's page (or the anonymous login page) to everybody else.
    """
    if session.get("logged_in"):
        return f"view/{request.path}/user/{session.get('username')}"
    return f"view/{request.path}/anon"


def get_version():
    """Return the stripped contents of the ``VERSION`` file, or ``'dev'`` if absent."""
    try:
        with open("VERSION", "r") as f:
            return f.read().strip()
    except FileNotFoundError:
        return "dev"


@app.context_processor
def inject_version():
    """Expose ``version`` to every rendered template."""
    return dict(version=get_version())


@app.before_request
def make_session_permanent():
    """Mark the session permanent so ``PERMANENT_SESSION_LIFETIME`` applies."""
    session.permanent = True


@app.route("/site.webmanifest")
def serve_manifest():
    """Serve the web app manifest from the static folder."""
    return send_from_directory(
        os.path.join(app.root_path, "static"),
        "site.webmanifest",
        mimetype="application/manifest+json",
    )


@app.route("/favicon.ico")
def favicon():
    """Serve the favicon from the static folder."""
    return send_from_directory(
        os.path.join(app.root_path, "static"),
        "favicon.ico",
        mimetype="image/vnd.microsoft.icon",
    )


@app.route("/")
def index():
    """Show the login page, or redirect signed-in users to ``/home``."""
    if session.get("logged_in"):
        return redirect(url_for("home"))
    return render_template("index.html")


@app.route("/home")
@cache.cached(timeout=60, key_prefix=_user_cache_key)  # cache for 60 seconds, per user
def home():
    """Render the dashboard for the signed-in user, or the login page otherwise."""
    if not session.get("logged_in"):
        return render_template("index.html")

    base_url = app.config["BASE_URL"]
    all_accounts = get_user_accounts(base_url, session["auth_credentials"])
    return render_template(
        "home.html",
        username=session["username"],
        accounts=len(all_accounts),
        current_month_accounts=filter_accounts_next_30_days(all_accounts),
        expired_accounts=filter_accounts_expired(all_accounts),
        ocr_enabled=app.config.get("OCR_ENABLED"),
    )


@app.route("/login", methods=["POST"])
def login():
    """Validate the submitted credentials against the backend ``/Login`` endpoint.

    On success the username and the Base64-encoded ``user:password`` pair are
    stored in the session and the user is redirected to ``/home``. On failure
    the login page is re-rendered with an error message.
    """
    username = request.form["username"]
    password = request.form["password"]

    base_url = app.config["BASE_URL"]
    login_url = f"{base_url}/Login"

    try:
        response = requests.get(
            login_url,
            auth=requests.auth.HTTPBasicAuth(username, password),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        # A 200 with a non-JSON body raises ValueError here.
        payload = response.json() if response.status_code == 200 else None
    except (requests.RequestException, ValueError):
        error = "Unable to reach the login service. Please try again later."
        return render_template("index.html", error=error)

    if isinstance(payload, dict) and payload.get("auth") == "Success":
        # NOTE(review): the credentials are stored in the session; with Flask's
        # default cookie-backed session they are signed but not encrypted.
        # Consider a server-side session store or a backend-issued token.
        credentials = f"{username}:{password}"
        session["logged_in"] = True
        session["username"] = username
        session["auth_credentials"] = base64.b64encode(credentials.encode()).decode()
        return redirect(url_for("home"))

    error = "Invalid username or password. Please try again."
    return render_template("index.html", error=error)


@app.route("/urls", methods=["GET"])
@cache.cached(timeout=300, key_prefix=_user_cache_key)  # cache for 5 minutes, per user
def urls():
    """Render the URL list for the signed-in user."""
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]
    return render_template(
        "urls.html", urls=get_urls(base_url, session["auth_credentials"])
    )


@app.route("/accounts", methods=["GET"])
def user_accounts():
    """Render the account list for the signed-in user (never cached)."""
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]
    user_accounts_data = get_user_accounts(base_url, session["auth_credentials"])

    return render_template(
        "user_accounts.html",
        username=session["username"],
        user_accounts=user_accounts_data,
        auth=session["auth_credentials"],
    )


@app.route("/accounts/add", methods=["GET", "POST"])
def add_account():
    """Show the add-account form (GET) or create an account (POST).

    A successful POST clears the view cache so cached pages reflect the new
    account, then redirects to the account list. A failed POST re-renders the
    form.
    """
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]
    shared_text = request.args.get("shared_text")

    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        stream = request.form["stream"]

        if add_user_account(
            base_url, session["auth_credentials"], username, password, stream
        ):
            cache.clear()
            return redirect(url_for("user_accounts"))

    return render_template(
        "add_account.html",
        ocr_enabled=app.config.get("OCR_ENABLED"),
        text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"),
        shared_text=shared_text,
    )


@app.route("/accounts/delete", methods=["POST"])
def delete_account():
    """Delete the submitted account and return to the account list."""
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    stream = request.form.get("stream")
    username = request.form.get("username")
    base_url = app.config["BASE_URL"]

    if delete_user_account(base_url, session["auth_credentials"], stream, username):
        # Mirror add_account: drop cached pages that embed account data.
        cache.clear()

    return redirect(url_for("user_accounts"))


@app.route("/validateAccount", methods=["POST"])
def validate_account():
    """Forward the JSON request body to the backend ``/validateAccount`` endpoint.

    Returns the backend's JSON body and status code, 401 if the caller is not
    signed in, or 502 if the backend is unreachable or replies with non-JSON.
    """
    if not session.get("logged_in"):
        return jsonify({"error": "Authentication required"}), 401

    base_url = app.config["BASE_URL"]
    validate_url = f"{base_url}/validateAccount"

    # The session holds Base64("user:password"); split on the first colon only
    # because the password itself may contain colons.
    credentials = base64.b64decode(session["auth_credentials"]).decode()
    username, password = credentials.split(":", 1)

    try:
        response = requests.post(
            validate_url,
            auth=requests.auth.HTTPBasicAuth(username, password),
            json=request.get_json(),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        payload = response.json()
    except (requests.RequestException, ValueError):
        return jsonify({"error": "Validation service unavailable"}), 502

    return jsonify(payload), response.status_code


@app.route("/get_stream_names", methods=["GET"])
def stream_names():
    """Return the backend's stream names as JSON for the signed-in user."""
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]
    names = get_stream_names(base_url, session["auth_credentials"])
    return jsonify(names)


if app.config.get("OCR_ENABLED"):

    @app.route("/OCRupload", methods=["POST"])
    def OCRupload():
        """Run OCR on the uploaded ``image`` and pre-fill the add-account form.

        Returns 400 if no image was uploaded, 422 if too little text was
        recognised, and 500 (with the exception text) on any other failure.
        """
        if not session.get("logged_in"):
            return redirect(url_for("home"))

        if "image" not in request.files:
            return jsonify({"error": "No image file found"}), 400

        file = request.files["image"]

        try:
            image = Image.open(file.stream)
            result = ocr.ocr(np.array(image))

            # Guard against an empty or None first entry before iterating.
            lines = result[0] if result and result[0] else []
            extracted_text = [line[1][0] for line in lines]

            # Assumes the third and fourth recognised lines hold the username
            # and password -- layout-specific; TODO confirm against real images.
            if len(extracted_text) < 4:
                return (
                    jsonify(
                        {"error": "Could not find a username and password in the image"}
                    ),
                    422,
                )

            return render_template(
                "add_account.html",
                username=extracted_text[2],
                password=extracted_text[3],
                ocr_enabled=app.config.get("OCR_ENABLED"),
                text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"),
            )
        except Exception as e:
            # Top-level boundary for image decoding / OCR failures.
            app.logger.exception("OCR upload failed")
            return jsonify({"error": str(e)}), 500


if __name__ == "__main__":
    app.run(debug=app.config["DEBUG"], host=app.config["HOST"], port=app.config["PORT"])