Update the `make_cache_key` function to include the request query string in the generated key. This allows for manual cache busting by appending version parameters to redirects. In the `delete_account` route, replace manual cache deletion logic with a redirect containing a timestamped nonce. This ensures that the user's account list and home page views are refreshed without requiring explicit knowledge of all internal cache keys.
616 lines
25 KiB
Python
616 lines
25 KiB
Python
# app.py
|
|
from flask import (Flask, render_template, request, redirect, url_for, session,
|
|
send_file, jsonify, send_from_directory, Response)
|
|
from flask_caching import Cache
|
|
import requests.auth
|
|
import os
|
|
import base64
|
|
from typing import Dict, Any, Tuple, Union
|
|
import sys
|
|
import redis
|
|
import json
|
|
import mysql.connector
|
|
import re
|
|
import threading
|
|
|
|
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
|
|
from lib.reqs import (get_urls, get_user_accounts, add_user_account,
|
|
delete_user_account, get_stream_names)
|
|
from config import DevelopmentConfig, ProductionConfig
|
|
|
|
|
|
os.environ["OMP_NUM_THREADS"] = "1"
|
|
os.environ["MKL_NUM_THREADS"] = "1"
|
|
|
|
app = Flask(__name__)
|
|
|
|
if os.environ.get("FLASK_ENV") == "production":
|
|
app.config.from_object(ProductionConfig)
|
|
else:
|
|
app.config.from_object(DevelopmentConfig)
|
|
|
|
# Check for Redis availability and configure cache
|
|
redis_url = app.config["REDIS_URL"]
|
|
cache_config = {"CACHE_TYPE": "redis", "CACHE_REDIS_URL": redis_url}
|
|
try:
|
|
# Use a short timeout to prevent hanging
|
|
r = redis.from_url(redis_url, socket_connect_timeout=1)
|
|
r.ping()
|
|
except redis.exceptions.ConnectionError as e:
|
|
print(
|
|
f"WARNING: Redis connection failed: {e}. Falling back to SimpleCache. "
|
|
"This is not recommended for production with multiple workers.",
|
|
file=sys.stderr,
|
|
)
|
|
cache_config = {"CACHE_TYPE": "SimpleCache"}
|
|
|
|
cache = Cache(app, config=cache_config)
|
|
|
|
app.config["OCR_ENABLED"] = False
|
|
|
|
app.config["SESSION_COOKIE_SECURE"] = not app.config["DEBUG"]
|
|
app.config['SESSION_COOKIE_HTTPONLY'] = True
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = 60 * 60 * 24 * 365 # 1 year
|
|
|
|
def get_version() -> str:
|
|
"""Retrieves the application version from the VERSION file.
|
|
|
|
Returns:
|
|
The version string, or 'dev' if the file is not found.
|
|
"""
|
|
try:
|
|
with open('VERSION', 'r') as f:
|
|
return f.read().strip()
|
|
except FileNotFoundError:
|
|
return 'dev'
|
|
|
|
@app.context_processor
|
|
def inject_version() -> Dict[str, str]:
|
|
"""Injects the version into all templates."""
|
|
return dict(version=get_version(), config=app.config, session=session)
|
|
|
|
def make_cache_key(*args, **kwargs):
|
|
"""Generate a cache key based on the user's session and request path."""
|
|
username = session.get('username', 'anonymous')
|
|
path = request.path
|
|
# Include query string in cache key to bust cache with version parameter
|
|
if request.query_string:
|
|
return f"view/{username}/{path}?{request.query_string.decode()}"
|
|
return f"view/{username}/{path}"
|
|
|
|
@app.before_request
|
|
def make_session_permanent() -> None:
|
|
"""Makes the user session permanent."""
|
|
session.permanent = True
|
|
|
|
@app.route('/site.webmanifest')
|
|
def serve_manifest() -> Response:
|
|
"""Serves the site manifest file."""
|
|
return send_from_directory(
|
|
os.path.join(app.root_path, 'static'),
|
|
'site.webmanifest',
|
|
mimetype='application/manifest+json'
|
|
)
|
|
|
|
@app.route("/favicon.ico")
|
|
def favicon() -> Response:
|
|
"""Serves the favicon."""
|
|
return send_from_directory(
|
|
os.path.join(app.root_path, "static"),
|
|
"favicon.ico",
|
|
mimetype="image/vnd.microsoft.icon",
|
|
)
|
|
|
|
@app.route("/")
|
|
def index() -> Union[Response, str]:
|
|
"""Renders the index page or redirects to home if logged in."""
|
|
if session.get("logged_in"):
|
|
return redirect(url_for("home"))
|
|
return render_template("index.html")
|
|
|
|
@app.route('/vapid-public-key', methods=['GET'])
|
|
def proxy_vapid_public_key():
|
|
"""Proxies the request for the VAPID public key to the backend."""
|
|
backend_url = f"{app.config['BACKEND_URL']}/vapid-public-key"
|
|
try:
|
|
response = requests.get(backend_url)
|
|
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type'])
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 502
|
|
|
|
@app.route('/save-subscription', methods=['POST'])
|
|
def proxy_save_subscription():
|
|
"""Proxies the request to save a push subscription to the backend."""
|
|
if not session.get("logged_in"):
|
|
return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
backend_url = f"{app.config['BACKEND_URL']}/save-subscription"
|
|
credentials = base64.b64decode(session["auth_credentials"]).decode()
|
|
username, password = credentials.split(":", 1)
|
|
|
|
try:
|
|
response = requests.post(
|
|
backend_url,
|
|
auth=requests.auth.HTTPBasicAuth(username, password),
|
|
json=request.get_json()
|
|
)
|
|
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type'])
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 502
|
|
|
|
@app.route('/send-test-notification', methods=['POST'])
|
|
def send_test_notification():
|
|
"""Proxies the request to send a test notification to the backend."""
|
|
if not session.get("logged_in"):
|
|
return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
backend_url = f"{app.config['BACKEND_URL']}/send-test-notification"
|
|
credentials = base64.b64decode(session["auth_credentials"]).decode()
|
|
username, password = credentials.split(":", 1)
|
|
|
|
try:
|
|
response = requests.post(
|
|
backend_url,
|
|
auth=requests.auth.HTTPBasicAuth(username, password),
|
|
json={}
|
|
)
|
|
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type'])
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 502
|
|
|
|
@app.route("/home")
|
|
@cache.cached(timeout=60, key_prefix=make_cache_key)
|
|
def home() -> str:
|
|
"""Renders the home page with account statistics."""
|
|
if session.get("logged_in"):
|
|
base_url = app.config["BACKEND_URL"]
|
|
all_accounts = get_user_accounts(base_url, session["auth_credentials"])
|
|
return render_template(
|
|
"home.html",
|
|
username=session["username"],
|
|
accounts=len(all_accounts),
|
|
current_month_accounts=filter_accounts_next_30_days(all_accounts),
|
|
expired_accounts=filter_accounts_expired(all_accounts),
|
|
)
|
|
return render_template("index.html")
|
|
|
|
@app.route("/login", methods=["POST"])
|
|
def login() -> Union[Response, str]:
|
|
"""Handles user login."""
|
|
username = request.form["username"]
|
|
password = request.form["password"]
|
|
credentials = f"{username}:{password}"
|
|
encoded_credentials = base64.b64encode(credentials.encode()).decode()
|
|
base_url = app.config["BACKEND_URL"]
|
|
login_url = f"{base_url}/Login"
|
|
|
|
try:
|
|
response = requests.get(
|
|
login_url, auth=requests.auth.HTTPBasicAuth(username, password)
|
|
)
|
|
response.raise_for_status()
|
|
response_data = response.json()
|
|
if response_data.get("auth") == "Success":
|
|
# Ensure case-sensitive username comparison
|
|
if response_data.get("username") != username:
|
|
return render_template("index.html", error="Invalid username or password. Please try again.")
|
|
|
|
session["logged_in"] = True
|
|
session["username"] = response_data.get("username", username)
|
|
session["user_id"] = response_data.get("user_id")
|
|
session["auth_credentials"] = encoded_credentials
|
|
|
|
# Check if the user has at least one account
|
|
base_url = app.config["BACKEND_URL"]
|
|
all_accounts = get_user_accounts(base_url, session["auth_credentials"])
|
|
if not all_accounts:
|
|
return render_template("index.html", error="No accounts associated with this user.")
|
|
|
|
next_url = request.args.get("next")
|
|
if next_url:
|
|
return redirect(next_url)
|
|
return redirect(url_for("home", loggedin=True))
|
|
except requests.exceptions.RequestException:
|
|
pass # Fall through to error
|
|
|
|
error = "Invalid username or password. Please try again."
|
|
return render_template("index.html", error=error)
|
|
|
|
@app.route("/urls", methods=["GET"])
|
|
@cache.cached(timeout=300, key_prefix=make_cache_key)
|
|
def urls() -> Union[Response, str]:
|
|
"""Renders the URLs page."""
|
|
if not session.get("logged_in"):
|
|
return redirect(url_for("home"))
|
|
base_url = app.config["BACKEND_URL"]
|
|
return render_template(
|
|
"urls.html", urls=get_urls(base_url, session["auth_credentials"])
|
|
)
|
|
|
|
@app.route("/accounts", methods=["GET"])
|
|
@cache.cached(timeout=60, key_prefix=make_cache_key)
|
|
def user_accounts() -> Union[Response, str]:
|
|
"""Renders the user accounts page."""
|
|
if not session.get("logged_in"):
|
|
return redirect(url_for("home"))
|
|
base_url = app.config["BACKEND_URL"]
|
|
user_accounts_data = get_user_accounts(base_url, session["auth_credentials"])
|
|
return render_template(
|
|
"user_accounts.html",
|
|
username=session["username"],
|
|
user_accounts=user_accounts_data,
|
|
auth=session["auth_credentials"],
|
|
)
|
|
|
|
@app.route("/share", methods=["GET"])
|
|
def share() -> Response:
|
|
"""Handles shared text from PWA."""
|
|
if not session.get("logged_in"):
|
|
return redirect(url_for("index", next=request.url))
|
|
shared_text = request.args.get("text")
|
|
return redirect(url_for("add_account", shared_text=shared_text))
|
|
|
|
@app.route("/accounts/add", methods=["GET", "POST"])
|
|
def add_account() -> Union[Response, str]:
|
|
"""Handles adding a new user account."""
|
|
if not session.get("logged_in"):
|
|
return redirect(url_for("index", next=request.url))
|
|
base_url = app.config["BACKEND_URL"]
|
|
shared_text = request.args.get('shared_text')
|
|
|
|
if request.method == "POST":
|
|
username = request.form["username"]
|
|
password = request.form["password"]
|
|
stream = request.form["stream"]
|
|
result = add_user_account(
|
|
base_url, session["auth_credentials"], username, password, stream
|
|
)
|
|
if result is True:
|
|
# Clear cache for user accounts route
|
|
cache_key = f"view/{session['username']}/accounts"
|
|
cache.delete(cache_key)
|
|
# Also clear memoized version for good measure
|
|
cache.delete_memoized(user_accounts, key_prefix=make_cache_key)
|
|
# Clear home page cache as well since it shows account stats
|
|
cache_key_home = f"view/{session['username']}/home"
|
|
cache.delete(cache_key_home)
|
|
# Run the NPM config update in a background thread
|
|
thread = threading.Thread(target=_update_npm_config_in_background, args=(session["auth_credentials"],))
|
|
thread.start()
|
|
return redirect(url_for("user_accounts"))
|
|
elif result is False:
|
|
# Account already exists
|
|
error = "Account already exists for this user"
|
|
return render_template(
|
|
"add_account.html",
|
|
text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"),
|
|
shared_text=shared_text,
|
|
error=error
|
|
)
|
|
|
|
return render_template(
|
|
"add_account.html",
|
|
text_input_enabled=app.config.get("TEXT_INPUT_ENABLED"),
|
|
shared_text=shared_text
|
|
)
|
|
|
|
@app.route("/accounts/delete", methods=["POST"])
|
|
def delete_account() -> Response:
|
|
"""Handles deleting a user account."""
|
|
account_id = request.form.get("id")
|
|
base_url = app.config["BACKEND_URL"]
|
|
delete_user_account(base_url, session["auth_credentials"], account_id)
|
|
# Run the NPM config update in a background thread to remove the deleted account's redirect
|
|
thread = threading.Thread(target=_update_npm_config_in_background, args=(session["auth_credentials"],))
|
|
thread.start()
|
|
# Redirect with a version nonce to bust all caches
|
|
import time
|
|
nonce = int(time.time())
|
|
return redirect(f"{url_for('user_accounts')}?_v={nonce}")
|
|
|
|
@app.route("/validateAccount", methods=["POST"])
|
|
def validate_account() -> Tuple[Response, int]:
|
|
"""Forwards account validation requests to the backend."""
|
|
base_url = app.config["BACKEND_URL"]
|
|
validate_url = f"{base_url}/validateAccount"
|
|
credentials = base64.b64decode(session["auth_credentials"]).decode()
|
|
username, password = credentials.split(":", 1)
|
|
|
|
try:
|
|
response = requests.post(
|
|
validate_url,
|
|
auth=requests.auth.HTTPBasicAuth(username, password),
|
|
json=request.get_json()
|
|
)
|
|
response.raise_for_status()
|
|
response_data = response.json()
|
|
if response_data.get("message") == "Account is valid and updated":
|
|
cache.delete_memoized(user_accounts, key_prefix=make_cache_key)
|
|
# Also clear regular cache for good measure
|
|
cache_key = f"view/{session['username']}/accounts"
|
|
cache.delete(cache_key)
|
|
# Run the NPM config update in a background thread
|
|
thread = threading.Thread(target=_update_npm_config_in_background, args=(session["auth_credentials"],))
|
|
thread.start()
|
|
return jsonify(response_data), response.status_code
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route("/get_stream_names", methods=["GET"])
|
|
def stream_names() -> Union[Response, str]:
|
|
"""Fetches and returns stream names as JSON."""
|
|
if not session.get("logged_in"):
|
|
return redirect(url_for("home"))
|
|
base_url = app.config["BACKEND_URL"]
|
|
return jsonify(get_stream_names(base_url, session["auth_credentials"]))
|
|
|
|
|
|
@app.route('/config')
|
|
def config():
|
|
"""Handles access to the configuration page."""
|
|
if session.get('user_id') and int(session.get('user_id')) == 1:
|
|
return redirect(url_for('config_dashboard'))
|
|
return redirect(url_for('home'))
|
|
|
|
@app.route('/config/dashboard')
|
|
def config_dashboard():
|
|
"""Renders the configuration dashboard."""
|
|
if not session.get('user_id') or int(session.get('user_id')) != 1:
|
|
return redirect(url_for('home'))
|
|
return render_template('config_dashboard.html')
|
|
|
|
@app.route('/check-expiring-accounts', methods=['POST'])
|
|
def check_expiring_accounts():
|
|
"""Proxies the request to check for expiring accounts to the backend."""
|
|
if not session.get('user_id') or int(session.get('user_id')) != 1:
|
|
return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
backend_url = f"{app.config['BACKEND_URL']}/check-expiry"
|
|
try:
|
|
response = requests.post(backend_url)
|
|
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type'])
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 502
|
|
|
|
|
|
@app.route('/dns', methods=['GET', 'POST', 'DELETE'])
|
|
def proxy_dns():
|
|
"""Proxies DNS management requests to the backend."""
|
|
if not session.get('user_id') or int(session.get('user_id')) != 1:
|
|
return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
backend_url = f"{app.config['BACKEND_URL']}/dns"
|
|
credentials = base64.b64decode(session["auth_credentials"]).decode()
|
|
username, password = credentials.split(":", 1)
|
|
auth = requests.auth.HTTPBasicAuth(username, password)
|
|
|
|
try:
|
|
if request.method == 'GET':
|
|
response = requests.get(backend_url, auth=auth)
|
|
elif request.method == 'POST':
|
|
response = requests.post(backend_url, auth=auth, json=request.get_json())
|
|
if response.ok:
|
|
cache.clear()
|
|
elif request.method == 'DELETE':
|
|
response = requests.delete(backend_url, auth=auth, json=request.get_json())
|
|
if response.ok:
|
|
cache.clear()
|
|
|
|
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type'])
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 502
|
|
|
|
|
|
@app.route('/extra_urls', methods=['GET', 'POST', 'DELETE'])
|
|
def proxy_extra_urls():
|
|
"""Proxies extra URL management requests to the backend."""
|
|
if not session.get('user_id') or int(session.get('user_id')) != 1:
|
|
return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
backend_url = f"{app.config['BACKEND_URL']}/extra_urls"
|
|
credentials = base64.b64decode(session["auth_credentials"]).decode()
|
|
username, password = credentials.split(":", 1)
|
|
auth = requests.auth.HTTPBasicAuth(username, password)
|
|
|
|
try:
|
|
if request.method == 'GET':
|
|
response = requests.get(backend_url, auth=auth)
|
|
elif request.method == 'POST':
|
|
response = requests.post(backend_url, auth=auth, json=request.get_json())
|
|
if response.ok:
|
|
cache.clear()
|
|
elif request.method == 'DELETE':
|
|
response = requests.delete(backend_url, auth=auth, json=request.get_json())
|
|
if response.ok:
|
|
cache.clear()
|
|
|
|
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type'])
|
|
except requests.exceptions.RequestException as e:
|
|
return jsonify({"error": str(e)}), 502
|
|
|
|
|
|
class NginxProxyManager:
|
|
def __init__(self, host, email, password):
|
|
self.host = host
|
|
self.email = email
|
|
self.password = password
|
|
self.token = None
|
|
|
|
def login(self):
|
|
url = f"{self.host}/api/tokens"
|
|
payload = {
|
|
"identity": self.email,
|
|
"secret": self.password
|
|
}
|
|
headers = {
|
|
"Content-Type": "application/json"
|
|
}
|
|
response = requests.post(url, headers=headers, data=json.dumps(payload))
|
|
if response.status_code == 200:
|
|
self.token = response.json()["token"]
|
|
print("Login successful.")
|
|
else:
|
|
print(f"Failed to login: {response.text}")
|
|
exit(1)
|
|
|
|
def get_proxy_host(self, host_id):
|
|
if not self.token:
|
|
self.login()
|
|
|
|
url = f"{self.host}/api/nginx/proxy-hosts/{host_id}"
|
|
headers = {
|
|
"Authorization": f"Bearer {self.token}"
|
|
}
|
|
response = requests.get(url, headers=headers)
|
|
if response.status_code == 200:
|
|
return response.json()
|
|
else:
|
|
print(f"Failed to get proxy host {host_id}: {response.text}")
|
|
return None
|
|
|
|
def update_proxy_host_config(self, host_id, config):
|
|
if not self.token:
|
|
self.login()
|
|
|
|
url = f"{self.host}/api/nginx/proxy-hosts/{host_id}"
|
|
|
|
original_host_data = self.get_proxy_host(host_id)
|
|
if not original_host_data:
|
|
return
|
|
|
|
# Construct a new payload with only the allowed fields for an update
|
|
update_payload = {
|
|
"domain_names": original_host_data.get("domain_names", []),
|
|
"forward_scheme": original_host_data.get("forward_scheme", "http"),
|
|
"forward_host": original_host_data.get("forward_host"),
|
|
"forward_port": original_host_data.get("forward_port"),
|
|
"access_list_id": original_host_data.get("access_list_id", 0),
|
|
"certificate_id": original_host_data.get("certificate_id", 0),
|
|
"ssl_forced": original_host_data.get("ssl_forced", False),
|
|
"hsts_enabled": original_host_data.get("hsts_enabled", False),
|
|
"hsts_subdomains": original_host_data.get("hsts_subdomains", False),
|
|
"http2_support": original_host_data.get("http2_support", False),
|
|
"block_exploits": original_host_data.get("block_exploits", False),
|
|
"caching_enabled": original_host_data.get("caching_enabled", False),
|
|
"allow_websocket_upgrade": original_host_data.get("allow_websocket_upgrade", False),
|
|
"advanced_config": config, # The updated advanced config
|
|
"meta": original_host_data.get("meta", {}),
|
|
"locations": original_host_data.get("locations", []),
|
|
}
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {self.token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
response = requests.put(url, headers=headers, data=json.dumps(update_payload))
|
|
if response.status_code == 200:
|
|
print(f"Successfully updated proxy host {host_id}")
|
|
else:
|
|
print(f"Failed to update proxy host {host_id}: {response.text}")
|
|
|
|
|
|
|
|
def update_config_with_streams(config, streams):
|
|
# Get all stream names from the database
|
|
db_stream_names = {stream['streamName'] for stream in streams}
|
|
|
|
# Find all location blocks in the config (more accurate pattern)
|
|
location_blocks = re.findall(r'location\s+~\s+\^/(\w+)\(\.\*\)\$\s*\{[^}]*return\s+302\s+([^;]+);[^}]*\}', config)
|
|
existing_stream_names = {stream_name for stream_name, _ in location_blocks}
|
|
|
|
# Remove location blocks that are not in the database
|
|
for stream_name in existing_stream_names:
|
|
if stream_name not in db_stream_names:
|
|
print(f"Removing location block for stream: {stream_name}")
|
|
pattern = re.compile(f'location\\s+~\\s+\\^/{re.escape(stream_name)}\\(\\.\\*\\)\\$\\s*\\{{[^}}]*return\\s+302\\s+[^;]+;[^}}]*\\}}\\s*', re.DOTALL)
|
|
config = pattern.sub('', config)
|
|
|
|
# Update existing stream URLs and create new ones
|
|
for stream in streams:
|
|
stream_name = stream['streamName']
|
|
stream_url = stream['streamURL']
|
|
if stream_url: # Ensure there is a URL to update to
|
|
# Check if location block already exists for this stream
|
|
if stream_name in existing_stream_names:
|
|
# Update existing location block
|
|
pattern = re.compile(f'(location\\s+~\\s+\\^/{re.escape(stream_name)}\\(\\.\\*\\)\\$\\s*\\{{[^}}]*return\\s+302\\s+)([^;]+)(;[^}}]*\\}})', re.DOTALL)
|
|
config = pattern.sub(f'\\1{stream_url}/$1$is_args$args\\3', config)
|
|
else:
|
|
# Create new location block for this stream
|
|
new_location_block = f'location ~ ^/{stream_name}(.*)$ {{\n return 302 {stream_url}/$1$is_args$args;\n}}\n'
|
|
# Add the new block at the end of the config (before the last closing brace)
|
|
config = config.rstrip() + '\n' + new_location_block
|
|
print(f"Created new location block for stream: {stream_name}")
|
|
|
|
return config
|
|
|
|
def _update_npm_config(credentials=None):
|
|
"""Helper function to update the NPM config."""
|
|
# Use provided credentials or get from session (for backward compatibility)
|
|
if credentials is None:
|
|
# This is for backward compatibility when called from background threads
|
|
# where session context might not be available
|
|
if not session.get('user_id') or int(session.get('user_id')) != 1:
|
|
print("Unauthorized attempt to update NPM config.")
|
|
return False
|
|
credentials = session.get("auth_credentials")
|
|
# If credentials are provided, we skip the session check as the user has already been authenticated
|
|
|
|
npm = NginxProxyManager(app.config['NPM_HOST'], app.config['NPM_EMAIL'], app.config['NPM_PASSWORD'])
|
|
npm.login()
|
|
host = npm.get_proxy_host(9)
|
|
if host:
|
|
current_config = host.get('advanced_config', '')
|
|
|
|
backend_url = f"{app.config['BACKEND_URL']}/get_all_stream_urls"
|
|
decoded_credentials = base64.b64decode(credentials).decode()
|
|
username, password = decoded_credentials.split(":", 1)
|
|
auth = requests.auth.HTTPBasicAuth(username, password)
|
|
|
|
try:
|
|
response = requests.get(backend_url, auth=auth)
|
|
response.raise_for_status()
|
|
streams = response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Failed to fetch streams from backend: {e}")
|
|
return False
|
|
|
|
if streams:
|
|
print("Current Config:")
|
|
print(current_config)
|
|
|
|
new_config = update_config_with_streams(current_config, streams)
|
|
print("\nNew Config:")
|
|
print(new_config)
|
|
|
|
npm.update_proxy_host_config(9, new_config)
|
|
print("\nNPM config updated successfully.")
|
|
return True
|
|
else:
|
|
print("Failed to update NPM config.")
|
|
return False
|
|
|
|
def _update_npm_config_in_background(credentials):
|
|
with app.app_context():
|
|
_update_npm_config(credentials)
|
|
|
|
@app.route('/update_host_9_config', methods=['POST'])
|
|
def update_host_9_config():
|
|
if not session.get('user_id') or int(session.get('user_id')) != 1:
|
|
return jsonify({'error': 'Unauthorized'}), 401
|
|
|
|
thread = threading.Thread(target=_update_npm_config_in_background, args=(session["auth_credentials"],))
|
|
thread.start()
|
|
|
|
return jsonify({'message': 'NPM config update started in the background. Check logs for status.'}), 202
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(
|
|
debug=app.config["DEBUG"],
|
|
host=app.config["HOST"],
|
|
port=app.config["PORT"]
|
|
)
|