# =========================
# Cell 1 — Install deps (Colab)
# =========================
!apt-get -qq update
!apt-get -qq install -y poppler-utils tesseract-ocr tesseract-ocr-eng tesseract-ocr-ara
!pip -q install -U transformers accelerate sentencepiece pymupdf pdf2image pytesseract pillow tqdm
# =========================
# Cell 2 — Imports + Config
# =========================
import os, re, json
from pathlib import Path
from math import ceil

import torch
from tqdm.auto import tqdm
import fitz  # pymupdf
from pdf2image import convert_from_path
import pytesseract
from PIL import ImageOps, ImageEnhance
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

OUTPUT_DIR = Path("/content/output")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Model (English-focused)
MODEL_NAME = "facebook/bart-large-cnn"  # https://huggingface.co/facebook/bart-large-cnn

# OCR
OCR_LANG = "eng+ara"
OCR_DPI = 250
NATIVE_MIN_CHARS_PER_PAGE = 60  # if native extracted text < this => OCR that page

# Summarization quality/speed knobs
BATCH_SIZE = 4
NUM_BEAMS = 4
NO_REPEAT_NGRAM_SIZE = 3
EARLY_STOPPING = False

# Chunking
MAX_INPUT_TOKENS = 1024
HEADROOM_TOKENS = 16
EFFECTIVE_MAX_INPUT = MAX_INPUT_TOKENS - HEADROOM_TOKENS
OVERLAP_SENTENCES = 2
# Output size (big but respectable)
CHAPTER_MAX_NEW_TOKENS_CAP = 320    # max tokens generated per chapter summary
CHAPTER_MIN_NEW_TOKENS_FLOOR = 120  # min tokens generated per chapter summary
BOOK_PARTS = 8                      # final organized "big" summary in N parts
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)
print("Output folder:", OUTPUT_DIR)
# =========================
# Cell 3 — Upload input (PDF or TXT)
# =========================
from google.colab import files

uploaded = files.upload()
INPUT_PATH = Path(next(iter(uploaded.keys()))).resolve()
print("Uploaded:", INPUT_PATH)
print("Suffix:", INPUT_PATH.suffix.lower())
# =========================
# Cell 4 — PDF/TXT -> Clean TXT (robust native + per-page OCR fallback)
# =========================
_SENT_BOUNDARY_RE = re.compile(r"(?<=[\.\!\?\u061F\u06D4\u061B…])\s+")  # . ! ? ؟ ۔ ؛ …

def normalize_text(text: str) -> str:
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()

def ocr_image_pil(img):
    # Light preprocessing to improve OCR
    img = img.convert("RGB")
    img = ImageOps.grayscale(img)
    img = ImageEnhance.Contrast(img).enhance(1.6)
    return img

def ocr_pdf_page(pdf_path: Path, page_number_1based: int, dpi: int = OCR_DPI, lang: str = OCR_LANG) -> str:
    images = convert_from_path(
        str(pdf_path),
        dpi=dpi,
        first_page=page_number_1based,
        last_page=page_number_1based,
        fmt="png",
        thread_count=2,
    )
    img = ocr_image_pil(images[0])
    return pytesseract.image_to_string(img, lang=lang)

def pdf_to_text_smart(pdf_path: Path,
                      native_min_chars_per_page: int = NATIVE_MIN_CHARS_PER_PAGE) -> str:
    doc = fitz.open(str(pdf_path))
    parts = []
    for i in tqdm(range(doc.page_count), desc="Extracting pages"):
        page = doc.load_page(i)
        native = (page.get_text("text") or "").strip()
        native_compact_len = len(re.sub(r"\s+", "", native))
        if native_compact_len >= native_min_chars_per_page:
            parts.append(native)
        else:
            ocr = ocr_pdf_page(pdf_path, page_number_1based=i + 1)
            parts.append(ocr)
    doc.close()
    return normalize_text("\n\n".join(parts))

def ensure_txt(input_path: Path) -> Path:
    out_txt = OUTPUT_DIR / f"{input_path.stem}.txt"
    suf = input_path.suffix.lower()
    if suf == ".txt":
        raw = input_path.read_text(encoding="utf-8", errors="ignore")
        out_txt.write_text(normalize_text(raw), encoding="utf-8")
        return out_txt
    if suf == ".pdf":
        text = pdf_to_text_smart(input_path)
        out_txt.write_text(text, encoding="utf-8")
        return out_txt
    raise ValueError("Unsupported type. Upload .pdf or .txt only.")

BOOK_TXT_PATH = ensure_txt(INPUT_PATH)
BOOK_TEXT = BOOK_TXT_PATH.read_text(encoding="utf-8", errors="ignore")
print("Saved TXT:", BOOK_TXT_PATH)
print("Chars:", len(BOOK_TEXT))
print("Head preview:\n", BOOK_TEXT[:800])
# FIX: broken Pillow install (PIL._typing / _Ink import errors)
!pip -q uninstall -y Pillow pillow-simd
!pip -q install --no-cache-dir --force-reinstall "Pillow==10.4.0"
# NOTE: PIL was already imported above, so restart the Colab runtime
# (Runtime -> Restart session) for the reinstalled Pillow to take effect,
# then re-run the cells above.
import PIL, sys
print("Pillow version:", PIL.__version__)
print("Python:", sys.version)
# =========================
# Cell 5 — Load tokenizer + model (from Hugging Face)
# =========================
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME).to(device)
if device == "cuda":
    try:
        model.half()
    except Exception:
        pass
torch.set_grad_enabled(False)
print("Model loaded:", MODEL_NAME)
# =========================
# Cell 6 — Chapter splitting + token-aware chunking
# =========================
def split_into_chapters(text: str):
    """
    Best-effort chapter split:
    - Detect lines that look like: CHAPTER 1 / Chapter One / CHAPTER ONE, etc.
    - If fewer than two such headings are found, return one chapter = full text.
    """
    text = normalize_text(text)
    lines = text.splitlines()
    chapter_re = re.compile(r"^\s*chapter\s+([0-9]+|[IVXLC]+|[A-Za-z]+)\b.*$", re.IGNORECASE)
    idxs = []
    titles = []
    for i, ln in enumerate(lines):
        if chapter_re.match(ln.strip()):
            idxs.append(i)
            titles.append(ln.strip())
    if len(idxs) < 2:
        return [("BOOK", text)]
    chapters = []
    for k in range(len(idxs)):
        start = idxs[k]
        end = idxs[k + 1] if k + 1 < len(idxs) else len(lines)
        title = titles[k]
        body = "\n".join(lines[start:end]).strip()
        chapters.append((title, body))
    return chapters
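
# Quick sanity check for split_into_chapters on a hypothetical two-chapter
# sample (not from the book; safe to delete).
_demo = "Chapter 1 The Start\nSome text here.\n\nChapter 2 The Middle\nMore text here."
for _title, _body in split_into_chapters(_demo):
    print(_title, "->", len(_body), "chars")
# Expected: two chapters, titled "Chapter 1 The Start" and "Chapter 2 The Middle".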

def split_sentences(paragraph: str):
    paragraph = paragraph.strip()
    if not paragraph:
        return []
    if not any(ch in paragraph for ch in ".!?\u061F\u06D4\u061B…"):
        ls = [ln.strip() for ln in paragraph.split("\n") if ln.strip()]
        return ls if ls else [paragraph]
    return [s.strip() for s in _SENT_BOUNDARY_RE.split(paragraph) if s.strip()]

def iter_paragraphs(text: str):
    for p in re.split(r"\n\s*\n+", text):
        p = p.strip()
        if p:
            yield p

def tok_len(s: str) -> int:
    return len(tokenizer.encode(s, add_special_tokens=False))

def split_by_tokens(s: str, max_len: int, overlap_tokens: int = 64):
    ids = tokenizer.encode(s, add_special_tokens=False)
    if len(ids) <= max_len:
        return [s.strip()]
    overlap_tokens = max(0, min(overlap_tokens, max_len // 3))
    step = max(1, max_len - overlap_tokens)
    parts = []
    for i in range(0, len(ids), step):
        chunk_ids = ids[i:i + max_len]
        if not chunk_ids:
            continue
        t = tokenizer.decode(chunk_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True).strip()
        if t:
            parts.append(t)
    return parts
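
# Windowing check for split_by_tokens (uses the tokenizer loaded in Cell 5;
# hypothetical sample, safe to delete). Consecutive windows should overlap by
# ~10 tokens; re-encoded lengths are approximate because of BPE merges.
_long = "word " * 300
_parts = split_by_tokens(_long, max_len=50, overlap_tokens=10)
print(len(_parts), "windows; approx token lengths:", [tok_len(p) for p in _parts[:3]])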

def chunk_text(text: str, max_input_tokens: int = EFFECTIVE_MAX_INPUT, overlap_sentences: int = OVERLAP_SENTENCES):
    """
    Professional chunking:
    - pack sentences under the token limit
    - add a sentence overlap between chunks for continuity
    - if a single sentence is too long => token-split it
    """
    text = normalize_text(text)
    if not text:
        return []
    chunks = []
    cur_sents, cur_tok = [], 0

    def flush():
        nonlocal cur_sents, cur_tok
        if cur_sents:
            ch = " ".join(cur_sents).strip()
            if ch:
                chunks.append(ch)
        cur_sents, cur_tok = [], 0

    for para in iter_paragraphs(text):
        for sent in split_sentences(para):
            st = sent.strip()
            if not st:
                continue
            st_tok = tok_len(st)
            if st_tok > max_input_tokens:
                flush()
                chunks.extend(split_by_tokens(st, max_len=max_input_tokens, overlap_tokens=64))
                continue
            if cur_tok + st_tok <= max_input_tokens:
                cur_sents.append(st)
                cur_tok += st_tok
            else:
                prev = cur_sents[:]
                flush()
                overlap = prev[-overlap_sentences:] if overlap_sentences and prev else []
                cur_sents = overlap + [st]
                cur_tok = tok_len(" ".join(cur_sents))
    flush()
    return chunks
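
# Sanity check for chunk_text (hypothetical sample; safe to delete). Each chunk
# should pack to roughly <= 64 tokens, with 2 sentences repeated between
# consecutive chunks for continuity.
_sample = " ".join(f"This is sentence number {i}." for i in range(200))
_chunks = chunk_text(_sample, max_input_tokens=64, overlap_sentences=2)
print(len(_chunks), "chunks; first chunk:", repr(_chunks[0][:80]))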

# =========================
# Cell 7 — Summarization helpers (map -> reduce) + "organized big summary"
# =========================
def generate_summaries(texts, min_new_tokens, max_new_tokens, batch_size=BATCH_SIZE):
    outs = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        enc = tokenizer(
            batch, return_tensors="pt",
            truncation=True, padding=True,
            max_length=EFFECTIVE_MAX_INPUT
        ).to(device)
        try:
            gen = model.generate(
                **enc,
                num_beams=NUM_BEAMS,
                no_repeat_ngram_size=NO_REPEAT_NGRAM_SIZE,
                min_new_tokens=min_new_tokens,
                max_new_tokens=max_new_tokens,
                early_stopping=EARLY_STOPPING,
            )
        except TypeError:
            # fallback for older transformers
            gen = model.generate(
                **enc,
                num_beams=NUM_BEAMS,
                no_repeat_ngram_size=NO_REPEAT_NGRAM_SIZE,
                min_length=min_new_tokens,
                max_length=max_new_tokens,
                early_stopping=EARLY_STOPPING,
            )
        decoded = tokenizer.batch_decode(gen, skip_special_tokens=True, clean_up_tokenization_spaces=True)
        outs.extend([d.strip() for d in decoded])
    return outs
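
# Optional smoke test for generate_summaries: one short beam-search generation
# on a throwaway input (assumes Cell 5 ran; safe to delete).
_smoke = generate_summaries(
    ["The quick brown fox jumps over the lazy dog. " * 30],
    min_new_tokens=10, max_new_tokens=40, batch_size=1,
)
print(_smoke[0])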

def summarize_long_text(text: str, min_new: int, max_new: int):
    """
    Summarize very long text reliably:
    - chunk -> summarize each chunk (map)
    - if there are multiple chunk summaries, reduce them into one (order preserved)
    """
    chunks = chunk_text(text)
    if not chunks:
        return ""
    # Map step: summarize each chunk
    chunk_summaries = []
    for ch in chunks:
        tlen = tok_len(ch)
        # dynamic summary size per chunk (keeps it detailed)
        dyn_max = int(min(max_new, max(min_new, round(tlen * 0.18))))
        dyn_min = max(30, min(min_new, dyn_max - 10))
        chunk_summaries.append(generate_summaries([ch], dyn_min, dyn_max, batch_size=1)[0])
    if len(chunk_summaries) == 1:
        return chunk_summaries[0]
    # Reduce step: merge in groups (keeps order)
    current = chunk_summaries
    for _ in range(6):
        combined = "\n".join([f"Part {i+1}: {t}" for i, t in enumerate(current)])
        if tok_len(combined) <= EFFECTIVE_MAX_INPUT:
            return generate_summaries([combined], min_new, max_new, batch_size=1)[0]
        # still too long -> chunk the combined summaries and summarize each chunk again
        sub_chunks = chunk_text(combined, overlap_sentences=1)
        current = generate_summaries(
            sub_chunks,
            min_new_tokens=max(60, min_new // 2),
            max_new_tokens=max(180, max_new // 2),
            batch_size=BATCH_SIZE
        )
    return "\n".join(current).strip()
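
# Optional end-to-end check of the map -> reduce path on a small slice of the
# book (assumes BOOK_TEXT from Cell 4 is loaded; safe to delete).
print(summarize_long_text(BOOK_TEXT[:3000], min_new=60, max_new=160))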

def make_big_book_summary(chapter_summaries, parts=BOOK_PARTS):
    """
    Organized "big" summary:
    - group chapter summaries into N parts
    - summarize each group into a longer part-summary
    - output stays structured and chronological
    """
    chap_summaries = [s for s in chapter_summaries if s.strip()]
    if not chap_summaries:
        return []
    n = len(chap_summaries)
    group_size = max(1, ceil(n / parts))
    groups = [chap_summaries[i:i + group_size] for i in range(0, n, group_size)]
    part_summaries = []
    for gi, g in enumerate(tqdm(groups, desc="Building big organized summary")):
        combined = "\n".join([f"ChapterSummary {gi+1}.{i+1}: {t}" for i, t in enumerate(g)])
        ps = summarize_long_text(combined, min_new=220, max_new=520)
        part_summaries.append(ps.strip())
    return part_summaries
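
# Note: because group_size rounds up, the grouping can yield fewer than `parts`
# groups. A quick arithmetic illustration (safe to delete):
for _n in (8, 16, 20, 30):
    _gs = max(1, ceil(_n / BOOK_PARTS))
    print(f"{_n} chapter summaries -> group_size {_gs} -> {ceil(_n / _gs)} parts")
# e.g. 20 chapter summaries with BOOK_PARTS = 8 gives 7 parts, not 8.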

# =========================
# Cell 8 — RUN: chapter summaries + big organized summary + save all outputs
# =========================
chapters = split_into_chapters(BOOK_TEXT)
print("Detected chapters:", len(chapters))
print("First chapter title:", chapters[0][0])

# Save chapters as separate txt files (for debugging)
chapters_dir = OUTPUT_DIR / f"{BOOK_TXT_PATH.stem}_chapters"
chapters_dir.mkdir(parents=True, exist_ok=True)

chapter_summaries = []
chapter_meta = []
for idx, (title, body) in enumerate(tqdm(chapters, desc="Summarizing chapters")):
    safe_title = re.sub(r"[^A-Za-z0-9 _-]+", "", title)[:80].strip().replace(" ", "_")
    ch_txt_path = chapters_dir / f"{idx+1:03d}_{safe_title or 'CHAPTER'}.txt"
    ch_txt_path.write_text(body, encoding="utf-8")
    # Chapter summary (detailed).
    # (If the chapter is very long, summarize_long_text will chunk it internally.)
    summary = summarize_long_text(
        body,
        min_new=CHAPTER_MIN_NEW_TOKENS_FLOOR,
        max_new=CHAPTER_MAX_NEW_TOKENS_CAP
    )
    chapter_summaries.append(summary)
    chapter_meta.append({"index": idx + 1, "title": title, "txt_path": str(ch_txt_path)})

# 1) Save per-chapter summaries (organized)
chapter_summaries_path = OUTPUT_DIR / f"{BOOK_TXT_PATH.stem}.chapter_summaries.txt"
with chapter_summaries_path.open("w", encoding="utf-8") as f:
    for i, (meta, summ) in enumerate(zip(chapter_meta, chapter_summaries), start=1):
        f.write(f"===== CHAPTER {i}: {meta['title']} =====\n")
        f.write(summ.strip() + "\n\n")

# 2) Save the "big organized book summary" (multi-part, large and well structured)
big_parts = make_big_book_summary(chapter_summaries, parts=BOOK_PARTS)
big_summary_path = OUTPUT_DIR / f"{BOOK_TXT_PATH.stem}.BIG_book_summary_parts.txt"
big_summary_path.write_text(
    "\n\n".join([f"=== BOOK SUMMARY PART {i+1} ===\n{p}" for i, p in enumerate(big_parts)]),
    encoding="utf-8"
)

# 3) Also save a single-file "full" summary by concatenating chapter summaries (very long, but very clear)
full_concat_path = OUTPUT_DIR / f"{BOOK_TXT_PATH.stem}.FULL_chapter_summaries_concat.txt"
full_concat_path.write_text("\n\n".join(chapter_summaries), encoding="utf-8")

# 4) Metadata
meta_path = OUTPUT_DIR / f"{BOOK_TXT_PATH.stem}.meta.json"
meta_path.write_text(json.dumps({
    "input_file": str(INPUT_PATH),
    "book_txt": str(BOOK_TXT_PATH),
    "model": MODEL_NAME,
    "device": device,
    "chapters_detected": len(chapters),
    "chapter_files_dir": str(chapters_dir),
    "outputs": {
        "chapter_summaries": str(chapter_summaries_path),
        "big_book_summary_parts": str(big_summary_path),
        "full_concat": str(full_concat_path),
    }
}, ensure_ascii=False, indent=2), encoding="utf-8")

print("\nSaved outputs:")
print(" - Chapter summaries:", chapter_summaries_path)
print(" - BIG organized parts:", big_summary_path)
print(" - FULL concat:", full_concat_path)
print(" - Meta:", meta_path)
print("\nPreview BIG summary part 1:\n")
print(big_parts[0][:1500] if big_parts else "N/A")

# =========================
# Cell 9 — Save model + zip outputs + download
# =========================
saved_model_dir = OUTPUT_DIR / "saved_model_bart_large_cnn"
saved_model_dir.mkdir(parents=True, exist_ok=True)
model.save_pretrained(saved_model_dir)
tokenizer.save_pretrained(saved_model_dir)
print("Model saved to:", saved_model_dir)

zip_path = Path("/content/litvision_output.zip")
!zip -qr "{zip_path}" "{OUTPUT_DIR}"
print("Zipped to:", zip_path)

from google.colab import files
files.download(str(zip_path))