docstrings

This commit is contained in:
Karl 2025-07-15 15:44:19 +01:00
parent 259b128af0
commit 8b9c6da2b7
3 changed files with 180 additions and 164 deletions

202
app.py
View File

@ -1,19 +1,24 @@
# app.py
from flask import Flask, render_template, request, redirect, url_for, session, send_file, jsonify
from flask import (Flask, render_template, request, redirect, url_for, session,
send_file, jsonify, send_from_directory, Response)
from flask_caching import Cache
import requests.auth
import os
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
from lib.reqs import get_urls, get_user_accounts, add_user_account, delete_user_account, get_stream_names
from flask import send_from_directory
import requests
import base64
from flask import Flask
from config import DevelopmentConfig, ProductionConfig
from paddleocr import PaddleOCR
from PIL import Image
import numpy as np
from typing import Dict, Any, Tuple, Union
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
from lib.reqs import (get_urls, get_user_accounts, add_user_account,
delete_user_account, get_stream_names)
from config import DevelopmentConfig, ProductionConfig
try:
from paddleocr import PaddleOCR
from PIL import Image
import numpy as np
OCR_AVAILABLE = True
except ImportError:
OCR_AVAILABLE = False
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
@ -24,18 +29,26 @@ if os.environ.get("FLASK_ENV") == "production":
app.config.from_object(ProductionConfig)
else:
app.config.from_object(DevelopmentConfig)
cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})
if app.config.get("OCR_ENABLED"):
ocr = PaddleOCR(use_angle_cls=True, lang='en') # Adjust language if needed
if app.config.get("OCR_ENABLED") and OCR_AVAILABLE:
ocr = PaddleOCR(use_angle_cls=True, lang='en')
else:
app.config["OCR_ENABLED"] = False
app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"]
app.config['SESSION_COOKIE_HTTPONLY'] = True # Prevent JavaScript access
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # Adjust for cross-site requests
app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year in seconds
cache.clear() # Clears all cache entries
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year
cache.clear()
def get_version():
def get_version() -> str:
"""Retrieves the application version from the VERSION file.
Returns:
The version string, or 'dev' if the file is not found.
"""
try:
with open('VERSION', 'r') as f:
return f.read().strip()
@ -43,109 +56,102 @@ def get_version():
return 'dev'
@app.context_processor
def inject_version():
def inject_version() -> Dict[str, str]:
"""Injects the version into all templates."""
return dict(version=get_version())
@app.before_request
def make_session_permanent():
def make_session_permanent() -> None:
"""Makes the user session permanent."""
session.permanent = True
@app.route('/site.webmanifest')
def serve_manifest():
return send_from_directory(os.path.join(app.root_path, 'static'), 'site.webmanifest', mimetype='application/manifest+json')
def serve_manifest() -> Response:
"""Serves the site manifest file."""
return send_from_directory(
os.path.join(app.root_path, 'static'),
'site.webmanifest',
mimetype='application/manifest+json'
)
@app.route("/favicon.ico")
def favicon():
def favicon() -> Response:
"""Serves the favicon."""
return send_from_directory(
os.path.join(app.root_path, "static"),
"favicon.ico",
mimetype="image/vnd.microsoft.icon",
)
@app.route("/")
def index():
# If the user is logged in, redirect to a protected page like /accounts
def index() -> Union[Response, str]:
"""Renders the index page or redirects to home if logged in."""
if session.get("logged_in"):
return redirect(url_for("home"))
return render_template("index.html")
@app.route("/home")
@cache.cached(timeout=60) # cache for 120 seconds
def home():
@cache.cached(timeout=60)
def home() -> str:
"""Renders the home page with account statistics."""
if session.get("logged_in"):
base_url = app.config["BASE_URL"] # Access base_url from the config
base_url = app.config["BASE_URL"]
all_accounts = get_user_accounts(base_url, session["auth_credentials"])
count = len(all_accounts)
current_month_accounts = filter_accounts_next_30_days(all_accounts)
expired_accounts = filter_accounts_expired(all_accounts)
return render_template(
"home.html",
username=session["username"],
accounts=count,
current_month_accounts=current_month_accounts,
expired_accounts=expired_accounts,
accounts=len(all_accounts),
current_month_accounts=filter_accounts_next_30_days(all_accounts),
expired_accounts=filter_accounts_expired(all_accounts),
ocr_enabled=app.config.get("OCR_ENABLED"),
)
return render_template("index.html")
@app.route("/login", methods=["POST"])
def login():
def login() -> Union[Response, str]:
"""Handles user login."""
username = request.form["username"]
password = request.form["password"]
# Encode the username and password in Base64
credentials = f"{username}:{password}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
base_url = app.config["BASE_URL"]
login_url = f"{base_url}/Login"
base_url = app.config["BASE_URL"] # Access base_url from the config
login_url = f"{base_url}/Login" # Construct the full URL
# Send GET request to the external login API with Basic Auth
try:
response = requests.get(
login_url, auth=requests.auth.HTTPBasicAuth(username, password)
)
# Check if login was successful
if response.status_code == 200 and response.json().get("auth") == "Success":
# Set session variable to indicate the user is logged in
response.raise_for_status()
if response.json().get("auth") == "Success":
session["logged_in"] = True
session["username"] = username
session["auth_credentials"] = encoded_credentials
return redirect(url_for("home")) # Redirect to the Accounts page
else:
# Show error on the login page
return redirect(url_for("home"))
except requests.exceptions.RequestException:
pass # Fall through to error
error = "Invalid username or password. Please try again."
return render_template("index.html", error=error)
@app.route("/urls", methods=["GET"])
@cache.cached(timeout=300) # cache for 5 minutes
def urls():
# Check if the user is logged in
@cache.cached(timeout=300)
def urls() -> Union[Response, str]:
"""Renders the URLs page."""
if not session.get("logged_in"):
return redirect(url_for("home"))
# Placeholder content for Accounts page
base_url = app.config["BASE_URL"] # Access base_url from the config
base_url = app.config["BASE_URL"]
return render_template(
"urls.html", urls=get_urls(base_url, session["auth_credentials"])
)
@app.route("/accounts", methods=["GET"])
def user_accounts():
# Check if the user is logged in
def user_accounts() -> Union[Response, str]:
"""Renders the user accounts page."""
if not session.get("logged_in"):
return redirect(url_for("home"))
# Placeholder content for Accounts page
base_url = app.config["BASE_URL"] # Access base_url from the config
base_url = app.config["BASE_URL"]
user_accounts_data = get_user_accounts(base_url, session["auth_credentials"])
# Clear the cache for 'user_accounts' view specifically
cache.delete_memoized(user_accounts)
return render_template(
"user_accounts.html",
username=session["username"],
@ -153,9 +159,9 @@ def user_accounts():
auth=session["auth_credentials"],
)
@app.route("/accounts/add", methods=["GET", "POST"])
def add_account():
def add_account() -> Union[Response, str]:
"""Handles adding a new user account."""
base_url = app.config["BASE_URL"]
shared_text = request.args.get('shared_text')
@ -163,72 +169,84 @@ def add_account():
username = request.form["username"]
password = request.form["password"]
stream = request.form["stream"]
if add_user_account(
base_url, session["auth_credentials"], username, password, stream
):
cache.clear()
return redirect(url_for("user_accounts"))
return render_template("add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text)
return render_template("add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text)
return render_template(
"add_account.html",
ocr_enabled=app.config.get("OCR_ENABLED"),
text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"),
shared_text=shared_text
)
@app.route("/accounts/delete", methods=["POST"])
def delete_account():
def delete_account() -> Response:
"""Handles deleting a user account."""
stream = request.form.get("stream")
username = request.form.get("username")
base_url = app.config["BASE_URL"]
if delete_user_account(base_url, session["auth_credentials"], stream, username):
delete_user_account(base_url, session["auth_credentials"], stream, username)
return redirect(url_for("user_accounts"))
return redirect(url_for("user_accounts"))
@app.route("/validateAccount", methods=["POST"])
def validate_account():
def validate_account() -> Tuple[Response, int]:
"""Forwards account validation requests to the backend."""
base_url = app.config["BASE_URL"]
validate_url = f"{base_url}/validateAccount"
# Forward the request to the backend API
credentials = base64.b64decode(session["auth_credentials"]).decode()
username, password = credentials.split(":", 1)
try:
response = requests.post(
validate_url,
auth=requests.auth.HTTPBasicAuth(username, password),
json=request.get_json()
)
response.raise_for_status()
return jsonify(response.json()), response.status_code
except requests.exceptions.RequestException as e:
return jsonify({"error": str(e)}), 500
@app.route("/get_stream_names", methods=["GET"])
def stream_names():
def stream_names() -> Union[Response, str]:
"""Fetches and returns stream names as JSON."""
if not session.get("logged_in"):
return redirect(url_for("home"))
base_url = app.config["BASE_URL"]
stream_names = get_stream_names(base_url, session["auth_credentials"])
return jsonify(stream_names)
return jsonify(get_stream_names(base_url, session["auth_credentials"]))
if app.config.get("OCR_ENABLED"):
@app.route('/OCRupload', methods=['POST'])
def OCRupload():
def ocr_upload() -> Union[Response, str, Tuple[Response, int]]:
"""Handles image uploads for OCR processing."""
if 'image' not in request.files:
return jsonify({"error": "No image file found"}), 400
# Get the uploaded file
file = request.files['image']
try:
image = Image.open(file.stream)
image_np = np.array(image)
result = ocr.ocr(image_np)
# Extract text
extracted_text = []
for line in result[0]:
extracted_text.append(line[1][0])
return render_template("add_account.html", username=extracted_text[2], password=extracted_text[3], ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"))
extracted_text = [line[1][0] for line in result[0]]
# Basic validation
if len(extracted_text) >= 4:
return render_template(
"add_account.html",
username=extracted_text[2],
password=extracted_text[3],
ocr_enabled=True,
text_input_enabled=app.config.get("TEXT_INPUT_ENABLED")
)
else:
return jsonify({"error": "Could not extract required fields from image"}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
app.run(debug=app.config["DEBUG"], host=app.config["HOST"], port=app.config["PORT"])
app.run(
debug=app.config["DEBUG"],
host=app.config["HOST"],
port=app.config["PORT"]
)

View File

@ -1,13 +1,16 @@
from datetime import datetime, timedelta
from typing import List, Dict
from typing import List, Dict, Any
def filter_accounts_next_30_days(accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Filters accounts expiring within the next 30 days.
def filter_accounts_next_30_days(accounts: List[Dict[str, int]]) -> List[Dict[str, int]]:
"""Filter accounts whose expiry date falls within the next 30 days.
Args:
accounts (List[Dict[str, int]]): A list of account dictionaries, each containing
an 'expiaryDate' key with an epoch timestamp as its value.
accounts: A list of account dictionaries, each with an 'expiaryDate'
(epoch timestamp).
Returns:
List[Dict[str, int]]: A list of accounts expiring within the next 30 days.
A list of accounts expiring within the next 30 days, with added
'expiaryDate_rendered' and 'days_to_expiry' keys.
"""
now = datetime.now()
thirty_days_later = now + timedelta(days=30)
@ -26,18 +29,18 @@ def filter_accounts_next_30_days(accounts: List[Dict[str, int]]) -> List[Dict[st
result.append(account)
return result
def filter_accounts_expired(accounts: List[Dict[str, int]]) -> List[Dict[str, int]]:
"""Filter accounts whose expiry date has passed.
def filter_accounts_expired(accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Filters accounts that have already expired.
Args:
accounts (List[Dict[str, int]]): A list of account dictionaries, each containing
an 'expiaryDate' key with an epoch timestamp as its value.
accounts: A list of account dictionaries, each with an 'expiaryDate'
(epoch timestamp).
Returns:
List[Dict[str, int]]: A list of accounts that have expired.
A list of expired accounts with an added 'expiaryDate_rendered' key.
"""
# Get the current epoch timestamp
current_timestamp = int(datetime.now().timestamp())
# Filter accounts where the current date is greater than the expiryDate
expired_accounts = []
for account in accounts:
if account['expiaryDate'] < current_timestamp:

View File

@ -5,106 +5,101 @@ from typing import List, Dict, Any
def get_urls(base_url: str, auth: str) -> List[Dict[str, Any]]:
"""Retrieve user account streams from the specified base URL.
"""Retrieves user account streams from the API.
Args:
base_url (str): The base URL of the API.
auth (str): The authorization token for accessing the API.
base_url: The base URL of the API.
auth: The authorization token.
Returns:
List[Dict[str, Any]]: A list of user account streams.
A list of user account streams.
"""
url = f"{base_url}/getUserAccounts/streams"
payload = {}
headers = {"Authorization": f"Basic {auth}"}
response = requests.request("GET", url, headers=headers, data=payload)
return json.loads(response.text)
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
def get_user_accounts(base_url: str, auth: str) -> List[Dict[str, Any]]:
"""Retrieve user accounts from the specified base URL.
"""Retrieves user accounts from the API.
Args:
base_url (str): The base URL of the API.
auth (str): The authorization token for accessing the API.
base_url: The base URL of the API.
auth: The authorization token.
Returns:
List[Dict[str, Any]]: A list of user accounts with their expiration dates rendered.
A list of user accounts with 'expiaryDate_rendered' added.
"""
url = f"{base_url}/getUserAccounts"
payload = {}
headers = {"Authorization": f"Basic {auth}"}
response = requests.get(url, headers=headers)
response.raise_for_status()
accounts = response.json()
response = requests.request("GET", url, headers=headers, data=payload)
res_json = json.loads(response.text)
for account in res_json:
for account in accounts:
account["expiaryDate_rendered"] = datetime.utcfromtimestamp(
account["expiaryDate"]
).strftime("%d/%m/%Y")
return res_json
return accounts
def delete_user_account(base_url: str, auth: str, stream: str, username: str) -> bool:
"""Delete a user account from the specified base URL.
"""Deletes a user account via the API.
Args:
base_url (str): The base URL of the API.
auth (str): The authorization token for accessing the API.
stream (str): The name of the stream associated with the user account.
username (str): The username of the account to delete.
base_url: The base URL of the API.
auth: The authorization token.
stream: The stream associated with the account.
username: The username of the account to delete.
Returns:
bool: True if the account was deleted successfully, False otherwise.
True if the account was deleted successfully, False otherwise.
"""
url = f"{base_url}/deleteAccount"
payload = {"stream": stream, "user": username}
headers = {"Authorization": f"Basic {auth}"}
response = requests.request("POST", url, headers=headers, data=payload)
response = requests.post(url, headers=headers, data=payload)
response.raise_for_status()
return "Deleted" in response.text
def add_user_account(base_url: str, auth: str, username: str, password: str, stream: str) -> bool:
"""Add a user account to the specified base URL.
"""Adds a user account via the API.
Args:
base_url (str): The base URL of the API.
auth (str): The authorization token for accessing the API.
username (str): The username of the account to add.
password (str): The password of the account to add.
stream (str): The name of the stream associated with the user account.
base_url: The base URL of the API.
auth: The authorization token.
username: The username of the new account.
password: The password for the new account.
stream: The stream to associate with the new account.
Returns:
bool: True if the account was added successfully, False otherwise.
True if the account was added successfully, False otherwise.
"""
url = f"{base_url}/addAccount"
payload = {"username": username, "password": password, "stream": stream}
headers = {"Authorization": f"Basic {auth}"}
response = requests.request("POST", url, headers=headers, data=payload)
response = requests.post(url, headers=headers, data=payload)
return response.status_code == 200
def get_stream_names(base_url: str, auth: str) -> List[str]:
"""Get a list of stream names from the API.
"""Retrieves a list of stream names from the API.
Args:
base_url (str): The base URL of the API.
auth (str): The authorization token.
base_url: The base URL of the API.
auth: The authorization token.
Returns:
List[str]: A list of stream names.
A list of stream names, or an empty list if an error occurs.
"""
url = f"{base_url}/getStreamNames"
payload = {}
headers = {"Authorization": f"Basic {auth}"}
response = requests.request("GET", url, headers=headers, data=payload)
if response.status_code == 200 and response.text:
try:
return json.loads(response.text)
except json.JSONDecodeError:
return []
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json()
except (requests.exceptions.RequestException, json.JSONDecodeError):
return []