File size: 5,729 Bytes
ed147e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""

Transcript segmentation module.

Splits long transcripts into logical sections for better processing.

"""

import re
from typing import List, Dict

from src.utils.logger import setup_logger

logger = setup_logger(__name__)


class TranscriptSegmenter:
    """Handles intelligent segmentation of transcripts."""

    # Common filler words/phrases removed during cleaning. Multi-word
    # entries ('you know', 'i mean') are matched as whole phrases via
    # \b word boundaries in clean_text().
    FILLER_WORDS = {
        'um', 'uh', 'like', 'you know', 'i mean', 'sort of', 'kind of',
        'basically', 'actually', 'literally', 'right', 'okay', 'so yeah'
    }

    def __init__(self, max_segment_words: int = 500):
        """Initialize the segmenter.

        Args:
            max_segment_words: Maximum words per topic-based segment.
        """
        self.max_segment_words = max_segment_words

    def clean_text(self, text: str) -> str:
        """Clean transcript by removing filler words and normalizing.

        Note: the result is lowercased (except sentence-initial letters),
        so proper-noun casing from the raw transcript is lost.

        Args:
            text: Raw transcript text.

        Returns:
            Cleaned text with fillers removed, whitespace collapsed, and
            the first letter of each sentence capitalized.
        """
        # Lowercase once up front so filler matching is case-insensitive
        # without needing re.IGNORECASE on every substitution.
        cleaned = text.lower()

        # Remove filler words/phrases; \b boundaries avoid partial matches
        # (e.g. 'um' inside 'umbrella').
        for filler in self.FILLER_WORDS:
            pattern = r'\b' + re.escape(filler) + r'\b'
            cleaned = re.sub(pattern, '', cleaned)

        # Filler removal can leave punctuation dangling ("um, hi" -> " , hi").
        # Re-attach punctuation to the preceding word, then collapse any
        # doubled separators that result ("think, , that" -> "think, that").
        cleaned = re.sub(r'\s+([,.;!?])', r'\1', cleaned)
        cleaned = re.sub(r'([,;])[,;]+', r'\1', cleaned)

        # Collapse runs of whitespace and trim the ends.
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()

        # Capitalize the first letter of each sentence.
        cleaned = '. '.join(s.capitalize() for s in cleaned.split('. '))

        logger.debug(f"Cleaned text: reduced from {len(text)} to {len(cleaned)} characters")

        return cleaned

    def segment_by_time(
        self,
        segments: List[Dict],
        interval_seconds: int = 300
    ) -> List[Dict]:
        """Segment transcript by time intervals.

        Args:
            segments: List of timestamped segments from Whisper; each dict
                is expected to have 'start', 'end' and 'text' keys.
            interval_seconds: Time interval for each segment
                (default: 5 minutes).

        Returns:
            List of combined segments ({'start', 'end', 'text'}) grouped
            by time. A window's 'end' is the 'start' of the next window,
            except the final window, which uses the last sub-segment's 'end'.
        """
        if not segments:
            return []

        time_segments = []
        current_segment = {
            'start': segments[0]['start'],
            'text': ''
        }

        for seg in segments:
            # Check if we should start a new time segment.
            if seg['start'] - current_segment['start'] >= interval_seconds:
                # Close out the current window where the new one begins.
                # strip() removes the leading-space artifact left by the
                # ' ' + text concatenation below.
                current_segment['end'] = seg['start']
                current_segment['text'] = current_segment['text'].strip()
                time_segments.append(current_segment)

                # Start a new window anchored at this sub-segment.
                current_segment = {
                    'start': seg['start'],
                    'text': seg['text']
                }
            else:
                # Accumulate into the current window.
                current_segment['text'] += ' ' + seg['text']

        # Flush the final (possibly partial) window.
        if current_segment['text']:
            current_segment['end'] = segments[-1]['end']
            current_segment['text'] = current_segment['text'].strip()
            time_segments.append(current_segment)

        logger.info(f"Segmented transcript into {len(time_segments)} time-based segments")

        return time_segments

    def segment_by_topic(self, text: str) -> List[str]:
        """Segment text by detecting topic transitions.

        Simple heuristic: split on paragraph breaks and pack paragraphs
        into segments of at most max_segment_words. A single paragraph
        longer than the limit is kept whole (never split mid-paragraph).

        Args:
            text: Full transcript text.

        Returns:
            List of text segments.
        """
        # Split by double newlines (paragraphs), dropping empty ones.
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]

        segments = []
        current_segment = []
        current_word_count = 0

        for para in paragraphs:
            word_count = len(para.split())

            # If adding this paragraph would exceed the limit, close the
            # current segment first (unless it is empty).
            if current_word_count + word_count > self.max_segment_words and current_segment:
                segments.append(' '.join(current_segment))
                current_segment = [para]
                current_word_count = word_count
            else:
                current_segment.append(para)
                current_word_count += word_count

        # Flush the last segment.
        if current_segment:
            segments.append(' '.join(current_segment))

        logger.info(f"Segmented text into {len(segments)} topic-based segments")

        return segments

    def segment_transcript(
        self,
        transcript_data: Dict,
        method: str = "time"
    ) -> List[Dict]:
        """Segment transcript using the specified method.

        Args:
            transcript_data: Full transcript data with 'text' and
                (optionally) 'segments' keys.
            method: Segmentation method ("time" or "topic"). "time"
                silently falls back to topic-based segmentation when no
                timestamped 'segments' are present.

        Returns:
            List of segmented chunks. Time-based chunks carry
            'start'/'end'/'text'; topic-based chunks carry only 'text'.
        """
        if method == "time" and 'segments' in transcript_data:
            # Use timestamped segments.
            return self.segment_by_time(transcript_data['segments'])
        else:
            # Fall back to topic-based segmentation on the full text.
            text_segments = self.segment_by_topic(transcript_data['text'])
            return [{'text': seg} for seg in text_segments]