比较提交

..

没有共同的提交。898d737324fb33c777831c936198fd35819413ca 和 b274bf12d34de1f4776e9e02878ca5ccdc9f9bd8 的历史完全不同。

共有 3 个文件被更改,包括 56 次插入15 次删除

查看文件

@ -1,5 +1,5 @@
[tool.bumpversion] [tool.bumpversion]
current_version = "1.3.49" current_version = "1.3.48"
commit = true commit = true
tag = true tag = true
tag_name = "{new_version}" tag_name = "{new_version}"

查看文件

@ -1 +1 @@
1.3.49 1.3.48

67
app.py
查看文件

@ -9,6 +9,7 @@ from typing import Dict, Any, Tuple, Union
import sys import sys
import redis import redis
import json import json
from pywebpush import webpush, WebPushException
import mysql.connector import mysql.connector
from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired from lib.datetime import filter_accounts_next_30_days, filter_accounts_expired
@ -134,25 +135,65 @@ def proxy_save_subscription():
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
return jsonify({"error": str(e)}), 502 return jsonify({"error": str(e)}), 502
def get_db_connection():
# This is a simplified version for demonstration.
# In a real application, you would use a connection pool.
return mysql.connector.connect(
host=app.config["DBHOST"],
user=app.config["DBUSER"],
password=app.config["DBPASS"],
database=app.config["DATABASE"],
port=app.config["DBPORT"],
)
def get_push_subscriptions():
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM push_subscriptions")
subscriptions = cursor.fetchall()
cursor.close()
conn.close()
return subscriptions
def send_notification(subscription_info, message_body):
try:
webpush(
subscription_info=subscription_info,
data=message_body,
vapid_private_key=app.config["VAPID_PRIVATE_KEY"],
vapid_claims={"sub": app.config["VAPID_CLAIM_EMAIL"]},
)
except WebPushException as ex:
print(f"Web push error: {ex}")
# You might want to remove the subscription if it's invalid
if ex.response and ex.response.status_code == 410:
print("Subscription is no longer valid, removing from DB.")
# Add logic to remove the subscription from your database
@app.route('/send-test-notification', methods=['POST']) @app.route('/send-test-notification', methods=['POST'])
def send_test_notification(): def send_test_notification():
"""Proxies the request to send a test notification to the backend.""" """Sends a test push notification to all users."""
if not session.get("logged_in"): if not session.get("logged_in"):
return jsonify({'error': 'Unauthorized'}), 401 return jsonify({'error': 'Unauthorized'}), 401
backend_url = f"{app.config['BASE_URL']}/send-test-notification"
credentials = base64.b64decode(session["auth_credentials"]).decode()
username, password = credentials.split(":", 1)
try: try:
response = requests.post( subscriptions = get_push_subscriptions()
backend_url, except Exception as e:
auth=requests.auth.HTTPBasicAuth(username, password), print(f"Error getting push subscriptions: {e}")
json=request.get_json() return jsonify({"error": "Could not retrieve push subscriptions from the database."}), 500
)
return Response(response.content, status=response.status_code, mimetype=response.headers['Content-Type']) if not subscriptions:
except requests.exceptions.RequestException as e: return jsonify({"message": "No push subscriptions found."}), 404
return jsonify({"error": str(e)}), 502
message_body = json.dumps({"title": "KTVManager", "body": "Ktv Test"})
for sub in subscriptions:
try:
send_notification(json.loads(sub['subscription_json']), message_body)
except Exception as e:
print(f"Error sending notification to subscription ID {sub.get('id', 'N/A')}: {e}")
return jsonify({"message": f"Test notification sent to {len(subscriptions)} subscription(s)."})
@app.route("/home") @app.route("/home")
@cache.cached(timeout=60, key_prefix=make_cache_key) @cache.cached(timeout=60, key_prefix=make_cache_key)