fix the random model logic

Karl 2025-05-09 12:03:32 +01:00
parent ee18289558
commit e88d490e98
4 changed files with 290 additions and 277 deletions

.gitignore (vendored, 2 changes)

@@ -1,4 +1,4 @@
-venv/*
+.venv/*
 script.log
 **/*.pyc
 *.rtf

lib.py (512 changes)

@@ -1,300 +1,302 @@
 import random
 import configparser
 import logging
 import sys
 import litellm
 import time
 import os
 import requests
 from PIL import Image
 from typing import Optional
 from comfy_api_simplified import ComfyApiWrapper, ComfyWorkflowWrapper
 from tenacity import (
     retry,
     stop_after_attempt,
     wait_fixed,
     before_log,
     retry_if_exception_type,
 )
 import nest_asyncio
 import json
 from datetime import datetime
 from libs.create_thumbnail import generate_thumbnail

 nest_asyncio.apply()
 logging.basicConfig(level=logging.INFO)

 LOG_FILE = "./prompts_log.jsonl"


 def load_recent_prompts(count=7):
     recent_prompts = []
     try:
         with open(LOG_FILE, "r") as f:
             lines = f.readlines()
             for line in lines[-count:]:
                 data = json.loads(line.strip())
                 recent_prompts.append(data["prompt"])
     except FileNotFoundError:
         pass  # No prompts yet
     return recent_prompts


 def save_prompt(prompt):
     entry = {"date": datetime.now().strftime("%Y-%m-%d"), "prompt": prompt}
     with open(LOG_FILE, "a") as f:
         f.write(json.dumps(entry) + "\n")

 def get_available_models() -> list:
     """Fetches available models from ComfyUI."""
     url = user_config["comfyui"]["comfyui_url"] + "/object_info"
     response = requests.get(url)
     if response.status_code == 200:
         data = response.json()
         return (
             data.get("CheckpointLoaderSimple", {})
             .get("input", {})
             .get("required", {})
             .get("ckpt_name", [])[0]
         )
     else:
         print(f"Failed to fetch models: {response.status_code}")
         return []


 def cancel_current_job() -> str:
     """Interrupts the currently running job in ComfyUI."""
     url = user_config["comfyui"]["comfyui_url"] + "/interrupt"
     response = requests.post(url)
     if response.status_code == 200:
         return "Cancelled"
     else:
         return "Failed to cancel"

 def load_config() -> configparser.ConfigParser:
     """Loads user configuration from ./user_config.cfg."""
     user_config = configparser.ConfigParser()
     try:
         user_config.read("./user_config.cfg")
         logging.debug("Configuration loaded successfully.")
         return user_config
     except KeyError as e:
         logging.error(f"Missing configuration key: {e}")
         sys.exit(1)


 def rename_image() -> str | None:
     """Renames 'image.png' in the output folder to a timestamped filename if it exists."""
     old_path = os.path.join(user_config["comfyui"]["output_dir"], "image.png")
     if os.path.exists(old_path):
         new_filename = f"{str(time.time())}.png"
         new_path = os.path.join(user_config["comfyui"]["output_dir"], new_filename)
         os.rename(old_path, new_path)
         generate_thumbnail(new_path)
         print(f"Renamed 'image.png' to '{new_filename}'")
         return new_filename
     else:
         print("No image.png found.")
         return None

 def create_prompt_on_openwebui(prompt: str) -> str:
     """Sends prompt to OpenWebui and returns the generated response."""
     # Unique list of recent prompts
     recent_prompts = list(set(load_recent_prompts()))
     # Decide on whether to include a topic (e.g., 30% chance to include)
     topics = [t.strip() for t in user_config["comfyui"]["topics"].split(",") if t.strip()]
     topic_instruction = ""
     if random.random() < 0.3 and topics:
         selected_topic = random.choice(topics)
         topic_instruction = f" Incorporate the theme of '{selected_topic}' into the new prompt."
     user_content = (
         "Here are the prompts from the last 7 days:\n\n"
         + "\n".join(f"{i+1}. {p}" for i, p in enumerate(recent_prompts))
         + "\n\nDo not repeat ideas, themes, or settings from the above. "
         "Now generate a new, completely original Stable Diffusion prompt that hasn't been done yet."
         + topic_instruction
     )
     model = random.choice(user_config["openwebui"]["models"].split(","))
     response = litellm.completion(
         api_base=user_config["openwebui"]["base_url"],
         model="openai/" + model,
         messages=[
             {
                 "role": "system",
                 "content": (
                     "You are a prompt generator for Stable Diffusion. "
                     "Generate a detailed and imaginative prompt with a strong visual theme. "
                     "Focus on lighting, atmosphere, and artistic style. "
                     "Keep the prompt concise, no extra commentary or formatting."
                 ),
             },
             {
                 "role": "user",
                 "content": user_content,
             },
         ],
         api_key=user_config["openwebui"]["api_key"],
     )
     prompt = response["choices"][0]["message"]["content"].strip('"')
     # response = litellm.completion(
     #     api_base=user_config["openwebui"]["base_url"],
     #     model="openai/brxce/stable-diffusion-prompt-generator:latest",
     #     messages=[
     #         {
     #             "role": "user",
     #             "content": prompt,
     #         },
     #     ],
     #     api_key=user_config["openwebui"]["api_key"],
     # )
     # prompt = response["choices"][0]["message"]["content"].strip('"')
     logging.debug(prompt)
     return prompt

 # Define the retry logic using Tenacity
 @retry(
     stop=stop_after_attempt(3),
     wait=wait_fixed(5),
     before=before_log(logging.getLogger(), logging.DEBUG),
     retry=retry_if_exception_type(Exception),
 )
 def generate_image(
     file_name: str,
     comfy_prompt: str,
     workflow_path: str = "./workflow_api.json",
     prompt_node: str = "CLIP Text Encode (Prompt)",
     seed_node: str = "KSampler",
     seed_param: str = "seed",
     save_node: str = "Save Image",
     save_param: str = "filename_prefix",
     model_node: Optional[str] = "Load Checkpoint",
     model_param: Optional[str] = "ckpt_name",
 ) -> None:
     """Generates an image using the Comfy API with configurable workflow settings."""
     try:
         api = ComfyApiWrapper(user_config["comfyui"]["comfyui_url"])
         wf = ComfyWorkflowWrapper(workflow_path)
         # Set workflow parameters
         wf.set_node_param(seed_node, seed_param, random.getrandbits(32))
         wf.set_node_param(prompt_node, "text", comfy_prompt)
         wf.set_node_param(save_node, save_param, file_name)
         wf.set_node_param(
             (
                 "Empty Latent Image"
                 if workflow_path.endswith("workflow_api.json")
                 else "CR Aspect Ratio"
             ),
             "width",
             user_config["comfyui"]["width"],
         )
         wf.set_node_param(
             (
                 "Empty Latent Image"
                 if workflow_path.endswith("workflow_api.json")
                 else "CR Aspect Ratio"
             ),
             "height",
             user_config["comfyui"]["height"],
         )
         # Conditionally set model if node and param are provided
         if model_node and model_param:
             if user_config["comfyui"].get("FLUX"):
                 valid_models = user_config["comfyui:flux"]["models"].split(",")
             else:
                 available_model_list = user_config["comfyui"]["models"].split(",")
                 valid_models = list(
                     set(get_available_models()) & set(available_model_list)
                 )
             if not valid_models:
                 raise Exception("No valid models available.")
             model = random.choice(valid_models)
             wf.set_node_param(model_node, model_param, model)
         # Generate image
         logging.debug(f"Generating image: {file_name}")
         results = api.queue_and_wait_images(wf, save_node)
         rename_image()
         for _, image_data in results.items():
             output_path = os.path.join(
                 user_config["comfyui"]["output_dir"], f"{file_name}.png"
             )
             with open(output_path, "wb+") as f:
                 f.write(image_data)
         logging.debug(f"Image generated successfully for UID: {file_name}")
     except Exception as e:
         logging.error(f"Failed to generate image for UID: {file_name}. Error: {e}")
         raise

 def create_image(prompt: str | None = None) -> None:
     """Main function for generating images."""
     if prompt is None:
         prompt = create_prompt_on_openwebui(user_config["comfyui"]["prompt"])
     if not prompt:
         logging.error("No prompt generated.")
         return
     save_prompt(prompt)
     use_flux = user_config["comfyui"].get("USE_FLUX", False)
     only_flux = user_config["comfyui"].get("ONLY_FLUX", False)
     selected_workflow = "SDXL"
     if use_flux:
         selected_workflow = "FLUX" if only_flux else random.choice(["FLUX", "SDXL"])
     if selected_workflow == "FLUX":
-        generate_image(
-            file_name="image",
-            comfy_prompt=prompt,
-            workflow_path="./FLUX.json",
-            prompt_node="Positive Prompt T5",
-            seed_node="Seed",
-            seed_param="seed",
-            save_node="CivitAI Image Saver",
-            save_param="filename",
-            model_node="CivitAI Image Saver",
-            model_param="modelname",
-        )
-    else:
-        generate_image("image", prompt)
+        # generate_image(
+        #     file_name="image",
+        #     comfy_prompt=prompt,
+        #     workflow_path="./FLUX.json",
+        #     prompt_node="Positive Prompt T5",
+        #     seed_node="Seed",
+        #     seed_param="seed",
+        #     save_node="CivitAI Image Saver",
+        #     save_param="filename",
+        #     model_node="CivitAI Image Saver",
+        #     model_param="modelname",
+        # )
+        print("flux")
+    else:
+        print("sdxl")
+        # generate_image("image", prompt)
     logging.info(f"{selected_workflow} generation started with prompt: {prompt}")


 def get_prompt_from_png(path):
     try:
         with Image.open(path) as img:
             try:
                 # Flux workflow
                 meta = json.loads(img.info["prompt"])['44']['inputs']['text']
             except KeyError:
                 # SDXL workflow
                 meta = json.loads(img.info["prompt"])['6']['inputs']['text']
             return meta or ""
     except Exception as e:
         print(f"Error reading metadata from {path}: {e}")
         return ""


 user_config = load_config()
 output_folder = user_config["comfyui"]["output_dir"]
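
lib.py reads every setting from user_config.cfg through configparser, but the config file itself is not part of this commit. The sketch below reconstructs a hypothetical config from the keys the module references; section and option names are taken from the code above, while every value is an illustrative placeholder, not the project's real settings:

import configparser

# Hypothetical user_config.cfg, inferred from the lookups in lib.py;
# all values are placeholders.
SAMPLE_CONFIG = """\
[comfyui]
comfyui_url = http://localhost:8188
output_dir = ./output/
width = 1024
height = 1024
models = model_a.safetensors,model_b.safetensors
topics = retro futurism,deep sea,forest spirits
prompt = Generate a random Stable Diffusion prompt
; left empty so the truthiness check in generate_image() stays falsy
FLUX =
USE_FLUX = False
ONLY_FLUX = False

[comfyui:flux]
models = flux_dev.safetensors

[openwebui]
base_url = http://localhost:3000
api_key = sk-placeholder
models = llama3,mistral
"""

user_config = configparser.ConfigParser()
user_config.read_string(SAMPLE_CONFIG)
print(user_config["comfyui"]["comfyui_url"])  # http://localhost:8188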


@@ -1,6 +1,7 @@
 import random
 import logging
 import os
+import json
 import requests
 from typing import Optional
 from comfy_api_simplified import ComfyApiWrapper, ComfyWorkflowWrapper
@@ -139,25 +140,35 @@ def create_image(prompt: str | None = None) -> None:
     """Main function for generating images."""
     if prompt is None:
         prompt = create_prompt_on_openwebui(user_config["comfyui"]["prompt"])
-    if prompt:
-        logging.info(f"Generated prompt: {prompt}")  # Log generated prompt
-        save_prompt(prompt)
-        if user_config["comfyui"]["FLUX"]:
-            generate_image(
-                file_name="image",
-                comfy_prompt=prompt,
-                workflow_path="./FLUX.json",
-                prompt_node="Positive Prompt T5",
-                seed_node="Seed",
-                seed_param="seed",
-                save_node="CivitAI Image Saver",
-                save_param="filename",
-                model_node="CivitAI Image Saver",
-                model_param="modelname",
-            )
-        else:
-            generate_image("image", prompt)
-        print(f"Image generation started with prompt: {prompt}")
-    else:
+    if not prompt:
         logging.error("No prompt generated.")
+        return
+    save_prompt(prompt)
+    use_flux = json.loads((user_config["comfyui"].get("USE_FLUX", False)).lower())
+    only_flux = json.loads((user_config["comfyui"].get("ONLY_FLUX", False)).lower())
+    selected_workflow = "SDXL"
+    if use_flux:
+        selected_workflow = "FLUX" if only_flux else random.choice(["FLUX", "SDXL"])
+    if selected_workflow == "FLUX":
+        generate_image(
+            file_name="image",
+            comfy_prompt=prompt,
+            workflow_path="./FLUX.json",
+            prompt_node="Positive Prompt T5",
+            seed_node="Seed",
+            seed_param="seed",
+            save_node="CivitAI Image Saver",
+            save_param="filename",
+            model_node="CivitAI Image Saver",
+            model_param="modelname",
+        )
+    else:
+        generate_image("image", prompt)
+    logging.info(f"{selected_workflow} generation started with prompt: {prompt}")


@@ -19,7 +19,7 @@ def create_prompt_on_openwebui(prompt: str) -> str:
     # Decide on whether to include a topic (e.g., 30% chance to include)
     topics = [t.strip() for t in user_config["comfyui"]["topics"].split(",") if t.strip()]
     topic_instruction = ""
-    if random.random() < 0.3 and topics:
+    if random.random() < 0.5 and topics:
         selected_topic = random.choice(topics)
         topic_instruction = f" Incorporate the theme of '{selected_topic}' into the new prompt."