| """
|
| Data Processing Functions.
|
| Supports:
|
| - Segmentation of long text
|
| - Segmentation of file content
|
| """
|
| from langchain_community.document_loaders import TextLoader, PyPDFLoader, Docx2txtLoader, BSHTMLLoader, JSONLoader
|
| from nltk.tokenize import sent_tokenize
|
| from collections import Counter
|
| import re
|
| import json
|
| import yaml
|
| import os
|
| import yaml
|
| import os
|
| import inspect
|
| import ast
|
# Module-level configuration, loaded once at import time from config.yaml in
# the parent directory of this module. chunk_str below reads
# config['agent']['chunk_token_limit'] from it.
with open(os.path.join(os.path.dirname(__file__), "..", "config.yaml")) as file:
    config = yaml.safe_load(file)
|
|
|
|
|
def load_extraction_config(yaml_path):
    """Load model/extraction (and optional construct) settings from a YAML file.

    Args:
        yaml_path: Path to the YAML configuration file.

    Returns:
        A dict with "model" and "extraction" sections, each populated with
        defaults for keys missing from the file. A "construct" section is
        included only when the file defines one (matching the original
        behavior). Returns {} when the file does not exist.
    """
    if not os.path.exists(yaml_path):
        print(f"Error: The config file '{yaml_path}' does not exist.")
        return {}

    with open(yaml_path, 'r') as file:
        # An empty YAML file makes safe_load return None; normalize to {}
        # so the .get() calls below do not raise AttributeError.
        loaded = yaml.safe_load(file) or {}

    model_config = loaded.get('model', {})
    extraction_config = loaded.get('extraction', {})

    # Build the result once instead of duplicating the literal per branch.
    result = {
        "model": {
            "model_name_or_path": model_config.get('model_name_or_path', ""),
            "category": model_config.get('category', ""),
            "api_key": model_config.get('api_key', ""),
            "base_url": model_config.get('base_url', ""),
            "vllm_serve": model_config.get('vllm_serve', False),
        },
        "extraction": {
            "task": extraction_config.get('task', ""),
            "instruction": extraction_config.get('instruction', ""),
            "text": extraction_config.get('text', ""),
            "output_schema": extraction_config.get('output_schema', ""),
            "constraint": extraction_config.get('constraint', ""),
            "truth": extraction_config.get('truth', ""),
            "use_file": extraction_config.get('use_file', False),
            "file_path": extraction_config.get('file_path', ""),
            "mode": extraction_config.get('mode', "quick"),
            "update_case": extraction_config.get('update_case', False),
            "show_trajectory": extraction_config.get('show_trajectory', False),
        },
    }

    if 'construct' in loaded:
        construct_config = loaded.get('construct', {})
        result["construct"] = {
            "database": construct_config.get('database', ""),
            "url": construct_config.get('url', ""),
            "username": construct_config.get('username', ""),
            "password": construct_config.get('password', ""),
        }

    return result
|
|
|
|
|
def chunk_str(text):
    """Split *text* into chunks of whole sentences.

    Sentences (via nltk's sent_tokenize) are packed greedily so that each
    chunk's whitespace-token count stays within the configured
    config['agent']['chunk_token_limit'].
    """
    limit = config['agent']['chunk_token_limit']
    chunks = []
    buffer = []
    used = 0
    for sentence in sent_tokenize(text):
        size = len(sentence.split())
        if used + size <= limit:
            buffer.append(sentence)
            used += size
        else:
            # Flush the current chunk and start a new one with this sentence.
            if buffer:
                chunks.append(' '.join(buffer))
            buffer = [sentence]
            used = size
    if buffer:
        chunks.append(' '.join(buffer))
    return chunks
|
|
|
|
|
def chunk_file(file_path):
    """Load a document and return its text split into token-limited chunks.

    Supported extensions: .pdf, .txt, .docx, .html, .json. The page contents
    are concatenated and re-chunked via chunk_str.

    Raises:
        ValueError: if the file extension is not supported.
    """
    loader_by_suffix = {
        ".pdf": PyPDFLoader,
        ".txt": TextLoader,
        ".docx": Docx2txtLoader,
        ".html": BSHTMLLoader,
        ".json": JSONLoader,
    }
    for suffix, loader_cls in loader_by_suffix.items():
        if file_path.endswith(suffix):
            loader = loader_cls(file_path)
            break
    else:
        raise ValueError("Unsupported file format")

    docs = ''.join(page.page_content for page in loader.load_and_split())
    return chunk_str(docs)
|
|
|
def process_single_quotes(text):
    """Replace quote-style single quotes with double quotes.

    A single quote is converted only when it is not flanked by word
    characters on both sides, so apostrophes inside words (e.g. "don't")
    are preserved.
    """
    quote_pattern = r"(?<!\w)'|'(?!\w)"
    return re.sub(quote_pattern, '"', text)
|
|
|
def remove_empty_values(data):
    """Recursively drop None, [], "", and {} entries from dicts and lists.

    Note: emptiness is tested on the child value BEFORE recursion, so a
    container that only becomes empty after cleaning is still kept.
    """
    def _blank(value):
        return value is None or value == [] or value == "" or value == {}

    if isinstance(data, dict):
        cleaned = {}
        for key, val in data.items():
            if not _blank(val):
                cleaned[key] = remove_empty_values(val)
        return cleaned
    if isinstance(data, list):
        return [remove_empty_values(item) for item in data if not _blank(item)]
    return data
|
|
|
def extract_json_dict(text):
    """Extract the last JSON object embedded in *text*.

    Returns the parsed (and empty-value-cleaned) dict on success, the raw
    candidate string when JSON parsing fails, the input unchanged when no
    brace-delimited candidate is found, and a fallback message when the
    cleaned result is None.
    """
    if isinstance(text, dict):
        return text
    # Matches brace-balanced objects up to a fixed nesting depth.
    brace_pattern = r'\{(?:[^{}]|(?:\{(?:[^{}]|(?:\{[^{}]*\})*)*\})*)*\}'
    candidates = re.findall(brace_pattern, text)
    if not candidates:
        return text
    json_string = process_single_quotes(candidates[-1])
    try:
        parsed = json.loads(json_string)
    except json.JSONDecodeError:
        return json_string
    parsed = remove_empty_values(parsed)
    if parsed is None:
        return "No valid information found."
    return parsed
|
|
|
def good_case_wrapper(example: str):
    """Wrap positive example text in the few-shot prompt scaffold.

    Returns "" when *example* is None or empty.
    """
    if example in (None, ""):
        return ""
    return (
        f"\nHere are some examples:\n{example}\n(END OF EXAMPLES)\nRefer to the reasoning steps and analysis in the examples to help complete the extraction task below.\n\n"
    )
|
|
|
def bad_case_wrapper(example: str):
    """Wrap bad-case example text in the reflection prompt scaffold.

    Returns "" when *example* is None or empty.
    """
    if example in (None, ""):
        return ""
    return (
        f"\nHere are some examples of bad cases:\n{example}\n(END OF EXAMPLES)\nRefer to the reflection rules and reflection steps in the examples to help optimize the original result below.\n\n"
    )
|
|
|
def example_wrapper(example: str):
    """Wrap example text in the plain examples prompt scaffold.

    Returns "" when *example* is None or empty.
    """
    if example in (None, ""):
        return ""
    return f"\nHere are some examples:\n{example}\n(END OF EXAMPLES)\n\n"
|
|
|
def remove_redundant_space(s):
    """Collapse whitespace runs to single spaces, then delete the spaces
    around common punctuation marks (, : ( ) . _ ; ' -)."""
    collapsed = ' '.join(s.split())
    return re.sub(r"\s*(,|:|\(|\)|\.|_|;|'|-)\s*", r'\1', collapsed)
|
|
|
def format_string(s):
    """Normalize a string for comparison.

    Collapses spacing (via remove_redundant_space), lowercases, strips
    curly braces, squeezes repeated , . ; punctuation, and converts the
    right single quotation mark to an ASCII apostrophe.
    """
    s = remove_redundant_space(s)
    s = s.lower()
    s = s.replace('{', '').replace('}', '')
    # Raw strings: the original '\.+' non-raw literal is an invalid escape
    # sequence (SyntaxWarning on Python 3.12+, slated to become an error).
    s = re.sub(r',+', ',', s)
    s = re.sub(r'\.+', '.', s)
    s = re.sub(r';+', ';', s)
    s = s.replace('’', "'")
    return s
|
|
|
def calculate_metrics(y_truth: set, y_pred: set):
    """Return (precision, recall, f1) comparing predictions to gold items.

    Each metric is 0 when its denominator would be zero.
    """
    tp = len(y_truth & y_pred)
    fn = len(y_truth - y_pred)
    fp = len(y_pred - y_truth)
    precision = tp / (tp + fp) if tp + fp else 0
    recall = tp / (tp + fn) if tp + fn else 0
    denom = precision + recall
    f1_score = 2 * precision * recall / denom if denom else 0
    return precision, recall, f1_score
|
|
|
def current_function_name():
    """Return the name of the calling function.

    Returns None (after printing a diagnostic) when there is no caller
    frame or when stack inspection fails.
    """
    try:
        frames = inspect.stack()
        if len(frames) <= 1:
            print("No caller function found")
            return None
        return frames[1].function
    except Exception as e:
        # Best-effort introspection: report and fall through to None.
        print(f"An error occurred: {e}")
|
|
|
def normalize_obj(value):
    """Canonicalize nested data for order-insensitive comparison.

    Dicts become frozensets of (key, normalized value) pairs; lists, sets,
    and tuples become multiset tuples of (normalized item, count); strings
    are passed through format_string; everything else is returned as-is.
    """
    if isinstance(value, str):
        return format_string(value)
    if isinstance(value, dict):
        pairs = ((key, normalize_obj(val)) for key, val in value.items())
        return frozenset(pairs)
    if isinstance(value, (list, set, tuple)):
        counts = Counter(normalize_obj(item) for item in value)
        return tuple(counts.items())
    return value
|
|
|
def dict_list_to_set(data_list):
    """Convert a list of dicts into a set of tuples of formatted values.

    Each dict contributes one tuple of format_string-normalized values.
    On any failure, prints a diagnostic and returns whatever was
    accumulated so far (possibly the empty set).
    """
    converted = set()
    try:
        for record in data_list:
            converted.add(tuple(format_string(v) for v in record.values()))
        return converted
    except Exception:
        print(f"Failed to convert dictionary list to set: {data_list}")
        return converted
|
|
|