Spaces:
Sleeping
Sleeping
| """ | |
| Transcript segmentation module. | |
| Splits long transcripts into logical sections for better processing. | |
| """ | |
| import re | |
| from typing import List, Dict | |
| from src.utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
class TranscriptSegmenter:
    """Clean transcripts and split them into time- or topic-based segments."""

    # Filler words/phrases removed by clean_text(). Matching is
    # case-insensitive and restricted to whole words.
    FILLER_WORDS = {
        'um', 'uh', 'like', 'you know', 'i mean', 'sort of', 'kind of',
        'basically', 'actually', 'literally', 'right', 'okay', 'so yeah'
    }

    def __init__(self, max_segment_words: int = 500):
        """
        Initialize the segmenter.

        Args:
            max_segment_words: Maximum words per topic-based segment.

        Raises:
            ValueError: If max_segment_words is not positive.
        """
        if max_segment_words <= 0:
            raise ValueError("max_segment_words must be a positive integer")
        self.max_segment_words = max_segment_words

    def clean_text(self, text: str) -> str:
        """
        Clean a transcript by removing filler words and normalizing spacing.

        The original casing of the text is preserved (acronyms and proper
        nouns are left alone); only the first character of each
        '. '-separated sentence is upper-cased.

        Args:
            text: Raw transcript text.

        Returns:
            Cleaned text.
        """
        cleaned = text

        # Longest phrases first so the result never depends on set ordering.
        fillers = sorted(self.FILLER_WORDS, key=lambda f: (-len(f), f))
        if fillers:
            # One alternation instead of one pass per filler. A comma that
            # directly follows a filler is removed with it ("um, hello").
            pattern = (
                r'\b(?:' + '|'.join(re.escape(f) for f in fillers) + r')\b,?'
            )
            cleaned = re.sub(pattern, '', cleaned, flags=re.IGNORECASE)

        # Collapse whitespace runs and trim the ends.
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()

        # Tidy punctuation left behind by removed fillers.
        cleaned = re.sub(r'\s+([,.;:!?])', r'\1', cleaned)
        cleaned = re.sub(r',(?=[.!?])', '', cleaned)

        # Upper-case only the first character of each sentence;
        # str.capitalize() would lower-case the remainder.
        cleaned = '. '.join(
            sentence[:1].upper() + sentence[1:]
            for sentence in cleaned.split('. ')
        )

        logger.debug(f"Cleaned text: reduced from {len(text)} to {len(cleaned)} characters")
        return cleaned

    @staticmethod
    def _append_window(
        time_segments: List[Dict],
        start,
        end,
        parts: List[str]
    ) -> None:
        """Append a combined segment for one time window unless it has no text."""
        combined = ' '.join(parts)
        if combined:
            time_segments.append({'start': start, 'text': combined, 'end': end})

    def segment_by_time(
        self,
        segments: List[Dict],
        interval_seconds: int = 300
    ) -> List[Dict]:
        """
        Segment transcript by time intervals.

        Args:
            segments: Timestamped segments (e.g. Whisper output); each item
                must provide 'start', 'end' and 'text' keys.
            interval_seconds: Time interval for each segment (default: 5 minutes).

        Returns:
            List of dicts with 'start', 'text' and 'end' keys, one per time
            window that contains text. A window's 'end' is the start of the
            segment that opened the next window; the last window ends at the
            final input segment's 'end'.
        """
        if not segments:
            return []

        time_segments: List[Dict] = []
        window_start = segments[0]['start']
        parts: List[str] = []

        for seg in segments:
            # Close the current window once this segment starts at or
            # beyond the interval boundary.
            if seg['start'] - window_start >= interval_seconds:
                self._append_window(time_segments, window_start, seg['start'], parts)
                window_start = seg['start']
                parts = []

            # Strip each piece so joining never yields leading or doubled spaces.
            piece = seg['text'].strip()
            if piece:
                parts.append(piece)

        # Add the last window.
        self._append_window(time_segments, window_start, segments[-1]['end'], parts)

        logger.info(f"Segmented transcript into {len(time_segments)} time-based segments")
        return time_segments

    @staticmethod
    def _split_oversized(paragraph: str, limit: int) -> List[str]:
        """Break a paragraph longer than `limit` words into sentence/word chunks."""
        if len(paragraph.split()) <= limit:
            return [paragraph]

        units: List[str] = []
        for sentence in re.split(r'(?<=[.!?])\s+', paragraph):
            words = sentence.split()
            if not words:
                continue
            if len(words) <= limit:
                units.append(sentence)
            else:
                # A single sentence is still too long: hard-wrap by word count.
                units.extend(
                    ' '.join(words[i:i + limit])
                    for i in range(0, len(words), limit)
                )
        return units

    def segment_by_topic(self, text: str) -> List[str]:
        """
        Segment text by detecting topic transitions.

        Simple heuristic: paragraphs (separated by blank lines) are packed
        greedily into segments of at most max_segment_words words. A
        paragraph that is itself too long is split on sentence boundaries,
        and a sentence that is still too long is split by word count, so no
        returned segment exceeds the limit.

        Args:
            text: Full transcript text.

        Returns:
            List of text segments.
        """
        limit = max(1, self.max_segment_words)

        # Split by double newlines (paragraphs), dropping empty ones.
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]

        units: List[str] = []
        for para in paragraphs:
            units.extend(self._split_oversized(para, limit))

        segments: List[str] = []
        current_segment: List[str] = []
        current_word_count = 0

        for unit in units:
            word_count = len(unit.split())

            # If adding this unit exceeds the limit, start a new segment.
            if current_segment and current_word_count + word_count > limit:
                segments.append(' '.join(current_segment))
                current_segment = []
                current_word_count = 0

            current_segment.append(unit)
            current_word_count += word_count

        # Add the last segment.
        if current_segment:
            segments.append(' '.join(current_segment))

        logger.info(f"Segmented text into {len(segments)} topic-based segments")
        return segments

    def segment_transcript(
        self,
        transcript_data: Dict,
        method: str = "time"
    ) -> List[Dict]:
        """
        Segment transcript using specified method.

        Time-based segmentation is used when method is "time" and
        transcript_data holds a non-empty 'segments' list; otherwise the
        full 'text' is segmented by topic.

        Args:
            transcript_data: Full transcript data with 'text' and, optionally,
                'segments'.
            method: Segmentation method ("time" or "topic").

        Returns:
            List of segmented chunks (dicts with at least a 'text' key).

        Raises:
            KeyError: If topic segmentation is needed and 'text' is missing.
        """
        if method not in ("time", "topic"):
            logger.warning(
                f"Unknown segmentation method '{method}'; using topic-based segmentation"
            )

        if method == "time" and transcript_data.get('segments'):
            # Use timestamped segments.
            return self.segment_by_time(transcript_data['segments'])

        # Use topic-based segmentation on the full text.
        text_segments = self.segment_by_topic(transcript_data['text'])
        return [{'text': seg} for seg in text_segments]