import os
import re
import json
import subprocess
import time

import img2pdf
import gradio as gr
from google import genai  # NEW SDK
from markdown_pdf import MarkdownPdf, Section
from pdf2image import convert_from_path
from PIL import Image, ImageDraw, ImageFont
import cv2
import numpy as np
from PyPDF2 import PdfReader, PdfWriter

# ---------------- CONFIG ----------------
# Create client with new SDK
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))

# Size of the numbered grid overlaid on answer pages when marks are imprinted.
GRID_ROWS, GRID_COLS = 20, 14

# Model tried by gemini_generate_content() when the requested model fails.
_FALLBACK_MODEL = "gemini-2.5-flash"

# ---------------- PROMPTS ----------------
# NOTE(review): the graph line template below carries explicit
# <question_id>/<page_number> placeholders so that it matches the
# "- Question X → Page N" shape the transcript parsers look for. The
# placeholders appear to have been stripped from the text this was rebuilt
# from -- confirm against the intended prompt.
# NOTE(review): in GRADING_PROMPT the bullet "**Do use red** for markscheme
# expectations or student responses themselves" reads like a dropped "NOT";
# it is left untouched here -- confirm the intended wording.
PROMPTS = {
    "QP_MS_TRANSCRIPTION": {
        "role": "system",
        "content": """You are a high-quality OCR/Transcription assistant.
INPUT: This file is a PDF that first contains the Question Paper and immediately after it the Markscheme.

TASK:
1. Transcribe EXACTLY all the questions FIRST (with their total marks).
2. After ALL questions, transcribe the Markscheme exactly, preserving M/A/R notation in brackets.
3. Always number the questions sequentially (Question 1, Question 2, Question 3, …) **in the order they appear in the PDF**, even if the PDF shows a different number or leaves it blank. Do NOT skip or leave Question: blank. Never start a question other than question 1 (even if it is labelled in pdf as 8 name it 1).
4. If a question or sub-question is labelled with a letter (e.g., "Q1.a", "Q2(b)", "1 (c)(i)"), transcribe it as "Question 1.a", "Question 2.b", "Question 1.c.i" etc., exactly preserving the hierarchy of sub-question identifiers.
5. After the markscheme, DETECT and FLAG all questions in the markscheme where a graph/diagram is expected. For each, output the question number and the page number in the format below.

FORMAT:
==== PAPER TOTAL MARKS ====

==== QUESTIONS BEGIN ====
Question 1.a
Total Marks:
QP:
--QUESTION-END--

Question 1.b
Total Marks:
QP:
--QUESTION-END--

Question 2
Total Marks:
QP:
--QUESTION-END--
(repeat for all questions in order of appearance)
==== QUESTIONS END ====

==== MARKSCHEME BEGIN ====
Answer 1.a:
Answer 1.b:
Answer 2 :
(repeat for all answers)
==== MARKSCHEME END ====

==== GRAPH EXPECTED QUESTIONS ====
Graph expected in:
- Question <question_id> → Page <page_number>
(one per line)
==== END GRAPH EXPECTED ====
""",
    },
    "GRADING_PROMPT": {
        "role": "system",
        "content": """You are an official examiner. Apply the following grading rules precisely and consistently.

### Mark Abbreviations:
- **M**: Method marks – awarded for correct mathematical procedures, approaches, or techniques
- **A**: Accuracy/Answer marks – awarded for correct final or intermediate answers
- **R**: Reasoning marks – awarded for justifications, explanations, or logical deductions
- **AG**: Answer Given – the answer is provided in the question; award no marks for simply stating it
- **FT**: Follow Through – marks awarded when a student correctly applies a method using their own previous (incorrect) answer
- **MR**: Misread – penalty applied when student misreads a value from the question (deduct from first applicable A-mark only, once per question)

---

## Grading Rules

### Core Principles:
1. **Award marks using official annotations** (e.g., M1, A2, R1).
2. **Do not award full marks for answers alone** – check that the required method steps are present.
3. **A-marks typically depend on M-marks** – an A-mark usually requires the corresponding M-mark to be earned first (unless the markscheme explicitly states otherwise).
4. **Accept equivalent forms** unless the markscheme specifies exact form (e.g., "simplified form only").
5. **Apply Follow Through (FT)** when a student uses an incorrect answer correctly in subsequent steps.
6. **Misread (MR) Penalty**: If a student misreads a numerical value from the question:
   - Deduct from the **first applicable A-mark** in that question only
   - Apply MR penalty **once per question** (not per sub-question)
   - M-marks can still be awarded if the method is correct
   - Annotate as: `A0 (MR applied)`

### Formatting Lost Marks:
- **Lost marks must be highlighted in red**: `M0`, `A0`, etc.
- **In the table**: Use red styling for "Awarded" column when mark is lost
- **Do use red** for markscheme expectations or student responses themselves when mark is lost

### Graph/Diagram Questions:
- When graph/diagram images are provided, describe visual evidence in the "Examiner Notes" column
- Examples: "Correct parabola shape, y-intercept matches", "Line has wrong gradient", "Asymptote missing"

---

## Output Format

Produce the following structure for each question/sub-question:

### Question <1.a>

**Markscheme vs Student Answer**

| Mark ID | Markscheme Expectation | Student's Response | Awarded | Examiner Notes |
|---------|------------------------|-------------------|---------|----------------|
| M1 | Use product rule: $u'v + uv'$ | Student wrote: $u'v + uv'$ ✓ | M1 | Correct method applied |
| A1 | Final answer: $2xe^x + e^x$ | Student answer: $2xe^x + e^x$ ✓ | A1 | Correct, depends on M1 |

**Total: X/Y**

---

*(Repeat for all questions)*

---

### Examiner's Summary Report

**IMPORTANT**: Group all sub-questions under their parent question. Sum the marks for all sub-parts (e.g., 1.a, 1.b, 1.c) and report as a single entry for Question 1.

**Format Rules for Summary Report**:
- If a question has sub-parts (1.a, 1.b, etc.), group them as "Question 1" with combined marks
- If a question has no sub-parts (just "Question 2"), report it directly
- Assign ONE overall remark per grouped question based on the predominant error type across all sub-parts

| Question Number | Marks | Remark | Feedback |
|-----------------|-------|--------|----------|
| 1 | 10/12 | A | Strong answer, only minor mistake |
| 2 | 5/8 | B | Good attempt, missing some detail |
| 3 | 7/10 | C | Adequate, but lacked depth/clarity |
| … | … | … | … (continue for all answers) |
...(repeat for all answers)

**Example Explanation**:
- Question 1 has sub-parts 1.a (3/5), 1.b (5/7), 1.c (2/0) → Total: (3+5+2)/(5+7+0) = 10/12
- Question 2 has sub-parts 2.a (2/3), 2.b (3/5) → Total: (2+3)/(3+5) = 5/8
- Question 3 has no sub-parts → Report as-is: 7/10

**Total: /**

---

## Remark Codes (assign ONE per grouped question):
- **A**: All Good – mostly full marks across sub-parts, no major errors
- **B**: Silly Mistake – minor arithmetic/algebraic slips (e.g., $2 + 3 = 6$, sign error in final step)
- **C**: Conceptual Error – wrong formula, incorrect method, fundamental misunderstanding in one or more sub-parts
- **D**: Hard Question – question is inherently difficult; partial credit reflects genuine attempt
- **E**: Not Applicable – question not attempted, or answer entirely illegible/missing

**Remark Selection for Grouped Questions**:
- If all sub-parts are correct → **A**
- If majority are correct with 1-2 arithmetic errors → **B**
- If one or more sub-parts show conceptual errors → **C**
- If question is difficult and student made reasonable attempt → **D**
- If all sub-parts are missing/illegible → **E**

---

## Additional Instructions:
- You will receive:
  1. **QP+MS transcript** (authoritative source for question wording, total marks, and markscheme with M/A/R notation)
  2. **AS transcript** (student answers in LaTeX-formatted markdown)
  3. **Graph images** (if applicable) for questions involving diagrams
- Match student answers to question IDs from the QP+MS transcript.
- Grade according to the **verbatim markscheme**, but accept mathematically/conceptually equivalent answers (justify in "Examiner Notes").
- For graph questions, use provided images as visual context and describe what you observe.
- Ensure mark IDs in your grading table match those in the markscheme.
- Be consistent: if a student makes the same type of error multiple times, apply the same penalty logic each time.
""",
    },
}


# ---------------- HELPERS ----------------
def save_as_pdf(text, filename="output.pdf"):
    """Render markdown *text* into a single-section PDF and return *filename*."""
    pdf = MarkdownPdf()
    pdf.add_section(Section(text, toc=False))
    pdf.save(filename)
    return filename


def compress_pdf(input_path, output_path=None, max_size=20 * 1024 * 1024):
    """Shrink a PDF with Ghostscript when it is larger than *max_size* bytes.

    Args:
        input_path: PDF to inspect.
        output_path: Destination for the compressed copy; defaults to
            ``<input>_compressed<ext>`` next to the input.
        max_size: Size threshold in bytes.

    Returns:
        *output_path* when compression ran and the result fits within
        *max_size*; otherwise *input_path* (file already small enough,
        unreadable, still too large, or Ghostscript missing/failed).
        This function is best-effort and does not raise for those cases.
    """
    if output_path is None:
        base, ext = os.path.splitext(input_path)
        output_path = f"{base}_compressed{ext}"

    try:
        size = os.path.getsize(input_path)
    except OSError:
        return input_path

    if size <= max_size:
        print(f"ℹ️ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
        return input_path

    print(f"🔎 Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
    gs_cmd = [
        "gs",
        "-sDEVICE=pdfwrite",
        "-dCompatibilityLevel=1.4",
        "-dPDFSETTINGS=/ebook",
        "-dNOPAUSE",
        "-dQUIET",
        "-dBATCH",
        f"-sOutputFile={output_path}",
        input_path,
    ]
    try:
        subprocess.run(gs_cmd, check=True)
        new_size = os.path.getsize(output_path)
    except (OSError, subprocess.SubprocessError) as e:
        # OSError covers a missing `gs` binary (FileNotFoundError) and a
        # missing output file; SubprocessError covers a non-zero exit status.
        print("❌ Compression error:", e)
        return input_path

    print(f"✅ Compression done. New size: {new_size/1024/1024:.2f} MB")
    if new_size <= max_size:
        return output_path
    print("⚠️ Compressed file still larger than threshold; returning original")
    return input_path


def upload_to_gemini(path, display_name=None, poll_interval=2, timeout=600):
    """Upload a file to Gemini using the NEW google-genai SDK and wait for it.

    Args:
        path: Local file to upload.
        display_name: Optional display name forwarded to the Files API.
        poll_interval: Seconds to sleep between state checks.
        timeout: Maximum seconds to wait while the file is ``PROCESSING``.

    Returns:
        The uploaded file object once it has left the ``PROCESSING`` state.

    Raises:
        TimeoutError: Processing did not finish within *timeout* seconds.
        RuntimeError: The service reported the ``FAILED`` state.
        Exception: Any SDK error is logged and re-raised.
    """
    print(f"📤 Uploading {path} to Gemini...")
    try:
        if display_name:
            uploaded_file = client.files.upload(file=path, config={"display_name": display_name})
        else:
            uploaded_file = client.files.upload(file=path)

        # Wait for processing to complete, but never spin forever.
        print(f"⏳ Waiting for file processing: {uploaded_file.name}")
        deadline = time.monotonic() + timeout
        while uploaded_file.state.name == "PROCESSING":
            if time.monotonic() > deadline:
                raise TimeoutError(
                    f"File processing timed out after {timeout}s: {uploaded_file.name}"
                )
            time.sleep(poll_interval)
            uploaded_file = client.files.get(name=uploaded_file.name)

        if uploaded_file.state.name == "FAILED":
            raise RuntimeError(f"File processing failed: {uploaded_file.name}")

        print(f"✅ Uploaded and processed: {uploaded_file.name}")
        return uploaded_file
    except Exception as e:
        print(f"❌ Upload failed for {path}: {e}")
        raise


def merge_pdfs(paths, output_path):
    """Concatenate the PDFs in *paths* (in order) into *output_path* and return it."""
    writer = PdfWriter()
    for pdf_path in paths:
        for page in PdfReader(pdf_path).pages:
            writer.add_page(page)
    with open(output_path, "wb") as f:
        writer.write(f)
    return output_path


def _load_images(image_obj):
    """Normalise *image_obj* into a list of model inputs.

    Accepts a single item or a list; string items are treated as image paths
    and opened with PIL, anything else is passed through unchanged.
    """
    if not image_obj:
        return []
    items = image_obj if isinstance(image_obj, list) else [image_obj]
    loaded = []
    for item in items:
        if isinstance(item, str):
            img = Image.open(item)
            # load() reads the pixel data now, so PIL can release the file
            # handle instead of keeping it open until garbage collection.
            img.load()
            loaded.append(img)
        else:
            loaded.append(item)
    return loaded


def _generate_text(model_name, contents):
    """Call *model_name* with *contents* and return the response text.

    Raises:
        ValueError: The response carried no text.
    """
    response = client.models.generate_content(model=model_name, contents=contents)
    raw_text = response.text
    if raw_text is None:
        raise ValueError(f"Model {model_name} returned a response without text")
    print("📥 Received response (chars):", len(raw_text))
    return raw_text


def gemini_generate_content(prompt_text, file_upload_obj=None, image_obj=None, model_name="gemini-2.5-pro"):
    """Send a prompt, plus an optional uploaded file and images, to Gemini.

    Args:
        prompt_text: Prompt sent as the first content part.
        file_upload_obj: Optional uploaded-file object appended after the prompt.
        image_obj: Optional image path / image object, or a list of them.
        model_name: Model tried first; the fallback model is tried if it fails.

    Returns:
        The textual response.

    Raises:
        Exception: Re-raises the fallback model's error when both attempts
            fail (including the case where a response carries no text).
    """
    contents = [prompt_text]
    if file_upload_obj:
        contents.append(file_upload_obj)
    contents.extend(_load_images(image_obj))

    print("📡 Sending request to Gemini (prompt length:", len(prompt_text), "chars )")
    try:
        return _generate_text(model_name, contents)
    except Exception as e:
        print(f"❌ Generation failed: {e}")
        # Try fallback model
        print(f"⚡ Trying fallback model: {_FALLBACK_MODEL}")
        try:
            return _generate_text(_FALLBACK_MODEL, contents)
        except Exception as e2:
            print(f"❌ Fallback also failed: {e2}")
            raise


# ---------------- PARSERS ----------------
def _unique_ids(raw_ids):
    """Strip trailing dots, drop empties and duplicates, and keep first-seen order."""
    cleaned = (raw_id.rstrip(".") for raw_id in raw_ids)
    return list(dict.fromkeys(qid for qid in cleaned if qid))


def extract_question_ids_from_qpms(text: str):
    """Extract question IDs from QP+MS transcript.

    Lines of the form ``Question <id>`` are preferred; when none exist, lines
    that start with a number (numbered lists) are used instead.

    Returns:
        A list of unique ID strings in order of first appearance; empty when
        nothing matches (the caller substitutes an ``NA`` placeholder).
    """
    print("🔎 Extracting question IDs from QP+MS transcript using regex...")
    clean_text = (text or "").replace("\u00A0", " ").replace("\t", " ")

    primary_matches = _unique_ids(
        re.findall(r"^\s*Question\s*[:\s]\s*([\dA-Za-z.()]+)", clean_text, re.MULTILINE)
    )
    if primary_matches:
        print(f"✅ Extracted {len(primary_matches)} question IDs from explicit 'Question X' lines.")
        print("IDs:", primary_matches)
        return primary_matches

    fallback_matches = _unique_ids(
        re.findall(r"^\s*(\d+(?:[.)]|\([a-zA-Z0-9]+\))?[a-zA-Z0-9]*)", clean_text, re.MULTILINE)
    )
    if fallback_matches:
        print(f"✅ Extracted {len(fallback_matches)} question IDs (fallback numbered lists).")
        print("IDs:", fallback_matches)
    else:
        print("⚠️ No question IDs extracted; will send NA placeholder.")
    return fallback_matches
def build_as_cot_prompt_with_expected_ids(expected_ids, qpms_text=None):
    """Build the answer-sheet (AS) transcription prompt.

    The prompt asks for a chain-of-thought section before each answer,
    LaTeX ``$...$`` delimiters around maths, and a trailing graph-detection
    block. The full *qpms_text*, when provided, is embedded verbatim (only
    leading/trailing whitespace is stripped).

    Args:
        expected_ids: Question IDs expected in the answer sheet; a falsy
            value yields the ``{NA}`` placeholder.
        qpms_text: Optional QP+MS transcript to embed for disambiguation.

    Returns:
        The prompt string.
    """
    if expected_ids:
        ids_block = "{\n" + "\n".join(str(qid) for qid in expected_ids) + "\n}"
    else:
        ids_block = "{NA}"

    qpms_section = ""
    if qpms_text is not None:
        # Include the full QP+MS transcript exactly (strip only leading/trailing whitespace)
        qpms_section = (
            "\nYou are also provided with the full transcript of the Question Paper and Markscheme (QP+MS) below."
            "\nUse it primarily to resolve ambiguous handwriting and to confirm expected answers when needed."
            "\n--- BEGIN QP+MS TRANSCRIPT ---\n"
            f"{qpms_text.strip()}\n"
            "--- END QP+MS TRANSCRIPT ---\n"
        )

    # NOTE(review): the <thinking>, <ID>, <transcription>, <answer_id> and
    # <page_number> placeholders below were empty in the text this was rebuilt
    # from (e.g. "using the **** tag"); the names are restored from context --
    # confirm against the intended prompt.
    # "\\pi" keeps a literal backslash: "\p" is an invalid escape sequence in
    # a non-raw string literal.
    prompt = f"""You are a high-quality handwritten transcription assistant, performing transcription with a Chain-of-Thought process.
INPUT: This PDF contains a student's handwritten answer sheet.
{qpms_section}
TASK:
1. **THINKING:** Before transcribing each answer, you must document your thought process using the **<thinking>** tag.
   - Identify the question ID. If inferred, note why.
   - Detail any ambiguities encountered (e.g., unclear numbers, symbols, or structure).
   - Explain how you resolved ambiguities, specifically if you referred to the QP+MS transcript.
   - If you *did* refer to QP+MS but decided to keep your original transcription, state this clearly.
   - If you initially label an answer as 2.a but later realize it aligns better with 2.b based on the marking scheme, reassign it to 2.b and briefly explain your reasoning in the <thinking> tag.

   *Example Thinking:*
   <thinking>
   - Found Question 3(a).
   - Noticed '2x' was written ambiguously; it could be '2x' or '21x'.
   - Referred to QP+MS: The expected answer involves '$21x$'.
   - Re-examined the handwriting carefully: The student's handwriting strongly appears to be '$2x$' and not '$21x$'.
   - DECISION: Transcribe exactly what the student wrote: '$2x$'.
   </thinking>

2. **TRANSCRIPTION:** Transcribe the student's answers in accordance with the markscheme provided. Preserve step order and line breaks.
   - Attempt to assign each answer to a question ID if the student has labelled it (e.g., "1", "1a", "2(b)", "3").
   - If the student hasn't labelled answers, segment contiguous answer blocks and attempt to infer question IDs from context — mark inferred IDs clearly as "**INFERRED: <ID>**".
   - **Enclose all mathematical expressions and single variables in LaTeX dollar delimiters ($...$).**
     - Example: "The area is $A = \\pi r^2$ so $3x+5 = 11$ thus $x=2$."
   - If a diagram/graph is omitted, write **[Graph omitted]**.
   - Unreadable parts: **[illegible]**.
   - Unanswered: **[No response]**.
   - Do NOT recreate diagrams.

Ensure consistency and determinism in formatting so subsequent models can grade directly from this aligned format.

Expected questions (if missing, write NA):
{ids_block}

-----------------------
OUTPUT FORMAT:
<thinking>
...
</thinking>
Question <ID>
AS: <transcription>

<thinking>
...
</thinking>
Question <ID>
AS: <transcription>
...

==== GRAPH FOUND ANSWERS ====
Graph found in:
- Answer <answer_id> → Page <page_number>
(one per line)
==== END GRAPH FOUND ===="""
    return prompt


def _parse_graph_block(text, begin_marker, end_marker, label):
    """Parse ``- <label> <id> → Page <n>`` lines into ``{id: page}``.

    Lines are read from between *begin_marker* and *end_marker*; when the
    markers are absent the whole text is scanned, because the model may list
    the lines without the surrounding markers. Both ``→`` and ``->`` are
    accepted as the arrow.
    """
    if not text:
        return {}
    clean_text = text.replace("\u00A0", " ").replace("\t", " ")
    section = re.search(
        re.escape(begin_marker) + r"\s*(.*?)\s*" + re.escape(end_marker),
        clean_text,
        re.S,
    )
    scope = section.group(1) if section else clean_text

    line_re = re.compile(
        r"-\s*" + re.escape(label) + r"\s+([\dA-Za-z.()]+)\s*(?:→|->)\s*Page\s*(\d+)"
    )
    pages_by_id = {}
    for line in scope.splitlines():
        hit = line_re.match(line.strip())
        if hit:
            pages_by_id[hit.group(1)] = int(hit.group(2))
    return pages_by_id


def extract_graph_questions_from_ms(text: str):
    """Extract graph questions and page numbers from MS transcript.

    Returns:
        ``{question_id: page_number}`` for every "- Question X → Page N" line.
    """
    return _parse_graph_block(
        text,
        "==== GRAPH EXPECTED QUESTIONS ====",
        "==== END GRAPH EXPECTED ====",
        "Question",
    )


def extract_graph_answers_from_as(text: str):
    """Extract graph answers and page numbers from AS transcript.

    Returns:
        ``{answer_id: page_number}`` for every "- Answer X → Page N" line.
    """
    return _parse_graph_block(
        text,
        "==== GRAPH FOUND ANSWERS ====",
        "==== END GRAPH FOUND ====",
        "Answer",
    )


# Mark tokens such as M1, A0, R2; the look-behind also splits runs like "M1A1".
_MARK_TOKEN_RE = re.compile(r"(?<![A-Za-z])[MAR]\d+")


def _awarded_marks_from_tables(block):
    """Collect mark tokens from the "Awarded" column of markdown tables in *block*.

    Returns:
        The list of tokens in row order, or ``None`` when *block* contains no
        table with an "Awarded" header (so the caller can fall back).
    """
    awarded = []
    saw_awarded_table = False
    offset_from_end = None  # position of "Awarded" counted from the last cell
    for raw_line in block.splitlines():
        line = raw_line.strip()
        if not line.startswith("|"):
            offset_from_end = None  # any non-table line ends the current table
            continue
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        headers = [cell.strip("*`_ ").lower() for cell in cells]
        if "awarded" in headers:
            # Index from the end: stray "|" characters inside maths in the
            # earlier columns then do not shift the Awarded column.
            offset_from_end = len(cells) - headers.index("awarded")
            saw_awarded_table = True
            continue
        if offset_from_end is None or len(cells) < offset_from_end:
            continue
        if all(re.fullmatch(r":?-+:?", cell) for cell in cells if cell):
            continue  # header/body separator row
        awarded.extend(_MARK_TOKEN_RE.findall(cells[-offset_from_end]))
    return awarded if saw_awarded_table else None


def extract_marks_from_grading(grading_text):
    """Parse the grading markdown and extract marks per question.

    The text is split on ``## Question`` headings. For each block the awarded
    marks are read from the "Awarded" column of its table(s), so mark IDs that
    appear in the "Mark ID" or expectation columns are not counted. If a block
    has no such table, every mark token in the block is collected instead.

    Returns:
        ``{"grading": [{"question": <id>, "marks_awarded": [<token>, ...]}, ...]}``
    """
    print("🔎 Extracting awarded marks from grading output...")
    grading_json = {"grading": []}
    question_blocks = re.split(r"##\s*Question\s+", grading_text or "")
    for block in question_blocks[1:]:
        lines = block.strip().splitlines()
        # The model may echo the template's decorations, e.g. "<1.a>" or "**1.a**".
        first_line = lines[0].strip().strip("<>*: ") if lines else ""
        q_id_match = re.match(
            r"([0-9]+(?:[a-zA-Z]|\([^)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line
        )
        if q_id_match:
            q_id = q_id_match.group(1).strip()
        else:
            q_id = first_line.split()[0] if first_line else ""

        awarded = _awarded_marks_from_tables(block)
        if awarded is None:
            awarded = re.findall(r"\b(M\d+|A\d+|R\d+|M0|A0|R0)\b", block)

        grading_json["grading"].append({"question": q_id, "marks_awarded": awarded})

    print("✅ Extracted grading marks for", len(grading_json["grading"]), "question blocks.")
    print(json.dumps(grading_json, indent=2))
    return grading_json


# ---------------- MAPPING/IMPRINT HELPERS ----------------
def ask_gemini_for_mapping_batch(image_paths, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
    """Send multiple page images together to Gemini for batch mapping processing.

    Args:
        image_paths: Grid-numbered page images, in page order.
        grading_json: Output of ``extract_marks_from_grading``.
        expected_ids: Optional question IDs to hint the model with.
        rows: Grid rows drawn on each page.
        cols: Grid columns drawn on each page.

    Returns:
        A list of ``{"page", "question", "cell_number"}`` dicts, where
        ``page`` counts from 1 within this batch; ``[]`` when no JSON array
        can be parsed from the response.

    Raises:
        Exception: When both the primary and the fallback model call fail.
    """
    ids_block = "{NA}"
    if expected_ids:
        ids_block = "{\n" + "\n".join(str(qid) for qid in expected_ids) + "\n}"

    prompt = f"""You are an exam marker. Your role is to identify where each question begins on each page.
The pages are divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label.
Pages are numbered 1, 2, 3, … in the order the images are attached.
For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins.

⚠ IMPORTANT RULES:
- Do not place marks inside another question's answer area.
- Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
- Never place marks above or below the answer.
- Each question should have unique cell number
- If a question serial number is visible in the answer image, you must mandatorily identify the corresponding question using the grading JSON.

IMPORTANT: For your help i have provided u questions that u can expect in the images:
{ids_block}

Return JSON only, like:
[{{"page": 1, "question": "1(a)", "cell_number": 15}}, ...]

Grading JSON:
{json.dumps(grading_json, indent=2)}"""

    images = []
    for image_path in image_paths:
        img = Image.open(image_path)
        img.load()  # read pixels now so PIL can release the file handle
        images.append(img)
    contents = [prompt] + images

    print(f"📡 Sending batch mapping request for {len(image_paths)} pages to Gemini...")
    try:
        response = client.models.generate_content(model="gemini-2.0-flash-exp", contents=contents)
        raw_text = response.text
    except Exception as e:
        print(f"⚠️ Trying fallback model for mapping... ({e})")
        response = client.models.generate_content(model="gemini-1.5-flash", contents=contents)
        raw_text = response.text

    raw_text = raw_text or ""  # a response without text parses as "no mapping"
    print("📥 Batch mapping response (chars):", len(raw_text))
    print("🔎 Gemini raw batch output:")
    print(raw_text)

    match = re.search(r"(\[.*\])", raw_text, re.DOTALL)
    if not match:
        print("❌ Failed to find JSON array in response")
        return []
    try:
        mapping = json.loads(match.group(1))
    except ValueError as e:
        print(f"❌ Failed to parse Gemini JSON mapping: {e}")
        return []
    if not isinstance(mapping, list):
        print("❌ Gemini mapping is not a JSON array")
        return []
    print(f"✅ Parsed Gemini batch mapping for {len(image_paths)} pages")
    return mapping


def _normalize_qid(qid):
    """Reduce a question ID to lowercase alphanumerics so "1(a)", "1.a" and "1A" compare equal."""
    return re.sub(r"[^0-9a-z]", "", str(qid).lower())


def _draw_numbered_grid(page, rows, cols):
    """Return an RGB copy of *page* with a running number drawn at the centre of every grid cell."""
    img = page.convert("RGB")
    w, h = img.size
    cell_w, cell_h = w / cols, h / rows
    draw = ImageDraw.Draw(img)
    try:
        num_font = ImageFont.truetype("arial.ttf", 20)
    except Exception:
        # Font availability depends on the host; fall back to PIL's built-in font.
        num_font = ImageFont.load_default()

    cell_num = 1
    for r in range(rows):
        for c in range(cols):
            x = int(c * cell_w + cell_w / 2)
            y = int(r * cell_h + cell_h / 2)
            text = str(cell_num)
            bbox = draw.textbbox((0, 0), text, font=num_font)
            tw = bbox[2] - bbox[0]
            th = bbox[3] - bbox[1]
            draw.text((x - tw / 2, y - th / 2), text, fill="black", font=num_font)
            cell_num += 1
    return img


def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
    """Write the awarded marks onto the answer-sheet pages and save a PDF.

    The PDF is rendered to images, a numbered grid is drawn on copies of the
    pages, Gemini is asked (in batches) which cell each question starts in,
    and the marks are then drawn in red on the clean pages.

    Args:
        pdf_path: Student answer-sheet PDF.
        grading_json: Output of ``extract_marks_from_grading``.
        output_pdf: Path of the imprinted PDF to write.
        expected_ids: Optional question IDs forwarded to the mapping prompt.
        rows: Grid rows.
        cols: Grid columns.

    Returns:
        Path of the imprinted PDF (the compressed copy if compression applied).
    """
    # Local import: `tempfile` is not among this file's top-level imports.
    import tempfile

    print("📄 Converting answer PDF to images for imprinting...")
    pages = convert_from_path(pdf_path, dpi=200)

    # First occurrence wins, matching a first-match scan of the grading list.
    exact_marks = {}
    normalized_marks = {}
    for entry in grading_json.get("grading", []):
        question = entry.get("question")
        marks = entry.get("marks_awarded", [])
        exact_marks.setdefault(question, marks)
        normalized_marks.setdefault(_normalize_qid(question), marks)

    # All intermediate PNGs live in a private directory so concurrent runs do
    # not overwrite each other and nothing is left behind afterwards.
    with tempfile.TemporaryDirectory(prefix="imprint_") as workdir:
        temp_grid_images = []
        for p_index, page in enumerate(pages):
            temp_path = os.path.join(workdir, f"page_{p_index+1}_grid.png")
            _draw_numbered_grid(page, rows, cols).save(temp_path, "PNG")
            temp_grid_images.append(temp_path)
            print("🛰 Created grid image:", temp_path)

        print("📡 Sending page images to Gemini in batches for mapping...")
        batch_size = 10
        all_mappings = []
        for start in range(0, len(temp_grid_images), batch_size):
            batch_paths = temp_grid_images[start:start + batch_size]
            batch_mapping = ask_gemini_for_mapping_batch(batch_paths, grading_json, expected_ids, rows, cols)
            for entry in batch_mapping:
                if not isinstance(entry, dict):
                    continue
                try:
                    local_page = int(entry.get("page"))
                except (TypeError, ValueError):
                    continue
                # The model numbers pages from 1 within the batch it was
                # shown; shift to the absolute page number in the PDF.
                all_mappings.append({**entry, "page": local_page + start})
            print(f"✅ Processed batch {start//batch_size + 1}: pages {start+1}-{start+len(batch_paths)}")

        print("🖊 Annotating pages with marks...")
        annotated_page_paths = []
        for p_index, page in enumerate(pages):
            page_num = p_index + 1
            img_cv = cv2.cvtColor(np.array(page.convert("RGB")), cv2.COLOR_RGB2BGR)
            h, w, _ = img_cv.shape
            cell_w_px, cell_h_px = w / cols, h / rows

            for item in all_mappings:
                if item.get("page") != page_num:
                    continue
                qid = item.get("question")
                if qid is None or item.get("cell_number") is None:
                    continue
                try:
                    cell_number = int(item["cell_number"])
                except (TypeError, ValueError):
                    continue
                if not 1 <= cell_number <= rows * cols:
                    print(f"⚠️ Ignoring out-of-grid cell {cell_number} for question {qid}")
                    continue

                marks_list = exact_marks.get(qid) or normalized_marks.get(_normalize_qid(qid), [])
                marks_text = ",".join(marks_list) if marks_list else "?"

                row = (cell_number - 1) // cols
                col = (cell_number - 1) % cols
                x_c = int((col + 1) * cell_w_px - cell_w_px / 4)
                y_c = int((row + 0.5) * cell_h_px)
                font_scale = max(1.0, min(2.0, cell_h_px / 40.0))
                thickness = max(2, int(font_scale * 2))
                # Keep the text inside the page when the cell is near the right edge.
                (text_w, _), _ = cv2.getTextSize(marks_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)
                x_c = max(0, min(x_c, w - text_w - 5))
                cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
                            font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
                print(f"🖊 Marks annotated for page {page_num}, question {qid}: {marks_text}")

            annotated_path = os.path.join(workdir, f"annotated_page_{page_num}.png")
            cv2.imwrite(annotated_path, img_cv)
            annotated_page_paths.append(annotated_path)
            print("✅ Annotated page saved:", annotated_path)

        print("📑 Merging annotated pages into final PDF...")
        with open(output_pdf, "wb") as f:
            f.write(img2pdf.convert(annotated_page_paths))

    compressed = compress_pdf(output_pdf)
    print("📑 Imprinted PDF saved to:", compressed)
    return compressed


def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix):
    """Extract unique pages (1-based) from a PDF as PNG images.

    Each requested page is rendered on its own, so two distant pages do not
    force every page between them to be rendered. Page numbers that are not
    positive integers, or for which nothing is rendered, are skipped with a
    warning.

    Returns:
        List of written file paths, in ascending page order.
    """
    unique_pages = set()
    for raw_page in page_numbers or []:
        try:
            page_num = int(raw_page)
        except (TypeError, ValueError):
            continue
        if page_num >= 1:
            unique_pages.add(page_num)

    out_paths = []
    for page_num in sorted(unique_pages):
        rendered = convert_from_path(pdf_path, dpi=200, first_page=page_num, last_page=page_num)
        if not rendered:
            print(f"⚠️ Page {page_num} not found in {pdf_path}; skipping")
            continue
        out_path = f"{prefix}_page_{page_num}.png"
        rendered[0].save(out_path, "PNG")
        print(f"📤 Extracted graph page {page_num} from {pdf_path} as {out_path}")
        out_paths.append(out_path)
    return out_paths


def _write_debug_file(path, text):
    """Write *text* to *path* as UTF-8 (debug artefact of a pipeline stage)."""
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)


# ---------------- PIPELINE ----------------
def align_and_grade_pipeline(qp_path, ms_path, ans_path, imprint=False):
    """Final pipeline with graph-aware grading logic using NEW SDK.

    Steps: compress and merge QP+MS, upload, transcribe QP+MS, transcribe the
    answer sheet, grade, save the grading PDF, and optionally imprint marks.

    Args:
        qp_path: Question paper PDF.
        ms_path: Markscheme PDF.
        ans_path: Student answer-sheet PDF.
        imprint: When true, also produce an imprinted answer-sheet PDF.

    Returns:
        ``(qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path)``.
        If the pipeline fails, the first element is an error message and the
        rest are ``None``. If only the optional imprint step fails, the
        grading results are still returned with ``imprinted_pdf_path=None``.
    """
    import traceback

    try:
        print("🔁 Starting pipeline...")
        qp_path = compress_pdf(qp_path)
        ms_path = compress_pdf(ms_path)
        ans_path = compress_pdf(ans_path)

        merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
        merge_pdfs([qp_path, ms_path], merged_qpms_path)
        print("📎 Merged QP + MS ->", merged_qpms_path)

        print("🔼 Uploading files to Gemini...")
        merged_uploaded = upload_to_gemini(merged_qpms_path)
        ans_uploaded = upload_to_gemini(ans_path)
        print("✅ Upload complete.")

        print("1.i) Transcribing QP+MS (questions first, then full markscheme, with graph detection)...")
        qpms_prompt = (
            PROMPTS["QP_MS_TRANSCRIPTION"]["content"]
            + "\nAt the end, also list all questions in the markscheme where a graph is expected, in the format:"
            "\nGraph expected in:\n- Question <question_id> → Page <page_number>"
            "\n(One per line, after ==== MARKSCHEME END ====)"
        )
        qpms_text = gemini_generate_content(qpms_prompt, file_upload_obj=merged_uploaded)
        print("📄 QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt")
        _write_debug_file("debug_qpms_transcript.txt", qpms_text)

        ms_graph_mapping = extract_graph_questions_from_ms(qpms_text)
        print("🖼️ Graph-expected questions in MS:", ms_graph_mapping)
        ms_graph_pages = list(ms_graph_mapping.values())
        ms_graph_images = []
        if ms_graph_pages:
            ms_graph_images = extract_pdf_pages_as_images(merged_qpms_path, ms_graph_pages, prefix="qpms_graph")

        extracted_ids = extract_question_ids_from_qpms(qpms_text)
        if not extracted_ids:
            extracted_ids = ["NA"]

        print("1.ii) Building AS transcription prompt with expected question IDs and graph detection, sending to Gemini...")
        as_prompt = (
            build_as_cot_prompt_with_expected_ids(extracted_ids, qpms_text)
            + "\nAt the end, also list all answers where a graph is found, in the format:"
            "\nGraph found in:\n- Answer <answer_id> → Page <page_number>"
            "\n(One per line, after all answers)"
        )
        as_text = gemini_generate_content(as_prompt, file_upload_obj=ans_uploaded)
        print("📝 AS transcription received. Saving debug file: debug_as_transcript.txt")
        _write_debug_file("debug_as_transcript.txt", as_text)

        as_graph_mapping = extract_graph_answers_from_as(as_text)
        print("🖼️ Graph-attempted answers in AS:", as_graph_mapping)
        as_graph_pages = list(as_graph_mapping.values())
        as_graph_images = []
        if as_graph_pages:
            as_graph_images = extract_pdf_pages_as_images(ans_path, as_graph_pages, prefix="as_graph")

        print("2) Preparing grading input and sending to Gemini for grading...")
        grading_input = (
            "=== QP+MS TRANSCRIPT BEGIN ===\n" + qpms_text + "\n=== QP+MS TRANSCRIPT END ===\n\n"
            + "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n" + as_text + "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
        )
        if ms_graph_images or as_graph_images:
            grading_input += (
                "\n\n---\nSome questions require graphs. I've attached the relevant graph pages from QP+MS "
                "and from the Answer Sheet. Use them as visual context when grading.\n---\n"
            )

        grading_prompt_system = PROMPTS["GRADING_PROMPT"]["content"]
        grading_images = ms_graph_images + as_graph_images
        grading_text = gemini_generate_content(
            grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input,
            image_obj=grading_images if grading_images else None,
        )
        print("🧾 Grading output received. Saving debug file: debug_grading.md")
        _write_debug_file("debug_grading.md", grading_text)

        base_name = os.path.splitext(os.path.basename(ans_path))[0]
        grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
        print("📄 Grading PDF saved:", grading_pdf_path)

        grading_json = extract_marks_from_grading(grading_text)
        with open("debug_grading_json.json", "w", encoding="utf-8") as f:
            json.dump(grading_json, f, indent=2, ensure_ascii=False)
        print("🔧 Grading marks extraction complete.")

        imprinted_pdf_path = None
        if imprint:
            print("✍ Imprint option enabled. Starting imprinting process...")
            try:
                imprinted_pdf_path = imprint_marks_using_mapping(
                    ans_path, grading_json, f"{base_name}_imprinted.pdf", extracted_ids
                )
                print("✅ Imprinting finished. Imprinted PDF at:", imprinted_pdf_path)
            except Exception as imprint_error:
                # Grading is already complete; imprinting is an optional extra,
                # so its failure must not discard the grading results.
                print("⚠️ Imprinting failed; returning grading results without imprinted PDF:", imprint_error)
                traceback.print_exc()
                imprinted_pdf_path = None

        print("🏁 Pipeline finished successfully.")
        return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path

    except Exception as e:
        print("❌ Pipeline error:", e)
        traceback.print_exc()
        return f"❌ Error: {e}", None, None, None, None


# ---------------- GRADIO UI ----------------
def _uploaded_path(file_obj):
    """Return the filesystem path of a Gradio upload (an object with ``.name`` or a plain path string)."""
    return getattr(file_obj, "name", file_obj)


def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, imprint_flag):
    """Gradio callback: validate the three uploads and run the grading pipeline.

    Returns:
        Values for the five output components: QP+MS transcript, AS
        transcript, grading markdown, grading PDF path, imprinted PDF path.
    """
    if not qp_file_obj or not ms_file_obj or not ans_file_obj:
        return "❌ Please upload all three files", "", "", None, None

    qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path = align_and_grade_pipeline(
        _uploaded_path(qp_file_obj),
        _uploaded_path(ms_file_obj),
        _uploaded_path(ans_file_obj),
        imprint=imprint_flag,
    )
    return qpms_text or "", as_text or "", grading_text or "", grading_pdf_path, imprinted_pdf_path


with gr.Blocks(title="AI Grading (Fixed - google-genai SDK)") as demo:
    gr.Markdown("## 📘 AI Grading — Fixed with google-genai SDK")
    gr.Markdown("**✅ Now using the new official `google-genai` SDK (no more ragStoreName errors!)**")

    with gr.Row():
        qp_file = gr.File(label="📄 Upload Question Paper (PDF)")
        ms_file = gr.File(label="📄 Upload Markscheme (PDF)")
        ans_file = gr.File(label="📝 Upload Student Answer Sheet (PDF)")

    imprint_toggle = gr.Checkbox(label="✍ Imprint Marks on Student Answer Sheet", value=False)
    run_button = gr.Button("🚀 Run Pipeline")

    with gr.Row():
        qpms_box = gr.Textbox(label="📑 QP+MS Transcript", lines=12)
        as_box = gr.Textbox(label="📝 AS Transcript", lines=12)

    grading_output_box = gr.Textbox(label="🧾 Grading (Markdown)", lines=20)
    grading_pdf_file = gr.File(label="📥 Download Grading PDF")
    imprint_pdf_file = gr.File(label="📥 Download Imprinted PDF (Optional)")

    run_button.click(
        fn=run_pipeline,
        inputs=[qp_file, ms_file, ans_file, imprint_toggle],
        outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file],
    )


if __name__ == "__main__":
    demo.launch()