Compare commits

...

3 Commits

Author SHA1 Message Date
ff5b8b4937 new encryption logic 2025-07-14 13:29:55 +01:00
aa1b9d7281 refactor(security): improve encryption using PyCryptodome and PBKDF2
Replace the `cryptography` library with `pycryptodome` for password encryption.

The previous implementation used AES-GCM with a static key derived from a
hardcoded secret. This change introduces a more robust security model by:
- Using PBKDF2 to derive the encryption key from the secret.
- Adding a unique, randomly generated salt for each encrypted password.

This significantly enhances security by protecting against rainbow table
and pre-computation attacks.

BREAKING CHANGE: The password encryption format has changed. All previously
encrypted passwords stored in the database are now invalid and will need
to be reset.
2025-07-14 11:55:13 +01:00
79a2f6e944 feat(security): implement AES-GCM for password encryption
Replaces the `pyeasyencrypt` library with a more robust and standard
encryption implementation using `cryptography.hazmat`.

This commit introduces AES-256-GCM for encrypting and decrypting user
account passwords. The `add_account` endpoint now properly encrypts
passwords before database insertion.

Error handling has been added to the `get_user_accounts` endpoint to
manage decryption failures for legacy passwords, which will be returned
as "DECRYPTION_FAILED".

BREAKING CHANGE: The password encryption algorithm has been changed.
All previously stored passwords are now invalid and cannot be decrypted.
2025-07-14 11:12:13 +01:00
2 changed files with 44 additions and 14 deletions

View File

@ -3,7 +3,7 @@ import mysql.connector
from dotenv import load_dotenv
from flask import jsonify, request
from ktvmanager.lib.checker import single_account_check
from ktvmanager.lib.encryption import decrypt_password
from ktvmanager.lib.encryption import encrypt_password, decrypt_password
load_dotenv()
@ -42,7 +42,12 @@ def get_user_accounts(user_id):
query = "SELECT * FROM userAccounts WHERE userID = %s"
accounts = _execute_query(query, (user_id,))
for account in accounts:
account['password'] = decrypt_password(account['password'])
try:
account['password'] = decrypt_password(account['password'])
except Exception as e:
# Log the error to the console for debugging
print(f"Password decryption failed for account ID {account.get('id', 'N/A')}: {e}")
account['password'] = "DECRYPTION_FAILED"
return jsonify(accounts)
def get_stream_names():
@ -64,8 +69,9 @@ def single_check():
def add_account():
data = request.get_json()
encrypted_password = encrypt_password(data['password'])
query = "INSERT INTO userAccounts (username, stream, streamURL, expiaryDate, password, userID) VALUES (%s, %s, %s, %s, %s, %s)"
params = (data['username'], data['stream'], data['streamURL'], data['expiaryDate'], data['password'], data['userID'])
params = (data['username'], data['stream'], data['streamURL'], data['expiaryDate'], encrypted_password, data['userID'])
result = _execute_query(query, params)
return jsonify(result)

View File

@ -1,17 +1,41 @@
from pyeasyencrypt.pyeasyencrypt import encrypt_string, decrypt_string
import os
from dotenv import load_dotenv
load_dotenv()
from Cryptodome.Cipher import AES
from Cryptodome.Protocol.KDF import PBKDF2
from Cryptodome.Random import get_random_bytes
# Use a new secret for the new encryption scheme
SECRET = "KTVM-NEW-ENCRYPTION-SECRET"
SALT_SIZE = 16
KEY_SIZE = 32
ITERATIONS = 100000
AUTH_TAG_LENGTH = 16
def encrypt_password(clear_string):
password = os.getenv("ENCRYPTKEY")
encrypted_string = encrypt_string(clear_string, password)
return encrypted_string
"""Encrypts a string using AES-GCM with a derived key."""
salt = get_random_bytes(SALT_SIZE)
# Derive a key from the master secret and a random salt
key = PBKDF2(SECRET, salt, dkLen=KEY_SIZE, count=ITERATIONS)
cipher = AES.new(key, AES.MODE_GCM)
# Encrypt the data
ciphertext, tag = cipher.encrypt_and_digest(clear_string.encode('utf-8'))
# Return a single hex-encoded string containing all parts
return (salt + cipher.nonce + tag + ciphertext).hex()
def decrypt_password(encrypted_string):
password = os.getenv("ENCRYPTKEY")
decrypted_string = decrypt_string(encrypted_string, password)
return decrypted_string
"""Decrypts a string encrypted with the above function."""
data = bytes.fromhex(encrypted_string)
# Extract the components from the encrypted string
salt = data[:SALT_SIZE]
nonce = data[SALT_SIZE:SALT_SIZE + 16]
tag = data[SALT_SIZE + 16:SALT_SIZE + 16 + AUTH_TAG_LENGTH]
ciphertext = data[SALT_SIZE + 16 + AUTH_TAG_LENGTH:]
# Re-derive the same key using the stored salt
key = PBKDF2(SECRET, salt, dkLen=KEY_SIZE, count=ITERATIONS)
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
# Decrypt and verify the data
try:
decrypted_bytes = cipher.decrypt_and_verify(ciphertext, tag)
return decrypted_bytes.decode('utf-8')
except ValueError:
# This will be raised if the MAC check fails (tampered data or wrong key)
raise ValueError("Decryption failed: MAC check failed. The data may be corrupt or the key is incorrect.")