2025-03-28 18:37:58 +00:00
|
|
|
import random
|
|
|
|
import configparser
|
|
|
|
import logging
|
|
|
|
import sys
|
|
|
|
import litellm
|
|
|
|
import time
|
2025-03-29 08:28:13 +00:00
|
|
|
import os
|
2025-03-29 15:50:38 +00:00
|
|
|
import requests
|
2025-03-28 18:37:58 +00:00
|
|
|
from comfy_api_simplified import ComfyApiWrapper, ComfyWorkflowWrapper
|
2025-04-05 16:49:53 +01:00
|
|
|
from tenacity import retry, stop_after_attempt, wait_fixed, before_log, retry_if_exception_type
|
|
|
|
import nest_asyncio
|
|
|
|
# Patch asyncio to allow re-entrant event loops; the ComfyUI/litellm calls
# below may run inside an already-running loop.
nest_asyncio.apply()

# Root logger at INFO; per-call logging.debug lines only appear if lowered.
logging.basicConfig(level=logging.INFO)
|
2025-03-28 18:37:58 +00:00
|
|
|
|
2025-04-01 17:15:12 +01:00
|
|
|
|
|
|
|
def get_available_models() -> list:
    """Fetch the list of checkpoint model names available in ComfyUI.

    Queries the /object_info endpoint and extracts the selectable
    checkpoint filenames from the CheckpointLoaderSimple node.

    Returns:
        A list of checkpoint filenames, or an empty list on any failure.
    """
    url = user_config["comfyui"]["comfyui_url"] + "/object_info"
    try:
        # Timeout so a hung ComfyUI instance cannot block forever.
        response = requests.get(url, timeout=30)
    except requests.RequestException as e:
        logging.error(f"Failed to fetch models: {e}")
        return []
    if response.status_code == 200:
        data = response.json()
        # ckpt_name's "required" entry is [choices, options]; element 0 is
        # the list of filenames. Default to [[]] so a missing key yields []
        # instead of raising IndexError on the trailing [0].
        return (
            data.get("CheckpointLoaderSimple", {})
            .get("input", {})
            .get("required", {})
            .get("ckpt_name", [[]])[0]
        )
    else:
        logging.error(f"Failed to fetch models: {response.status_code}")
        return []
|
|
|
|
|
2025-04-01 17:15:12 +01:00
|
|
|
|
2025-04-12 09:50:15 +01:00
|
|
|
def cancel_current_job() -> str:
    """Interrupt the currently running ComfyUI job.

    Posts to the /interrupt endpoint of the configured ComfyUI server.

    Returns:
        "Cancelled" on HTTP 200, "Failed to cancel" otherwise.
    """
    url = user_config["comfyui"]["comfyui_url"] + "/interrupt"
    try:
        # Timeout so a hung ComfyUI instance cannot block forever.
        response = requests.post(url, timeout=30)
    except requests.RequestException as e:
        logging.error(f"Failed to cancel job: {e}")
        return "Failed to cancel"
    if response.status_code == 200:
        return "Cancelled"
    else:
        return "Failed to cancel"
|
|
|
|
|
|
|
|
|
2025-04-01 17:15:12 +01:00
|
|
|
def load_config() -> configparser.ConfigParser:
    """Load user configuration from ./user_config.cfg.

    Returns:
        The parsed configuration.

    Exits:
        With status 1 if the file is missing or cannot be parsed.
    """
    user_config = configparser.ConfigParser()
    try:
        # configparser.read() silently ignores missing files (and never
        # raises KeyError), so check its return value to fail fast instead
        # of surfacing an opaque KeyError on first config access later.
        read_files = user_config.read("./user_config.cfg")
        if not read_files:
            logging.error("Configuration file ./user_config.cfg not found.")
            sys.exit(1)
        logging.debug("Configuration loaded successfully.")
        return user_config
    except configparser.Error as e:
        logging.error(f"Invalid configuration: {e}")
        sys.exit(1)
|
2025-03-28 18:37:58 +00:00
|
|
|
|
2025-03-29 15:50:38 +00:00
|
|
|
|
2025-04-01 17:15:12 +01:00
|
|
|
def rename_image() -> str | None:
    """Rename 'image.png' in the output folder to a timestamped filename.

    Returns:
        The new filename if 'image.png' existed, otherwise None.
    """
    output_dir = user_config["comfyui"]["output_dir"]
    old_path = os.path.join(output_dir, "image.png")

    # Guard clause: nothing to do when no previous image exists.
    if not os.path.exists(old_path):
        print("No image.png found.")
        return None

    # Unix timestamp (float) keeps names unique and chronologically sortable.
    new_filename = f"{time.time()}.png"
    os.rename(old_path, os.path.join(output_dir, new_filename))
    print(f"Renamed 'image.png' to '{new_filename}'")
    return new_filename
|
|
|
|
|
2025-03-29 15:50:38 +00:00
|
|
|
|
2025-04-12 09:50:15 +01:00
|
|
|
def create_prompt_on_openwebui(prompt: str) -> str:
    """Send *prompt* to OpenWebUI and return the generated SD prompt.

    A model is chosen at random from the comma-separated list in
    user_config["openwebui"]["models"].

    Args:
        prompt: Seed instruction sent as the user message.

    Returns:
        The model's response text with surrounding double quotes stripped.
    """
    model = random.choice(user_config["openwebui"]["models"].split(","))
    response = litellm.completion(
        api_base=user_config["openwebui"]["base_url"],
        # OpenWebUI exposes an OpenAI-compatible API, hence the prefix.
        model="openai/" + model,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a prompt generator for Stable Diffusion. "
                    "Generate a detailed and imaginative prompt with a strong visual theme. "
                    "Focus on lighting, atmosphere, and artistic style. "
                    "Keep the prompt concise, no extra commentary or formatting."
                ),
            },
            {
                "role": "user",
                "content": prompt,
            },
        ],
        api_key=user_config["openwebui"]["api_key"],
    )

    # Models often wrap their answer in quotes; strip them.
    prompt = response["choices"][0]["message"]["content"].strip('"')
    logging.debug(prompt)
    return prompt
|
|
|
|
|
|
|
|
|
|
|
|
# Retry transient failures via Tenacity: 3 attempts, 5 s apart.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(5),
    before=before_log(logging.getLogger(), logging.DEBUG),
    retry=retry_if_exception_type(Exception)
)
def generate_image(file_name: str, comfy_prompt: str) -> None:
    """Generate an image using the Comfy API with retry logic.

    Args:
        file_name: Filename prefix (without extension) for the saved image.
        comfy_prompt: Positive prompt text for the CLIP text encoder node.

    Raises:
        Exception: Re-raised on any failure so Tenacity can retry.
    """
    try:
        # Initialize ComfyUI API and workflow
        api = ComfyApiWrapper(user_config["comfyui"]["comfyui_url"])
        wf = ComfyWorkflowWrapper("./workflow_api.json")

        # Set workflow parameters
        wf.set_node_param(
            "KSampler", "seed", random.getrandbits(32)
        )  # Set a random seed for the sampler
        wf.set_node_param(
            "CLIP Text Encode (Prompt)", "text", comfy_prompt
        )  # Set the prompt to be used for image generation
        wf.set_node_param(
            "Save Image", "filename_prefix", file_name
        )  # Set the filename prefix for the generated image
        wf.set_node_param(  # Set image dimensions
            "Empty Latent Image", "width", user_config["comfyui"]["width"]
        )
        wf.set_node_param(
            "Empty Latent Image", "height", user_config["comfyui"]["height"]
        )

        # Use only models both installed in ComfyUI and whitelisted in the
        # user config; pick one at random.
        valid_models = list(
            set(get_available_models())
            & set(user_config["comfyui"]["models"].split(","))
        )
        if not valid_models:
            raise Exception("No valid options available.")
        model = random.choice(valid_models)
        wf.set_node_param(
            "Load Checkpoint", "ckpt_name", model
        )  # Set the model to be used for image generation

        # Queue the workflow and wait for image generation to complete.
        logging.debug(f"Generating image: {file_name}")
        results = api.queue_and_wait_images(wf, "Save Image")
        rename_image()  # Archive any previous image.png before overwriting

        # Save the generated image to disk. os.path.join tolerates an
        # output_dir with or without a trailing separator (plain string
        # concatenation, used previously, required one).
        for filename, image_data in results.items():
            with open(
                os.path.join(user_config["comfyui"]["output_dir"], file_name + ".png"),
                "wb+",
            ) as f:
                f.write(image_data)
        logging.debug(f"Image generated successfully for UID: {file_name}")
    except Exception as e:
        logging.error(f"Failed to generate image for UID: {file_name}. Error: {e}")
        raise  # Re-raise the exception for Tenacity to handle retries
|
2025-03-28 18:37:58 +00:00
|
|
|
|
|
|
|
|
2025-04-01 17:15:12 +01:00
|
|
|
def create_image(prompt: str | None = None) -> None:
    """Main function for generating images.

    Args:
        prompt: Prompt to render. When None, one is generated via OpenWebUI
            from the seed prompt in the user config.
    """
    if prompt is None:
        prompt = create_prompt_on_openwebui(user_config["comfyui"]["prompt"])

    # Guard clause: bail out if prompt generation produced nothing.
    if not prompt:
        logging.error("No prompt generated.")
        return

    logging.info(f"Generated prompt: {prompt}")  # Log generated prompt
    generate_image("image", prompt)
    print(f"Image generation started with prompt: {prompt}")
|
2025-03-29 12:24:46 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Module-level configuration, loaded once at import time and read by every
# helper above.
user_config = load_config()

# Directory where ComfyUI writes generated images.
output_folder = user_config["comfyui"]["output_dir"]
|