| | |
| | """ |
| | Audio Processing Module |
| | Handles audio extraction, processing, and integration with FFmpeg operations. |
| | |
| | Upgrades: |
| | - Prefer lossless audio stream-copy for muxing (no generational loss). |
| | - Safe fallback to AAC re-encode when needed. |
| | - Optional EBU R128 loudness normalization (two-pass loudnorm). |
| | - Optional audio/video offset with sample-accurate filters. |
| | - Robust ffprobe-based audio detection and metadata. |
| | - MoviePy fallback when ffmpeg is unavailable. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import os |
| | import re |
| | import json |
| | import time |
| | import math |
| | import shutil |
| | import logging |
| | import tempfile |
| | import subprocess |
| | from pathlib import Path |
| | from typing import Optional, Dict, Any, List |
| |
|
| | from core.exceptions import AudioProcessingError |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class AudioProcessor:
    """
    Comprehensive audio processing for video background replacement.

    Thin wrapper around the ``ffmpeg`` / ``ffprobe`` executables found on
    ``PATH``. It can detect and describe audio streams, extract audio, mux the
    audio of an original clip into a processed (video-only) clip, apply an
    audio offset, and run EBU R128 loudness normalization (two-pass
    ``loudnorm`` with a single-pass fallback). MoviePy is used as a last
    resort when ffmpeg is missing or fails.

    Counters for successful and failed operations are kept in ``self.stats``
    and exposed through :meth:`get_stats`.
    """

    # Codec arguments per supported extraction format.
    _AUDIO_CODECS = {
        "aac": ["-c:a", "aac"],
        "mp3": ["-c:a", "libmp3lame"],
        "wav": ["-c:a", "pcm_s16le"],
    }
    # Bitrate arguments for extract_audio(), keyed by quality then format.
    _EXTRACT_BITRATES = {
        "low": {"aac": ["-b:a", "96k"], "mp3": ["-b:a", "128k"], "wav": []},
        "medium": {"aac": ["-b:a", "192k"], "mp3": ["-b:a", "192k"], "wav": []},
        "high": {"aac": ["-b:a", "320k"], "mp3": ["-b:a", "320k"], "wav": []},
    }
    # Bitrate arguments for the plain AAC re-encode mux.
    _MUX_BITRATES = {
        "low": ["-b:a", "96k"],
        "medium": ["-b:a", "192k"],
        "high": ["-b:a", "320k"],
    }
    # Audio encoder settings used whenever audio filters force a re-encode.
    _FILTERED_AAC_ARGS = ["-c:a", "aac", "-b:a", "192k", "-ac", "2", "-ar", "48000"]
    # Fields of the loudnorm JSON report that the second pass needs.
    _LOUDNORM_KEYS = ("input_i", "input_tp", "input_lra", "input_thresh", "target_offset")
    _LOUDNORM_JSON_RE = re.compile(r"\{\s*\"input_i\"[^\}]+\}", re.MULTILINE | re.DOTALL)

    def __init__(self, temp_dir: Optional[str] = None):
        """
        Locate ffmpeg/ffprobe and reset the statistics counters.

        Args:
            temp_dir: Directory for temporary files; defaults to the system
                temporary directory.
        """
        self.temp_dir = temp_dir or tempfile.gettempdir()
        self.ffmpeg_path = shutil.which("ffmpeg")
        self.ffprobe_path = shutil.which("ffprobe")
        self.ffmpeg_available = self.ffmpeg_path is not None
        self.ffprobe_available = self.ffprobe_path is not None

        self.stats = {
            "audio_extractions": 0,
            "audio_merges": 0,
            "total_processing_time": 0.0,
            "failed_operations": 0,
        }

        if not self.ffmpeg_available:
            logger.warning("FFmpeg not available - audio processing will be limited")
        logger.info(
            "AudioProcessor initialized (FFmpeg: %s, FFprobe: %s)",
            self.ffmpeg_available,
            self.ffprobe_available,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _run(self, cmd: List[str], tag: str = "") -> subprocess.CompletedProcess:
        """
        Log and execute a command, capturing stdout/stderr as text.

        Undecodable bytes (ffmpeg echoes file names and metadata verbatim) are
        replaced instead of raising ``UnicodeDecodeError``.
        """
        logger.info("ffmpeg%s: %s", f"[{tag}]" if tag else "", " ".join(cmd))
        return subprocess.run(cmd, text=True, errors="replace", capture_output=True, check=False)

    @staticmethod
    def _output_ok(proc: subprocess.CompletedProcess, output_path: str) -> bool:
        """Return True when the command succeeded and left a non-empty output file."""
        return proc.returncode == 0 and os.path.exists(output_path) and os.path.getsize(output_path) > 0

    def _record_merge(self, started: float) -> None:
        """Account for one successful merge that began at ``started`` (epoch seconds)."""
        self.stats["audio_merges"] += 1
        self.stats["total_processing_time"] += time.time() - started

    @staticmethod
    def _coerce(value: Any, caster: Any) -> Any:
        """Convert an ffprobe field with ``caster``; return "unknown" if missing or unparsable."""
        if not value:
            return "unknown"
        try:
            return caster(value)
        except (TypeError, ValueError):
            return "unknown"

    def _has_audio(self, path: str) -> bool:
        """
        Return True when ``path`` is an existing file with at least one audio stream.

        ffprobe is preferred; when it is unavailable (or cannot be started) the
        stream listing that ffmpeg prints for its inputs is inspected instead.
        """
        if not os.path.isfile(path):
            return False
        if self.ffprobe_available:
            try:
                proc = subprocess.run(
                    [
                        self.ffprobe_path, "-v", "error",
                        "-select_streams", "a:0",
                        "-show_entries", "stream=index",
                        "-of", "csv=p=0",
                        path,
                    ],
                    text=True, errors="replace", capture_output=True, check=False,
                )
                return bool(proc.stdout.strip())
            except Exception as e:
                logger.debug("ffprobe audio detection failed for %s: %s", path, e)

        if self.ffmpeg_available:
            try:
                # No output file is given on purpose: ffmpeg prints the input's
                # stream list to stderr and exits without decoding anything.
                # The default log level is required; "-loglevel error" would
                # hide the "Audio:" stream line this check relies on.
                proc = subprocess.run(
                    [self.ffmpeg_path, "-hide_banner", "-i", path],
                    text=True, errors="replace", capture_output=True, check=False,
                )
                return "Audio:" in (proc.stderr or "")
            except Exception as e:
                logger.debug("ffmpeg audio detection failed for %s: %s", path, e)
                return False
        return False

    # ------------------------------------------------------------------
    # Inspection / extraction
    # ------------------------------------------------------------------

    def get_audio_info(self, video_path: str) -> Dict[str, Any]:
        """
        Get information about the first audio stream of a media file.

        Returns:
            ``{"has_audio": True, "codec", "sample_rate", "channels",
            "duration", "bit_rate"}`` on success, where any field ffprobe does
            not report (or reports in an unparsable form) is ``"unknown"``;
            otherwise ``{"has_audio": False, "error": <reason>}``.
        """
        if not self.ffprobe_available:
            return {"has_audio": False, "error": "FFprobe not available"}

        try:
            proc = subprocess.run(
                [
                    self.ffprobe_path, "-v", "error",
                    "-select_streams", "a:0",
                    "-show_entries", "stream=codec_name,sample_rate,channels,bit_rate,duration",
                    "-of", "json",
                    video_path,
                ],
                text=True, errors="replace", capture_output=True, check=False,
            )
            if proc.returncode != 0:
                return {"has_audio": False, "error": proc.stderr.strip()}

            streams = json.loads(proc.stdout or "{}").get("streams", [])
            if not streams:
                return {"has_audio": False, "error": "No audio stream found"}

            stream = streams[0]
            # Each field is converted independently so that one odd value does
            # not turn an existing audio stream into "has_audio: False".
            return {
                "has_audio": True,
                "codec": stream.get("codec_name", "unknown"),
                "sample_rate": self._coerce(stream.get("sample_rate"), int),
                "channels": self._coerce(stream.get("channels"), int),
                "duration": self._coerce(stream.get("duration"), float),
                "bit_rate": self._coerce(stream.get("bit_rate"), int),
            }
        except Exception as e:
            logger.error("Error getting audio info: %s", e)
            return {"has_audio": False, "error": str(e)}

    def extract_audio(
        self,
        video_path: str,
        output_path: Optional[str] = None,
        audio_format: str = "aac",
        quality: str = "high",
    ) -> Optional[str]:
        """
        Extract audio from a media file to a separate file.

        Args:
            video_path: Source media file.
            output_path: Destination; defaults to a uniquely named file in
                ``temp_dir`` with ``audio_format`` as extension.
            audio_format: "aac", "mp3" or "wav" (other values use the AAC codec).
            quality: "low", "medium" or "high" bitrate preset.

        Returns:
            The output path, or ``None`` when the source has no audio.

        Raises:
            AudioProcessingError: ffmpeg is unavailable, fails, or produces no file.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("extract", "FFmpeg not available", video_path)

        start = time.time()
        # ffprobe gives the authoritative answer; without it fall back to the
        # ffmpeg-based detection instead of wrongly reporting "no audio".
        if self.ffprobe_available:
            has_audio = self.get_audio_info(video_path).get("has_audio", False)
        else:
            has_audio = self._has_audio(video_path)
        if not has_audio:
            logger.info("No audio found in %s", video_path)
            return None

        if output_path is None:
            # pid + nanosecond clock keeps concurrent/rapid extractions apart.
            output_path = os.path.join(
                self.temp_dir,
                f"extracted_audio_{os.getpid()}_{time.time_ns()}.{audio_format}",
            )

        cmd = [self.ffmpeg_path, "-y", "-i", video_path]
        cmd += self._AUDIO_CODECS.get(audio_format, ["-c:a", "aac"])
        cmd += self._EXTRACT_BITRATES.get(quality, {}).get(audio_format, [])
        cmd += ["-vn", output_path]

        proc = self._run(cmd, "extract")
        if proc.returncode != 0:
            self.stats["failed_operations"] += 1
            raise AudioProcessingError("extract", f"FFmpeg failed: {proc.stderr}", video_path, output_path)

        if not os.path.exists(output_path):
            self.stats["failed_operations"] += 1
            raise AudioProcessingError("extract", "Output audio file was not created", video_path, output_path)

        self.stats["audio_extractions"] += 1
        self.stats["total_processing_time"] += (time.time() - start)
        logger.info("Audio extracted: %s", output_path)
        return output_path

    # ------------------------------------------------------------------
    # Loudness normalization
    # ------------------------------------------------------------------

    @classmethod
    def _parse_loudnorm_json(cls, text: str) -> Optional[Dict[str, float]]:
        """
        Pull the loudnorm measurement report out of ffmpeg's log output.

        Returns a dict with the ``_LOUDNORM_KEYS`` fields as floats, or ``None``
        when the report is missing, malformed, or contains non-finite values
        (e.g. ``-inf`` for silent audio), which cannot be fed to a second pass.
        """
        match = cls._LOUDNORM_JSON_RE.search(text)
        if not match:
            logger.warning("Could not parse loudnorm analysis output.")
            return None
        try:
            data = json.loads(match.group(0))
            measured = {key: float(data[key]) for key in cls._LOUDNORM_KEYS}
        except (KeyError, TypeError, ValueError) as e:
            logger.warning("Loudnorm analysis JSON parse error: %s", e)
            return None
        if not all(math.isfinite(value) for value in measured.values()):
            logger.warning("Loudnorm analysis returned non-finite values: %s", measured)
            return None
        return measured

    def _measure_loudness(
        self,
        src_with_audio: str,
        stream_selector: str = "1:a:0",
        target_I: float = -16.0,
        target_TP: float = -1.5,
        target_LRA: float = 11.0,
        pre_filters: Optional[str] = None,
    ) -> Optional[Dict[str, float]]:
        """
        First loudnorm pass: measure levels of the file's default audio stream.

        Args:
            src_with_audio: Media file to analyse.
            stream_selector: Unused; kept for backward compatibility.
            target_I / target_TP / target_LRA: Normalization targets. They must
                equal the second-pass targets because the reported
                ``target_offset`` is relative to them.
            pre_filters: Optional filter chain applied before loudnorm, so the
                measurement describes the signal the second pass will see.

        Returns:
            Dict with input_i, input_tp, input_lra, input_thresh and
            target_offset, or ``None`` when the analysis could not be parsed.
        """
        analysis = f"loudnorm=I={target_I}:TP={target_TP}:LRA={target_LRA}:print_format=json"
        if pre_filters:
            analysis = f"{pre_filters},{analysis}"
        cmd = [
            # loudnorm prints its report at "info" level; a quieter log level
            # would suppress it and make the measurement always fail.
            self.ffmpeg_path, "-hide_banner", "-nostats", "-loglevel", "info",
            "-i", src_with_audio,
            "-vn",
            "-af", analysis,
            "-f", "null", "-",
        ]
        proc = self._run(cmd, "loudnorm-pass1")
        return self._parse_loudnorm_json((proc.stderr or "") + (proc.stdout or ""))

    def _build_loudnorm_filter(self, measured: Dict[str, float], target_I=-16.0, target_TP=-1.5, target_LRA=11.0) -> str:
        """
        Build the second-pass loudnorm filter string using measured values.
        """
        return (
            "loudnorm="
            f"I={target_I}:TP={target_TP}:LRA={target_LRA}:"
            f"measured_I={measured['input_i']}:"
            f"measured_TP={measured['input_tp']}:"
            f"measured_LRA={measured['input_lra']}:"
            f"measured_thresh={measured['input_thresh']}:"
            f"offset={measured['target_offset']}:"
            "linear=true:print_format=summary"
        )

    def _loudnorm_stage(
        self,
        source: str,
        pre_filters: Optional[str],
        target_I: float,
        target_TP: float,
        target_LRA: float,
    ) -> str:
        """Return a two-pass loudnorm filter for ``source``, or a single-pass one if measuring fails."""
        measured = self._measure_loudness(
            source,
            target_I=target_I,
            target_TP=target_TP,
            target_LRA=target_LRA,
            pre_filters=pre_filters,
        )
        if measured:
            return self._build_loudnorm_filter(measured, target_I, target_TP, target_LRA)
        return f"loudnorm=I={target_I}:TP={target_TP}:LRA={target_LRA}"

    def _build_audio_filters(
        self,
        source: str,
        offset_ms: float,
        normalize: bool,
        normalize_I: float,
        normalize_TP: float,
        normalize_LRA: float,
    ) -> List[str]:
        """
        Build the ordered audio filter list for an offset and/or normalization.

        A positive offset delays the audio (``adelay``); a negative one trims
        its start (``atrim``). Offsets within 0.001 ms of zero are ignored.
        """
        filters: List[str] = []
        if abs(offset_ms) > 1e-3:
            if offset_ms > 0:
                ms = int(round(offset_ms))
                filters.append(f"adelay={ms}|{ms}")
            else:
                secs = abs(offset_ms) / 1000.0
                # Fixed-point formatting: avoids scientific notation for tiny offsets.
                filters.append(f"atrim=start={secs:.6f},asetpts=PTS-STARTPTS")
        if normalize:
            preceding = ",".join(filters) or None
            filters.append(
                self._loudnorm_stage(source, preceding, normalize_I, normalize_TP, normalize_LRA)
            )
        return filters

    def _mux_cmd(
        self,
        video_src: str,
        audio_src: str,
        output_path: str,
        audio_args: List[str],
        afilter: Optional[str] = None,
    ) -> List[str]:
        """Build an ffmpeg command that copies video from input 0 and takes audio from input 1."""
        cmd = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", video_src,
            "-i", audio_src,
            "-map", "0:v:0",
        ]
        if afilter:
            cmd += ["-filter_complex", f"[1:a]{afilter}[aout]", "-map", "[aout]"]
        else:
            cmd += ["-map", "1:a:0"]
        cmd += ["-c:v", "copy", *audio_args, "-shortest", "-movflags", "+faststart", "-y", output_path]
        return cmd

    # ------------------------------------------------------------------
    # Muxing
    # ------------------------------------------------------------------

    def add_audio_to_video(
        self,
        original_video: str,
        processed_video: str,
        output_path: Optional[str] = None,
        audio_quality: str = "high",
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
        offset_ms: float = 0.0,
    ) -> str:
        """
        Add/mux the audio from original_video into processed_video.

        Strategy:
        1) If no audio in original -> return processed (copied to output_path).
        2) If ffmpeg present:
           a) If normalize/offset requested -> re-encode AAC with filters
              (two-pass loudnorm). If that fails, continue with (b), which
              drops the requested filters.
           b) Try stream-copy (lossless): -c:a copy. If that fails, AAC re-encode.
        3) If ffmpeg is missing or every attempt failed -> MoviePy (re-encode).

        Args:
            original_video: File providing the audio.
            processed_video: File providing the video.
            output_path: Destination; defaults to "<processed>_with_audio.mp4"
                next to processed_video.
            audio_quality: "low"/"medium"/"high" bitrate for the plain AAC
                re-encode fallback.
            normalize, normalize_I, normalize_TP, normalize_LRA: Loudness
                normalization switch and EBU R128 targets.
            offset_ms: Positive delays the audio, negative trims its start.

        Returns:
            Path to the muxed video (MP4).

        Raises:
            FileNotFoundError: processed_video does not exist.
            AudioProcessingError: MoviePy is needed but cannot be imported.
        """
        if not os.path.isfile(processed_video):
            raise FileNotFoundError(f"Processed video not found: {processed_video}")

        if output_path is None:
            base = os.path.splitext(os.path.basename(processed_video))[0]
            output_path = os.path.join(os.path.dirname(processed_video), f"{base}_with_audio.mp4")

        if not self._has_audio(original_video):
            logger.info("Original has no audio; returning processed video unchanged.")
            if processed_video != output_path:
                shutil.copy2(processed_video, output_path)
            return output_path

        if not self.ffmpeg_available:
            logger.warning("FFmpeg not available – using MoviePy fallback.")
            return self._moviepy_mux(original_video, processed_video, output_path)

        start = time.time()

        # (a) Filters requested: audio has to be re-encoded.
        if normalize or abs(offset_ms) > 1e-3:
            filters = self._build_audio_filters(
                original_video, offset_ms, normalize, normalize_I, normalize_TP, normalize_LRA
            )
            cmd = self._mux_cmd(
                processed_video, original_video, output_path,
                self._FILTERED_AAC_ARGS, ",".join(filters),
            )
            proc = self._run(cmd, "mux-reencode-filters")
            if self._output_ok(proc, output_path):
                self._record_merge(start)
                logger.info("Audio merged with filters (normalize=%s, offset_ms=%.2f): %s", normalize, offset_ms, output_path)
                return output_path

            logger.warning("Filtered mux failed; stderr: %s", proc.stderr)

        # (b) Lossless stream copy.
        cmd_copy = self._mux_cmd(processed_video, original_video, output_path, ["-c:a", "copy"])
        proc = self._run(cmd_copy, "mux-copy")
        if self._output_ok(proc, output_path):
            self._record_merge(start)
            logger.info("Audio merged (stream-copy): %s", output_path)
            return output_path

        # (b, continued) The source codec may not fit in MP4: re-encode to AAC.
        aac_args = [
            "-c:a", "aac",
            *self._MUX_BITRATES.get(audio_quality, self._MUX_BITRATES["high"]),
            "-ac", "2", "-ar", "48000",
        ]
        cmd_aac = self._mux_cmd(processed_video, original_video, output_path, aac_args)
        proc = self._run(cmd_aac, "mux-aac")
        if self._output_ok(proc, output_path):
            self._record_merge(start)
            logger.info("Audio merged (AAC re-encode): %s", output_path)
            return output_path

        logger.warning("FFmpeg mux failed; using MoviePy fallback.")
        return self._moviepy_mux(original_video, processed_video, output_path)

    def _moviepy_mux(self, original_video: str, processed_video: str, output_path: str) -> str:
        """
        Mux audio from original_video onto processed_video using MoviePy.

        The video is re-encoded with libx264 and the audio with AAC. If MoviePy
        cannot read the audio, processed_video is copied to output_path as is.

        Raises:
            AudioProcessingError: MoviePy cannot be imported.
        """
        try:
            try:
                from moviepy.editor import VideoFileClip, AudioFileClip
            except ImportError:
                # MoviePy 2.x dropped the ``moviepy.editor`` module.
                from moviepy import VideoFileClip, AudioFileClip
        except Exception as e:
            self.stats["failed_operations"] += 1
            raise AudioProcessingError("mux", f"MoviePy unavailable and ffmpeg failed: {e}", processed_video)

        # Unique name so concurrent muxes sharing temp_dir do not clobber each other.
        temp_audio = os.path.join(self.temp_dir, f"temp-audio-{os.getpid()}-{time.time_ns()}.m4a")

        with VideoFileClip(processed_video) as v_clip:
            try:
                a_clip = AudioFileClip(original_video)
            except Exception as e:
                logger.warning("MoviePy could not load audio from %s (%s). Returning processed video.", original_video, e)
                if processed_video != output_path:
                    shutil.copy2(processed_video, output_path)
                return output_path

            try:
                # MoviePy 2.x renamed set_audio() to with_audio().
                attach_audio = getattr(v_clip, "with_audio", None) or v_clip.set_audio
                attach_audio(a_clip).write_videofile(
                    output_path,
                    codec="libx264",
                    audio_codec="aac",
                    audio_bitrate="192k",
                    temp_audiofile=temp_audio,
                    remove_temp=True,
                    threads=2,
                    preset="medium",
                )
            finally:
                # Release the reader process/file handle held by the audio clip.
                a_clip.close()

        self.stats["audio_merges"] += 1
        return output_path

    def sync_audio_video(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        offset_ms: float = 0.0,
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
    ) -> bool:
        """
        Synchronize a separate audio file with a video (copy video, re-encode audio AAC).
        Positive offset_ms delays audio; negative trims audio start.

        Returns:
            True when ffmpeg succeeded and wrote a non-empty output file.

        Raises:
            AudioProcessingError: ffmpeg is unavailable.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("sync", "FFmpeg not available")

        filters = self._build_audio_filters(
            audio_path, offset_ms, normalize, normalize_I, normalize_TP, normalize_LRA
        )
        cmd = self._mux_cmd(
            video_path, audio_path, output_path,
            self._FILTERED_AAC_ARGS, ",".join(filters) or None,
        )
        proc = self._run(cmd, "sync")
        return self._output_ok(proc, output_path)

    def adjust_audio_levels(
        self,
        input_path: str,
        output_path: str,
        volume_factor: float = 1.0,
        normalize: bool = False,
        normalize_I: float = -16.0,
        normalize_TP: float = -1.5,
        normalize_LRA: float = 11.0,
    ) -> bool:
        """
        Adjust levels on a single-file video (copy video, re-encode audio AAC).

        With ``volume_factor == 1.0`` and ``normalize`` off there is nothing to
        do: the input is copied to output_path (or left alone when both are the
        same file) and True is returned.

        Returns:
            True when a non-empty output file exists afterwards.

        Raises:
            AudioProcessingError: ffmpeg is unavailable or exits with an error.
        """
        if not self.ffmpeg_available:
            raise AudioProcessingError("adjust_levels", "FFmpeg not available")

        filters: List[str] = []
        if volume_factor != 1.0:
            filters.append(f"volume={volume_factor}")
        if normalize:
            # Measure after the volume change so pass 2 gets matching values.
            preceding = ",".join(filters) or None
            filters.append(
                self._loudnorm_stage(input_path, preceding, normalize_I, normalize_TP, normalize_LRA)
            )

        if not filters:
            if os.path.abspath(input_path) != os.path.abspath(output_path):
                shutil.copy2(input_path, output_path)
            return True

        cmd = [
            self.ffmpeg_path, "-hide_banner", "-loglevel", "error",
            "-i", input_path,
            "-c:v", "copy",
            "-af", ",".join(filters),
            *self._FILTERED_AAC_ARGS,
            "-movflags", "+faststart",
            "-y", output_path,
        ]
        proc = self._run(cmd, "adjust-levels")
        if proc.returncode != 0:
            raise AudioProcessingError("adjust_levels", proc.stderr, input_path)
        return os.path.exists(output_path) and os.path.getsize(output_path) > 0

    # ------------------------------------------------------------------
    # Stats / housekeeping
    # ------------------------------------------------------------------

    def get_stats(self) -> Dict[str, Any]:
        """
        Return tool availability, the raw counters, and ``success_rate``.

        ``success_rate`` is the percentage of successful operations
        (extractions + merges) among all counted operations; 0.0 when nothing
        has been counted yet.
        """
        successes = self.stats["audio_extractions"] + self.stats["audio_merges"]
        tot_ops = successes + self.stats["failed_operations"]
        success_rate = (successes / max(1, tot_ops)) * 100.0
        return {
            "ffmpeg_available": self.ffmpeg_available,
            "ffprobe_available": self.ffprobe_available,
            "audio_extractions": self.stats["audio_extractions"],
            "audio_merges": self.stats["audio_merges"],
            "total_processing_time": self.stats["total_processing_time"],
            "failed_operations": self.stats["failed_operations"],
            "success_rate": success_rate,
        }

    def cleanup_temp_files(self, max_age_hours: int = 24):
        """
        Clean up temporary audio/video files older than specified age in temp_dir.

        Only direct children of temp_dir whose name contains "audio" and ends in
        .aac/.mp3/.wav/.mp4/.m4a are considered. Failures are logged, never raised.
        """
        try:
            temp_path = Path(self.temp_dir)
            cutoff = time.time() - (max_age_hours * 3600)
            cleaned = 0

            for ext in (".aac", ".mp3", ".wav", ".mp4", ".m4a"):
                for candidate in temp_path.glob(f"*audio*{ext}"):
                    try:
                        if candidate.stat().st_mtime < cutoff:
                            candidate.unlink()
                            cleaned += 1
                    except Exception as e:
                        logger.warning("Could not delete temp file %s: %s", candidate, e)
            if cleaned:
                logger.info("Cleaned up %d temporary audio files", cleaned)
        except Exception as e:
            logger.warning("Temp file cleanup error: %s", e)
| |
|