"""Streamlit app for legal-document semantic integrity analysis.

Flow: login -> upload & scan -> findings dashboard. Clauses are extracted
from the uploaded document, embedded, paired via FAISS similarity search,
rule-checked (analyze_pair) and then reviewed by an LLM/NLI verifier.
"""

import os
import sys
from pathlib import Path
import importlib
import json
import base64
import re

import pandas as pd
import plotly.express as px
import streamlit as st

sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from preprocessing.text_extractor import extract_text_from_file
from preprocessing.clause_extraction import extract_clauses
from embeddings.sbert_encoder import generate_embeddings
from storage.faiss_index import create_faiss_index
from analysis.similarity_search import get_similar
import analysis.common_analyzer

# Dev-time hot reload of the rule analyzer; harmless in production but
# NOTE(review): consider removing once the module is stable.
importlib.reload(analysis.common_analyzer)
from analysis.common_analyzer import analyze_pair
from analysis.nli_verifier import NLIVerifier
from analysis.llama_legal_verifier import LlamaLegalVerifier
from output.pdf_generator import generate_pdf_report
from auth.user_store import authenticate_user, create_user

APP_TITLE = "Legal Semantic Integrity"
DEFAULT_MODEL_PATH = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
PROJECT_ROOT = Path(__file__).resolve().parents[1]


def init_state():
    """Seed all session-state keys used across pages (idempotent)."""
    st.session_state.setdefault("is_authenticated", False)
    st.session_state.setdefault("username", "")
    st.session_state.setdefault("analysis_done", False)
    st.session_state.setdefault("results", [])
    st.session_state.setdefault("line_issues", [])
    st.session_state.setdefault("uploaded_name", "")
    st.session_state.setdefault("uploaded_ext", "")
    st.session_state.setdefault("uploaded_bytes", b"")


def _extract_party_name(text: str, role: str) -> str:
    """
    Try to extract a nearby party name for vendor/vendee from clause text.

    Falls back to a role-present marker when an exact name is not available.
    Returns "Not found" when the role does not appear at all.
    """
    if not text:
        return "Not found"
    # Collapse all whitespace so regexes see a single-line haystack.
    t = " ".join(str(text).split())
    role_l = role.lower()
    # Pattern examples:
    # "Vendor Mr. Ravi Kumar", "Vendee: Sita Devi", "the vendor, John Doe"
    patterns = [
        rf"\b{role_l}\b\s*[:,-]?\s*(?:mr\.?|mrs\.?|ms\.?)?\s*([A-Z][A-Za-z.\s]{{2,60}}?)(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
        rf"\bthe\s+{role_l}\b\s*[:,-]?\s*(?:is\s+)?(?:mr\.?|mrs\.?|ms\.?)?\s*([A-Z][A-Za-z.\s]{{2,60}}?)(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
    ]
    for pat in patterns:
        m = re.search(pat, t, flags=re.IGNORECASE)
        if m:
            name = " ".join(m.group(1).split())
            # Filter generic captures like "hereinafter called"
            if name and not re.search(
                r"hereinafter|called|referred|party|agreement", name, re.IGNORECASE
            ):
                return name[:80]
    if re.search(rf"\b{role_l}\b", t, flags=re.IGNORECASE):
        return f"{role.title()} mentioned (name not parsed)"
    return "Not found"


def _clean_candidate_name(name: str) -> str:
    """Normalize whitespace, strip punctuation, and reject boilerplate captures."""
    name = re.sub(r"\s+", " ", str(name)).strip(" ,.;:-")
    if not name:
        return ""
    banned = r"hereinafter|called|referred|party|agreement|vendor|vendee|purchaser|buyer|seller"
    if re.search(banned, name, flags=re.IGNORECASE):
        return ""
    return name[:80]


def _extract_document_parties(text_data):
    """
    Scan the whole document text for vendor/vendee names.

    Returns a dict {"Vendor": name, "Vendee": name}; each value falls back to
    "<Role> mentioned (name not parsed)" or "Not found".
    """
    full_text = "\n".join(chunk.get("text", "") for chunk in (text_data or []))
    compact = " ".join(full_text.split())
    parties = {"Vendor": "Not found", "Vendee": "Not found"}
    # Common legal intro patterns:
    # "Mr. X ... hereinafter called the VENDOR"
    # "Y ... hereinafter called the VENDEE"
    role_patterns = {
        "Vendor": [
            r"(Mr\.?|Mrs\.?|Ms\.?)?\s*([A-Z][A-Za-z.\s]{2,80}?)\s+(?:son of|wife of|daughter of|residing at|aged about|hereinafter)\b[^.]{0,120}\bvendor\b",
            r"\bvendor\b\s*[:,-]?\s*(?:is\s+)?(?:Mr\.?|Mrs\.?|Ms\.?)?\s*([A-Z][A-Za-z.\s]{2,80})(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
        ],
        "Vendee": [
            r"(Mr\.?|Mrs\.?|Ms\.?)?\s*([A-Z][A-Za-z.\s]{2,80}?)\s+(?:son of|wife of|daughter of|residing at|aged about|hereinafter)\b[^.]{0,120}\bvendee\b",
            r"\bvendee\b\s*[:,-]?\s*(?:is\s+)?(?:Mr\.?|Mrs\.?|Ms\.?)?\s*([A-Z][A-Za-z.\s]{2,80})(?=,|\.|;|\bson of\b|\bwife of\b|\bresiding\b|\baged\b|$)",
        ],
    }
    for role, patterns in role_patterns.items():
        for pat in patterns:
            m = re.search(pat, compact, flags=re.IGNORECASE)
            if not m:
                continue
            # First pattern captures the name in group 2 (group 1 is the title);
            # second pattern has only one group.
            candidate = m.group(2) if (m.lastindex or 0) >= 2 else m.group(1)
            cleaned = _clean_candidate_name(candidate)
            if cleaned:
                parties[role] = cleaned
                break
        # Secondary fallback: explicit role in text without name
        if parties[role] == "Not found" and re.search(
            rf"\b{role.lower()}\b", compact, flags=re.IGNORECASE
        ):
            parties[role] = f"{role} mentioned (name not parsed)"
    return parties


def _extract_parties(text1: str, text2: str, doc_parties=None):
    """
    Resolve (vendor, vendee) for a clause pair.

    Clause-local extraction is preferred; document-level names (doc_parties)
    are used only when the clause-level result is missing or unparsed.
    """
    vendor = _extract_party_name(text1, "vendor")
    if vendor == "Not found":
        vendor = _extract_party_name(text2, "vendor")
    vendee = _extract_party_name(text1, "vendee")
    if vendee == "Not found":
        vendee = _extract_party_name(text2, "vendee")
    if doc_parties:
        if vendor in [
            "Not found",
            "Vendor mentioned (name not parsed)",
        ] and doc_parties.get("Vendor"):
            vendor = doc_parties.get("Vendor")
        if vendee in [
            "Not found",
            "Vendee mentioned (name not parsed)",
        ] and doc_parties.get("Vendee"):
            vendee = doc_parties.get("Vendee")
    return vendor, vendee


@st.cache_resource
def load_verifier(backend: str, llama_model_path: str):
    """Build (and cache) the clause-pair verifier for the chosen backend."""
    if backend == "llama":
        return LlamaLegalVerifier(model_path=llama_model_path)
    return NLIVerifier(model_name="cross-encoder/nli-distilroberta-base")


def apply_theme():
    """Inject app-wide CSS.

    NOTE(review): the style payload appears to have been stripped from this
    copy of the file — restore the original CSS inside the markdown string.
    """
    st.markdown(
        """ """,
        unsafe_allow_html=True,
    )


def login_page():
    """Render the login / signup screen (step 1 of 3)."""
    col_intro, col_auth = st.columns([1.15, 1], gap="large")
    with col_intro:
        st.markdown(
            """
            Legal Semantic Integrity Portal
            Interactive contract diagnostics with line-level visibility and legal conflict tracing.
            Step 1: Secure Login Step 2: Upload & Analyze Step 3: Error-Line Dashboard
            What You Get
            Duplicate clauses, legal contradictions, and exact page/line issue map.
            """,
            unsafe_allow_html=True,
        )
    with col_auth:
        st.markdown('Step 1 of 3: Login', unsafe_allow_html=True)
        tab_login, tab_signup = st.tabs(["Sign In", "Create Account"])
        with tab_login:
            with st.form("login_form", clear_on_submit=False):
                username = st.text_input("Username")
                password = st.text_input("Password", type="password")
                submit = st.form_submit_button("Login")
            if submit:
                ok, message = authenticate_user(username, password)
                if ok:
                    st.session_state.is_authenticated = True
                    # Usernames are normalized to lowercase for session identity.
                    st.session_state.username = username.strip().lower()
                    st.success(message)
                    st.rerun()
                else:
                    st.error(message)
        with tab_signup:
            with st.form("signup_form", clear_on_submit=True):
                new_username = st.text_input("New Username")
                new_password = st.text_input("New Password", type="password")
                confirm_password = st.text_input("Confirm Password", type="password")
                create_submit = st.form_submit_button("Create Account")
            if create_submit:
                if new_password != confirm_password:
                    st.error("Passwords do not match.")
                else:
                    ok, message = create_user(new_username, new_password)
                    if ok:
                        st.success(message)
                    else:
                        st.error(message)
        st.caption("Local accounts are saved in data/users.db")


def run_analysis(
    uploaded_file, sensitivity: float, backend: str, llama_model_path: str
):
    """
    Run the full pipeline on one uploaded file.

    Steps: text extraction -> clause extraction -> embeddings -> FAISS pairing
    -> rule analysis (analyze_pair) -> LLM/NLI review of each flagged pair.

    Returns (results, line_issues):
      results     - one dict per analyzed clause pair (including NO_CONFLICT),
      line_issues - two per-line records (one per clause) for every non-clean
                    pair, sorted by (page, line).
    """
    file_ext = uploaded_file.name.split(".")[-1].lower()
    with st.spinner("Extracting text..."):
        text_data = extract_text_from_file(uploaded_file, file_ext)
    if not text_data:
        st.error("Could not extract text from this file.")
        return [], []
    with st.spinner("Extracting clauses..."):
        clauses = extract_clauses(text_data)
        doc_parties = _extract_document_parties(text_data)
    if not clauses:
        st.warning("No valid clauses were detected.")
        return [], []
    with st.spinner("Building semantic index..."):
        embeddings = generate_embeddings(clauses)
        index = create_faiss_index(embeddings)
    verifier = load_verifier(backend=backend, llama_model_path=llama_model_path)
    results = []
    seen_pairs = set()
    progress = st.progress(0)
    total = len(embeddings)
    for i, emb in enumerate(embeddings):
        idxs, dists = get_similar(index, emb, k=5)
        for j, dist in zip(idxs, dists):
            # Only evaluate each unordered pair once (i < j).
            if i >= j:
                continue
            if (i, j) in seen_pairs:
                continue
            seen_pairs.add((i, j))
            # Map FAISS distance to a (0, 1] similarity score.
            similarity = 1 / (1 + dist)
            label, confidence, reason = analyze_pair(
                clauses[i]["text"],
                clauses[j]["text"],
                similarity,
                threshold=sensitivity,
            )
            if not label:
                continue
            result = {
                "Label": label,
                "Confidence": float(confidence),
                "Reason": reason,
                "Clause 1": clauses[i]["text"],
                "Clause 2": clauses[j]["text"],
                "Page 1": clauses[i]["page"],
                "Line 1": clauses[i]["line"],
                "Page 2": clauses[j]["page"],
                "Line 2": clauses[j]["line"],
                "Location 1": f"Pg {clauses[i]['page']}, Ln {clauses[i]['line']}",
                "Location 2": f"Pg {clauses[j]['page']}, Ln {clauses[j]['line']}",
            }
            vendor_name, vendee_name = _extract_parties(
                result["Clause 1"], result["Clause 2"], doc_parties=doc_parties
            )
            result["Vendor"] = vendor_name
            result["Vendee"] = vendee_name
            # Second-stage review: llama verifier returns its own reason,
            # NLI verifier only a label (reason is synthesized here).
            if backend == "llama":
                _, llm_conf, llm_label, llm_reason = verifier.predict(
                    result["Clause 1"], result["Clause 2"]
                )
            else:
                _, llm_conf, llm_label = verifier.predict(
                    result["Clause 1"], result["Clause 2"]
                )
                llm_reason = f"NLI label: {llm_label}"
            if llm_label == "Neutral":
                # Do not erase strong rule-based findings just because LLM is neutral.
                if result["Label"] in ["NUMERIC_INCONSISTENCY", "LEGAL_CONFLICT"]:
                    result["Reason"] = f"{result['Reason']} | LLM neutral review"
                else:
                    result["Label"] = "NO_CONFLICT"
                    result["Reason"] = "LLM marked as neutral"
            elif llm_label == "Entailment":
                result["Label"] = "DUPLICATION"
                result["Reason"] = "LLM marked as entailment"
            elif llm_label == "Contradiction":
                # Promote tentative rule labels to a hard conflict.
                if result["Label"] in ["CANDIDATE", "QUALIFICATION"]:
                    result["Label"] = "LEGAL_CONFLICT"
                result["Reason"] = llm_reason
            # NOTE(review): verifier confidence replaces the rule-based score
            # for every reviewed pair — confirm this is the intended policy.
            result["Confidence"] = float(llm_conf)
            results.append(result)
        progress.progress((i + 1) / total)
    progress.empty()
    # Flatten pair findings into per-line issue records for the dashboard.
    line_issues = []
    for r in results:
        if r["Label"] == "NO_CONFLICT":
            continue
        line_issues.append(
            {
                "Issue Type": r["Label"],
                "Confidence": round(r["Confidence"], 4),
                "Page": r["Page 1"],
                "Line": r["Line 1"],
                "Snippet": r["Clause 1"][:160],
                "Reason": r["Reason"],
                "Vendor": r.get("Vendor", "Not found"),
                "Vendee": r.get("Vendee", "Not found"),
            }
        )
        line_issues.append(
            {
                "Issue Type": r["Label"],
                "Confidence": round(r["Confidence"], 4),
                "Page": r["Page 2"],
                "Line": r["Line 2"],
                "Snippet": r["Clause 2"][:160],
                "Reason": r["Reason"],
                "Vendor": r.get("Vendor", "Not found"),
                "Vendee": r.get("Vendee", "Not found"),
            }
        )
    line_issues.sort(key=lambda item: (item["Page"], item["Line"]))
    return results, line_issues


def upload_page():
    """Render the upload & scan screen (step 2 of 3)."""
    st.markdown(
        """
        Upload And Scan
        Drop your legal document, choose model/backend, and run full semantic integrity analysis.
        """,
        unsafe_allow_html=True,
    )
    st.markdown('Step 2 of 3: Upload Document', unsafe_allow_html=True)
    with st.sidebar:
        st.header("Scan Settings")
        scan_mode = st.radio(
            "Select scan mode",
            (
                "Standard Scan (Recommended)",
                "Deep Search (Fuzzy)",
                "Strict (Duplicates Only)",
            ),
            index=0,
        )
        # Mode maps to the analyze_pair similarity threshold.
        if "Standard" in scan_mode:
            sensitivity = 0.60
        elif "Deep" in scan_mode:
            sensitivity = 0.50
        else:
            sensitivity = 0.85
        # Locked configuration requested by user:
        # always use local fine-tuned Llama verifier and hide controls.
        model_backend = "llama"
        llama_model_path = DEFAULT_MODEL_PATH
        st.caption("Verifier backend: llama (fixed)")
        st.caption("Local model: merged_tinyllama_instruction (fixed)")
        st.markdown(
            f"""
            Active Mode
            {scan_mode.split("(")[0].strip()}
            Sensitivity: {sensitivity} | Backend: {model_backend}
            """,
            unsafe_allow_html=True,
        )
    col_left, col_right = st.columns([1.35, 1], gap="large")
    with col_left:
        uploaded_file = st.file_uploader(
            "Upload a legal document",
            type=["pdf", "docx", "txt"],
            help="Supported files: PDF, DOCX, TXT",
        )
    with col_right:
        st.markdown(
            """
            Supported Inputs
            PDF / DOCX / TXT
            Output
            Pair Findings + Error-Line Dashboard + PDF/JSON Export
            """,
            unsafe_allow_html=True,
        )
    if uploaded_file is None:
        st.info("Upload a file to continue.")
        return
    # Keep raw bytes in session so the dashboard can render a PDF preview.
    st.session_state.uploaded_name = uploaded_file.name
    st.session_state.uploaded_ext = uploaded_file.name.split(".")[-1].lower()
    st.session_state.uploaded_bytes = uploaded_file.getvalue()
    st.success(f"File ready: {uploaded_file.name}")
    if st.button("Run Full Analysis", type="primary"):
        try:
            results, line_issues = run_analysis(
                uploaded_file=uploaded_file,
                sensitivity=sensitivity,
                backend=model_backend,
                llama_model_path=llama_model_path,
            )
            st.session_state.results = results
            st.session_state.line_issues = line_issues
            st.session_state.analysis_done = True
            st.rerun()
        except Exception as exc:
            st.error(f"Analysis failed: {exc}")


def dashboard_page():
    """Render the findings dashboard (step 3 of 3): analytics, tables, export."""
    st.markdown(
        """
        Interactive Findings Dashboard
        Trace conflicts by issue type, confidence, and exact line location.
        """,
        unsafe_allow_html=True,
    )
    st.markdown('Step 3 of 3: Dashboard', unsafe_allow_html=True)
    results = st.session_state.results
    line_issues = st.session_state.line_issues
    if not results:
        st.warning("No results found.")
        return
    df = pd.DataFrame(results)
    df["Confidence"] = df["Confidence"].astype(float)
    issues_df = df[~df["Label"].isin(["NO_CONFLICT"])].copy()
    # Headline metric cards.
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.markdown(
            f"""
            User
            {st.session_state.username or "N/A"}
            """,
            unsafe_allow_html=True,
        )
    with col2:
        st.markdown(
            f"""
            Pairs Reviewed
            {len(df)}
            """,
            unsafe_allow_html=True,
        )
    with col3:
        st.markdown(
            f"""
            Detected Issues
            {len(issues_df)}
            """,
            unsafe_allow_html=True,
        )
    with col4:
        max_conf = float(df["Confidence"].max()) if not df.empty else 0.0
        st.markdown(
            f"""
            Max Confidence
            {max_conf:.2f}
            """,
            unsafe_allow_html=True,
        )
    st.subheader("Issue Analytics Dashboard")
    if line_issues:
        line_df = pd.DataFrame(line_issues).copy()
        line_df["Page"] = line_df["Page"].astype(int)
        line_df["Line"] = line_df["Line"].astype(int)
        line_df["Confidence"] = line_df["Confidence"].astype(float)
        filter_col1, filter_col2, filter_col3 = st.columns([1.2, 1, 1], gap="large")
        with filter_col1:
            issue_types = sorted(line_df["Issue Type"].dropna().unique().tolist())
            issue_sel = st.multiselect("Issue Types", issue_types, default=issue_types)
        with filter_col2:
            conf_min = st.slider("Min Confidence (analytics)", 0.0, 1.0, 0.0, 0.01)
            page_min, page_max = int(line_df["Page"].min()), int(line_df["Page"].max())
            # A range slider needs distinct endpoints; degrade to a caption.
            if page_min == page_max:
                st.caption(f"Single issue page: {page_min}")
                page_sel = (page_min, page_max)
            else:
                page_sel = st.slider(
                    "Page Range (analytics)", page_min, page_max, (page_min, page_max)
                )
        with filter_col3:
            vendors = ["All"] + sorted(
                line_df["Vendor"].dropna().astype(str).unique().tolist()
            )
            vendees = ["All"] + sorted(
                line_df["Vendee"].dropna().astype(str).unique().tolist()
            )
            vendor_sel = st.selectbox("Vendor", vendors, index=0)
            vendee_sel = st.selectbox("Vendee", vendees, index=0)
        filtered = line_df.copy()
        if issue_sel:
            filtered = filtered[filtered["Issue Type"].isin(issue_sel)]
        filtered = filtered[filtered["Confidence"] >= conf_min]
        filtered = filtered[
            (filtered["Page"] >= page_sel[0]) & (filtered["Page"] <= page_sel[1])
        ]
        if vendor_sel != "All":
            filtered = filtered[filtered["Vendor"] == vendor_sel]
        if vendee_sel != "All":
            filtered = filtered[filtered["Vendee"] == vendee_sel]
        total_issues = len(filtered)
        conflict_rate = (len(issues_df) / len(df) * 100.0) if len(df) else 0.0
        top_issue = (
            filtered["Issue Type"].mode().iloc[0] if not filtered.empty else "N/A"
        )
        highest_risk_page = (
            int(filtered.groupby("Page")["Confidence"].mean().idxmax())
            if not filtered.empty
            else "N/A"
        )
        k1, k2, k3, k4 = st.columns(4)
        k1.metric("Filtered Issues", total_issues)
        k2.metric("Conflict Rate", f"{conflict_rate:.1f}%")
        k3.metric("Top Issue Type", top_issue)
        k4.metric("Highest Risk Page", highest_risk_page)
        if filtered.empty:
            st.warning("No analytics data for current filter.")
        else:
            pie_df = filtered["Issue Type"].value_counts().reset_index()
            pie_df.columns = ["Issue Type", "Count"]
            pie_fig = px.pie(
                pie_df,
                names="Issue Type",
                values="Count",
                title="Issue Type Split",
                hole=0.35,
            )
            pie_fig.update_layout(margin=dict(l=10, r=10, t=50, b=10))
            st.plotly_chart(pie_fig, use_container_width=True)
            top_lines = filtered.sort_values(by=["Confidence"], ascending=False).head(
                10
            )
            st.markdown("**Top 10 High-Risk Lines**")
            st.dataframe(
                top_lines[
                    [
                        "Issue Type",
                        "Confidence",
                        "Page",
                        "Line",
                        "Vendor",
                        "Vendee",
                        "Snippet",
                        "Reason",
                    ]
                ],
                use_container_width=True,
            )
    else:
        st.info("No issue analytics data available.")
    tab_findings, tab_line_map, tab_export = st.tabs(
        ["Findings Table", "Error Line Map", "Export"]
    )
    with tab_findings:
        st.subheader("Detected Issues")
        left, right = st.columns([1, 1.1])
        with left:
            display_mode = st.radio(
                "Display mode",
                ["Issues Only", "All Analyzed Pairs"],
                horizontal=True,
            )
        with right:
            conf_threshold = st.slider("Minimum confidence", 0.0, 1.0, 0.0, 0.01)
        display_df = issues_df if display_mode == "Issues Only" else df
        display_df = display_df[display_df["Confidence"] >= conf_threshold]
        if display_mode == "Issues Only" and display_df.empty:
            st.warning("No issues match this filter.")
            st.info("Try lower confidence or switch to 'All Analyzed Pairs'.")
        elif display_df.empty:
            st.info("No analyzed pairs match this filter.")
        else:
            display_df = display_df.copy().reset_index(drop=True)
            display_df.insert(0, "S.No", range(1, len(display_df) + 1))
            cols = [
                "S.No",
                "Label",
                "Confidence",
                "Reason",
                "Location 1",
                "Location 2",
                "Clause 1",
                "Clause 2",
            ]
            st.dataframe(display_df[cols], use_container_width=True)
    with tab_line_map:
        st.subheader("Error Line Dashboard")
        if line_issues:
            line_df = pd.DataFrame(line_issues)
            labels = sorted(line_df["Issue Type"].dropna().unique().tolist())
            selected = st.multiselect("Filter issue types", labels, default=labels)
            page_min = int(line_df["Page"].min()) if not line_df.empty else 1
            page_max = int(line_df["Page"].max()) if not line_df.empty else 1
            if page_min == page_max:
                st.caption(f"Only one page with issues: Page {page_min}")
                page_range = (page_min, page_max)
            else:
                page_range = st.slider(
                    "Page range", page_min, page_max, (page_min, page_max)
                )
            if selected:
                line_df = line_df[line_df["Issue Type"].isin(selected)]
            line_df = line_df[
                (line_df["Page"] >= page_range[0]) & (line_df["Page"] <= page_range[1])
            ]
            st.dataframe(line_df, use_container_width=True)
            st.markdown("**Issue Occurrence By Line With Parties**")
            by_line = line_df.copy()
            by_line = by_line.sort_values(
                by=["Page", "Line", "Confidence"], ascending=[True, True, False]
            )
            st.dataframe(
                by_line[
                    [
                        "Issue Type",
                        "Page",
                        "Line",
                        "Vendor",
                        "Vendee",
                        "Confidence",
                        "Reason",
                    ]
                ],
                use_container_width=True,
            )
            st.subheader("Jump To Error Line")
            # Guard: filters above may have emptied the frame.
            if not line_df.empty:
                line_df = line_df.reset_index(drop=True)
                line_df.insert(0, "Item", range(1, len(line_df) + 1))
                line_df["Jump"] = line_df.apply(
                    lambda r: (
                        f"#{r['Item']} | Pg {int(r['Page'])}, Ln {int(r['Line'])} | {r['Issue Type']}"
                    ),
                    axis=1,
                )
                selected_jump = st.selectbox(
                    "Select issue line", line_df["Jump"].tolist()
                )
                chosen = line_df[line_df["Jump"] == selected_jump].iloc[0]
                c1, c2 = st.columns([1.1, 1], gap="large")
                with c1:
                    st.markdown(
                        f"""
                        Selected Line
                        Pg {int(chosen["Page"])} · Ln {int(chosen["Line"])}
                        {chosen["Issue Type"]} | Confidence: {float(chosen["Confidence"]):.2f}
                        """,
                        unsafe_allow_html=True,
                    )
                    st.caption("Snippet")
                    st.code(str(chosen["Snippet"]), language="text")
                    st.caption("Reason")
                    st.write(str(chosen["Reason"]))
                with c2:
                    is_pdf = st.session_state.uploaded_ext == "pdf"
                    if is_pdf and st.session_state.uploaded_bytes:
                        st.caption("PDF Preview (jumped to selected page)")
                        page_number = int(chosen["Page"])
                        pdf_b64 = base64.b64encode(
                            st.session_state.uploaded_bytes
                        ).decode("utf-8")
                        # NOTE(review): the embed/iframe template appears to
                        # have been stripped from this copy of the file; it
                        # should interpolate pdf_b64 and page_number.
                        pdf_html = f""" """
                        st.markdown(pdf_html, unsafe_allow_html=True)
                    else:
                        st.info(
                            "Inline PDF preview is available for PDF uploads. Current file is not PDF."
                        )
        else:
            st.info("No line-level issues to display.")
    with tab_export:
        st.subheader("Download Reports")
        json_payload = json.dumps(results, indent=2)
        st.download_button(
            label="Download JSON Report",
            data=json_payload,
            file_name="semantic_integrity_report.json",
            mime="application/json",
        )
        # PDF report only covers actual findings, not clean pairs.
        pdf_bytes = generate_pdf_report(
            [r for r in results if r["Label"] != "NO_CONFLICT"]
        )
        st.download_button(
            label="Download PDF Report",
            data=pdf_bytes,
            file_name="semantic_integrity_report.pdf",
            mime="application/pdf",
        )
        if st.button("Analyze Another Document"):
            st.session_state.analysis_done = False
            st.session_state.results = []
            st.session_state.line_issues = []
            st.rerun()


def main():
    """App entry point: page config, theme, auth gate, page routing."""
    st.set_page_config(page_title=APP_TITLE, layout="wide")
    apply_theme()
    init_state()
    top_col1, top_col2 = st.columns([5, 1])
    with top_col1:
        st.title(APP_TITLE)
    with top_col2:
        if st.session_state.is_authenticated and st.button("Logout"):
            st.session_state.is_authenticated = False
            st.session_state.username = ""
            st.session_state.analysis_done = False
            st.session_state.results = []
            st.session_state.line_issues = []
            st.rerun()
    if not st.session_state.is_authenticated:
        login_page()
        return
    if not st.session_state.analysis_done:
        upload_page()
    else:
        dashboard_page()


if __name__ == "__main__":
    main()