All checks were successful
		
		
	
	Build and Publish Docker Image / build-and-push (push) Successful in 4m16s
				
			
		
			
				
	
	
		
			218 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # app.py
 | |
| from flask import Flask, render_template, request, redirect, url_for, session, send_file, jsonify
 | |
| from flask_caching import Cache
 | |
| import requests.auth
 | |
| import os
 | |
| from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
 | |
| from lib.reqs import get_urls, get_user_accounts, add_user_account, delete_user_account, get_user_accounts_count, get_stream_names
 | |
| from flask import send_from_directory
 | |
| import requests
 | |
| import base64
 | |
| from flask import Flask
 | |
| from config import DevelopmentConfig, ProductionConfig
 | |
| from paddleocr import PaddleOCR
 | |
| from PIL import Image
 | |
| import numpy as np
 | |
| 
 | |
| 
 | |
| os.environ["OMP_NUM_THREADS"] = "1"
 | |
| os.environ["MKL_NUM_THREADS"] = "1"
 | |
| 
 | |
| app = Flask(__name__)
 | |
| 
 | |
| if os.environ.get("FLASK_ENV") == "production":
 | |
|     app.config.from_object(ProductionConfig)
 | |
| else:
 | |
|     app.config.from_object(DevelopmentConfig)
 | |
| cache = Cache(app, config={"CACHE_TYPE": "SimpleCache"})
 | |
| 
 | |
| if app.config.get("OCR_ENABLED"):
 | |
|     ocr = PaddleOCR(use_angle_cls=True, lang='en')  # Adjust language if needed
 | |
| 
 | |
| app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"]
 | |
| app.config['SESSION_COOKIE_HTTPONLY'] = True  # Prevent JavaScript access
 | |
| app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'  # Adjust for cross-site requests
 | |
| app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365  # 1 year in seconds
 | |
| cache.clear()  # Clears all cache entries
 | |
| 
 | |
| def get_version():
 | |
|     try:
 | |
|         with open('VERSION', 'r') as f:
 | |
|             return f.read().strip()
 | |
|     except FileNotFoundError:
 | |
|         return 'dev'
 | |
| 
 | |
| @app.context_processor
 | |
| def inject_version():
 | |
|     return dict(version=get_version())
 | |
| 
 | |
| @app.before_request
 | |
| def make_session_permanent():
 | |
|     session.permanent = True
 | |
| 
 | |
| @app.route('/site.webmanifest')
 | |
| def serve_manifest():
 | |
|     return send_from_directory(os.path.join(app.root_path, 'static'), 'site.webmanifest', mimetype='application/manifest+json')
 | |
| 
 | |
| @app.route("/favicon.ico")
 | |
| def favicon():
 | |
|     return send_from_directory(
 | |
|         os.path.join(app.root_path, "static"),
 | |
|         "favicon.ico",
 | |
|         mimetype="image/vnd.microsoft.icon",
 | |
|     )
 | |
| 
 | |
| 
 | |
| @app.route("/")
 | |
| def index():
 | |
|     # If the user is logged in, redirect to a protected page like /accounts
 | |
|     if session.get("logged_in"):
 | |
|         return redirect(url_for("home"))
 | |
|     return render_template("index.html")
 | |
| 
 | |
| 
 | |
| @app.route("/home")
 | |
| @cache.cached(timeout=60)  # cache for 120 seconds
 | |
| def home():
 | |
|     if session.get("logged_in"):
 | |
|         base_url = app.config["BASE_URL"]  # Access base_url from the config
 | |
|         all_accounts = get_user_accounts(base_url, session["auth_credentials"])
 | |
|         count = len(all_accounts)
 | |
|         current_month_accounts = filter_accounts_next_30_days(all_accounts)
 | |
|         expired_accounts = filter_accounts_expired(all_accounts)
 | |
|         return render_template(
 | |
|             "home.html",
 | |
|             username=session["username"],
 | |
|             accounts=count,
 | |
|             current_month_accounts=current_month_accounts,
 | |
|             expired_accounts=expired_accounts,
 | |
|             ocr_enabled=app.config.get("OCR_ENABLED"),
 | |
|         )
 | |
|     return render_template("index.html")
 | |
| 
 | |
| 
 | |
| @app.route("/login", methods=["POST"])
 | |
| def login():
 | |
|     username = request.form["username"]
 | |
|     password = request.form["password"]
 | |
| 
 | |
|     # Encode the username and password in Base64
 | |
|     credentials = f"{username}:{password}"
 | |
|     encoded_credentials = base64.b64encode(credentials.encode()).decode()
 | |
| 
 | |
|     base_url = app.config["BASE_URL"]  # Access base_url from the config
 | |
|     login_url = f"{base_url}/Login"  # Construct the full URL
 | |
| 
 | |
|     # Send GET request to the external login API with Basic Auth
 | |
|     response = requests.get(
 | |
|         login_url, auth=requests.auth.HTTPBasicAuth(username, password)
 | |
|     )
 | |
| 
 | |
|     # Check if login was successful
 | |
|     if response.status_code == 200 and response.json().get("auth") == "Success":
 | |
|         # Set session variable to indicate the user is logged in
 | |
|         session["logged_in"] = True
 | |
|         session["username"] = username
 | |
|         session["auth_credentials"] = encoded_credentials
 | |
|         return redirect(url_for("home"))  # Redirect to the Accounts page
 | |
|     else:
 | |
|         # Show error on the login page
 | |
|         error = "Invalid username or password. Please try again."
 | |
|         return render_template("index.html", error=error)
 | |
| 
 | |
| 
 | |
| @app.route("/urls", methods=["GET"])
 | |
| @cache.cached(timeout=300)  # cache for 5 minutes
 | |
| def urls():
 | |
|     # Check if the user is logged in
 | |
|     if not session.get("logged_in"):
 | |
|         return redirect(url_for("home"))
 | |
|     # Placeholder content for Accounts page
 | |
|     base_url = app.config["BASE_URL"]  # Access base_url from the config
 | |
|     return render_template(
 | |
|         "urls.html", urls=get_urls(base_url, session["auth_credentials"])
 | |
|     )
 | |
| 
 | |
| 
 | |
| @app.route("/accounts", methods=["GET"])
 | |
| def user_accounts():
 | |
|     # Check if the user is logged in
 | |
|     if not session.get("logged_in"):
 | |
|         return redirect(url_for("home"))
 | |
|     # Placeholder content for Accounts page
 | |
|     base_url = app.config["BASE_URL"]  # Access base_url from the config
 | |
|     user_accounts_data = get_user_accounts(base_url, session["auth_credentials"])
 | |
|     
 | |
|     # Clear the cache for 'user_accounts' view specifically
 | |
|     cache.delete_memoized(user_accounts)
 | |
|     
 | |
|     return render_template(
 | |
|         "user_accounts.html",
 | |
|         username=session["username"],
 | |
|         user_accounts=user_accounts_data,
 | |
|         auth=session["auth_credentials"],
 | |
|     )
 | |
| 
 | |
| 
 | |
| @app.route("/accounts/add", methods=["GET", "POST"])
 | |
| def add_account():
 | |
|     base_url = app.config["BASE_URL"]
 | |
|     shared_text = request.args.get('shared_text')
 | |
| 
 | |
|     if request.method == "POST":
 | |
|         username = request.form["username"]
 | |
|         password = request.form["password"]
 | |
|         stream = request.form["stream"]
 | |
| 
 | |
|         if add_user_account(
 | |
|             base_url, session["auth_credentials"], username, password, stream
 | |
|         ):
 | |
|             cache.clear()
 | |
|             return redirect(url_for("user_accounts"))
 | |
|         return render_template("add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text)
 | |
| 
 | |
|     return render_template("add_account.html", ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"), shared_text=shared_text)
 | |
| 
 | |
| 
 | |
| @app.route("/accounts/delete", methods=["POST"])
 | |
| def delete_account():
 | |
|     stream = request.form.get("stream")
 | |
|     username = request.form.get("username")
 | |
|     base_url = app.config["BASE_URL"]
 | |
| 
 | |
|     if delete_user_account(base_url, session["auth_credentials"], stream, username):
 | |
|         return redirect(url_for("user_accounts"))
 | |
|     return redirect(url_for("user_accounts"))
 | |
| 
 | |
| 
 | |
| @app.route("/get_stream_names", methods=["GET"])
 | |
| def stream_names():
 | |
|     if not session.get("logged_in"):
 | |
|         return redirect(url_for("home"))
 | |
|     base_url = app.config["BASE_URL"]
 | |
|     stream_names = get_stream_names(base_url, session["auth_credentials"])
 | |
|     return jsonify(stream_names)
 | |
| 
 | |
| 
 | |
| if app.config.get("OCR_ENABLED"):
 | |
|     @app.route('/OCRupload', methods=['POST'])
 | |
|     def OCRupload():
 | |
|         if 'image' not in request.files:
 | |
|             return jsonify({"error": "No image file found"}), 400
 | |
|         # Get the uploaded file
 | |
|         file = request.files['image']
 | |
|         try:
 | |
|             image = Image.open(file.stream)
 | |
|             image_np = np.array(image)
 | |
|             result = ocr.ocr(image_np)
 | |
|             # Extract text
 | |
|             extracted_text = []
 | |
|             for line in result[0]:
 | |
|                 extracted_text.append(line[1][0])
 | |
|             return render_template("add_account.html", username=extracted_text[2], password=extracted_text[3], ocr_enabled=app.config.get("OCR_ENABLED"), text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"))
 | |
|         except Exception as e:
 | |
|             return jsonify({"error": str(e)}), 500
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     app.run(debug=app.config["DEBUG"], host=app.config["HOST"], port=app.config["PORT"])
 |