KTVManager_UI/app.py
2025-03-03 08:37:45 +00:00

187 lines
6.4 KiB
Python

# app.py
from flask import Flask, render_template, request, redirect, url_for, session, send_file, jsonify
from flask_caching import Cache
import requests.auth
import os
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
from lib.reqs import get_urls, get_user_accounts, add_user_account, delete_user_account, get_user_accounts_count
from flask import send_from_directory
import requests
import base64
from flask import Flask
from config import DevelopmentConfig
from paddleocr import PaddleOCR
from PIL import Image
import numpy as np
# Limit OpenMP and MKL to a single thread each.
# NOTE(review): numpy and paddleocr are imported before these are set; confirm
# the variables are still read late enough to take effect.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

app = Flask(__name__)
app.config.from_object(DevelopmentConfig)

# In-process cache used by the @cache.cached views in this module.
cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})

# Shared OCR engine, built once at import time. Adjust `lang` if needed.
ocr = PaddleOCR(use_angle_cls=True, lang='en')

# Session cookie hardening and lifetime.
app.config.update(
    SESSION_COOKIE_SECURE=True,  # only send the cookie over HTTPS
    SESSION_COOKIE_HTTPONLY=True,  # prevent JavaScript access
    SESSION_COOKIE_SAMESITE='Lax',  # adjust for cross-site requests
    PERMANENT_SESSION_LIFETIME=60 * 60 * 24 * 365,  # 1 year in seconds
)

# Start from an empty cache.
cache.clear()
@app.before_request
def make_session_permanent():
    """Flag the session as permanent before every request is handled."""
    # A permanent session expires after PERMANENT_SESSION_LIFETIME instead of
    # when the browser closes.
    session.permanent = True
@app.route('/manifest.json')
def serve_manifest():
    """Send the `manifest.json` file with the web-manifest MIME type."""
    manifest_mimetype = 'application/manifest+json'
    return send_file('manifest.json', mimetype=manifest_mimetype)
@app.route("/favicon.ico")
def favicon():
    """Serve `favicon.ico` from the application's `static` directory."""
    static_dir = os.path.join(app.root_path, "static")
    icon_mimetype = "image/vnd.microsoft.icon"
    return send_from_directory(static_dir, "favicon.ico", mimetype=icon_mimetype)
@app.route("/")
def index():
    """Show the login page, or send an already signed-in user to `home`."""
    if not session.get("logged_in"):
        return render_template("index.html")
    # Signed-in users skip the login form.
    return redirect(url_for("home"))
@app.route("/home")
@cache.cached(
    timeout=60,  # seconds
    # The default cache key is only the request path, which would hand one
    # user's rendered dashboard to every other visitor. Key on the user.
    key_prefix=lambda: f"home/{session.get('username')}",
    # Never cache (or serve from cache) the anonymous fallback page.
    unless=lambda: not session.get("logged_in"),
)
def home():
    """Render the dashboard for the signed-in user.

    Fetches the user's accounts from the backend and passes the template the
    total count, the result of `filter_accounts_next_30_days` and the result
    of `filter_accounts_expired`. Anonymous visitors get the login page.
    """
    if not session.get("logged_in"):
        return render_template("index.html")

    base_url = app.config["BASE_URL"]
    all_accounts = get_user_accounts(base_url, session["auth_credentials"])
    return render_template(
        "home.html",
        username=session["username"],
        accounts=len(all_accounts),
        current_month_accounts=filter_accounts_next_30_days(all_accounts),
        expired_accounts=filter_accounts_expired(all_accounts),
    )
@app.route("/login", methods=["POST"])
def login():
    """Authenticate against the backend `/Login` endpoint and start a session.

    Reads `username` and `password` from the submitted form and sends them to
    the backend using HTTP Basic Auth. On success the session is marked as
    logged in and the user is redirected to `home`; otherwise the login page
    is re-rendered with an error message.
    """
    username = request.form["username"]
    password = request.form["password"]

    # Base64 "user:pass" token reused for later backend calls.
    # NOTE(review): if Flask's default cookie-based session is in use, this
    # value is signed but not encrypted, so the password is recoverable from
    # the cookie. Consider a server-side session store.
    credentials = f"{username}:{password}"
    encoded_credentials = base64.b64encode(credentials.encode()).decode()

    base_url = app.config["BASE_URL"]
    login_url = f"{base_url}/Login"

    try:
        # A timeout keeps a hung backend from blocking this worker forever.
        response = requests.get(
            login_url,
            auth=requests.auth.HTTPBasicAuth(username, password),
            timeout=10,
        )
        # Only parse the body of a successful response. A non-JSON body
        # raises ValueError (or a subclass) and is handled below.
        payload = response.json() if response.status_code == 200 else None
    except (requests.RequestException, ValueError):
        app.logger.exception("Login request to %s failed", login_url)
        error = "Unable to reach the login service. Please try again later."
        return render_template("index.html", error=error)

    if isinstance(payload, dict) and payload.get("auth") == "Success":
        session["logged_in"] = True
        session["username"] = username
        session["auth_credentials"] = encoded_credentials
        return redirect(url_for("home"))

    error = "Invalid username or password. Please try again."
    return render_template("index.html", error=error)
@app.route("/urls", methods=["GET"])
@cache.cached(
    timeout=300,  # 5 minutes
    # Key on the signed-in user. The default key (request path only) would
    # share one user's page with everyone.
    key_prefix=lambda: f"urls/{session.get('username')}",
    # Do not cache the redirect issued to anonymous visitors.
    unless=lambda: not session.get("logged_in"),
)
def urls():
    """Render the URLs page for the signed-in user.

    Anonymous visitors are redirected to `home`.
    """
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]
    return render_template(
        "urls.html", urls=get_urls(base_url, session["auth_credentials"])
    )
@app.route("/accounts", methods=["GET"])
@cache.cached(
    timeout=120,  # seconds
    # This page embeds the user's auth token, so the cache entry must be
    # per-user. The default key (request path only) would leak it to others.
    key_prefix=lambda: f"accounts/{session.get('username')}",
    # Do not cache the redirect issued to anonymous visitors.
    unless=lambda: not session.get("logged_in"),
)
def user_accounts():
    """Render the account list for the signed-in user.

    Anonymous visitors are redirected to `home`.
    """
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    base_url = app.config["BASE_URL"]
    return render_template(
        "user_accounts.html",
        username=session["username"],
        user_accounts=get_user_accounts(base_url, session["auth_credentials"]),
        auth=session["auth_credentials"],
    )
@app.route("/accounts/add", methods=["GET", "POST"])
def add_account():
    """Show the add-account form (GET) or create an account (POST).

    On a successful POST the cache is cleared so list pages show the new
    account, and the user is redirected to the account list. On failure the
    form is rendered again. Anonymous visitors are redirected to `home`.
    """
    # Without this guard, session["auth_credentials"] below raises KeyError
    # (HTTP 500) for visitors who are not signed in.
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    if request.method == "POST":
        base_url = app.config["BASE_URL"]
        username = request.form["username"]
        password = request.form["password"]
        stream = request.form["stream"]
        created = add_user_account(
            base_url, session["auth_credentials"], username, password, stream
        )
        if created:
            cache.clear()  # drop cached pages that list accounts
            return redirect(url_for("user_accounts"))

    return render_template("add_account.html")
@app.route("/accounts/delete", methods=["POST"])
def delete_account():
    """Delete the account named by the form's `stream` and `username` fields.

    Always redirects back to the account list. The cache is cleared only when
    the backend reports a successful delete. Anonymous visitors are
    redirected to `home`.
    """
    # Without this guard, session["auth_credentials"] below raises KeyError
    # (HTTP 500) for visitors who are not signed in.
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    stream = request.form.get("stream")
    username = request.form.get("username")
    base_url = app.config["BASE_URL"]
    if delete_user_account(base_url, session["auth_credentials"], stream, username):
        cache.clear()  # drop cached pages that list accounts
    return redirect(url_for("user_accounts"))
@app.route('/OCRupload', methods=['POST'])
def OCRupload():
    """Run OCR on an uploaded image and pre-fill the add-account form.

    Expects a multipart upload under the `image` field. The third and fourth
    recognised text lines are passed to the template as username and password.
    NOTE(review): this presumes a fixed layout in the uploaded image; confirm
    against the images users actually upload.

    Returns the rendered form on success, or a JSON error with status
    400 (missing or invalid image), 422 (too little text found) or
    500 (unexpected failure). Anonymous visitors are redirected to `home`.
    """
    if not session.get("logged_in"):
        return redirect(url_for("home"))

    if 'image' not in request.files:
        return jsonify({"error": "No image file found"}), 400

    file = request.files['image']
    try:
        try:
            image_np = np.array(Image.open(file.stream))
        except OSError:
            # Pillow raises an OSError subclass for unreadable or truncated
            # images. That is a client error, not a server fault.
            return jsonify({"error": "Uploaded file is not a valid image"}), 400

        result = ocr.ocr(image_np)
        # The first entry holds the detected lines for the image. It can be
        # None or empty when no text was detected.
        detected_lines = (result[0] if result else None) or []
        extracted_text = [line[1][0] for line in detected_lines]

        if len(extracted_text) < 4:
            return jsonify(
                {"error": "Could not find a username and password in the image"}
            ), 422

        return render_template(
            "add_account.html",
            username=extracted_text[2],
            password=extracted_text[3],
        )
    except Exception as e:
        # Request boundary: log the traceback and report the failure as JSON.
        app.logger.exception("OCR upload failed")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Start Flask's built-in server using the settings loaded into app.config.
    app.run(
        debug=app.config["DEBUG"],
        host=app.config["HOST"],
        port=app.config["PORT"],
    )