292 lines
10 KiB
Python
Raw Normal View History

2025-03-28 18:37:58 +00:00
import random
import configparser
import logging
import sys
import litellm
import time
2025-03-29 08:28:13 +00:00
import os
import requests
2025-03-28 18:37:58 +00:00
from comfy_api_simplified import ComfyApiWrapper, ComfyWorkflowWrapper
2025-04-19 17:33:20 +01:00
from tenacity import (
retry,
stop_after_attempt,
wait_fixed,
before_log,
retry_if_exception_type,
)
import nest_asyncio
2025-04-19 17:33:20 +01:00
import json
from datetime import datetime, timedelta
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)
2025-03-28 18:37:58 +00:00
2025-04-19 17:33:20 +01:00
LOG_FILE = "./prompts_log.jsonl"


def load_recent_prompts(days: int = 7) -> list:
    """Return prompts logged within the last *days* days.

    Reads LOG_FILE (JSON-lines; one {"date": "YYYY-MM-DD", "prompt": str}
    object per line, as written by save_prompt). A missing log file yields
    an empty list; malformed lines are skipped with a warning instead of
    aborting the whole read.
    """
    recent_prompts = []
    cutoff_date = datetime.now().date() - timedelta(days=days)
    try:
        with open(LOG_FILE, "r") as f:
            for line in f:
                try:
                    data = json.loads(line.strip())
                    prompt_date = datetime.strptime(data["date"], "%Y-%m-%d").date()
                except (json.JSONDecodeError, KeyError, ValueError) as e:
                    # One corrupt line shouldn't lose the whole history.
                    logging.warning(f"Skipping malformed prompt-log line: {e}")
                    continue
                if prompt_date >= cutoff_date:
                    recent_prompts.append(data["prompt"])
    except FileNotFoundError:
        pass  # No prompts yet
    return recent_prompts


def save_prompt(prompt: str) -> None:
    """Append *prompt* with today's date to LOG_FILE as one JSON line."""
    entry = {"date": datetime.now().strftime("%Y-%m-%d"), "prompt": prompt}
    with open(LOG_FILE, "a") as f:
        f.write(json.dumps(entry) + "\n")
2025-04-01 17:15:12 +01:00
def get_available_models() -> list:
    """Fetch the checkpoint (model) names available in ComfyUI.

    Queries the /object_info endpoint and extracts the ckpt_name options of
    the CheckpointLoaderSimple node. Returns an empty list on HTTP failure,
    a network error, or an unexpected payload shape.
    """
    url = user_config["comfyui"]["comfyui_url"] + "/object_info"
    try:
        # Timeout so a hung ComfyUI instance can't block forever.
        response = requests.get(url, timeout=30)
    except requests.RequestException as e:
        logging.error(f"Failed to fetch models: {e}")
        return []
    if response.status_code == 200:
        data = response.json()
        # ckpt_name is typically [[names...], {options}]; the first element
        # is the list of checkpoint names. Guard against it being absent.
        options = (
            data.get("CheckpointLoaderSimple", {})
            .get("input", {})
            .get("required", {})
            .get("ckpt_name", [])
        )
        return options[0] if options else []
    logging.error(f"Failed to fetch models: {response.status_code}")
    return []
2025-04-01 17:15:12 +01:00
def cancel_current_job() -> str:
    """Interrupt the job ComfyUI is currently processing.

    POSTs to the /interrupt endpoint and returns a short status string
    ("Cancelled" on HTTP 200, "Failed to cancel" otherwise).
    """
    url = user_config["comfyui"]["comfyui_url"] + "/interrupt"
    # Timeout so a hung ComfyUI instance can't block forever.
    response = requests.post(url, timeout=30)
    if response.status_code == 200:
        return "Cancelled"
    return "Failed to cancel"
2025-04-01 17:15:12 +01:00
def load_config() -> configparser.ConfigParser:
    """Load user configuration from ./user_config.cfg.

    Exits the process when the file is missing, since every other function
    in this module depends on the config sections being present.
    """
    user_config = configparser.ConfigParser()
    # ConfigParser.read() does NOT raise on a missing file — it returns the
    # list of files it successfully parsed. An empty list means no config.
    read_files = user_config.read("./user_config.cfg")
    if not read_files:
        logging.error("Configuration file ./user_config.cfg not found.")
        sys.exit(1)
    logging.debug("Configuration loaded successfully.")
    return user_config
2025-03-28 18:37:58 +00:00
2025-04-01 17:15:12 +01:00
def rename_image() -> str | None:
    """Give 'image.png' in the output folder a unique timestamped name.

    Returns the new filename on success, or None when no 'image.png'
    exists in the configured output directory.
    """
    output_dir = user_config["comfyui"]["output_dir"]
    source_path = os.path.join(output_dir, "image.png")

    # Nothing to do when the file is absent.
    if not os.path.exists(source_path):
        print("No image.png found.")
        return None

    renamed = f"{str(time.time())}.png"
    os.rename(source_path, os.path.join(output_dir, renamed))
    print(f"Renamed 'image.png' to '{renamed}'")
    return renamed
def create_prompt_on_openwebui(prompt: str) -> str:
    """Ask an OpenWebUI-hosted model for a fresh Stable Diffusion prompt.

    The prompts logged over the last 7 days are embedded in the request so
    the model avoids repeating recent ideas. Returns the generated prompt
    with surrounding double quotes stripped.

    NOTE(review): the *prompt* argument is currently unused — the request is
    built solely from the recent-prompts log, and the name is rebound to the
    response. Confirm whether it was meant to seed the user message before
    removing the parameter.
    """
    recent_prompts = load_recent_prompts()
    user_content = (
        "Here are the prompts from the last 7 days:\n\n"
        + "\n".join(f"{i+1}. {p}" for i, p in enumerate(recent_prompts))
        + "\n\nDo not repeat ideas, themes, or settings from the above. Now generate a new, completely original Stable Diffusion prompt that hasn't been done yet."
    )
    # Pick one of the configured models at random for variety between runs.
    model = random.choice(user_config["openwebui"]["models"].split(","))
    response = litellm.completion(
        api_base=user_config["openwebui"]["base_url"],
        model="openai/" + model,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a prompt generator for Stable Diffusion. "
                    "Generate a detailed and imaginative prompt with a strong visual theme. "
                    "Focus on lighting, atmosphere, and artistic style. "
                    "Keep the prompt concise, no extra commentary or formatting."
                ),
            },
            {
                "role": "user",
                "content": user_content,
            },
        ],
        api_key=user_config["openwebui"]["api_key"],
    )
    # Models often wrap their answer in quotes; strip them.
    prompt = response["choices"][0]["message"]["content"].strip('"')
    logging.debug(prompt)
    return prompt
# Define the retry logic using Tenacity
2025-04-19 16:11:41 +01:00
# @retry(
2025-04-19 17:33:20 +01:00
# stop=stop_after_attempt(3),
# wait=wait_fixed(5),
2025-04-19 16:11:41 +01:00
# before=before_log(logging.getLogger(), logging.DEBUG),
2025-04-19 17:33:20 +01:00
# retry=retry_if_exception_type(Exception)
2025-04-19 16:11:41 +01:00
# )
2025-04-01 17:15:12 +01:00
def generate_image(file_name: str, comfy_prompt: str) -> None:
    """Generate an image via the ComfyUI API using the SD workflow.

    Args:
        file_name: Basename (without extension) for the saved image.
        comfy_prompt: Text prompt fed to the CLIP text encoder node.

    Raises:
        Exception: re-raised on any failure so a retry decorator
            (e.g. Tenacity) at the call site can handle retries.
    """
    try:
        # Initialize ComfyUI API and workflow
        api = ComfyApiWrapper(user_config["comfyui"]["comfyui_url"])
        wf = ComfyWorkflowWrapper("./workflow_api.json")

        # Set workflow parameters
        wf.set_node_param(
            "KSampler", "seed", random.getrandbits(32)
        )  # Random seed so repeated runs produce different images
        wf.set_node_param("CLIP Text Encode (Prompt)", "text", comfy_prompt)
        wf.set_node_param("Save Image", "filename_prefix", file_name)
        wf.set_node_param(  # Image dimensions from config
            "Empty Latent Image", "width", user_config["comfyui"]["width"]
        )
        wf.set_node_param(
            "Empty Latent Image", "height", user_config["comfyui"]["height"]
        )

        # Only pick among models both installed in ComfyUI and allowed by config
        valid_models = list(
            set(get_available_models())
            & set(user_config["comfyui"]["models"].split(","))
        )
        if not valid_models:
            raise Exception("No valid options available.")
        model = random.choice(valid_models)
        wf.set_node_param("Load Checkpoint", "ckpt_name", model)

        # Queue the workflow and wait for image generation to complete
        logging.debug(f"Generating image: {file_name}")
        results = api.queue_and_wait_images(wf, "Save Image")
        # Move any stale image.png out of the way before writing the new one.
        rename_image()

        # Save the generated image(s) to disk. os.path.join tolerates an
        # output_dir configured without a trailing slash.
        for _, image_data in results.items():
            output_path = os.path.join(
                user_config["comfyui"]["output_dir"], file_name + ".png"
            )
            with open(output_path, "wb") as f:
                f.write(image_data)
        logging.debug(f"Image generated successfully for UID: {file_name}")

    except Exception as e:
        logging.error(f"Failed to generate image for UID: {file_name}. Error: {e}")
        raise  # Re-raise the exception for Tenacity to handle retries
2025-03-28 18:37:58 +00:00
2025-04-19 17:33:20 +01:00
2025-04-19 16:11:41 +01:00
def generate_image_flux(file_name: str, comfy_prompt: str) -> None:
    """Generate an image via the ComfyUI API using the FLUX workflow.

    Args:
        file_name: Basename (without extension) for the saved image.
        comfy_prompt: Text prompt fed to the T5 positive-prompt node.

    Raises:
        Exception: re-raised on any failure so a retry decorator
            (e.g. Tenacity) at the call site can handle retries.
    """
    try:
        # Initialize ComfyUI API and workflow
        api = ComfyApiWrapper(user_config["comfyui"]["comfyui_url"])
        wf = ComfyWorkflowWrapper("./FLUX.json")

        # Set workflow parameters
        wf.set_node_param(
            "Seed", "seed", random.getrandbits(32)
        )  # Random seed so repeated runs produce different images
        wf.set_node_param("Positive Prompt T5", "text", comfy_prompt)
        wf.set_node_param("CivitAI Image Saver", "filename", file_name)
        wf.set_node_param(  # Image dimensions from config
            "CR Aspect Ratio", "width", user_config["comfyui"]["width"]
        )
        wf.set_node_param(
            "CR Aspect Ratio", "height", user_config["comfyui"]["height"]
        )
        # NOTE(review): unlike generate_image(), no checkpoint validation or
        # selection happens here — the FLUX workflow's baked-in model is used.

        # Queue the workflow and wait for image generation to complete
        logging.debug(f"Generating image: {file_name}")
        results = api.queue_and_wait_images(wf, "CivitAI Image Saver")
        # Move any stale image.png out of the way before writing the new one.
        rename_image()

        # Save the generated image(s) to disk. os.path.join tolerates an
        # output_dir configured without a trailing slash.
        for _, image_data in results.items():
            output_path = os.path.join(
                user_config["comfyui"]["output_dir"], file_name + ".png"
            )
            with open(output_path, "wb") as f:
                f.write(image_data)
        logging.debug(f"Image generated successfully for UID: {file_name}")

    except Exception as e:
        logging.error(f"Failed to generate image for UID: {file_name}. Error: {e}")
        raise  # Re-raise the exception for Tenacity to handle retries
2025-03-28 18:37:58 +00:00
2025-04-01 17:15:12 +01:00
def create_image(prompt: str | None = None) -> None:
    """Generate one image, creating a prompt via OpenWebUI when none is given.

    Logs the prompt to the JSONL history, then routes to the FLUX or
    standard SD workflow depending on the config's FLUX flag.
    """
    if prompt is None:
        prompt = create_prompt_on_openwebui(user_config["comfyui"]["prompt"])
    if prompt:
        logging.info(f"Generated prompt: {prompt}")  # Log generated prompt
        save_prompt(prompt)
        # getboolean() parses "true"/"false"/"1"/"0" etc.; truth-testing the
        # raw config string would treat "False" as enabled, since any
        # non-empty string is truthy.
        if user_config["comfyui"].getboolean("FLUX"):
            generate_image_flux("image", prompt)
        else:
            generate_image("image", prompt)
        print(f"Image generation started with prompt: {prompt}")
    else:
        logging.error("No prompt generated.")
2025-03-29 12:24:46 +00:00
# Module-level bootstrap: load the config once so every function above can
# read the shared `user_config` global. Runs on import.
user_config = load_config()
output_folder = user_config["comfyui"]["output_dir"]