Karl 79a2f6e944 feat(security): implement AES-GCM for password encryption
Replaces the `pyeasyencrypt` library with a more robust and standard
encryption implementation using `cryptography.hazmat`.

This commit introduces AES-256-GCM for encrypting and decrypting user
account passwords. The `add_account` endpoint now properly encrypts
passwords before database insertion.

Error handling has been added to the `get_user_accounts` endpoint to
manage decryption failures for legacy passwords, which will be returned
as "DECRYPTION_FAILED".

BREAKING CHANGE: The password encryption algorithm has been changed.
All previously stored passwords are now invalid and cannot be decrypted.
2025-07-14 11:12:13 +01:00

84 lines
3.0 KiB
Python

import os
import mysql.connector
from dotenv import load_dotenv
from flask import jsonify, request
from ktvmanager.lib.checker import single_account_check
from ktvmanager.lib.encryption import encrypt_password, decrypt_password
load_dotenv()
def _create_connection():
    """Open a new MySQL connection configured from environment variables.

    Reads ``DBHOST``, ``DBUSER``, ``DBPASS``, ``DATABASE`` and ``DBPORT``.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
    """
    settings = {
        "host": os.getenv("DBHOST"),
        "user": os.getenv("DBUSER"),
        "password": os.getenv("DBPASS"),
        "database": os.getenv("DATABASE"),
    }
    # os.getenv returns None when DBPORT is unset. Only forward the port when
    # it is actually configured, so the connector applies its own default
    # rather than being handed None.
    port = os.getenv("DBPORT")
    if port:
        settings["port"] = port
    return mysql.connector.connect(**settings)
def _execute_query(query, params=None):
    """Run one SQL statement on a fresh connection and return its outcome.

    Args:
        query: SQL text, using ``%s`` placeholders for any parameters.
        params: Optional sequence of values bound to the placeholders.

    Returns:
        For statements that start with ``SELECT`` (case-insensitive, after
        stripping whitespace), whatever ``cursor.fetchall()`` returns; the
        cursor is created with ``dictionary=True``. For any other statement
        the connection is committed and ``{"affected_rows": <rowcount>}`` is
        returned.
    """
    conn = _create_connection()
    try:
        # The cursor is created inside the outer try so the connection is
        # still closed if cursor creation itself fails.
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, params)
            if query.strip().upper().startswith("SELECT"):
                return cursor.fetchall()
            conn.commit()
            return {"affected_rows": cursor.rowcount}
        finally:
            cursor.close()
    finally:
        # Runs even when cursor.close() raises, so the connection never leaks.
        conn.close()
def get_user_id_from_username(username):
    """Look up the ``id`` of the ``users`` row matching *username*.

    Returns:
        The ``id`` value of the first row returned, or ``None`` when the
        query yields no rows.
    """
    rows = _execute_query(
        "SELECT id FROM users WHERE username = %s",
        (username,),
    )
    return rows[0]['id'] if rows else None
def get_user_accounts(user_id):
    """Return a JSON response with all ``userAccounts`` rows for *user_id*.

    Each row's ``password`` is replaced by the output of
    ``decrypt_password``. If decryption raises for a row, the failure is
    printed to the console and that row's password is set to the literal
    string ``"DECRYPTION_FAILED"`` instead of aborting the whole request.
    """
    rows = _execute_query(
        "SELECT * FROM userAccounts WHERE userID = %s",
        (user_id,),
    )
    for account in rows:
        try:
            plaintext = decrypt_password(account['password'])
        except Exception as e:
            # Console output only; intended for debugging undecryptable rows.
            print(f"Password decryption failed for account ID {account.get('id', 'N/A')}: {e}")
            plaintext = "DECRYPTION_FAILED"
        account['password'] = plaintext
    return jsonify(rows)
def get_stream_names():
    """Return a JSON response listing every ``streamName`` in ``streams``."""
    rows = _execute_query("SELECT streamName FROM streams")
    names = []
    for row in rows:
        names.append(row['streamName'])
    return jsonify(names)
def single_check():
    """Run ``single_account_check`` on the request's JSON payload.

    Returns:
        The check result as a JSON response when it is truthy; otherwise a
        JSON body ``{"message": "All checks failed"}`` with HTTP status 400.
    """
    payload = request.get_json()
    # Placeholder stream URLs. In a real application these would likely come
    # from a database query or a configuration file.
    candidate_urls = ["http://example.com", "http://example.org"]
    outcome = single_account_check(payload, candidate_urls)
    if not outcome:
        return jsonify({"message": "All checks failed"}), 400
    # A database update with the new information would typically happen here.
    return jsonify(outcome)
def add_account():
    """Insert a new row into ``userAccounts`` from the request's JSON body.

    The body must be a JSON object containing ``username``, ``stream``,
    ``streamURL``, ``expiaryDate``, ``password`` and ``userID``. The password
    is passed through ``encrypt_password`` before it is stored.

    Returns:
        A JSON response with the result of ``_execute_query`` on success, or
        a ``{"message": ...}`` body with HTTP status 400 when the request
        body is not a JSON object or lacks a required field.
    """
    # "expiaryDate" is spelled this way on purpose: it matches the column
    # name used in the INSERT statement below.
    required = ("username", "stream", "streamURL", "expiaryDate", "password", "userID")
    data = request.get_json()
    # Reject malformed bodies up front instead of failing with an unhandled
    # KeyError/TypeError further down.
    if not isinstance(data, dict):
        return jsonify({"message": "Request body must be a JSON object"}), 400
    missing = [field for field in required if field not in data]
    if missing:
        return jsonify({"message": "Missing required fields: " + ", ".join(missing)}), 400

    encrypted_password = encrypt_password(data['password'])
    query = (
        "INSERT INTO userAccounts "
        "(username, stream, streamURL, expiaryDate, password, userID) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )
    params = (
        data['username'],
        data['stream'],
        data['streamURL'],
        data['expiaryDate'],
        encrypted_password,
        data['userID'],
    )
    result = _execute_query(query, params)
    return jsonify(result)
def delete_account():
    """Delete the ``userAccounts`` row whose ``id`` is given in the JSON body.

    Returns:
        A JSON response with the result of ``_execute_query`` on success, or
        a ``{"message": ...}`` body with HTTP status 400 when the request
        body is not a JSON object or has no ``id`` field.
    """
    data = request.get_json()
    # Reject malformed bodies up front instead of failing with an unhandled
    # KeyError/TypeError on the lookup below.
    if not isinstance(data, dict) or "id" not in data:
        return jsonify({"message": "Request body must be a JSON object with an 'id' field"}), 400
    # NOTE(review): no ownership check is visible here; confirm the caller
    # restricts which account ids a user may delete.
    result = _execute_query(
        "DELETE FROM userAccounts WHERE id = %s",
        (data['id'],),
    )
    return jsonify(result)