# app.py - Fixed version with proper error handling import sqlite3 import os import streamlit as st import chromadb from typing import Dict, Optional, Any from pathlib import Path from dotenv import load_dotenv from llama_index.core import VectorStoreIndex, StorageContext, Settings from llama_index.vector_stores.chroma import ChromaVectorStore from llama_index.llms.groq import Groq from llama_index.embeddings.cohere import CohereEmbedding # Load environment variables first load_dotenv() # Disable ChromaDB telemetry to remove the warning os.environ["ANONYMIZED_TELEMETRY"] = "False" # Setup OTel via Arize's convenience function with error handling try: from arize.otel import register from openinference.instrumentation.llama_index import LlamaIndexInstrumentor if os.getenv("ARIZE_SPACE_ID") and os.getenv("ARIZE_API_KEY"): tracer_provider = register( space_id=os.getenv("ARIZE_SPACE_ID"), api_key=os.getenv("ARIZE_API_KEY"), project_name="rbacrag" ) LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider) else: print("Arize credentials not found, skipping instrumentation") except Exception as e: print(f"Warning: Arize instrumentation failed: {e}") # Import database module from database import db, initialize_users # Initialize default users with better error handling try: success_count, error_count = initialize_users() if error_count > 0: print(f"Database initialization completed with {error_count} errors (likely users already exist)") else: print(f"Database initialization successful: {success_count} users ready") except Exception as e: print(f"Error during user initialization: {e}") # Role-based access control for documents ROLE_ACCESS = { "hr": ["hr", "general"], "engineering": ["engineering", "general"], "finance": ["finance", "general"], "marketing": ["marketing", "general"] } def initialize_session_state(): """Initialize or reset the session state""" if "authenticated" not in st.session_state: st.session_state.authenticated = False if "username" not in st.session_state: st.session_state.username = None if "role" not in st.session_state: st.session_state.role = None if "messages" not in st.session_state: st.session_state.messages = [] if "vector_index" not in st.session_state: st.session_state.vector_index = None if "query_engine" not in st.session_state: st.session_state.query_engine = None # Set page config st.set_page_config( page_title="Departmental RAG System", page_icon="🔒", layout="centered", initial_sidebar_state="collapsed" ) # Initialize session state initialize_session_state() def login(username: str, password: str) -> bool: """ Authenticate user and set session state Args: username: The username to authenticate password: The password to verify Returns: bool: True if authentication was successful, False otherwise """ try: user = db.verify_user(username, password) if user: st.session_state.authenticated = True st.session_state.username = user["username"] st.session_state.role = user["role"] st.session_state.messages = [ {"role": "assistant", "content": f"Welcome, {user['username']}! How can I assist you today?"} ] st.rerun() return True return False except Exception as e: st.error(f"An error occurred during login: {str(e)}") return False def logout(): """Log out the current user and clear session state""" username = st.session_state.get('username', 'Unknown') st.session_state.clear() initialize_session_state() st.success(f"Successfully logged out {username}") st.rerun() @st.cache_resource def load_vector_index(role: str): """Load the ChromaDB index for the user's role with enhanced error handling""" try: # Initialize Cohere embeddings cohere_api_key = os.getenv("COHERE_API_KEY") if not cohere_api_key: st.error("❌ COHERE_API_KEY not found in environment variables") st.info("Please set your Cohere API key in the .env file") st.stop() embed_model = CohereEmbedding( cohere_api_key=cohere_api_key, model_name="embed-english-v3.0", input_type="search_document" ) Settings.embed_model = embed_model # Docker-compatible ChromaDB initialization persist_dir = f"./chroma_db/{role}" # Ensure directory exists Path(persist_dir).mkdir(parents=True, exist_ok=True) # Initialize Chroma client with telemetry disabled try: chroma_client = chromadb.PersistentClient( path=persist_dir, settings=chromadb.Settings( anonymized_telemetry=False, allow_reset=True ) ) except Exception as e: st.warning(f"Failed to connect to persistent ChromaDB: {e}") st.info("Attempting to create new collection...") # Try to reset and recreate try: chroma_client = chromadb.PersistentClient(path=persist_dir) chroma_client.reset() chroma_client = chromadb.PersistentClient( path=persist_dir, settings=chromadb.Settings( anonymized_telemetry=False, allow_reset=True ) ) except: # Fallback to in-memory client st.warning("⚠️ Using in-memory ChromaDB (data will not persist)") chroma_client = chromadb.Client( settings=chromadb.Settings(anonymized_telemetry=False) ) # Try to get existing collection, create if it doesn't exist collection_name = "documents" try: chroma_collection = chroma_client.get_collection(collection_name) st.success(f"✅ Connected to existing collection for {role} role") except Exception: st.warning(f"⚠️ Collection '{collection_name}' not found for role '{role}'. Creating empty collection.") try: chroma_collection = chroma_client.create_collection( name=collection_name, metadata={"hnsw:space": "cosine"} ) st.info("📝 Created new empty collection. You may need to add documents first.") except Exception as create_error: st.error(f"❌ Failed to create collection: {create_error}") st.stop() # Create vector store vector_store = ChromaVectorStore(chroma_collection=chroma_collection) # Create storage context storage_context = StorageContext.from_defaults(vector_store=vector_store) # Check if collection has documents if chroma_collection.count() == 0: st.warning(f"📭 No documents found in {role} collection.") st.info("The system will work, but responses will be limited without documents.") # Create empty index for now index = VectorStoreIndex([], storage_context=storage_context, embed_model=embed_model) else: st.info(f"📚 Found {chroma_collection.count()} documents in {role} collection") # Load the index index = VectorStoreIndex.from_vector_store( vector_store=vector_store, storage_context=storage_context, embed_model=embed_model ) return index except Exception as e: st.error(f"❌ Error loading vector index: {str(e)}") st.info("**Possible solutions:**") st.info("1. Check that ChromaDB collections exist for this role") st.info("2. Verify database files are properly mounted in Docker") st.info("3. Check permissions on the database directory") st.info("4. Ensure COHERE_API_KEY is set correctly") st.stop() def chat_interface(): """Main chat interface""" # Add styled heading st.markdown(f"

💬 {st.session_state.role.capitalize()} Department Chat

", unsafe_allow_html=True) # Display chat messages for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) # Load the appropriate index for the user's role index = load_vector_index(st.session_state.role) # Initialize Groq LLM try: groq_api_key = os.getenv("GROQ_API_KEY") if not groq_api_key: st.error("❌ GROQ_API_KEY not found in environment variables") st.info("Please set your Groq API key in the .env file") st.stop() llm = Groq( model="llama3-8b-8192", api_key=groq_api_key, temperature=0.5, system_prompt=f"You are a helpful assistant specialized in {st.session_state.role} department documents. Answer the user queries with the help of the provided context with high accuracy and precision." ) # Create query engine with the LLM query_engine = index.as_query_engine( llm=llm, similarity_top_k=3, response_mode="compact" ) except Exception as e: st.error(f"❌ Error initializing LLM: {str(e)}") st.warning("⚠️ Falling back to default LLM settings. Some features may be limited.") query_engine = index.as_query_engine( similarity_top_k=3, response_mode="compact" ) # Chat input if prompt := st.chat_input(f"Ask about {st.session_state.role} documents..."): # Add user message to chat history st.session_state.messages.append({"role": "user", "content": prompt}) # Display user message with st.chat_message("user"): st.markdown(prompt) # Get and display assistant response with st.chat_message("assistant"): message_placeholder = st.empty() full_response = "" try: # Get response from query engine response = query_engine.query(prompt) full_response = str(response) message_placeholder.markdown(full_response) except Exception as e: error_msg = f"❌ Error generating response: {str(e)}" message_placeholder.error(error_msg) full_response = error_msg # Add assistant response to chat history st.session_state.messages.append({"role": "assistant", "content": full_response}) def show_login_form(): """Display the beautiful login form""" st.markdown( """ """, unsafe_allow_html=True ) st.markdown('

🔒

', unsafe_allow_html=True) st.markdown('

Department Portal

', unsafe_allow_html=True) st.markdown('

Sign in to access your department\'s knowledge base

', unsafe_allow_html=True) with st.container(): with st.form("login_form", border=True): username = st.text_input("Username", placeholder="Enter your username") password = st.text_input("Password", type="password", placeholder="Enter your password") login_button = st.form_submit_button("Sign In") if login_button: if not username or not password: st.error("Please enter both username and password") elif login(username, password): st.success(f"Welcome, {username}! Redirecting...") else: st.error("Invalid username or password") with st.expander("Need demo credentials?"): st.markdown(""" - **Engineering:** `Tony` / `password123` - **Marketing:** `Bruce` / `securepass` - **Finance:** `Sam` / `financepass` - **HR:** `Natasha` / `hrpass123` """) st.markdown('

2025 Department RAG System

', unsafe_allow_html=True) def main(): """ Main application entry point Handles routing between login and main application """ # Sidebar for logout and user info if st.session_state.authenticated: with st.sidebar: st.markdown(f"### Welcome, {st.session_state.username}") st.markdown(f"**Role:** {st.session_state.role.capitalize()}") if st.button("Logout", key="logout_btn"): logout() return st.markdown("---") st.markdown("### About") st.markdown(""" This is a secure departmental RAG system that provides role-based access to information across different departments. """) # Show database status try: users = db.list_users() st.markdown("---") st.markdown("### System Status") st.markdown(f"✅ Database: {len(users)} users") st.markdown("✅ Authentication: Active") except: st.markdown("⚠️ Database: Connection issues") # Main content area if not st.session_state.authenticated: show_login_form() else: chat_interface() if __name__ == "__main__": main()