| """ |
Video Metadata Utilities

This module reads and writes JSON metadata in video files.
- MP4: uses mutagen to store the metadata in the ©cmt tag
- MKV: uses FFmpeg to write the metadata to the comment tag; ffprobe reads it
  back from the comment or description tag

It also embeds source images in MP4 files and extracts embedded images again.
| """ |
|
|
| import json |
| import subprocess |
| import os |
| import shutil |
| import tempfile |
|
|
def _flatten_to_rgb(pil_img):
    """
    Return an image whose mode is 'RGB' or 'L'.

    Args:
        pil_img: PIL Image object

    Returns:
        The same object when its mode is already 'RGB' or 'L'; an RGBA image
        composited onto a white background; otherwise ``convert('RGB')``.
    """
    from PIL import Image

    if pil_img.mode in ('RGB', 'L'):
        return pil_img
    if pil_img.mode == 'RGBA':
        # Use the alpha band as the paste mask so transparent areas end up white.
        background = Image.new('RGB', pil_img.size, (255, 255, 255))
        background.paste(pil_img, mask=pil_img.split()[3])
        return background
    return pil_img.convert('RGB')


def _convert_image_to_bytes(img):
    """
    Convert various image formats to bytes suitable for MP4 cover art.

    Args:
        img: Can be:
            - PIL Image object (always encoded as PNG)
            - File path (str); .png/.bmp/.tiff/.tif are encoded as PNG,
              everything else as JPEG (quality 95)
            - bytes / bytearray (returned as-is; classified as PNG when the
              data starts with the PNG signature, otherwise as JPEG)

    Returns:
        tuple: (image_bytes, image_format)
            - image_bytes: Binary image data
            - image_format: AtomDataType constant (JPEG or PNG)
        (None, None) is returned, after printing a message, when the path
        does not exist, the type is unsupported, or any error occurs
        (including a missing mutagen/Pillow installation).
    """
    import io

    try:
        # Imported inside the try so a missing dependency is reported through
        # the same (None, None) contract as every other failure.
        from mutagen.mp4 import AtomDataType

        if isinstance(img, (bytes, bytearray)):
            raw = bytes(img)
            if raw.startswith(b'\x89PNG'):
                return raw, AtomDataType.PNG
            return raw, AtomDataType.JPEG

        # Pillow is only needed from here on; raw bytes above do not require it.
        from PIL import Image

        if isinstance(img, str):
            if not os.path.exists(img):
                print(f"Warning: Image file not found: {img}")
                return None, None

            ext = os.path.splitext(img)[1].lower()
            if ext in ('.png', '.bmp', '.tiff', '.tif'):
                save_options = {'format': 'PNG'}
                img_format = AtomDataType.PNG
            else:
                save_options = {'format': 'JPEG', 'quality': 95}
                img_format = AtomDataType.JPEG

            buffer = io.BytesIO()
            # The context manager closes the underlying file handle; encoding
            # happens inside it because the pixel data is loaded lazily.
            with Image.open(img) as opened:
                _flatten_to_rgb(opened).save(buffer, **save_options)
            return buffer.getvalue(), img_format

        if isinstance(img, Image.Image):
            buffer = io.BytesIO()
            _flatten_to_rgb(img).save(buffer, format='PNG')
            return buffer.getvalue(), AtomDataType.PNG

        print(f"Warning: Unsupported image type: {type(img)}")
        return None, None

    except Exception as e:
        print(f"Error converting image to bytes: {e}")
        return None, None
|
|
def embed_source_images_metadata_mp4(file, source_images):
    """
    Attach source images to an opened MP4 object as freeform tags.

    The encoded images are stored under
    '----:com.apple.iTunes:EMBEDDED_IMAGES' and a JSON index mapping each
    source tag to its images (index, filename, extension) is stored under
    '----:com.apple.iTunes:IMAGE_METADATA'. The file is NOT saved here; the
    caller is expected to call ``file.save()``.

    Args:
        file: mutagen MP4 object to modify in place
        source_images (dict): Maps a tag name to one image or a list/tuple of
            images (anything _convert_image_to_bytes accepts). None values
            and images that fail to convert are skipped.

    Returns:
        The same ``file`` object that was passed in. Failures are printed and
        leave the tags as they were.
    """
    # Nothing to embed: return before touching mutagen at all.
    if not source_images:
        return file

    try:
        from mutagen.mp4 import MP4Cover, AtomDataType

        cover_data = []
        image_metadata = {}

        for img_tag, img_data in source_images.items():
            if img_data is None:
                continue

            img_list = list(img_data) if isinstance(img_data, (list, tuple)) else [img_data]
            tag_images = []

            for img in img_list:
                if img is None:
                    continue
                cover_bytes, image_format = _convert_image_to_bytes(img)
                if not cover_bytes:
                    continue

                is_png = image_format == AtomDataType.PNG
                encoded_ext = '.png' if is_png else '.jpg'

                if isinstance(img, str) and os.path.exists(img):
                    base, original_ext = os.path.splitext(os.path.basename(img))
                    # The image may have been re-encoded (e.g. BMP -> PNG);
                    # keep the original extension only when it still
                    # describes the stored bytes.
                    matching = ('.png',) if is_png else ('.jpg', '.jpeg')
                    extension = original_ext if original_ext.lower() in matching else encoded_ext
                    filename = f"{base}{extension}"
                else:
                    extension = encoded_ext
                    filename = f"{img_tag}{extension}"

                tag_images.append({
                    # Position of this image inside the EMBEDDED_IMAGES list.
                    'index': len(cover_data),
                    'filename': filename,
                    'extension': extension
                })
                cover_data.append(MP4Cover(cover_bytes, image_format))

            if tag_images:
                image_metadata[img_tag] = tag_images

        if cover_data:
            # An MP4 object can have no tag container yet; create one first.
            if file.tags is None:
                file.add_tags()
            file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES'] = cover_data
            file.tags['----:com.apple.iTunes:IMAGE_METADATA'] = json.dumps(image_metadata).encode('utf-8')

    except Exception as e:
        print(f"Failed to embed cover art with mutagen: {e}")
        print("This might be due to image format or MP4 file structure issues")

    return file
|
|
|
|
def save_metadata_to_mp4(file_path, metadata_dict, source_images = None):
    """
    Save JSON metadata to MP4 file using mutagen.

    The metadata is serialized with json.dumps and stored in the ©cmt tag.

    Args:
        file_path (str): Path to MP4 file
        metadata_dict (dict): Metadata dictionary to save
        source_images (dict, optional): Source images to embed alongside the
            metadata; passed to embed_source_images_metadata_mp4 when not None

    Returns:
        bool: True if successful, False otherwise (the error is printed)
    """
    try:
        from mutagen.mp4 import MP4

        video = MP4(file_path)
        # A file without any existing tags has tags == None; create the
        # container so the assignment below does not fail.
        if video.tags is None:
            video.add_tags()
        video.tags['©cmt'] = [json.dumps(metadata_dict)]
        if source_images is not None:
            embed_source_images_metadata_mp4(video, source_images)
        video.save()
        return True
    except Exception as e:
        print(f"Error saving metadata to MP4 {file_path}: {e}")
        return False
|
|
|
|
def save_metadata_to_mkv(file_path, metadata_dict):
    """
    Save JSON metadata to MKV file using FFmpeg.

    FFmpeg copies all streams into a temporary sibling file with the JSON
    written to the ``comment`` metadata tag; on success the temporary file
    replaces the original.

    Args:
        file_path (str): Path to MKV file
        metadata_dict (dict): Metadata dictionary to save

    Returns:
        bool: True if successful, False otherwise (the error is printed)
    """
    temp_path = None
    try:
        # Build the temp name from the path stem rather than str.replace():
        # replace() would rewrite every '.mkv' occurrence in the path and,
        # for any other extension, leave temp_path equal to file_path so the
        # cleanup below would delete the original file.
        stem, _ = os.path.splitext(file_path)
        temp_path = f"{stem}_temp_with_metadata.mkv"

        ffmpeg_cmd = [
            'ffmpeg', '-y', '-i', file_path,
            '-metadata', f'comment={json.dumps(metadata_dict)}',
            '-map', '0',
            '-c', 'copy',
            temp_path
        ]

        result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)

        if result.returncode != 0:
            print(f"Warning: Failed to add metadata to MKV file: {result.stderr}")
            return False

        shutil.move(temp_path, file_path)
        return True

    except Exception as e:
        print(f"Error saving metadata to MKV {file_path}: {e}")
        return False

    finally:
        # After a successful move the temp file no longer exists; on every
        # failure path remove whatever partial output FFmpeg left behind.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass  # best-effort cleanup; the result is already decided
|
|
|
|
|
|
def save_video_metadata(file_path, metadata_dict, source_images=None):
    """
    Save JSON metadata to a video file, choosing the writer by extension.

    Paths ending in '.mp4' go to save_metadata_to_mp4 (with source_images);
    paths ending in '.mkv' go to save_metadata_to_mkv (source_images is not
    forwarded). The extension match is case-sensitive.

    Args:
        file_path (str): Path to video file
        metadata_dict (dict): Metadata dictionary to save
        source_images: Optional images, forwarded to the MP4 writer only

    Returns:
        bool: True if successful, False otherwise (including any other
        extension)
    """
    is_mp4 = file_path.endswith('.mp4')
    if is_mp4:
        return save_metadata_to_mp4(file_path, metadata_dict, source_images)

    is_mkv = file_path.endswith('.mkv')
    if is_mkv:
        return save_metadata_to_mkv(file_path, metadata_dict)

    # Unsupported container.
    return False
|
|
|
|
def read_metadata_from_mp4(file_path):
    """
    Read JSON metadata from the ©cmt tag of an MP4 file using mutagen.

    Args:
        file_path (str): Path to MP4 file

    Returns:
        The JSON-decoded first ©cmt value, or None when the file cannot be
        opened, the tag is missing, or the value is not valid JSON.
    """
    try:
        from mutagen.mp4 import MP4

        comment_values = MP4(file_path).tags['©cmt']
        first_comment = comment_values[0]
        return json.loads(first_comment)
    except Exception:
        # Any failure is reported to the caller as "no metadata".
        return None
|
|
|
|
def read_metadata_from_mkv(file_path):
    """
    Read JSON metadata from MKV file using ffprobe.

    The container-level tags are checked in the order comment, COMMENT,
    description, DESCRIPTION; the first one that holds valid JSON wins.

    Args:
        file_path (str): Path to MKV file

    Returns:
        The JSON-decoded tag value if found, None otherwise (ffprobe missing
        or failing, no matching tag, or no tag containing valid JSON)
    """
    try:
        probe_cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', file_path
        ]
        result = subprocess.run(probe_cmd, capture_output=True, text=True)

        if result.returncode != 0:
            return None

        probe_data = json.loads(result.stdout)
        format_tags = probe_data.get('format', {}).get('tags', {})

        for tag_key in ('comment', 'COMMENT', 'description', 'DESCRIPTION'):
            if tag_key not in format_tags:
                continue
            try:
                return json.loads(format_tags[tag_key])
            except (ValueError, TypeError):
                # Not JSON (an ordinary text comment): try the next tag.
                # Narrowed from a bare except so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                continue
        return None
    except Exception:
        return None
|
|
|
|
def read_metadata_from_video(file_path):
    """
    Read JSON metadata from video file (auto-detects MP4 vs MKV).

    The extension is matched case-insensitively, the same way
    extract_source_images detects MP4 files.

    Args:
        file_path (str): Path to video file

    Returns:
        dict or None: Metadata dictionary if found, None otherwise
        (including unsupported extensions)
    """
    lowered = file_path.lower()
    if lowered.endswith('.mp4'):
        return read_metadata_from_mp4(file_path)
    if lowered.endswith('.mkv'):
        return read_metadata_from_mkv(file_path)
    return None
|
|
def _safe_image_filename(raw_name, fallback):
    """Reduce an embedded filename to a bare name, or return fallback if nothing usable is left."""
    # Normalize backslashes so Windows-style paths are stripped on any OS.
    name = os.path.basename(str(raw_name).replace('\\', '/'))
    if name in ('', '.', '..'):
        return fallback
    return name


def _extract_mp4_cover_art(video_path, output_dir = None):
    """
    Extract embedded source images from MP4 files using mutagen.

    Images are read from the '----:com.apple.iTunes:EMBEDDED_IMAGES' tag.
    When the '----:com.apple.iTunes:IMAGE_METADATA' index is present it
    supplies the tag association and filenames; otherwise every image is
    written as cover_art_<n> under the 'unknown' tag.

    Args:
        video_path (str): Path to the MP4 file
        output_dir (str): Directory to save extracted images; a temporary
            directory is created when None and there is something to write

    Returns:
        dict: Dictionary mapping tags to lists of extracted image file paths
              Format: {tag_name: [path1, path2, ...], ...}
              Empty dict when there are no embedded images or on any error
              (the error is printed).
    """
    embedded_key = '----:com.apple.iTunes:EMBEDDED_IMAGES'
    metadata_key = '----:com.apple.iTunes:IMAGE_METADATA'

    try:
        from mutagen.mp4 import MP4

        video = MP4(video_path)

        if video.tags is None or embedded_key not in video.tags:
            return {}

        cover_art = video.tags[embedded_key]
        metadata_data = video.tags.get(metadata_key)

        if not metadata_data:
            print(f"Warning: No IMAGE_METADATA found in {video_path}, using generic extraction")
            extracted_files = {'unknown': []}

            for i, cover in enumerate(cover_art):
                if output_dir is None:
                    output_dir = _create_temp_dir()
                os.makedirs(output_dir, exist_ok=True)

                # No index to tell us the format: sniff the PNG signature so
                # PNG data is not written under a .jpg name.
                extension = '.png' if bytes(cover).startswith(b'\x89PNG') else '.jpg'
                output_file = os.path.join(output_dir, f"cover_art_{i}{extension}")

                with open(output_file, 'wb') as f:
                    f.write(cover)
                extracted_files['unknown'].append(output_file)

            return extracted_files

        image_metadata = json.loads(metadata_data[0].decode('utf-8'))
        extracted_files = {}

        for tag, tag_images in image_metadata.items():
            extracted_files[tag] = []

            for img_info in tag_images:
                cover_idx = img_info.get('index')
                # Reject missing, non-integer, out-of-range and negative
                # indexes (a negative one would silently wrap around).
                if not isinstance(cover_idx, int) or not 0 <= cover_idx < len(cover_art):
                    continue

                if output_dir is None:
                    output_dir = _create_temp_dir()
                os.makedirs(output_dir, exist_ok=True)

                # The filename comes from data stored inside the video file;
                # strip directory parts so it cannot escape output_dir.
                filename = _safe_image_filename(
                    img_info.get('filename', ''), f"cover_art_{cover_idx}"
                )
                output_file = os.path.join(output_dir, filename)

                # Do not overwrite an existing file: append _1, _2, ...
                if os.path.exists(output_file):
                    base, ext = os.path.splitext(filename)
                    counter = 1
                    while os.path.exists(output_file):
                        output_file = os.path.join(output_dir, f"{base}_{counter}{ext}")
                        counter += 1

                with open(output_file, 'wb') as f:
                    f.write(cover_art[cover_idx])
                extracted_files[tag].append(output_file)

        return extracted_files

    except Exception as e:
        print(f"Error extracting cover art from MP4: {e}")
        return {}
|
|
def _create_temp_dir():
    """
    Create a new, uniquely named temporary directory.

    Returns:
        str: Path of the created directory. It is not removed automatically.
    """
    # mkdtemp() creates the directory itself, so no extra makedirs is needed.
    return tempfile.mkdtemp()
|
|
def _is_embedded_image_stream(stream):
    """Return True when an ffprobe stream entry is a video stream carrying an attached picture."""
    if stream.get('codec_type') != 'video':
        return False
    if stream.get('disposition', {}).get('attached_pic', 0) == 1:
        return True
    # Without the attached_pic flag, accept only MJPEG streams whose tags
    # name an image file or an image MIME type.
    if stream.get('codec_name') != 'mjpeg':
        return False

    tags = stream.get('tags', {})
    image_exts = ('.jpg', '.jpeg', '.png')
    for key in ('FILENAME', 'filename'):
        value = tags.get(key)
        if isinstance(value, str) and value.lower().endswith(image_exts):
            return True
    for key in ('MIMETYPE', 'mimetype'):
        value = tags.get(key)
        if isinstance(value, str) and value.startswith('image/'):
            return True
    return False


def extract_source_images(video_path, output_dir = None):
    """
    Extract embedded source images from a video file.

    MP4 files (extension matched case-insensitively) are handled by
    _extract_mp4_cover_art. For every other file, ffprobe lists the streams
    and ffmpeg writes one frame of each attached-picture stream to
    output_dir.

    Args:
        video_path (str): Path to the video file
        output_dir (str): Directory to save extracted images; created if
            missing. When None, a temporary directory is created, but only
            if there is something to extract.

    Returns:
        For MP4 files: dict mapping tag names to lists of extracted paths.
        For other files: list of extracted image paths; an empty list when
        nothing was found or ffprobe/ffmpeg could not be run (the error is
        printed).
    """
    if video_path.lower().endswith('.mp4'):
        return _extract_mp4_cover_art(video_path, output_dir)

    image_exts = ('.jpg', '.jpeg', '.png')

    try:
        probe_cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_streams', video_path
        ]
        result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        streams = json.loads(result.stdout).get('streams', [])

        attachment_streams = [
            i for i, stream in enumerate(streams)
            if _is_embedded_image_stream(stream)
        ]
        if not attachment_streams:
            return []

        # Create the destination only once we know there is something to
        # write, and make sure a caller-supplied directory exists as well.
        if output_dir is None:
            output_dir = _create_temp_dir()
        os.makedirs(output_dir, exist_ok=True)

        extracted_files = []
        used_filenames = set()

        for stream_idx in attachment_streams:
            tags = streams[stream_idx].get('tags', {})
            original_filename = (
                tags.get('filename') or
                tags.get('FILENAME') or
                f'attachment_{stream_idx}.png'
            )

            # Drop any directory part stored in the tag and force an image
            # extension so ffmpeg picks an image encoder.
            safe_filename = os.path.basename(original_filename)
            if not safe_filename.lower().endswith(image_exts):
                safe_filename += '.png'

            # Two attachments may share a name; suffix _1, _2, ... within
            # this run.
            base_name, ext = os.path.splitext(safe_filename)
            counter = 0
            final_filename = safe_filename
            while final_filename in used_filenames:
                counter += 1
                final_filename = f"{base_name}_{counter}{ext}"
            used_filenames.add(final_filename)

            output_file = os.path.join(output_dir, final_filename)

            extract_cmd = [
                'ffmpeg', '-y', '-i', video_path,
                '-map', f'0:{stream_idx}', '-frames:v', '1',
                output_file
            ]

            try:
                subprocess.run(extract_cmd, capture_output=True, text=True, check=True)
                if os.path.exists(output_file):
                    extracted_files.append(output_file)
            except subprocess.CalledProcessError as e:
                # One failed attachment should not abort the others.
                print(f"Failed to extract attachment {stream_idx} from {os.path.basename(video_path)}: {e.stderr}")

        return extracted_files

    except subprocess.CalledProcessError as e:
        print(f"Error extracting source images from {os.path.basename(video_path)}: {e.stderr}")
        return []
    except (OSError, ValueError) as e:
        # OSError: ffprobe/ffmpeg not installed or output_dir not creatable.
        # ValueError: ffprobe output was not valid JSON.
        print(f"Error extracting source images from {os.path.basename(video_path)}: {e}")
        return []
|
|
|
|