From 6aeeb74e8fbb351a6130bd0d9189ab8b44065457 Mon Sep 17 00:00:00 2001
From: Karl
Date: Tue, 23 Sep 2025 16:00:27 +0100
Subject: [PATCH] text chunker

---
 lib/text_chunker.py  |  78 ++++++++++++++-----------
 test_text_chunker.py | 133 +++++++++++++++++--------------------------
 2 files changed, 97 insertions(+), 114 deletions(-)

diff --git a/lib/text_chunker.py b/lib/text_chunker.py
index 0797aa3..fd27470 100644
--- a/lib/text_chunker.py
+++ b/lib/text_chunker.py
@@ -8,12 +8,12 @@ class CLIPTextChunker:
     Using a conservative limit of 70 tokens to account for special tokens.
     """
 
-    def __init__(self, max_tokens: int = 25):
+    def __init__(self, max_tokens: int = 70):
         """
         Initialize the text chunker.
 
         Args:
-            max_tokens (int): Maximum number of tokens per chunk (default: 25 for CLIP, being ultra conservative)
+            max_tokens (int): Maximum number of tokens per chunk (default: 70 for CLIP, leaving buffer for special tokens)
         """
         self.max_tokens = max_tokens
         self._tokenizer = None
@@ -43,8 +43,9 @@ class CLIPTextChunker:
         """
         if self.tokenizer is None:
             # Fallback to character count if tokenizer not available
-            # Use an ultra conservative estimate: ~0.3 characters per token for CLIP
-            return int(len(text) * 0.3)
+            # CLIP tokenization is roughly 0.25-0.3 characters per token on average
+            # Use 0.25 for a more conservative estimate to avoid exceeding limits
+            return int(len(text) * 0.25)
 
         tokens = self.tokenizer(
             text,
@@ -70,33 +71,45 @@ class CLIPTextChunker:
         if not text.strip():
             return []
 
+        # If text already fits within the limit, return as-is
         if self.get_token_count(text) <= self.max_tokens:
             return [text]
 
         chunks = []
-        words = text.split()
-        current_chunk = []
-        current_tokens = 0
+        sentences = re.split(r'(?<=[.!?])\s+', text) if preserve_sentences else text.split()
 
-        for word in words:
-            word_with_space = word + " "
+        for sentence in sentences:
+            sentence = sentence.strip()
+            if not sentence:
+                continue
 
-            # Check if adding this word would exceed the limit
-            test_chunk = " ".join(current_chunk + [word])
-            test_tokens = self.get_token_count(test_chunk)
+            # If a single sentence is too long, we need to break it down further
+            if self.get_token_count(sentence) > self.max_tokens:
+                # Break sentence into smaller chunks
+                words = sentence.split()
+                current_chunk = []
 
-            if test_tokens > self.max_tokens and current_chunk:
-                # Current chunk is complete, add it
-                chunks.append(" ".join(current_chunk))
-                current_chunk = [word]
-                current_tokens = self.get_token_count(word)
+                for word in words:
+                    # Test if adding this word would exceed the limit
+                    test_chunk = " ".join(current_chunk + [word])
+                    if self.get_token_count(test_chunk) <= self.max_tokens:
+                        current_chunk.append(word)
+                    else:
+                        # Current chunk is full, save it
+                        if current_chunk:
+                            chunks.append(" ".join(current_chunk))
+                        # Start new chunk with current word
+                        current_chunk = [word]
+
+                # Add the last chunk
+                if current_chunk:
+                    chunks.append(" ".join(current_chunk))
             else:
-                current_chunk.append(word)
-                current_tokens = test_tokens
-
-        # Add the last chunk if it exists
-        if current_chunk:
-            chunks.append(" ".join(current_chunk))
+                # Check if adding this sentence to the last chunk would exceed the limit
+                if chunks and self.get_token_count(chunks[-1] + " " + sentence) <= self.max_tokens:
+                    chunks[-1] += " " + sentence
+                else:
+                    chunks.append(sentence)
 
         return chunks
 
@@ -135,34 +148,31 @@
            if last_space > 0:
                candidate_chunk = candidate_chunk[:last_space]
-            # Use the basic chunking to ensure proper word boundaries
+            # Check if this candidate chunk fits within token limits
            if self.get_token_count(candidate_chunk) <= self.max_tokens:
-                # Use chunk_text to get a properly bounded chunk
-                temp_chunks = self.chunk_text(candidate_chunk)
-                if temp_chunks:
-                    first_chunk = temp_chunks[0]
-                    remaining_text = text[len(first_chunk):]
-                    break
+                first_chunk = candidate_chunk
+                remaining_text = text[len(first_chunk):].strip()
+                break
 
         # If we found a good first chunk, use it
         if first_chunk and self.get_token_count(first_chunk) <= self.max_tokens:
             chunks = [first_chunk]
 
             # Add remaining text as additional chunks if needed
-            if remaining_text.strip():
+            if remaining_text:
                 chunks.extend(self.chunk_text(remaining_text))
             return chunks
 
     # Fallback to regular chunking
     return self.chunk_text(text)
 
-def chunk_prompt_for_clip(prompt: str, max_tokens: int = 25) -> List[str]:
+def chunk_prompt_for_clip(prompt: str, max_tokens: int = 70) -> List[str]:
     """
     Convenience function to chunk a prompt for CLIP processing.
-    Uses a conservative 25 token limit to be safe.
+    Uses a 70 token limit to be safe while allowing meaningful prompts.
 
     Args:
         prompt (str): The prompt to chunk
-        max_tokens (int): Maximum tokens per chunk (default: 25 for safety)
+        max_tokens (int): Maximum tokens per chunk (default: 70 for CLIP compatibility)
 
     Returns:
         List[str]: List of prompt chunks
diff --git a/test_text_chunker.py b/test_text_chunker.py
index fb8f7a1..3a453f7 100644
--- a/test_text_chunker.py
+++ b/test_text_chunker.py
@@ -1,110 +1,83 @@
 #!/usr/bin/env python3
 """
-Test script for the CLIP text chunking functionality.
+Test script to verify that the text chunker fixes the token sequence length issues.
 """
 
 import sys
 import os
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 
-# Add the lib directory to the path so we can import our modules
-sys.path.append(os.path.join(os.path.dirname(__file__), 'lib'))
+from lib.text_chunker import chunk_prompt_for_clip, CLIPTextChunker
 
-from text_chunker import CLIPTextChunker, chunk_prompt_for_clip
+def test_long_prompt_chunking():
+    """Test that long prompts are properly chunked within CLIP token limits."""
 
-def test_basic_chunking():
-    """Test basic text chunking functionality."""
-    print("=== Testing Basic Text Chunking ===")
+    # Create a sample long prompt similar to what the app generates
+    test_prompt = "Ultra-realistic close-up headshot of a Medium Brown skinned male soccer player with a plain background looking at the camera with his whole head in shot. The player is twenty-five years old, from United Kingdom, with clean-shaven and Medium Length Brown curly hair. He is facing the camera with a confident expression, wearing a soccer jersey. The lighting is natural and soft, emphasizing facial features and skin texture"
 
-    chunker = CLIPTextChunker(max_tokens=60)  # Using conservative limit
+    print(f"Original prompt length: {len(test_prompt)} characters")
+    print(f"Original prompt: {test_prompt}")
+    print("-" * 80)
 
-    # Test short text (should not be chunked)
-    short_text = "A simple prompt"
-    chunks = chunker.chunk_text(short_text)
-    print(f"Short text: '{short_text}' -> {len(chunks)} chunks")
-    assert len(chunks) == 1, f"Expected 1 chunk, got {len(chunks)}"
+    # Test the chunking
+    chunker = CLIPTextChunker(max_tokens=70)
+    chunks = chunk_prompt_for_clip(test_prompt)
 
-    # Test long text (should be chunked)
-    long_text = "This is a very long text that should definitely exceed the token limit when processed by CLIP. " * 10
-    chunks = chunker.chunk_text(long_text)
-    print(f"Long text -> {len(chunks)} chunks")
-    assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
-
-    # Verify each chunk is within token limit
-    for i, chunk in enumerate(chunks):
-        token_count = chunker.estimate_token_count(chunk)
-        print(f"Chunk {i+1}: {token_count} tokens (max: {chunker.max_tokens})")
-        assert token_count <= chunker.max_tokens, f"Chunk {i+1} exceeds token limit: {token_count} > {chunker.max_tokens}"
-
-    print("✓ Basic chunking test passed\n")
-
-def test_prompt_chunking():
-    """Test chunking with actual prompts similar to the app."""
-    print("=== Testing Prompt Chunking ===")
-
-    # Simulate a long prompt like the one from app_config.json
-    long_prompt = "Ultra-realistic close-up headshot of a Fair skinned male soccer player with a plain background looking at the camera with his whole head in shot. The player is twenty-five years old, from United Kingdom, with clean-shaven and curly hair. He is facing the camera with a confident expression, wearing a soccer jersey. The lighting is natural and soft, emphasizing facial features and skin texture"
-
-    chunks = chunk_prompt_for_clip(long_prompt)
-    print(f"Long prompt -> {len(chunks)} chunks")
+    print(f"Number of chunks: {len(chunks)}")
 
     for i, chunk in enumerate(chunks):
-        print(f"Chunk {i+1}: {chunk[:100]}...")
+        token_count = chunker.get_token_count(chunk)
+        print(f"\nChunk {i+1}:")
+        print(f"  Text: {chunk}")
+        print(f"  Token count: {token_count}")
+        print(f"  Character count: {len(chunk)}")
 
-    print("✓ Prompt chunking test passed\n")
+        if token_count > 77:
+            print(f"  ❌ ERROR: Chunk {i+1} exceeds CLIP's 77 token limit!")
+            return False
+        elif token_count > 70:
+            print(f"  ⚠️ WARNING: Chunk {i+1} is close to the 77 token limit")
+        else:
+            print(f"  ✅ Chunk {i+1} is within safe limits")
 
-def test_priority_chunking():
-    """Test priority-based chunking."""
-    print("=== Testing Priority Chunking ===")
-
-    chunker = CLIPTextChunker(max_tokens=50)  # Smaller limit for testing
-
-    text = "This is a long text with important information about soccer players and their characteristics. The most important part is that they are professional athletes."
-
-    essential_info = ["soccer players", "professional athletes", "important information"]
-
-    chunks = chunker.create_priority_chunks(text, essential_info)
-    print(f"Priority chunks -> {len(chunks)} chunks")
-
-    for i, chunk in enumerate(chunks):
-        print(f"Priority chunk {i+1}: {chunk}")
-
-    print("✓ Priority chunking test passed\n")
+    print("-" * 80)
+    print("✅ All chunks are within CLIP's token limits!")
+    return True
 
 def test_edge_cases():
-    """Test edge cases."""
-    print("=== Testing Edge Cases ===")
+    """Test edge cases for the chunking functionality."""
 
-    chunker = CLIPTextChunker(max_tokens=60)
+    chunker = CLIPTextChunker(max_tokens=70)
 
-    # Test empty text
+    # Test empty string
     chunks = chunker.chunk_text("")
-    assert len(chunks) == 0, "Empty text should return no chunks"
+    assert chunks == [], "Empty string should return empty list"
 
-    # Test text exactly at limit
-    exact_text = "A" * 60  # Text exactly at the character limit
-    chunks = chunker.chunk_text(exact_text)
-    # Should return the text as-is since it's exactly at the limit
-    assert len(chunks) == 1, f"Expected 1 chunk for text at limit, got {len(chunks)}"
-    assert chunks[0] == exact_text, "Text at limit should be returned unchanged"
+    # Test short string
+    short_text = "Hello world"
+    chunks = chunker.chunk_text(short_text)
+    assert len(chunks) == 1 and chunks[0] == short_text, "Short text should not be chunked"
 
-    # Test text that exceeds limit (with spaces so it can be split)
-    long_text = "This is a very long text that should definitely exceed the character limit when processed. " * 3  # Text that exceeds the limit
-    chunks = chunker.chunk_text(long_text)
-    assert len(chunks) > 1, f"Expected multiple chunks for long text, got {len(chunks)}"
+    # Test very long single word (edge case)
+    long_word = "a" * 200
+    chunks = chunker.chunk_text(long_word)
+    # Should handle this gracefully
     for chunk in chunks:
-        assert chunker.estimate_token_count(chunk) <= chunker.max_tokens, f"Chunk exceeds limit: {len(chunk)} > {chunker.max_tokens}"
+        assert chunker.get_token_count(chunk) <= 70, "Long word chunks should respect token limit"
 
-    print("✓ Edge cases test passed\n")
+    print("✅ Edge case tests passed!")
+    return True
 
 if __name__ == "__main__":
-    try:
-        test_basic_chunking()
-        test_prompt_chunking()
-        test_priority_chunking()
-        test_edge_cases()
+    print("Testing text chunker fixes...")
+    print("=" * 80)
 
-        print("🎉 All tests passed! Text chunking functionality is working correctly.")
+    success1 = test_long_prompt_chunking()
+    success2 = test_edge_cases()
 
-    except Exception as e:
-        print(f"❌ Test failed: {e}")
+    if success1 and success2:
+        print("\n🎉 All tests passed! The token sequence length issue should be fixed.")
+        sys.exit(0)
+    else:
+        print("\n❌ Some tests failed. The issue may not be fully resolved.")
         sys.exit(1)
\ No newline at end of file