Spaces:
Running
Running
| import os | |
| import sys | |
| import io | |
| import time | |
| import PyPDF2 | |
| import asyncio | |
| # import fitz | |
| import pytesseract | |
| import dotenv | |
| from config.env_constant import EnvFilepath | |
| dotenv.load_dotenv(EnvFilepath.ENVPATH) | |
| from PyPDF2 import PdfReader | |
| from functools import wraps | |
| from typing import ByteString | |
| from pdf2image import convert_from_bytes | |
def measure_runtime(func):
    """Decorate ``func`` so that every call prints how long it took.

    Coroutine functions (as detected by ``asyncio.iscoroutinefunction``) get
    an ``async`` wrapper that awaits the call; everything else gets a plain
    wrapper. Elapsed time is measured with ``time.perf_counter`` and printed
    after the call returns. If the call raises, nothing is printed and the
    exception propagates unchanged.

    Args:
        func: The function or coroutine function to time.

    Returns:
        A wrapper with the same call signature that returns ``func``'s result.
    """
    if asyncio.iscoroutinefunction(func):
        # wraps() keeps __name__/__doc__/__wrapped__ of the decorated function
        # so introspection and logging still see the original identity.
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = await func(*args, **kwargs)
            end = time.perf_counter()
            print(f"β±οΈ Async function '{func.__name__}' executed in {end - start:.10f} seconds")
            return result

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"β±οΈ Function '{func.__name__}' executed in {end - start:.10f} seconds")
        return result

    return sync_wrapper
| # async def is_nonsearchable_pdf(pdf_path: str) -> str: | |
| # try: | |
| # doc = fitz.open(pdf_path) | |
| # for page_num in range(doc.page_count): | |
| # page = doc.load_page(page_num) | |
| # # Attempt to extract text from the page | |
| # text = page.get_text("text") | |
| # text = f"__{text.strip()}__" | |
| # if text == "____": | |
| # print("Non Searchable") | |
| # return True | |
| # else: | |
| # print("Searchable") | |
| # return False | |
| # except Exception as E: | |
| # print(f"Failed to identify nonsearchable, {E}") | |
| # return False | |
| # finally: | |
| # doc.close() | |
def pdf_decoder(pdf_bytes: bytes) -> str:
    """Return the extractable text of a PDF held in memory.

    The bytes are wrapped in an in-memory stream and parsed with
    ``PdfReader``. Each page that yields non-empty text contributes that text
    followed by a newline; pages with no text are skipped. Leading and
    trailing whitespace is stripped from the combined result.

    Args:
        pdf_bytes: Raw content of the PDF document.

    Returns:
        The concatenated page text, or an empty string if no page has text.
    """
    document = PdfReader(io.BytesIO(pdf_bytes))
    # Lazily pull text page by page; falsy results are filtered out below.
    page_texts = (page.extract_text() for page in document.pages)
    combined = "".join(chunk + "\n" for chunk in page_texts if chunk)
    return combined.strip()
| # from fastapi.datastructures import UploadedFile | |
| # async def pdf_reader(pdf_path: str) -> str: | |
| # """Decode PDF bytes to a string.""" | |
| # try: | |
| # is_empty = await is_nonsearchable_pdf(pdf_path) | |
| # print(f">> Is nonsearchable: {is_empty}") | |
| # if is_empty: | |
| # pdf_writer = PyPDF2.PdfWriter() | |
| # poppler_path = None | |
| # if sys.platform == "win32": | |
| # poppler_path = "src/software/poppler-24.08.0/Library/bin" | |
| # print(f"pdf_path",pdf_path) | |
| # print(f"type pdf_path",type(pdf_path)) | |
| # # if type(pdf_path) != str: | |
| # # images = convert_from_bytes(pdf_path, poppler_path=poppler_path) | |
| # # else: | |
| # images = convert_from_bytes(pdf_path, poppler_path=poppler_path) | |
| # pytesseract.pytesseract.tesseract_cmd = r"src/software/Tesseract-OCR/tesseract.exe" | |
| # for image in images: | |
| # page = pytesseract.image_to_pdf_or_hocr(image, extension='pdf') | |
| # pdf = PyPDF2.PdfReader(io.BytesIO(page)) | |
| # pdf_writer.add_page(pdf.pages[0]) | |
| # output_bytes_stream = io.BytesIO() | |
| # pdf_writer.write(output_bytes_stream) | |
| # reader = PyPDF2.PdfReader(output_bytes_stream) | |
| # user_profile = "" | |
| # for page in reader.pages: | |
| # text = page.extract_text() | |
| # user_profile += text + "\n" | |
| # return user_profile | |
| # else: | |
| # reader = PdfReader(pdf_path) | |
| # # Extract and concatenate text from all pages | |
| # user_profile = "" | |
| # for page in reader.pages: | |
| # text = page.extract_text() | |
| # if text: | |
| # user_profile += text + "\n" | |
| # print(f">>> user profile: {user_profile.strip()}") | |
| # return user_profile.strip() | |
| # except Exception as E: | |
| # print(f"pdf reader error, {E}") | |
| # exc_type, exc_obj, exc_tb = sys.exc_info() | |
| # fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] | |
| # print(exc_type, fname, exc_tb.tb_lineno) | |
| # import tempfile | |
def _extract_pages_text(reader) -> str:
    """Join the non-empty text of every page in ``reader``, one newline per page."""
    page_texts = (page.extract_text() for page in reader.pages)
    return "".join(text + "\n" for text in page_texts if text).strip()


async def pdf_reader(pdf_path: ByteString) -> str:
    """Extract text from a PDF, falling back to OCR when it has no text layer.

    The document is first parsed with ``PdfReader``. If that yields any text
    it is returned directly. Otherwise the pages are rendered to images with
    ``convert_from_bytes``, each image is turned into a one-page PDF by
    ``pytesseract.image_to_pdf_or_hocr``, the pages are merged with
    ``PyPDF2.PdfWriter`` and the text is extracted from the merged document.

    Args:
        pdf_path: The PDF content. Bytes-like objects (``bytes``,
            ``bytearray``, ``memoryview``) are normalised to ``bytes`` and
            read from memory. Any other value is passed to ``PdfReader``
            as-is; the OCR fallback still hands it to ``convert_from_bytes``.
            NOTE(review): non-bytes inputs presumably only work for PDFs
            with a text layer — confirm against callers.

    Returns:
        The stripped text. On any exception the error is printed and ``None``
        is returned instead of raising (best-effort behaviour kept for
        existing callers).
    """
    try:
        # isinstance (not type ==) so every bytes-like input takes the
        # in-memory route rather than being treated as a path/stream.
        if isinstance(pdf_path, (bytes, bytearray, memoryview)):
            pdf_path = bytes(pdf_path)
            reader = PdfReader(io.BytesIO(pdf_path))
        else:
            reader = PdfReader(pdf_path)

        user_profile = _extract_pages_text(reader)
        if user_profile != "":
            return user_profile

        # No text layer found: rebuild a searchable PDF through OCR.
        pdf_writer = PyPDF2.PdfWriter()
        poppler_path = None
        if sys.platform == "win32":
            # The bundled binaries are Windows executables; on other
            # platforms leave pdf2image/pytesseract on their own defaults.
            poppler_path = "src/software/poppler-24.08.0/Library/bin"
            pytesseract.pytesseract.tesseract_cmd = r"src/software/Tesseract-OCR/tesseract.exe"

        images = convert_from_bytes(pdf_path, poppler_path=poppler_path)
        for image in images:
            ocr_page = pytesseract.image_to_pdf_or_hocr(image, extension='pdf')
            ocr_pdf = PyPDF2.PdfReader(io.BytesIO(ocr_page))
            pdf_writer.add_page(ocr_pdf.pages[0])

        output_bytes_stream = io.BytesIO()
        pdf_writer.write(output_bytes_stream)
        # Rewind so the merged document is read from its beginning.
        output_bytes_stream.seek(0)
        return _extract_pages_text(PyPDF2.PdfReader(output_bytes_stream))
    except Exception as E:
        print(f"pdf reader error, {E}")
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        print(exc_type, fname, exc_tb.tb_lineno)
        # Errors are reported, not raised; callers receive None.
        return None
| # cv = pdf_reader("src/data/cvs/1. Balu Rama Chandra_Data Scientist_Linkedin.pdf") | |
| # print(cv) | |
| # len(cv.pages) | |
| # for page in cv.pages: | |
| # print(page.extract_text()) | |
| # ------------------ | |
| # QDRANT | |
| # ------------------ | |
| # from qdrant_client import QdrantClient, models | |
| # import uuid | |
| # from src.embed_model.embed_model import embed_model | |
| # from qdrant_client import AsyncQdrantClient, models | |
| # from fastapi import HTTPException | |
| # from typing import Dict, List, Union | |
| # from src.models.data_model import Profile, OutProfile | |
| # qdrant_client = AsyncQdrantClient( | |
| # url=os.environ.get('ss--qdrant--endpoint--url'), | |
| # api_key=os.environ.get('ss--qdrant--api-key'), | |
| # ) | |
| # qdrant_collection_name = os.environ.get('ss--qdrant--collection--name') | |
| # async def check_collection(qdrant_client:AsyncQdrantClient=qdrant_client, collection_name: str=qdrant_collection_name): | |
| # try: | |
| # colls = await qdrant_client.get_collections() | |
| # if collection_name not in [item.name for item in colls.collections]: | |
| # await qdrant_client.create_collection( | |
| # collection_name=qdrant_collection_name, | |
| # vectors_config=models.VectorParams(size=1536, distance=models.Distance.COSINE), | |
| # ) | |
| # print(f"β collection '{collection_name}' is created!") | |
| # return True | |
| # else: | |
| # print(f"β collection '{collection_name}' already created!") | |
| # except Exception as E: | |
| # print(f"β Something when wrong!, {E}") | |
| # return False | |
| # async def prettyfy_profile(profile:Dict) -> str: | |
| # template = "----\n" | |
| # for k, v in profile.items(): | |
| # template += f"{k}: {v} \n" | |
| # template += "----" | |
| # return template | |
| # async def ingest_one_profile(profile:Profile, qdrant_client:AsyncQdrantClient=qdrant_client, collection_name:str=qdrant_collection_name): | |
| # try: | |
| # await check_collection(qdrant_client, collection_name) | |
| # text = await prettyfy_profile(profile.profile.model_dump()) | |
| # doc_id = profile.profile_id | |
| # embeddings = await embed_model.aembed_query(text = text) | |
| # qdrant_client.upload_points( | |
| # collection_name=collection_name, | |
| # points=[ | |
| # models.PointStruct( | |
| # id=doc_id, | |
| # payload=profile.model_dump(), | |
| # vector=embeddings, | |
| # ) | |
| # ] | |
| # ) | |
| # print(f"β Ingest one profile succeeded!") | |
| # except Exception as E: | |
| # print(f"β Ingest one profile error!, {E}") | |
| # raise HTTPException(status_code=500, detail=f"β Ingest one profile error!, {E}") | |
| # async def ingest_bulk_profile(profiles:List[Profile], qdrant_client:AsyncQdrantClient=qdrant_client, collection_name:str=qdrant_collection_name): | |
| # try: | |
| # await check_collection(qdrant_client, collection_name) | |
| # points = [] | |
| # for profile in profiles: | |
| # text = await prettyfy_profile(profile.profile.model_dump()) | |
| # doc_id = profile.profile_id | |
| # embeddings = await embed_model.aembed_query(text = text) | |
| # points.append( | |
| # models.PointStruct( | |
| # id=doc_id, | |
| # payload=profile.model_dump(), | |
| # vector=embeddings, | |
| # ) | |
| # ) | |
| # qdrant_client.upload_points( | |
| # collection_name=collection_name, | |
| # points=points | |
| # ) | |
| # print(f"β Ingest bulk profile succeeded!") | |
| # except Exception as E: | |
| # print(f"β Ingest bulk profile error!, {E}") | |
| # raise HTTPException(status_code=500, detail=f"β Ingest bulk profile error!, {E}") | |
| # async def pretty_profiles(profiles:List[Union[Profile, Dict]]) -> pd.DataFrame: | |
| # try: | |
| # records = [] | |
| # for profile in profiles: | |
| # temp = {} | |
| # # text = await prettyfy_profile(profile.profile.model_dump()) | |
| # # doc_id = profile.profile_id | |
| # filename = profile.filename | |
| # # if type(profile.profile) != Dict: | |
| # # temp = {**{"filename":filename}, **profile.profile.model_dump()} | |
| # # else: | |
| # if type(profile.profile) == dict: | |
| # temp = {**{"filename":filename}, **profile.profile} | |
| # elif type(profile.profile) == OutProfile: | |
| # temp = {**{"filename":filename}, **profile.profile.model_dump()} | |
| # if type(temp["hardskills"]) == list and temp["hardskills"] != []: | |
| # temp["hardskills"] = ", ".join(temp["hardskills"]) | |
| # else: | |
| # temp["hardskills"] = "-" | |
| # if type(temp["softskills"]) == list and temp["softskills"] != []: | |
| # temp["softskills"] = ", ".join(temp["softskills"]) | |
| # else: | |
| # temp["softskills"] = "-" | |
| # if type(temp["certifications"]) == list and temp["certifications"] != []: | |
| # temp["certifications"] = ", ".join(temp["certifications"]) | |
| # else: | |
| # temp["certifications"] = "-" | |
| # if type(temp["business_domain_experiences"]) == list and temp["business_domain_experiences"] != []: | |
| # temp["business_domain_experiences"] = ", ".join(temp["business_domain_experiences"]) | |
| # else: | |
| # temp["business_domain_experiences"] = "-" | |
| # records.append(temp) | |
| # # embeddings = await embed_model.aembed_query(text = text) | |
| # print(f"β Export profile succeeded!") | |
| # df = pd.DataFrame(records) | |
| # return df | |
| # except Exception as E: | |
| # print(f"β Export profile error!, {E}") | |
| # error_message = f"Processing pretty profile error: {E}" | |
| # print(error_message) | |
| # exc_type, exc_obj, exc_tb = sys.exc_info() | |
| # fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] | |
| # print(exc_type, fname, exc_tb.tb_lineno) | |
| # raise HTTPException(status_code=500, detail=f"β Export profile error!, {E}") | |
| # async def helper_prepare_profiles(file_names:List, output_profiles:List[Union[OutProfile, Dict]]): | |
| # if len(file_names) == len(output_profiles): | |
| # profiles = [] | |
| # for i in range(len(output_profiles)): | |
| # one_profile = Profile( | |
| # filename=file_names[i].split('\\')[-1], | |
| # profile_id=str(uuid.uuid4()), | |
| # profile=output_profiles[i] | |
| # ) | |
| # profiles.append(one_profile) | |
| # return profiles | |
| # else: | |
| # return [] | |
| # asyncio.run(ingest_one_profile(profile)) | |
| # asyncio.run(ingest_one_profile(fake_profile)) | |
| # async def retrieve_profile(input_user: str, qdrant_client:AsyncQdrantClient=qdrant_client, collection_name:str=qdrant_collection_name, limit:int=5): | |
| # try: | |
| # embeddings = await embed_model.aembed_query(text = input_user) | |
| # query_result = await qdrant_client.query_points( | |
| # collection_name=collection_name, | |
| # query=embeddings, | |
| # limit=limit, | |
| # ) | |
| # return query_result.points | |
| # except Exception as E: | |
| # print(f"β retrieve_profile error, {E}") | |
| # return [] | |
| # criteria1 = """latest_university: Institut Teknologi Sepuluh November (ITS) | |
| # major: Matematika | |
| # gpa: >3.6 | |
| # hardskill: Certified Business Strategic Business Analyst, analytics | |
| # business_domain_experience: people analytics""" | |
| # criteria1 = """universitas: Institut Teknologi Sepuluh November (ITS)""" | |
| # retrieved_profiles = asyncio.run(retrieve_profile(criteria1, limit=None)) | |
| # len(retrieved_profiles) | |
| # retrieved_profiles[-1].payload | |
| # from langchain_community.document_loaders import PyPDFLoader | |
| # loader = PyPDFLoader(files_path[0]) | |
| # pages = [] | |
| # for page in loader.lazy_load(): | |
| # pages.append(page) | |
| # len(pages) | |