"""Gradio RAG chatbot: answers questions over local PDFs indexed in Pinecone via LlamaIndex."""

import os
import time
import logging
import sys

import gradio as gr
from pinecone import Pinecone, ServerlessSpec
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.readers.file import PDFReader

# Optional: only if you are using OpenAI as the default LLM / embeddings
# from llama_index.llms.openai import OpenAI
# from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.core import Settings

# --- Logging ---
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Load Secrets from Hugging Face Spaces ---
# Add these in: Space Settings -> Variables and secrets
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")  # optional, if needed

if not PINECONE_API_KEY:
    raise ValueError("Missing PINECONE_API_KEY in Hugging Face Space secrets.")

# If you use OpenAI in LlamaIndex, uncomment this check
# if not OPENAI_API_KEY:
#     raise ValueError("Missing OPENAI_API_KEY in Hugging Face Space secrets.")

# If you use OpenAI explicitly in LlamaIndex, uncomment this section
# Settings.llm = OpenAI(model="gpt-4.1-mini", api_key=OPENAI_API_KEY)
# Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small", api_key=OPENAI_API_KEY)

# --- Pinecone Config ---
INDEX_NAME = "quickstart"
DIMENSION = 1536  # must match the embedding model's output dimension
METRIC = "euclidean"
CLOUD = "aws"
REGION = "us-east-1"

# --- Initialize Pinecone ---
pc = Pinecone(api_key=PINECONE_API_KEY)


def get_existing_index_names(pc_client):
    """Safely extract index names across SDK response shapes.

    The Pinecone SDK has changed the return type of ``list_indexes`` across
    versions, so each known shape is attempted in turn (EAFP) and an empty
    list is returned as the last resort.
    """
    raw = pc_client.list_indexes()

    # Modern SDK: response object exposes a .names() helper
    try:
        return list(raw.names())
    except Exception:
        pass

    # Common case: iterable of dicts
    try:
        return [idx["name"] for idx in raw]
    except Exception:
        pass

    # Alternate case: object with .indexes
    try:
        return [idx["name"] for idx in raw.indexes]
    except Exception:
        pass

    # Fallback
    return []


def ensure_index(pc_client, index_name: str, dimension: int):
    """Create the Pinecone index if absent and return a handle to it.

    Waits until the newly created index reports ready (polling
    ``describe_index``) instead of relying on a fixed sleep, which is
    race-prone on slow provisioning. Falls back to a short sleep if the
    SDK response shape does not expose a readiness status.
    """
    existing_indexes = get_existing_index_names(pc_client)

    if index_name not in existing_indexes:
        logger.info("Creating Pinecone index: %s", index_name)
        pc_client.create_index(
            name=index_name,
            dimension=dimension,
            metric=METRIC,
            spec=ServerlessSpec(cloud=CLOUD, region=REGION),
        )
        # Poll for readiness to avoid a race condition on first startup.
        deadline = time.monotonic() + 60  # cap the wait at one minute
        while time.monotonic() < deadline:
            try:
                status = pc_client.describe_index(index_name).status
                if status.get("ready"):
                    break
            except Exception:
                # SDK shape unknown — fall back to the original fixed wait.
                time.sleep(5)
                break
            time.sleep(1)
    else:
        logger.info("Using existing Pinecone index: %s", index_name)

    return pc_client.Index(index_name)


# --- Load Documents ---
def load_documents():
    """Load all PDFs from the local ``data`` directory.

    Raises:
        ValueError: if the directory yields no documents.
    """
    documents = SimpleDirectoryReader(
        input_dir="data",
        required_exts=[".pdf"],
        file_extractor={".pdf": PDFReader()},
    ).load_data()

    if not documents:
        raise ValueError("No PDF documents were loaded from the 'data' folder.")

    logger.info("Loaded %d document chunks/items.", len(documents))
    return documents


# --- Build Query Engine Once at Startup ---
def build_query_engine():
    """Index the local documents into Pinecone and return a query engine."""
    pinecone_index = ensure_index(pc, INDEX_NAME, DIMENSION)
    documents = load_documents()

    vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
    )
    return index.as_query_engine()


query_engine = build_query_engine()


# --- Gradio Function ---
def query_doc(prompt):
    """Answer a user question against the indexed documents.

    Returns the engine's answer as a string, or a human-readable error
    message (never raises into the Gradio UI).
    """
    if not prompt or not prompt.strip():
        return "Please enter a question."
    try:
        response = query_engine.query(prompt)
        return str(response)
    except Exception as e:
        logger.exception("Query failed")
        return f"Error: {str(e)}"


# --- Gradio UI ---
demo = gr.Interface(
    fn=query_doc,
    inputs=gr.Textbox(
        label="Ask a question about the document",
        placeholder="What does the policy say about social media conduct?",
    ),
    outputs=gr.Textbox(label="Answer"),
    title="DDS Enterprise Chatbot",
    description="Ask questions based on the indexed Social Media Regulation PDF. Powered by LlamaIndex & Pinecone.",
)

if __name__ == "__main__":
    demo.launch()