Spaces:
Sleeping
Sleeping
File size: 5,729 Bytes
"""
Transcript segmentation module.
Splits long transcripts into logical sections for better processing.
"""
import re
from typing import List, Dict
from src.utils.logger import setup_logger
# Module-level logger, named after this module; TranscriptSegmenter emits its
# debug/info messages through it.
logger = setup_logger(__name__)
class TranscriptSegmenter:
    """Clean transcripts and split them into bounded chunks.

    Two strategies are available: grouping timestamped segments into fixed
    time windows (``segment_by_time``) and packing paragraphs into chunks of
    at most ``max_segment_words`` words (``segment_by_topic``).
    ``segment_transcript`` picks between them.
    """

    # Common filler words to remove
    FILLER_WORDS = {
        'um', 'uh', 'like', 'you know', 'i mean', 'sort of', 'kind of',
        'basically', 'actually', 'literally', 'right', 'okay', 'so yeah',
    }

    def __init__(self, max_segment_words: int = 500):
        """
        Initialize the segmenter.

        Args:
            max_segment_words: Maximum words per segment

        Raises:
            ValueError: If ``max_segment_words`` is smaller than 1.
        """
        if max_segment_words < 1:
            raise ValueError("max_segment_words must be at least 1")
        self.max_segment_words = max_segment_words

    def clean_text(self, text: str) -> str:
        """
        Clean transcript by removing filler words and normalizing.

        The result is lowercased, with only the first letter of each
        sentence capitalized. Punctuation orphaned by a removed filler
        (", hello", "good, .") is tidied up as well.

        Args:
            text: Raw transcript text

        Returns:
            Cleaned text (empty string for empty input)
        """
        if not text:
            return ''

        # Lowercase and flatten whitespace first so that multi-word fillers
        # still match when the words are separated by a newline or several
        # spaces.
        cleaned = re.sub(r'\s+', ' ', text.lower()).strip()

        if self.FILLER_WORDS:
            # One alternation, longest filler first: a single deterministic
            # pass instead of one pass per filler in arbitrary set order.
            ordered = sorted(self.FILLER_WORDS, key=lambda w: (-len(w), w))
            filler_pattern = (
                r'\b(?:' + '|'.join(re.escape(w) for w in ordered) + r')\b'
            )
            cleaned = re.sub(filler_pattern, '', cleaned, flags=re.IGNORECASE)

        # Remove multiple spaces
        cleaned = re.sub(r'\s+', ' ', cleaned)

        # A removed filler at the start of a sentence leaves ". , rest".
        # Requiring whitespace before the separator keeps "e.g., x" intact.
        cleaned = re.sub(r'([.!?])\s+(?:[,;:]\s*)+', r'\1 ', cleaned)
        # Separators dangling at either end of the text.
        cleaned = cleaned.strip(' ,;:')
        # Space left in front of punctuation; the lookahead protects ".5".
        cleaned = re.sub(r'\s+([,;:.!?])(?=\s|$)', r'\1', cleaned)
        # Runs of separators (",,") produced by adjacent removals.
        cleaned = re.sub(r'([,;:])[,;:]+', r'\1', cleaned)
        # Separator left directly in front of a sentence terminator (",.").
        cleaned = re.sub(r'[,;:]+([.!?])(?=\s|$)', r'\1', cleaned)

        # Remove leading/trailing whitespace
        cleaned = cleaned.strip()

        # Capitalize first letter of sentences ('.', '!' and '?' all end one)
        cleaned = re.sub(
            r'(^|[.!?]\s+)(\w)',
            lambda m: m.group(1) + m.group(2).upper(),
            cleaned,
        )

        logger.debug(f"Cleaned text: reduced from {len(text)} to {len(cleaned)} characters")
        return cleaned

    @staticmethod
    def _build_chunk(start, end, parts: List[str]) -> Dict:
        """Combine raw segment texts into one chunk dict with single spacing."""
        text = ' '.join(p.strip() for p in parts if p and p.strip())
        return {'start': start, 'end': end, 'text': text}

    def segment_by_time(
        self,
        segments: List[Dict],
        interval_seconds: int = 300
    ) -> List[Dict]:
        """
        Segment transcript by time intervals.

        A new chunk starts whenever a segment begins at least
        ``interval_seconds`` after the start of the current chunk. Chunks
        that contain no text are omitted.

        Args:
            segments: List of timestamped segments from Whisper; each needs
                'start' and 'text', and the last one should carry 'end'
            interval_seconds: Time interval for each segment (default: 5 minutes).
                A value <= 0 yields one chunk per input segment.

        Returns:
            List of combined segments grouped by time, each with 'start',
            'end' and 'text'
        """
        if not segments:
            return []

        time_segments = []
        window_start = segments[0]['start']
        parts = []

        for seg in segments:
            # `parts` is empty only for the very first segment, which must
            # open the first window rather than close an empty one.
            if parts and seg['start'] - window_start >= interval_seconds:
                # A window ends where the next one begins, so chunks are
                # contiguous in time.
                chunk = self._build_chunk(window_start, seg['start'], parts)
                if chunk['text']:
                    time_segments.append(chunk)
                window_start = seg['start']
                parts = []
            parts.append(seg['text'])

        # Add the last segment; fall back to its start if 'end' is absent.
        last = segments[-1]
        chunk = self._build_chunk(
            window_start, last.get('end', last['start']), parts
        )
        if chunk['text']:
            time_segments.append(chunk)

        logger.info(f"Segmented transcript into {len(time_segments)} time-based segments")
        return time_segments

    @staticmethod
    def _split_paragraph(paragraph: str, limit: int) -> List[str]:
        """Break a paragraph into pieces of at most ``limit`` words, preferring sentence boundaries."""
        if len(paragraph.split()) <= limit:
            return [paragraph]

        pieces = []
        buffer = []  # words of the piece currently being assembled
        for sentence in re.split(r'(?<=[.!?])\s+', paragraph):
            words = sentence.split()
            if not words:
                continue
            if len(words) > limit:
                # The sentence alone is too long: hard-wrap it by word count.
                if buffer:
                    pieces.append(' '.join(buffer))
                    buffer = []
                for i in range(0, len(words), limit):
                    pieces.append(' '.join(words[i:i + limit]))
                continue
            if buffer and len(buffer) + len(words) > limit:
                pieces.append(' '.join(buffer))
                buffer = []
            buffer.extend(words)
        if buffer:
            pieces.append(' '.join(buffer))
        return pieces

    def segment_by_topic(self, text: str) -> List[str]:
        """
        Segment text by detecting topic transitions.

        Simple heuristic: split on paragraph breaks (blank lines) and pack
        consecutive paragraphs together while they fit. A paragraph longer
        than ``max_segment_words`` is broken at sentence boundaries, and a
        sentence that is still too long is wrapped by word count, so no
        returned segment exceeds the limit.

        Args:
            text: Full transcript text

        Returns:
            List of text segments (empty list for empty input)
        """
        if not text:
            return []

        # Guard against the attribute having been set to a bad value later.
        limit = max(1, self.max_segment_words)

        # Split on blank lines (paragraphs), tolerating "\r\n" and lines
        # that contain only whitespace.
        paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]

        # Every unit is guaranteed to fit within the limit on its own.
        units = []
        for para in paragraphs:
            units.extend(self._split_paragraph(para, limit))

        segments = []
        current_segment = []
        current_word_count = 0

        for unit in units:
            word_count = len(unit.split())
            # If adding this unit exceeds max words, start new segment
            if current_segment and current_word_count + word_count > limit:
                segments.append(' '.join(current_segment))
                current_segment = [unit]
                current_word_count = word_count
            else:
                current_segment.append(unit)
                current_word_count += word_count

        # Add the last segment
        if current_segment:
            segments.append(' '.join(current_segment))

        logger.info(f"Segmented text into {len(segments)} topic-based segments")
        return segments

    def segment_transcript(
        self,
        transcript_data: Dict,
        method: str = "time"
    ) -> List[Dict]:
        """
        Segment transcript using specified method.

        Time-based segmentation needs timestamped segments; when they are
        missing or empty the full text is segmented by topic instead. Any
        unrecognised method also falls back to topic segmentation.

        Args:
            transcript_data: Full transcript data with 'text' and,
                optionally, 'segments'
            method: Segmentation method ("time" or "topic")

        Returns:
            List of segmented chunks; time-based chunks carry 'start',
            'end' and 'text', topic-based chunks carry only 'text'
        """
        timed_segments = transcript_data.get('segments')
        if method == "time" and timed_segments:
            # Use timestamped segments
            return self.segment_by_time(timed_segments)

        if method not in ("time", "topic"):
            logger.warning(
                f"Unknown segmentation method {method!r}; "
                "falling back to topic-based segmentation"
            )

        # Use topic-based segmentation on full text
        text_segments = self.segment_by_topic(transcript_data.get('text') or '')
        return [{'text': seg} for seg in text_segments]
|