| | """ |
| | SPARKNET Backend API - GPU-Accelerated Document Processing |
| | |
| | This FastAPI service runs on a GPU server (e.g., Lytos) and provides: |
| | - Document processing with PaddleOCR |
| | - Layout detection |
| | - RAG indexing and querying |
| | - Embedding generation |
| | - LLM inference via Ollama |
| | |
| | Deploy this on your GPU server and connect Streamlit Cloud to it. |
| | """ |
| |
|
| | from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks |
| | from fastapi.middleware.cors import CORSMiddleware |
| | from pydantic import BaseModel, Field |
| | from typing import Optional, List, Dict, Any |
| | import hashlib |
| | import tempfile |
| | import os |
| | import sys |
| | from pathlib import Path |
| | from datetime import datetime |
| | import asyncio |
| |
|
| | |
# Make the repository root importable so the `src.*` packages used below
# resolve when this file runs from its own directory (assumes this file
# lives one level below the project root -- TODO confirm).
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
| |
|
# FastAPI application instance; interactive docs served at /docs and /redoc.
app = FastAPI(
    title="SPARKNET Backend API",
    description="GPU-accelerated document processing for Technology Transfer Office automation",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
| |
|
| | |
# Open CORS so a remote frontend (e.g. Streamlit Cloud) can call this API.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# rejected by browsers for credentialed requests -- pin concrete origins if
# cookies/auth headers are ever required.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
| | |
| | |
| | |
| |
|
class HealthResponse(BaseModel):
    """Payload for the health-check endpoints ("/" and "/api/health")."""

    status: str             # "healthy" when the service is up
    timestamp: str          # ISO-8601 time the check was served
    version: str = "1.0.0"
| |
|
| |
|
class SystemStatus(BaseModel):
    """Snapshot of backend capabilities, returned by GET /api/status."""

    ollama_available: bool
    # Field(default_factory=list) instead of `= []`: avoids a shared mutable
    # default object and matches the Field style used elsewhere in this file.
    ollama_models: List[str] = Field(default_factory=list)
    gpu_available: bool = False
    gpu_name: Optional[str] = None          # CUDA device name when available
    rag_ready: bool = False
    indexed_chunks: int = 0                 # total chunks in the vector store
    embedding_model: Optional[str] = None
    llm_model: Optional[str] = None
| |
|
| |
|
class ProcessRequest(BaseModel):
    """JSON request schema describing a document to process.

    NOTE(review): /api/process actually consumes multipart form data, not
    this model -- confirm whether any caller still uses this schema.
    """

    filename: str
    options: Dict[str, Any] = Field(default_factory=dict)
| |
|
| |
|
class ProcessResponse(BaseModel):
    """Result of /api/process: extracted text, chunks, and region metadata."""

    success: bool
    doc_id: str
    filename: str
    raw_text: str = ""
    # Field(default_factory=list) instead of `= []` literals: avoids shared
    # mutable default objects and matches the Field style used in this file.
    chunks: List[Dict[str, Any]] = Field(default_factory=list)
    page_count: int = 0
    ocr_regions: List[Dict[str, Any]] = Field(default_factory=list)
    layout_regions: List[Dict[str, Any]] = Field(default_factory=list)
    ocr_confidence: float = 0.0
    layout_confidence: float = 0.0
    processing_time: float = 0.0      # seconds, wall clock
    error: Optional[str] = None       # set when fallback processing was used
| |
|
| |
|
class IndexRequest(BaseModel):
    """Request body for /api/index: a processed document's text and chunks."""

    doc_id: str
    text: str
    # default_factory instead of `= []` for consistency with `metadata` below.
    chunks: List[Dict[str, Any]] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
| |
|
| |
|
class IndexResponse(BaseModel):
    """Result of /api/index."""

    success: bool
    doc_id: str
    num_chunks: int = 0            # number of chunks actually indexed
    error: Optional[str] = None    # populated when success is False
| |
|
| |
|
class QueryRequest(BaseModel):
    """Request body for /api/query."""

    question: str
    filters: Optional[Dict[str, Any]] = None  # forwarded to RAG retrieval
    top_k: int = 5                            # not read by query_rag -- TODO confirm intent
| |
|
| |
|
class QueryResponse(BaseModel):
    """Result of /api/query: the generated answer plus provenance."""

    success: bool
    answer: str = ""
    # default_factory instead of `= []`: avoids a shared mutable default and
    # matches the Field style used elsewhere in this file.
    sources: List[Dict[str, Any]] = Field(default_factory=list)
    confidence: float = 0.0
    latency_ms: float = 0.0           # end-to-end time for the query call
    validated: bool = False
    error: Optional[str] = None       # populated when success is False
| |
|
| |
|
class SearchRequest(BaseModel):
    """Request body for /api/search (raw similarity search, no generation)."""

    query: str
    top_k: int = 5
    doc_filter: Optional[str] = None  # restrict results to one document_id
| |
|
| |
|
class DocumentInfo(BaseModel):
    """Summary of one indexed document, returned by GET /api/documents."""

    doc_id: str
    filename: str = ""
    chunk_count: int = 0
    indexed_at: Optional[str] = None  # not populated by list_documents -- TODO confirm
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Lazily-built RAG singleton (see get_rag_system); stays None until first use.
_rag_system = None
# Not read or written elsewhere in this file -- presumably reserved for
# tracking background jobs; verify before removing.
_processing_queue = {}
| |
|
| |
|
def get_rag_system():
    """Initialize and return the RAG system.

    Lazily builds a module-level singleton dict with the keys "rag",
    "store", "embedder", "embed_model", and "llm_model". Returns None when
    Ollama is unreachable or when any project import / setup step fails.
    """
    global _rag_system

    # Reuse the cached instance; construction below is expensive.
    if _rag_system is not None:
        return _rag_system

    try:
        from src.rag.agentic import AgenticRAG, RAGConfig
        from src.rag.store import get_vector_store, VectorStoreConfig, reset_vector_store
        from src.rag.embeddings import get_embedding_adapter, EmbeddingConfig, reset_embedding_adapter

        # Ollama must answer before anything else is built.
        ollama_ok, models = check_ollama_sync()
        if not ollama_ok:
            return None

        # Preferred models in priority order. If none of them is installed,
        # the first entry is used anyway (may fail later at query time).
        EMBEDDING_MODELS = ["nomic-embed-text", "mxbai-embed-large:latest", "mxbai-embed-large"]
        LLM_MODELS = ["llama3.2:latest", "llama3.1:8b", "mistral:latest", "qwen2.5:14b"]

        embed_model = next((m for m in EMBEDDING_MODELS if m in models), EMBEDDING_MODELS[0])
        llm_model = next((m for m in LLM_MODELS if m in models), LLM_MODELS[0])

        # Reset any previously-cached store/adapter singletons so the configs
        # below take effect (presumably module-level caches inside src.rag --
        # TODO confirm against those implementations).
        reset_vector_store()
        reset_embedding_adapter()

        embed_config = EmbeddingConfig(
            ollama_model=embed_model,
            ollama_base_url="http://localhost:11434",
        )
        embedder = get_embedding_adapter(config=embed_config)

        store_config = VectorStoreConfig(
            persist_directory="data/sparknet_unified_rag",
            collection_name="sparknet_documents",
            similarity_threshold=0.0,  # keep every hit; filtering happens downstream
        )
        store = get_vector_store(config=store_config)

        rag_config = RAGConfig(
            model=llm_model,
            base_url="http://localhost:11434",
            max_revision_attempts=1,
            enable_query_planning=True,
            enable_reranking=True,
            enable_validation=True,
            retrieval_top_k=10,  # candidates fetched before reranking
            final_top_k=5,       # candidates kept for generation
            min_confidence=0.3,
            verbose=False,
        )

        rag = AgenticRAG(
            config=rag_config,
            vector_store=store,
            embedding_adapter=embedder,
        )

        _rag_system = {
            "rag": rag,
            "store": store,
            "embedder": embedder,
            "embed_model": embed_model,
            "llm_model": llm_model,
        }

        return _rag_system

    except Exception as e:
        # Best-effort: any failure leaves the system unavailable (None).
        print(f"RAG init error: {e}")
        return None
| |
|
| |
|
def check_ollama_sync():
    """Probe the local Ollama server synchronously.

    Returns:
        (True, [model names]) when http://localhost:11434/api/tags answers
        with HTTP 200; (False, []) on any failure (server down, httpx not
        installed, malformed payload).
    """
    try:
        import httpx  # local import keeps httpx an optional dependency
        with httpx.Client(timeout=3.0) as client:
            resp = client.get("http://localhost:11434/api/tags")
            if resp.status_code == 200:
                models = [m["name"] for m in resp.json().get("models", [])]
                return True, models
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt are
        # no longer swallowed; any probe failure still means "unavailable".
        pass
    return False, []
| |
|
| |
|
def check_gpu():
    """Report CUDA GPU availability.

    Returns:
        (True, device_name) when torch reports an available CUDA device;
        (False, None) otherwise, including when torch is not installed.
    """
    try:
        import torch  # local import keeps torch an optional dependency
        if torch.cuda.is_available():
            return True, torch.cuda.get_device_name(0)
    except Exception:
        # Narrowed from a bare `except:`; import/driver failures still mean
        # "no GPU", but system-exiting exceptions now propagate.
        pass
    return False, None
| |
|
| |
|
| | |
| | |
| | |
| |
|
@app.get("/", response_model=HealthResponse)
async def root():
    """Liveness probe at the service root."""
    now = datetime.now().isoformat()
    return HealthResponse(status="healthy", timestamp=now)
| |
|
| |
|
@app.get("/api/health", response_model=HealthResponse)
async def health():
    """Liveness probe under the /api prefix (mirror of the root probe)."""
    payload = {"status": "healthy", "timestamp": datetime.now().isoformat()}
    return HealthResponse(**payload)
| |
|
| |
|
@app.get("/api/status", response_model=SystemStatus)
async def get_status():
    """Report system status: Ollama, GPU, and RAG availability.

    Best-effort endpoint: failures while reading the vector store are
    swallowed so the status call itself never errors out.
    """
    ollama_ok, models = check_ollama_sync()
    gpu_ok, gpu_name = check_gpu()

    rag = get_rag_system()
    rag_ready = rag is not None

    indexed_chunks = 0
    embed_model = None
    llm_model = None

    if rag:
        try:
            indexed_chunks = rag["store"].count()
            embed_model = rag.get("embed_model")
            llm_model = rag.get("llm_model")
        except Exception:
            # Narrowed from a bare `except:`; a failing store count should
            # not break the status endpoint.
            pass

    return SystemStatus(
        ollama_available=ollama_ok,
        ollama_models=models,
        gpu_available=gpu_ok,
        gpu_name=gpu_name,
        rag_ready=rag_ready,
        indexed_chunks=indexed_chunks,
        embedding_model=embed_model,
        llm_model=llm_model,
    )
| |
|
| |
|
@app.post("/api/process", response_model=ProcessResponse)
async def process_document(
    file: UploadFile = File(...),
    ocr_engine: str = Form(default="paddleocr"),
    max_pages: int = Form(default=10),
    enable_layout: bool = Form(default=True),
    preserve_tables: bool = Form(default=True),
):
    """
    Process a document with OCR and layout detection.

    This endpoint uses GPU-accelerated PaddleOCR for text extraction and
    degrades to plain-text extraction (process_document_fallback) when the
    full pipeline is unavailable or raises.
    """
    import time
    start_time = time.time()

    file_bytes = await file.read()
    # Robustness: multipart uploads may omit the filename (None); substitute
    # a placeholder so Path(filename).suffix and the hash below cannot crash.
    filename = file.filename or "upload"

    # Short quasi-unique document id from filename + upload time + a sample
    # of the content (first 1 KB). BUG FIX: the id previously hashed the
    # literal string "(unknown)" instead of the uploaded filename.
    content_hash = hashlib.md5(file_bytes[:1000]).hexdigest()[:8]
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    doc_id = hashlib.md5(f"{filename}_{timestamp}_{content_hash}".encode()).hexdigest()[:12]

    # The OCR pipeline reads from disk, so persist the upload to a temp file.
    suffix = Path(filename).suffix
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp.write(file_bytes)
        tmp_path = tmp.name

    try:
        try:
            from src.document.pipeline.processor import DocumentProcessor, PipelineConfig
            from src.document.ocr import OCRConfig
            from src.document.layout import LayoutConfig
            from src.document.chunking.chunker import ChunkerConfig

            chunker_config = ChunkerConfig(
                preserve_table_structure=preserve_tables,
                detect_table_headers=True,
                chunk_tables=True,
                chunk_figures=True,
                include_captions=True,
            )

            layout_config = LayoutConfig(
                method="rule_based",
                detect_tables=True,
                detect_figures=True,
                detect_headers=True,
                detect_titles=True,
                detect_lists=True,
                min_confidence=0.3,
                heading_font_ratio=1.1,
            )

            config = PipelineConfig(
                ocr=OCRConfig(engine=ocr_engine),
                layout=layout_config,
                chunking=chunker_config,
                max_pages=max_pages,
                include_ocr_regions=True,
                include_layout_regions=enable_layout,
                generate_full_text=True,
            )

            processor = DocumentProcessor(config)
            processor.initialize()
            result = processor.process(tmp_path)

            # Flatten pipeline objects into JSON-serializable dicts.
            chunks_list = [
                {
                    "chunk_id": chunk.chunk_id,
                    "text": chunk.text,
                    "page": chunk.page,
                    "chunk_type": chunk.chunk_type.value,
                    "confidence": chunk.confidence,
                    "bbox": chunk.bbox.to_xyxy() if chunk.bbox else None,
                }
                for chunk in result.chunks
            ]

            ocr_regions = [
                {
                    "text": region.text,
                    "confidence": region.confidence,
                    "page": region.page,
                    "bbox": region.bbox.to_xyxy() if region.bbox else None,
                }
                for region in result.ocr_regions
            ]

            layout_regions = [
                {
                    "id": region.id,
                    "type": region.type.value,
                    "confidence": region.confidence,
                    "page": region.page,
                    "bbox": region.bbox.to_xyxy() if region.bbox else None,
                }
                for region in result.layout_regions
            ]

            processing_time = time.time() - start_time

            return ProcessResponse(
                success=True,
                doc_id=doc_id,
                filename=filename,
                raw_text=result.full_text,
                chunks=chunks_list,
                page_count=result.metadata.num_pages,
                ocr_regions=ocr_regions,
                layout_regions=layout_regions,
                ocr_confidence=result.metadata.ocr_confidence_avg or 0.0,
                layout_confidence=result.metadata.layout_confidence_avg or 0.0,
                processing_time=processing_time,
            )

        except Exception as e:
            # Full pipeline unavailable (missing deps, OCR failure, ...):
            # degrade gracefully instead of failing the request.
            return await process_document_fallback(file_bytes, filename, doc_id, max_pages, str(e), start_time)

    finally:
        # Always remove the temporary copy of the upload.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
| |
|
| |
|
async def process_document_fallback(
    file_bytes: bytes,
    filename: str,
    doc_id: str,
    max_pages: int,
    reason: str,
    start_time: float
) -> ProcessResponse:
    """Fallback document processing using PyMuPDF.

    Extracts plain text (no OCR, no layout detection), splits it into
    fixed-size overlapping chunks, and reports the failure *reason* in the
    response's ``error`` field so clients know the full pipeline was skipped.
    """
    import time

    text = ""
    page_count = 1
    suffix = Path(filename).suffix.lower()

    if suffix == ".pdf":
        try:
            import fitz  # PyMuPDF
            import io
            pdf_stream = io.BytesIO(file_bytes)
            doc = fitz.open(stream=pdf_stream, filetype="pdf")
            page_count = len(doc)
            max_p = min(max_pages, page_count)

            text_parts = []
            for page_num in range(max_p):
                page = doc[page_num]
                text_parts.append(f"--- Page {page_num + 1} ---\n{page.get_text()}")
            text = "\n\n".join(text_parts)
            doc.close()
        except Exception as e:
            text = f"PDF extraction failed: {e}"
    elif suffix in [".txt", ".md"]:
        try:
            text = file_bytes.decode("utf-8")
        except UnicodeDecodeError:
            # Narrowed from a bare `except:`; latin-1 with errors="ignore"
            # always succeeds, so this is a total fallback.
            text = file_bytes.decode("latin-1", errors="ignore")
    else:
        text = f"Unsupported file type: {suffix}"

    # Sliding-window chunking: 500 chars per chunk with a 50-char overlap.
    chunk_size = 500
    overlap = 50
    chunks = []

    for i in range(0, len(text), chunk_size - overlap):
        chunk_text = text[i:i + chunk_size]
        if len(chunk_text.strip()) > 20:  # drop near-empty tail fragments
            chunks.append({
                "chunk_id": f"{doc_id}_chunk_{len(chunks)}",
                "text": chunk_text,
                "page": 0,
                "chunk_type": "text",
                "confidence": 0.9,
                "bbox": None,
            })

    processing_time = time.time() - start_time

    return ProcessResponse(
        success=True,
        doc_id=doc_id,
        filename=filename,
        raw_text=text,
        chunks=chunks,
        page_count=page_count,
        ocr_regions=[],
        layout_regions=[],
        ocr_confidence=0.9,   # heuristic: direct text extraction is reliable
        layout_confidence=0.0,
        processing_time=processing_time,
        error=f"Fallback mode: {reason}",
    )
| |
|
| |
|
@app.post("/api/index", response_model=IndexResponse)
async def index_document(request: IndexRequest):
    """Index a document's chunks into the RAG vector store.

    Chunks shorter than 20 characters (after stripping) are skipped.
    Failures are reported in-band via ``success=False`` and ``error``
    rather than as HTTP errors.
    """
    rag = get_rag_system()

    if not rag:
        return IndexResponse(
            success=False,
            doc_id=request.doc_id,
            error="RAG system not available. Check Ollama status.",
        )

    try:
        store = rag["store"]
        embedder = rag["embedder"]

        chunk_dicts = []
        embeddings = []

        for i, chunk in enumerate(request.chunks):
            is_dict = isinstance(chunk, dict)
            chunk_text = chunk.get("text", "") if is_dict else str(chunk)

            if len(chunk_text.strip()) < 20:
                continue

            # BUG FIX: the original called chunk.get(...) unconditionally
            # below, raising AttributeError for non-dict chunks that the
            # isinstance guard above had deliberately tolerated.
            fallback_id = f"{request.doc_id}_chunk_{i}"
            chunk_id = chunk.get("chunk_id", fallback_id) if is_dict else fallback_id
            chunk_dict = {
                "chunk_id": chunk_id,
                "document_id": request.doc_id,
                "text": chunk_text,
                "page": chunk.get("page", 0) if is_dict else 0,
                "chunk_type": "text",
                "source_path": request.metadata.get("filename", ""),
                "sequence_index": i,
            }
            chunk_dicts.append(chunk_dict)

            # One embedding per kept chunk, in lockstep with chunk_dicts.
            embeddings.append(embedder.embed_text(chunk_text))

        if not chunk_dicts:
            return IndexResponse(
                success=False,
                doc_id=request.doc_id,
                error="No valid chunks to index",
            )

        store.add_chunks(chunk_dicts, embeddings)

        return IndexResponse(
            success=True,
            doc_id=request.doc_id,
            num_chunks=len(chunk_dicts),
        )

    except Exception as e:
        return IndexResponse(
            success=False,
            doc_id=request.doc_id,
            error=str(e),
        )
| |
|
| |
|
@app.post("/api/query", response_model=QueryResponse)
async def query_rag(request: QueryRequest):
    """Answer a question with the agentic RAG pipeline.

    Returns the generated answer, its supporting citations, a confidence
    score, and end-to-end latency. Errors are reported in-band
    (``success=False``) rather than as HTTP errors.
    """
    import time
    start_time = time.time()

    rag = get_rag_system()

    if not rag:
        return QueryResponse(
            success=False,
            error="RAG system not available. Check Ollama status.",
        )

    try:
        response = rag["rag"].query(request.question, filters=request.filters)
        latency_ms = (time.time() - start_time) * 1000

        # Citations and their fields are optional on the response object;
        # getattr-with-default replaces the original hasattr/ternary chains.
        sources = []
        for cite in getattr(response, "citations", None) or []:
            sources.append({
                "index": getattr(cite, "index", 0),
                "text_snippet": getattr(cite, "text_snippet", str(cite)),
                "relevance_score": getattr(cite, "relevance_score", 0.0),
                "document_id": getattr(cite, "document_id", ""),
                "page": getattr(cite, "page", 0),
            })

        return QueryResponse(
            success=True,
            answer=response.answer,
            sources=sources,
            confidence=response.confidence,
            latency_ms=latency_ms,
            validated=response.validated,
        )

    except Exception as e:
        return QueryResponse(
            success=False,
            error=str(e),
        )
| |
|
| |
|
@app.post("/api/search")
async def search_similar(request: SearchRequest):
    """Return the top-k stored chunks most similar to the query text."""
    rag = get_rag_system()
    if rag is None:
        return {"success": False, "error": "RAG system not available", "results": []}

    try:
        query_vec = rag["embedder"].embed_text(request.query)

        # Optionally restrict the search to a single document.
        doc_filter = {"document_id": request.doc_filter} if request.doc_filter else None

        hits = rag["store"].search(
            query_embedding=query_vec,
            top_k=request.top_k,
            filters=doc_filter,
        )

        payload = [
            {
                "chunk_id": hit.chunk_id,
                "document_id": hit.document_id,
                "text": hit.text,
                "similarity": hit.similarity,
                "page": hit.page,
                "metadata": hit.metadata,
            }
            for hit in hits
        ]
        return {"success": True, "results": payload}

    except Exception as e:
        return {"success": False, "error": str(e), "results": []}
| |
|
| |
|
@app.get("/api/documents", response_model=List[DocumentInfo])
async def list_documents():
    """List indexed documents by aggregating the store's chunk metadata.

    Groups chunk metadata by ``document_id`` and counts chunks per
    document. Best-effort: returns [] when the RAG system is down or the
    store cannot be read.
    """
    rag = get_rag_system()

    if not rag:
        return []

    try:
        store = rag["store"]
        # NOTE(review): reaches into the store's private `_collection`
        # attribute (presumably the underlying vector DB collection --
        # confirm); a public listing API on the store would be cleaner.
        collection = store._collection

        results = collection.get(include=["metadatas"])
        if not results or not results.get("metadatas"):
            return []

        doc_info = {}
        for meta in results["metadatas"]:
            doc_id = meta.get("document_id", "unknown")
            if doc_id not in doc_info:
                doc_info[doc_id] = {
                    "doc_id": doc_id,
                    "filename": meta.get("source_path", ""),
                    "chunk_count": 0,
                }
            doc_info[doc_id]["chunk_count"] += 1

        return [DocumentInfo(**info) for info in doc_info.values()]

    except Exception:
        # Fix: exception was bound to an unused name `e`; keep the
        # deliberate best-effort empty result.
        return []
| |
|
| |
|
@app.delete("/api/documents/{doc_id}")
async def delete_document(doc_id: str):
    """Remove every indexed chunk belonging to *doc_id* from the store."""
    rag = get_rag_system()
    if rag is None:
        return {"success": False, "error": "RAG system not available"}

    try:
        collection = rag["store"]._collection

        # Look up the ids of all chunks tagged with this document.
        matches = collection.get(
            where={"document_id": doc_id},
            include=[]
        )

        chunk_ids = matches.get("ids") if matches else None
        if not chunk_ids:
            return {"success": False, "error": "Document not found"}

        collection.delete(ids=chunk_ids)
        return {"success": True, "deleted_chunks": len(chunk_ids)}

    except Exception as e:
        return {"success": False, "error": str(e)}
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # Run the API directly (no reloader): serves on all interfaces, port 8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
| |
|