Spaces:
Running
Running
| import os | |
| import re | |
| import json | |
| import subprocess | |
| import time | |
| import shutil | |
| import img2pdf | |
| import gradio as gr | |
| from google import genai # NEW SDK | |
| from pdf2image import convert_from_path | |
| from PIL import Image, ImageDraw, ImageFont | |
| import cv2 | |
| import numpy as np | |
| from PyPDF2 import PdfReader, PdfWriter | |
| from prompts import QP_MS_TRANSCRIPTION_PROMPT, get_grading_prompt | |
| from supabase import create_client, Client | |
| # ---------------- CONFIG ---------------- | |
| # Multi-API Key Configuration for handling RESOURCE_EXHAUSTED errors | |
class GeminiClientManager:
    """Manages multiple Gemini API keys with automatic rotation on quota exhaustion."""

    def __init__(self):
        # Collect whichever of the three configured keys are actually set.
        candidates = (
            os.getenv("GEMINI_API_KEY_1"),
            os.getenv("GEMINI_API_KEY_2"),
            os.getenv("GEMINI_API_KEY_3"),
        )
        self.api_keys = [key for key in candidates if key]
        if not self.api_keys:
            raise ValueError("β No API keys found! Please set at least GEMINI_API_KEY_1")
        print(f"β Loaded {len(self.api_keys)} Gemini API key(s)")
        # Index of the key currently in use (0 = primary).
        self.current_key_index = 0
        # One pre-built client per key, so rotating is just an index change.
        self.clients = [genai.Client(api_key=key) for key in self.api_keys]

    def get_current_client(self):
        """Return the client bound to the currently active API key."""
        return self.clients[self.current_key_index]

    def rotate_to_next_key(self):
        """Advance to the next key (wrapping); return False if rotation is impossible."""
        if len(self.api_keys) == 1:
            print("β οΈ Only one API key available, cannot rotate")
            return False
        old_index = self.current_key_index
        self.current_key_index = (self.current_key_index + 1) % len(self.api_keys)
        print(f"π Rotating from API key #{old_index + 1} to API key #{self.current_key_index + 1}")
        return True

    def reset_to_primary(self):
        """Switch back to the first API key; no-op when already there."""
        if self.current_key_index != 0:
            print(f"π Resetting to primary API key #1")
            self.current_key_index = 0
# Initialize the client manager (raises ValueError if no GEMINI_API_KEY_* is set)
client_manager = GeminiClientManager()
client = client_manager.get_current_client()  # For backward compatibility
# NOTE(review): grid dimensions — their consumer is not visible in this chunk; confirm usage.
GRID_ROWS, GRID_COLS = 20, 14
# Supabase configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")
SUPABASE_BUCKET = "examfiles"
# Initialize Supabase client (only if credentials are available).
# When left as None, upload helpers below silently skip storage uploads.
supabase_client = None
if SUPABASE_URL and SUPABASE_SERVICE_KEY:
    try:
        supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
        print("β Supabase client initialized successfully")
    except Exception as e:
        # Best-effort: a failed init disables uploads but does not stop the app.
        print(f"β οΈ Supabase initialization failed: {e}")
else:
    print("β οΈ Supabase credentials not found - file upload to storage disabled")
| # ---------------- PROMPTS ---------------- | |
| # Prompts are now imported from prompts.py | |
| # ---------------- SUPABASE HELPERS ---------------- | |
def upload_file_to_supabase(local_path, file_type="unknown", timestamp=None):
    """
    Upload a file to Supabase Storage.

    Args:
        local_path (str): Local file path
        file_type (str): Type of file (qp, ms, ans, graded, imprinted)
        timestamp (str): Unix timestamp for folder organization (optional)

    Returns:
        str: Public URL of uploaded file or None if upload failed
    """
    if not supabase_client:
        print("β οΈ Supabase not configured - skipping upload")
        return None
    try:
        # Default to "now" so ad-hoc uploads still get their own folder.
        folder = timestamp if timestamp is not None else str(int(time.time()))
        # Store as <timestamp>/<original filename> for per-run grouping.
        remote_path = f"{folder}/{os.path.basename(local_path)}"
        print(f"π€ Uploading {file_type} to Supabase: {remote_path}")
        with open(local_path, "rb") as handle:
            supabase_client.storage.from_(SUPABASE_BUCKET).upload(
                remote_path,
                handle,
                file_options={"upsert": "true"},
            )
        public_url = f"{SUPABASE_URL}/storage/v1/object/public/{SUPABASE_BUCKET}/{remote_path}"
        print(f"β Uploaded successfully: {public_url}")
        return public_url
    except Exception as e:
        # Upload is best-effort; callers treat None as "not stored".
        print(f"β Supabase upload failed for {file_type}: {e}")
        return None
def process_and_upload_input_files(qp_file_obj, ms_file_obj, ans_file_obj):
    """
    Process uploaded files and upload them to Supabase using a shared timestamp.

    Args:
        qp_file_obj: Gradio file object for Question Paper
        ms_file_obj: Gradio file object for Markscheme
        ans_file_obj: Gradio file object for Answer Sheet

    Returns:
        tuple: (qp_path, ms_path, ans_path, upload_urls_dict, timestamp)
    """
    print("\n" + "="*60)
    print("π PROCESSING INPUT FILES")
    print("="*60)
    # One timestamp shared by all three uploads so they land in one folder.
    run_timestamp = str(int(time.time()))
    print(f"π Run timestamp: {run_timestamp}")
    # Resolve local temp paths from the Gradio file objects (None if absent).
    local_paths = {
        "qp": qp_file_obj.name if qp_file_obj else None,
        "ms": ms_file_obj.name if ms_file_obj else None,
        "ans": ans_file_obj.name if ans_file_obj else None,
    }
    upload_urls = {f"{kind}_url": None for kind in ("qp", "ms", "ans")}
    # Upload to Supabase if configured (all files use the same timestamp).
    if supabase_client:
        for kind, path in local_paths.items():
            if path:
                upload_urls[f"{kind}_url"] = upload_file_to_supabase(path, kind, run_timestamp)
    print("="*60 + "\n")
    return local_paths["qp"], local_paths["ms"], local_paths["ans"], upload_urls, run_timestamp
| # ---------------- HELPERS ---------------- | |
def parse_md_table(md):
    """Parse a Markdown table into a list of rows."""
    nonblank = [line for line in md.split("\n") if line.strip()]
    # Need at least header, separator, and one data row.
    if len(nonblank) < 3:
        return []
    rows = []
    # Skip the header row and the |---| separator row.
    for line in nonblank[2:]:
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        # Drop empty cells produced by stray leading/trailing pipes.
        cells = [cell for cell in cells if cell]
        if cells:
            rows.append(cells)
    return rows
def convert_html_color_spans(md_text):
    """Convert HTML color spans to LaTeX textcolor commands."""
    span_pattern = r'<span\s+style="color:\s*([^"]+)">\s*(.*?)\s*</span>'

    def to_textcolor(match):
        # group(1) = CSS color name, group(2) = span body (whitespace trimmed
        # by the pattern itself).
        return fr'\textcolor{{{match.group(1).strip()}}}{{{match.group(2)}}}'

    return re.sub(span_pattern, to_textcolor, md_text, flags=re.IGNORECASE)
def cleanup_markdown_for_latex(md_text):
    """Clean up markdown text for better LaTeX conversion.

    Inserts a blank line between the bold comparison header and a table that
    immediately follows it (Pandoc needs the separation), then wraps known
    unicode math symbols in inline math mode as a safety net.
    """
    # Ensure spacing between bold headers and tables
    md_text = re.sub(r'(\*\*Markscheme vs Student Answer\*\*)\s*(\|)', r'\1\n\n\2', md_text)
    # Convert common unicode math symbols to LaTeX (safety net)
    # NOTE(review): each symbol is wrapped in its own $...$ pair below, so
    # adjacent symbols yield separate math segments and symbols already inside
    # math mode gain nested dollars — confirm downstream LaTeX tolerates this.
    replacements = {
        'β«': r'\int ',
        'Β²': '^2',
        'Β³': '^3',
        'Β½': r'\frac{1}{2}',
        'ΒΌ': r'\frac{1}{4}',
        'β': r'\infty',
        'β€': r'\leq',
        'β₯': r'\geq',
        'β ': r'\neq',
        'Β±': r'\pm',
        'Γ': r'\times',
        'Γ·': r'\div',
        'β': r'\sqrt',
        'β': r'\sum',
        'β': r'\prod',
        'β': r'\partial',
        'Ο': r'\pi',
        'ΞΈ': r'\theta',
        'Ξ±': r'\alpha',
        'Ξ²': r'\beta',
        'Ξ³': r'\gamma',
        'Ξ΄': r'\delta',
        'Ξ΅': r'\epsilon',
        'Ξ»': r'\lambda',
        'ΞΌ': r'\mu',
        'Ο': r'\sigma',
        'Ξ': r'\Delta',
        'Ξ£': r'\Sigma',
        'Ξ©': r'\Omega'
    }
    # Each occurrence is replaced by its LaTeX form inside $...$.
    for char, latex in replacements.items():
        md_text = md_text.replace(char, f'${latex}$')
    return md_text
def escape_latex_special_chars(text):
    """Escape special LaTeX characters in text."""
    # Text already containing math delimiters or backslash commands is
    # treated as pre-formatted LaTeX and passed through untouched.
    if '$' in text or '\\' in text:
        return text
    # Order matters: braces are escaped before '~'/'^' so the braces inside
    # \textasciitilde{} / \textasciicircum{} are not themselves re-escaped.
    escape_pairs = (
        ('%', r'\%'),
        ('&', r'\&'),
        ('#', r'\#'),
        ('_', r'\_'),
        ('{', r'\{'),
        ('}', r'\}'),
        ('~', r'\textasciitilde{}'),
        ('^', r'\textasciicircum{}'),
    )
    for raw, escaped in escape_pairs:
        text = text.replace(raw, escaped)
    return text
def save_as_pdf(text, filename="output.pdf"):
    """
    Convert Markdown text to PDF using Pandoc with pdflatex.
    Extracts the Examiner's Summary Report and places it at the top with enhanced formatting.
    Converts HTML color spans to LaTeX textcolor commands.
    Args:
        text (str): Markdown content to convert
        filename (str): Output PDF filename
    Returns:
        str: Path to the generated PDF file
    Raises:
        Exception: If Pandoc or pdflatex is not available, or conversion fails
    """
    # Sanitize filename - replace spaces and special characters with underscores
    # This prevents issues with pdflatex and file operations
    import string
    valid_chars = f"-_.() {string.ascii_letters}{string.digits}"
    sanitized_filename = ''.join(c if c in valid_chars else '_' for c in filename)
    # Replace whitespace runs with a single underscore, then collapse repeats
    sanitized_filename = re.sub(r'\s+', '_', sanitized_filename)
    sanitized_filename = re.sub(r'_+', '_', sanitized_filename)
    if sanitized_filename != filename:
        # FIX: this message previously printed the literal text "(unknown)"
        # instead of the original filename.
        print(f"βΉοΈ Sanitized filename: '{filename}' β '{sanitized_filename}'")
        filename = sanitized_filename
    base_name = os.path.splitext(filename)[0]
    temp_md_file = f"{base_name}_input.md"
    temp_tex_file = f"{base_name}_temp.tex"
    print("\n" + "="*60)
    print("π MARKDOWN TO PDF CONVERSION PROCESS")
    print("="*60)
    try:
        # Step 1: Extract Summary Report Table
        print("\n[STEP 1/6] Extracting Examiner's Summary Report...")
        summary_pattern = re.compile(
            r"### Examiner's Summary Report\s*\n\n(\|.*?\|)\s*\n\n\*\*Total:\s*(.*?)\*\*",
            re.DOTALL
        )
        summary_match = summary_pattern.search(text)
        if summary_match:
            summary_table_md = summary_match.group(1)
            summary_total = summary_match.group(2)
            # Remove the summary from the body; an enhanced version is
            # re-injected at the top of the document in step 5.
            text = summary_pattern.sub("", text)
            print(f" β SUCCESS: Extracted summary report with total: {summary_total}")
        else:
            summary_table_md = ""
            summary_total = ""
            print(" β οΈ WARNING: No Examiner's Summary Report found in markdown")
        # Step 2: Clean up markdown
        print("\n[STEP 2/6] Cleaning markdown and converting HTML to LaTeX...")
        text = cleanup_markdown_for_latex(text)
        text = convert_html_color_spans(text)
        print(" β SUCCESS: Markdown cleaned and HTML color spans converted")
        # Save cleaned markdown
        with open(temp_md_file, 'w', encoding='utf-8') as f:
            f.write(text)
        print(f" π Saved cleaned markdown to: {temp_md_file}")
        # Step 3: Convert MD to LaTeX via Pandoc
        print("\n[STEP 3/6] Converting markdown to LaTeX using Pandoc...")
        pandoc_cmd = [
            "pandoc",
            "--from=markdown",
            "--to=latex",
            "--standalone",
            temp_md_file,
            "-o", temp_tex_file
        ]
        print(f" π§ Running: {' '.join(pandoc_cmd)}")
        result = subprocess.run(pandoc_cmd, capture_output=True, check=False)
        if result.returncode != 0:
            try:
                stderr = result.stderr.decode('utf-8', errors='replace')
            except Exception:  # FIX: was a bare except
                stderr = str(result.stderr)
            print(f" β FAILED: Pandoc returned error code {result.returncode}")
            print(f" Error details: {stderr[:500]}")
            raise Exception(f"Pandoc conversion failed: {stderr}")
        if not os.path.exists(temp_tex_file):
            print(f" β FAILED: LaTeX file not created at {temp_tex_file}")
            raise Exception("Pandoc did not create the expected LaTeX file")
        print(f" β SUCCESS: LaTeX file created at {temp_tex_file}")
        # Step 4: Modify the generated LaTeX
        print("\n[STEP 4/6] Enhancing LaTeX document...")
        with open(temp_tex_file, "r", encoding="utf-8") as f:
            tex = f.read()
        # Larger base font via extarticle (requires texlive-latex-extra)
        tex = tex.replace(
            r"\documentclass{article}",
            r"\documentclass[12pt]{extarticle}"
        )
        insert_packages = r"""\usepackage[a4paper, margin=1in]{geometry}
\usepackage{xcolor}
\usepackage{colortbl}
\usepackage{booktabs}
\usepackage{array}
\usepackage{longtable}
\renewcommand{\arraystretch}{1.4}
\newcolumntype{L}[1]{>{\raggedright\arraybackslash}p{#1}}"""
        tex = tex.replace(r"\begin{document}", insert_packages + "\n\\begin{document}")
        print(" β SUCCESS: Enhanced document class and added packages")
        # Step 5: Build enhanced LaTeX table for summary
        if summary_table_md:
            print("\n[STEP 5/6] Building enhanced summary table...")
            summary_rows = parse_md_table(summary_table_md)
            print(f" π Parsed {len(summary_rows)} rows from summary table")
            summary_latex = r"""\section*{Examiner's Summary Report}
\begin{center}
\rowcolors{2}{gray!10}{white}
\begin{tabular}{|c|c|c|L{8cm}|}
\hline
\rowcolor{gray!30}
\textbf{Question} & \textbf{Marks} & \textbf{Remark} & \textbf{Feedback} \\ \hline
"""
            for row in summary_rows:
                if len(row) >= 4:
                    feedback = row[3]
                    # Escape plain-text feedback only; math/colored text is
                    # assumed to already be valid LaTeX.
                    if not ('$' in feedback or '\\textcolor' in feedback):
                        feedback = feedback.replace('%', r'\%').replace('&', r'\&').replace('#', r'\#')
                    summary_latex += f"{row[0]} & {row[1]} & {row[2]} & {feedback} \\\\ \\hline\n"
            summary_latex += r"\end{tabular}"
            summary_latex += "\n\\end{center}\n\n"
            summary_latex += f"\\vspace{{0.5cm}}\\noindent\\textbf{{\\Large Overall Score: {summary_total}}}\n\n"
            summary_latex += "\\hrulefill\n\\vspace{1cm}\n\n"
            summary_latex += "\\newpage\n\n"
            tex = tex.replace(
                r"\begin{document}",
                r"\begin{document}" + "\n\n" + summary_latex
            )
            print(" β SUCCESS: Summary table with zebra striping injected at document top")
        else:
            print("\n[STEP 5/6] Skipping summary table (not found)")
        with open(temp_tex_file, "w", encoding="utf-8") as f:
            f.write(tex)
        # Step 6: Compile PDF with pdflatex
        print("\n[STEP 6/6] Compiling PDF with pdflatex...")
        pdflatex_cmd = [
            "pdflatex",
            "-interaction=nonstopmode",
            f"-output-directory={os.path.dirname(os.path.abspath(temp_tex_file)) or '.'}",
            temp_tex_file
        ]
        # Two passes so table layout and references settle
        print(" π§ Running pdflatex (pass 1/2)...")
        result1 = subprocess.run(pdflatex_cmd, capture_output=True, check=False)
        print(" π§ Running pdflatex (pass 2/2)...")
        result2 = subprocess.run(pdflatex_cmd, capture_output=True, check=False)
        temp_pdf = temp_tex_file.replace(".tex", ".pdf")
        if not os.path.exists(temp_pdf):
            print(f" β FAILED: PDF not created at {temp_pdf}")
            try:
                stderr = result2.stderr.decode('utf-8', errors='replace')
            except Exception:  # FIX: was a bare except
                stderr = str(result2.stderr)
            log_file = temp_tex_file.replace(".tex", ".log")
            if os.path.exists(log_file):
                print(f" π Checking LaTeX log file: {log_file}")
                try:
                    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                        log_content = f.read()
                    error_lines = [line for line in log_content.split('\n') if '!' in line]
                    if error_lines:
                        print(f" β LaTeX Errors found ({len(error_lines)} lines):")
                        for err_line in error_lines[:10]:
                            print(f" {err_line}")
                        stderr += "\n\nLaTeX Errors:\n" + "\n".join(error_lines[:10])
                except Exception as log_err:
                    print(f" β οΈ Could not read log file: {log_err}")
            raise Exception(f"pdflatex failed to create PDF. Error: {stderr[:1000]}")
        print(f" β SUCCESS: PDF compiled at {temp_pdf}")
        # Move output PDF to final filename
        if os.path.exists(filename):
            os.remove(filename)
        os.rename(temp_pdf, filename)
        # FIX: previously printed the literal "(unknown)" instead of the path
        print(f" π¦ Moved to final location: {filename}")
        # Clean up temporary files
        print("\n[CLEANUP] Removing temporary files...")
        cleaned_count = 0
        for ext in [".md", ".tex", ".aux", ".log", ".out"]:
            temp_file = base_name + ext
            if os.path.exists(temp_file):
                os.remove(temp_file)
                cleaned_count += 1
            # Also remove the _input/_temp intermediates created above
            for prefix in ["_input", "_temp"]:
                temp_file = base_name + prefix + ext
                if os.path.exists(temp_file):
                    os.remove(temp_file)
                    cleaned_count += 1
        print(f" π§Ή Cleaned up {cleaned_count} temporary files")
        print("\n" + "="*60)
        print("β PDF CONVERSION COMPLETED SUCCESSFULLY")
        # FIX: previously printed the literal "(unknown)" instead of the path
        print(f"π Output file: {filename}")
        print("="*60 + "\n")
        return filename
    except subprocess.CalledProcessError as e:
        print(f"\nβ SUBPROCESS ERROR: {e}")
        print(f" STDOUT: {e.stdout}")
        print(f" STDERR: {e.stderr}")
        print("="*60 + "\n")
        raise Exception(f"PDF conversion failed: {e.stderr}")
    except FileNotFoundError as e:
        # pandoc or pdflatex binary missing from PATH
        print(f"\nβ FILE NOT FOUND ERROR: {e}")
        print("="*60)
        print("β οΈ REQUIRED TOOLS MISSING")
        print("Please install the following:")
        print(" β’ pandoc")
        print(" β’ texlive (or MiKTeX on Windows)")
        print(" β’ texlive-latex-extra (for extarticle class)")
        print("="*60 + "\n")
        raise Exception(
            "Pandoc or pdflatex not found. Please install:\n"
            " - pandoc\n"
            " - texlive (or MiKTeX on Windows)\n"
            " - texlive-latex-extra (for extarticle class)"
        )
    except Exception as e:
        print(f"\nβ UNEXPECTED ERROR: {e}")
        import traceback
        traceback.print_exc()
        print("="*60 + "\n")
        raise
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
    """Compress a PDF with Ghostscript when it exceeds max_size bytes.

    Returns the compressed file's path on success, otherwise the original
    path (file small enough, unreadable, gs failed, or still too large).
    """
    if output_path is None:
        root, ext = os.path.splitext(input_path)
        output_path = f"{root}_compressed{ext}"
    try:
        size = os.path.getsize(input_path)
    except Exception:
        # Cannot stat the file; hand back the original path untouched.
        return input_path
    if size <= max_size:
        print(f"βΉοΈ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
        return input_path
    print(f"π Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
    try:
        subprocess.run(
            [
                "gs", "-sDEVICE=pdfwrite",
                "-dCompatibilityLevel=1.4",
                "-dPDFSETTINGS=/ebook",
                "-dNOPAUSE", "-dQUIET", "-dBATCH",
                f"-sOutputFile={output_path}", input_path,
            ],
            check=True,
        )
        new_size = os.path.getsize(output_path)
        print(f"β Compression done. New size: {new_size/1024/1024:.2f} MB")
        if new_size <= max_size:
            return output_path
        print("β οΈ Compressed file still larger than threshold; returning original")
        return input_path
    except Exception as e:
        print("β Compression error:", e)
        return input_path
def upload_to_gemini(path, display_name=None):
    """
    Upload a file to Gemini using the NEW google-genai SDK.
    Uses the current active API key from client_manager.
    """
    print(f"π€ Uploading {path} to Gemini...")
    try:
        active_client = client_manager.get_current_client()
        remote_file = active_client.files.upload(file=path)
        print(f"β³ Waiting for file processing: {remote_file.name}")
        # Poll every 2s until the server finishes ingesting the file.
        while remote_file.state.name == "PROCESSING":
            time.sleep(2)
            remote_file = active_client.files.get(name=remote_file.name)
        if remote_file.state.name == "FAILED":
            raise Exception(f"File processing failed: {remote_file.name}")
        print(f"β Uploaded and processed: {remote_file.name}")
        return remote_file
    except Exception as e:
        print(f"β Upload failed for {path}: {e}")
        raise
def merge_pdfs(paths, output_path):
    """Concatenate the pages of every PDF in `paths` into a single file.

    Pages are appended in the order the paths are given; returns output_path.
    """
    writer = PdfWriter()
    for source in paths:
        for page in PdfReader(source).pages:
            writer.add_page(page)
    with open(output_path, "wb") as out_file:
        writer.write(out_file)
    return output_path
def gemini_generate_content(prompt_text, file_upload_obj=None, image_obj=None, model_name="gemini-2.5-pro", fallback_model="gemini-2.5-flash", fallback_model_2="gemini-2.5-flash-lite", file_path=None):
    """
    Send prompt_text and optionally an uploaded file (or an image object/list) to the model using NEW SDK.
    Automatically rotates through available API keys on RESOURCE_EXHAUSTED errors.
    When rotating keys with file uploads, re-uploads the file with the new API key.
    Args:
        prompt_text: The prompt to send
        file_upload_obj: Previously uploaded file object (optional)
        image_obj: Image or list of images (optional)
        model_name: Primary model to use
        fallback_model: First fallback model if primary fails
        fallback_model_2: Second fallback model if first fallback fails
        file_path: Local file path (needed for re-upload when rotating keys)
    Returns textual response and prints progress.
    """
    # NOTE(review): this initial `contents` build is rebuilt from scratch at
    # the top of every loop iteration below, so this copy is effectively
    # unused except for the log line that follows.
    contents = [prompt_text]
    current_file_obj = file_upload_obj
    if current_file_obj:
        contents.append(current_file_obj)
    if image_obj:
        if isinstance(image_obj, list):
            # List of images: strings are treated as paths and opened via PIL.
            for img_path in image_obj:
                if isinstance(img_path, str):
                    pil_img = Image.open(img_path)
                    contents.append(pil_img)
                else:
                    contents.append(img_path)
        else:
            # Single image: same path-vs-object handling as above.
            if isinstance(image_obj, str):
                pil_img = Image.open(image_obj)
                contents.append(pil_img)
            else:
                contents.append(image_obj)
    print("π‘ Sending request to Gemini (prompt length:", len(prompt_text), "chars )")
    # Try with all available API keys
    max_attempts = len(client_manager.api_keys)
    attempt = 0
    while attempt < max_attempts:
        current_client = client_manager.get_current_client()
        current_key_num = client_manager.current_key_index + 1
        # Update contents with current file object (it may have been
        # re-uploaded under a different key on a previous iteration).
        contents = [prompt_text]
        if current_file_obj:
            contents.append(current_file_obj)
        if image_obj:
            if isinstance(image_obj, list):
                for img_path in image_obj:
                    if isinstance(img_path, str):
                        pil_img = Image.open(img_path)
                        contents.append(pil_img)
                    else:
                        contents.append(img_path)
            else:
                if isinstance(image_obj, str):
                    pil_img = Image.open(image_obj)
                    contents.append(pil_img)
                else:
                    contents.append(image_obj)
        # Try primary model first
        try:
            print(f"π Using API key #{current_key_num} with model {model_name}")
            response = current_client.models.generate_content(
                model=model_name,
                contents=contents
            )
            raw_text = response.text
            print(f"π₯ Received response (chars): {len(raw_text)}")
            # Success! Reset to primary key for next request
            client_manager.reset_to_primary()
            return raw_text
        except Exception as e:
            error_str = str(e)
            print(f"β Generation failed with API key #{current_key_num} and model {model_name}: {e}")
            # Check if it's a RESOURCE_EXHAUSTED error
            if "429" in error_str or "RESOURCE_EXHAUSTED" in error_str:
                print(f"β οΈ Quota exhausted for API key #{current_key_num} with model {model_name}")
                # Try first fallback model with SAME API key
                print(f"β‘ Trying fallback model {fallback_model} with same API key #{current_key_num}")
                try:
                    response = current_client.models.generate_content(
                        model=fallback_model,
                        contents=contents
                    )
                    raw_text = response.text
                    print(f"π₯ Received response (chars): {len(raw_text)}")
                    client_manager.reset_to_primary()
                    return raw_text
                except Exception as e_fallback:
                    error_fallback_str = str(e_fallback)
                    print(f"β Fallback model {fallback_model} also failed: {e_fallback}")
                    # Check if first fallback also exhausted
                    if "429" in error_fallback_str or "RESOURCE_EXHAUSTED" in error_fallback_str:
                        print(f"β οΈ First fallback model also exhausted for API key #{current_key_num}")
                        # Try second fallback model with SAME API key
                        print(f"β‘ Trying second fallback model {fallback_model_2} with same API key #{current_key_num}")
                        try:
                            response = current_client.models.generate_content(
                                model=fallback_model_2,
                                contents=contents
                            )
                            raw_text = response.text
                            print(f"π₯ Received response (chars): {len(raw_text)}")
                            client_manager.reset_to_primary()
                            return raw_text
                        except Exception as e_fallback_2:
                            error_fallback_2_str = str(e_fallback_2)
                            print(f"β Second fallback model {fallback_model_2} also failed: {e_fallback_2}")
                            # Check if second fallback also exhausted
                            if "429" in error_fallback_2_str or "RESOURCE_EXHAUSTED" in error_fallback_2_str:
                                print(f"β οΈ All 3 models exhausted for API key #{current_key_num}")
                                # Now try next API key if available
                                if attempt < max_attempts - 1:
                                    # Check if we have file uploads and can re-upload
                                    # (files are bound to the key that uploaded them).
                                    if file_upload_obj and file_path:
                                        print(f"π Rotating to next API key and re-uploading file...")
                                        client_manager.rotate_to_next_key()
                                        # Re-upload file with new API key
                                        try:
                                            print(f"π€ Re-uploading file with API key #{client_manager.current_key_index + 1}...")
                                            current_file_obj = upload_to_gemini(file_path)
                                            print(f"β File re-uploaded successfully")
                                        except Exception as upload_error:
                                            print(f"β Failed to re-upload file: {upload_error}")
                                            raise Exception(f"Failed to re-upload file with new API key: {upload_error}")
                                        attempt += 1
                                        print(f"π Retrying with next API key (attempt {attempt + 1}/{max_attempts})...")
                                        continue
                                    elif file_upload_obj and not file_path:
                                        print("β οΈ WARNING: Cannot rotate API keys - file_path not provided for re-upload!")
                                        print(" To enable API key rotation with file uploads, pass file_path parameter.")
                                        raise Exception(f"All 3 models exhausted for API key #{current_key_num}. Cannot rotate without file_path.")
                                    else:
                                        # No file uploads, safe to rotate
                                        client_manager.rotate_to_next_key()
                                        attempt += 1
                                        print(f"π Trying next API key (attempt {attempt + 1}/{max_attempts})...")
                                        continue
                                else:
                                    raise Exception(f"All {max_attempts} API key(s) exhausted with all 3 models.")
                            else:
                                # Second fallback failed with different error
                                raise Exception(f"Second fallback model failed: {e_fallback_2}")
                    else:
                        # First fallback failed with different error
                        raise Exception(f"First fallback model failed: {e_fallback}")
            elif "403" in error_str or "PERMISSION_DENIED" in error_str:
                # This happens when trying to access a file uploaded with a different API key
                print(f"β οΈ Permission denied - likely due to file uploaded with different API key")
                # Try to re-upload if we have the file path
                if file_path and attempt < max_attempts - 1:
                    print(f"π Attempting to re-upload file with next API key...")
                    client_manager.rotate_to_next_key()
                    try:
                        print(f"π€ Re-uploading file with API key #{client_manager.current_key_index + 1}...")
                        current_file_obj = upload_to_gemini(file_path)
                        print(f"β File re-uploaded successfully")
                        attempt += 1
                        print(f"π Retrying with next API key (attempt {attempt + 1}/{max_attempts})...")
                        continue
                    except Exception as upload_error:
                        print(f"β Failed to re-upload file: {upload_error}")
                        raise Exception(f"Failed to re-upload file with new API key: {upload_error}")
                else:
                    raise Exception(f"File access denied. Cannot re-upload without file_path. Error: {e}")
            else:
                # Other error - try fallback models with same key
                print(f"β‘ Trying fallback model {fallback_model} with same API key #{current_key_num}")
                try:
                    response = current_client.models.generate_content(
                        model=fallback_model,
                        contents=contents
                    )
                    raw_text = response.text
                    print(f"π₯ Received response (chars): {len(raw_text)}")
                    client_manager.reset_to_primary()
                    return raw_text
                except Exception as e2:
                    print(f"β First fallback also failed: {e2}")
                    # Try second fallback
                    print(f"β‘ Trying second fallback model {fallback_model_2} with same API key #{current_key_num}")
                    try:
                        response = current_client.models.generate_content(
                            model=fallback_model_2,
                            contents=contents
                        )
                        raw_text = response.text
                        print(f"π₯ Received response (chars): {len(raw_text)}")
                        client_manager.reset_to_primary()
                        return raw_text
                    except Exception as e3:
                        print(f"β Second fallback also failed: {e3}")
                        # If we have more keys, try them
                        if attempt < max_attempts - 1:
                            if file_upload_obj and file_path:
                                print(f"π Rotating to next API key and re-uploading file...")
                                client_manager.rotate_to_next_key()
                                try:
                                    print(f"π€ Re-uploading file with API key #{client_manager.current_key_index + 1}...")
                                    current_file_obj = upload_to_gemini(file_path)
                                    print(f"β File re-uploaded successfully")
                                except Exception as upload_error:
                                    print(f"β Failed to re-upload file: {upload_error}")
                                    raise Exception(f"Failed to re-upload file with new API key: {upload_error}")
                                attempt += 1
                                print(f"π Retrying with next API key (attempt {attempt + 1}/{max_attempts})...")
                                continue
                            elif file_upload_obj and not file_path:
                                raise Exception(f"All models failed. Cannot rotate keys without file_path. Last error: {e3}")
                            else:
                                client_manager.rotate_to_next_key()
                                attempt += 1
                                print(f"π Trying next API key (attempt {attempt + 1}/{max_attempts})...")
                                continue
                        else:
                            raise Exception(f"All attempts failed. Last error: {e3}")
    # If we exhausted all attempts
    raise Exception(f"β All {max_attempts} API key(s) exhausted. Please check your quota or try again later.")
| # ---------------- PARSERS ---------------- | |
def extract_question_ids_from_qpms(text: str):
    """Extract question IDs from QP+MS transcript."""
    print("π Extracting question IDs from QP+MS transcript using regex...")
    # Normalize non-breaking spaces and tabs that break line-anchored regexes.
    normalized = text.replace("\u00A0", " ").replace("\t", " ")
    # Preferred source: explicit "Question X" lines.
    explicit_ids = re.findall(r"^\s*Question\s*[:\s]\s*([\dA-Za-z.()]+)", normalized, re.MULTILINE)
    if explicit_ids:
        print(f"β Extracted {len(explicit_ids)} question IDs from explicit 'Question X' lines.")
        print("IDs:", explicit_ids)
        return explicit_ids
    # Fallback: lines that start with a number, e.g. "1.", "2)", "3(a)".
    numbered_ids = re.findall(r"^\s*(\d+(?:[.)]|\([a-zA-Z0-9]+\))?[a-zA-Z0-9]*)", normalized, re.MULTILINE)
    if numbered_ids:
        print(f"β Extracted {len(numbered_ids)} question IDs (fallback numbered lists).")
        print("IDs:", numbered_ids)
    else:
        print("β οΈ No question IDs extracted; will send NA placeholder.")
    return numbered_ids
def build_as_cot_prompt_with_expected_ids(expected_ids, qpms_text=None):
    """
    Construct the AS transcription prompt injecting the expected IDs block and graph detection instructions,
    modifying it to include a Chain-of-Thought (CoT) section using a <think> tag, and
    requiring mathematical expressions to be enclosed in LaTeX dollar delimiters ($...$).
    Includes explicit rules for interpreting NA-like answers and no-response situations.

    Args:
        expected_ids: List of question IDs extracted from the QP+MS transcript;
            falsy/empty produces the "{NA}" placeholder block.
        qpms_text: Optional full QP+MS transcript; when given it is embedded so
            the model can disambiguate handwriting against the markscheme.

    Returns:
        The fully assembled prompt string.
    """
    # Expected-ID block: either the literal "{NA}" placeholder or a braced,
    # newline-separated list of IDs.
    if not expected_ids:
        ids_block = "{NA}"
    else:
        ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
    # Optional QP+MS context section (empty string when no transcript supplied).
    qpms_section = ""
    if qpms_text is not None:
        qpms_section = (
            "\nYou are also provided with the full transcript of the Question Paper and Markscheme (QP+MS) below."
            "\nUse it primarily to resolve ambiguous handwriting and to confirm expected answers when needed."
            "\n--- BEGIN QP+MS TRANSCRIPT ---\n"
            f"{qpms_text.strip()}\n"
            "--- END QP+MS TRANSCRIPT ---\n"
        )
    # NOTE: downstream parsers depend on this exact output format (the
    # "Question <id>" / "AS:" lines and the GRAPH FOUND markers) — see
    # extract_graph_answers_from_as; do not change the literal text lightly.
    prompt = f"""You are a high-quality handwritten transcription assistant, performing transcription with a Chain-of-Thought process.
INPUT: This PDF contains a student's handwritten answer sheet.
{qpms_section}
TASK:
1. **THINKING:** Before transcribing each answer, document your thought process inside a **<think>** tag.
- Identify the question ID. If inferred, note why.
- Detail any ambiguities (unclear numbers, symbols, or structures).
- Explain how ambiguities were resolved, including whether the QP+MS transcript was consulted.
- If QP+MS was consulted but you chose not to change the transcription, state this.
- If the initial question label was incorrect (e.g., 2.a vs 2.b), correct it and briefly explain the reasoning in <think>.
*Example Thinking:*
<think>
- Found Question 3(a).
- The term could be '$2x$' or '21x'.
- Markscheme uses '$21x$', but handwriting matches '$2x$'.
- Decision: transcribe '$2x$'.
</think>
2. **TRANSCRIPTION:** Transcribe the student's answers directly and faithfully.
- Assign each answer to a labelled question ID when present.
- For unlabeled answers, segment logically and mark inferred IDs as "**INFERRED: <id>**".
- **Mathematical expressions and standalone variables must appear inside LaTeX dollar delimiters ($...$).**
- If a diagram/graph is omitted, write **[Graph omitted]**.
- If handwriting is unreadable: **[illegible]**.
**ANSWER-INTERPRETATION RULES:**
- If the student writes βNAβ, βN/Aβ, βNot Applicableβ, or clear equivalents β record exactly as **NA**.
- If the student leaves the space blank, crosses it out, makes no meaningful attempt, or provides no answer β record **[No response]**.
Ensure deterministic formatting so subsequent models can grade directly from this aligned format.
Expected questions (if missing, write NA):
{ids_block}
-----------------------
OUTPUT FORMAT:
<think>...</think>
Question <id>
AS:<transcribed answer or placeholder>
<think>...</think>
Question <id>
AS:<transcribed answer or placeholder>
...
==== GRAPH FOUND ANSWERS ====
Graph found in:
- Answer <number> β Page <number>
(one per line)
==== END GRAPH FOUND ===="""
    return prompt
def extract_graph_questions_from_ms(text: str):
    """Extract graph questions and page numbers from MS transcript.

    Returns a dict mapping question ID -> 1-based page number; empty when
    the GRAPH EXPECTED QUESTIONS block is missing.
    """
    normalized = text.replace("\u00A0", " ").replace("\t", " ")
    section = re.search(r"==== GRAPH EXPECTED QUESTIONS ====\s*(.*?)\s*==== END GRAPH EXPECTED ====",
                        normalized, re.S)
    graph_dict = {}
    if not section:
        return graph_dict
    entry_pattern = re.compile(r"- Question\s+([\dA-Za-z.()]+)\s*β\s*Page\s*(\d+)")
    for raw_line in section.group(1).splitlines():
        stripped = raw_line.strip()
        if not stripped.startswith("- Question"):
            continue
        hit = entry_pattern.match(stripped)
        if hit:
            graph_dict[hit.group(1)] = int(hit.group(2))
    return graph_dict
def extract_graph_answers_from_as(text: str):
    """Extract graph answers and page numbers from AS transcript.

    Returns a dict mapping answer ID -> 1-based page number; empty when
    the GRAPH FOUND ANSWERS block is missing.
    """
    normalized = text.replace("\u00A0", " ").replace("\t", " ")
    section = re.search(r"==== GRAPH FOUND ANSWERS ====\s*(.*?)\s*==== END GRAPH FOUND ====",
                        normalized, re.S)
    graph_dict = {}
    if not section:
        return graph_dict
    entry_pattern = re.compile(r"- Answer\s+([\dA-Za-z.()]+)\s*β\s*Page\s*(\d+)")
    for raw_line in section.group(1).splitlines():
        stripped = raw_line.strip()
        if not stripped.startswith("- Answer"):
            continue
        hit = entry_pattern.match(stripped)
        if hit:
            graph_dict[hit.group(1)] = int(hit.group(2))
    return graph_dict
def extract_marks_from_grading(grading_text):
    """
    Parse the grading markdown and extract marks per question from the Awarded column only.
    """
    print("π Extracting awarded marks from grading output...")
    result = {"grading": []}
    id_pattern = re.compile(r"([0-9]+(?:[a-zA-Z]|\([^)]+\)|(?:\.[a-zA-Z0-9]+))*)")
    mark_pattern = re.compile(r"\b([MABCR]\d+|[MABCR]0)\b")
    for block in re.split(r"###\s*Question\s+", grading_text)[1:]:
        header_lines = block.strip().splitlines()
        header = header_lines[0].strip() if header_lines else ""
        id_hit = id_pattern.match(header)
        if id_hit:
            q_id = id_hit.group(1).strip()
        else:
            q_id = header.split()[0] if header else ""
        # Walk the markdown table rows; the Awarded column is the 4th cell
        # (index 4 after splitting on '|', because of the leading empty cell).
        awarded_marks = []
        for row in block.split('\n'):
            if '|' not in row:
                continue
            cells = [cell.strip() for cell in row.split('|')]
            # Skip header-separator rows (cells[1] like '----') and short rows.
            if len(cells) >= 5 and not cells[1].startswith('-'):
                awarded_marks.extend(mark_pattern.findall(cells[4]))
        result["grading"].append({
            "question": q_id,
            "marks_awarded": awarded_marks
        })
    print("β Extracted grading marks for", len(result["grading"]), "question blocks.")
    print(json.dumps(result, indent=2))
    return result
def check_and_correct_total_marks(grading_text):
    """
    Verifies the total marks in the Examiner's Summary Report against
    the sum of individual question marks. Corrects if discrepancy found.

    Args:
        grading_text (str): The full grading markdown text

    Returns:
        tuple: (corrected_text, calculated_awarded, calculated_possible, was_corrected)
    """
    print("\n" + "="*60)
    print("π VERIFYING TOTAL MARKS IN SUMMARY REPORT")
    print("="*60)
    question_marks = {}
    calculated_total_awarded = 0
    calculated_total_possible = 0
    # Pattern matches BOTH formats:
    #   ### Question <1.a>  (with angle brackets)
    #   ### Question 1.a    (without angle brackets)
    # The <? / >? make the brackets optional; the lazy [\s\S]*? stops at the
    # first "**Total: a/p**" line of each question block.
    question_block_pattern = re.compile(
        r"### Question\s*<?([0-9]+(?:[.()][a-z0-9]+)*)>?\s*[\s\S]*?\*\*Total:\s*(\d+)/(\d+)\*\*",
        re.DOTALL | re.IGNORECASE
    )
    matches = question_block_pattern.finditer(grading_text)
    for match in matches:
        question_id = match.group(1).strip()
        awarded = int(match.group(2))
        possible = int(match.group(3))
        question_marks[question_id] = {'awarded': awarded, 'possible': possible}
        calculated_total_awarded += awarded
        calculated_total_possible += possible
    # BUGFIX: this log line previously printed garbled text ("οΏ½ Exltracted").
    print(f"\nπ Extracted marks from {len(question_marks)} questions:")
    for q_id, marks in question_marks.items():
        print(f" Question {q_id}: {marks['awarded']}/{marks['possible']}")
    print(f"\nπ Calculated totals from individual questions:")
    print(f" Awarded: {calculated_total_awarded}")
    print(f" Possible: {calculated_total_possible}")
    # Find the summary report section; without it there is nothing to verify.
    summary_report_start = grading_text.find("### Examiner's Summary Report")
    if summary_report_start == -1:
        print("β οΈ Warning: Could not find '### Examiner's Summary Report' section.")
        return grading_text, calculated_total_awarded, calculated_total_possible, False
    summary_section = grading_text[summary_report_start:]
    summary_total_pattern = re.compile(r"(\*\*Total:\s*)(\d+)/(\d+)(\*\*)")
    summary_match = summary_total_pattern.search(summary_section)
    original_summary_awarded = 0
    original_summary_possible = 0
    if summary_match:
        original_summary_awarded = int(summary_match.group(2))
        original_summary_possible = int(summary_match.group(3))
        print(f"\nπ Original summary report total: {original_summary_awarded}/{original_summary_possible}")
    else:
        print("β οΈ Warning: Could not find overall total in summary report.")
        return grading_text, calculated_total_awarded, calculated_total_possible, False
    # Check for discrepancies between the computed and the reported totals.
    corrected_report_text = grading_text
    total_mismatch = False
    if calculated_total_awarded != original_summary_awarded:
        print(f"\nβ DISCREPANCY FOUND in awarded marks!")
        print(f" Calculated: {calculated_total_awarded}")
        print(f" Reported: {original_summary_awarded}")
        total_mismatch = True
    if calculated_total_possible != original_summary_possible:
        print(f"\nβ DISCREPANCY FOUND in possible marks!")
        print(f" Calculated: {calculated_total_possible}")
        print(f" Reported: {original_summary_possible}")
        total_mismatch = True
    if total_mismatch:
        print(f"\nπ§ CORRECTING summary total:")
        print(f" FROM: {original_summary_awarded}/{original_summary_possible}")
        print(f" TO: {calculated_total_awarded}/{calculated_total_possible}")
        # Correct only the first total in the summary section so per-question
        # totals earlier in the document are untouched.
        corrected_summary_section = re.sub(
            summary_total_pattern,
            rf"\g<1>{calculated_total_awarded}/{calculated_total_possible}\g<4>",
            summary_section,
            count=1
        )
        corrected_report_text = grading_text[:summary_report_start] + corrected_summary_section
        print("β Total marks corrected successfully!")
    else:
        print("\nβ Total marks are CORRECT - no correction needed!")
    print("="*60 + "\n")
    return corrected_report_text, calculated_total_awarded, calculated_total_possible, total_mismatch
| # ---------------- MAPPING/IMPRINT HELPERS ---------------- | |
def ask_gemini_for_mapping_batch(image_paths, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
    """
    Send multiple page images together to Gemini for batch mapping processing.

    Args:
        image_paths: Paths of grid-numbered page images to map.
        grading_json: Dict of the form {"grading": [{"question": ..., "marks_awarded": [...]}]}.
        expected_ids: Optional list of question IDs expected in the images.
        rows, cols: Grid dimensions used when the page images were numbered.

    Returns:
        list: Parsed mapping entries like {"page": 1, "question": "1(a)", "cell_number": 15},
              or [] when no JSON array can be parsed from the response.
    """
    ids_block = "{NA}"
    if expected_ids:
        ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
    prompt = f"""You are an exam marker. Your role is to identify where each question begins on each page.
The pages are divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label.
For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins.
β IMPORTANT RULES:
- Do not place marks inside another question's answer area.
- Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
- Never place marks above or below the answer.
- Each question should have unique cell number
- If a question serial number is visible in the answer image, you must mandatorily identify the corresponding question using the grading JSON.
IMPORTANT: For your help i have provided u questions that u can expect in the images:
{ids_block}
Return JSON only, like:
[{{"page": 1, "question": "1(a)", "cell_number": 15}}, ...]
Grading JSON:
{json.dumps(grading_json, indent=2)}"""
    images = [Image.open(p) for p in image_paths]
    print(f"π‘ Sending batch mapping request for {len(image_paths)} pages to Gemini...")
    try:
        contents = [prompt] + images
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=contents
        )
        raw_text = response.text
    # BUGFIX: was a bare `except:`, which also swallowed SystemExit/KeyboardInterrupt.
    except Exception as primary_err:
        print(f"β οΈ Trying fallback model for mapping... (primary failed: {primary_err})")
        contents = [prompt] + images
        response = client.models.generate_content(
            model="gemini-2.5-flash-preview-09-2025",
            contents=contents
        )
        raw_text = response.text
    print("π₯ Batch mapping response (chars):", len(raw_text))
    print("π Gemini raw batch output:")
    print(raw_text)
    try:
        # Pull the first JSON array out of the (possibly chatty) response text.
        match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
        if match:
            mapping = json.loads(match.group(1))
            print(f"β Parsed Gemini batch mapping for {len(image_paths)} pages")
            return mapping
        else:
            print("β Failed to find JSON array in response")
            return []
    except Exception as e:
        print(f"β Failed to parse Gemini JSON mapping: {e}")
        return []
def normalize_question_id(qid):
    """
    Normalize question ID to a standard dotted format for matching.

    Converts formats like:
    - "1(a)"   -> "1.a"
    - "2(c).i" -> "2.c.i"
    - "3.d.ii" -> "3.d.ii" (already normalized)
    - "4(ii)"  -> "4.ii"   (multi-character parts now handled too)

    Falsy input (None, "") is returned unchanged.
    """
    if not qid:
        return qid
    # One pattern replaces the previous pair: "<digits>(<part>)" -> "<digits>.<part>".
    # The old second pattern required a trailing '.', so IDs like "4(ii)" were
    # left un-normalized; this generalization covers them while producing the
    # same results for all previously-handled inputs.
    return re.sub(r'(\d+)\(([a-zA-Z0-9]+)\)', r'\1.\2', qid)
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
    """
    Convert PDF to images, create grid-numbered images for batch sending to Gemini,
    then annotate and produce imprinted PDF.

    Args:
        pdf_path: Path to the student's answer-sheet PDF.
        grading_json: Dict of the form {"grading": [{"question": ..., "marks_awarded": [...]}]}.
        output_pdf: Destination path for the annotated PDF.
        expected_ids: Optional list of expected question IDs, forwarded to the mapping prompt.
        rows, cols: Grid dimensions used both to number cells and to locate them again.

    Returns:
        Path of the compressed imprinted PDF (as returned by compress_pdf).
    """
    print("π Converting answer PDF to images for imprinting...")
    pages = convert_from_path(pdf_path, dpi=100)
    annotated_page_paths = []
    temp_grid_images = []
    # Pass 1: overlay a running cell number at the centre of every grid cell so
    # Gemini can refer to page positions by number.
    for p_index, page in enumerate(pages):
        img = page.convert("RGB")
        w, h = img.size
        cell_w, cell_h = w / cols, h / rows
        draw = ImageDraw.Draw(img)
        try:
            num_font = ImageFont.truetype("arial.ttf", 20)
        except Exception:
            # arial.ttf may be absent (e.g. Linux containers); use PIL's builtin font.
            num_font = ImageFont.load_default()
        cell_num = 1
        for r in range(rows):
            for c in range(cols):
                x = int(c * cell_w + cell_w / 2)
                y = int(r * cell_h + cell_h / 2)
                text = str(cell_num)
                bbox = draw.textbbox((0, 0), text, font=num_font)
                tw = bbox[2] - bbox[0]
                th = bbox[3] - bbox[1]
                # Centre the number label within its cell.
                draw.text((x - tw/2, y - th/2), text, fill="black", font=num_font)
                cell_num += 1
        temp_path = f"page_{p_index+1}_grid.png"
        img.save(temp_path, "PNG")
        temp_grid_images.append(temp_path)
        print("π° Created grid image:", temp_path)
    print("π‘ Sending page images to Gemini in batches for mapping...")
    batch_size = 10  # pages per Gemini request
    all_mappings = []
    for start in range(0, len(temp_grid_images), batch_size):
        batch_paths = temp_grid_images[start:start+batch_size]
        batch_mapping = ask_gemini_for_mapping_batch(batch_paths, grading_json, expected_ids, rows, cols)
        all_mappings.extend(batch_mapping)
        print(f"β Processed batch {start//batch_size + 1}: pages {start+1}-{start+len(batch_paths)}")
    print("π Annotating pages with marks...")
    # Pass 2: draw the awarded marks onto clean (un-numbered) page renders at
    # the cell positions Gemini reported.
    for p_index, page in enumerate(pages):
        page_num = p_index + 1
        page_img = page.convert("RGB")
        img_cv = np.array(page_img)
        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
        h, w, _ = img_cv.shape
        cell_w_px, cell_h_px = w / cols, h / rows
        page_mappings = [m for m in all_mappings if m.get("page") == page_num]
        for item in page_mappings:
            qid = item.get("question")
            cell_number = item.get("cell_number")
            if qid is None or cell_number is None:
                continue
            # Normalize the question ID from Gemini mapping
            normalized_qid = normalize_question_id(qid)
            # Try exact match first with normalized ID
            marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
                               if g["question"] == normalized_qid), [])
            # If no match, try case-insensitive match
            if not marks_list:
                marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
                                   if g["question"].lower() == normalized_qid.lower()), [])
            # If still no match, try with original qid
            if not marks_list:
                marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
                                   if g["question"] == qid), [])
            marks_text = ",".join(marks_list) if marks_list else "?"
            if marks_text == "?":
                print(f"β οΈ No marks found for question '{qid}' (normalized: '{normalized_qid}') on page {page_num}")
            # Convert the running cell number back to (row, col), then anchor
            # the text in the right-hand quarter of that cell, vertically centred.
            row = (cell_number - 1) // cols
            col = (cell_number - 1) % cols
            x_c = int((col + 1) * cell_w_px - cell_w_px / 4)
            y_c = int((row + 0.5) * cell_h_px)
            # Scale the font with cell height, clamped to a readable range.
            font_scale = max(1.0, min(2.0, cell_h_px / 40.0))
            thickness = max(2, int(font_scale * 2))
            cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
                        font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
            print(f"π Marks annotated for page {page_num}, question {qid}: {marks_text}")
        annotated_path = f"annotated_page_{page_num}.png"
        cv2.imwrite(annotated_path, img_cv)
        annotated_page_paths.append(annotated_path)
        print("β Annotated page saved:", annotated_path)
    print("π Merging annotated pages into final PDF...")
    with open(output_pdf, "wb") as f:
        f.write(img2pdf.convert(annotated_page_paths))
    compressed = compress_pdf(output_pdf)
    print("π Imprinted PDF saved to:", compressed)
    return compressed
def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix):
    """
    Extracts unique pages (1-based) from a PDF as images, saves as PNG, returns list of file paths.
    Handles cases where requested pages don't exist in the PDF.

    Args:
        pdf_path: Source PDF path.
        page_numbers: Iterable of 1-based page numbers (duplicates tolerated).
        prefix: Filename prefix for the saved PNGs ("<prefix>_page_<n>.png").

    Returns:
        list: Paths of the PNG files that were successfully extracted.
    """
    if not page_numbers:
        print(f"β οΈ No page numbers provided for extraction")
        return []
    unique_pages = sorted(set(page_numbers))
    # First, get the total page count to validate requested pages.
    # (PdfReader is already imported at module level; the previous local
    # re-import was redundant and has been removed.)
    try:
        reader = PdfReader(pdf_path)
        total_pages = len(reader.pages)
        print(f"π PDF has {total_pages} total pages")
        # Filter out page numbers that fall outside the document.
        valid_pages = [p for p in unique_pages if 1 <= p <= total_pages]
        # Set difference instead of the old O(n^2) list-membership scan.
        invalid_pages = sorted(set(unique_pages) - set(valid_pages))
        if invalid_pages:
            print(f"β οΈ Skipping invalid page numbers (out of range): {invalid_pages}")
        if not valid_pages:
            print(f"β No valid pages to extract from {pdf_path}")
            return []
        unique_pages = valid_pages
    except Exception as e:
        # Validation is best-effort; fall through and let conversion try anyway.
        print(f"β οΈ Could not validate page numbers: {e}. Proceeding with extraction...")
    # Convert only the needed page range; pdf2image returns pages min..max inclusive.
    try:
        images = convert_from_path(pdf_path, dpi=200, first_page=min(unique_pages), last_page=max(unique_pages))
    except Exception as e:
        print(f"β Failed to convert PDF pages to images: {e}")
        return []
    out_paths = []
    first_requested = min(unique_pages)  # hoisted loop invariant
    for page_num in unique_pages:
        # Map the 1-based page number back to an index within `images`.
        img_idx = page_num - first_requested
        # Bounds check to prevent index errors when fewer pages were rendered.
        if img_idx >= len(images):
            print(f"β οΈ Page {page_num} not found in extracted images (index {img_idx} >= {len(images)}). Skipping...")
            continue
        try:
            img = images[img_idx]
            out_path = f"{prefix}_page_{page_num}.png"
            img.save(out_path, "PNG")
            print(f"π€ Extracted graph page {page_num} from {pdf_path} as {out_path}")
            out_paths.append(out_path)
        except Exception as e:
            print(f"β Failed to save page {page_num}: {e}")
            continue
    return out_paths
| # ---------------- PIPELINE ---------------- | |
def align_and_grade_pipeline(qp_path, ms_path, ans_path, subject="Maths", imprint=False, run_timestamp=None):
    """
    Final pipeline with graph-aware grading logic using NEW SDK.

    Args:
        qp_path: Path to Question Paper PDF
        ms_path: Path to Markscheme PDF
        ans_path: Path to Answer Sheet PDF
        subject: Subject name (Maths or Science)
        imprint: Whether to generate imprinted PDF
        run_timestamp: Unix timestamp for organizing files in Supabase

    Returns:
        tuple: (qpms_text, as_text, grading_text, grading_pdf_path,
                imprinted_pdf_path, output_urls). On failure the first element
                is an error string and the remaining values are None / {}.
    """
    try:
        print("π Starting pipeline...")
        # Step 0: compress all three PDFs to keep upload sizes down.
        qp_path = compress_pdf(qp_path)
        ms_path = compress_pdf(ms_path)
        ans_path = compress_pdf(ans_path)
        # Merge QP + MS into one document so Gemini sees both in a single upload.
        merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
        merge_pdfs([qp_path, ms_path], merged_qpms_path)
        print("π Merged QP + MS ->", merged_qpms_path)
        print("πΌ Uploading files to Gemini...")
        merged_uploaded = upload_to_gemini(merged_qpms_path)
        ans_uploaded = upload_to_gemini(ans_path)
        print("β Upload complete.")
        # Step 1.i: transcribe the QP+MS, asking the model to also flag
        # questions where a graph is expected (parsed later by
        # extract_graph_questions_from_ms).
        print("1.i) Transcribing QP+MS (questions first, then full markscheme, with graph detection)...")
        qpms_prompt = QP_MS_TRANSCRIPTION_PROMPT["content"] + "\nAt the end, also list all questions in the markscheme where a graph is expected, in the format:\nGraph expected in:\n- Question <number> β Page <number>\n(One per line, after ==== MARKSCHEME END ====)"
        qpms_text = gemini_generate_content(qpms_prompt, file_upload_obj=merged_uploaded, model_name="gemini-2.5-flash", fallback_model="gemini-2.5-flash-preview-09-2025", fallback_model_2="gemini-2.5-flash-lite", file_path=merged_qpms_path)
        print("π QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt")
        with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
            f.write(qpms_text)
        # Extract graph-expected questions and render those QP+MS pages as images
        # so they can be attached to the grading request as visual context.
        ms_graph_mapping = extract_graph_questions_from_ms(qpms_text)
        print("πΌοΈ Graph-expected questions in MS:", ms_graph_mapping)
        ms_graph_pages = list(ms_graph_mapping.values())
        ms_graph_images = []
        if ms_graph_pages:
            ms_graph_images = extract_pdf_pages_as_images(merged_qpms_path, ms_graph_pages, prefix="qpms_graph")
        # The expected question IDs anchor the answer-sheet transcription below.
        extracted_ids = extract_question_ids_from_qpms(qpms_text)
        if not extracted_ids:
            extracted_ids = ["NA"]
        # Step 1.ii: transcribe the student's answer sheet, guided by the
        # expected IDs and the full QP+MS transcript.
        print("1.ii) Building AS transcription prompt with expected question IDs and graph detection, sending to Gemini...")
        as_prompt = build_as_cot_prompt_with_expected_ids(extracted_ids, qpms_text) + "\nAt the end, also list all answers where a graph is found, in the format:\nGraph found in:\n- Answer <number> β Page <number>\n(One per line, after all answers)"
        as_text = gemini_generate_content(as_prompt, file_upload_obj=ans_uploaded, model_name="gemini-2.5-flash", fallback_model="gemini-2.5-flash-preview-09-2025", fallback_model_2="gemini-2.5-flash-lite", file_path=ans_path)
        print("π AS transcription received. Saving debug file: debug_as_transcript.txt")
        with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
            f.write(as_text)
        # Same graph-page extraction, now for the student's answer sheet.
        as_graph_mapping = extract_graph_answers_from_as(as_text)
        print("πΌοΈ Graph-attempted answers in AS:", as_graph_mapping)
        as_graph_pages = list(as_graph_mapping.values())
        as_graph_images = []
        if as_graph_pages:
            as_graph_images = extract_pdf_pages_as_images(ans_path, as_graph_pages, prefix="as_graph")
        # Step 2: grade by feeding both transcripts (and any graph images) to Gemini.
        print("2) Preparing grading input and sending to Gemini for grading...")
        grading_input = (
            "=== QP+MS TRANSCRIPT BEGIN ===\n"
            + qpms_text
            + "\n=== QP+MS TRANSCRIPT END ===\n\n"
            + "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"
            + as_text
            + "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
        )
        if ms_graph_images or as_graph_images:
            graph_note = "\n\n---\nSome questions require graphs. I've attached the relevant graph pages from QP+MS and from the Answer Sheet. Use them as visual context when grading.\n---\n"
            grading_input += graph_note
        grading_prompt_obj = get_grading_prompt(subject.lower())
        grading_prompt_system = grading_prompt_obj["content"]
        grading_images = ms_graph_images + as_graph_images
        grading_text = gemini_generate_content(grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input, image_obj=grading_images if grading_images else None, model_name="gemini-2.5-pro", fallback_model="gemini-2.5-flash")
        print("π§Ύ Grading output received. Saving debug file: debug_grading.md")
        with open("debug_grading.md", "w", encoding="utf-8") as f:
            f.write(grading_text)
        # Verify and correct total marks if needed
        grading_text, calc_awarded, calc_possible, was_corrected = check_and_correct_total_marks(grading_text)
        if was_corrected:
            print("π Saving corrected grading to debug file: debug_grading_corrected.md")
            with open("debug_grading_corrected.md", "w", encoding="utf-8") as f:
                f.write(grading_text)
        # Render the (possibly corrected) grading markdown as a PDF.
        base_name = os.path.splitext(os.path.basename(ans_path))[0]
        grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
        print("π Grading PDF saved:", grading_pdf_path)
        # Extract per-question awarded marks for the optional imprinting step.
        grading_json = extract_marks_from_grading(grading_text)
        with open("debug_grading_json.json", "w", encoding="utf-8") as f:
            json.dump(grading_json, f, indent=2, ensure_ascii=False)
        print("π§ Grading marks extraction complete.")
        imprinted_pdf_path = None
        if imprint:
            # Optional step 3: stamp awarded marks back onto the answer sheet.
            print("β Imprint option enabled. Starting imprinting process...")
            imprinted_pdf_path = f"{base_name}_imprinted.pdf"
            imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, extracted_ids)
            print("β Imprinting finished. Imprinted PDF at:", imprinted_pdf_path)
        # Upload output files to Supabase (using same timestamp as input files)
        output_urls = {
            "graded_pdf_url": None,
            "imprinted_pdf_url": None
        }
        if supabase_client:
            print("\nπ€ Uploading output files to Supabase...")
            if grading_pdf_path:
                output_urls["graded_pdf_url"] = upload_file_to_supabase(grading_pdf_path, "graded", run_timestamp)
            if imprinted_pdf_path:
                output_urls["imprinted_pdf_url"] = upload_file_to_supabase(imprinted_pdf_path, "imprinted", run_timestamp)
        print("π Pipeline finished successfully.")
        return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path, output_urls
    except Exception as e:
        # Top-level boundary: log the full traceback and return an error tuple
        # shaped like the success path so the UI can display it.
        print("β Pipeline error:", e)
        import traceback
        traceback.print_exc()
        return f"β Error: {e}", None, None, None, None, {}
| # ---------------- GRADIO UI ---------------- | |
# Build the Gradio UI. Note: supabase_client is a module-level global; when it
# is set, an extra "file URLs" output is wired in, so the run_pipeline return
# arity and the click-handler outputs list must stay in sync with it.
with gr.Blocks(title="AI Grading (Pandoc + pdflatex)") as demo:
    gr.Markdown("## π AI Grading β Using Pandoc + pdflatex for PDF Generation")
    gr.Markdown("**β Now using Pandoc with pdflatex for professional-quality PDF outputs!**")
    if supabase_client:
        gr.Markdown("**βοΈ Supabase Storage: Enabled** - All files will be uploaded to cloud storage")
    else:
        gr.Markdown("**β οΈ Supabase Storage: Disabled** - Files will only be processed locally")
    with gr.Row():
        qp_file = gr.File(label="π Upload Question Paper (PDF)")
        ms_file = gr.File(label="π Upload Markscheme (PDF)")
        ans_file = gr.File(label="π Upload Student Answer Sheet (PDF)")
    with gr.Row():
        subject_dropdown = gr.Dropdown(
            choices=["Maths", "Science", "Economics"],
            value="Maths",
            label="π Subject",
            info="Select the subject to apply appropriate grading guidelines"
        )
        imprint_toggle = gr.Checkbox(label="β Imprint Marks on Student Answer Sheet", value=False)
    run_button = gr.Button("π Run Pipeline")
    # File URLs section (only shown if Supabase is enabled)
    if supabase_client:
        with gr.Accordion("βοΈ Uploaded File URLs", open=False):
            file_urls_box = gr.Textbox(label="Cloud Storage URLs", lines=8, interactive=False)
    with gr.Row():
        qpms_box = gr.Textbox(label="π QP+MS Transcript", lines=12)
        as_box = gr.Textbox(label="π AS Transcript", lines=12)
    grading_output_box = gr.Textbox(label="π§Ύ Grading (Markdown)", lines=20)
    grading_pdf_file = gr.File(label="π₯ Download Grading PDF")
    imprint_pdf_file = gr.File(label="π₯ Download Imprinted PDF (Optional)")

    def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, subject_choice, imprint_flag):
        # Click handler: validates uploads, uploads inputs, runs the grading
        # pipeline, and returns 6 values (with Supabase) or 5 (without) to
        # match the outputs list wired below.
        if not qp_file_obj or not ms_file_obj or not ans_file_obj:
            error_msg = "β Please upload all three files"
            if supabase_client:
                return error_msg, "", "", None, None, ""
            else:
                return error_msg, "", "", None, None
        # Process and upload input files (generates shared timestamp)
        qp_path, ms_path, ans_path, input_urls, run_timestamp = process_and_upload_input_files(
            qp_file_obj, ms_file_obj, ans_file_obj
        )
        # Run the grading pipeline (pass timestamp to keep all files together)
        qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path, output_urls = align_and_grade_pipeline(
            qp_path, ms_path, ans_path, subject=subject_choice, imprint=imprint_flag, run_timestamp=run_timestamp
        )
        # Build a human-readable summary of all uploaded file URLs.
        urls_summary = ""
        if supabase_client:
            urls_summary = f"π€ UPLOADED FILES (Timestamp: {run_timestamp}):\n\n"
            urls_summary += "INPUT FILES:\n"
            if input_urls.get("qp_url"):
                urls_summary += f"β’ Question Paper: {input_urls['qp_url']}\n"
            if input_urls.get("ms_url"):
                urls_summary += f"β’ Markscheme: {input_urls['ms_url']}\n"
            if input_urls.get("ans_url"):
                urls_summary += f"β’ Answer Sheet: {input_urls['ans_url']}\n"
            urls_summary += "\nOUTPUT FILES:\n"
            if output_urls.get("graded_pdf_url"):
                urls_summary += f"β’ Graded PDF: {output_urls['graded_pdf_url']}\n"
            if output_urls.get("imprinted_pdf_url"):
                urls_summary += f"β’ Imprinted PDF: {output_urls['imprinted_pdf_url']}\n"
            urls_summary += f"\nπ All files stored in: examfiles/{run_timestamp}/\n"
            if not any(input_urls.values()) and not any(output_urls.values()):
                urls_summary += "\nβ οΈ No files were uploaded to Supabase"
        if supabase_client:
            return (
                qpms_text or "",
                as_text or "",
                grading_text or "",
                grading_pdf_path,
                imprinted_pdf_path,
                urls_summary
            )
        else:
            return (
                qpms_text or "",
                as_text or "",
                grading_text or "",
                grading_pdf_path,
                imprinted_pdf_path
            )

    # Set up the click handler based on whether Supabase is enabled
    # (the outputs list must have the same length as run_pipeline's return tuple).
    if supabase_client:
        run_button.click(
            fn=run_pipeline,
            inputs=[qp_file, ms_file, ans_file, subject_dropdown, imprint_toggle],
            outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file, file_urls_box]
        )
    else:
        run_button.click(
            fn=run_pipeline,
            inputs=[qp_file, ms_file, ans_file, subject_dropdown, imprint_toggle],
            outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file]
        )

if __name__ == "__main__":
    demo.launch()