text chunker

This commit is contained in:
Karl 2025-09-23 16:00:27 +01:00
parent 2172d7da7f
commit 6aeeb74e8f
2 changed files with 97 additions and 114 deletions

View File

@ -8,12 +8,12 @@ class CLIPTextChunker:
Using a conservative limit of 70 tokens to account for special tokens. Using a conservative limit of 70 tokens to account for special tokens.
""" """
def __init__(self, max_tokens: int = 25): def __init__(self, max_tokens: int = 70):
""" """
Initialize the text chunker. Initialize the text chunker.
Args: Args:
max_tokens (int): Maximum number of tokens per chunk (default: 25 for CLIP, being ultra conservative) max_tokens (int): Maximum number of tokens per chunk (default: 70 for CLIP, leaving buffer for special tokens)
""" """
self.max_tokens = max_tokens self.max_tokens = max_tokens
self._tokenizer = None self._tokenizer = None
@ -43,8 +43,9 @@ class CLIPTextChunker:
""" """
if self.tokenizer is None: if self.tokenizer is None:
# Fallback to character count if tokenizer not available # Fallback to character count if tokenizer not available
# Use an ultra conservative estimate: ~0.3 tokens per character for CLIP # CLIP tokenization is roughly 0.25-0.3 tokens per character on average (about 3-4 characters per token)
return int(len(text) * 0.3) # Use 0.25 (about 4 characters per token); note this estimates fewer tokens than 0.3, so it is the less conservative choice
return int(len(text) * 0.25)
tokens = self.tokenizer( tokens = self.tokenizer(
text, text,
@ -70,33 +71,45 @@ class CLIPTextChunker:
if not text.strip(): if not text.strip():
return [] return []
# If text already fits within the limit, return as-is
if self.get_token_count(text) <= self.max_tokens: if self.get_token_count(text) <= self.max_tokens:
return [text] return [text]
chunks = [] chunks = []
words = text.split() sentences = re.split(r'(?<=[.!?])\s+', text) if preserve_sentences else text.split()
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
# If a single sentence is too long, we need to break it down further
if self.get_token_count(sentence) > self.max_tokens:
# Break sentence into smaller chunks
words = sentence.split()
current_chunk = [] current_chunk = []
current_tokens = 0
for word in words: for word in words:
word_with_space = word + " " # Test if adding this word would exceed the limit
# Check if adding this word would exceed the limit
test_chunk = " ".join(current_chunk + [word]) test_chunk = " ".join(current_chunk + [word])
test_tokens = self.get_token_count(test_chunk) if self.get_token_count(test_chunk) <= self.max_tokens:
if test_tokens > self.max_tokens and current_chunk:
# Current chunk is complete, add it
chunks.append(" ".join(current_chunk))
current_chunk = [word]
current_tokens = self.get_token_count(word)
else:
current_chunk.append(word) current_chunk.append(word)
current_tokens = test_tokens else:
# Current chunk is full, save it
# Add the last chunk if it exists
if current_chunk: if current_chunk:
chunks.append(" ".join(current_chunk)) chunks.append(" ".join(current_chunk))
# Start new chunk with current word
current_chunk = [word]
# Add the last chunk
if current_chunk:
chunks.append(" ".join(current_chunk))
else:
# Check if adding this sentence to the last chunk would exceed the limit
if chunks and self.get_token_count(chunks[-1] + " " + sentence) <= self.max_tokens:
chunks[-1] += " " + sentence
else:
chunks.append(sentence)
return chunks return chunks
@ -135,34 +148,31 @@ class CLIPTextChunker:
if last_space > 0: if last_space > 0:
candidate_chunk = candidate_chunk[:last_space] candidate_chunk = candidate_chunk[:last_space]
# Use the basic chunking to ensure proper word boundaries # Check if this candidate chunk fits within token limits
if self.get_token_count(candidate_chunk) <= self.max_tokens: if self.get_token_count(candidate_chunk) <= self.max_tokens:
# Use chunk_text to get a properly bounded chunk first_chunk = candidate_chunk
temp_chunks = self.chunk_text(candidate_chunk) remaining_text = text[len(first_chunk):].strip()
if temp_chunks:
first_chunk = temp_chunks[0]
remaining_text = text[len(first_chunk):]
break break
# If we found a good first chunk, use it # If we found a good first chunk, use it
if first_chunk and self.get_token_count(first_chunk) <= self.max_tokens: if first_chunk and self.get_token_count(first_chunk) <= self.max_tokens:
chunks = [first_chunk] chunks = [first_chunk]
# Add remaining text as additional chunks if needed # Add remaining text as additional chunks if needed
if remaining_text.strip(): if remaining_text:
chunks.extend(self.chunk_text(remaining_text)) chunks.extend(self.chunk_text(remaining_text))
return chunks return chunks
# Fallback to regular chunking # Fallback to regular chunking
return self.chunk_text(text) return self.chunk_text(text)
def chunk_prompt_for_clip(prompt: str, max_tokens: int = 25) -> List[str]: def chunk_prompt_for_clip(prompt: str, max_tokens: int = 70) -> List[str]:
""" """
Convenience function to chunk a prompt for CLIP processing. Convenience function to chunk a prompt for CLIP processing.
Uses a conservative 25 token limit to be safe. Uses a 70 token limit to be safe while allowing meaningful prompts.
Args: Args:
prompt (str): The prompt to chunk prompt (str): The prompt to chunk
max_tokens (int): Maximum tokens per chunk (default: 25 for safety) max_tokens (int): Maximum tokens per chunk (default: 70 for CLIP compatibility)
Returns: Returns:
List[str]: List of prompt chunks List[str]: List of prompt chunks

View File

@ -1,110 +1,83 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Test script for the CLIP text chunking functionality. Test script to verify that the text chunker fixes the token sequence length issues.
""" """
import sys import sys
import os import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Add the lib directory to the path so we can import our modules from lib.text_chunker import chunk_prompt_for_clip, CLIPTextChunker
sys.path.append(os.path.join(os.path.dirname(__file__), 'lib'))
from text_chunker import CLIPTextChunker, chunk_prompt_for_clip def test_long_prompt_chunking():
"""Test that long prompts are properly chunked within CLIP token limits."""
def test_basic_chunking(): # Create a sample long prompt similar to what the app generates
"""Test basic text chunking functionality.""" test_prompt = "Ultra-realistic close-up headshot of a Medium Brown skinned male soccer player with a plain background looking at the camera with his whole head in shot. The player is twenty-five years old, from United Kingdom, with clean-shaven and Medium Length Brown curly hair. He is facing the camera with a confident expression, wearing a soccer jersey. The lighting is natural and soft, emphasizing facial features and skin texture"
print("=== Testing Basic Text Chunking ===")
chunker = CLIPTextChunker(max_tokens=60) # Using conservative limit print(f"Original prompt length: {len(test_prompt)} characters")
print(f"Original prompt: {test_prompt}")
print("-" * 80)
# Test short text (should not be chunked) # Test the chunking
short_text = "A simple prompt" chunker = CLIPTextChunker(max_tokens=70)
chunks = chunker.chunk_text(short_text) chunks = chunk_prompt_for_clip(test_prompt)
print(f"Short text: '{short_text}' -> {len(chunks)} chunks")
assert len(chunks) == 1, f"Expected 1 chunk, got {len(chunks)}"
# Test long text (should be chunked) print(f"Number of chunks: {len(chunks)}")
long_text = "This is a very long text that should definitely exceed the token limit when processed by CLIP. " * 10
chunks = chunker.chunk_text(long_text)
print(f"Long text -> {len(chunks)} chunks")
assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
# Verify each chunk is within token limit
for i, chunk in enumerate(chunks):
token_count = chunker.estimate_token_count(chunk)
print(f"Chunk {i+1}: {token_count} tokens (max: {chunker.max_tokens})")
assert token_count <= chunker.max_tokens, f"Chunk {i+1} exceeds token limit: {token_count} > {chunker.max_tokens}"
print("✓ Basic chunking test passed\n")
def test_prompt_chunking():
"""Test chunking with actual prompts similar to the app."""
print("=== Testing Prompt Chunking ===")
# Simulate a long prompt like the one from app_config.json
long_prompt = "Ultra-realistic close-up headshot of a Fair skinned male soccer player with a plain background looking at the camera with his whole head in shot. The player is twenty-five years old, from United Kingdom, with clean-shaven and curly hair. He is facing the camera with a confident expression, wearing a soccer jersey. The lighting is natural and soft, emphasizing facial features and skin texture"
chunks = chunk_prompt_for_clip(long_prompt)
print(f"Long prompt -> {len(chunks)} chunks")
for i, chunk in enumerate(chunks): for i, chunk in enumerate(chunks):
print(f"Chunk {i+1}: {chunk[:100]}...") token_count = chunker.get_token_count(chunk)
print(f"\nChunk {i+1}:")
print(f" Text: {chunk}")
print(f" Token count: {token_count}")
print(f" Character count: {len(chunk)}")
print("✓ Prompt chunking test passed\n") if token_count > 77:
print(f" ❌ ERROR: Chunk {i+1} exceeds CLIP's 77 token limit!")
return False
elif token_count > 70:
print(f" ⚠️ WARNING: Chunk {i+1} is close to the 77 token limit")
else:
print(f" ✅ Chunk {i+1} is within safe limits")
def test_priority_chunking(): print("-" * 80)
"""Test priority-based chunking.""" print("✅ All chunks are within CLIP's token limits!")
print("=== Testing Priority Chunking ===") return True
chunker = CLIPTextChunker(max_tokens=50) # Smaller limit for testing
text = "This is a long text with important information about soccer players and their characteristics. The most important part is that they are professional athletes."
essential_info = ["soccer players", "professional athletes", "important information"]
chunks = chunker.create_priority_chunks(text, essential_info)
print(f"Priority chunks -> {len(chunks)} chunks")
for i, chunk in enumerate(chunks):
print(f"Priority chunk {i+1}: {chunk}")
print("✓ Priority chunking test passed\n")
def test_edge_cases(): def test_edge_cases():
"""Test edge cases.""" """Test edge cases for the chunking functionality."""
print("=== Testing Edge Cases ===")
chunker = CLIPTextChunker(max_tokens=60) chunker = CLIPTextChunker(max_tokens=70)
# Test empty text # Test empty string
chunks = chunker.chunk_text("") chunks = chunker.chunk_text("")
assert len(chunks) == 0, "Empty text should return no chunks" assert chunks == [], "Empty string should return empty list"
# Test text exactly at limit # Test short string
exact_text = "A" * 60 # Text exactly at the character limit short_text = "Hello world"
chunks = chunker.chunk_text(exact_text) chunks = chunker.chunk_text(short_text)
# Should return the text as-is since it's exactly at the limit assert len(chunks) == 1 and chunks[0] == short_text, "Short text should not be chunked"
assert len(chunks) == 1, f"Expected 1 chunk for text at limit, got {len(chunks)}"
assert chunks[0] == exact_text, "Text at limit should be returned unchanged"
# Test text that exceeds limit (with spaces so it can be split) # Test very long single word (edge case)
long_text = "This is a very long text that should definitely exceed the character limit when processed. " * 3 # Text that exceeds the limit long_word = "a" * 200
chunks = chunker.chunk_text(long_text) chunks = chunker.chunk_text(long_word)
assert len(chunks) > 1, f"Expected multiple chunks for long text, got {len(chunks)}" # Should handle this gracefully
for chunk in chunks: for chunk in chunks:
assert chunker.estimate_token_count(chunk) <= chunker.max_tokens, f"Chunk exceeds limit: {len(chunk)} > {chunker.max_tokens}" assert chunker.get_token_count(chunk) <= 70, "Long word chunks should respect token limit"
print("✓ Edge cases test passed\n") print("✅ Edge case tests passed!")
return True
if __name__ == "__main__": if __name__ == "__main__":
try: print("Testing text chunker fixes...")
test_basic_chunking() print("=" * 80)
test_prompt_chunking()
test_priority_chunking()
test_edge_cases()
print("🎉 All tests passed! Text chunking functionality is working correctly.") success1 = test_long_prompt_chunking()
success2 = test_edge_cases()
except Exception as e: if success1 and success2:
print(f"❌ Test failed: {e}") print("\n🎉 All tests passed! The token sequence length issue should be fixed.")
sys.exit(0)
else:
print("\n❌ Some tests failed. The issue may not be fully resolved.")
sys.exit(1) sys.exit(1)