"""
Task 2: Duration - Generate duration comparison questions

This task creates audio samples in which the sources have different effective
durations and asks which sound is heard for the longest or the shortest time.

Key features:
- Uses amplitude-filtered (preprocessed) audio clips with known effective durations
- First calculates the maximum number of clips from the total duration, then distributes slots
- Strategically distributes repetitions to ensure clear longest/shortest answers
- Consecutive ordering within a source, random order between sources
- Gap multipliers ensure unambiguous answers (e.g., the longest is at least 1.5x the next longest)
- NO category preference - random selection to avoid bias
"""
|
|
| import csv |
| import random |
| import math |
| from pathlib import Path |
| from typing import Dict, List, Tuple, Optional |
| from collections import Counter |
|
|
| import sys |
| sys.path.append(str(Path(__file__).parent.parent)) |
|
|
| from utils import ( |
| AudioProcessor, PreprocessedESC50Dataset, QuestionGenerator, LLMQuestionGenerator, |
| setup_logger, set_random_seed, calculate_num_samples_for_task, |
| generate_single_clip_duration, get_max_clip_num_to_be_joined, |
| build_duration_task_audio, distribute_remainder_as_silences, |
| generate_sample_durations_for_task |
| ) |
|
|
|
|
class DurationTaskGenerator:
    """Generator for duration comparison task dataset using preprocessed ESC-50.

    Each sample joins clips from several sound categories so that one "target"
    category has clearly the longest (or shortest) total effective duration,
    then emits a multiple-choice and an open-text question about that target.
    """

    def __init__(self, config: Dict, logger):
        """
        Initialize duration task generator.

        Args:
            config: Configuration dictionary. The 'tasks.duration', 'esc50',
                'audio', 'synthetic_silence', 'mcq' and 'output' sections are
                read; the 'llm' section is optional.
            logger: Logger instance

        Side effects:
            Creates '<output.base_path>/duration' and its 'audios' subfolder.
        """
        self.config = config
        self.logger = logger
        self.task_config = config['tasks']['duration']

        # Dataset wrapper that exposes per-clip effective durations.
        self.dataset = PreprocessedESC50Dataset(
            metadata_path=config['esc50']['metadata_path'],
            audio_path=config['esc50']['audio_path'],
            preprocessed_path=self.task_config['preprocessed_data_path'],
            config=config
        )

        # The mean effective duration drives the "how many clips fit" estimate.
        self.avg_effective_duration = self.dataset.effective_df['effective_duration_s'].mean()
        self.logger.info(f"Average effective duration: {self.avg_effective_duration:.2f}s")

        audio_cfg = config['audio']
        self.audio_processor = AudioProcessor(
            crossfade_duration=audio_cfg['crossfade_duration'],
            silence_duration=audio_cfg['silence_duration'],
            with_silence=audio_cfg['with_silence'],
            normalize=audio_cfg['normalize'],
            normalize_target_dBFS=audio_cfg['normalize_target_dBFS'],
            synthetic_silence_path=config['synthetic_silence']['path']
        )

        mcq_cfg = config['mcq']
        self.question_generator = QuestionGenerator(
            num_options=mcq_cfg['num_options'],
            option_labels=mcq_cfg['option_labels'],
            distractor_strategy=mcq_cfg['distractor_strategy']
        )

        self.llm_enabled = config.get('llm', {}).get('enabled', False)
        self.llm_generator = LLMQuestionGenerator(
            enabled=self.llm_enabled,
            template_questions=self.task_config
        )

        # Audio assembly parameters.
        self.min_clip_duration = audio_cfg['min_clip_duration']
        self.max_clip_duration = audio_cfg['max_clip_duration']
        self.min_silence_ms = audio_cfg.get('min_silence_duration', 100)
        self.max_extra_silence_per_gap_ms = audio_cfg.get('max_extra_silence_per_gap', 500)
        self.crossfade_within_source_ms = audio_cfg.get('crossfade_within_source', 50)
        self.task_duration_hours = self.task_config['task_duration_size']

        # Gap constraints that make the correct answer unambiguous.
        self.multiplier_longest = self.task_config.get('multiplier_longest', 1.5)
        self.multiplier_shortest = self.task_config.get('multiplier_shortest', 0.75)
        self.reject_if_gap_not_met = self.task_config.get('reject_if_gap_not_met', True)
        self.sample_different_clips = self.task_config.get('sample_different_clips_same_class', True)

        self.min_effective_duration_per_source = self.task_config.get(
            'min_effective_duration_per_source', 1.0
        )

        # Output locations.
        self.output_base = Path(config['output']['base_path']) / 'duration'
        self.output_base.mkdir(parents=True, exist_ok=True)
        self.audio_output = self.output_base / 'audios'
        self.audio_output.mkdir(parents=True, exist_ok=True)

        # Running statistics reported at the end of generate_dataset().
        self.rejection_count = 0
        self.success_count = 0

    @staticmethod
    def _pad_by_cycling(items: List, count: int) -> List:
        """Return exactly ``count`` items, repeating ``items`` in order as needed.

        An empty ``items`` yields an empty list; a list longer than ``count``
        is truncated.
        """
        if not items:
            return []
        return [items[i % len(items)] for i in range(count)]

    @staticmethod
    def _balance_question_types(question_types: List[str], num_samples: int) -> List[str]:
        """Build an (unshuffled) list of ``num_samples`` question types.

        Every type gets ``num_samples // len(question_types)`` entries and the
        first ``num_samples % len(question_types)`` types get one extra.
        """
        per_type, extra = divmod(num_samples, len(question_types))
        balanced: List[str] = []
        for position, qtype in enumerate(question_types):
            balanced.extend([qtype] * (per_type + (1 if position < extra else 0)))
        return balanced

    def _calculate_max_clips_and_sources(
        self,
        target_duration_s: float,
        question_type: str
    ) -> Tuple[int, int, float]:
        """
        Calculate max clips possible and choose n_sources from config that satisfies gap.

        Key principle:
        1. Calculate valid range of sources that can satisfy gap constraint
        2. Filter config values to only those within valid range
        3. Pick RANDOMLY from valid config values (ensures variety)

        For LONGEST:
        - Target is reserved at least 2 clips, backgrounds get 1 each
        - max_sources = max_clips - 2 + 1
        - min_sources = 2 (need at least 1 background)

        For SHORTEST:
        - Target gets 1 clip
        - Each background is reserved at least 2 clips
        - max_sources = 1 + (max_clips - 1) // 2
        - min_sources = 2

        In both cases the upper bound is additionally capped at the number of
        dataset categories, so an unusable value is never drawn.

        Args:
            target_duration_s: Target total audio duration
            question_type: "longest" or "shortest" (anything other than
                "longest" is handled as "shortest")

        Returns:
            Tuple of (max_clips, n_sources, remainder_s)

        Raises:
            ValueError: If no configured source count fits the valid range.
        """
        max_clips, remainder_s = get_max_clip_num_to_be_joined(
            target_duration_s,
            self.avg_effective_duration,
            self.min_silence_ms
        )

        # A comparison needs at least two clips.
        max_clips = max(2, max_clips)

        num_sources_config = self.task_config.get('num_unique_sources', [2, 3, 4, 5])
        if isinstance(num_sources_config, int):
            # An int N is shorthand for "any count from 1 to N".
            num_sources_config = list(range(1, num_sources_config + 1))

        min_valid_sources = 2
        if question_type == "longest":
            min_target_clips = 2
            max_valid_sources = max_clips - min_target_clips + 1
        else:
            min_clips_per_background = 2
            max_backgrounds = (max_clips - 1) // min_clips_per_background
            max_valid_sources = max_backgrounds + 1

        # Cap up front: checking only after random.choice() turned a fixable
        # situation into a sporadic per-attempt failure.
        max_valid_sources = min(max_valid_sources, len(self.dataset.CATEGORIES))

        valid_config_sources = [
            n for n in num_sources_config
            if min_valid_sources <= n <= max_valid_sources
        ]

        if not valid_config_sources:
            raise ValueError(
                f"Duration task: No valid num_unique_sources for {question_type} question. "
                f"Config values: {num_sources_config}, Valid range: [{min_valid_sources}, {max_valid_sources}]. "
                f"max_clips={max_clips}, duration={target_duration_s:.1f}s. "
                f"Increase min_clip_duration or adjust num_unique_sources config."
            )

        n_sources = random.choice(valid_config_sources)

        self.logger.debug(
            f"Max clips: {max_clips}, Question: {question_type}, "
            f"Valid range: [{min_valid_sources}, {max_valid_sources}], "
            f"Valid config: {valid_config_sources}, Selected: {n_sources}"
        )

        return max_clips, n_sources, remainder_s

    def _calculate_slot_distribution(
        self,
        max_clips: int,
        n_sources: int,
        effective_durations: Dict[str, float],
        target_category: str,
        question_type: str
    ) -> Tuple[Dict[str, int], bool, Dict]:
        """
        Calculate how many clips each source gets.

        For LONGEST: target gets (max_clips - n_backgrounds), backgrounds get 1 each
        For SHORTEST: target gets 1, backgrounds share (max_clips - 1)

        Every source always receives at least one clip, even if that exceeds
        max_clips.

        Args:
            max_clips: Maximum number of clips that fit
            n_sources: Number of unique sources (kept for interface
                compatibility; the count is taken from effective_durations)
            effective_durations: Dict mapping category -> effective duration
            target_category: The category that should be longest/shortest
            question_type: "longest" or "shortest" (anything other than
                "longest" is handled as "shortest")

        Returns:
            Tuple of (slot_distribution, gap_satisfied, metadata)
        """
        background_categories = [c for c in effective_durations if c != target_category]
        n_backgrounds = len(background_categories)

        if question_type == "longest":
            target_clips = max(1, max_clips - n_backgrounds)

            slot_distribution = {target_category: target_clips}
            for cat in background_categories:
                slot_distribution[cat] = 1

            target_duration = target_clips * effective_durations[target_category]
            max_background = max(
                (effective_durations[c] for c in background_categories),
                default=0
            )
            required_target = max_background * self.multiplier_longest
            gap_satisfied = target_duration >= required_target

            metadata = {
                'target_clips': target_clips,
                'target_duration_s': target_duration,
                'max_background_s': max_background,
                'required_target_s': required_target,
                'multiplier': self.multiplier_longest
            }
            return slot_distribution, gap_satisfied, metadata

        # SHORTEST: one clip for the target, the rest split across backgrounds.
        remaining_clips = max_clips - 1
        slot_distribution = {target_category: 1}

        if n_backgrounds:
            if remaining_clips >= n_backgrounds:
                clips_per_background, extra_clips = divmod(remaining_clips, n_backgrounds)
            else:
                # Not enough clips to go round: give each background its
                # mandatory single clip and do not add remainder clips on top.
                clips_per_background, extra_clips = 1, 0

            for i, cat in enumerate(background_categories):
                slot_distribution[cat] = clips_per_background + (1 if i < extra_clips else 0)

        target_duration = effective_durations[target_category]
        # With no backgrounds the comparison is vacuous, hence +inf.
        min_background = min(
            (slot_distribution[c] * effective_durations[c] for c in background_categories),
            default=float('inf')
        )
        required_max_target = min_background * self.multiplier_shortest

        # A target that is too brief to be heard is rejected as well.
        target_too_short = target_duration < self.min_effective_duration_per_source
        gap_satisfied = (target_duration <= required_max_target) and (not target_too_short)

        metadata = {
            'target_clips': 1,
            'target_duration_s': target_duration,
            'min_background_s': min_background,
            'required_max_target_s': required_max_target,
            'multiplier': self.multiplier_shortest,
            'target_too_short': target_too_short
        }

        return slot_distribution, gap_satisfied, metadata

    def _try_generate_sample(
        self,
        sample_id: int,
        question_type: str,
        max_retries: int = 5,
        target_duration_seconds: Optional[float] = None
    ) -> Optional[Dict]:
        """
        Try to generate a valid duration sample with retries.

        Both rejected attempts (None result) and attempts that raise are
        retried; exceptions are logged as warnings, not propagated.

        Args:
            sample_id: Sample ID
            question_type: "longest" or "shortest"
            max_retries: Maximum retry attempts
            target_duration_seconds: Pre-generated target duration

        Returns:
            Metadata dict if successful, None if all retries failed
        """
        for attempt in range(max_retries):
            try:
                result = self._generate_single_sample(
                    sample_id,
                    question_type,
                    target_duration_seconds=target_duration_seconds
                )
            except Exception as e:  # best-effort: one bad attempt must not abort the run
                self.logger.warning(f"Sample {sample_id} attempt {attempt+1} failed: {e}")
                continue
            if result is not None:
                return result

        return None

    def _generate_single_sample(
        self,
        sample_id: int,
        question_type: str,
        target_duration_seconds: Optional[float] = None
    ) -> Optional[Dict]:
        """
        Generate a single duration task sample.

        Pipeline:
        1. Use pre-generated target duration (or generate if not provided)
        2. Calculate max_clips using get_max_clip_num_to_be_joined
        3. Based on max_clips and question_type, determine n_sources
        4. Ask the dataset for n_sources least-used categories
        5. Pick target category RANDOMLY from selected
        6. Get effective durations for all sources
        7. Calculate slot distribution based on max_clips
        8. Verify gap constraint (optionally retrying with another target clip)
        9. Load audio clips, build and export the final audio

        Args:
            sample_id: Sample ID number
            question_type: "longest" or "shortest"
            target_duration_seconds: Pre-generated target duration (from generate_sample_durations_for_task)

        Returns:
            Dictionary with sample metadata, or None if the sample was rejected

        Raises:
            ValueError: If no valid source count exists or a category yields
                no audio files.
        """
        if target_duration_seconds is not None:
            target_duration_s = target_duration_seconds
        else:
            target_duration_s = generate_single_clip_duration(
                self.min_clip_duration,
                self.max_clip_duration
            )

        max_clips, n_sources, remainder_s = self._calculate_max_clips_and_sources(
            target_duration_s,
            question_type
        )

        all_categories = self.dataset.get_least_used_categories(n_sources)

        target_category = random.choice(all_categories)
        self.dataset.category_usage_counts[target_category] += 1

        # One reference clip per category; its effective duration is what the
        # slot/gap arithmetic below is based on.
        effective_durations = {}
        selected_files = {}

        for category in all_categories:
            filename, filepath, eff_dur = self.dataset.sample_file_from_category_with_duration(
                category,
                min_effective_duration=self.min_effective_duration_per_source
            )
            effective_durations[category] = eff_dur
            selected_files[category] = {
                'filename': filename,
                'filepath': filepath,
                'effective_duration_s': eff_dur
            }

        slot_distribution, gap_satisfied, calc_metadata = self._calculate_slot_distribution(
            max_clips=max_clips,
            n_sources=n_sources,
            effective_durations=effective_durations,
            target_category=target_category,
            question_type=question_type
        )

        if not gap_satisfied:
            if self.sample_different_clips:
                gap_satisfied = self._try_improve_gap_with_different_clips(
                    question_type=question_type,
                    target_category=target_category,
                    all_categories=all_categories,
                    max_clips=max_clips,
                    n_sources=n_sources,
                    effective_durations=effective_durations,
                    selected_files=selected_files,
                    slot_distribution=slot_distribution
                )
                # The helper may have swapped the target clip; recompute so the
                # recorded calc_metadata describes the durations actually used.
                _, _, calc_metadata = self._calculate_slot_distribution(
                    max_clips=max_clips,
                    n_sources=n_sources,
                    effective_durations=effective_durations,
                    target_category=target_category,
                    question_type=question_type
                )

            if not gap_satisfied and self.reject_if_gap_not_met:
                self.rejection_count += 1
                self.logger.debug(
                    f"Sample {sample_id} rejected: gap not satisfied "
                    f"(type={question_type}, max_clips={max_clips}, sources={n_sources})"
                )
                return None

        source_audio_lists = {}
        files_used = {}

        for category in all_categories:
            reps = slot_distribution.get(category, 0)
            if reps == 0:
                continue

            if self.sample_different_clips and reps > 1:
                # NOTE(review): the clips drawn here may differ from the
                # reference clip the gap check used, and the returned total
                # duration is not re-checked - confirm the dataset helper
                # keeps the total close to the requested value.
                filenames, filepaths, _total_dur = self.dataset.sample_files_from_category_to_reach_duration(
                    category,
                    reps * effective_durations[category],
                    prefer_same_file=False
                )
            else:
                file_info = selected_files[category]
                filenames = [file_info['filename']] * reps
                filepaths = [file_info['filepath']] * reps

            paths_to_load = list(filepaths[:reps])
            if not paths_to_load:
                raise ValueError(
                    f"Duration task: no audio files available for category '{category}'"
                )
            audio_list = [self.audio_processor.load_audio(fp) for fp in paths_to_load]

            # If fewer files than slots came back, cycle through the loaded
            # clips (and their names, in lockstep) to fill every slot.
            source_audio_lists[category] = self._pad_by_cycling(audio_list, reps)
            files_used[category] = self._pad_by_cycling(list(filenames[:reps]), reps)

        final_audio, category_sequence, build_metadata = build_duration_task_audio(
            source_audio_lists=source_audio_lists,
            slot_distribution=slot_distribution,
            effective_durations=effective_durations,
            target_total_duration_s=target_duration_s,
            min_silence_between_sources_ms=self.min_silence_ms,
            max_extra_silence_per_gap_ms=self.max_extra_silence_per_gap_ms,
            crossfade_within_source_ms=self.crossfade_within_source_ms
        )

        output_audio_path = self.audio_output / f"{sample_id}.wav"
        final_audio.export(str(output_audio_path), format="wav")

        correct_category = target_category
        present_categories = all_categories

        mcq_question = self.task_config['mcq_questions'][question_type]
        mcq_data = self.question_generator.generate_category_mcq(
            mcq_question,
            correct_category,
            present_categories,
            self.dataset.CATEGORIES
        )

        open_text_question = self.task_config['open_text_questions'][question_type]
        open_text_data = self.question_generator.generate_category_open_text(
            open_text_question,
            correct_category
        )

        # Nominal totals: slots times the reference clip's effective duration.
        actual_effective_durations = {
            cat: slot_distribution[cat] * effective_durations[cat]
            for cat in all_categories
            if cat in slot_distribution
        }

        metadata = {
            'id': sample_id,
            'audio_path': str(output_audio_path.relative_to(self.output_base.parent)),
            'question_type': question_type,
            'max_clips': max_clips,
            'n_unique_sources': n_sources,
            'target_category': target_category,
            'present_categories': present_categories,
            'source_order': build_metadata['source_order'],
            'slot_distribution': slot_distribution,
            'effective_durations_per_clip': effective_durations,
            'total_effective_durations': actual_effective_durations,
            'gap_satisfied': gap_satisfied,
            'multiplier_used': self.multiplier_longest if question_type == 'longest' else self.multiplier_shortest,
            'files_used': files_used,
            'target_duration_s': target_duration_s,
            'actual_duration_s': len(final_audio) / 1000.0,
            'timestamp_string': build_metadata.get('timestamp_string', ''),
            'source_timestamps': build_metadata.get('source_timestamps', []),
            'mcq_question': mcq_data['question'],
            'mcq_options': mcq_data['options'],
            'mcq_correct_answer': mcq_data['correct_answer'],
            'open_text_question': open_text_data['question'],
            'open_text_answer': open_text_data['correct_answer'],
            'calc_metadata': calc_metadata
        }

        self.success_count += 1
        self.logger.info(
            f"Generated duration sample {sample_id}: {question_type}, "
            f"max_clips={max_clips}, sources={n_sources}, target={target_category}, "
            f"slots={slot_distribution}, gap_satisfied={gap_satisfied}"
        )

        return metadata

    def _try_improve_gap_with_different_clips(
        self,
        question_type: str,
        target_category: str,
        all_categories: List[str],
        max_clips: int,
        n_sources: int,
        effective_durations: Dict[str, float],
        selected_files: Dict[str, Dict],
        slot_distribution: Dict[str, int]
    ) -> bool:
        """
        Try to improve gap satisfaction by selecting a different target clip.

        For LONGEST: use the target clip with the longest effective duration
        For SHORTEST: use the shortest target clip that is still at least
        min_effective_duration_per_source long (anything shorter would be
        rejected by _calculate_slot_distribution as too short)

        Mutates effective_durations and selected_files for the target category
        whenever a candidate clip exists, and replaces the contents of
        slot_distribution when the gap becomes satisfied.

        Args:
            question_type: "longest" or "shortest"
            target_category: Category whose clip may be swapped
            all_categories: Categories in the sample (unused here)
            max_clips: Maximum number of clips that fit
            n_sources: Number of unique sources
            effective_durations: Category -> effective duration (mutated)
            selected_files: Category -> chosen file info (mutated)
            slot_distribution: Category -> clip count (mutated on success)

        Returns:
            True if gap is now satisfied
        """
        files = self.dataset.get_files_by_category_with_durations(target_category)

        candidates = [
            f for f in files
            if f['effective_duration_s'] >= self.min_effective_duration_per_source
        ]

        if candidates:
            pick = max if question_type == "longest" else min
            best = pick(candidates, key=lambda f: f['effective_duration_s'])
            effective_durations[target_category] = best['effective_duration_s']
            selected_files[target_category] = {
                'filename': best['filename'],
                'filepath': best['filepath'],
                'effective_duration_s': best['effective_duration_s']
            }

        new_slots, gap_satisfied, _ = self._calculate_slot_distribution(
            max_clips=max_clips,
            n_sources=n_sources,
            effective_durations=effective_durations,
            target_category=target_category,
            question_type=question_type
        )

        if gap_satisfied:
            # Update in place so the caller's dict reflects the new slots.
            slot_distribution.clear()
            slot_distribution.update(new_slots)

        return gap_satisfied

    def generate_sample(
        self,
        sample_id: int,
        target_question_type: str = None,
        target_duration_seconds: float = None
    ) -> Optional[Dict]:
        """
        Generate a single duration task sample with retries.

        Args:
            sample_id: Sample ID number
            target_question_type: Target question type for balanced distribution;
                a random configured type is used when omitted
            target_duration_seconds: Pre-generated target duration (from generate_sample_durations_for_task)

        Returns:
            Dictionary with sample metadata, or None if failed
        """
        question_type = target_question_type or random.choice(
            self.task_config['question_types']
        )

        return self._try_generate_sample(
            sample_id,
            question_type,
            target_duration_seconds=target_duration_seconds
        )

    def generate_dataset(self) -> tuple:
        """
        Generate the complete duration task dataset.

        Sample durations are pre-generated with
        generate_sample_durations_for_task(); one sample is attempted per
        duration, with question types balanced across the run. The loop gives
        up after twice as many attempts as requested samples, so fewer samples
        than requested may be produced (a warning is logged in that case).

        Returns:
            Tuple of (mcq_csv_path, open_text_csv_path)

        Raises:
            ValueError: If the task config lists no question types.
        """
        sample_durations = generate_sample_durations_for_task(
            self.task_duration_hours,
            self.min_clip_duration,
            self.max_clip_duration
        )
        num_samples = len(sample_durations)

        self.logger.info(
            f"Generating {num_samples} duration task samples "
            f"(target: {self.task_duration_hours}h, exact fill)..."
        )

        question_types = self.task_config['question_types']
        if not question_types:
            raise ValueError(
                "Duration task: 'question_types' must list at least one question type."
            )

        balanced_types = self._balance_question_types(question_types, num_samples)
        random.shuffle(balanced_types)
        type_dist = Counter(balanced_types)
        self.logger.info(f"Balanced question type distribution: {dict(sorted(type_dist.items()))}")

        all_metadata = []
        sample_idx = 0
        type_idx = 0
        max_attempts = len(balanced_types) * 2

        while len(all_metadata) < num_samples and type_idx < max_attempts:
            question_type = balanced_types[type_idx % len(balanced_types)]
            target_duration = sample_durations[sample_idx] if sample_idx < len(sample_durations) else None

            metadata = self.generate_sample(sample_idx, question_type, target_duration_seconds=target_duration)
            type_idx += 1

            if metadata is None:
                # Retry the same sample slot with the next question type.
                continue

            all_metadata.append(metadata)
            sample_idx += 1

            # Report only on real progress, not on every failed attempt.
            if len(all_metadata) % 50 == 0:
                self.logger.info(
                    f"Progress: {len(all_metadata)}/{num_samples} samples, "
                    f"{self.rejection_count} rejections"
                )

        if len(all_metadata) < num_samples:
            self.logger.warning(
                f"Only {len(all_metadata)}/{num_samples} samples generated "
                f"after {type_idx} attempts"
            )

        # Guard the ratio: nothing generated and nothing rejected is possible.
        outcomes = len(all_metadata) + self.rejection_count
        rejection_rate = (self.rejection_count / outcomes * 100) if outcomes else 0.0
        self.logger.info(
            f"Generation complete: {len(all_metadata)} samples, "
            f"{self.rejection_count} rejections "
            f"({rejection_rate:.1f}% rejection rate)"
        )

        mcq_csv_path = self.output_base / 'duration_mcq.csv'
        self._save_mcq_csv(all_metadata, mcq_csv_path)

        open_text_csv_path = self.output_base / 'duration_open_text.csv'
        self._save_open_text_csv(all_metadata, open_text_csv_path)

        metadata_csv_path = self.output_base / 'duration_metadata.csv'
        self._save_metadata_csv(all_metadata, metadata_csv_path)

        self.logger.info("Duration task dataset generation complete!")
        self.logger.info(f"  - MCQ CSV: {mcq_csv_path}")
        self.logger.info(f"  - Open-text CSV: {open_text_csv_path}")
        self.logger.info(f"  - Metadata CSV: {metadata_csv_path}")
        self.logger.info(f"  - Audio files: {self.audio_output}")

        return mcq_csv_path, open_text_csv_path

    def _save_mcq_csv(self, metadata_list: List[Dict], output_path: Path):
        """Save MCQ format CSV (UTF-8); expects options keyed 'A' through 'D'."""
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow([
                'question', 'id', 'audio_path',
                'optionA', 'optionB', 'optionC', 'optionD',
                'correct', 'question_type', 'max_clips', 'n_sources',
                'target_category', 'slot_distribution', 'effective_durations'
            ])

            for meta in metadata_list:
                options = meta['mcq_options']
                writer.writerow([
                    meta['mcq_question'],
                    meta['id'],
                    meta['audio_path'],
                    options['A'],
                    options['B'],
                    options['C'],
                    options['D'],
                    meta['mcq_correct_answer'],
                    meta['question_type'],
                    meta['max_clips'],
                    meta['n_unique_sources'],
                    meta['target_category'],
                    str(meta['slot_distribution']),
                    str(meta['total_effective_durations'])
                ])

    def _save_open_text_csv(self, metadata_list: List[Dict], output_path: Path):
        """Save open-text format CSV (UTF-8)."""
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow([
                'question', 'id', 'audio_path', 'answer',
                'question_type', 'max_clips', 'n_sources',
                'target_category', 'effective_durations'
            ])

            for meta in metadata_list:
                writer.writerow([
                    meta['open_text_question'],
                    meta['id'],
                    meta['audio_path'],
                    meta['open_text_answer'],
                    meta['question_type'],
                    meta['max_clips'],
                    meta['n_unique_sources'],
                    meta['target_category'],
                    str(meta['total_effective_durations'])
                ])

    def _save_metadata_csv(self, metadata_list: List[Dict], output_path: Path):
        """Save detailed metadata CSV (UTF-8) with effective durations and timestamps."""
        with open(output_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow([
                'id', 'audio_path', 'question_type', 'max_clips', 'n_sources',
                'target_category', 'present_categories', 'source_order',
                'slot_distribution', 'effective_durations_per_clip',
                'total_effective_durations', 'gap_satisfied', 'multiplier_used',
                'target_duration_s', 'actual_duration_s', 'clip_timestamps', 'files_used'
            ])

            for meta in metadata_list:
                writer.writerow([
                    meta['id'],
                    meta['audio_path'],
                    meta['question_type'],
                    meta['max_clips'],
                    meta['n_unique_sources'],
                    meta['target_category'],
                    str(meta['present_categories']),
                    str(meta['source_order']),
                    str(meta['slot_distribution']),
                    str(meta['effective_durations_per_clip']),
                    str(meta['total_effective_durations']),
                    meta['gap_satisfied'],
                    meta['multiplier_used'],
                    round(meta['target_duration_s'], 2),
                    round(meta['actual_duration_s'], 2),
                    meta.get('timestamp_string', ''),
                    str(meta['files_used'])
                ])
|
|
|
|
def main(config_path: str = None):
    """Main entry point for duration task generation.

    Loads the YAML configuration, seeds the random generators, sets up
    logging and runs the full dataset generation.

    Args:
        config_path: Path to the YAML config file. Defaults to 'config.yaml'
            in the parent of this file's directory.

    Raises:
        ValueError: If the config file does not contain a YAML mapping.
    """
    import yaml

    if config_path is None:
        config_path = Path(__file__).parent.parent / 'config.yaml'

    # Explicit encoding so the config parses the same on every platform.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # An empty file loads as None; fail with a clear message instead of an
    # opaque TypeError on the first subscript below.
    if not isinstance(config, dict):
        raise ValueError(f"Config file {config_path} must contain a YAML mapping")

    set_random_seed(config['random_seed'])

    logging_cfg = config['logging']
    logger = setup_logger(
        'duration_task',
        log_file=str(Path(config['output']['base_path']) / logging_cfg['log_file']),
        level=logging_cfg['level'],
        console_output=logging_cfg['console_output']
    )

    generator = DurationTaskGenerator(config, logger)
    generator.generate_dataset()


if __name__ == '__main__':
    main()
|
|