import os import sys from dotenv import load_dotenv import mysql.connector from datetime import datetime, timedelta from routes.api import send_notification from ktvmanager.lib.database import get_push_subscriptions, _execute_query # Add the project root to the Python path project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) from ktvmanager.lib.encryption import decrypt_password from ktvmanager.lib.checker import single_account_check from ktvmanager.lib.get_urls import get_latest_urls_from_dns from typing import List, Dict, Any from mysql.connector.connection import MySQLConnection def get_all_accounts(db_connection: MySQLConnection) -> List[Dict[str, Any]]: """Retrieves all user accounts from the database. Args: db_connection: An active MySQL database connection. Returns: A list of dictionaries, where each dictionary represents a user account. """ cursor = db_connection.cursor(dictionary=True) query = "SELECT * FROM userAccounts where userId = 1" cursor.execute(query) accounts = cursor.fetchall() cursor.close() return accounts def send_expiry_notifications() -> None: """ Sends notifications to users with accounts expiring in the next 30 days. """ now = datetime.now() thirty_days_later = now + timedelta(days=30) now_timestamp = int(now.timestamp()) thirty_days_later_timestamp = int(thirty_days_later.timestamp()) query = """ SELECT u.id as user_id, ua.username, ua.expiaryDate FROM users u JOIN userAccounts ua ON u.id = ua.userID WHERE ua.expiaryDate BETWEEN %s AND %s """ expiring_accounts = _execute_query(query, (now_timestamp, thirty_days_later_timestamp)) for account in expiring_accounts: user_id = account['user_id'] subscriptions = get_push_subscriptions(user_id) for sub in subscriptions: # Check if a notification has been sent recently last_notified_query = "SELECT last_notified FROM push_subscriptions WHERE id = %s" last_notified_result = _execute_query(last_notified_query, (sub['id'],)) last_notified = last_notified_result[0]['last_notified'] if last_notified_result and last_notified_result[0]['last_notified'] else None if last_notified and last_notified > now - timedelta(days=1): continue message = f"Your account {account['username']} is due to expire on {datetime.fromtimestamp(account['expiaryDate']).strftime('%d-%m-%Y')}." send_notification(sub['subscription_json'], message) # Update the last notified timestamp update_last_notified_query = "UPDATE push_subscriptions SET last_notified = %s WHERE id = %s" _execute_query(update_last_notified_query, (now, sub['id'])) def main() -> None: """ Checks the validity of all accounts in the database against available stream URLs. """ load_dotenv() db_connection = mysql.connector.connect( host=os.getenv("DBHOST"), user=os.getenv("DBUSER"), password=os.getenv("DBPASS"), database=os.getenv("DATABASE"), port=os.getenv("DBPORT"), ) accounts = get_all_accounts(db_connection) stream_urls = get_latest_urls_from_dns() for account in accounts: try: decrypted_password = decrypt_password(account["password"]) except Exception as e: print(f"Could not decrypt password for {account['username']}: {e}") continue account_data = { "username": account["username"], "password": decrypted_password, } result = single_account_check(account_data, stream_urls) if result: print( f"Account {account['username']} on stream {account['stream']} is VALID." ) else: print( f"Account {account['username']} on stream {account['stream']} is INVALID." ) db_connection.close() if __name__ == "__main__": main()