Compare commits

..

No commits in common. "master" and "1.2.8" have entirely different histories.

18 changed files with 67 additions and 575 deletions

View File

@ -1,5 +1,5 @@
[tool.bumpversion] [tool.bumpversion]
current_version = "1.3.8" current_version = "1.2.8"
commit = true commit = true
tag = true tag = true
tag_name = "{new_version}" tag_name = "{new_version}"

2
.vscode/launch.json vendored
View File

@ -17,7 +17,7 @@
"FLASK_APP": "ktvmanager.main:create_app", "FLASK_APP": "ktvmanager.main:create_app",
"FLASK_ENV": "development", "FLASK_ENV": "development",
"PYTHONPATH": "${workspaceFolder}", "PYTHONPATH": "${workspaceFolder}",
"FLASK_RUN_PORT": "5002" "FLASK_RUN_PORT": "5001"
}, },
"args": [ "args": [
"run", "run",

View File

@ -1 +1 @@
1.3.8 1.2.8

View File

@ -33,9 +33,6 @@ ENV FLASK_APP=ktvmanager.main:create_app
# Copy application code # Copy application code
COPY . . COPY . .
# Debugging step to verify file presence
RUN ls -la /app/ktvmanager/lib
# Handle versioning # Handle versioning
ARG VERSION ARG VERSION
RUN if [ -n "$VERSION" ]; then echo $VERSION > VERSION; fi RUN if [ -n "$VERSION" ]; then echo $VERSION > VERSION; fi

View File

@ -1,21 +0,0 @@
from py_vapid import Vapid
import os
vapid = Vapid()
vapid.generate_keys()
from cryptography.hazmat.primitives import serialization
private_key = vapid.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
public_key = vapid.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
print(f"VAPID_PRIVATE_KEY='{private_key}'")
print(f"VAPID_PUBLIC_KEY='{public_key}'")

View File

@ -1,72 +0,0 @@
location ~ ^/Mongoose(.*)$ {
return 302 http://m3u.sstv.one:81/$1$is_args$args;
}
location ~ ^/Blazin(.*)$ {
return 302 http://blazin.dns-cloud.net:8080/$1$is_args$args;
}
location ~ ^/Insanity(.*)$ {
return 302 https://biglicks.win:443/$1$is_args$args;
}
location ~ ^/Badger(.*)$ {
return 302 http://hurricanetv.kiev.ua:80/$1$is_args$args;
}
location ~ ^/Gunslinger(.*)$ {
return 302 http://jabawalkies.club:8080/$1$is_args$args;
}
location ~ ^/KDB(.*)$ {
return 302 http://finger-ya-bum-hole.site/$1$is_args$args;
}
location ~ ^/Graphite(.*)$ {
return 302 http://sarahgraphite.liveme.vip:80/$1$is_args$args;
}
location ~ ^/old-Premium(.*)$ {
return 302 https://kwikfitfitter.life:443/$1$is_args$args;
}
location ~ ^/Gold(.*)$ {
return 302 http://server1.elitehosting.gq:8090/$1$is_args$args;
}
location ~ ^/Bravado(.*)$ {
return 302 http://le.thund.re/$1$is_args$args;
}
location ~ ^/Titan(.*)$ {
return 302 http://maximumorg.xyz:80/$1$is_args$args;
}
location ~ ^/Wolfie(.*)$ {
return 302 http://deviltv.fun:8080/$1$is_args$args;
}
location ~ ^/DiamondBack(.*)$ {
return 302 http://pro-media.live:2052/$1$is_args$args;
}
location ~ ^/Halo(.*)$ {
return 302 http://i-like-turtles.org:8080/$1$is_args$args;
}
location ~ ^/Nitro(.*)$ {
return 302 http://mr-beans-streams.xyz$1$is_args$args;
}
location ~ ^/Insanity(.*)$ {
return 302 https://biglicks.win:443/$1$is_args$args;
}
location ~ ^/Bonsai(.*)$ {
return 302 http://crazyservertimes.pro/$1$is_args$args;
}
location ~ ^/New-Prem(.*)$ {
return 302 http://hello.exodus-2.xyz:8080/$1$is_args$args;
}
location ~ ^/Crystal(.*)$ {
return 302 https://line.ottcst.com/$1$is_args$args;
}
location ~ ^/VIP(.*)$ {
return 302 https://1visions.co.uk:443/$1$is_args$args;
}
location ~ ^/WILD(.*)$ {
return 302 http://wildversion.com:8080/$1$is_args$args;
}
location ~ ^/STEST(.*)$ {
return 302 http://notwhatyourlookingfor.ru/$1$is_args$args;
}
location ~ ^/SPARE(.*)$ {
return 302 http://moontv.co.uk/$1$is_args$args;
}
location ~ ^/QUARTZ(.*)$ {
return 302 http://anyholeisagoal.ru/$1$is_args$args;
}

View File

@ -3,7 +3,7 @@ import sys
from dotenv import load_dotenv from dotenv import load_dotenv
import mysql.connector import mysql.connector
from datetime import datetime, timedelta from datetime import datetime, timedelta
from ktvmanager.lib.notifications import send_notification from routes.api import send_notification
from ktvmanager.lib.database import get_push_subscriptions, _execute_query from ktvmanager.lib.database import get_push_subscriptions, _execute_query
# Add the project root to the Python path # Add the project root to the Python path
@ -33,50 +33,41 @@ def get_all_accounts(db_connection: MySQLConnection) -> List[Dict[str, Any]]:
return accounts return accounts
def send_expiry_notifications(app) -> None: def send_expiry_notifications() -> None:
""" """
Sends notifications to users with accounts expiring in the next 30 days. Sends notifications to users with accounts expiring in the next 30 days.
""" """
with app.app_context(): now = datetime.now()
now = datetime.now() thirty_days_later = now + timedelta(days=30)
thirty_days_later = now + timedelta(days=30) now_timestamp = int(now.timestamp())
now_timestamp = int(now.timestamp()) thirty_days_later_timestamp = int(thirty_days_later.timestamp())
thirty_days_later_timestamp = int(thirty_days_later.timestamp())
query = """ query = """
SELECT u.id as user_id, ua.username, ua.expiaryDate SELECT u.id as user_id, ua.username, ua.expiaryDate
FROM users u FROM users u
JOIN userAccounts ua ON u.id = ua.userID JOIN userAccounts ua ON u.id = ua.userID
WHERE ua.expiaryDate BETWEEN %s AND %s WHERE ua.expiaryDate BETWEEN %s AND %s
""" """
expiring_accounts = _execute_query(query, (now_timestamp, thirty_days_later_timestamp)) expiring_accounts = _execute_query(query, (now_timestamp, thirty_days_later_timestamp))
for account in expiring_accounts: for account in expiring_accounts:
expiry_date = datetime.fromtimestamp(account['expiaryDate']) user_id = account['user_id']
days_to_expiry = (expiry_date.date() - now.date()).days subscriptions = get_push_subscriptions(user_id)
for sub in subscriptions:
# Check if a notification has been sent recently
last_notified_query = "SELECT last_notified FROM push_subscriptions WHERE id = %s"
last_notified_result = _execute_query(last_notified_query, (sub['id'],))
last_notified = last_notified_result[0]['last_notified'] if last_notified_result and last_notified_result[0]['last_notified'] else None
if days_to_expiry == 30 or days_to_expiry == 7: if last_notified and last_notified > now - timedelta(days=1):
print(f"Found expiring account: {account['username']}") continue
user_id = account['user_id']
subscriptions = get_push_subscriptions(user_id)
for sub in subscriptions:
# Check if a notification has been sent recently
last_notified_query = "SELECT last_notified FROM push_subscriptions WHERE id = %s"
last_notified_result = _execute_query(last_notified_query, (sub['id'],))
last_notified = last_notified_result[0]['last_notified'] if last_notified_result and last_notified_result[0]['last_notified'] else None
if last_notified and last_notified.date() == now.date(): message = f"Your account {account['username']} is due to expire on {datetime.fromtimestamp(account['expiaryDate']).strftime('%d-%m-%Y')}."
continue send_notification(sub['subscription_json'], message)
message = { # Update the last notified timestamp
"title": "Account Expiry Warning", update_last_notified_query = "UPDATE push_subscriptions SET last_notified = %s WHERE id = %s"
"body": f"Your account {account['username']} is due to expire in {days_to_expiry} days." _execute_query(update_last_notified_query, (now, sub['id']))
}
send_notification(sub['subscription_json'], json.dumps(message))
# Update the last notified timestamp
update_last_notified_query = "UPDATE push_subscriptions SET last_notified = %s WHERE id = %s"
_execute_query(update_last_notified_query, (now, sub['id']))
def main() -> None: def main() -> None:

View File

@ -14,12 +14,15 @@ class Config:
DATABASE = os.getenv("DATABASE") DATABASE = os.getenv("DATABASE")
DBPORT = os.getenv("DBPORT") DBPORT = os.getenv("DBPORT")
STREAM_URLS = ["http://example.com", "http://example.org"] STREAM_URLS = ["http://example.com", "http://example.org"]
VAPID_PRIVATE_KEY = "MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQg6vkDnUOnpMUZ+DAvgEge20aPDmffv1rYTADnaNP5NvGhRANCAATZvXvlV0QyvzvgOdsEMSt07n5qgbBnICQ0s1x364rGswAcVVJuu8q5XgZQrBLk/lkhQBcyyuuAjc4OvJLADqEk" VAPID_PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQg6vkDnUOnpMUZ+DAv
gEge20aPDmffv1rYTADnaNP5NvGhRANCAATZvXvlV0QyvzvgOdsEMSt07n5qgbBn
ICQ0s1x364rGswAcVVJuu8q5XgZQrBLk/lkhQBcyyuuAjc4OvJLADqEk
-----END PRIVATE KEY-----"""
VAPID_PUBLIC_KEY = """-----BEGIN PUBLIC KEY----- VAPID_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE2b175VdEMr874DnbBDErdO5+aoGw MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE2b175VdEMr874DnbBDErdO5+aoGw
ZyAkNLNcd+uKxrMAHFVSbrvKuV4GUKwS5P5ZIUAXMsrrgI3ODrySwA6hJA== ZyAkNLNcd+uKxrMAHFVSbrvKuV4GUKwS5P5ZIUAXMsrrgI3ODrySwA6hJA==
-----END PUBLIC KEY-----""" -----END PUBLIC KEY-----"""
VAPID_CLAIM_EMAIL = "mailto:karl@k-world.me.uk" # Replace with your email
SECRET_KEY = "a_very_secret_key" SECRET_KEY = "a_very_secret_key"
class DevelopmentConfig(Config): class DevelopmentConfig(Config):

View File

@ -1,7 +1,6 @@
from functools import wraps from functools import wraps
from flask import request, jsonify, Blueprint, Response from flask import request, jsonify, Blueprint, Response
from typing import Callable, Any, Tuple, Dict from typing import Callable, Any, Tuple, Dict
from .database import get_user_id_from_username
auth_blueprint = Blueprint("auth", __name__) auth_blueprint = Blueprint("auth", __name__)
@ -58,5 +57,4 @@ def check_login(username: str, password: str) -> Response:
Returns: Returns:
A Flask JSON response indicating success. A Flask JSON response indicating success.
""" """
user_id = get_user_id_from_username(username) return jsonify({"auth": "Success"})
return jsonify({"auth": "Success", "user_id": user_id, "username": username})

View File

@ -138,16 +138,6 @@ def validate_account() -> Tuple[Response, int]:
200, 200,
) )
# Check if account is expired
exp_date_str = result["data"]["user_info"]["exp_date"]
if exp_date_str:
from datetime import datetime, timezone
exp_date = datetime.fromtimestamp(int(exp_date_str), tz=timezone.utc)
current_date = datetime.now(timezone.utc)
if current_date > exp_date:
return jsonify({"message": "Account is expired", "data": result}), 401
return jsonify({"message": "Account is valid", "data": result}), 200 return jsonify({"message": "Account is valid", "data": result}), 200
else: else:
return jsonify({"message": "Account is invalid"}), 401 return jsonify({"message": "Account is invalid"}), 401

View File

@ -114,17 +114,6 @@ def get_stream_names() -> Response:
return jsonify(stream_names) return jsonify(stream_names)
def get_all_stream_urls() -> Response:
"""Retrieves all stream names and URLs from the database.
Returns:
A Flask JSON response containing a list of stream names and URLs.
"""
query = "SELECT DISTINCT SUBSTRING_INDEX(stream, ' ', 1) AS streamName, streamURL FROM userAccounts"
results = _execute_query(query)
return jsonify(results)
def single_check() -> Response | Tuple[Response, int]: def single_check() -> Response | Tuple[Response, int]:
""" """
Performs a check on a single account provided in the request JSON. Performs a check on a single account provided in the request JSON.
@ -233,29 +222,14 @@ def save_push_subscription(user_id: int, subscription_json: str) -> None:
_execute_query(query, params) _execute_query(query, params)
def get_push_subscriptions(user_id: Optional[int] = None) -> List[Dict[str, Any]]: def get_push_subscriptions(user_id: int) -> List[Dict[str, Any]]:
"""Retrieves all push subscriptions for a given user ID, or all if no user_id is provided. """Retrieves all push subscriptions for a given user ID.
Args: Args:
user_id: The ID of the user (optional). user_id: The ID of the user.
Returns: Returns:
A list of push subscriptions. A list of push subscriptions.
""" """
if user_id: query = "SELECT * FROM push_subscriptions WHERE user_id = %s"
query = "SELECT * FROM push_subscriptions WHERE user_id = %s" return _execute_query(query, (user_id,))
return _execute_query(query, (user_id,))
else:
query = "SELECT * FROM push_subscriptions"
return _execute_query(query)
def delete_push_subscription(subscription_json: str) -> None:
"""Deletes a push subscription from the database.
Args:
subscription_json: The push subscription information as a JSON string.
"""
query = "DELETE FROM push_subscriptions WHERE subscription_json = %s"
params = (subscription_json,)
_execute_query(query, params)

View File

@ -1,19 +0,0 @@
import json
from flask import current_app
from pywebpush import webpush, WebPushException
from ktvmanager.lib.database import delete_push_subscription
def send_notification(subscription_json, message_body):
try:
subscription_info = json.loads(subscription_json)
webpush(
subscription_info=subscription_info,
data=message_body,
vapid_private_key=current_app.config["VAPID_PRIVATE_KEY"],
vapid_claims={"sub": current_app.config["VAPID_CLAIM_EMAIL"]},
)
except WebPushException as ex:
print(f"Web push error: {ex}")
if ex.response and ex.response.status_code == 410:
print("Subscription is no longer valid, removing from DB.")
delete_push_subscription(subscription_json)

View File

@ -1,18 +1,12 @@
import os import os
from flask import Flask, jsonify from flask import Flask, jsonify
from flask_cors import CORS
from dotenv import load_dotenv from dotenv import load_dotenv
from ktvmanager.config import DevelopmentConfig, ProductionConfig from ktvmanager.config import DevelopmentConfig, ProductionConfig
from routes.api import api_blueprint from routes.api import api_blueprint
from routes.dns import dns_bp
from routes.extra_urls import extra_urls_bp
from ktvmanager.lib.database import initialize_db_pool from ktvmanager.lib.database import initialize_db_pool
from ktvmanager.account_checker import send_expiry_notifications
from apscheduler.schedulers.background import BackgroundScheduler
def create_app(): def create_app():
app = Flask(__name__) app = Flask(__name__)
CORS(app)
load_dotenv() load_dotenv()
if os.environ.get("FLASK_ENV") == "production": if os.environ.get("FLASK_ENV") == "production":
@ -23,22 +17,8 @@ def create_app():
with app.app_context(): with app.app_context():
initialize_db_pool() initialize_db_pool()
# Schedule the daily account check
scheduler = BackgroundScheduler()
# Pass the app instance to the job
scheduler.add_job(func=lambda: send_expiry_notifications(app), trigger="cron", hour=10, minute=0)
scheduler.start()
# Register blueprints # Register blueprints
app.register_blueprint(api_blueprint) app.register_blueprint(api_blueprint)
app.register_blueprint(dns_bp)
app.register_blueprint(extra_urls_bp)
@app.route('/check-expiry', methods=['POST'])
def check_expiry():
"""Manually triggers the expiry notification check."""
send_expiry_notifications(app)
return jsonify({"message": "Expiry notification check triggered."})
# Error handlers # Error handlers
@app.errorhandler(404) @app.errorhandler(404)

View File

@ -1,157 +0,0 @@
import requests
import json
import argparse
import mysql.connector
import re
import os
from dotenv import load_dotenv
load_dotenv()
class NginxProxyManager:
def __init__(self, host, email, password):
self.host = host
self.email = email
self.password = password
self.token = None
def login(self):
url = f"{self.host}/api/tokens"
payload = {
"identity": self.email,
"secret": self.password
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
self.token = response.json()["token"]
print("Login successful.")
else:
print(f"Failed to login: {response.text}")
exit(1)
def get_proxy_hosts(self):
if not self.token:
self.login()
url = f"{self.host}/api/nginx/proxy-hosts"
headers = {
"Authorization": f"Bearer {self.token}"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get proxy hosts: {response.text}")
return []
def get_proxy_host(self, host_id):
if not self.token:
self.login()
url = f"{self.host}/api/nginx/proxy-hosts/{host_id}"
headers = {
"Authorization": f"Bearer {self.token}"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get proxy host {host_id}: {response.text}")
return None
def update_proxy_host_config(self, host_id, config):
if not self.token:
self.login()
url = f"{self.host}/api/nginx/proxy-hosts/{host_id}"
payload = {
"advanced_config": config
}
headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
response = requests.put(url, headers=headers, data=json.dumps(payload))
if response.status_code == 200:
print(f"Successfully updated proxy host {host_id}")
else:
print(f"Failed to update proxy host {host_id}: {response.text}")
def get_streams_from_db(db_host, db_user, db_pass, db_name, db_port):
try:
conn = mysql.connector.connect(
host=db_host,
user=db_user,
password=db_pass,
database=db_name,
port=db_port
)
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT streamName, streamURL FROM streams")
streams = cursor.fetchall()
cursor.close()
conn.close()
return streams
except mysql.connector.Error as err:
print(f"Error connecting to database: {err}")
return []
def update_config_with_streams(config, streams):
for stream in streams:
stream_name = stream['streamName']
stream_url = stream['streamURL']
# Use a more specific regex to avoid replacing parts of other URLs
pattern = re.compile(f'(location ~ \^/{re.escape(stream_name)}\(\.\*\)\$ {{\s*return 302 )([^;]+)(;\\s*}})')
config = pattern.sub(f'\\1{stream_url}/$1$is_args$args\\3', config)
return config
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Modify Nginx Proxy Manager custom configuration.")
parser.add_argument("--list-hosts", action="store_true", help="List all proxy hosts")
parser.add_argument("--host-id", type=int, help="The ID of the proxy host to modify")
parser.add_argument("--config-file", type=str, help="Path to the file containing the new advanced configuration")
parser.add_argument("--download-config", type=str, help="Path to save the current advanced configuration")
parser.add_argument("--update-from-db", action="store_true", help="Update the configuration from the database")
args = parser.parse_args()
npm_host = os.getenv("NPM_HOST")
npm_email = os.getenv("NPM_EMAIL")
npm_password = os.getenv("NPM_PASSWORD")
db_host = os.getenv("DBHOST")
db_user = os.getenv("DBUSER")
db_pass = os.getenv("DBPASS")
db_name = os.getenv("DATABASE")
db_port = os.getenv("DBPORT")
npm = NginxProxyManager(npm_host, npm_email, npm_password)
npm.login()
if args.list_hosts:
hosts = npm.get_proxy_hosts()
for host in hosts:
print(f"ID: {host['id']}, Domains: {', '.join(host['domain_names'])}")
if args.host_id and args.download_config:
host = npm.get_proxy_host(args.host_id)
if host:
with open(args.download_config, 'w') as f:
f.write(host.get('advanced_config', ''))
print(f"Configuration for host {args.host_id} downloaded to {args.download_config}")
if args.host_id and args.config_file:
with open(args.config_file, 'r') as f:
config = f.read()
npm.update_proxy_host_config(args.host_id, config)
if args.host_id and args.update_from_db:
host = npm.get_proxy_host(args.host_id)
if host:
current_config = host.get('advanced_config', '')
streams = get_streams_from_db(db_host, db_user, db_pass, db_name, db_port)
if streams:
new_config = update_config_with_streams(current_config, streams)
npm.update_proxy_host_config(args.host_id, new_config)

View File

@ -38,6 +38,4 @@ mysql-connector-python
python-dotenv python-dotenv
python-dotenv python-dotenv
pywebpush==1.13.0 pywebpush==1.13.0
stem==1.8.2 stem==1.8.2
APScheduler==3.10.4
Flask-Cors==4.0.1

View File

@ -8,7 +8,6 @@ from ktvmanager.lib.database import (
get_user_id_from_username, get_user_id_from_username,
save_push_subscription, save_push_subscription,
get_push_subscriptions, get_push_subscriptions,
get_all_stream_urls,
) )
from ktvmanager.lib.get_urls import get_latest_urls_from_dns from ktvmanager.lib.get_urls import get_latest_urls_from_dns
from ktvmanager.lib.auth import requires_basic_auth, check_login from ktvmanager.lib.auth import requires_basic_auth, check_login
@ -16,10 +15,7 @@ from ktvmanager.lib.checker import validate_account
from typing import Tuple from typing import Tuple
import json import json
import re import re
import base64 from pywebpush import webpush, WebPushException
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from ktvmanager.lib.notifications import send_notification
api_blueprint = Blueprint("api", __name__) api_blueprint = Blueprint("api", __name__)
@ -72,21 +68,6 @@ def get_user_accounts_streams_route(username: str, password: str) -> Response:
return jsonify(get_latest_urls_from_dns()) return jsonify(get_latest_urls_from_dns())
@api_blueprint.route("/get_all_stream_urls")
@requires_basic_auth
def get_all_stream_urls_route(username: str, password: str) -> Response:
"""Retrieves all stream names and URLs.
Args:
username: The username of the user.
password: The password of the user (used for authentication).
Returns:
A Flask JSON response containing the list of stream names and URLs.
"""
return get_all_stream_urls()
@api_blueprint.route("/singleCheck", methods=["POST"]) @api_blueprint.route("/singleCheck", methods=["POST"])
@requires_basic_auth @requires_basic_auth
def single_check_route(username: str, password: str) -> Response: def single_check_route(username: str, password: str) -> Response:
@ -168,24 +149,14 @@ def login_route(username: str, password: str) -> Response:
def vapid_public_key(): def vapid_public_key():
"""Provides the VAPID public key in the correct format.""" """Provides the VAPID public key in the correct format."""
pem_key = current_app.config["VAPID_PUBLIC_KEY"] pem_key = current_app.config["VAPID_PUBLIC_KEY"]
try: # Use regex to robustly extract the base64 content from the PEM key
public_key = serialization.load_pem_public_key(pem_key.encode("utf-8")) match = re.search(r"-----BEGIN PUBLIC KEY-----(.*)-----END PUBLIC KEY-----", pem_key, re.DOTALL)
if not isinstance(public_key, ec.EllipticCurvePublicKey): if not match:
raise TypeError("VAPID public key is not an Elliptic Curve key") return jsonify({"error": "Could not parse VAPID public key from config"}), 500
# Get the raw, uncompressed public key bytes (65 bytes for P-256) # Join the split lines to remove all whitespace and newlines
raw_key = public_key.public_bytes( base64_key = "".join(match.group(1).split())
encoding=serialization.Encoding.X962, return jsonify({"public_key": base64_key})
format=serialization.PublicFormat.UncompressedPoint
)
# URL-safe base64 encode the raw key
url_safe_key = base64.urlsafe_b64encode(raw_key).rstrip(b'=').decode('utf-8')
return jsonify({"public_key": url_safe_key})
except (ValueError, TypeError, AttributeError) as e:
current_app.logger.error(f"Error processing VAPID public key: {e}")
return jsonify({"error": "Could not process VAPID public key"}), 500
@api_blueprint.route("/save-subscription", methods=["POST"]) @api_blueprint.route("/save-subscription", methods=["POST"])
@ -212,6 +183,20 @@ def save_subscription(username: str, password: str) -> Response:
return jsonify({"message": "Subscription saved."}) return jsonify({"message": "Subscription saved."})
def send_notification(subscription_info, message_body):
try:
webpush(
subscription_info=subscription_info,
data=message_body,
vapid_private_key=current_app.config["VAPID_PRIVATE_KEY"],
vapid_claims={"sub": "mailto:your-email@example.com"},
)
except WebPushException as ex:
print(f"Web push error: {ex}")
# You might want to remove the subscription if it's invalid
if ex.response and ex.response.status_code == 410:
print("Subscription is no longer valid, removing from DB.")
# Add logic to remove the subscription from your database
@api_blueprint.route("/send-expiry-notifications", methods=["POST"]) @api_blueprint.route("/send-expiry-notifications", methods=["POST"])
@ -228,39 +213,4 @@ def send_expiry_notifications_route(username: str, password: str) -> Response:
""" """
from ktvmanager.account_checker import send_expiry_notifications from ktvmanager.account_checker import send_expiry_notifications
send_expiry_notifications() send_expiry_notifications()
return jsonify({"message": "Expiry notifications sent."}) return jsonify({"message": "Expiry notifications sent."})
@api_blueprint.route("/send-test-notification", methods=["POST"])
@requires_basic_auth
def send_test_notification_route(username: str, password: str) -> Response:
"""Sends a test push notification to all users."""
data = request.get_json(silent=True)
message = data.get("message", "Ktv Test") if data else "Ktv Test"
try:
subscriptions = get_push_subscriptions() # Get all subscriptions
except Exception as e:
print(f"Error getting push subscriptions: {e}")
return jsonify({"error": "Could not retrieve push subscriptions from the database."}), 500
if not subscriptions:
return jsonify({"message": "No push subscriptions found."}), 404
message_body = json.dumps({"title": "KTVManager", "body": message})
success_count = 0
failure_count = 0
for sub in subscriptions:
try:
send_notification(sub['subscription_json'], message_body)
success_count += 1
except Exception as e:
print(f"Error sending notification to subscription ID {sub.get('id', 'N/A')}: {e}")
failure_count += 1
return jsonify({
"message": f"Test notification sending process completed.",
"sent": success_count,
"failed": failure_count
})

View File

@ -1,60 +0,0 @@
from flask import Blueprint, request, jsonify
import os
dns_bp = Blueprint('dns', __name__)
DNS_FILE = os.path.join(os.path.dirname(__file__), '..', 'ktvmanager', 'lib', 'DNS_list.txt')
def read_dns_list():
if not os.path.exists(DNS_FILE):
return []
with open(DNS_FILE, 'r') as f:
return [line.strip() for line in f.readlines() if line.strip()]
def write_dns_list(dns_list):
with open(DNS_FILE, 'w') as f:
for item in dns_list:
f.write(f"{item}\n")
@dns_bp.route('/dns', methods=['GET'])
def get_dns_list():
"""Gets the list of DNS entries."""
return jsonify(read_dns_list())
@dns_bp.route('/dns', methods=['POST'])
def add_dns():
"""Adds a new DNS entry."""
data = request.get_json()
if not data or 'dns_entry' not in data:
return jsonify({'error': 'Missing dns_entry in request body'}), 400
dns_entry = data.get('dns_entry')
if not dns_entry:
return jsonify({'error': 'DNS entry cannot be empty.'}), 400
dns_list = read_dns_list()
if dns_entry in dns_list:
return jsonify({'message': 'DNS entry already exists.'}), 200
dns_list.append(dns_entry)
write_dns_list(dns_list)
return jsonify({'message': 'DNS entry added successfully.'}), 201
@dns_bp.route('/dns', methods=['DELETE'])
def remove_dns():
"""Removes a DNS entry."""
data = request.get_json()
if not data or 'dns_entry' not in data:
return jsonify({'error': 'Missing dns_entry in request body'}), 400
dns_entry = data.get('dns_entry')
if not dns_entry:
return jsonify({'error': 'DNS entry cannot be empty.'}), 400
dns_list = read_dns_list()
if dns_entry not in dns_list:
return jsonify({'error': 'DNS entry not found.'}), 404
dns_list.remove(dns_entry)
write_dns_list(dns_list)
return jsonify({'message': 'DNS entry removed successfully.'}), 200

View File

@ -1,60 +0,0 @@
from flask import Blueprint, request, jsonify
import os
extra_urls_bp = Blueprint('extra_urls', __name__)
EXTRA_URLS_FILE = os.path.join(os.path.dirname(__file__), '..', 'ktvmanager', 'lib', 'extra_urls.txt')
def read_extra_urls_list():
if not os.path.exists(EXTRA_URLS_FILE):
return []
with open(EXTRA_URLS_FILE, 'r') as f:
return [line.strip() for line in f.readlines() if line.strip()]
def write_extra_urls_list(extra_urls_list):
with open(EXTRA_URLS_FILE, 'w') as f:
for item in extra_urls_list:
f.write(f"{item}\n")
@extra_urls_bp.route('/extra_urls', methods=['GET'])
def get_extra_urls_list():
"""Gets the list of extra URLs."""
return jsonify(read_extra_urls_list())
@extra_urls_bp.route('/extra_urls', methods=['POST'])
def add_extra_url():
"""Adds a new extra URL."""
data = request.get_json()
if not data or 'extra_url' not in data:
return jsonify({'error': 'Missing extra_url in request body'}), 400
extra_url = data.get('extra_url')
if not extra_url:
return jsonify({'error': 'Extra URL cannot be empty.'}), 400
extra_urls_list = read_extra_urls_list()
if extra_url in extra_urls_list:
return jsonify({'message': 'Extra URL already exists.'}), 200
extra_urls_list.append(extra_url)
write_extra_urls_list(extra_urls_list)
return jsonify({'message': 'Extra URL added successfully.'}), 201
@extra_urls_bp.route('/extra_urls', methods=['DELETE'])
def remove_extra_url():
"""Removes an extra URL."""
data = request.get_json()
if not data or 'extra_url' not in data:
return jsonify({'error': 'Missing extra_url in request body'}), 400
extra_url = data.get('extra_url')
if not extra_url:
return jsonify({'error': 'Extra URL cannot be empty.'}), 400
extra_urls_list = read_extra_urls_list()
if extra_url not in extra_urls_list:
return jsonify({'error': 'Extra URL not found.'}), 404
extra_urls_list.remove(extra_url)
write_extra_urls_list(extra_urls_list)
return jsonify({'message': 'Extra URL removed successfully.'}), 200