# Uploaded by ALI7ADEL ("Upload 51 files", commit ed147e2, verified).
"""
Transcript segmentation module.
Splits long transcripts into logical sections for better processing.
"""
import re
from typing import List, Dict
from src.utils.logger import setup_logger
logger = setup_logger(__name__)
class TranscriptSegmenter:
    """Clean transcripts and split them into chunks for later processing.

    Two chunking strategies are offered: grouping timestamped segments into
    fixed time windows, and packing paragraphs/sentences of plain text into
    chunks of at most ``max_segment_words`` words.
    """

    # Common filler words to remove.
    # NOTE: several entries ('like', 'right', 'actually', ...) are also
    # ordinary words; clean_text() removes every occurrence regardless of
    # how the word is used.
    FILLER_WORDS = {
        'um', 'uh', 'like', 'you know', 'i mean', 'sort of', 'kind of',
        'basically', 'actually', 'literally', 'right', 'okay', 'so yeah'
    }

    # Any run of whitespace (spaces, tabs, newlines).
    _WHITESPACE_RE = re.compile(r'\s+')
    # A punctuation token standing on its own at the very start of the text
    # or right after a sentence terminator, e.g. the second '.' in "Fine. .".
    _ORPHAN_PUNCT_RE = re.compile(r'(?:^|(?<=[.!?])\s+)[,.!?]+(?=\s|$)')
    # Whitespace in front of a punctuation mark that ends a token ("word .").
    # The lookahead keeps things like " ..." or " .5" untouched.
    _SPACE_BEFORE_PUNCT_RE = re.compile(r'\s+([,.!?])(?=\s|$)')
    # Comma(s) directly in front of a sentence terminator ("works,?").
    _COMMA_BEFORE_END_RE = re.compile(r',+(?=[.!?](?:\s|$))')
    # First word character of the text or of a sentence.
    _SENTENCE_START_RE = re.compile(r'(^|[.!?]\s+)(\w)')
    # Whitespace that follows a sentence terminator.
    _SENTENCE_BREAK_RE = re.compile(r'(?<=[.!?])\s+')
    # Blank line(s) between paragraphs; also covers '\r\n' line endings and
    # "blank" lines that contain only spaces.
    _PARAGRAPH_BREAK_RE = re.compile(r'\n\s*\n')

    def __init__(self, max_segment_words: int = 500):
        """
        Initialize the segmenter.

        Args:
            max_segment_words: Maximum words per segment
        """
        self.max_segment_words = max_segment_words

    def _filler_pattern(self) -> 're.Pattern':
        """Build one regex matching any filler phrase plus a trailing comma."""
        # Longest phrases first so a multi-word filler always wins over a
        # shorter one it might start with; the secondary key makes the
        # pattern independent of set iteration order.
        phrases = sorted(self.FILLER_WORDS, key=lambda p: (-len(p), p))
        alternation = '|'.join(re.escape(p) for p in phrases)
        # The optional comma swallows the pause marker that usually follows
        # a filler ("um, hello"), so no dangling comma is left behind.
        return re.compile(r'\b(?:' + alternation + r')\b,?', re.IGNORECASE)

    def _tidy_punctuation(self, text: str) -> str:
        """Repair spacing and punctuation left behind by filler removal."""
        text = self._WHITESPACE_RE.sub(' ', text).strip()
        # "Fine. . Let's go" -> "Fine. Let's go"; ". Let's go" -> " Let's go"
        text = self._ORPHAN_PUNCT_RE.sub('', text)
        # "it works ." -> "it works."
        text = self._SPACE_BEFORE_PUNCT_RE.sub(r'\1', text)
        # "it works,?" -> "it works?"
        text = self._COMMA_BEFORE_END_RE.sub('', text)
        return self._WHITESPACE_RE.sub(' ', text).strip()

    def clean_text(self, text: str) -> str:
        """
        Clean transcript by removing filler words and normalizing.

        Whitespace is collapsed to single spaces, every entry of
        ``FILLER_WORDS`` is removed (case-insensitively, whole words only),
        punctuation orphaned by the removal is tidied up, and the first
        letter of every sentence (after '.', '!' or '?') is upper-cased.
        All other characters keep their original case.

        Args:
            text: Raw transcript text

        Returns:
            Cleaned text
        """
        # Normalize whitespace first so multi-word fillers split across a
        # line break ("you\nknow") are still recognized.
        cleaned = self._WHITESPACE_RE.sub(' ', text).strip()

        # An empty filler set would compile to a pattern matching the empty
        # string, so skip the substitution entirely in that case.
        if self.FILLER_WORDS:
            cleaned = self._filler_pattern().sub('', cleaned)

        cleaned = self._tidy_punctuation(cleaned)

        # Upper-case only the first character of each sentence; unlike
        # str.capitalize() this leaves the rest of the sentence untouched.
        cleaned = self._SENTENCE_START_RE.sub(
            lambda m: m.group(1) + m.group(2).upper(), cleaned
        )

        logger.debug(f"Cleaned text: reduced from {len(text)} to {len(cleaned)} characters")
        return cleaned

    def segment_by_time(
        self,
        segments: List[Dict],
        interval_seconds: int = 300
    ) -> List[Dict]:
        """
        Segment transcript by time intervals.

        A new window is opened whenever a segment starts at least
        ``interval_seconds`` after the start of the current window. The
        texts inside a window are stripped and joined with single spaces;
        windows that end up with no text are not emitted.

        Args:
            segments: List of dicts with 'start', 'end' and 'text' keys
                (e.g. timestamped segments from Whisper)
            interval_seconds: Time interval for each segment (default: 5 minutes)

        Returns:
            List of dicts with 'start', 'text' and 'end' keys, one per
            time window
        """
        if not segments:
            return []

        time_segments = []
        window_start = segments[0]['start']
        window_texts = []

        for seg in segments:
            # Check if we should start a new time window
            if seg['start'] - window_start >= interval_seconds:
                if window_texts:
                    time_segments.append({
                        'start': window_start,
                        'text': ' '.join(window_texts),
                        # A closed window ends where the next one begins.
                        'end': seg['start'],
                    })
                window_start = seg['start']
                window_texts = []

            piece = seg['text'].strip()
            if piece:
                window_texts.append(piece)

        # Add the last window; it ends with the final input segment
        if window_texts:
            time_segments.append({
                'start': window_start,
                'text': ' '.join(window_texts),
                'end': segments[-1]['end'],
            })

        logger.info(f"Segmented transcript into {len(time_segments)} time-based segments")
        return time_segments

    def _split_long_paragraph(self, paragraph: str) -> List[str]:
        """Break a paragraph above the word limit into sentence-sized pieces."""
        limit = self.max_segment_words
        # A non-positive limit cannot be honoured; keep the paragraph whole.
        if limit <= 0 or len(paragraph.split()) <= limit:
            return [paragraph]

        pieces = []
        for sentence in self._SENTENCE_BREAK_RE.split(paragraph):
            words = sentence.split()
            if not words:
                continue
            if len(words) <= limit:
                pieces.append(sentence)
            else:
                # A single sentence longer than the limit is cut into
                # fixed-size runs of words as a last resort.
                pieces.extend(
                    ' '.join(words[i:i + limit])
                    for i in range(0, len(words), limit)
                )
        return pieces

    def segment_by_topic(self, text: str) -> List[str]:
        """
        Segment text by detecting topic transitions.

        Simple heuristic: split on paragraph breaks (blank lines), break
        paragraphs longer than ``max_segment_words`` at sentence ends (and
        over-long sentences at word boundaries), then greedily pack the
        resulting pieces into segments of at most ``max_segment_words``
        words. Paragraphs within the limit are never split.

        Args:
            text: Full transcript text

        Returns:
            List of text segments
        """
        units = []
        for para in self._PARAGRAPH_BREAK_RE.split(text):
            para = para.strip()
            if para:
                units.extend(self._split_long_paragraph(para))

        segments = []
        current_units = []
        current_word_count = 0

        for unit in units:
            word_count = len(unit.split())

            # If adding this unit exceeds max words, close the current segment
            if current_units and current_word_count + word_count > self.max_segment_words:
                segments.append(' '.join(current_units))
                current_units = []
                current_word_count = 0

            current_units.append(unit)
            current_word_count += word_count

        # Add the last segment
        if current_units:
            segments.append(' '.join(current_units))

        logger.info(f"Segmented text into {len(segments)} topic-based segments")
        return segments

    def segment_transcript(
        self,
        transcript_data: Dict,
        method: str = "time"
    ) -> List[Dict]:
        """
        Segment transcript using specified method.

        Time-based segmentation is used only when ``method`` is "time" and
        ``transcript_data`` holds a non-empty 'segments' list; in every
        other case the 'text' entry is segmented by topic.

        Args:
            transcript_data: Full transcript data with text and segments
            method: Segmentation method ("time" or "topic")

        Returns:
            List of segmented chunks (dicts with at least a 'text' key)
        """
        timed_segments = transcript_data.get('segments')
        if method == "time" and timed_segments:
            # Use timestamped segments
            return self.segment_by_time(timed_segments)

        # Use topic-based segmentation on full text; a missing or empty
        # 'text' entry simply yields no chunks.
        full_text = transcript_data.get('text') or ''
        return [{'text': chunk} for chunk in self.segment_by_topic(full_text)]