# agentV2/preprocess.py
# AgentV2: Medical AI Agent with multi-route RAG — PDF preprocessing module.
import os
import glob
import logging
import pandas as pd
from tqdm import tqdm
from typing import List, Dict, Optional
from pathlib import Path
import pdfplumber
# Industrial-grade batch PDF processor (production-oriented code).
class PDFBatchProcessor:
    """Batch-extracts text and tables from PDFs and writes summary reports.

    All logs and result files are written under ``output_dir``.
    """

    def __init__(self, output_dir: str = "./output"):
        """Create the processor, its output directory, and the logging setup.

        Args:
            output_dir: Directory for logs and extraction results; created
                (including missing parents) if it does not exist.
        """
        self.output_dir = Path(output_dir)
        # BUG FIX: parents=True so nested paths like "./a/b/out" also work;
        # the original raised FileNotFoundError when a parent was missing.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Configure logging to both a file in the output dir and the console.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.output_dir / "pdf_processing.log"),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
# 查找指定路径下的所有PDF文件
def find_pdf_files(self, input_path: str) -> List[Path]:
path = Path(input_path)
if path.is_file() and path.suffix.lower() == '.pdf':
return [path]
elif path.is_dir():
# 递归查找所有PDF文件
pdf_files = list(path.glob("**/*.pdf"))
self.logger.info(f"在 {input_path} 中找到 {len(pdf_files)} 个PDF文件")
return pdf_files
else:
raise ValueError(f"路径不存在,或不是PDF文件: {input_path}")
# 提取单个PDF文件的内容
def extract_pdf_content(self,
pdf_path: Path,
extract_text: bool = True,
extract_tables: bool = True,
table_settings: Optional[dict] = None) -> Dict:
"""
Args:
pdf_path: PDF文件路径
extract_text: 是否提取文本
extract_tables: 是否提取表格
table_settings: 表格提取配置
"""
result = {
"file_name": pdf_path.name,
"file_path": str(pdf_path),
"metadata": {},
"pages": [],
"error": None
}
try:
with pdfplumber.open(pdf_path) as pdf:
# 提取元数据
result["metadata"] = pdf.metadata
for page_num, page in enumerate(pdf.pages, 1):
page_result = {"page_number": page_num, "text": "", "tables": []}
# 提取文本
if extract_text:
try:
# 布局模式根据需求调整
text = page.extract_text(layout=False)
page_result["text"] = text if text else ""
except Exception as e:
self.logger.warning(f"页面 {page_num} 文本提取失败: {str(e)}")
pass
# 提取表格
if extract_tables:
try:
tables = page.extract_tables(table_settings or {})
if tables:
page_result["tables"] = tables
except Exception as e:
self.logger.warning(f"页面 {page_num} 表格提取失败: {str(e)}")
pass
# 添加当前页面page的提取结果
result["pages"].append(page_result)
# 单一PDF文档提取完毕后, 写日志处理
self.logger.info(f"成功处理: {pdf_path.name} - {len(pdf.pages)} 页")
# 单一PDF文档提取失败后, 写日志处理
except Exception as e:
# 明确记录一下哪篇PDF文档处理失败, 并记录失败原因, 便于后续回溯与 "bad case分析"
error_msg = f"处理文件失败 {pdf_path}: {str(e)}"
result["error"] = error_msg
self.logger.error(error_msg)
return result
# 批量处理PDF文件
def process_batch(self, pdf_files: List[Path],
save_format: str = "excel",
**extract_kwargs) -> pd.DataFrame:
"""
Args:
pdf_files: PDF文件列表
save_format: 保存格式 (excel, csv, parquet)
**extract_kwargs: 提取参数
"""
all_results = []
for i, pdf_file in tqdm(enumerate(pdf_files, 1)):
self.logger.info(f"处理进度: {i}/{len(pdf_files)} - {pdf_file.name}")
result = self.extract_pdf_content(pdf_file, **extract_kwargs)
all_results.append(result)
# 实时保存进度 (针对大批量处理)
if i % 10 == 0:
self._save_intermediate_results(all_results, f"batch_{i}")
# 保存最终结果
return self._save_results(all_results, save_format)
# 保存处理结果
def _save_results(self, results: List[Dict], format: str) -> pd.DataFrame:
# 扁平化结果, 以便保存
flat_data = []
for result in results:
if result["error"]:
flat_data.append(
{
"file_name": result["file_name"],
"status": "Error",
"error_message": result["error"],
"page_count": 0,
"text_length": 0,
"table_count": 0
}
)
continue
total_text = ""
total_tables = 0
for page in result["pages"]:
total_text += page["text"]
total_tables += len(page["tables"])
flat_data.append({
"file_name": result["file_name"],
"status": "Success",
"error_message": "",
"page_count": len(result["pages"]),
"text_length": len(total_text),
"table_count": total_tables,
"author": result["metadata"].get("Author", ""),
"creation_date": result["metadata"].get("CreationDate", "")
})
# for循环处理完毕后, 所有数据封装成 Pandas 的 DataFrame 格式
df = pd.DataFrame(flat_data)
# 根据格式保存
if format.lower() == "excel":
df.to_excel(self.output_dir / "pdf_extraction_summary.xlsx", index=False)
# 同时保存详细文本内容
detailed_results = []
for result in results:
if not result["error"]:
for page in result["pages"]:
if page["text"]:
detailed_results.append({
"file_name": result["file_name"],
"page_number": page["page_number"],
"text_content": page["text"]
})
if detailed_results:
pd.DataFrame(detailed_results).to_excel(
self.output_dir / "pdf_detailed_text.xlsx", index=False
)
elif format.lower() == "csv":
df.to_csv(self.output_dir / "pdf_extraction_summary.csv", index=False)
self.logger.info(f"结果已保存到 {self.output_dir}")
return df
# 保存中间结果 (工业界一线生产环境, 异常因素很多, 防止处理中断丢失数据)
def _save_intermediate_results(self, results: List[Dict], batch_name: str):
try:
temp_df = pd.DataFrame([{
"file_name": r["file_name"],
"status": "Error" if r["error"] else "Success",
"pages_processed": len(r["pages"])
} for r in results])
temp_df.to_csv(self.output_dir / f"progress_{batch_name}.csv", index=False)
except Exception as e:
self.logger.warning(f"保存中间结果失败: {str(e)}")
# Advanced table-extraction configuration, passed through to
# pdfplumber's `extract_tables(table_settings)` (see main()).
# NOTE(review): the tolerance/threshold values below look empirically tuned
# for the target documents — confirm against pdfplumber's table-settings
# documentation before changing them.
ADVANCED_TABLE_SETTINGS = {
"vertical_strategy": "lines",
"horizontal_strategy": "lines",
"snap_tolerance": 4,
"join_tolerance": 10,
"edge_min_length": 3,
"min_words_vertical": 2,
"min_words_horizontal": 1
}
def main():
    """Entry point: batch-process ./pdf_documents and log summary statistics."""
    # Instantiate the PDF processor.
    processor = PDFBatchProcessor(output_dir="./pdf_output")
    try:
        # Discover input PDFs.
        pdf_files = processor.find_pdf_files("./pdf_documents")
        if not pdf_files:
            processor.logger.warning("未找到PDF文件")
            return
        # Batch processing.
        results_df = processor.process_batch(
            pdf_files,
            save_format="excel",
            extract_text=True,
            extract_tables=True,
            table_settings=ADVANCED_TABLE_SETTINGS
        )
        # Summary statistics — filter the success rows once instead of
        # re-evaluating the same boolean mask three times.
        success_df = results_df[results_df["status"] == "Success"]
        success_count = len(success_df)
        processor.logger.info(f"处理完成: {success_count}/{len(pdf_files)} 个文件成功")
        if success_count > 0:
            avg_text_length = success_df["text_length"].mean()
            avg_tables = success_df["table_count"].mean()
            processor.logger.info(f"平均每文件: {avg_text_length:.0f} 字符, {avg_tables:.1f} 个表格")
    except Exception as e:
        # BUG FIX: logger.exception records the full traceback (logger.error
        # only kept the message) — essential for diagnosing batch failures.
        processor.logger.exception(f"处理过程发生错误: {str(e)}")

if __name__ == "__main__":
    main()