Spaces:
Sleeping
Sleeping
| """ | |
| FastAPI application for YouTube study notes generation. | |
| Provides REST API endpoints for note generation and status tracking. | |
| """ | |
| import asyncio | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, Optional | |
| from enum import Enum | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel, HttpUrl, Field | |
| from src.audio.downloader import YouTubeDownloader | |
| from src.audio.processor import AudioProcessor | |
| from src.transcription.whisper_transcriber import WhisperTranscriber | |
| from src.summarization.segmenter import TranscriptSegmenter | |
| from src.summarization.note_generator import NoteGenerator | |
| from src.utils.logger import setup_logger | |
| from src.utils.config import settings | |
| from src.db.database import create_db_and_tables | |
# Module-level logger shared by every handler and background task in this file.
logger = setup_logger(__name__)
| # Pydantic Models | |
class TaskStatus(str, Enum):
    """Lifecycle states a note-generation task moves through.

    Members mix in ``str`` so they compare equal to their plain string values.
    """

    # Task record exists but processing has not begun.
    PENDING = "pending"
    # Video metadata / audio is being fetched.
    DOWNLOADING = "downloading"
    # Audio is being converted to text.
    TRANSCRIBING = "transcribing"
    # Transcript is being turned into notes.
    GENERATING_NOTES = "generating_notes"
    # Terminal state: the notes file has been written.
    COMPLETED = "completed"
    # Terminal state: processing raised an error.
    FAILED = "failed"
class GenerateNotesRequest(BaseModel):
    """Payload accepted when a client asks for notes to be generated."""

    # Required; validated as an HTTP(S) URL by pydantic.
    youtube_url: HttpUrl = Field(..., description="YouTube video URL")
    # Optional; falls back to "en" when omitted.
    language: str = Field(default="en", description="Video language code")

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "youtube_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
                "language": "en",
            }
        }
class TaskResponse(BaseModel):
    """Body returned right after a task has been created."""

    # All three fields are required.
    task_id: str = Field(..., description="Unique task identifier")
    status: TaskStatus = Field(..., description="Current task status")
    message: str = Field(..., description="Status message")
class TaskStatusResponse(BaseModel):
    """Body returned when a client polls the state of a task."""

    # Always present.
    task_id: str
    status: TaskStatus
    message: str

    # Optional details; each defaults to None.
    video_title: Optional[str] = None
    progress: Optional[int] = Field(None, description="Progress percentage (0-100)")
    notes_file: Optional[str] = None

    # Timestamps copied from the stored task record.
    created_at: datetime
    updated_at: datetime
# Global task storage (in production, use a database).
# Maps task_id -> task record dict. Lives only in this process's memory and is
# never pruned within this module.
tasks: Dict[str, Dict] = {}
| # --- Lifespan Event Handler (Fixes Windows Event Loop Issue) --- | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Handle startup and shutdown events.

    Everything before ``yield`` runs once at server startup (here: creating
    the database tables); everything after it runs at shutdown.

    The ``@asynccontextmanager`` decorator is required: the ``lifespan``
    argument of ``FastAPI`` expects an async context manager, not a bare
    async generator function.

    Args:
        app: The FastAPI application being started (unused here).
    """
    logger.info("Lifespan: Initializing database tables...")
    await create_db_and_tables()
    logger.info("Lifespan: Database tables initialized successfully")
    yield
    logger.info("Lifespan: Server shutting down...")
# FastAPI app
# The lifespan handler is wired in here so the database tables are created
# when the server starts.
app = FastAPI(
    title="YouTube Study Notes AI",
    description="Generate structured study notes from YouTube educational videos",
    version="1.0.0",
    lifespan=lifespan,
)
# CORS middleware
# NOTE(review): wildcard origins together with allow_credentials=True is
# advised against by the FastAPI CORS documentation - confirm the intended
# origin list before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Include routers
# These imports sit below the app definition rather than at the top of the
# file - presumably to avoid a circular import; verify before moving them.
from src.api.auth_routes import router as auth_router
from src.api.notes_routes import router as notes_router
from src.api.analytics_routes import router as analytics_router
app.include_router(auth_router)
app.include_router(notes_router)
app.include_router(analytics_router)
@app.get("/")
async def root():
    """Root endpoint with API information.

    Registered as ``GET /``. Returns a static dictionary with the service
    name, version, a catalogue of the available endpoints and the locations
    of the interactive documentation.
    """
    return {
        "name": "YouTube Study Notes AI",
        "version": "1.0.0",
        "description": "Generate structured study notes from YouTube videos with user management",
        "endpoints": {
            "authentication": {
                "signup": "POST /auth/signup",
                "login": "POST /auth/login",
            },
            "notes": {
                "create": "POST /notes",
                "list": "GET /notes",
                "get": "GET /notes/{note_id}",
                "delete": "DELETE /notes/{note_id}",
            },
            "analytics": {"user_stats": "GET /analytics"},
            "generation": {
                "generate_notes": "POST /generate-notes",
                "check_status": "GET /status/{task_id}",
                "download_notes": "GET /download/{task_id}",
            },
        },
        "documentation": {"swagger_ui": "/docs", "redoc": "/redoc"},
    }
@app.post("/generate-notes", response_model=TaskResponse)
async def generate_notes(
    request: GenerateNotesRequest, background_tasks: BackgroundTasks
):
    """
    Generate study notes from a YouTube video.

    Registered as ``POST /generate-notes``. The endpoint only creates the
    task record and schedules the processing as a background task; use the
    returned task_id to check status and download results.

    Args:
        request: Validated request body (video URL and language code).
        background_tasks: FastAPI hook used to schedule ``process_video``.

    Returns:
        TaskResponse carrying the new task id in ``PENDING`` state.

    Raises:
        HTTPException: 500 if the task could not be created.
    """
    task_id = str(uuid.uuid4())
    try:
        video_url = str(request.youtube_url)
        # One timestamp for both fields so a fresh task reports
        # created_at == updated_at.
        created = datetime.now()
        tasks[task_id] = {
            "status": TaskStatus.PENDING,
            "message": "Task created, starting processing...",
            "youtube_url": video_url,
            "language": request.language,
            "video_title": None,
            "progress": 0,
            "notes_file": None,
            "created_at": created,
            "updated_at": created,
        }
        # Start background processing
        background_tasks.add_task(process_video, task_id, video_url, request.language)
        logger.info(f"Created task {task_id} for URL: {request.youtube_url}")
        return TaskResponse(
            task_id=task_id,
            status=TaskStatus.PENDING,
            message="Processing started. Use task_id to check status.",
        )
    except Exception as e:
        # Do not leave a half-created record behind for a request that failed.
        tasks.pop(task_id, None)
        logger.error(f"Failed to create task: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/status/{task_id}", response_model=TaskStatusResponse)
async def get_status(task_id: str):
    """Get the current status of a processing task.

    Registered as ``GET /status/{task_id}``.

    Args:
        task_id: Identifier returned when the task was created.

    Returns:
        TaskStatusResponse built from the stored task record.

    Raises:
        HTTPException: 404 if no task with this id is known.
    """
    task = tasks.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return TaskStatusResponse(
        task_id=task_id,
        status=task["status"],
        message=task["message"],
        video_title=task.get("video_title"),
        progress=task.get("progress"),
        notes_file=task.get("notes_file"),
        created_at=task["created_at"],
        updated_at=task["updated_at"],
    )
@app.get("/download/{task_id}", response_class=FileResponse)
async def download_notes(task_id: str):
    """Download the generated notes file.

    Registered as ``GET /download/{task_id}``.

    Args:
        task_id: Identifier returned when the task was created.

    Returns:
        FileResponse streaming the Markdown notes file.

    Raises:
        HTTPException: 404 if the task or its notes file does not exist,
            400 if the task has not completed yet.
    """
    task = tasks.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Task not found")

    status = task["status"]
    if status != TaskStatus.COMPLETED:
        # Use the raw value: formatting a (str, Enum) member directly yields
        # different text depending on the Python version.
        raise HTTPException(
            status_code=400,
            detail=f"Notes not ready. Current status: {TaskStatus(status).value}",
        )

    notes_file = task.get("notes_file")
    if not notes_file or not Path(notes_file).exists():
        raise HTTPException(status_code=404, detail="Notes file not found")

    return FileResponse(
        notes_file, media_type="text/markdown", filename=Path(notes_file).name
    )
# Transcripts shorter than this many words are summarised in a single pass.
_SHORT_TRANSCRIPT_WORD_LIMIT = 2000


def _run_video_pipeline(task_id: str, youtube_url: str, language: str) -> None:
    """Run the blocking download -> transcribe -> summarise pipeline.

    Meant to be executed in a worker thread. Progress and the final outcome
    are reported through ``update_task``; any exception is caught and
    recorded as a FAILED task instead of being propagated.
    """
    downloader = None
    audio_file = None
    try:
        # Update status: Downloading
        update_task(task_id, TaskStatus.DOWNLOADING, "Downloading video...", 10)
        downloader = YouTubeDownloader()

        # Get video info
        video_info = downloader.get_video_info(youtube_url)
        video_title = video_info["title"]
        video_duration = video_info["duration"]
        update_task(
            task_id,
            TaskStatus.DOWNLOADING,
            f"Downloading: {video_title}",
            20,
            video_title=video_title,
        )
        audio_file = downloader.download_audio(youtube_url, task_id)

        # Validate audio
        if not AudioProcessor().validate_audio_file(audio_file):
            raise ValueError("Invalid audio file")

        # Update status: Transcribing
        update_task(task_id, TaskStatus.TRANSCRIBING, "Transcribing audio...", 40)
        transcript_data = WhisperTranscriber().transcribe(audio_file, language=language)
        update_task(task_id, TaskStatus.TRANSCRIBING, "Transcription complete", 60)

        # Update status: Generating notes
        update_task(
            task_id, TaskStatus.GENERATING_NOTES, "Generating structured notes...", 70
        )
        note_gen = NoteGenerator()
        transcript_text = transcript_data["text"]
        if len(transcript_text.split()) < _SHORT_TRANSCRIPT_WORD_LIMIT:
            # Short video: process full transcript
            logger.info("Processing short video (full transcript)")
            notes = note_gen.generate_notes_from_full_transcript(
                transcript_text, video_title
            )
        else:
            # Long video: segment and process
            logger.info("Processing long video (segmented)")
            segments = TranscriptSegmenter().segment_transcript(
                transcript_data, method="time"
            )
            notes = note_gen.generate_notes_from_segments(segments)
            # The segment-based generator is not given the title, so add it here.
            notes = f"# {video_title}\n\n{notes}"

        update_task(task_id, TaskStatus.GENERATING_NOTES, "Formatting notes...", 90)

        # Format final notes with metadata
        final_notes = note_gen.format_final_notes(
            notes, video_title, youtube_url, video_duration
        )

        # Save notes to file, creating the output directory if it is missing.
        notes_file = settings.output_dir / f"{task_id}_notes.md"
        notes_file.parent.mkdir(parents=True, exist_ok=True)
        notes_file.write_text(final_notes, encoding="utf-8")

        # Update status: Completed
        update_task(
            task_id,
            TaskStatus.COMPLETED,
            "Notes generated successfully!",
            100,
            notes_file=str(notes_file),
        )
        logger.info(f"Task {task_id} completed successfully")
    except Exception as e:
        # Top-level boundary of the background job: log with traceback and
        # record the failure on the task.
        logger.exception(f"Task {task_id} failed: {e}")
        update_task(task_id, TaskStatus.FAILED, f"Processing failed: {str(e)}", 0)
    finally:
        # Cleanup audio file; Path() tolerates the downloader returning
        # either a string or a Path.
        if downloader is not None and audio_file and Path(audio_file).exists():
            try:
                downloader.cleanup(audio_file)
            except Exception as e:
                logger.warning(f"Cleanup failed: {e}")


async def process_video(task_id: str, youtube_url: str, language: str):
    """
    Background task to process video and generate notes.

    The pipeline steps are called synchronously, so the whole pipeline is
    handed to the default thread-pool executor; running it directly in this
    coroutine would block the event loop (and every other request) until
    the video is processed.

    Args:
        task_id: Unique task identifier
        youtube_url: YouTube video URL
        language: Video language code
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, _run_video_pipeline, task_id, youtube_url, language
    )
def update_task(
    task_id: str, status: TaskStatus, message: str, progress: int, **kwargs
):
    """Record a new status, message and progress on a stored task.

    Unknown task ids are ignored. ``updated_at`` is refreshed on every call,
    and any extra keyword arguments are merged into the task record last, so
    they win over the standard fields.
    """
    record = tasks.get(task_id)
    if record is None:
        return

    changes = {
        "status": status,
        "message": message,
        "progress": progress,
        "updated_at": datetime.now(),
    }
    changes.update(kwargs)
    record.update(changes)