import os
import time
import logging
import sys
import gradio as gr
from pinecone import Pinecone, ServerlessSpec
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.readers.file import PDFReader
# Optional: only if you are using OpenAI as the default LLM / embeddings
# from llama_index.llms.openai import OpenAI
# from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.core import Settings
# --- Logging ---
# Log to stdout so messages appear in the Hugging Face Space container logs.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Load Secrets from Hugging Face Spaces ---
# Add these in: Space Settings -> Variables and secrets
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")  # optional, if needed

# Fail fast at import time: the app cannot serve queries without Pinecone.
if not PINECONE_API_KEY:
    raise ValueError("Missing PINECONE_API_KEY in Hugging Face Space secrets.")
# If you use OpenAI in LlamaIndex, uncomment this check
# if not OPENAI_API_KEY:
#     raise ValueError("Missing OPENAI_API_KEY in Hugging Face Space secrets.")

# If you use OpenAI explicitly in LlamaIndex, uncomment this section
# Settings.llm = OpenAI(model="gpt-4.1-mini", api_key=OPENAI_API_KEY)
# Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small", api_key=OPENAI_API_KEY)

# --- Pinecone Config ---
INDEX_NAME = "quickstart"
DIMENSION = 1536  # must match the embedding model's output dimension
METRIC = "euclidean"  # distance metric for the serverless index
CLOUD = "aws"
REGION = "us-east-1"

# --- Initialize Pinecone ---
pc = Pinecone(api_key=PINECONE_API_KEY)
def get_existing_index_names(pc_client):
    """Return the names of all existing Pinecone indexes.

    The Pinecone SDK has returned different shapes from ``list_indexes()``
    across versions (a plain iterable of dict-like items, or a response
    object exposing an ``indexes`` attribute), so each shape is probed in
    turn.

    Args:
        pc_client: An initialized Pinecone client.

    Returns:
        list[str]: Existing index names; an empty list if the response
        shape is unrecognized.
    """
    raw = pc_client.list_indexes()
    # Common case: iterable of dict-like items with a "name" key.
    # Catch only the shape-mismatch errors; a bare ``except Exception``
    # would also hide real bugs (network errors, typos, etc.).
    try:
        return [idx["name"] for idx in raw]
    except (TypeError, KeyError, IndexError):
        pass
    # Alternate case: response object with an .indexes attribute.
    try:
        return [idx["name"] for idx in raw.indexes]
    except (TypeError, KeyError, IndexError, AttributeError):
        pass
    # Unknown response shape: report "no indexes" so callers can create one.
    return []
def ensure_index(pc_client, index_name: str, dimension: int):
    """Create the Pinecone index if it does not exist, then return a handle.

    Args:
        pc_client: An initialized Pinecone client.
        index_name: Name of the index to create or reuse.
        dimension: Embedding dimension the index must accept.

    Returns:
        The Pinecone ``Index`` handle for ``index_name``.
    """
    existing_indexes = get_existing_index_names(pc_client)
    if index_name not in existing_indexes:
        # Lazy %-style args: the logging module formats only if emitted.
        logger.info("Creating Pinecone index: %s", index_name)
        pc_client.create_index(
            name=index_name,
            dimension=dimension,
            metric=METRIC,
            spec=ServerlessSpec(cloud=CLOUD, region=REGION),
        )
        # Small wait to avoid a race on first startup: the index may not be
        # ready immediately after create_index returns.
        time.sleep(5)
    else:
        logger.info("Using existing Pinecone index: %s", index_name)
    return pc_client.Index(index_name)
# --- Load Documents ---
def load_documents(input_dir: str = "data"):
    """Load PDF documents from *input_dir* using LlamaIndex readers.

    Args:
        input_dir: Directory to scan for ``.pdf`` files. Defaults to
            ``"data"``, preserving the original hard-coded behavior.

    Returns:
        The list of loaded Document objects.

    Raises:
        ValueError: If no PDF documents were found in the directory.
    """
    documents = SimpleDirectoryReader(
        input_dir=input_dir,
        required_exts=[".pdf"],
        file_extractor={".pdf": PDFReader()},
    ).load_data()
    if not documents:
        # Error message tracks the actual directory that was scanned.
        raise ValueError(f"No PDF documents were loaded from the '{input_dir}' folder.")
    logger.info("Loaded %d document chunks/items.", len(documents))
    return documents
# --- Build Query Engine Once at Startup ---
def build_query_engine():
    """Build the retrieval pipeline once at startup and return a query engine.

    Ensures the Pinecone index exists, loads the local PDF documents, and
    wires them into a Pinecone-backed LlamaIndex vector index.
    """
    target_index = ensure_index(pc, INDEX_NAME, DIMENSION)
    docs = load_documents()
    store = PineconeVectorStore(pinecone_index=target_index)
    ctx = StorageContext.from_defaults(vector_store=store)
    vector_index = VectorStoreIndex.from_documents(docs, storage_context=ctx)
    return vector_index.as_query_engine()
# Build the shared engine once at import time so every request reuses it.
query_engine = build_query_engine()
# --- Gradio Function ---
def query_doc(prompt):
    """Answer a user question via the shared query engine.

    Returns a prompt-for-input message on blank input, and an error string
    (after logging the full traceback) if the underlying query fails.
    """
    cleaned = prompt.strip() if prompt else ""
    if not cleaned:
        return "Please enter a question."
    try:
        response = query_engine.query(prompt)
    except Exception as e:
        logger.exception("Query failed")
        return f"Error: {str(e)}"
    return str(response)
# --- Gradio UI ---
# Single text-in / text-out interface wired to query_doc.
demo = gr.Interface(
    fn=query_doc,
    inputs=gr.Textbox(
        label="Ask a question about the document",
        placeholder="What does the policy say about social media conduct?"
    ),
    outputs=gr.Textbox(label="Answer"),
    title="DDS Enterprise Chatbot",
    description="Ask questions based on the indexed Social Media Regulation PDF. Powered by LlamaIndex & Pinecone."
)

# Script entry point: launch the Gradio server when run directly.
if __name__ == "__main__":
    demo.launch()