cleanup and show modle name on gallery

This commit is contained in:
Karl 2025-05-17 10:15:29 +01:00
parent ab1c0c3913
commit cf9f5d0413
7 changed files with 20 additions and 318 deletions

View File

@ -9,8 +9,7 @@ import os
import time
import threading
from apscheduler.schedulers.background import BackgroundScheduler
# from lib import create_image, load_config, create_prompt_on_openwebui, cancel_current_job, get_prompt_from_png
from libs.generic import load_config, load_recent_prompts, get_prompt_from_png
from libs.generic import load_config, load_recent_prompts, get_details_from_png
from libs.comfyui import cancel_current_job, create_image
from libs.ollama import create_prompt_on_openwebui
@ -29,7 +28,7 @@ def index() -> str:
image_filename = "./image.png"
image_path = os.path.join(image_folder, image_filename)
prompt = get_prompt_from_png(image_path)
prompt = get_details_from_png(image_path)["p"]
return render_template(
"index.html",
@ -50,8 +49,8 @@ def gallery() -> str:
for f in os.listdir(image_folder):
if f.lower().endswith(('png', 'jpg', 'jpeg', 'gif')):
path = os.path.join(image_folder, f) # Full path to the image
prompt = get_prompt_from_png(path) # Your method to extract the prompt
images.append({'filename': f, 'prompt': prompt, 'path': path}) # Add 'path' to the dictionary
details = get_details_from_png(path) # Your method to extract the prompt
images.append({'filename': f, 'prompt': details["p"], 'model':details["m"], 'path': path}) # Add 'path' to the dictionary
images = sorted(images, key=lambda x: os.path.getmtime(x['path']), reverse=True)
return render_template("gallery.html", images=images)

302
lib.py
View File

@ -1,302 +0,0 @@
# import random
# import configparser
# import logging
# import sys
# import litellm
# import time
# import os
# import requests
# from PIL import Image
# from typing import Optional
# from comfy_api_simplified import ComfyApiWrapper, ComfyWorkflowWrapper
# from tenacity import (
# retry,
# stop_after_attempt,
# wait_fixed,
# before_log,
# retry_if_exception_type,
# )
# import nest_asyncio
# import json
# from datetime import datetime
# from libs.create_thumbnail import generate_thumbnail
# nest_asyncio.apply()
# logging.basicConfig(level=logging.INFO)
# LOG_FILE = "./prompts_log.jsonl"
# def load_recent_prompts(count=7):
# recent_prompts = []
# try:
# with open(LOG_FILE, "r") as f:
# lines = f.readlines()
# for line in lines[-count:]:
# data = json.loads(line.strip())
# recent_prompts.append(data["prompt"])
# except FileNotFoundError:
# pass # No prompts yet
# return recent_prompts
# def save_prompt(prompt):
# entry = {"date": datetime.now().strftime("%Y-%m-%d"), "prompt": prompt}
# with open(LOG_FILE, "a") as f:
# f.write(json.dumps(entry) + "\n")
# def get_available_models() -> list:
# """Fetches available models from ComfyUI."""
# url = user_config["comfyui"]["comfyui_url"] + "/object_info"
# response = requests.get(url)
# if response.status_code == 200:
# data = response.json()
# return (
# data.get("CheckpointLoaderSimple", {})
# .get("input", {})
# .get("required", {})
# .get("ckpt_name", [])[0]
# )
# else:
# print(f"Failed to fetch models: {response.status_code}")
# return []
# def cancel_current_job() -> list:
# """Fetches available models from ComfyUI."""
# url = user_config["comfyui"]["comfyui_url"] + "/interrupt"
# response = requests.post(url)
# if response.status_code == 200:
# return "Cancelled"
# else:
# return "Failed to cancel"
# def load_config() -> configparser.ConfigParser:
# """Loads user configuration from ./user_config.cfg."""
# user_config = configparser.ConfigParser()
# try:
# user_config.read("./user_config.cfg")
# logging.debug("Configuration loaded successfully.")
# return user_config
# except KeyError as e:
# logging.error(f"Missing configuration key: {e}")
# sys.exit(1)
# def rename_image() -> str | None:
# """Renames 'image.png' in the output folder to a timestamped filename if it exists."""
# old_path = os.path.join(user_config["comfyui"]["output_dir"], "image.png")
# if os.path.exists(old_path):
# new_filename = f"{str(time.time())}.png"
# new_path = os.path.join(user_config["comfyui"]["output_dir"], new_filename)
# os.rename(old_path, new_path)
# generate_thumbnail(new_path)
# print(f"Renamed 'image.png' to '{new_filename}'")
# return new_filename
# else:
# print("No image.png found.")
# return None
# def create_prompt_on_openwebui(prompt: str) -> str:
# """Sends prompt to OpenWebui and returns the generated response."""
# # Unique list of recent prompts
# recent_prompts = list(set(load_recent_prompts()))
# # Decide on whether to include a topic (e.g., 30% chance to include)
# topics = [t.strip() for t in user_config["comfyui"]["topics"].split(",") if t.strip()]
# topic_instruction = ""
# if random.random() < 0.3 and topics:
# selected_topic = random.choice(topics)
# topic_instruction = f" Incorporate the theme of '{selected_topic}' into the new prompt."
# user_content = (
# "Here are the prompts from the last 7 days:\n\n"
# + "\n".join(f"{i+1}. {p}" for i, p in enumerate(recent_prompts))
# + "\n\nDo not repeat ideas, themes, or settings from the above. "
# "Now generate a new, completely original Stable Diffusion prompt that hasn't been done yet."
# + topic_instruction
# )
# model = random.choice(user_config["openwebui"]["models"].split(","))
# response = litellm.completion(
# api_base=user_config["openwebui"]["base_url"],
# model="openai/" + model,
# messages=[
# {
# "role": "system",
# "content": (
# "You are a prompt generator for Stable Diffusion. "
# "Generate a detailed and imaginative prompt with a strong visual theme. "
# "Focus on lighting, atmosphere, and artistic style. "
# "Keep the prompt concise, no extra commentary or formatting."
# ),
# },
# {
# "role": "user",
# "content": user_content,
# },
# ],
# api_key=user_config["openwebui"]["api_key"],
# )
# prompt = response["choices"][0]["message"]["content"].strip('"')
# # response = litellm.completion(
# # api_base=user_config["openwebui"]["base_url"],
# # model="openai/brxce/stable-diffusion-prompt-generator:latest",
# # messages=[
# # {
# # "role": "user",
# # "content": prompt,
# # },
# # ],
# # api_key=user_config["openwebui"]["api_key"],
# # )
# # prompt = response["choices"][0]["message"]["content"].strip('"')
# logging.debug(prompt)
# return prompt
# # Define the retry logic using Tenacity
# @retry(
# stop=stop_after_attempt(3),
# wait=wait_fixed(5),
# before=before_log(logging.getLogger(), logging.DEBUG),
# retry=retry_if_exception_type(Exception),
# )
# def generate_image(
# file_name: str,
# comfy_prompt: str,
# workflow_path: str = "./workflow_api.json",
# prompt_node: str = "CLIP Text Encode (Prompt)",
# seed_node: str = "KSampler",
# seed_param: str = "seed",
# save_node: str = "Save Image",
# save_param: str = "filename_prefix",
# model_node: Optional[str] = "Load Checkpoint",
# model_param: Optional[str] = "ckpt_name",
# ) -> None:
# """Generates an image using the Comfy API with configurable workflow settings."""
# try:
# api = ComfyApiWrapper(user_config["comfyui"]["comfyui_url"])
# wf = ComfyWorkflowWrapper(workflow_path)
# # Set workflow parameters
# wf.set_node_param(seed_node, seed_param, random.getrandbits(32))
# wf.set_node_param(prompt_node, "text", comfy_prompt)
# wf.set_node_param(save_node, save_param, file_name)
# wf.set_node_param(
# (
# "Empty Latent Image"
# if workflow_path.endswith("workflow_api.json")
# else "CR Aspect Ratio"
# ),
# "width",
# user_config["comfyui"]["width"],
# )
# wf.set_node_param(
# (
# "Empty Latent Image"
# if workflow_path.endswith("workflow_api.json")
# else "CR Aspect Ratio"
# ),
# "height",
# user_config["comfyui"]["height"],
# )
# # Conditionally set model if node and param are provided
# if model_node and model_param:
# if user_config["comfyui"].get("FLUX"):
# valid_models = user_config["comfyui:flux"]["models"].split(",")
# else:
# available_model_list = user_config["comfyui"]["models"].split(",")
# valid_models = list(
# set(get_available_models()) & set(available_model_list)
# )
# if not valid_models:
# raise Exception("No valid models available.")
# model = random.choice(valid_models)
# wf.set_node_param(model_node, model_param, model)
# # Generate image
# logging.debug(f"Generating image: {file_name}")
# results = api.queue_and_wait_images(wf, save_node)
# rename_image()
# for _, image_data in results.items():
# output_path = os.path.join(
# user_config["comfyui"]["output_dir"], f"{file_name}.png"
# )
# with open(output_path, "wb+") as f:
# f.write(image_data)
# logging.debug(f"Image generated successfully for UID: {file_name}")
# except Exception as e:
# logging.error(f"Failed to generate image for UID: {file_name}. Error: {e}")
# raise
# def create_image(prompt: str | None = None) -> None:
# """Main function for generating images."""
# if prompt is None:
# prompt = create_prompt_on_openwebui(user_config["comfyui"]["prompt"])
# if not prompt:
# logging.error("No prompt generated.")
# return
# save_prompt(prompt)
# use_flux = user_config["comfyui"].get("USE_FLUX", False)
# only_flux = user_config["comfyui"].get("ONLY_FLUX", False)
# selected_workflow = "SDXL"
# if use_flux:
# selected_workflow = "FLUX" if only_flux else random.choice(["FLUX", "SDXL"])
# if selected_workflow == "FLUX":
# # generate_image(
# # file_name="image",
# # comfy_prompt=prompt,
# # workflow_path="./FLUX.json",
# # prompt_node="Positive Prompt T5",
# # seed_node="Seed",
# # seed_param="seed",
# # save_node="CivitAI Image Saver",
# # save_param="filename",
# # model_node="CivitAI Image Saver",
# # model_param="modelname",
# # )
# print("flux")
# else:
# print("sdxl")
# # generate_image("image", prompt)
# logging.info(f"{selected_workflow} generation started with prompt: {prompt}")
# def get_prompt_from_png(path):
# try:
# with Image.open(path) as img:
# try:
# # Flux workflow
# meta = json.loads(img.info["prompt"])['44']['inputs']['text']
# except KeyError:
# # SDXL workflow
# meta = json.loads(img.info["prompt"])['6']['inputs']['text']
# return meta or ""
# except Exception as e:
# print(f"Error reading metadata from {path}: {e}")
# return ""
# user_config = load_config()
# output_folder = user_config["comfyui"]["output_dir"]

View File

@ -63,7 +63,7 @@ def cancel_current_job() -> list:
def generate_image(
file_name: str,
comfy_prompt: str,
workflow_path: str = "./workflow_api.json",
workflow_path: str = "./workflow_sdxl.json",
prompt_node: str = "CLIP Text Encode (Prompt)",
seed_node: str = "KSampler",
seed_param: str = "seed",
@ -84,7 +84,7 @@ def generate_image(
wf.set_node_param(
(
"Empty Latent Image"
if workflow_path.endswith("workflow_api.json")
if workflow_path.endswith("workflow_sdxl.json")
else "CR Aspect Ratio"
),
"width",
@ -93,7 +93,7 @@ def generate_image(
wf.set_node_param(
(
"Empty Latent Image"
if workflow_path.endswith("workflow_api.json")
if workflow_path.endswith("workflow_sdxl.json")
else "CR Aspect Ratio"
),
"height",
@ -157,7 +157,7 @@ def create_image(prompt: str | None = None) -> None:
generate_image(
file_name="image",
comfy_prompt=prompt,
workflow_path="./FLUX.json",
workflow_path="./workflow_flux.json",
prompt_node="Positive Prompt T5",
seed_node="Seed",
seed_param="seed",

View File

@ -63,16 +63,20 @@ def rename_image() -> str | None:
return None
def get_prompt_from_png(path):
def get_details_from_png(path):
try:
with Image.open(path) as img:
try:
# Flux workflow
meta = json.loads(img.info["prompt"])['44']['inputs']['text']
data = json.loads(img.info["prompt"])
prompt = data['44']['inputs']['text']
model = data['35']['inputs']['unet_name'].split(".")[0]
except KeyError:
# SDXL workflow
meta = json.loads(img.info["prompt"])['6']['inputs']['text']
return meta or ""
data = json.loads(img.info["prompt"])
prompt = data['6']['inputs']['text']
model = data['4']['inputs']['ckpt_name']
return {"p":prompt,"m":model} or {"p":"","m":""}
except Exception as e:
print(f"Error reading metadata from {path}: {e}")
return ""

View File

@ -131,7 +131,6 @@
<h1>Image Archive</h1>
<div class="gallery">
{% for image in images %}
<!-- <img src="{{ url_for('images', filename=image.thumbnail_filename) }}" alt="Image" loading="lazy" onclick="openLightbox({{ loop.index0 }})"> -->
<img src="{{ url_for('images', filename='thumbnails/' + image.filename) }}"
data-fullsrc="{{ url_for('images', filename=image.filename) }}" onclick="openLightbox({{ loop.index0 }})">
@ -156,7 +155,9 @@
{% for image in images %}
{
src: "{{ url_for('images', filename=image.filename) }}",
prompt: `{{ image.prompt | escape }}`
prompt: `{{ image.prompt | escape }}`,
model: `{{ image.model | escape }}`
},
{% endfor %}
];
@ -166,7 +167,7 @@
currentIndex = index;
<!-- document.getElementById("lightbox-img").src = images[currentIndex].src; -->
document.getElementById("lightbox-img").src = document.querySelectorAll('.gallery img')[currentIndex].dataset.fullsrc;
document.getElementById("lightbox-prompt").textContent = images[currentIndex].prompt;
document.getElementById("lightbox-prompt").textContent = "Model:" + images[currentIndex].model + "\n\n" + images[currentIndex].prompt;
document.getElementById("lightbox").style.display = "flex";
}