import json
import os
import re
import tempfile
import zipfile

import pandas as pd
from huggingface_hub import hf_hub_download, list_repo_files
from llama_index.core import Document
from llama_index.core.text_splitter import SentenceSplitter

from logger.my_logging import log_message
from config import CHUNK_SIZE, CHUNK_OVERLAP, MAX_CHARS_TABLE, MAX_ROWS_TABLE

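# The constants imported from config are not shown in this module. A minimal
# config.py sketch, assuming values along these lines (the numbers below are
# illustrative assumptions, not the project's actual settings):
#
#     CHUNK_SIZE = 1024        # target text chunk size, characters
#     CHUNK_OVERLAP = 128      # overlap between neighbouring text chunks
#     MAX_CHARS_TABLE = 4000   # max characters per table chunk
#     MAX_ROWS_TABLE = 30      # max rows per table chunk
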
def normalize_text(text):
    """Replace the Cyrillic 'С' in steel designations with the Latin 'C'."""
    if not text:
        return text

    # Both the 'С345' and 'С-345' spellings end up as 'C345' with a Latin 'C'.
    text = text.replace('С-', 'C')
    text = re.sub(r'\bС(\d)', r'C\1', text)
    return text

def normalize_steel_designations(text):
    """Convert look-alike Cyrillic letters inside steel grade designations to Latin.

    Returns the normalized text, the number of grades that were changed and the
    list of 'original → converted' replacements.
    """
    if not text:
        return text, 0, []

    changes_count = 0
    changes_list = []

    # Cyrillic letters that are visually identical to Latin ones in steel grades.
    replacements = {
        'Х': 'X',
        'Н': 'H',
        'Т': 'T',
        'С': 'C',
        'В': 'B',
        'К': 'K',
        'М': 'M',
        'А': 'A',
        'Р': 'P',
    }

    # Steel grades such as 12Х18Н10Т: one to three digits followed by letter/digit groups.
    pattern = r'\b\d{1,3}(?:[A-ZА-ЯЁ]\d*)+\b'
    # Welding wire designations: a СВ-/CB- prefix followed by a grade.
    pattern_wire = r'\b[СC][ВB]-\d{1,3}(?:[A-ZА-ЯЁ]\d*)+\b'

    def replace_in_steel_grade(match):
        nonlocal changes_count, changes_list
        original = match.group(0)
        converted = ''.join(replacements.get(ch, ch) for ch in original)
        if converted != original:
            changes_count += 1
            changes_list.append(f"{original} → {converted}")
        return converted

    normalized_text = re.sub(pattern, replace_in_steel_grade, text)
    normalized_text = re.sub(pattern_wire, replace_in_steel_grade, normalized_text)

    return normalized_text, changes_count, changes_list

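# For instance, normalize_steel_designations("сталь 12Х18Н10Т") is expected to
# return ("сталь 12X18H10T", 1, ["12Х18Н10Т → 12X18H10T"]): the Cyrillic Х/Н/Т
# inside the grade are swapped for their Latin look-alikes, while the
# surrounding Russian prose is left untouched.
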
def extract_preamble(text):
    """Extract the leading context (the preamble up to a colon, or the first
    sentence) so it can be prepended to continuation chunks."""
    if not text:
        return ""

    # Prefer the preamble up to the first colon (typical for enumerations).
    colon_match = re.match(r'^.*?:', text, re.DOTALL)
    if colon_match:
        preamble = colon_match.group(0)
        if len(preamble) < 300:
            return preamble.strip()

    # Otherwise fall back to the first sentence.
    sentence_match = re.match(r'^.*?(?:\.|\?|!)(?:\s|$)', text, re.DOTALL)
    if sentence_match:
        sentence = sentence_match.group(0)
        if len(sentence) < 300:
            return sentence.strip()

    # As a last resort, take the first 300 characters.
    return text[:300] + "..."

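# For example, for a section that starts with "Сварные соединения должны
# удовлетворять следующим требованиям: а) ...", the preamble up to the colon is
# returned, so continuation chunks keep the sentence that introduces the list.
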
def chunk_text_documents(documents):
    """Split text Documents into sentence-based chunks, prepend section
    preambles to continuation chunks and normalize steel designations."""
    text_splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    total_normalizations = 0
    chunks_with_changes = 0

    chunked = []
    for doc in documents:
        parent_context = extract_preamble(doc.text)

        chunks = text_splitter.get_nodes_from_documents([doc])

        for i, chunk in enumerate(chunks):
            # Prepend the parent context to continuation chunks that lost it.
            if i > 0 and parent_context:
                if not chunk.text.strip().startswith(parent_context[:20]):
                    chunk.text = f"[Текст из начала п. {parent_context}] {chunk.text}"

            chunk.text, changes, _ = normalize_steel_designations(chunk.text)
            if changes > 0:
                chunks_with_changes += 1
                total_normalizations += changes

            chunk.metadata.update({
                'chunk_id': i,
                'total_chunks': len(chunks),
                'chunk_size': len(chunk.text)
            })
            chunked.append(chunk)

    if chunked:
        avg_size = sum(len(c.text) for c in chunked) / len(chunked)
        min_size = min(len(c.text) for c in chunked)
        max_size = max(len(c.text) for c in chunked)
        log_message(f"✓ Text: {len(documents)} docs → {len(chunked)} chunks")
        log_message(f"  Size stats: avg={avg_size:.0f}, min={min_size}, max={max_size} chars")
        log_message("  Steel designation normalization:")
        log_message(f"    - Chunks with changes: {chunks_with_changes}/{len(chunked)}")
        log_message(f"    - Total steel grades normalized: {total_normalizations}")
        if chunks_with_changes > 0:
            log_message(f"    - Avg per affected chunk: {total_normalizations / chunks_with_changes:.1f}")
        else:
            log_message("    - No normalizations needed")

    log_message("=" * 60)

    return chunked

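# Continuation chunks produced above start with "[Текст из начала п. <preamble>] ..."
# unless they already begin with the preamble, so a chunk retrieved on its own
# still carries the clause that introduced it; every chunk also records
# chunk_id, total_chunks and chunk_size in its metadata.
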
def chunk_table_by_content(table_data, doc_id, max_chars=MAX_CHARS_TABLE, max_rows=MAX_ROWS_TABLE):
    """Turn one table (a sheet dict) into one or more Document chunks,
    splitting by rows when the table does not fit into a single chunk."""
    headers = table_data.get('headers', [])
    rows = table_data.get('data', [])
    table_num = table_data.get('table_number', 'unknown')
    table_title = table_data.get('table_title', '')
    section = table_data.get('section', '')
    sheet_name = table_data.get('sheet_name', '')

    # Normalize steel designations in the table metadata.
    table_title, _, _ = normalize_steel_designations(str(table_title))
    section, _, _ = normalize_steel_designations(str(section))
    table_num_clean = str(table_num).strip()

    # Build a human-readable table identifier, handling appendix tables and tables without a number.
    if table_num_clean in ['-', '', 'unknown', 'nan']:
        if 'приложени' in sheet_name.lower() or 'приложени' in section.lower():
            appendix_match = re.search(r'приложени[еия]\s*[№]?\s*(\d+)', (sheet_name + ' ' + section).lower())
            table_identifier = f"Приложение {appendix_match.group(1)}" if appendix_match else "Приложение"
        else:
            if table_title:
                table_identifier = ' '.join(table_title.split()[:5])
            else:
                table_identifier = section.split(',')[0] if section else "БезНомера"
    else:
        if 'приложени' in section.lower():
            appendix_match = re.search(r'приложени[еия]\s*[№]?\s*(\d+)', section.lower())
            table_identifier = f"{table_num_clean} Приложение {appendix_match.group(1)}" if appendix_match else table_num_clean
        else:
            table_identifier = table_num_clean

    if not rows:
        return []

    # Normalize steel designations in every cell.
    normalized_rows = []
    for row in rows:
        if isinstance(row, dict):
            normalized_row = {}
            for k, v in row.items():
                normalized_val, _, _ = normalize_steel_designations(str(v))
                normalized_row[k] = normalized_val
            normalized_rows.append(normalized_row)
        else:
            normalized_rows.append(row)

    # Static parts shared by every chunk of this table.
    intro_content = format_table_header(table_title)
    context_content = format_table_footer(table_identifier, doc_id, section)

    static_size = len(intro_content) + len(context_content)
    available_space = max_chars - static_size - 50

    # If the whole table fits into one chunk, emit it as a single Document.
    # List rows are wrapped the same way as in the split path below.
    full_rows_content = format_table_rows([
        {**row, '_idx': i + 1} if isinstance(row, dict) else {'data': row, '_idx': i + 1}
        for i, row in enumerate(normalized_rows)
    ])

    if static_size + len(full_rows_content) <= max_chars and len(normalized_rows) <= max_rows:
        content = intro_content + full_rows_content + "\n" + context_content

        metadata = {
            'type': 'table',
            'document_id': doc_id,
            'table_number': table_num_clean if table_num_clean not in ['-', 'unknown'] else table_identifier,
            'table_identifier': table_identifier,
            'table_title': table_title,
            'section': section,
            'sheet_name': sheet_name,
            'total_rows': len(normalized_rows),
            'chunk_size': len(content),
            'is_complete_table': True
        }

        return [Document(text=content, metadata=metadata)]

    # Otherwise split the table into row-based chunks.
    chunks = []
    current_rows = []
    current_size = 0
    chunk_num = 0

    for i, row in enumerate(normalized_rows):
        row_text = format_single_row(row, i + 1)
        row_size = len(row_text)

        should_split = (current_size + row_size > available_space or
                        len(current_rows) >= max_rows) and current_rows

        if should_split:
            rows_content = format_table_rows(current_rows)
            content = f"{intro_content}{rows_content}{'='*5}\nСтроки: {current_rows[0]['_idx']}-{current_rows[-1]['_idx']}\n{context_content}"

            metadata = {
                'type': 'table',
                'document_id': doc_id,
                'table_identifier': table_identifier,
                'table_title': table_title,
                'section': section,
                'chunk_id': chunk_num,
                'row_start': current_rows[0]['_idx'] - 1,
                'row_end': current_rows[-1]['_idx'],
                'total_rows': len(normalized_rows),
                'chunk_size': len(content),
                'is_complete_table': False
            }

            chunks.append(Document(text=content, metadata=metadata))

            chunk_num += 1
            current_rows = []
            current_size = 0

        row_copy = row.copy() if isinstance(row, dict) else {'data': row}
        row_copy['_idx'] = i + 1
        current_rows.append(row_copy)
        current_size += row_size

    # Emit the remaining rows as the final chunk.
    if current_rows:
        rows_content = format_table_rows(current_rows)
        content = f"{intro_content}{rows_content}{'='*5}\nСтроки: {current_rows[0]['_idx']}-{current_rows[-1]['_idx']}\n{context_content}"

        metadata = {
            'type': 'table',
            'document_id': doc_id,
            'table_identifier': table_identifier,
            'table_title': table_title,
            'section': section,
            'chunk_id': chunk_num,
            'row_start': current_rows[0]['_idx'] - 1,
            'row_end': current_rows[-1]['_idx'],
            'total_rows': len(normalized_rows),
            'chunk_size': len(content),
            'is_complete_table': False
        }

        chunks.append(Document(text=content, metadata=metadata))

    return chunks

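# A split table chunk rendered by the code above looks roughly like this
# (placeholders stand for the actual values):
#
#     ТАБЛИЦА <table_title>
#     ДАННЫЕ:
#     1. <column>: <value> | <column>: <value>
#     ...
#     =====
#     Строки: <first>-<last>
#     НОМЕР ТАБЛИЦЫ: <table_identifier>
#     РАЗДЕЛ: <section>
#     ДОКУМЕНТ: <doc_id>
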
def format_table_header(table_title):
    """Render the header block ('ТАБЛИЦА <title>' + 'ДАННЫЕ:') of a table chunk."""
    content = ""

    if table_title:
        content += f"ТАБЛИЦА {normalize_text(table_title)}\n"

    content += "ДАННЫЕ:\n"

    return content

def format_single_row(row, idx):
    """Render one table row as a numbered 'key: value | key: value' line, skipping empty cells."""
    if isinstance(row, dict):
        # Internal bookkeeping keys such as '_idx' are not part of the table data.
        parts = [f"{k}: {v}" for k, v in row.items()
                 if not k.startswith('_')
                 and v and str(v).strip() and str(v).lower() not in ['nan', 'none', '']]
        if parts:
            return f"{idx}. {' | '.join(parts)}\n"
    elif isinstance(row, list):
        parts = [str(v) for v in row if v and str(v).strip() and str(v).lower() not in ['nan', 'none', '']]
        if parts:
            return f"{idx}. {' | '.join(parts)}\n"
    return ""

def format_table_rows(rows):
    """Render a list of rows (each carrying an '_idx' key) into numbered lines."""
    content = ""
    for row in rows:
        idx = row.get('_idx', 0)
        content += format_single_row(row, idx)
    return content

def format_table_footer(table_identifier, doc_id, section):
    """Render the footer block with the table number, section and source document."""
    content = ""

    if table_identifier:
        content += f"НОМЕР ТАБЛИЦЫ: {normalize_text(table_identifier)}\n"

    if section:
        content += f"РАЗДЕЛ: {normalize_text(section)}\n"

    if doc_id:
        content += f"ДОКУМЕНТ: {doc_id}\n"

    return content

def load_json_documents(repo_id, hf_token, json_dir):
    """Download JSON files (and ZIP archives of JSON files) from the dataset
    repo and extract their sections as Documents."""
    log_message("Loading JSON documents...")

    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')]
    zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')]

    log_message(f"Found {len(json_files)} JSON files and {len(zip_files)} ZIP files")

    documents = []
    stats = {'success': 0, 'failed': 0, 'empty': 0}

    # Plain JSON files.
    for file_path in json_files:
        try:
            log_message(f"  Loading: {file_path}")
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )

            docs = extract_sections_from_json(local_path)
            if docs:
                documents.extend(docs)
                stats['success'] += 1
                log_message(f"    ✓ Extracted {len(docs)} sections")
            else:
                stats['empty'] += 1
                log_message("    ⚠ No sections found")

        except Exception as e:
            stats['failed'] += 1
            log_message(f"    ✗ Error: {e}")

    # ZIP archives containing JSON files.
    for zip_path in zip_files:
        try:
            log_message(f"  Processing ZIP: {zip_path}")
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=zip_path,
                repo_type="dataset",
                token=hf_token
            )

            with zipfile.ZipFile(local_zip, 'r') as zf:
                json_files_in_zip = [f for f in zf.namelist()
                                     if f.endswith('.json')
                                     and not f.startswith('__MACOSX')
                                     and not f.startswith('.')
                                     and '._' not in f]

                log_message(f"    Found {len(json_files_in_zip)} JSON files in ZIP")

                for json_file in json_files_in_zip:
                    try:
                        file_content = zf.read(json_file)

                        if len(file_content) < 10:
                            log_message(f"    ✗ Skipping: {json_file} (file too small)")
                            stats['failed'] += 1
                            continue

                        # Try a few encodings before giving up on the file.
                        text_content = None
                        for encoding in ('utf-8', 'utf-8-sig', 'utf-16', 'windows-1251'):
                            try:
                                text_content = file_content.decode(encoding)
                                break
                            except UnicodeDecodeError:
                                continue
                        if text_content is None:
                            log_message(f"    ✗ Skipping: {json_file} (encoding failed)")
                            stats['failed'] += 1
                            continue

                        # Cheap sanity check that the payload is JSON at all.
                        stripped = text_content.strip()
                        if not stripped.startswith('{') and not stripped.startswith('['):
                            log_message(f"    ✗ Skipping: {json_file} (not valid JSON)")
                            stats['failed'] += 1
                            continue

                        # extract_sections_from_json works on file paths, so write
                        # the decoded payload to a temporary file first.
                        with tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                         suffix='.json', encoding='utf-8') as tmp:
                            tmp.write(text_content)
                            tmp_path = tmp.name

                        docs = extract_sections_from_json(tmp_path)
                        if docs:
                            documents.extend(docs)
                            stats['success'] += 1
                            log_message(f"    ✓ {json_file}: {len(docs)} sections")
                        else:
                            stats['empty'] += 1
                            log_message(f"    ⚠ {json_file}: No sections")

                        os.unlink(tmp_path)

                    except json.JSONDecodeError:
                        stats['failed'] += 1
                        log_message(f"    ✗ {json_file}: Invalid JSON")
                    except Exception as e:
                        stats['failed'] += 1
                        log_message(f"    ✗ {json_file}: {str(e)[:100]}")

        except Exception as e:
            log_message(f"    ✗ Error with ZIP: {e}")

    log_message("=" * 60)
    log_message("JSON Loading Stats:")
    log_message(f"  Success: {stats['success']}")
    log_message(f"  Empty: {stats['empty']}")
    log_message(f"  Failed: {stats['failed']}")
    log_message("=" * 60)

    return documents

def extract_sections_from_json(json_path):
    """Read a structured document JSON and return one Document per section,
    subsection and sub-subsection that has text."""
    documents = []

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        doc_id = data.get('document_metadata', {}).get('document_id', 'unknown')

        for section in data.get('sections', []):
            if section.get('section_text', '').strip():
                documents.append(Document(
                    text=section['section_text'],
                    metadata={
                        'type': 'text',
                        'document_id': doc_id,
                        'section_id': section.get('section_id', '')
                    }
                ))

            for subsection in section.get('subsections', []):
                if subsection.get('subsection_text', '').strip():
                    documents.append(Document(
                        text=subsection['subsection_text'],
                        metadata={
                            'type': 'text',
                            'document_id': doc_id,
                            'section_id': subsection.get('subsection_id', '')
                        }
                    ))

                for sub_sub in subsection.get('sub_subsections', []):
                    if sub_sub.get('sub_subsection_text', '').strip():
                        documents.append(Document(
                            text=sub_sub['sub_subsection_text'],
                            metadata={
                                'type': 'text',
                                'document_id': doc_id,
                                'section_id': sub_sub.get('sub_subsection_id', '')
                            }
                        ))

    except Exception as e:
        log_message(f"Error extracting from {json_path}: {e}")

    return documents

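# The expected shape of the document JSON (only the keys this module reads):
#
#     {
#       "document_metadata": {"document_id": "..."},
#       "sections": [
#         {
#           "section_id": "...", "section_text": "...",
#           "subsections": [
#             {
#               "subsection_id": "...", "subsection_text": "...",
#               "sub_subsections": [
#                 {"sub_subsection_id": "...", "sub_subsection_text": "..."}
#               ]
#             }
#           ]
#         }
#       ]
#     }
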
def load_table_documents(repo_id, hf_token, table_dir):
    """Download table files (JSON or Excel) from the dataset repo and turn
    every sheet into table chunks."""
    log_message("Loading tables...")
    log_message("=" * 60)
    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    table_files = [f for f in files if f.startswith(table_dir) and (f.endswith('.json') or f.endswith('.xlsx') or f.endswith('.xls'))]

    all_chunks = []
    tables_processed = 0

    for file_path in table_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )

            if file_path.endswith(('.xlsx', '.xls')):
                # Excel files are first converted to the same JSON structure.
                # Read the converted file while the temporary directory still exists.
                from converters.converter import convert_single_excel_to_json

                with tempfile.TemporaryDirectory() as temp_dir:
                    json_path = convert_single_excel_to_json(local_path, temp_dir)
                    with open(json_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
            else:
                with open(local_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

            file_doc_id = data.get('document_id', data.get('document', 'unknown'))

            for sheet in data.get('sheets', []):
                sheet_doc_id = sheet.get('document_id', sheet.get('document', file_doc_id))
                tables_processed += 1

                chunks = chunk_table_by_content(sheet, sheet_doc_id,
                                                max_chars=MAX_CHARS_TABLE,
                                                max_rows=MAX_ROWS_TABLE)
                all_chunks.extend(chunks)

        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    log_message(f"✓ Loaded {len(all_chunks)} table chunks from {tables_processed} tables")
    log_message("=" * 60)

    return all_chunks

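# The table JSON (produced directly or by the Excel converter) is expected to
# look roughly like this, based on the keys read above and in chunk_table_by_content:
#
#     {
#       "document_id": "...",
#       "sheets": [
#         {
#           "sheet_name": "...", "table_number": "...", "table_title": "...",
#           "section": "...", "headers": [...],
#           "data": [{"<column>": "<value>", ...}, ...]
#         }
#       ]
#     }
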
def load_image_documents(repo_id, hf_token, image_dir):
    """Download image description tables (CSV or Excel) and turn every row into
    an image Document."""
    log_message("Loading images...")

    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    csv_files = [f for f in files if f.startswith(image_dir) and (f.endswith('.csv') or f.endswith('.xlsx') or f.endswith('.xls'))]

    documents = []
    for file_path in csv_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )

            if file_path.endswith(('.xlsx', '.xls')):
                # Excel files are converted to CSV first; read them while the
                # temporary directory still exists.
                from converters.converter import convert_single_excel_to_csv

                with tempfile.TemporaryDirectory() as temp_dir:
                    csv_path = convert_single_excel_to_csv(local_path, temp_dir)
                    df = pd.read_csv(csv_path)
            else:
                df = pd.read_csv(local_path)

            for _, row in df.iterrows():
                content = f"Документ: {row.get('Обозначение документа', 'unknown')}\n"
                content += f"Рисунок: {row.get('№ Изображения', 'unknown')}\n"
                content += f"Название: {row.get('Название изображения', '')}\n"
                content += f"Описание: {row.get('Описание изображение', '')}\n"
                content += f"Раздел: {row.get('Раздел документа', '')}\n"

                documents.append(Document(
                    text=content,
                    metadata={
                        'type': 'image',
                        'document_id': str(row.get('Обозначение документа', 'unknown')),
                        'image_number': str(row.get('№ Изображения', 'unknown')),
                        'section': str(row.get('Раздел документа', '')),
                        'chunk_size': len(content)
                    }
                ))
        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    if documents:
        avg_size = sum(d.metadata['chunk_size'] for d in documents) / len(documents)
        log_message(f"✓ Loaded {len(documents)} images (avg size: {avg_size:.0f} chars)")

    return documents

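# Each row of the image description CSV becomes one standalone, searchable
# Document. The reader relies on the column names used above: "Обозначение
# документа", "№ Изображения", "Название изображения", "Описание изображение",
# "Раздел документа".
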
def load_all_documents(repo_id, hf_token, json_dir, table_dir, image_dir):
    """Main loader - combines all document types."""
    log_message("=" * 60)
    log_message("STARTING DOCUMENT LOADING")
    log_message("=" * 60)

    # Text sections from structured JSON, split into chunks.
    text_docs = load_json_documents(repo_id, hf_token, json_dir)
    text_chunks = chunk_text_documents(text_docs)

    # Tables, already chunked row-wise.
    table_chunks = load_table_documents(repo_id, hf_token, table_dir)

    # Image descriptions, one Document per image.
    image_docs = load_image_documents(repo_id, hf_token, image_dir)

    all_docs = text_chunks + table_chunks + image_docs

    log_message("=" * 60)
    log_message(f"TOTAL DOCUMENTS: {len(all_docs)}")
    log_message(f"  Text chunks: {len(text_chunks)}")
    log_message(f"  Table chunks: {len(table_chunks)}")
    log_message(f"  Images: {len(image_docs)}")
    log_message("=" * 60)

    return all_docs
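

# A minimal usage sketch. The environment variable names and directory layout
# below are illustrative assumptions, not values taken from the original code.
if __name__ == "__main__":
    demo_docs = load_all_documents(
        repo_id=os.environ["HF_DATASET_REPO"],   # hypothetical variable name
        hf_token=os.environ.get("HF_TOKEN"),     # hypothetical variable name
        json_dir="json/",                        # hypothetical repo layout
        table_dir="tables/",
        image_dir="images/",
    )
    log_message(f"Loaded {len(demo_docs)} documents")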