import os import re import json import subprocess import time import shutil import img2pdf import gradio as gr from google import genai # NEW SDK from pdf2image import convert_from_path from PIL import Image, ImageDraw, ImageFont import cv2 import numpy as np from PyPDF2 import PdfReader, PdfWriter from prompts import QP_MS_TRANSCRIPTION_PROMPT, get_grading_prompt from supabase import create_client, Client # ---------------- CONFIG ---------------- # Multi-API Key Configuration for handling RESOURCE_EXHAUSTED errors class GeminiClientManager: """Manages multiple Gemini API keys with automatic rotation on quota exhaustion.""" def __init__(self): # Load all three API keys from environment self.api_keys = [ os.getenv("GEMINI_API_KEY_1"), os.getenv("GEMINI_API_KEY_2"), os.getenv("GEMINI_API_KEY_3") ] # Filter out None values self.api_keys = [key for key in self.api_keys if key] if not self.api_keys: raise ValueError("❌ No API keys found! Please set at least GEMINI_API_KEY_1") print(f"βœ… Loaded {len(self.api_keys)} Gemini API key(s)") # Current key index (0 = primary) self.current_key_index = 0 # Create clients for all keys self.clients = [genai.Client(api_key=key) for key in self.api_keys] def get_current_client(self): """Get the currently active client.""" return self.clients[self.current_key_index] def rotate_to_next_key(self): """Rotate to the next available API key.""" if len(self.api_keys) == 1: print("⚠️ Only one API key available, cannot rotate") return False old_index = self.current_key_index self.current_key_index = (self.current_key_index + 1) % len(self.api_keys) print(f"πŸ”„ Rotating from API key #{old_index + 1} to API key #{self.current_key_index + 1}") return True def reset_to_primary(self): """Reset to primary (first) API key.""" if self.current_key_index != 0: print(f"πŸ”™ Resetting to primary API key #1") self.current_key_index = 0 # Initialize the client manager client_manager = GeminiClientManager() client = client_manager.get_current_client() # For backward compatibility GRID_ROWS, GRID_COLS = 20, 14 # Supabase configuration SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY") SUPABASE_BUCKET = "examfiles" # Initialize Supabase client (only if credentials are available) supabase_client = None if SUPABASE_URL and SUPABASE_SERVICE_KEY: try: supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY) print("βœ… Supabase client initialized successfully") except Exception as e: print(f"⚠️ Supabase initialization failed: {e}") else: print("⚠️ Supabase credentials not found - file upload to storage disabled") # ---------------- PROMPTS ---------------- # Prompts are now imported from prompts.py # ---------------- SUPABASE HELPERS ---------------- def upload_file_to_supabase(local_path, file_type="unknown", timestamp=None): """ Upload a file to Supabase Storage. Args: local_path (str): Local file path file_type (str): Type of file (qp, ms, ans, graded, imprinted) timestamp (str): Unix timestamp for folder organization (optional) Returns: str: Public URL of uploaded file or None if upload failed """ if not supabase_client: print("⚠️ Supabase not configured - skipping upload") return None try: if timestamp is None: timestamp = str(int(time.time())) original_name = os.path.basename(local_path) # Use original filename without prefix for cleaner storage remote_path = f"{timestamp}/{original_name}" print(f"πŸ“€ Uploading {file_type} to Supabase: {remote_path}") with open(local_path, "rb") as f: supabase_client.storage.from_(SUPABASE_BUCKET).upload( remote_path, f, file_options={"upsert": "true"} ) public_url = f"{SUPABASE_URL}/storage/v1/object/public/{SUPABASE_BUCKET}/{remote_path}" print(f"βœ… Uploaded successfully: {public_url}") return public_url except Exception as e: print(f"❌ Supabase upload failed for {file_type}: {e}") return None def process_and_upload_input_files(qp_file_obj, ms_file_obj, ans_file_obj): """ Process uploaded files and upload them to Supabase using a shared timestamp. Args: qp_file_obj: Gradio file object for Question Paper ms_file_obj: Gradio file object for Markscheme ans_file_obj: Gradio file object for Answer Sheet Returns: tuple: (qp_path, ms_path, ans_path, upload_urls_dict, timestamp) """ print("\n" + "="*60) print("πŸ“ PROCESSING INPUT FILES") print("="*60) # Generate single timestamp for this entire run run_timestamp = str(int(time.time())) print(f"πŸ• Run timestamp: {run_timestamp}") upload_urls = { "qp_url": None, "ms_url": None, "ans_url": None } # Get local paths from Gradio file objects qp_path = qp_file_obj.name if qp_file_obj else None ms_path = ms_file_obj.name if ms_file_obj else None ans_path = ans_file_obj.name if ans_file_obj else None # Upload to Supabase if configured (all files use same timestamp) if supabase_client: if qp_path: upload_urls["qp_url"] = upload_file_to_supabase(qp_path, "qp", run_timestamp) if ms_path: upload_urls["ms_url"] = upload_file_to_supabase(ms_path, "ms", run_timestamp) if ans_path: upload_urls["ans_url"] = upload_file_to_supabase(ans_path, "ans", run_timestamp) print("="*60 + "\n") return qp_path, ms_path, ans_path, upload_urls, run_timestamp # ---------------- HELPERS ---------------- def parse_md_table(md): """Parse a Markdown table into a list of rows.""" lines = [l for l in md.split("\n") if l.strip()] if len(lines) < 3: return [] lines = lines[2:] # skip header + separator rows = [] for line in lines: parts = [c.strip() for c in line.strip("|").split("|")] # Filter out empty strings from leading/trailing pipes clean_parts = [p for p in parts if p] if clean_parts: rows.append(clean_parts) return rows def convert_html_color_spans(md_text): """Convert HTML color spans to LaTeX textcolor commands.""" pattern = r'\s*(.*?)\s*' def repl(m): color = m.group(1).strip() text = m.group(2) return fr'\textcolor{{{color}}}{{{text}}}' return re.sub(pattern, repl, md_text, flags=re.IGNORECASE) def cleanup_markdown_for_latex(md_text): """Clean up markdown text for better LaTeX conversion.""" # Ensure spacing between bold headers and tables md_text = re.sub(r'(\*\*Markscheme vs Student Answer\*\*)\s*(\|)', r'\1\n\n\2', md_text) # Convert common unicode math symbols to LaTeX (safety net) replacements = { '∫': r'\int ', 'Β²': '^2', 'Β³': '^3', 'Β½': r'\frac{1}{2}', 'ΒΌ': r'\frac{1}{4}', '∞': r'\infty', '≀': r'\leq', 'β‰₯': r'\geq', 'β‰ ': r'\neq', 'Β±': r'\pm', 'Γ—': r'\times', 'Γ·': r'\div', '√': r'\sqrt', 'βˆ‘': r'\sum', '∏': r'\prod', 'βˆ‚': r'\partial', 'Ο€': r'\pi', 'ΞΈ': r'\theta', 'Ξ±': r'\alpha', 'Ξ²': r'\beta', 'Ξ³': r'\gamma', 'Ξ΄': r'\delta', 'Ξ΅': r'\epsilon', 'Ξ»': r'\lambda', 'ΞΌ': r'\mu', 'Οƒ': r'\sigma', 'Ξ”': r'\Delta', 'Ξ£': r'\Sigma', 'Ξ©': r'\Omega' } for char, latex in replacements.items(): md_text = md_text.replace(char, f'${latex}$') return md_text def escape_latex_special_chars(text): """Escape special LaTeX characters in text.""" replacements = { '%': r'\%', '&': r'\&', '#': r'\#', '_': r'\_', '{': r'\{', '}': r'\}', '~': r'\textasciitilde{}', '^': r'\textasciicircum{}' } # Don't escape if already in math mode or LaTeX command if '$' in text or '\\' in text: return text for char, escaped in replacements.items(): text = text.replace(char, escaped) return text def save_as_pdf(text, filename="output.pdf"): """ Convert Markdown text to PDF using Pandoc with pdflatex. Extracts the Examiner's Summary Report and places it at the top with enhanced formatting. Converts HTML color spans to LaTeX textcolor commands. Args: text (str): Markdown content to convert filename (str): Output PDF filename Returns: str: Path to the generated PDF file Raises: Exception: If Pandoc or pdflatex is not available, or conversion fails """ # Sanitize filename - replace spaces and special characters with underscores # This prevents issues with pdflatex and file operations import string valid_chars = f"-_.() {string.ascii_letters}{string.digits}" sanitized_filename = ''.join(c if c in valid_chars else '_' for c in filename) # Replace multiple spaces with single underscore sanitized_filename = re.sub(r'\s+', '_', sanitized_filename) # Remove double underscores sanitized_filename = re.sub(r'_+', '_', sanitized_filename) if sanitized_filename != filename: print(f"ℹ️ Sanitized filename: '{filename}' β†’ '{sanitized_filename}'") filename = sanitized_filename base_name = os.path.splitext(filename)[0] temp_md_file = f"{base_name}_input.md" temp_tex_file = f"{base_name}_temp.tex" print("\n" + "="*60) print("πŸ“„ MARKDOWN TO PDF CONVERSION PROCESS") print("="*60) try: # Step 1: Extract Summary Report Table print("\n[STEP 1/6] Extracting Examiner's Summary Report...") summary_pattern = re.compile( r"### Examiner's Summary Report\s*\n\n(\|.*?\|)\s*\n\n\*\*Total:\s*(.*?)\*\*", re.DOTALL ) summary_match = summary_pattern.search(text) if summary_match: summary_table_md = summary_match.group(1) summary_total = summary_match.group(2) text = summary_pattern.sub("", text) print(f" βœ… SUCCESS: Extracted summary report with total: {summary_total}") else: summary_table_md = "" summary_total = "" print(" ⚠️ WARNING: No Examiner's Summary Report found in markdown") # Step 2: Clean up markdown print("\n[STEP 2/6] Cleaning markdown and converting HTML to LaTeX...") text = cleanup_markdown_for_latex(text) text = convert_html_color_spans(text) print(" βœ… SUCCESS: Markdown cleaned and HTML color spans converted") # Save cleaned markdown with open(temp_md_file, 'w', encoding='utf-8') as f: f.write(text) print(f" πŸ“ Saved cleaned markdown to: {temp_md_file}") # Step 3: Convert MD to LaTeX via Pandoc print("\n[STEP 3/6] Converting markdown to LaTeX using Pandoc...") pandoc_cmd = [ "pandoc", "--from=markdown", "--to=latex", "--standalone", temp_md_file, "-o", temp_tex_file ] print(f" πŸ”§ Running: {' '.join(pandoc_cmd)}") result = subprocess.run(pandoc_cmd, capture_output=True, check=False) if result.returncode != 0: try: stderr = result.stderr.decode('utf-8', errors='replace') except: stderr = str(result.stderr) print(f" ❌ FAILED: Pandoc returned error code {result.returncode}") print(f" Error details: {stderr[:500]}") raise Exception(f"Pandoc conversion failed: {stderr}") if not os.path.exists(temp_tex_file): print(f" ❌ FAILED: LaTeX file not created at {temp_tex_file}") raise Exception("Pandoc did not create the expected LaTeX file") print(f" βœ… SUCCESS: LaTeX file created at {temp_tex_file}") # Step 4: Modify the generated LaTeX print("\n[STEP 4/6] Enhancing LaTeX document...") with open(temp_tex_file, "r", encoding="utf-8") as f: tex = f.read() tex = tex.replace( r"\documentclass{article}", r"\documentclass[12pt]{extarticle}" ) insert_packages = r"""\usepackage[a4paper, margin=1in]{geometry} \usepackage{xcolor} \usepackage{colortbl} \usepackage{booktabs} \usepackage{array} \usepackage{longtable} \renewcommand{\arraystretch}{1.4} \newcolumntype{L}[1]{>{\raggedright\arraybackslash}p{#1}}""" tex = tex.replace(r"\begin{document}", insert_packages + "\n\\begin{document}") print(" βœ… SUCCESS: Enhanced document class and added packages") # Step 5: Build enhanced LaTeX table for summary if summary_table_md: print("\n[STEP 5/6] Building enhanced summary table...") summary_rows = parse_md_table(summary_table_md) print(f" πŸ“Š Parsed {len(summary_rows)} rows from summary table") summary_latex = r"""\section*{Examiner's Summary Report} \begin{center} \rowcolors{2}{gray!10}{white} \begin{tabular}{|c|c|c|L{8cm}|} \hline \rowcolor{gray!30} \textbf{Question} & \textbf{Marks} & \textbf{Remark} & \textbf{Feedback} \\ \hline """ for row in summary_rows: if len(row) >= 4: feedback = row[3] if not ('$' in feedback or '\\textcolor' in feedback): feedback = feedback.replace('%', r'\%').replace('&', r'\&').replace('#', r'\#') summary_latex += f"{row[0]} & {row[1]} & {row[2]} & {feedback} \\\\ \\hline\n" summary_latex += r"\end{tabular}" summary_latex += "\n\\end{center}\n\n" summary_latex += f"\\vspace{{0.5cm}}\\noindent\\textbf{{\\Large Overall Score: {summary_total}}}\n\n" summary_latex += "\\hrulefill\n\\vspace{1cm}\n\n" summary_latex += "\\newpage\n\n" tex = tex.replace( r"\begin{document}", r"\begin{document}" + "\n\n" + summary_latex ) print(" βœ… SUCCESS: Summary table with zebra striping injected at document top") else: print("\n[STEP 5/6] Skipping summary table (not found)") with open(temp_tex_file, "w", encoding="utf-8") as f: f.write(tex) # Step 6: Compile PDF with pdflatex print("\n[STEP 6/6] Compiling PDF with pdflatex...") pdflatex_cmd = [ "pdflatex", "-interaction=nonstopmode", f"-output-directory={os.path.dirname(os.path.abspath(temp_tex_file)) or '.'}", temp_tex_file ] print(" πŸ”§ Running pdflatex (pass 1/2)...") result1 = subprocess.run(pdflatex_cmd, capture_output=True, check=False) print(" πŸ”§ Running pdflatex (pass 2/2)...") result2 = subprocess.run(pdflatex_cmd, capture_output=True, check=False) temp_pdf = temp_tex_file.replace(".tex", ".pdf") if not os.path.exists(temp_pdf): print(f" ❌ FAILED: PDF not created at {temp_pdf}") try: stderr = result2.stderr.decode('utf-8', errors='replace') except: stderr = str(result2.stderr) log_file = temp_tex_file.replace(".tex", ".log") if os.path.exists(log_file): print(f" πŸ“‹ Checking LaTeX log file: {log_file}") try: with open(log_file, 'r', encoding='utf-8', errors='replace') as f: log_content = f.read() error_lines = [line for line in log_content.split('\n') if '!' in line] if error_lines: print(f" ❌ LaTeX Errors found ({len(error_lines)} lines):") for err_line in error_lines[:10]: print(f" {err_line}") stderr += "\n\nLaTeX Errors:\n" + "\n".join(error_lines[:10]) except Exception as log_err: print(f" ⚠️ Could not read log file: {log_err}") raise Exception(f"pdflatex failed to create PDF. Error: {stderr[:1000]}") print(f" βœ… SUCCESS: PDF compiled at {temp_pdf}") # Move output PDF to final filename if os.path.exists(filename): os.remove(filename) os.rename(temp_pdf, filename) print(f" πŸ“¦ Moved to final location: {filename}") # Clean up temporary files print("\n[CLEANUP] Removing temporary files...") cleaned_count = 0 for ext in [".md", ".tex", ".aux", ".log", ".out"]: temp_file = base_name + ext if os.path.exists(temp_file): os.remove(temp_file) cleaned_count += 1 for prefix in ["_input", "_temp"]: temp_file = base_name + prefix + ext if os.path.exists(temp_file): os.remove(temp_file) cleaned_count += 1 print(f" 🧹 Cleaned up {cleaned_count} temporary files") print("\n" + "="*60) print("βœ… PDF CONVERSION COMPLETED SUCCESSFULLY") print(f"πŸ“„ Output file: {filename}") print("="*60 + "\n") return filename except subprocess.CalledProcessError as e: print(f"\n❌ SUBPROCESS ERROR: {e}") print(f" STDOUT: {e.stdout}") print(f" STDERR: {e.stderr}") print("="*60 + "\n") raise Exception(f"PDF conversion failed: {e.stderr}") except FileNotFoundError as e: print(f"\n❌ FILE NOT FOUND ERROR: {e}") print("="*60) print("⚠️ REQUIRED TOOLS MISSING") print("Please install the following:") print(" β€’ pandoc") print(" β€’ texlive (or MiKTeX on Windows)") print(" β€’ texlive-latex-extra (for extarticle class)") print("="*60 + "\n") raise Exception( "Pandoc or pdflatex not found. Please install:\n" " - pandoc\n" " - texlive (or MiKTeX on Windows)\n" " - texlive-latex-extra (for extarticle class)" ) except Exception as e: print(f"\n❌ UNEXPECTED ERROR: {e}") import traceback traceback.print_exc() print("="*60 + "\n") raise def compress_pdf(input_path, output_path=None, max_size=20*1024*1024): if output_path is None: base, ext = os.path.splitext(input_path) output_path = f"{base}_compressed{ext}" try: size = os.path.getsize(input_path) except Exception: return input_path if size <= max_size: print(f"ℹ️ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)") return input_path print(f"πŸ”Ž Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}") try: gs_cmd = [ "gs", "-sDEVICE=pdfwrite", "-dCompatibilityLevel=1.4", "-dPDFSETTINGS=/ebook", "-dNOPAUSE", "-dQUIET", "-dBATCH", f"-sOutputFile={output_path}", input_path ] subprocess.run(gs_cmd, check=True) new_size = os.path.getsize(output_path) print(f"βœ… Compression done. New size: {new_size/1024/1024:.2f} MB") if new_size <= max_size: return output_path else: print("⚠️ Compressed file still larger than threshold; returning original") return input_path except Exception as e: print("❌ Compression error:", e) return input_path def upload_to_gemini(path, display_name=None): """ Upload a file to Gemini using the NEW google-genai SDK. Uses the current active API key from client_manager. """ print(f"πŸ“€ Uploading {path} to Gemini...") try: current_client = client_manager.get_current_client() uploaded_file = current_client.files.upload(file=path) # Wait for processing to complete print(f"⏳ Waiting for file processing: {uploaded_file.name}") while uploaded_file.state.name == "PROCESSING": time.sleep(2) uploaded_file = current_client.files.get(name=uploaded_file.name) if uploaded_file.state.name == "FAILED": raise Exception(f"File processing failed: {uploaded_file.name}") print(f"βœ… Uploaded and processed: {uploaded_file.name}") return uploaded_file except Exception as e: print(f"❌ Upload failed for {path}: {e}") raise def merge_pdfs(paths, output_path): writer = PdfWriter() for p in paths: reader = PdfReader(p) for page in reader.pages: writer.add_page(page) with open(output_path, "wb") as f: writer.write(f) return output_path def gemini_generate_content(prompt_text, file_upload_obj=None, image_obj=None, model_name="gemini-2.5-pro", fallback_model="gemini-2.5-flash", fallback_model_2="gemini-2.5-flash-lite", file_path=None): """ Send prompt_text and optionally an uploaded file (or an image object/list) to the model using NEW SDK. Automatically rotates through available API keys on RESOURCE_EXHAUSTED errors. When rotating keys with file uploads, re-uploads the file with the new API key. Args: prompt_text: The prompt to send file_upload_obj: Previously uploaded file object (optional) image_obj: Image or list of images (optional) model_name: Primary model to use fallback_model: First fallback model if primary fails fallback_model_2: Second fallback model if first fallback fails file_path: Local file path (needed for re-upload when rotating keys) Returns textual response and prints progress. """ contents = [prompt_text] current_file_obj = file_upload_obj if current_file_obj: contents.append(current_file_obj) if image_obj: if isinstance(image_obj, list): for img_path in image_obj: if isinstance(img_path, str): pil_img = Image.open(img_path) contents.append(pil_img) else: contents.append(img_path) else: if isinstance(image_obj, str): pil_img = Image.open(image_obj) contents.append(pil_img) else: contents.append(image_obj) print("πŸ“‘ Sending request to Gemini (prompt length:", len(prompt_text), "chars )") # Try with all available API keys max_attempts = len(client_manager.api_keys) attempt = 0 while attempt < max_attempts: current_client = client_manager.get_current_client() current_key_num = client_manager.current_key_index + 1 # Update contents with current file object contents = [prompt_text] if current_file_obj: contents.append(current_file_obj) if image_obj: if isinstance(image_obj, list): for img_path in image_obj: if isinstance(img_path, str): pil_img = Image.open(img_path) contents.append(pil_img) else: contents.append(img_path) else: if isinstance(image_obj, str): pil_img = Image.open(image_obj) contents.append(pil_img) else: contents.append(image_obj) # Try primary model first try: print(f"πŸ”‘ Using API key #{current_key_num} with model {model_name}") response = current_client.models.generate_content( model=model_name, contents=contents ) raw_text = response.text print(f"πŸ“₯ Received response (chars): {len(raw_text)}") # Success! Reset to primary key for next request client_manager.reset_to_primary() return raw_text except Exception as e: error_str = str(e) print(f"❌ Generation failed with API key #{current_key_num} and model {model_name}: {e}") # Check if it's a RESOURCE_EXHAUSTED error if "429" in error_str or "RESOURCE_EXHAUSTED" in error_str: print(f"⚠️ Quota exhausted for API key #{current_key_num} with model {model_name}") # Try first fallback model with SAME API key print(f"⚑ Trying fallback model {fallback_model} with same API key #{current_key_num}") try: response = current_client.models.generate_content( model=fallback_model, contents=contents ) raw_text = response.text print(f"πŸ“₯ Received response (chars): {len(raw_text)}") client_manager.reset_to_primary() return raw_text except Exception as e_fallback: error_fallback_str = str(e_fallback) print(f"❌ Fallback model {fallback_model} also failed: {e_fallback}") # Check if first fallback also exhausted if "429" in error_fallback_str or "RESOURCE_EXHAUSTED" in error_fallback_str: print(f"⚠️ First fallback model also exhausted for API key #{current_key_num}") # Try second fallback model with SAME API key print(f"⚑ Trying second fallback model {fallback_model_2} with same API key #{current_key_num}") try: response = current_client.models.generate_content( model=fallback_model_2, contents=contents ) raw_text = response.text print(f"πŸ“₯ Received response (chars): {len(raw_text)}") client_manager.reset_to_primary() return raw_text except Exception as e_fallback_2: error_fallback_2_str = str(e_fallback_2) print(f"❌ Second fallback model {fallback_model_2} also failed: {e_fallback_2}") # Check if second fallback also exhausted if "429" in error_fallback_2_str or "RESOURCE_EXHAUSTED" in error_fallback_2_str: print(f"⚠️ All 3 models exhausted for API key #{current_key_num}") # Now try next API key if available if attempt < max_attempts - 1: # Check if we have file uploads and can re-upload if file_upload_obj and file_path: print(f"πŸ”„ Rotating to next API key and re-uploading file...") client_manager.rotate_to_next_key() # Re-upload file with new API key try: print(f"πŸ“€ Re-uploading file with API key #{client_manager.current_key_index + 1}...") current_file_obj = upload_to_gemini(file_path) print(f"βœ… File re-uploaded successfully") except Exception as upload_error: print(f"❌ Failed to re-upload file: {upload_error}") raise Exception(f"Failed to re-upload file with new API key: {upload_error}") attempt += 1 print(f"πŸ”„ Retrying with next API key (attempt {attempt + 1}/{max_attempts})...") continue elif file_upload_obj and not file_path: print("⚠️ WARNING: Cannot rotate API keys - file_path not provided for re-upload!") print(" To enable API key rotation with file uploads, pass file_path parameter.") raise Exception(f"All 3 models exhausted for API key #{current_key_num}. Cannot rotate without file_path.") else: # No file uploads, safe to rotate client_manager.rotate_to_next_key() attempt += 1 print(f"πŸ”„ Trying next API key (attempt {attempt + 1}/{max_attempts})...") continue else: raise Exception(f"All {max_attempts} API key(s) exhausted with all 3 models.") else: # Second fallback failed with different error raise Exception(f"Second fallback model failed: {e_fallback_2}") else: # First fallback failed with different error raise Exception(f"First fallback model failed: {e_fallback}") elif "403" in error_str or "PERMISSION_DENIED" in error_str: # This happens when trying to access a file uploaded with a different API key print(f"⚠️ Permission denied - likely due to file uploaded with different API key") # Try to re-upload if we have the file path if file_path and attempt < max_attempts - 1: print(f"πŸ”„ Attempting to re-upload file with next API key...") client_manager.rotate_to_next_key() try: print(f"πŸ“€ Re-uploading file with API key #{client_manager.current_key_index + 1}...") current_file_obj = upload_to_gemini(file_path) print(f"βœ… File re-uploaded successfully") attempt += 1 print(f"πŸ”„ Retrying with next API key (attempt {attempt + 1}/{max_attempts})...") continue except Exception as upload_error: print(f"❌ Failed to re-upload file: {upload_error}") raise Exception(f"Failed to re-upload file with new API key: {upload_error}") else: raise Exception(f"File access denied. Cannot re-upload without file_path. Error: {e}") else: # Other error - try fallback models with same key print(f"⚑ Trying fallback model {fallback_model} with same API key #{current_key_num}") try: response = current_client.models.generate_content( model=fallback_model, contents=contents ) raw_text = response.text print(f"πŸ“₯ Received response (chars): {len(raw_text)}") client_manager.reset_to_primary() return raw_text except Exception as e2: print(f"❌ First fallback also failed: {e2}") # Try second fallback print(f"⚑ Trying second fallback model {fallback_model_2} with same API key #{current_key_num}") try: response = current_client.models.generate_content( model=fallback_model_2, contents=contents ) raw_text = response.text print(f"πŸ“₯ Received response (chars): {len(raw_text)}") client_manager.reset_to_primary() return raw_text except Exception as e3: print(f"❌ Second fallback also failed: {e3}") # If we have more keys, try them if attempt < max_attempts - 1: if file_upload_obj and file_path: print(f"πŸ”„ Rotating to next API key and re-uploading file...") client_manager.rotate_to_next_key() try: print(f"πŸ“€ Re-uploading file with API key #{client_manager.current_key_index + 1}...") current_file_obj = upload_to_gemini(file_path) print(f"βœ… File re-uploaded successfully") except Exception as upload_error: print(f"❌ Failed to re-upload file: {upload_error}") raise Exception(f"Failed to re-upload file with new API key: {upload_error}") attempt += 1 print(f"πŸ”„ Retrying with next API key (attempt {attempt + 1}/{max_attempts})...") continue elif file_upload_obj and not file_path: raise Exception(f"All models failed. Cannot rotate keys without file_path. Last error: {e3}") else: client_manager.rotate_to_next_key() attempt += 1 print(f"πŸ”„ Trying next API key (attempt {attempt + 1}/{max_attempts})...") continue else: raise Exception(f"All attempts failed. Last error: {e3}") # If we exhausted all attempts raise Exception(f"❌ All {max_attempts} API key(s) exhausted. Please check your quota or try again later.") # ---------------- PARSERS ---------------- def extract_question_ids_from_qpms(text: str): """Extract question IDs from QP+MS transcript.""" print("πŸ”Ž Extracting question IDs from QP+MS transcript using regex...") clean_text = text.replace("\u00A0", " ").replace("\t", " ") primary_matches = re.findall(r"^\s*Question\s*[:\s]\s*([\dA-Za-z.()]+)", clean_text, re.MULTILINE) if primary_matches: print(f"βœ… Extracted {len(primary_matches)} question IDs from explicit 'Question X' lines.") print("IDs:", primary_matches) return primary_matches fallback_matches = re.findall(r"^\s*(\d+(?:[.)]|\([a-zA-Z0-9]+\))?[a-zA-Z0-9]*)", clean_text, re.MULTILINE) if fallback_matches: print(f"βœ… Extracted {len(fallback_matches)} question IDs (fallback numbered lists).") print("IDs:", fallback_matches) else: print("⚠️ No question IDs extracted; will send NA placeholder.") return fallback_matches def build_as_cot_prompt_with_expected_ids(expected_ids, qpms_text=None): """ Construct the AS transcription prompt injecting the expected IDs block and graph detection instructions, modifying it to include a Chain-of-Thought (CoT) section using a tag, and requiring mathematical expressions to be enclosed in LaTeX dollar delimiters ($...$). Includes explicit rules for interpreting NA-like answers and no-response situations. """ if not expected_ids: ids_block = "{NA}" else: ids_block = "{\n" + "\n".join(expected_ids) + "\n}" qpms_section = "" if qpms_text is not None: qpms_section = ( "\nYou are also provided with the full transcript of the Question Paper and Markscheme (QP+MS) below." "\nUse it primarily to resolve ambiguous handwriting and to confirm expected answers when needed." "\n--- BEGIN QP+MS TRANSCRIPT ---\n" f"{qpms_text.strip()}\n" "--- END QP+MS TRANSCRIPT ---\n" ) prompt = f"""You are a high-quality handwritten transcription assistant, performing transcription with a Chain-of-Thought process. INPUT: This PDF contains a student's handwritten answer sheet. {qpms_section} TASK: 1. **THINKING:** Before transcribing each answer, document your thought process inside a **** tag. - Identify the question ID. If inferred, note why. - Detail any ambiguities (unclear numbers, symbols, or structures). - Explain how ambiguities were resolved, including whether the QP+MS transcript was consulted. - If QP+MS was consulted but you chose not to change the transcription, state this. - If the initial question label was incorrect (e.g., 2.a vs 2.b), correct it and briefly explain the reasoning in . *Example Thinking:* - Found Question 3(a). - The term could be '$2x$' or '21x'. - Markscheme uses '$21x$', but handwriting matches '$2x$'. - Decision: transcribe '$2x$'. 2. **TRANSCRIPTION:** Transcribe the student's answers directly and faithfully. - Assign each answer to a labelled question ID when present. - For unlabeled answers, segment logically and mark inferred IDs as "**INFERRED: **". - **Mathematical expressions and standalone variables must appear inside LaTeX dollar delimiters ($...$).** - If a diagram/graph is omitted, write **[Graph omitted]**. - If handwriting is unreadable: **[illegible]**. **ANSWER-INTERPRETATION RULES:** - If the student writes β€œNA”, β€œN/A”, β€œNot Applicable”, or clear equivalents β†’ record exactly as **NA**. - If the student leaves the space blank, crosses it out, makes no meaningful attempt, or provides no answer β†’ record **[No response]**. Ensure deterministic formatting so subsequent models can grade directly from this aligned format. Expected questions (if missing, write NA): {ids_block} ----------------------- OUTPUT FORMAT: ... Question AS: ... Question AS: ... ==== GRAPH FOUND ANSWERS ==== Graph found in: - Answer β†’ Page (one per line) ==== END GRAPH FOUND ====""" return prompt def extract_graph_questions_from_ms(text: str): """Extract graph questions and page numbers from MS transcript.""" clean_text = text.replace("\u00A0", " ").replace("\t", " ") match = re.search(r"==== GRAPH EXPECTED QUESTIONS ====\s*(.*?)\s*==== END GRAPH EXPECTED ====", clean_text, re.S) graph_dict = {} if match: block = match.group(1) for line in block.splitlines(): line = line.strip() if line.startswith("- Question"): q_match = re.match(r"- Question\s+([\dA-Za-z.()]+)\s*β†’\s*Page\s*(\d+)", line) if q_match: q_id, page = q_match.groups() graph_dict[q_id] = int(page) return graph_dict def extract_graph_answers_from_as(text: str): """Extract graph answers and page numbers from AS transcript.""" clean_text = text.replace("\u00A0", " ").replace("\t", " ") block = re.search(r"==== GRAPH FOUND ANSWERS ====\s*(.*?)\s*==== END GRAPH FOUND ====", clean_text, re.S) graph_dict = {} if block: for line in block.group(1).splitlines(): line = line.strip() if line.startswith("- Answer"): match = re.match(r"- Answer\s+([\dA-Za-z.()]+)\s*β†’\s*Page\s*(\d+)", line) if match: ans_id, page = match.groups() graph_dict[ans_id] = int(page) return graph_dict def extract_marks_from_grading(grading_text): """ Parse the grading markdown and extract marks per question from the Awarded column only. """ print("πŸ”Ž Extracting awarded marks from grading output...") grading_json = {"grading": []} question_blocks = re.split(r"###\s*Question\s+", grading_text) for block in question_blocks[1:]: first_line = block.strip().splitlines()[0].strip() if block.strip().splitlines() else "" q_id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line) if not q_id_match: q_id = first_line.split()[0] if first_line else "" else: q_id = q_id_match.group(1).strip() # Extract marks only from the "Awarded" column (4th column in the table) awarded = [] lines = block.split('\n') for line in lines: if '|' in line: parts = [p.strip() for p in line.split('|')] # Check if this is a data row (not header or separator) and has at least 5 columns if len(parts) >= 5 and not parts[1].startswith('-'): awarded_col = parts[4] # 4th column (index 4 because of leading empty from split) # Extract mark codes from the awarded column marks = re.findall(r"\b([MABCR]\d+|[MABCR]0)\b", awarded_col) awarded.extend(marks) grading_json["grading"].append({ "question": q_id, "marks_awarded": awarded }) print("βœ… Extracted grading marks for", len(grading_json["grading"]), "question blocks.") print(json.dumps(grading_json, indent=2)) return grading_json def check_and_correct_total_marks(grading_text): """ Verifies the total marks in the Examiner's Summary Report against the sum of individual question marks. Corrects if discrepancy found. Args: grading_text (str): The full grading markdown text Returns: tuple: (corrected_text, calculated_awarded, calculated_possible, was_corrected) """ print("\n" + "="*60) print("πŸ” VERIFYING TOTAL MARKS IN SUMMARY REPORT") print("="*60) question_marks = {} calculated_total_awarded = 0 calculated_total_possible = 0 # Updated pattern to match BOTH formats: # ### Question <1.a> (with angle brackets) # ### Question 1.a (without angle brackets) # The ? makes the closing bracket optional question_block_pattern = re.compile( r"### Question\s*?\s*[\s\S]*?\*\*Total:\s*(\d+)/(\d+)\*\*", re.DOTALL | re.IGNORECASE ) matches = question_block_pattern.finditer(grading_text) for match in matches: question_id = match.group(1).strip() awarded = int(match.group(2)) possible = int(match.group(3)) question_marks[question_id] = {'awarded': awarded, 'possible': possible} calculated_total_awarded += awarded calculated_total_possible += possible print(f"\nοΏ½ Exltracted marks from {len(question_marks)} questions:") for q_id, marks in question_marks.items(): print(f" Question {q_id}: {marks['awarded']}/{marks['possible']}") print(f"\nπŸ“ˆ Calculated totals from individual questions:") print(f" Awarded: {calculated_total_awarded}") print(f" Possible: {calculated_total_possible}") # Find the summary report section summary_report_start = grading_text.find("### Examiner's Summary Report") if summary_report_start == -1: print("⚠️ Warning: Could not find '### Examiner's Summary Report' section.") return grading_text, calculated_total_awarded, calculated_total_possible, False summary_section = grading_text[summary_report_start:] summary_total_pattern = re.compile(r"(\*\*Total:\s*)(\d+)/(\d+)(\*\*)") summary_match = summary_total_pattern.search(summary_section) original_summary_awarded = 0 original_summary_possible = 0 if summary_match: original_summary_awarded = int(summary_match.group(2)) original_summary_possible = int(summary_match.group(3)) print(f"\nπŸ“‹ Original summary report total: {original_summary_awarded}/{original_summary_possible}") else: print("⚠️ Warning: Could not find overall total in summary report.") return grading_text, calculated_total_awarded, calculated_total_possible, False # Check for discrepancies corrected_report_text = grading_text total_mismatch = False if calculated_total_awarded != original_summary_awarded: print(f"\n❌ DISCREPANCY FOUND in awarded marks!") print(f" Calculated: {calculated_total_awarded}") print(f" Reported: {original_summary_awarded}") total_mismatch = True if calculated_total_possible != original_summary_possible: print(f"\n❌ DISCREPANCY FOUND in possible marks!") print(f" Calculated: {calculated_total_possible}") print(f" Reported: {original_summary_possible}") total_mismatch = True if total_mismatch: print(f"\nπŸ”§ CORRECTING summary total:") print(f" FROM: {original_summary_awarded}/{original_summary_possible}") print(f" TO: {calculated_total_awarded}/{calculated_total_possible}") # Correct only in the summary section corrected_summary_section = re.sub( summary_total_pattern, rf"\g<1>{calculated_total_awarded}/{calculated_total_possible}\g<4>", summary_section, count=1 ) corrected_report_text = grading_text[:summary_report_start] + corrected_summary_section print("βœ… Total marks corrected successfully!") else: print("\nβœ… Total marks are CORRECT - no correction needed!") print("="*60 + "\n") return corrected_report_text, calculated_total_awarded, calculated_total_possible, total_mismatch # ---------------- MAPPING/IMPRINT HELPERS ---------------- def ask_gemini_for_mapping_batch(image_paths, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS): """ Send multiple page images together to Gemini for batch mapping processing. """ ids_block = "{NA}" if expected_ids: ids_block = "{\n" + "\n".join(expected_ids) + "\n}" prompt = f"""You are an exam marker. Your role is to identify where each question begins on each page. The pages are divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label. For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins. ⚠ IMPORTANT RULES: - Do not place marks inside another question's answer area. - Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT. - Never place marks above or below the answer. - Each question should have unique cell number - If a question serial number is visible in the answer image, you must mandatorily identify the corresponding question using the grading JSON. IMPORTANT: For your help i have provided u questions that u can expect in the images: {ids_block} Return JSON only, like: [{{"page": 1, "question": "1(a)", "cell_number": 15}}, ...] Grading JSON: {json.dumps(grading_json, indent=2)}""" images = [Image.open(p) for p in image_paths] print(f"πŸ“‘ Sending batch mapping request for {len(image_paths)} pages to Gemini...") try: contents = [prompt] + images response = client.models.generate_content( model="gemini-2.5-flash", contents=contents ) raw_text = response.text except: print("⚠️ Trying fallback model for mapping...") contents = [prompt] + images response = client.models.generate_content( model="gemini-2.5-flash-preview-09-2025", contents=contents ) raw_text = response.text print("πŸ“₯ Batch mapping response (chars):", len(raw_text)) print("πŸ”Ž Gemini raw batch output:") print(raw_text) try: match = re.search(r'(\[.*\])', raw_text, re.DOTALL) if match: mapping = json.loads(match.group(1)) print(f"βœ… Parsed Gemini batch mapping for {len(image_paths)} pages") return mapping else: print("❌ Failed to find JSON array in response") return [] except Exception as e: print(f"❌ Failed to parse Gemini JSON mapping: {e}") return [] def normalize_question_id(qid): """ Normalize question ID to a standard format for matching. Converts formats like: - "1(a)" -> "1.a" - "2(c).i" -> "2.c.i" - "3.d.ii" -> "3.d.ii" (already normalized) """ if not qid: return qid # Replace parentheses format: 1(a) -> 1.a qid = re.sub(r'(\d+)\(([a-zA-Z])\)', r'\1.\2', qid) # Replace format like 2(c).i -> 2.c.i qid = re.sub(r'(\d+)\(([a-zA-Z]+)\)\.', r'\1.\2.', qid) return qid def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS): """ Convert PDF to images, create grid-numbered images for batch sending to Gemini, then annotate and produce imprinted PDF. """ print("πŸ“„ Converting answer PDF to images for imprinting...") pages = convert_from_path(pdf_path, dpi=100) annotated_page_paths = [] temp_grid_images = [] for p_index, page in enumerate(pages): img = page.convert("RGB") w, h = img.size cell_w, cell_h = w / cols, h / rows draw = ImageDraw.Draw(img) try: num_font = ImageFont.truetype("arial.ttf", 20) except Exception: num_font = ImageFont.load_default() cell_num = 1 for r in range(rows): for c in range(cols): x = int(c * cell_w + cell_w / 2) y = int(r * cell_h + cell_h / 2) text = str(cell_num) bbox = draw.textbbox((0, 0), text, font=num_font) tw = bbox[2] - bbox[0] th = bbox[3] - bbox[1] draw.text((x - tw/2, y - th/2), text, fill="black", font=num_font) cell_num += 1 temp_path = f"page_{p_index+1}_grid.png" img.save(temp_path, "PNG") temp_grid_images.append(temp_path) print("πŸ›° Created grid image:", temp_path) print("πŸ“‘ Sending page images to Gemini in batches for mapping...") batch_size = 10 all_mappings = [] for start in range(0, len(temp_grid_images), batch_size): batch_paths = temp_grid_images[start:start+batch_size] batch_mapping = ask_gemini_for_mapping_batch(batch_paths, grading_json, expected_ids, rows, cols) all_mappings.extend(batch_mapping) print(f"βœ… Processed batch {start//batch_size + 1}: pages {start+1}-{start+len(batch_paths)}") print("πŸ–Š Annotating pages with marks...") for p_index, page in enumerate(pages): page_num = p_index + 1 page_img = page.convert("RGB") img_cv = np.array(page_img) img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR) h, w, _ = img_cv.shape cell_w_px, cell_h_px = w / cols, h / rows page_mappings = [m for m in all_mappings if m.get("page") == page_num] for item in page_mappings: qid = item.get("question") cell_number = item.get("cell_number") if qid is None or cell_number is None: continue # Normalize the question ID from Gemini mapping normalized_qid = normalize_question_id(qid) # Try exact match first with normalized ID marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == normalized_qid), []) # If no match, try case-insensitive match if not marks_list: marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"].lower() == normalized_qid.lower()), []) # If still no match, try with original qid if not marks_list: marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", []) if g["question"] == qid), []) marks_text = ",".join(marks_list) if marks_list else "?" if marks_text == "?": print(f"⚠️ No marks found for question '{qid}' (normalized: '{normalized_qid}') on page {page_num}") row = (cell_number - 1) // cols col = (cell_number - 1) % cols x_c = int((col + 1) * cell_w_px - cell_w_px / 4) y_c = int((row + 0.5) * cell_h_px) font_scale = max(1.0, min(2.0, cell_h_px / 40.0)) thickness = max(2, int(font_scale * 2)) cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 255), thickness, cv2.LINE_AA) print(f"πŸ–Š Marks annotated for page {page_num}, question {qid}: {marks_text}") annotated_path = f"annotated_page_{page_num}.png" cv2.imwrite(annotated_path, img_cv) annotated_page_paths.append(annotated_path) print("βœ… Annotated page saved:", annotated_path) print("πŸ“‘ Merging annotated pages into final PDF...") with open(output_pdf, "wb") as f: f.write(img2pdf.convert(annotated_page_paths)) compressed = compress_pdf(output_pdf) print("πŸ“‘ Imprinted PDF saved to:", compressed) return compressed def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix): """ Extracts unique pages (1-based) from a PDF as images, saves as PNG, returns list of file paths. Handles cases where requested pages don't exist in the PDF. """ if not page_numbers: print(f"⚠️ No page numbers provided for extraction") return [] unique_pages = sorted(set(page_numbers)) # First, get the total page count to validate requested pages try: from PyPDF2 import PdfReader reader = PdfReader(pdf_path) total_pages = len(reader.pages) print(f"πŸ“„ PDF has {total_pages} total pages") # Filter out invalid page numbers valid_pages = [p for p in unique_pages if 1 <= p <= total_pages] invalid_pages = [p for p in unique_pages if p not in valid_pages] if invalid_pages: print(f"⚠️ Skipping invalid page numbers (out of range): {invalid_pages}") if not valid_pages: print(f"❌ No valid pages to extract from {pdf_path}") return [] unique_pages = valid_pages except Exception as e: print(f"⚠️ Could not validate page numbers: {e}. Proceeding with extraction...") # Extract the pages try: images = convert_from_path(pdf_path, dpi=200, first_page=min(unique_pages), last_page=max(unique_pages)) except Exception as e: print(f"❌ Failed to convert PDF pages to images: {e}") return [] out_paths = [] for idx, page_num in enumerate(unique_pages): img_idx = page_num - min(unique_pages) # Bounds check to prevent index errors if img_idx >= len(images): print(f"⚠️ Page {page_num} not found in extracted images (index {img_idx} >= {len(images)}). Skipping...") continue try: img = images[img_idx] out_path = f"{prefix}_page_{page_num}.png" img.save(out_path, "PNG") print(f"πŸ“€ Extracted graph page {page_num} from {pdf_path} as {out_path}") out_paths.append(out_path) except Exception as e: print(f"❌ Failed to save page {page_num}: {e}") continue return out_paths # ---------------- PIPELINE ---------------- def align_and_grade_pipeline(qp_path, ms_path, ans_path, subject="Maths", imprint=False, run_timestamp=None): """ Final pipeline with graph-aware grading logic using NEW SDK. Args: qp_path: Path to Question Paper PDF ms_path: Path to Markscheme PDF ans_path: Path to Answer Sheet PDF subject: Subject name (Maths or Science) imprint: Whether to generate imprinted PDF run_timestamp: Unix timestamp for organizing files in Supabase """ try: print("πŸ” Starting pipeline...") qp_path = compress_pdf(qp_path) ms_path = compress_pdf(ms_path) ans_path = compress_pdf(ans_path) merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf" merge_pdfs([qp_path, ms_path], merged_qpms_path) print("πŸ“Ž Merged QP + MS ->", merged_qpms_path) print("πŸ”Ό Uploading files to Gemini...") merged_uploaded = upload_to_gemini(merged_qpms_path) ans_uploaded = upload_to_gemini(ans_path) print("βœ… Upload complete.") print("1.i) Transcribing QP+MS (questions first, then full markscheme, with graph detection)...") qpms_prompt = QP_MS_TRANSCRIPTION_PROMPT["content"] + "\nAt the end, also list all questions in the markscheme where a graph is expected, in the format:\nGraph expected in:\n- Question β†’ Page \n(One per line, after ==== MARKSCHEME END ====)" qpms_text = gemini_generate_content(qpms_prompt, file_upload_obj=merged_uploaded, model_name="gemini-2.5-flash", fallback_model="gemini-2.5-flash-preview-09-2025", fallback_model_2="gemini-2.5-flash-lite", file_path=merged_qpms_path) print("πŸ“„ QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt") with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f: f.write(qpms_text) ms_graph_mapping = extract_graph_questions_from_ms(qpms_text) print("πŸ–ΌοΈ Graph-expected questions in MS:", ms_graph_mapping) ms_graph_pages = list(ms_graph_mapping.values()) ms_graph_images = [] if ms_graph_pages: ms_graph_images = extract_pdf_pages_as_images(merged_qpms_path, ms_graph_pages, prefix="qpms_graph") extracted_ids = extract_question_ids_from_qpms(qpms_text) if not extracted_ids: extracted_ids = ["NA"] print("1.ii) Building AS transcription prompt with expected question IDs and graph detection, sending to Gemini...") as_prompt = build_as_cot_prompt_with_expected_ids(extracted_ids, qpms_text) + "\nAt the end, also list all answers where a graph is found, in the format:\nGraph found in:\n- Answer β†’ Page \n(One per line, after all answers)" as_text = gemini_generate_content(as_prompt, file_upload_obj=ans_uploaded, model_name="gemini-2.5-flash", fallback_model="gemini-2.5-flash-preview-09-2025", fallback_model_2="gemini-2.5-flash-lite", file_path=ans_path) print("πŸ“ AS transcription received. Saving debug file: debug_as_transcript.txt") with open("debug_as_transcript.txt", "w", encoding="utf-8") as f: f.write(as_text) as_graph_mapping = extract_graph_answers_from_as(as_text) print("πŸ–ΌοΈ Graph-attempted answers in AS:", as_graph_mapping) as_graph_pages = list(as_graph_mapping.values()) as_graph_images = [] if as_graph_pages: as_graph_images = extract_pdf_pages_as_images(ans_path, as_graph_pages, prefix="as_graph") print("2) Preparing grading input and sending to Gemini for grading...") grading_input = ( "=== QP+MS TRANSCRIPT BEGIN ===\n" + qpms_text + "\n=== QP+MS TRANSCRIPT END ===\n\n" + "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n" + as_text + "\n=== ANSWER SHEET TRANSCRIPT END ===\n" ) if ms_graph_images or as_graph_images: graph_note = "\n\n---\nSome questions require graphs. I've attached the relevant graph pages from QP+MS and from the Answer Sheet. Use them as visual context when grading.\n---\n" grading_input += graph_note grading_prompt_obj = get_grading_prompt(subject.lower()) grading_prompt_system = grading_prompt_obj["content"] grading_images = ms_graph_images + as_graph_images grading_text = gemini_generate_content(grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input, image_obj=grading_images if grading_images else None, model_name="gemini-2.5-pro", fallback_model="gemini-2.5-flash") print("🧾 Grading output received. Saving debug file: debug_grading.md") with open("debug_grading.md", "w", encoding="utf-8") as f: f.write(grading_text) # Verify and correct total marks if needed grading_text, calc_awarded, calc_possible, was_corrected = check_and_correct_total_marks(grading_text) if was_corrected: print("πŸ“ Saving corrected grading to debug file: debug_grading_corrected.md") with open("debug_grading_corrected.md", "w", encoding="utf-8") as f: f.write(grading_text) base_name = os.path.splitext(os.path.basename(ans_path))[0] grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf") print("πŸ“„ Grading PDF saved:", grading_pdf_path) grading_json = extract_marks_from_grading(grading_text) with open("debug_grading_json.json", "w", encoding="utf-8") as f: json.dump(grading_json, f, indent=2, ensure_ascii=False) print("πŸ”§ Grading marks extraction complete.") imprinted_pdf_path = None if imprint: print("✍ Imprint option enabled. Starting imprinting process...") imprinted_pdf_path = f"{base_name}_imprinted.pdf" imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, extracted_ids) print("βœ… Imprinting finished. Imprinted PDF at:", imprinted_pdf_path) # Upload output files to Supabase (using same timestamp as input files) output_urls = { "graded_pdf_url": None, "imprinted_pdf_url": None } if supabase_client: print("\nπŸ“€ Uploading output files to Supabase...") if grading_pdf_path: output_urls["graded_pdf_url"] = upload_file_to_supabase(grading_pdf_path, "graded", run_timestamp) if imprinted_pdf_path: output_urls["imprinted_pdf_url"] = upload_file_to_supabase(imprinted_pdf_path, "imprinted", run_timestamp) print("🏁 Pipeline finished successfully.") return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path, output_urls except Exception as e: print("❌ Pipeline error:", e) import traceback traceback.print_exc() return f"❌ Error: {e}", None, None, None, None, {} # ---------------- GRADIO UI ---------------- with gr.Blocks(title="AI Grading (Pandoc + pdflatex)") as demo: gr.Markdown("## πŸ“˜ AI Grading β€” Using Pandoc + pdflatex for PDF Generation") gr.Markdown("**βœ… Now using Pandoc with pdflatex for professional-quality PDF outputs!**") if supabase_client: gr.Markdown("**☁️ Supabase Storage: Enabled** - All files will be uploaded to cloud storage") else: gr.Markdown("**⚠️ Supabase Storage: Disabled** - Files will only be processed locally") with gr.Row(): qp_file = gr.File(label="πŸ“„ Upload Question Paper (PDF)") ms_file = gr.File(label="πŸ“„ Upload Markscheme (PDF)") ans_file = gr.File(label="πŸ“ Upload Student Answer Sheet (PDF)") with gr.Row(): subject_dropdown = gr.Dropdown( choices=["Maths", "Science", "Economics"], value="Maths", label="πŸ“š Subject", info="Select the subject to apply appropriate grading guidelines" ) imprint_toggle = gr.Checkbox(label="✍ Imprint Marks on Student Answer Sheet", value=False) run_button = gr.Button("πŸš€ Run Pipeline") # File URLs section (only shown if Supabase is enabled) if supabase_client: with gr.Accordion("☁️ Uploaded File URLs", open=False): file_urls_box = gr.Textbox(label="Cloud Storage URLs", lines=8, interactive=False) with gr.Row(): qpms_box = gr.Textbox(label="πŸ“‘ QP+MS Transcript", lines=12) as_box = gr.Textbox(label="πŸ“ AS Transcript", lines=12) grading_output_box = gr.Textbox(label="🧾 Grading (Markdown)", lines=20) grading_pdf_file = gr.File(label="πŸ“₯ Download Grading PDF") imprint_pdf_file = gr.File(label="πŸ“₯ Download Imprinted PDF (Optional)") def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, subject_choice, imprint_flag): if not qp_file_obj or not ms_file_obj or not ans_file_obj: error_msg = "❌ Please upload all three files" if supabase_client: return error_msg, "", "", None, None, "" else: return error_msg, "", "", None, None # Process and upload input files (generates shared timestamp) qp_path, ms_path, ans_path, input_urls, run_timestamp = process_and_upload_input_files( qp_file_obj, ms_file_obj, ans_file_obj ) # Run the grading pipeline (pass timestamp to keep all files together) qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path, output_urls = align_and_grade_pipeline( qp_path, ms_path, ans_path, subject=subject_choice, imprint=imprint_flag, run_timestamp=run_timestamp ) # Build URLs summary urls_summary = "" if supabase_client: urls_summary = f"πŸ“€ UPLOADED FILES (Timestamp: {run_timestamp}):\n\n" urls_summary += "INPUT FILES:\n" if input_urls.get("qp_url"): urls_summary += f"β€’ Question Paper: {input_urls['qp_url']}\n" if input_urls.get("ms_url"): urls_summary += f"β€’ Markscheme: {input_urls['ms_url']}\n" if input_urls.get("ans_url"): urls_summary += f"β€’ Answer Sheet: {input_urls['ans_url']}\n" urls_summary += "\nOUTPUT FILES:\n" if output_urls.get("graded_pdf_url"): urls_summary += f"β€’ Graded PDF: {output_urls['graded_pdf_url']}\n" if output_urls.get("imprinted_pdf_url"): urls_summary += f"β€’ Imprinted PDF: {output_urls['imprinted_pdf_url']}\n" urls_summary += f"\nπŸ“ All files stored in: examfiles/{run_timestamp}/\n" if not any(input_urls.values()) and not any(output_urls.values()): urls_summary += "\n⚠️ No files were uploaded to Supabase" if supabase_client: return ( qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path, urls_summary ) else: return ( qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path ) # Set up the click handler based on whether Supabase is enabled if supabase_client: run_button.click( fn=run_pipeline, inputs=[qp_file, ms_file, ans_file, subject_dropdown, imprint_toggle], outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file, file_urls_box] ) else: run_button.click( fn=run_pipeline, inputs=[qp_file, ms_file, ans_file, subject_dropdown, imprint_toggle], outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file] ) if __name__ == "__main__": demo.launch()