2025-03-28 18:37:58 +00:00
import random
import configparser
import logging
import sys
import litellm
import time
2025-03-29 08:28:13 +00:00
import os
2025-03-29 15:50:38 +00:00
import requests
2025-04-19 17:46:04 +01:00
from typing import Optional
2025-03-28 18:37:58 +00:00
from comfy_api_simplified import ComfyApiWrapper , ComfyWorkflowWrapper
2025-04-19 17:33:20 +01:00
from tenacity import (
retry ,
stop_after_attempt ,
wait_fixed ,
before_log ,
retry_if_exception_type ,
)
2025-04-05 16:49:53 +01:00
import nest_asyncio
2025-04-19 17:33:20 +01:00
import json
2025-04-20 10:05:43 +01:00
from datetime import datetime
2025-04-19 17:33:20 +01:00
2025-04-05 16:49:53 +01:00
# Allow re-entrant asyncio event loops.
# NOTE(review): presumably required by the ComfyUI API wrapper's internal
# asyncio usage when called from sync code — confirm.
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)

# JSONL log file: one {"date": ..., "prompt": ...} entry per generated prompt.
LOG_FILE = "./prompts_log.jsonl"
2025-04-20 10:05:43 +01:00
def load_recent_prompts ( count = 7 ) :
2025-04-19 17:33:20 +01:00
recent_prompts = [ ]
try :
with open ( LOG_FILE , " r " ) as f :
2025-04-20 10:05:43 +01:00
lines = f . readlines ( )
for line in lines [ - count : ] :
2025-04-19 17:33:20 +01:00
data = json . loads ( line . strip ( ) )
2025-04-20 10:05:43 +01:00
recent_prompts . append ( data [ " prompt " ] )
2025-04-19 17:33:20 +01:00
except FileNotFoundError :
pass # No prompts yet
return recent_prompts
def save_prompt ( prompt ) :
entry = { " date " : datetime . now ( ) . strftime ( " % Y- % m- %d " ) , " prompt " : prompt }
with open ( LOG_FILE , " a " ) as f :
f . write ( json . dumps ( entry ) + " \n " )
2025-04-01 17:15:12 +01:00
2025-04-19 17:46:04 +01:00
2025-04-01 17:15:12 +01:00
def get_available_models() -> list:
    """Fetch the checkpoint model names known to the ComfyUI server.

    Returns:
        The list of checkpoint names reported by the server's
        CheckpointLoaderSimple node, or an empty list when the request
        fails or the node is absent.
    """
    url = user_config["comfyui"]["comfyui_url"] + "/object_info"
    # A timeout prevents this call from hanging forever on a dead server.
    response = requests.get(url, timeout=30)
    if response.status_code == 200:
        data = response.json()
        # ckpt_name is [ [names...], {options...} ]; element 0 is the name
        # list. Default to [[]] so a missing key yields [] instead of an
        # IndexError.
        return (
            data.get("CheckpointLoaderSimple", {})
            .get("input", {})
            .get("required", {})
            .get("ckpt_name", [[]])[0]
        )
    else:
        # Use logging (consistent with the rest of the module) over print.
        logging.error(f"Failed to fetch models: {response.status_code}")
        return []
2025-04-01 17:15:12 +01:00
2025-04-12 09:50:15 +01:00
def cancel_current_job() -> str:
    """Interrupt the job currently running on the ComfyUI server.

    Returns:
        "Cancelled" on success, "Failed to cancel" otherwise.
    """
    # Fixed copy-pasted docstring (previously claimed to fetch models) and
    # the wrong ``-> list`` return annotation.
    url = user_config["comfyui"]["comfyui_url"] + "/interrupt"
    response = requests.post(url, timeout=30)
    if response.status_code == 200:
        return "Cancelled"
    return "Failed to cancel"
2025-04-01 17:15:12 +01:00
def load_config(config_path: str = "./user_config.cfg") -> configparser.ConfigParser:
    """Load the user configuration from an INI file.

    Args:
        config_path: Path to the config file (defaults to ./user_config.cfg).

    Returns:
        The parsed ConfigParser instance.

    Exits:
        Terminates the process with status 1 when the file cannot be read,
        since the rest of the module depends on its values.
    """
    user_config = configparser.ConfigParser()
    # ConfigParser.read() never raises KeyError and silently ignores missing
    # files; it returns the list of files actually parsed, so check that
    # instead of the previous dead try/except KeyError.
    loaded = user_config.read(config_path)
    if not loaded:
        logging.error(f"Configuration file not found or unreadable: {config_path}")
        sys.exit(1)
    logging.debug("Configuration loaded successfully.")
    return user_config
2025-03-28 18:37:58 +00:00
2025-03-29 15:50:38 +00:00
2025-04-01 17:15:12 +01:00
def rename_image ( ) - > str | None :
""" Renames ' image.png ' in the output folder to a timestamped filename if it exists. """
2025-03-29 08:28:13 +00:00
old_path = os . path . join ( user_config [ " comfyui " ] [ " output_dir " ] , " image.png " )
2025-04-01 17:15:12 +01:00
2025-03-29 08:28:13 +00:00
if os . path . exists ( old_path ) :
new_filename = f " { str ( time . time ( ) ) } .png "
new_path = os . path . join ( user_config [ " comfyui " ] [ " output_dir " ] , new_filename )
os . rename ( old_path , new_path )
print ( f " Renamed ' image.png ' to ' { new_filename } ' " )
return new_filename
else :
print ( " No image.png found. " )
return None
2025-03-29 15:50:38 +00:00
2025-04-12 09:50:15 +01:00
def create_prompt_on_openwebui(prompt: str) -> str:
    """Ask an OpenWebUI-hosted model for a fresh Stable Diffusion prompt.

    The prompts logged over the last week are embedded in the request so the
    model avoids repeating recent ideas.

    Args:
        prompt: NOTE(review): currently unused — the user message is built
            entirely from the recent-prompt history. Kept for interface
            compatibility; confirm whether it should be folded into the
            request.

    Returns:
        The generated prompt text, with surrounding double quotes stripped.
    """
    recent_prompts = load_recent_prompts()
    user_content = (
        "Here are the prompts from the last 7 days:\n\n"
        + "\n".join(f"{i + 1}. {p}" for i, p in enumerate(recent_prompts))
        + "\n\nDo not repeat ideas, themes, or settings from the above. Now generate a new, completely original Stable Diffusion prompt that hasn't been done yet."
    )
    # Pick one of the configured models at random for variety.
    model = random.choice(user_config["openwebui"]["models"].split(","))
    response = litellm.completion(
        api_base=user_config["openwebui"]["base_url"],
        model="openai/" + model,
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a prompt generator for Stable Diffusion. "
                    "Generate a detailed and imaginative prompt with a strong visual theme. "
                    "Focus on lighting, atmosphere, and artistic style. "
                    "Keep the prompt concise, no extra commentary or formatting."
                ),
            },
            {
                "role": "user",
                "content": user_content,
            },
        ],
        api_key=user_config["openwebui"]["api_key"],
    )
    # Models often wrap their answer in quotes; strip them.
    prompt = response["choices"][0]["message"]["content"].strip('"')
    logging.debug(prompt)
    return prompt
# Define the retry logic using Tenacity
2025-04-19 17:46:04 +01:00
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(5),
    before=before_log(logging.getLogger(), logging.DEBUG),
    retry=retry_if_exception_type(Exception),
)
def generate_image(
    file_name: str,
    comfy_prompt: str,
    workflow_path: str = "./workflow_api.json",
    prompt_node: str = "CLIP Text Encode (Prompt)",
    seed_node: str = "KSampler",
    seed_param: str = "seed",
    save_node: str = "Save Image",
    save_param: str = "filename_prefix",
    model_node: Optional[str] = "Load Checkpoint",
    model_param: Optional[str] = "ckpt_name",
) -> None:
    """Generate an image via the ComfyUI API using a configurable workflow.

    Retries up to 3 times (5 s apart) on any exception, via tenacity.

    Args:
        file_name: Base name (no extension) for the saved image.
        comfy_prompt: The positive prompt text fed to the workflow.
        workflow_path: Workflow JSON to load; node-name defaults below match
            the stock ./workflow_api.json.
        prompt_node / seed_node / seed_param / save_node / save_param:
            Workflow node and parameter names to fill in.
        model_node / model_param: Where to inject the model name; pass None
            to skip model selection.

    Raises:
        Exception: When no configured model is available on the server, or
            when any API call fails (re-raised after logging so tenacity
            can retry).
    """
    try:
        api = ComfyApiWrapper(user_config["comfyui"]["comfyui_url"])
        wf = ComfyWorkflowWrapper(workflow_path)

        # Randomize the seed and wire in the prompt and output filename.
        wf.set_node_param(seed_node, seed_param, random.getrandbits(32))
        wf.set_node_param(prompt_node, "text", comfy_prompt)
        wf.set_node_param(save_node, save_param, file_name)

        # The stock workflow sizes the canvas via "Empty Latent Image";
        # other workflows (e.g. FLUX) use "CR Aspect Ratio" instead.
        dimension_node = (
            "Empty Latent Image"
            if workflow_path.endswith("workflow_api.json")
            else "CR Aspect Ratio"
        )
        wf.set_node_param(dimension_node, "width", user_config["comfyui"]["width"])
        wf.set_node_param(dimension_node, "height", user_config["comfyui"]["height"])

        # Conditionally set the model if node and param are provided.
        if model_node and model_param:
            if user_config["comfyui"].get("FLUX"):
                # BUG FIX: configparser sections are flat, so the FLUX model
                # list lives in the [comfyui:flux] section; the previous
                # user_config["comfyui"]["comfyui:flux"]["models"] indexed a
                # string and raised TypeError.
                available_model_list = user_config["comfyui:flux"]["models"].split(",")
            else:
                available_model_list = user_config["comfyui"]["models"].split(",")

            # Only pick models both configured locally and present on the
            # server.
            valid_models = list(set(get_available_models()) & set(available_model_list))
            if not valid_models:
                raise Exception("No valid models available.")
            model = random.choice(valid_models)
            wf.set_node_param(model_node, model_param, model)

        # Queue the workflow, archive any previous image.png, and persist
        # every returned image.
        logging.debug(f"Generating image: {file_name}")
        results = api.queue_and_wait_images(wf, save_node)
        rename_image()
        for _, image_data in results.items():
            output_path = os.path.join(
                user_config["comfyui"]["output_dir"], f"{file_name}.png"
            )
            with open(output_path, "wb+") as f:
                f.write(image_data)
        logging.debug(f"Image generated successfully for UID: {file_name}")
    except Exception as e:
        logging.error(f"Failed to generate image for UID: {file_name}. Error: {e}")
        raise  # Re-raise so tenacity's @retry sees the failure.
2025-04-19 16:11:41 +01:00
2025-03-28 18:37:58 +00:00
2025-04-01 17:15:12 +01:00
def create_image(prompt: str | None = None) -> None:
    """Generate one image, creating a prompt via OpenWebUI when none is given.

    Args:
        prompt: Prompt text to use; when None, one is generated from the
            configured seed prompt.
    """
    if prompt is None:
        prompt = create_prompt_on_openwebui(user_config["comfyui"]["prompt"])
    if not prompt:
        logging.error("No prompt generated.")
        return
    logging.info(f"Generated prompt: {prompt}")  # Log generated prompt
    save_prompt(prompt)
    # Use .get() so a missing FLUX key means "disabled" rather than raising
    # KeyError, matching the check inside generate_image.
    if user_config["comfyui"].get("FLUX"):
        generate_image(
            file_name="image",
            comfy_prompt=prompt,
            workflow_path="./FLUX.json",
            prompt_node="Positive Prompt T5",
            seed_node="Seed",
            seed_param="seed",
            save_node="CivitAI Image Saver",
            save_param="filename",
            model_node="CivitAI Image Saver",
            model_param="modelname",
        )
    else:
        generate_image("image", prompt)
    print(f"Image generation started with prompt: {prompt}")
2025-03-29 12:24:46 +00:00
# Module-level configuration shared by every function above.
user_config = load_config()
output_folder = user_config["comfyui"]["output_dir"]