The OCR feature can now be enabled or disabled using a new `OCR_ENABLED` configuration flag. This allows for more flexible deployments where the OCR feature is not required, reducing the application's resource footprint and dependency overhead. The backend now conditionally initializes the OCR engine and its associated route. The frontend UI for OCR uploads is also rendered conditionally based on this setting.
213 lines
7.5 KiB
Python
213 lines
7.5 KiB
Python
# app.py
|
|
from flask import Flask, render_template, request, redirect, url_for, session, send_file, jsonify
|
|
from flask_caching import Cache
|
|
import requests.auth
|
|
import os
|
|
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
|
|
from lib.reqs import get_urls, get_user_accounts, add_user_account, delete_user_account, get_user_accounts_count, get_stream_names
|
|
from flask import send_from_directory
|
|
import requests
|
|
import base64
|
|
from flask import Flask
|
|
from config import DevelopmentConfig, ProductionConfig
|
|
from paddleocr import PaddleOCR
|
|
from PIL import Image
|
|
import numpy as np
|
|
|
|
|
|
# Set the OMP/MKL thread-count variables to "1".
# NOTE(review): these are assigned after numpy/paddleocr have already been
# imported above; they may need to be set before those imports to take
# effect -- confirm.
os.environ.update(OMP_NUM_THREADS="1", MKL_NUM_THREADS="1")

app = Flask(__name__)

# FLASK_ENV=production selects ProductionConfig; anything else (including an
# unset variable) selects DevelopmentConfig.
app.config.from_object(
    ProductionConfig
    if os.environ.get("FLASK_ENV") == "production"
    else DevelopmentConfig
)
cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})

# The OCR engine is only constructed when the OCR_ENABLED config flag is
# truthy; `ocr` is left undefined otherwise.
if app.config.get("OCR_ENABLED"):
    ocr = PaddleOCR(use_angle_cls=True, lang='en')  # Adjust language if needed

app.config.update(
    SESSION_COOKIE_SECURE=not app.config["DEBUG"],  # secure cookies outside debug
    SESSION_COOKIE_HTTPONLY=True,  # Prevent JavaScript access
    SESSION_COOKIE_SAMESITE='Lax',  # Adjust for cross-site requests
    PERMANENT_SESSION_LIFETIME=60 * 60 * 24 * 365,  # 1 year in seconds
)
cache.clear()  # Start with an empty cache
|
|
|
|
def get_version():
    """Return the application version read from the ``VERSION`` file.

    The file is opened relative to the current working directory and decoded
    as UTF-8. Surrounding whitespace is stripped.

    Returns:
        The version string, or ``'dev'`` when the file is missing, cannot be
        read or decoded, or contains only whitespace.
    """
    try:
        with open('VERSION', 'r', encoding='utf-8') as f:
            version = f.read().strip()
    except (OSError, UnicodeDecodeError):
        # OSError covers FileNotFoundError as well as permission problems or
        # VERSION being a directory. This function runs for every template
        # render, so a bad file must not turn every page into a 500.
        return 'dev'
    # An empty file carries no version information; use the same fallback.
    return version or 'dev'
|
|
|
|
@app.context_processor
def inject_version():
    """Expose the application version to every template as ``version``."""
    return {"version": get_version()}
|
|
|
|
@app.before_request
def make_session_permanent():
    """Mark the session as permanent before each request is handled."""
    session.permanent = True
|
|
|
|
@app.route('/manifest.json')
def serve_manifest():
    """Serve ``manifest.json`` with the web-app-manifest MIME type."""
    manifest_response = send_file(
        'manifest.json',
        mimetype='application/manifest+json',
    )
    return manifest_response
|
|
|
|
@app.route("/favicon.ico")
def favicon():
    """Serve ``favicon.ico`` from the application's ``static`` directory."""
    static_dir = os.path.join(app.root_path, "static")
    return send_from_directory(
        static_dir, "favicon.ico", mimetype="image/vnd.microsoft.icon"
    )
|
|
|
|
|
|
@app.route("/")
def index():
    """Render the landing page, or send logged-in users to ``home``."""
    if not session.get("logged_in"):
        # Anonymous visitors get the landing page template.
        return render_template("index.html")
    # Logged-in users are redirected to the "home" view.
    return redirect(url_for("home"))
|
|
|
|
|
|
@app.route("/home")
@cache.cached(
    timeout=60,  # cache each user's rendered dashboard for 60 seconds
    # The default cache key is the request path only, which would serve one
    # user's dashboard to every other visitor. Key the entry by username.
    key_prefix=lambda: f"home/{session.get('username')}",
    # Never cache (or serve from cache) for anonymous visitors.
    unless=lambda: not session.get("logged_in"),
)
def home():
    """Render the dashboard for the logged-in user.

    Fetches all user accounts from the backend at ``BASE_URL`` and passes the
    template the total count, the result of ``filter_accounts_next_30_days``
    and the result of ``filter_accounts_expired``, plus the ``OCR_ENABLED``
    flag.

    Returns:
        The rendered ``home.html`` for logged-in users, otherwise the rendered
        ``index.html`` landing page.
    """
    if not session.get("logged_in"):
        return render_template("index.html")

    base_url = app.config["BASE_URL"]  # Access base_url from the config
    all_accounts = get_user_accounts(base_url, session["auth_credentials"])
    count = len(all_accounts)
    current_month_accounts = filter_accounts_next_30_days(all_accounts)
    expired_accounts = filter_accounts_expired(all_accounts)
    return render_template(
        "home.html",
        username=session["username"],
        accounts=count,
        current_month_accounts=current_month_accounts,
        expired_accounts=expired_accounts,
        ocr_enabled=app.config.get("OCR_ENABLED"),
    )
|
|
|
|
|
|
@app.route("/login", methods=["POST"])
def login():
    """Authenticate against the external login API and start a session.

    Reads ``username`` and ``password`` from the submitted form and sends a
    GET request with HTTP Basic Auth to ``<BASE_URL>/Login``. The login is
    accepted when the API answers 200 with a JSON object whose ``auth`` field
    is ``"Success"``.

    Returns:
        A redirect to ``home`` on success; otherwise ``index.html`` rendered
        with an ``error`` message.
    """
    username = request.form["username"]
    password = request.form["password"]

    base_url = app.config["BASE_URL"]  # Access base_url from the config
    login_url = f"{base_url}/Login"  # Construct the full URL

    try:
        # A timeout keeps a hung backend from blocking this worker forever.
        response = requests.get(
            login_url,
            auth=requests.auth.HTTPBasicAuth(username, password),
            timeout=10,
        )
        payload = response.json() if response.status_code == 200 else None
    except (requests.RequestException, ValueError):
        # Connection errors, timeouts and non-JSON bodies end up here;
        # report them separately from a credential failure.
        app.logger.exception("Login request to %s failed", login_url)
        error = "Unable to reach the login service. Please try again later."
        return render_template("index.html", error=error)

    if isinstance(payload, dict) and payload.get("auth") == "Success":
        # Encode the username and password in Base64; later requests to the
        # backend reuse this value from the session.
        # NOTE(review): if the default cookie-based Flask session is in use,
        # this value is signed but readable by the client -- confirm that is
        # acceptable or move to a server-side session store.
        credentials = f"{username}:{password}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()

        session["logged_in"] = True
        session["username"] = username
        session["auth_credentials"] = encoded_credentials
        return redirect(url_for("home"))

    # Show error on the login page
    error = "Invalid username or password. Please try again."
    return render_template("index.html", error=error)
|
|
|
|
|
|
@app.route("/urls", methods=["GET"])
@cache.cached(
    timeout=300,  # cache for 5 minutes
    # The default cache key is the request path only, so one user's page (or
    # an anonymous visitor's redirect) would be served to everyone. Key the
    # entry by username instead.
    key_prefix=lambda: f"urls/{session.get('username')}",
    # Never cache (or serve from cache) for anonymous visitors.
    unless=lambda: not session.get("logged_in"),
)
def urls():
    """Render the URLs page for the logged-in user.

    Returns:
        ``urls.html`` rendered with the result of ``get_urls`` for the
        session's credentials, or a redirect to ``home`` when the visitor is
        not logged in.
    """
    # Check if the user is logged in
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]  # Access base_url from the config
    return render_template(
        "urls.html", urls=get_urls(base_url, session["auth_credentials"])
    )
|
|
|
|
|
|
@app.route("/accounts", methods=["GET"])
@cache.cached(
    timeout=120,  # cache for 120 seconds
    # The default cache key is the request path only, so one user's account
    # list (which embeds their credentials) would be served to every other
    # visitor. Key the entry by username instead.
    key_prefix=lambda: f"accounts/{session.get('username')}",
    # Never cache (or serve from cache) for anonymous visitors.
    unless=lambda: not session.get("logged_in"),
)
def user_accounts():
    """Render the accounts page for the logged-in user.

    Returns:
        ``user_accounts.html`` rendered with the session's username, the
        result of ``get_user_accounts`` and the stored credentials, or a
        redirect to ``home`` when the visitor is not logged in.
    """
    # Check if the user is logged in
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]  # Access base_url from the config
    return render_template(
        "user_accounts.html",
        username=session["username"],
        user_accounts=get_user_accounts(base_url, session["auth_credentials"]),
        # NOTE(review): this hands the Base64 credentials to the template;
        # confirm the page really needs them.
        auth=session["auth_credentials"],
    )
|
|
|
|
|
|
@app.route("/accounts/add", methods=["GET", "POST"])
def add_account():
    """Show the add-account form and handle its submission.

    On POST, reads ``username``, ``password`` and ``stream`` from the form and
    calls ``add_user_account``. When that call returns a truthy value, the
    whole cache is cleared and the user is redirected to the accounts page.
    In every other case the form is rendered (again).

    Returns:
        A redirect to ``home`` for anonymous visitors, a redirect to
        ``user_accounts`` after a successful add, otherwise the rendered
        ``add_account.html``.
    """
    # Without this guard an anonymous request fails with a KeyError on
    # session["auth_credentials"]; the other views redirect the same way.
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]  # Access base_url from the config
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        stream = request.form["stream"]

        if add_user_account(
            base_url, session["auth_credentials"], username, password, stream
        ):
            cache.clear()  # Cached pages no longer reflect the account list
            return redirect(url_for("user_accounts"))

    # Both the initial GET and a failed POST render the form with the OCR
    # flag, so the OCR upload UI does not disappear after a failed attempt.
    return render_template(
        "add_account.html", ocr_enabled=app.config.get("OCR_ENABLED")
    )
|
|
|
|
|
|
@app.route("/accounts/delete", methods=["POST"])
def delete_account():
    """Delete a user account and return to the accounts page.

    Reads ``stream`` and ``username`` from the form and calls
    ``delete_user_account``. The cache is cleared only when that call returns
    a truthy value.

    Returns:
        A redirect to ``home`` for anonymous visitors, otherwise a redirect to
        ``user_accounts`` whether or not the delete succeeded.
    """
    # Without this guard an anonymous request fails with a KeyError on
    # session["auth_credentials"]; the other views redirect the same way.
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    stream = request.form.get("stream")
    username = request.form.get("username")
    base_url = app.config["BASE_URL"]

    if delete_user_account(base_url, session["auth_credentials"], stream, username):
        cache.clear()  # Cached pages no longer reflect the account list
    return redirect(url_for("user_accounts"))
|
|
|
|
|
|
@app.route("/get_stream_names", methods=["GET"])
def stream_names():
    """Return the result of ``get_stream_names`` as JSON.

    Anonymous visitors are redirected to ``home`` instead.
    """
    if session.get("logged_in"):
        names = get_stream_names(
            app.config["BASE_URL"], session["auth_credentials"]
        )
        return jsonify(names)
    return redirect(url_for("home"))
|
|
|
|
|
|
# The OCR route is only registered when the OCR_ENABLED config flag is truthy.
if app.config.get("OCR_ENABLED"):

    @app.route('/OCRupload', methods=['POST'])
    def OCRupload():
        """Run OCR on an uploaded image and pre-fill the add-account form.

        Expects the image in the ``image`` file field. The recognised text
        lines at index 2 and 3 are used as username and password.
        NOTE(review): this presumably matches a fixed layout of the uploaded
        images -- confirm.

        Returns:
            A redirect to ``home`` for anonymous visitors; a JSON error with
            status 400 when no image was uploaded, 422 when fewer than four
            text lines were recognised, or 500 when OCR processing raised;
            otherwise ``add_account.html`` rendered with the extracted
            username and password.
        """
        # The add-account flow needs a logged-in session; the other views
        # redirect anonymous visitors the same way.
        if not session.get("logged_in"):
            return redirect(url_for("home"))

        if 'image' not in request.files:
            return jsonify({"error": "No image file found"}), 400

        # Get the uploaded file
        file = request.files['image']
        try:
            # Normalise palette/alpha/greyscale images to 3-channel RGB
            # before handing the array to the OCR engine.
            image = Image.open(file.stream).convert("RGB")
            result = ocr.ocr(np.array(image))

            # The first element holds the detections for the image; it can be
            # None (or the list empty) when no text was found.
            detections = (result[0] if result else None) or []
            extracted_text = [line[1][0] for line in detections]
        except Exception as e:
            app.logger.exception("OCR processing failed")
            return jsonify({"error": str(e)}), 500

        if len(extracted_text) < 4:
            # Report too little text as a client-side problem rather than
            # letting an IndexError surface as a 500.
            return jsonify(
                {"error": "Could not find a username and password in the image"}
            ), 422

        return render_template(
            "add_account.html",
            username=extracted_text[2],
            password=extracted_text[3],
            # Keep the OCR upload UI visible on the re-rendered form.
            ocr_enabled=app.config.get("OCR_ENABLED"),
        )
|
|
|
|
if __name__ == "__main__":
    # Start the Flask server with the debug flag, host and port taken from
    # the loaded configuration.
    app.run(
        debug=app.config["DEBUG"],
        host=app.config["HOST"],
        port=app.config["PORT"],
    )
|