import re
from typing import List


class CLIPTextChunker:
    """
    Utility class for chunking text to fit within CLIP's token limits.

    CLIP models typically have a maximum sequence length of 77 tokens.
    A conservative default of 60 tokens per chunk is used to leave headroom
    for special tokens and tokenizer differences.
    """

    def __init__(self, max_tokens: int = 60):
        """
        Initialize the text chunker.

        Args:
            max_tokens (int): Maximum number of tokens per chunk
                (default: 60 for CLIP, being extra conservative)
        """
        self.max_tokens = max_tokens
        self._tokenizer = None

    @property
    def tokenizer(self):
        """Lazily load the CLIP tokenizer."""
        if self._tokenizer is None:
            try:
                from transformers import CLIPTokenizer

                self._tokenizer = CLIPTokenizer.from_pretrained(
                    "openai/clip-vit-base-patch32", local_files_only=False
                )
            except Exception:
                # Fall back to character-based estimation if transformers is
                # not available or the tokenizer cannot be downloaded.
                self._tokenizer = None
        return self._tokenizer

    def get_token_count(self, text: str) -> int:
        """
        Get the token count for a text string using the CLIP tokenizer.

        Args:
            text (str): Input text

        Returns:
            int: Token count (exact if the tokenizer is available, otherwise
                a conservative character-based estimate)
        """
        if self.tokenizer is None:
            # Fallback when the tokenizer is unavailable: CLIP's BPE produces
            # roughly one token per 3-4 characters of English text (about
            # 0.25-0.3 tokens per character). Overestimate at ~0.35 tokens per
            # character so chunks built from this estimate stay under the limit.
            return int(len(text) * 0.35) + 1

        tokens = self.tokenizer(
            text,
            padding=False,
            truncation=False,
            return_tensors=None,
        )
        return len(tokens["input_ids"])

    def chunk_text(self, text: str, preserve_sentences: bool = True) -> List[str]:
        """
        Chunk text into smaller pieces that fit within the token limit.
        Uses actual CLIP tokenization for accuracy.

        Args:
            text (str): Input text to chunk
            preserve_sentences (bool): Whether to try to preserve sentence
                boundaries

        Returns:
            List[str]: List of text chunks
        """
        if not text.strip():
            return []

        # If the text already fits within the limit, return it as-is.
        if self.get_token_count(text) <= self.max_tokens:
            return [text]

        chunks = []
        sentences = (
            re.split(r"(?<=[.!?])\s+", text) if preserve_sentences else text.split()
        )

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            # If a single sentence is too long, break it down word by word.
            if self.get_token_count(sentence) > self.max_tokens:
                words = sentence.split()
                current_chunk = []

                for word in words:
                    # Test whether adding this word would exceed the limit.
                    test_chunk = " ".join(current_chunk + [word])
                    if self.get_token_count(test_chunk) <= self.max_tokens:
                        current_chunk.append(word)
                    else:
                        # The current chunk is full; save it and start a new
                        # chunk with the current word.
                        if current_chunk:
                            chunks.append(" ".join(current_chunk))
                        current_chunk = [word]

                # Add the last chunk.
                if current_chunk:
                    chunks.append(" ".join(current_chunk))
            else:
                # Append to the previous chunk if the combined text still
                # fits; otherwise start a new chunk.
                if (
                    chunks
                    and self.get_token_count(chunks[-1] + " " + sentence)
                    <= self.max_tokens
                ):
                    chunks[-1] += " " + sentence
                else:
                    chunks.append(sentence)

        return chunks

    def create_priority_chunks(self, text: str, essential_info: List[str]) -> List[str]:
        """
        Create chunks with priority given to essential information.

        Args:
            text (str): Full text to chunk
            essential_info (List[str]): List of essential phrases that should
                be preserved

        Returns:
            List[str]: List of prioritized chunks
        """
        # If the text fits within the limit, return it as-is.
        if self.get_token_count(text) <= self.max_tokens:
            return [text]

        # Look for essential phrases near the beginning of the text and try
        # to build a first chunk that preserves them.
        first_chunk = ""
        remaining_text = text

        for info in essential_info:
            if info in text:
                info_index = text.find(info)
                # Only consider phrases that appear within the first 100 characters.
                if info_index < 100:
                    # Take the text from the start through the essential
                    # phrase, plus a little trailing context.
                    end_pos = min(len(text), info_index + len(info) + 30)
                    candidate_chunk = text[:end_pos]

                    # Ensure the candidate chunk ends at a word boundary.
                    last_space = candidate_chunk.rfind(" ")
                    if last_space > 0:
                        candidate_chunk = candidate_chunk[:last_space]

                    # Use the candidate only if it fits within the token limit.
                    if self.get_token_count(candidate_chunk) <= self.max_tokens:
                        first_chunk = candidate_chunk
                        remaining_text = text[len(first_chunk):].strip()
                        break

        # If a good first chunk was found, use it and chunk the rest normally.
        if first_chunk and self.get_token_count(first_chunk) <= self.max_tokens:
            chunks = [first_chunk]
            if remaining_text:
                chunks.extend(self.chunk_text(remaining_text))
            return chunks

        # Fall back to regular chunking.
        return self.chunk_text(text)


def chunk_prompt_for_clip(prompt: str, max_tokens: int = 60) -> List[str]:
    """
    Convenience function to chunk a prompt for CLIP processing.
    Uses a 60-token limit to be extra safe for any CLIP model.

    Args:
        prompt (str): The prompt to chunk
        max_tokens (int): Maximum tokens per chunk (default: 60 for maximum
            CLIP compatibility)

    Returns:
        List[str]: List of prompt chunks
    """
    chunker = CLIPTextChunker(max_tokens=max_tokens)

    # Essential information that should be preserved (matching the actual
    # prompt format).
    essential_info = [
        "Ultra realistic headshot",
        "male soccer player",
        "looking at the camera",
        "facing the camera",
        "Olive skinned",
        "transparent background",
    ]

    return chunker.create_priority_chunks(prompt, essential_info)