#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Rebuilding Korea Party (조국혁신당) crawler - high-performance async version with automatic Hugging Face upload
- Converted from the previous sync (requests) approach to async (aiohttp)
- Incremental updates, automatic Hugging Face upload
"""

import os
import json
import re
import asyncio
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import pandas as pd
from tqdm.asyncio import tqdm as async_tqdm
import aiohttp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset

load_dotenv()


class RebuildingAsyncCrawler:
    def __init__(self, config_path="crawler_config.json"):
        self.base_url = "https://rebuildingkoreaparty.kr"
        self.party_name = "조국혁신당"
        self.config_path = config_path
        self.state_path = "crawler_state.json"
        self.load_config()
        self.hf_token = os.getenv("HF_TOKEN")
        self.hf_repo_id = os.getenv("HF_REPO_ID_REBUILDING", "rebuilding-press-releases")
        self.semaphore = asyncio.Semaphore(10)

    def load_config(self):
        default_config = {
            "boards": {
                "기자회견문": "news/press-conference",      # press conference statements
                "논평브리핑": "news/commentary-briefing",    # commentary / briefings
                "보도자료": "news/press-release"             # press releases
            },
            "start_date": "2024-03-04",
            "max_pages": 10000,
            "concurrent_requests": 10,
            "request_delay": 0.5,
            "output_path": "./data"
        }
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
            self.config = config.get('rebuilding', default_config)
        else:
            self.config = default_config
        self.boards = self.config["boards"]
        self.start_date = self.config["start_date"]
        self.max_pages = self.config["max_pages"]
        self.output_path = self.config["output_path"]

    def load_state(self) -> Dict:
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                state = json.load(f)
            return state.get('rebuilding', {})
        return {}

    def save_state(self, state: Dict):
        all_state = {}
        if os.path.exists(self.state_path):
            with open(self.state_path, 'r', encoding='utf-8') as f:
                all_state = json.load(f)
        all_state['rebuilding'] = state
        with open(self.state_path, 'w', encoding='utf-8') as f:
            json.dump(all_state, f, ensure_ascii=False, indent=2)

    @staticmethod
    def parse_date(date_str: str) -> Optional[datetime]:
        try:
            return datetime.strptime(date_str.strip(), '%Y-%m-%d')
        except ValueError:
            return None

    @staticmethod
    def clean_text(text: str) -> str:
        # Strip non-breaking and zero-width spaces that appear in the page text
        text = text.replace('\xa0', '').replace('\u200b', '')
        return text.strip()

    async def fetch_with_retry(self, session: aiohttp.ClientSession, url: str,
                               max_retries: int = 3) -> Optional[str]:
        async with self.semaphore:
            for attempt in range(max_retries):
                try:
                    await asyncio.sleep(self.config.get("request_delay", 0.5))
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as response:
                        if response.status == 200:
                            return await response.text()
                except Exception:
                    if attempt < max_retries - 1:
                        await asyncio.sleep(1)
                    else:
                        return None
            return None

    async def fetch_list_page(self, session: aiohttp.ClientSession, board_name: str,
                              board_path: str, page_num: int,
                              start_date: datetime, end_date: datetime) -> tuple:
        if page_num == 1:
            url = f"{self.base_url}/{board_path}"
        else:
            url = f"{self.base_url}/{board_path}?page={page_num}"

        html = await self.fetch_with_retry(session, url)
        if not html:
            return [], False

        soup = BeautifulSoup(html, 'html.parser')
        # Find article links by URL pattern (board_path already includes the "news/" prefix)
        article_links = soup.find_all('a', href=re.compile(f'^/{re.escape(board_path)}/'))
        if not article_links:
            return [], True

        data = []
        stop_flag = False
        seen_urls = set()
        for link in article_links:
            try:
                article_url = link.get('href', '')
                if article_url.startswith('/'):
                    article_url = self.base_url + article_url
                if article_url in seen_urls:
                    continue
                seen_urls.add(article_url)
                title = link.get_text(strip=True).replace('\n', ' ')  # 같은