38 lines
1.3 KiB
Python
38 lines
1.3 KiB
Python
import mysql.connector
|
|
from flask import current_app
|
|
|
|
def execute_query(query, params=None, fetch_one=False):
    """Execute a SQL statement on a fresh MySQL connection.

    Connection settings are read from the current Flask application's
    config keys ``DBHOST``, ``DBUSER``, ``DBPASS`` and ``DATABASE``. A new
    connection is opened for every call and is always closed before the
    function returns, whether the statement succeeded or failed.

    Args:
        query: SQL text to execute. Statements whose text (ignoring leading
            whitespace and case) starts with ``SELECT`` have their rows
            fetched; every other statement is committed.
        params: Optional parameters passed straight through to
            ``cursor.execute`` for placeholder substitution.
        fetch_one: For SELECT statements, return only the first row instead
            of the full list of rows.

    Returns:
        For SELECT statements: a single row (or ``None`` when there is no
        row) if ``fetch_one`` is true, otherwise a list of rows. Rows are
        dictionaries because the cursor is created with ``dictionary=True``.
        For other statements: the cursor's ``rowcount`` after the commit.
        ``None`` if a ``mysql.connector.Error`` occurred; the error is
        printed to stdout.

    Raises:
        KeyError: If one of the required config keys is missing (only
            ``mysql.connector.Error`` is handled here).
    """
    try:
        # Get database configuration from the current app context
        db_config = {
            "host": current_app.config['DBHOST'],
            "user": current_app.config['DBUSER'],
            "password": current_app.config['DBPASS'],
            "database": current_app.config['DATABASE'],
        }

        connection = mysql.connector.connect(**db_config)
        try:
            # buffered=True pulls the whole result set during execute(), so
            # fetch_one=True on a multi-row SELECT does not leave unread rows
            # behind that would make closing the cursor fail.
            cursor = connection.cursor(dictionary=True, buffered=True)
            try:
                cursor.execute(query, params)

                if query.strip().upper().startswith("SELECT"):
                    result = cursor.fetchone() if fetch_one else cursor.fetchall()
                else:
                    # Commit changes for INSERT, UPDATE, DELETE
                    connection.commit()
                    result = cursor.rowcount  # Number of affected rows
            finally:
                # Runs on both success and failure so the cursor never leaks.
                cursor.close()
        finally:
            # Likewise, never leave the connection open when a query fails.
            connection.close()

        return result
    except mysql.connector.Error as err:
        print("Error: ", err)
        return None
|