| import streamlit as st |
| import os |
| import base64 |
| import time |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
| from transformers import pipeline |
| import torch |
| import textwrap |
| from dotenv import find_dotenv, load_dotenv |
| from langchain.document_loaders import PyPDFLoader, DirectoryLoader, PDFMinerLoader |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain.embeddings import SentenceTransformerEmbeddings |
| from langchain.vectorstores import Chroma |
| from langchain.llms import HuggingFacePipeline |
| from langchain.chains import RetrievalQA |
| from constants import CHROMA_SETTINGS |
| from streamlit_chat import message |
|
|
|
|
# Load variables from the nearest .env file into the process environment.
load_dotenv(find_dotenv())

# Issued before any other Streamlit call in this script.
st.set_page_config(layout="wide")

# Hugging Face checkpoint used for answer generation.
checkpoint = "google/flan-ul2"


@st.cache_resource
def _load_tokenizer_and_model(model_name):
    """Load the tokenizer and seq2seq model for *model_name* once per process.

    Streamlit re-executes this script from the top on every interaction;
    wrapping the load in ``st.cache_resource`` keeps the checkpoint from
    being re-instantiated on each rerun.

    Args:
        model_name: Checkpoint identifier passed to ``from_pretrained``.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_name)
    loaded_model = AutoModelForSeq2SeqLM.from_pretrained(
        model_name,
        device_map="auto",
        torch_dtype=torch.float32,
    )
    return loaded_tokenizer, loaded_model


# Module-level names are kept because other functions in this file read them.
tokenizer, base_model = _load_tokenizer_and_model(checkpoint)

# Directory in which the Chroma vector store is persisted.
persist_directory = "db"
|
|
@st.cache_resource
def data_ingestion():
    """Index every PDF found under ``docs`` into the persisted Chroma store.

    Walks the ``docs`` directory tree, loads each PDF with ``PDFMinerLoader``,
    splits the combined documents into chunks, embeds them with the
    ``all-MiniLM-L6-v2`` sentence-transformer, and persists the resulting
    Chroma collection to ``persist_directory``.

    Because the function takes no arguments and is wrapped in
    ``st.cache_resource``, its body runs once per server process; PDFs added
    afterwards are not indexed until the cache is cleared.

    Returns:
        None.

    Raises:
        ValueError: If no PDF file exists under ``docs``.
    """
    documents = []
    for root, _dirs, files in os.walk("docs"):
        for file in files:
            # Case-insensitive so that e.g. "REPORT.PDF" is indexed as well.
            if file.lower().endswith(".pdf"):
                print(file)
                loader = PDFMinerLoader(os.path.join(root, file))
                # Accumulate across files; rebinding a single loader would
                # leave only the last PDF in the index.
                documents.extend(loader.load())

    if not documents:
        raise ValueError("No PDF documents found under 'docs' to ingest.")

    # NOTE(review): chunk_overlap equals chunk_size here; a smaller overlap is
    # more usual - confirm this is intended before changing it.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=500)
    texts = text_splitter.split_documents(documents)

    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

    db = Chroma.from_documents(
        texts,
        embeddings,
        persist_directory=persist_directory,
        client_settings=CHROMA_SETTINGS,
    )
    db.persist()
|
|
@st.cache_resource
def llm_pipeline():
    """Build the LangChain LLM wrapper around the local text2text pipeline.

    The Hugging Face pipeline is created from the module-level ``base_model``
    and ``tokenizer`` with sampling enabled (temperature 0.3, top-p 0.95) and
    a maximum generation length of 256.

    Returns:
        A ``HuggingFacePipeline`` wrapping the generation pipeline.
    """
    pipe = pipeline(
        "text2text-generation",
        # The model must be passed together with the tokenizer; otherwise the
        # checkpoint loaded at module level is never used.
        model=base_model,
        tokenizer=tokenizer,
        max_length=256,
        do_sample=True,
        temperature=0.3,
        top_p=0.95,
    )
    return HuggingFacePipeline(pipeline=pipe)
|
|
@st.cache_resource
def qa_llm():
    """Create the retrieval-augmented QA chain over the persisted Chroma store.

    Opens the Chroma collection stored in ``persist_directory`` with the
    ``all-MiniLM-L6-v2`` embedding function, exposes it as a retriever, and
    combines it with the LLM from ``llm_pipeline()`` in a "stuff" chain that
    also returns its source documents.

    Returns:
        The configured ``RetrievalQA`` chain.
    """
    llm = llm_pipeline()
    embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")
    # Use the same module-level directory that data_ingestion() writes to, so
    # the read and write paths cannot drift apart.
    db = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings,
        client_settings=CHROMA_SETTINGS,
    )
    retriever = db.as_retriever()
    qa = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
    )
    return qa
|
|
def process_answer(instruction):
    """Run the QA chain on *instruction* and return its answer.

    Args:
        instruction: Input handed unchanged to the chain returned by
            ``qa_llm()``; ``main()`` passes ``{'query': <user text>}``.

    Returns:
        The value stored under ``'result'`` in the chain output.
    """
    qa = qa_llm()
    generated_text = qa(instruction)
    return generated_text["result"]
|
|
def get_file_size(file):
    """Return the end-of-stream position of *file* and rewind it to the start.

    Args:
        file: A seekable file-like object supporting ``seek`` and ``tell``.

    Returns:
        The position reported by ``tell()`` at the end of the stream (the
        byte count for binary streams).
    """
    # Jump to the end so tell() reports the total length.
    file.seek(0, os.SEEK_END)
    end_position = file.tell()
    # Rewind so the caller can read the content from the beginning.
    file.seek(0)
    return end_position
|
|
@st.cache_data
def displayPDF(file):
    """Render the PDF at path *file* inline as a base64 data-URI iframe.

    Args:
        file: Path of the PDF file to read and embed.
    """
    # Read the raw bytes, then base64-encode them for use in a data: URI.
    with open(file, "rb") as pdf_handle:
        raw_bytes = pdf_handle.read()
    base64_pdf = base64.b64encode(raw_bytes).decode('utf-8')

    # Build the iframe markup that carries the encoded document.
    iframe_markup = f'<iframe src="data:application/pdf;base64,{base64_pdf}" width="100%" height="600" type="application/pdf"></iframe>'

    # Raw HTML has to be explicitly allowed for the iframe to render.
    st.markdown(iframe_markup, unsafe_allow_html=True)
|
|
| |
def display_conversation(history):
    """Render the chat history as alternating user and bot messages.

    Args:
        history: Mapping with parallel ``"past"`` (user messages) and
            ``"generated"`` (bot replies) sequences.
    """
    for turn, bot_reply in enumerate(history["generated"]):
        # The user message comes first; the "_user" suffix keeps its widget
        # key distinct from the reply's key.
        message(history["past"][turn], is_user=True, key=f"{turn}_user")
        message(bot_reply, key=str(turn))
|
|
def main():
    """Render the ChatPDFv2 page: upload, preview, ingestion, and chat.

    After a PDF is uploaded it is saved under ``docs``, shown with its details
    in the left column, and ingested in the right column, where a text box
    feeds questions to ``process_answer`` and the running conversation is
    displayed from ``st.session_state``.
    """
    st.markdown("<h1 style='text-align: center; color: blue;'>ChatPDFv2</h1>", unsafe_allow_html=True)
    st.markdown("<h2 style='text-align: center; color:red;'>Upload your PDF</h2>", unsafe_allow_html=True)

    uploaded_file = st.file_uploader("", type=["pdf"])
    if uploaded_file is None:
        return

    file_details = {
        "Filename": uploaded_file.name,
        "File size": get_file_size(uploaded_file),
    }

    # Keep only the final path component so a crafted upload name cannot
    # write outside the docs directory, and make sure that directory exists.
    os.makedirs("docs", exist_ok=True)
    filepath = os.path.join("docs", os.path.basename(uploaded_file.name))
    with open(filepath, "wb") as temp_file:
        temp_file.write(uploaded_file.read())

    col1, col2 = st.columns([1, 2])
    with col1:
        st.markdown("<h4 style='color:black;'>File details</h4>", unsafe_allow_html=True)
        st.json(file_details)
        st.markdown("<h4 style='color:black;'>File preview</h4>", unsafe_allow_html=True)
        displayPDF(filepath)

    with col2:
        with st.spinner('Embeddings are in process...'):
            data_ingestion()
        st.success('Embeddings are created successfully!')
        st.markdown("<h4 style='color:black;'>Chat Here</h4>", unsafe_allow_html=True)

        user_input = st.text_input("", key="input")

        # Seed the conversation history on the first run of the session.
        if "generated" not in st.session_state:
            st.session_state["generated"] = ["I am ready to help you"]
        if "past" not in st.session_state:
            st.session_state["past"] = ["Hey there!"]

        # Answer the question and record both sides of the exchange.
        if user_input:
            answer = process_answer({'query': user_input})
            st.session_state["past"].append(user_input)
            st.session_state["generated"].append(answer)

        # Show the conversation so far.
        if st.session_state["generated"]:
            display_conversation(st.session_state)
| |
|
|
| |
|
|
|
|
|
|
|
|
# Launch the app when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|
|