From 8b9c6da2b777ab882e6b1c5b19066f15d3a05086 Mon Sep 17 00:00:00 2001 From: Karl Date: Tue, 15 Jul 2025 15:44:19 +0100 Subject: [PATCH] docstrings --- app.py | 226 ++++++++++++++++++++++++++---------------------- lib/datetime.py | 29 ++++--- lib/reqs.py | 89 +++++++++---------- 3 files changed, 180 insertions(+), 164 deletions(-) diff --git a/app.py b/app.py index 2a4dd85..bb5d385 100644 --- a/app.py +++ b/app.py @@ -1,19 +1,24 @@ # app.py -from flask import Flask, render_template, request, redirect, url_for, session, send_file, jsonify +from flask import (Flask, render_template, request, redirect, url_for, session, + send_file, jsonify, send_from_directory, Response) from flask_caching import Cache import requests.auth import os -from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired -from lib.reqs import get_urls, get_user_accounts, add_user_account, delete_user_account, get_stream_names -from flask import send_from_directory -import requests import base64 -from flask import Flask -from config import DevelopmentConfig, ProductionConfig -from paddleocr import PaddleOCR -from PIL import Image -import numpy as np +from typing import Dict, Any, Tuple, Union +from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired +from lib.reqs import (get_urls, get_user_accounts, add_user_account, + delete_user_account, get_stream_names) +from config import DevelopmentConfig, ProductionConfig + +try: + from paddleocr import PaddleOCR + from PIL import Image + import numpy as np + OCR_AVAILABLE = True +except ImportError: + OCR_AVAILABLE = False os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" @@ -24,18 +29,26 @@ if os.environ.get("FLASK_ENV") == "production": app.config.from_object(ProductionConfig) else: app.config.from_object(DevelopmentConfig) + cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"}) -if app.config.get("OCR_ENABLED"): - ocr = PaddleOCR(use_angle_cls=True, lang='en') # Adjust language if needed +if app.config.get("OCR_ENABLED") and OCR_AVAILABLE: + ocr = PaddleOCR(use_angle_cls=True, lang='en') +else: + app.config["OCR_ENABLED"] = False app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"] -app.config['SESSION_COOKIE_HTTPONLY'] = True # Prevent JavaScript access -app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # Adjust for cross-site requests -app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year in seconds -cache.clear() # Clears all cache entries +app.config['SESSION_COOKIE_HTTPONLY'] = True +app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' +app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year +cache.clear() -def get_version(): +def get_version() -> str: + """Retrieves the application version from the VERSION file. + + Returns: + The version string, or 'dev' if the file is not found. + """ try: with open('VERSION', 'r') as f: return f.read().strip() @@ -43,109 +56,102 @@ def get_version(): return 'dev' @app.context_processor -def inject_version(): +def inject_version() -> Dict[str, str]: + """Injects the version into all templates.""" return dict(version=get_version()) @app.before_request -def make_session_permanent(): +def make_session_permanent() -> None: + """Makes the user session permanent.""" session.permanent = True @app.route('/site.webmanifest') -def serve_manifest(): - return send_from_directory(os.path.join(app.root_path, 'static'), 'site.webmanifest', mimetype='application/manifest+json') +def serve_manifest() -> Response: + """Serves the site manifest file.""" + return send_from_directory( + os.path.join(app.root_path, 'static'), + 'site.webmanifest', + mimetype='application/manifest+json' + ) @app.route("/favicon.ico") -def favicon(): +def favicon() -> Response: + """Serves the favicon.""" return send_from_directory( os.path.join(app.root_path, "static"), "favicon.ico", mimetype="image/vnd.microsoft.icon", ) - @app.route("/") -def index(): - # If the user is logged in, redirect to a protected page like /accounts +def index() -> Union[Response, str]: + """Renders the index page or redirects to home if logged in.""" if session.get("logged_in"): return redirect(url_for("home")) return render_template("index.html") - @app.route("/home") -@cache.cached(timeout=60) # cache for 120 seconds -def home(): +@cache.cached(timeout=60) +def home() -> str: + """Renders the home page with account statistics.""" if session.get("logged_in"): - base_url = app.config["BASE_URL"] # Access base_url from the config + base_url = app.config["BASE_URL"] all_accounts = get_user_accounts(base_url, session["auth_credentials"]) - count = len(all_accounts) - current_month_accounts = filter_accounts_next_30_days(all_accounts) - expired_accounts = filter_accounts_expired(all_accounts) return render_template( "home.html", username=session["username"], - accounts=count, - current_month_accounts=current_month_accounts, - expired_accounts=expired_accounts, + accounts=len(all_accounts), + current_month_accounts=filter_accounts_next_30_days(all_accounts), + expired_accounts=filter_accounts_expired(all_accounts), ocr_enabled=app.config.get("OCR_ENABLED"), ) return render_template("index.html") - @app.route("/login", methods=["POST"]) -def login(): +def login() -> Union[Response, str]: + """Handles user login.""" username = request.form["username"] password = request.form["password"] - - # Encode the username and password in Base64 credentials = f"{username}:{password}" encoded_credentials = base64.b64encode(credentials.encode()).decode() + base_url = app.config["BASE_URL"] + login_url = f"{base_url}/Login" - base_url = app.config["BASE_URL"] # Access base_url from the config - login_url = f"{base_url}/Login" # Construct the full URL - - # Send GET request to the external login API with Basic Auth - response = requests.get( - login_url, auth=requests.auth.HTTPBasicAuth(username, password) - ) - - # Check if login was successful - if response.status_code == 200 and response.json().get("auth") == "Success": - # Set session variable to indicate the user is logged in - session["logged_in"] = True - session["username"] = username - session["auth_credentials"] = encoded_credentials - return redirect(url_for("home")) # Redirect to the Accounts page - else: - # Show error on the login page - error = "Invalid username or password. Please try again." - return render_template("index.html", error=error) + try: + response = requests.get( + login_url, auth=requests.auth.HTTPBasicAuth(username, password) + ) + response.raise_for_status() + if response.json().get("auth") == "Success": + session["logged_in"] = True + session["username"] = username + session["auth_credentials"] = encoded_credentials + return redirect(url_for("home")) + except requests.exceptions.RequestException: + pass # Fall through to error + error = "Invalid username or password. Please try again." + return render_template("index.html", error=error) @app.route("/urls", methods=["GET"]) -@cache.cached(timeout=300) # cache for 5 minutes -def urls(): - # Check if the user is logged in +@cache.cached(timeout=300) +def urls() -> Union[Response, str]: + """Renders the URLs page.""" if not session.get("logged_in"): return redirect(url_for("home")) - # Placeholder content for Accounts page - base_url = app.config["BASE_URL"] # Access base_url from the config + base_url = app.config["BASE_URL"] return render_template( "urls.html", urls=get_urls(base_url, session["auth_credentials"]) ) - @app.route("/accounts", methods=["GET"]) -def user_accounts(): - # Check if the user is logged in +def user_accounts() -> Union[Response, str]: + """Renders the user accounts page.""" if not session.get("logged_in"): return redirect(url_for("home")) - # Placeholder content for Accounts page - base_url = app.config["BASE_URL"] # Access base_url from the config + base_url = app.config["BASE_URL"] user_accounts_data = get_user_accounts(base_url, session["auth_credentials"]) - - # Clear the cache for 'user_accounts' view specifically cache.delete_memoized(user_accounts) - return render_template( "user_accounts.html", username=session["username"], @@ -153,9 +159,9 @@ def user_accounts(): auth=session["auth_credentials"], ) - @app.route("/accounts/add", methods=["GET", "POST"]) -def add_account(): +def add_account() -> Union[Response, str]: + """Handles adding a new user account.""" base_url = app.config["BASE_URL"] shared_text = request.args.get('shared_text') @@ -163,72 +169,84 @@ def add_account(): username = request.form["username"] password = request.form["password"] stream = request.form["stream"] - if add_user_account( base_url, session["auth_credentials"], username, password, stream ): cache.clear() return redirect(url_for("user_accounts")) - return render_template("add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text) - - return render_template("add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text) + return render_template( + "add_account.html", + ocr_enabled=app.config.get("OCR_ENABLED"), + text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), + shared_text=shared_text + ) @app.route("/accounts/delete", methods=["POST"]) -def delete_account(): +def delete_account() -> Response: + """Handles deleting a user account.""" stream = request.form.get("stream") username = request.form.get("username") base_url = app.config["BASE_URL"] - - if delete_user_account(base_url, session["auth_credentials"], stream, username): - return redirect(url_for("user_accounts")) + delete_user_account(base_url, session["auth_credentials"], stream, username) return redirect(url_for("user_accounts")) - @app.route("/validateAccount", methods=["POST"]) -def validate_account(): +def validate_account() -> Tuple[Response, int]: + """Forwards account validation requests to the backend.""" base_url = app.config["BASE_URL"] validate_url = f"{base_url}/validateAccount" - - # Forward the request to the backend API credentials = base64.b64decode(session["auth_credentials"]).decode() username, password = credentials.split(":", 1) - response = requests.post( - validate_url, - auth=requests.auth.HTTPBasicAuth(username, password), - json=request.get_json() - ) - - return jsonify(response.json()), response.status_code + try: + response = requests.post( + validate_url, + auth=requests.auth.HTTPBasicAuth(username, password), + json=request.get_json() + ) + response.raise_for_status() + return jsonify(response.json()), response.status_code + except requests.exceptions.RequestException as e: + return jsonify({"error": str(e)}), 500 @app.route("/get_stream_names", methods=["GET"]) -def stream_names(): +def stream_names() -> Union[Response, str]: + """Fetches and returns stream names as JSON.""" if not session.get("logged_in"): return redirect(url_for("home")) base_url = app.config["BASE_URL"] - stream_names = get_stream_names(base_url, session["auth_credentials"]) - return jsonify(stream_names) - + return jsonify(get_stream_names(base_url, session["auth_credentials"])) if app.config.get("OCR_ENABLED"): @app.route('/OCRupload', methods=['POST']) - def OCRupload(): + def ocr_upload() -> Union[Response, str, Tuple[Response, int]]: + """Handles image uploads for OCR processing.""" if 'image' not in request.files: return jsonify({"error": "No image file found"}), 400 - # Get the uploaded file file = request.files['image'] try: image = Image.open(file.stream) image_np = np.array(image) result = ocr.ocr(image_np) - # Extract text - extracted_text = [] - for line in result[0]: - extracted_text.append(line[1][0]) - return render_template("add_account.html", username=extracted_text[2], password=extracted_text[3], ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED")) + extracted_text = [line[1][0] for line in result[0]] + # Basic validation + if len(extracted_text) >= 4: + return render_template( + "add_account.html", + username=extracted_text[2], + password=extracted_text[3], + ocr_enabled=True, + text_input_enabled=app.config.get("TEXT_INPUT_ENABLED") + ) + else: + return jsonify({"error": "Could not extract required fields from image"}), 400 except Exception as e: return jsonify({"error": str(e)}), 500 if __name__ == "__main__": - app.run(debug=app.config["DEBUG"], host=app.config["HOST"], port=app.config["PORT"]) + app.run( + debug=app.config["DEBUG"], + host=app.config["HOST"], + port=app.config["PORT"] + ) diff --git a/lib/datetime.py b/lib/datetime.py index c3e1496..da0714c 100644 --- a/lib/datetime.py +++ b/lib/datetime.py @@ -1,13 +1,16 @@ from datetime import datetime, timedelta -from typing import List, Dict +from typing import List, Dict, Any + +def filter_accounts_next_30_days(accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Filters accounts expiring within the next 30 days. -def filter_accounts_next_30_days(accounts: List[Dict[str, int]]) -> List[Dict[str, int]]: - """Filter accounts whose expiry date falls within the next 30 days. Args: - accounts (List[Dict[str, int]]): A list of account dictionaries, each containing - an 'expiaryDate' key with an epoch timestamp as its value. + accounts: A list of account dictionaries, each with an 'expiaryDate' + (epoch timestamp). + Returns: - List[Dict[str, int]]: A list of accounts expiring within the next 30 days. + A list of accounts expiring within the next 30 days, with added + 'expiaryDate_rendered' and 'days_to_expiry' keys. """ now = datetime.now() thirty_days_later = now + timedelta(days=30) @@ -26,18 +29,18 @@ def filter_accounts_next_30_days(accounts: List[Dict[str, int]]) -> List[Dict[st result.append(account) return result -def filter_accounts_expired(accounts: List[Dict[str, int]]) -> List[Dict[str, int]]: - """Filter accounts whose expiry date has passed. +def filter_accounts_expired(accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Filters accounts that have already expired. + Args: - accounts (List[Dict[str, int]]): A list of account dictionaries, each containing - an 'expiaryDate' key with an epoch timestamp as its value. + accounts: A list of account dictionaries, each with an 'expiaryDate' + (epoch timestamp). + Returns: - List[Dict[str, int]]: A list of accounts that have expired. + A list of expired accounts with an added 'expiaryDate_rendered' key. """ - # Get the current epoch timestamp current_timestamp = int(datetime.now().timestamp()) - # Filter accounts where the current date is greater than the expiryDate expired_accounts = [] for account in accounts: if account['expiaryDate'] < current_timestamp: diff --git a/lib/reqs.py b/lib/reqs.py index 5dad4cf..2642e15 100644 --- a/lib/reqs.py +++ b/lib/reqs.py @@ -5,106 +5,101 @@ from typing import List, Dict, Any def get_urls(base_url: str, auth: str) -> List[Dict[str, Any]]: - """Retrieve user account streams from the specified base URL. + """Retrieves user account streams from the API. Args: - base_url (str): The base URL of the API. - auth (str): The authorization token for accessing the API. + base_url: The base URL of the API. + auth: The authorization token. Returns: - List[Dict[str, Any]]: A list of user account streams. + A list of user account streams. """ url = f"{base_url}/getUserAccounts/streams" - payload = {} headers = {"Authorization": f"Basic {auth}"} - - response = requests.request("GET", url, headers=headers, data=payload) - return json.loads(response.text) + response = requests.get(url, headers=headers) + response.raise_for_status() + return response.json() def get_user_accounts(base_url: str, auth: str) -> List[Dict[str, Any]]: - """Retrieve user accounts from the specified base URL. + """Retrieves user accounts from the API. Args: - base_url (str): The base URL of the API. - auth (str): The authorization token for accessing the API. + base_url: The base URL of the API. + auth: The authorization token. Returns: - List[Dict[str, Any]]: A list of user accounts with their expiration dates rendered. + A list of user accounts with 'expiaryDate_rendered' added. """ url = f"{base_url}/getUserAccounts" - payload = {} headers = {"Authorization": f"Basic {auth}"} + response = requests.get(url, headers=headers) + response.raise_for_status() + accounts = response.json() - response = requests.request("GET", url, headers=headers, data=payload) - res_json = json.loads(response.text) - - for account in res_json: + for account in accounts: account["expiaryDate_rendered"] = datetime.utcfromtimestamp( account["expiaryDate"] ).strftime("%d/%m/%Y") - return res_json + return accounts def delete_user_account(base_url: str, auth: str, stream: str, username: str) -> bool: - """Delete a user account from the specified base URL. + """Deletes a user account via the API. Args: - base_url (str): The base URL of the API. - auth (str): The authorization token for accessing the API. - stream (str): The name of the stream associated with the user account. - username (str): The username of the account to delete. + base_url: The base URL of the API. + auth: The authorization token. + stream: The stream associated with the account. + username: The username of the account to delete. Returns: - bool: True if the account was deleted successfully, False otherwise. + True if the account was deleted successfully, False otherwise. """ url = f"{base_url}/deleteAccount" payload = {"stream": stream, "user": username} headers = {"Authorization": f"Basic {auth}"} - - response = requests.request("POST", url, headers=headers, data=payload) + response = requests.post(url, headers=headers, data=payload) + response.raise_for_status() return "Deleted" in response.text def add_user_account(base_url: str, auth: str, username: str, password: str, stream: str) -> bool: - """Add a user account to the specified base URL. + """Adds a user account via the API. Args: - base_url (str): The base URL of the API. - auth (str): The authorization token for accessing the API. - username (str): The username of the account to add. - password (str): The password of the account to add. - stream (str): The name of the stream associated with the user account. + base_url: The base URL of the API. + auth: The authorization token. + username: The username of the new account. + password: The password for the new account. + stream: The stream to associate with the new account. Returns: - bool: True if the account was added successfully, False otherwise. + True if the account was added successfully, False otherwise. """ url = f"{base_url}/addAccount" payload = {"username": username, "password": password, "stream": stream} headers = {"Authorization": f"Basic {auth}"} - - response = requests.request("POST", url, headers=headers, data=payload) + response = requests.post(url, headers=headers, data=payload) return response.status_code == 200 def get_stream_names(base_url: str, auth: str) -> List[str]: - """Get a list of stream names from the API. + """Retrieves a list of stream names from the API. Args: - base_url (str): The base URL of the API. - auth (str): The authorization token. + base_url: The base URL of the API. + auth: The authorization token. Returns: - List[str]: A list of stream names. + A list of stream names, or an empty list if an error occurs. """ url = f"{base_url}/getStreamNames" - payload = {} headers = {"Authorization": f"Basic {auth}"} - response = requests.request("GET", url, headers=headers, data=payload) - if response.status_code == 200 and response.text: - try: - return json.loads(response.text) - except json.JSONDecodeError: - return [] - return [] + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + return response.json() + except (requests.exceptions.RequestException, json.JSONDecodeError): + return []