| |
| |
| !pip uninstall -y transformers tokenizers accelerate torch torchvision |
|
|
| |
| !pip install torch==2.8.0 torchvision==0.17.2 |
| !pip install transformers==4.57.6 tokenizers==0.22.2 accelerate==0.27.0 |
|
|
|
|
| |
| |
| |
| |
|
|
| import os |
| import sys |
| import json |
| import csv |
| import torch |
| from typing import List |
| from transformers import AutoTokenizer, AutoModel |
|
|
| |
| |
| |
|
|
| ERROR_TYPES = [ |
| "false_causality", |
| "unsupported_claim", |
| "overgeneralization", |
| "missing_premise", |
| "contradiction", |
| "circular_reasoning", |
| ] |
|
|
| ERROR_NAMES_RU = { |
| "false_causality": "Ложная причинно-следственная связь", |
| "unsupported_claim": "Неподкреплённое утверждение", |
| "overgeneralization": "Чрезмерное обобщение", |
| "missing_premise": "Отсутствующая предпосылка", |
| "contradiction": "Противоречие", |
| "circular_reasoning": "Круговое рассуждение", |
| } |
|
|
| ERROR_THRESHOLDS = { |
| "false_causality": 0.55, |
| "unsupported_claim": 0.55, |
| "overgeneralization": 0.60, |
| "missing_premise": 0.80, |
| "contradiction": 0.60, |
| "circular_reasoning": 0.60, |
| } |
|
|
| |
| |
| |
|
|
| class RQAJudge: |
| def __init__(self, model_name="skatzR/RQA-R1", device=None): |
| self.device = device or ("cuda" if torch.cuda.is_available() else "cpu") |
|
|
| self.tokenizer = AutoTokenizer.from_pretrained( |
| model_name, |
| trust_remote_code=True |
| ) |
| self.model = AutoModel.from_pretrained( |
| model_name, |
| trust_remote_code=True |
| ).to(self.device) |
|
|
| self.model.eval() |
|
|
| cfg = self.model.config |
| self.temp_issue = float(cfg.temperature_has_issue) |
| self.temp_errors = list(cfg.temperature_errors) |
|
|
| |
| |
| |
|
|
| @torch.no_grad() |
| def infer( |
| self, |
| text: str, |
| issue_threshold: float = 0.6, |
| disagreement_threshold: float = 0.4, |
| ): |
| inputs = self.tokenizer( |
| text, |
| truncation=True, |
| max_length=512, |
| padding="max_length", |
| return_tensors="pt" |
| ).to(self.device) |
|
|
| outputs = self.model(**inputs) |
|
|
| |
| issue_logit = outputs["has_issue_logits"] / self.temp_issue |
| issue_prob = torch.sigmoid(issue_logit).item() |
| has_issue = issue_prob >= issue_threshold |
|
|
| |
| raw_error_logits = outputs["errors_logits"][0] |
| error_probs = {} |
|
|
| for i, logit in enumerate(raw_error_logits): |
| calibrated = logit / self.temp_errors[i] |
| prob = torch.sigmoid(calibrated).item() |
| error_probs[ERROR_TYPES[i]] = prob |
|
|
| |
| p_any_error = 1.0 |
| for p in error_probs.values(): |
| p_any_error *= (1.0 - p) |
| p_any_error = 1.0 - p_any_error |
|
|
| disagreement = abs(issue_prob - p_any_error) |
|
|
| |
| explicit_errors = [] |
| hidden_problem = False |
|
|
| for err, prob in error_probs.items(): |
| if prob >= ERROR_THRESHOLDS[err]: |
| if err == "missing_premise": |
| hidden_problem = True |
| else: |
| explicit_errors.append((err, prob)) |
|
|
| explicit_errors.sort(key=lambda x: x[1], reverse=True) |
|
|
| |
| if not has_issue: |
| explicit_errors = [] |
|
|
| borderline = ( |
| not has_issue and hidden_problem and disagreement >= disagreement_threshold |
| ) |
|
|
| return { |
| "text": text, |
| "has_issue": has_issue, |
| "issue_probability": issue_prob, |
| "errors": explicit_errors, |
| "hidden_problem": hidden_problem, |
| "borderline": borderline, |
| "disagreement": disagreement, |
| } |
|
|
| |
| |
| |
|
|
| def pretty_print(self, r): |
| print("\n" + "=" * 72) |
| print("📄 Текст:") |
| print(r["text"]) |
|
|
| print(f"\n🔎 Обнаружена проблема: {'ДА' if r['has_issue'] else 'НЕТ'} " |
| f"({r['issue_probability']*100:.2f}%)") |
|
|
| if r["borderline"]: |
| print("⚠️ Пограничный случай: аргументативный текст") |
|
|
| if r["hidden_problem"]: |
| print("🟡 Скрытая проблема: возможны неявные предпосылки") |
|
|
| if r["errors"]: |
| print("\n❌ Явные логические ошибки:") |
| for name, prob in r["errors"]: |
| print(f" • {ERROR_NAMES_RU[name]} — {prob*100:.2f}%") |
| else: |
| print("\n✅ Явных логических ошибок не обнаружено") |
|
|
| print(f"\n📊 Disagreement: {r['disagreement']:.3f}") |
| print("=" * 72) |
|
|
| |
| |
| |
|
|
| def load_texts_from_file(path: str) -> List[str]: |
| ext = os.path.splitext(path)[1].lower() |
|
|
| if ext == ".txt": |
| with open(path, encoding="utf-8") as f: |
| return [l.strip() for l in f if l.strip()] |
|
|
| if ext == ".csv": |
| with open(path, encoding="utf-8") as f: |
| reader = csv.DictReader(f) |
| return [row["text"] for row in reader] |
|
|
| if ext == ".json": |
| with open(path, encoding="utf-8") as f: |
| data = json.load(f) |
| if isinstance(data, list): |
| return data |
|
|
| raise ValueError("Неподдерживаемый формат файла") |
|
|
| |
| |
| |
|
|
| class InteractiveCLI: |
| def __init__(self): |
| self.judge = RQAJudge() |
| self.mode_stack = [] |
| |
| def clear_screen(self): |
| """Очистка экрана для Google Colab""" |
| print("\n" * 2) |
| |
| def show_mode_menu(self): |
| """Показать меню выбора режима""" |
| self.clear_screen() |
| print("=" * 60) |
| print("🤖 RQA — АНАЛИЗ ЛОГИЧЕСКИХ ОШИБОК") |
| print("=" * 60) |
| print("\nВыберите режим работы:") |
| print("1. 📝 Одиночный ввод (одна фраза для анализа)") |
| print("2. 📄 Множественный ввод (несколько фраз, каждая с новой строки)") |
| print("3. 📂 Загрузка из файла (.txt, .csv, .json)") |
| print("\nНажмите Enter без ввода для выхода.") |
| print("-" * 60) |
| |
| def process_single_mode(self): |
| """Обработка одиночного режима""" |
| self.clear_screen() |
| print("[📝 РЕЖИМ: ОДИНОЧНЫЙ ВВОД]") |
| print("Введите текст для анализа:") |
| print("(Нажмите Enter без ввода для возврата в меню)") |
| print("-" * 40) |
| |
| text = input("> ").strip() |
| if not text: |
| return True |
| |
| result = self.judge.infer(text) |
| self.judge.pretty_print(result) |
| |
| print("\n" + "-" * 40) |
| input("Нажмите Enter для продолжения...") |
| return False |
| |
| def process_multiline_mode(self): |
| """Обработка режима множественного ввода""" |
| self.clear_screen() |
| print("[📄 РЕЖИМ: МНОЖЕСТВЕННЫЙ ВВОД]") |
| print("Введите тексты для анализа (каждый с новой строки).") |
| print("Оставьте строку пустой для завершения ввода.") |
| print("(Нажмите Enter без ввода для возврата в меню)") |
| print("-" * 40) |
| |
| texts = [] |
| print("Ввод текстов:") |
| while True: |
| line = input("> ").strip() |
| if not line: |
| if not texts: |
| return True |
| break |
| texts.append(line) |
| |
| if texts: |
| self.clear_screen() |
| print(f"[📄 РЕЖИМ: МНОЖЕСТВЕННЫЙ ВВОД] — найдено {len(texts)} текстов") |
| print("-" * 40) |
| |
| for i, text in enumerate(texts, 1): |
| print(f"\n🔍 Текст #{i}:") |
| result = self.judge.infer(text) |
| self.judge.pretty_print(result) |
| |
| print("\n" + "=" * 60) |
| input("Нажмите Enter для продолжения...") |
| |
| return False |
| |
| def process_file_mode(self): |
| """Обработка режима загрузки из файла""" |
| self.clear_screen() |
| print("[📂 РЕЖИМ: ЗАГРУЗКА ИЗ ФАЙЛА]") |
| print("Поддерживаемые форматы: .txt, .csv, .json") |
| print("Укажите путь к файлу:") |
| print("(Нажмите Enter без ввода для возврата в меню)") |
| print("-" * 40) |
| |
| file_path = input("Путь к файлу> ").strip() |
| if not file_path: |
| return True |
| |
| try: |
| |
| if not os.path.exists(file_path): |
| print(f"\n❌ Ошибка: Файл '{file_path}' не найден!") |
| input("\nНажмите Enter для продолжения...") |
| return False |
| |
| |
| texts = load_texts_from_file(file_path) |
| |
| if not texts: |
| print(f"\n⚠️ Файл '{file_path}' пуст или не содержит текстов!") |
| input("\nНажмите Enter для продолжения...") |
| return False |
| |
| |
| self.clear_screen() |
| print(f"[📂 РЕЖИМ: ЗАГРУЗКА ИЗ ФАЙЛА] — загружено {len(texts)} текстов") |
| print(f"Файл: {file_path}") |
| print("-" * 40) |
| |
| for i, text in enumerate(texts, 1): |
| print(f"\n🔍 Текст #{i}:") |
| result = self.judge.infer(text) |
| self.judge.pretty_print(result) |
| |
| print("\n" + "=" * 60) |
| input("Нажмите Enter для продолжения...") |
| |
| except Exception as e: |
| print(f"\n❌ Ошибка при обработке файла: {str(e)}") |
| input("\nНажмите Enter для продолжения...") |
| |
| return False |
| |
| def run_interactive(self): |
| """Основной цикл интерактивного интерфейса""" |
| current_mode = None |
| |
| while True: |
| |
| if not current_mode: |
| self.show_mode_menu() |
| choice = input("Ваш выбор (1-3)> ").strip() |
| |
| if not choice: |
| print("\n👋 Выход из программы...") |
| break |
| |
| if choice == "1": |
| current_mode = "single" |
| elif choice == "2": |
| current_mode = "multiline" |
| elif choice == "3": |
| current_mode = "file" |
| else: |
| print("\n❌ Неверный выбор! Попробуйте снова.") |
| input("Нажмите Enter для продолжения...") |
| continue |
| |
| |
| should_return_to_menu = False |
| |
| if current_mode == "single": |
| should_return_to_menu = self.process_single_mode() |
| elif current_mode == "multiline": |
| should_return_to_menu = self.process_multiline_mode() |
| elif current_mode == "file": |
| should_return_to_menu = self.process_file_mode() |
| |
| |
| if should_return_to_menu: |
| current_mode = None |
|
|
| |
| |
| |
|
|
| def main(): |
| """Основная функция - запускает интерактивный интерфейс""" |
| cli = InteractiveCLI() |
| cli.run_interactive() |
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| main() |