| import os |
| import sys |
| import json |
| import pickle |
| import csv |
| import io |
| import uuid |
| import asyncio |
| import time |
| import logging |
| from collections import OrderedDict, defaultdict, Counter |
| from functools import wraps |
| from typing import Dict, List, Any, Optional, Tuple |
|
|
| import torch |
| import torch.nn as nn |
| import numpy as np |
| import re |
| import networkx as nx |
| import pymorphy3 |
| import requests |
| import psutil |
| from fastapi import FastAPI, Request, Form, HTTPException, File, UploadFile, BackgroundTasks |
| from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse |
| from fastapi.templating import Jinja2Templates |
| from pydantic import BaseModel |
| import uvicorn |
| from transformers import BertTokenizer, BertModel |
| from sklearn.preprocessing import LabelEncoder |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
# Root logging configuration for the whole service; modules log through
# their own named logger (below) so records carry the module name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Run the neural models on GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Используется устройство: {device}")
|
|
|
|
# Maps emoticons/emoji to Russian placeholder tokens so downstream models can
# treat smileys as ordinary sentiment-bearing words.  NOTE: the bare ')' and
# '(' entries deliberately turn stray parentheses into joy/sadness markers.
SMILEY_MAPPING = {
    ':)': 'смайлик_радость', ')': 'смайлик_радость', '🥰': 'смайлик_радость',
    '😀': 'смайлик_радость', '😃': 'смайлик_радость', '😄': 'смайлик_радость',
    '😁': 'смайлик_радость', '😘': 'смайлик_радость', '😍': 'смайлик_радость',
    '😇': 'смайлик_радость', '😊': 'смайлик_радость',
    ':D': 'смайлик_смех', ';)': 'смайлик_подмигивание',
    '❤️': 'смайлик_радость_любовь', '🩷': 'смайлик_радость_любовь',
    '🧡': 'смайлик_радость_любовь', '💛': 'смайлик_радость_любовь',
    '💚': 'смайлик_радость_любовь', '💙': 'смайлик_радость_любовь',
    '🩵': 'смайлик_радость_любовь', '💜': 'смайлик_радость_любовь',
    '👍': 'смайлик_радость_класс', '👌': 'смайлик_радость_ок',
    ':(': 'смайлик_грусть', '(': 'смайлик_грусть', '👎': 'смайлик_грусть',
    '🥺': 'смайлик_грусть', '😞': 'смайлик_грусть', '🙁': 'смайлик_грусть',
    '😭': 'смайлик_грусть_слезы', '🥲': 'смайлик_грусть', '☹️': 'смайлик_грусть',
    '😔': 'смайлик_грусть_слезы', '😓': 'смайлик_грусть', '😢': 'смайлик_грусть_слезы',
    '😡': 'смайлик_злость', '👿': 'смайлик_злость', '🤬': 'смайлик_злость', '😈': 'смайлик_злость',
    '😠': 'смайлик_злость',
}


def clean_russian_text(text):
    """Normalize raw user text for the emotion models.

    Lower-cases, strips URLs and e-mail addresses, replaces smileys with
    placeholder tokens from SMILEY_MAPPING, removes characters outside the
    allowed Russian/punctuation set and collapses whitespace.

    Args:
        text: arbitrary input; non-string values yield "".

    Returns:
        The cleaned, single-spaced string.
    """
    if not isinstance(text, str):
        return ""
    text = text.lower()
    text = re.sub(r'http\S+|www\S+|https\S+', '', text)
    text = re.sub(r'\S+@\S+', '', text)

    # Replace longer smileys first so ':)', ';)', ':(' are not destroyed by
    # the single-character ')' / '(' entries, and match the lower-cased key
    # since the text was already lower-cased (fixes ':D' never matching).
    for smiley in sorted(SMILEY_MAPPING, key=len, reverse=True):
        text = text.replace(smiley.lower(), f' {SMILEY_MAPPING[smiley]} ')

    text = re.sub(r'[^\w\sа-яё.,!?;:)(-]', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text
|
|
class LRUCache:
    """Fixed-capacity least-recently-used cache built on an OrderedDict."""

    def __init__(self, maxsize=10000):
        self.cache = OrderedDict()
        self.maxsize = maxsize

    def get(self, key):
        """Return the cached value, marking it most recently used; None on miss."""
        try:
            value = self.cache[key]
        except KeyError:
            return None
        self.cache.move_to_end(key)
        return value

    def put(self, key, value):
        """Insert or refresh *key*; evict the oldest entry when over capacity."""
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        while len(self.cache) > self.maxsize:
            # Oldest entries live at the front of the OrderedDict.
            self.cache.popitem(last=False)

    def size(self):
        """Current number of cached entries."""
        return len(self.cache)
|
|
class OntologyEmotionModel:
    """Ontology- and rule-based emotion layer on top of the neural models.

    Combines a sentiment lexicon (statistical counts over training data,
    RuSentiLex, hand-curated fear/sadness/colloquial word lists and smiley
    placeholder tokens) with a small emotion ontology graph and linguistic
    rules.  The rules adjust or override raw model predictions (negation,
    intensifiers, sarcasm markers, disappointment patterns) and record
    hypotheses when model and rules disagree.
    """

    def __init__(self, emotions: List[str], train_texts: List[str] = None, train_labels: List[int] = None):
        """Build the lexicon and ontology.

        Args:
            emotions: class labels in label-encoder order.
            train_texts: optional labelled corpus texts; used (together with
                train_labels) to derive a statistical lexicon.
            train_labels: integer labels aligned with train_texts.
        """
        self.emotions = emotions
        self.morph = pymorphy3.MorphAnalyzer()
        self.ontology_graph = nx.DiGraph()
        self.empirical_base = defaultdict(list)      # emotion -> recent confident examples
        self.hypotheses_db = {}                      # hypothesis id -> record
        self.verified_hypotheses = defaultdict(list)
        self.sentiment_lexicon = {}                  # lemma -> emotion label
        self.rule_stats = {}

        if train_texts is not None and train_labels is not None:
            self._build_sentiment_lexicon(train_texts, train_labels)

        self._load_rusentilex()
        print(f"📊 Всего слов в лексиконе: {len(self.sentiment_lexicon)}")

        # Hand-curated fear vocabulary (lemmatized before insertion).
        fear_words = ['бояться', 'страшно', 'опасно', 'угроза', 'тревога', 'паника']
        for w in fear_words:
            lemma = self.morph.parse(w)[0].normal_form
            self.sentiment_lexicon[lemma] = 'страх'
        print(f"📊 Добавлено {len(fear_words)} слов для эмоции 'страх'")

        explicit_sadness = ['грустно', 'печально', 'тоскливо', 'уныло', 'горестно', 'жалко', 'сожаление', 'обидно', 'печалька']
        added_sad = 0
        for w in explicit_sadness:
            try:
                lemma = self.morph.parse(w)[0].normal_form
                self.sentiment_lexicon[lemma] = 'грусть'
                added_sad += 1
            except Exception:  # was a bare except; keep best-effort semantics
                pass
        print(f"📊 Добавлено {added_sad} явных слов для эмоции 'грусть'")

        # Disappointment markers are folded into 'грусть'.
        disappointment_words = [
            'разочарован', 'ожидал', 'надеялся', 'обманулся', 'не оправдал',
            'не впечатлило', 'слабо', 'посредственно', 'неудовлетворен'
        ]
        added_dis = 0
        for w in disappointment_words:
            try:
                lemma = self.morph.parse(w)[0].normal_form
                self.sentiment_lexicon[lemma] = 'грусть'
                added_dis += 1
            except Exception:
                pass
        print(f"📊 Добавлено {added_dis} слов для эмоции 'грусть' (разочарование)")

        positive_colloquial = [
            'спасибо', 'благодарю', 'благодарить', 'норм', 'нормально', 'ок', 'окей',
            'класс', 'супер', 'здорово', 'прекрасно', 'отлично', 'ого', 'вау', 'круто',
            'зачет', 'лады', 'добре', 'хорошо', 'неплохо', 'приемлемо', 'удовлетворительно',
            'пойдет', 'в порядке', 'без проблем', 'лепота', 'зашибись', 'ого-го'
        ]
        negative_colloquial = [
            'плохо', 'ужасно', 'кошмар', 'отвратительно', 'неуд', 'брак', 'сломалось',
            'не работает', 'фигня', 'ерунда', 'разочарование', 'недоволен', 'не доволен',
            'жаль', 'обидно', 'печаль', 'тоска', 'неудачно', 'провал', 'не годен',
            'неприемлемо', 'неудовлетворительно', 'нехорошо', 'не ок', 'не окей', 'не норм'
        ]

        # Colloquial words must not overwrite already-present lexicon entries.
        for w in positive_colloquial:
            try:
                lemma = self.morph.parse(w)[0].normal_form
                if lemma not in self.sentiment_lexicon:
                    self.sentiment_lexicon[lemma] = 'радость'
            except Exception:
                pass
        for w in negative_colloquial:
            try:
                lemma = self.morph.parse(w)[0].normal_form
                if lemma not in self.sentiment_lexicon:
                    self.sentiment_lexicon[lemma] = 'грусть'
            except Exception:
                pass

        # Map the smiley placeholder tokens produced by clean_russian_text
        # (values of SMILEY_MAPPING) to emotion labels by keyword.
        smiley_texts = set(SMILEY_MAPPING.values())
        for text in smiley_texts:
            if 'радость' in text or 'смех' in text or 'подмигивание' in text or 'класс' in text or 'ок' in text or 'любовь' in text:
                emotion = 'радость'
            elif 'грусть' in text or 'слезы' in text:
                emotion = 'грусть'
            elif 'злость' in text:
                emotion = 'злость'
            else:
                emotion = 'радость'
            self.sentiment_lexicon[text] = emotion
        print(f"📊 Добавлено {len(smiley_texts)} текстовых меток для распознавания смайликов")

        self.init_ontology_level1()
        self.init_ontology_level2()

    def _build_sentiment_lexicon(self, texts: List[str], labels: List[int]):
        """Derive lexicon entries statistically from a labelled corpus.

        A lemma is mapped to the class that dominates its occurrences
        (>60% of at least 6 observations).
        """
        word_class_counts = defaultdict(lambda: np.zeros(len(self.emotions)))
        for text, label in zip(texts, labels):
            words = set(clean_russian_text(text).split())
            for word in words:
                lemma = self.morph.parse(word)[0].normal_form
                word_class_counts[lemma][label] += 1
        for lemma, counts in word_class_counts.items():
            prob = counts / (counts.sum() + 1e-10)
            if prob.max() > 0.6 and counts.sum() > 5:
                dominant_class = self.emotions[np.argmax(prob)]
                self.sentiment_lexicon[lemma] = dominant_class

    def _parse_rusentilex(self, content):
        """Parse RuSentiLex CSV content into the lexicon.

        Supports two layouts, auto-detected from the first non-empty line:
        two columns (term, tone in {-1, 0, 1}) or >=3 columns with the
        sentiment word ('positive'/'negative') in the third column.
        The first line is assumed to be a header and skipped.
        """
        lines = content.splitlines()
        added = 0
        sample_line = None
        for line in lines:
            line = line.strip()
            if line:
                sample_line = line
                break
        if not sample_line:
            return
        parts = sample_line.split(',')
        is_two_column = len(parts) == 2
        for line in lines[1:]:
            line = line.strip()
            if not line:
                continue
            try:
                if is_two_column:
                    term, tone_str = line.split(',')
                    term = term.strip().lower()
                    tone = int(tone_str)
                    lemma = self.morph.parse(term)[0].normal_form
                    if tone == 1:
                        self.sentiment_lexicon[lemma] = 'радость'
                        added += 1
                    elif tone == -1:
                        self.sentiment_lexicon[lemma] = 'грусть'
                        added += 1
                else:
                    parts = line.split(',')
                    if len(parts) >= 3:
                        term = parts[0].strip().lower()
                        sentiment = parts[2].strip().lower()
                        lemma = self.morph.parse(term)[0].normal_form
                        if sentiment == 'positive':
                            self.sentiment_lexicon[lemma] = 'радость'
                            added += 1
                        elif sentiment == 'negative':
                            self.sentiment_lexicon[lemma] = 'грусть'
                            added += 1
            except Exception:
                # Malformed rows are skipped; the lexicon is best-effort.
                continue
        print(f" Добавлено слов из RuSentiLex: {added}")

    def _load_rusentilex(self):
        """Locate and load the RuSentiLex CSV from a set of known paths."""
        possible_paths = [
            'model/rusentilex.csv',
            'rusentilex.csv',
            '/app/model/rusentilex.csv',
            os.path.join(os.path.dirname(__file__), 'model', 'rusentilex.csv')
        ]
        loaded = False
        print("📂 Поиск RuSentiLex...")
        for path in possible_paths:
            if os.path.exists(path):
                try:
                    with open(path, 'r', encoding='utf-8') as f:
                        content = f.read()
                    self._parse_rusentilex(content)
                    print(f"✅ RuSentiLex загружен из файла: {path}")
                    loaded = True
                    break
                except Exception as e:
                    print(f"⚠️ Ошибка при загрузке {path}: {e}")
        if not loaded:
            print("⚠️ RuSentiLex не загружен. Используется только статистический лексикон.")

    def init_ontology_level1(self):
        """Level 1: emotion definitions (valence/arousal) and 'opposite' edges."""
        self.emotion_definitions = {
            'радость': {
                'valence': 'positive', 'arousal': 'high',
                'definition': 'Позитивное эмоциональное состояние',
                'opposite': ['грусть', 'злость']
            },
            'грусть': {
                'valence': 'negative', 'arousal': 'low',
                'definition': 'Негативное эмоциональное состояние',
                'opposite': ['радость']
            },
            'злость': {
                'valence': 'negative', 'arousal': 'high',
                'definition': 'Негативное эмоциональное состояние',
                'opposite': ['радость']
            },
            'страх': {
                'valence': 'negative', 'arousal': 'high',
                'definition': 'Эмоциональная реакция на угрозу',
                'opposite': ['уверенность', 'спокойствие']
            },
            'сарказм': {
                'valence': 'negative', 'arousal': 'high',
                'definition': 'Язвительная насмешка',
                'opposite': ['радость']
            }
        }
        for emotion in self.emotions:
            if emotion in self.emotion_definitions:
                self.ontology_graph.add_node(emotion, **self.emotion_definitions[emotion])
            else:
                # Emotions without a hand-written definition get neutral attributes.
                self.ontology_graph.add_node(emotion, valence='neutral', arousal='neutral')
        for emotion, data in self.emotion_definitions.items():
            if 'opposite' in data:
                for opposite in data['opposite']:
                    if opposite in self.emotions:
                        self.ontology_graph.add_edge(emotion, opposite, relation='opposite')

    def init_ontology_level2(self):
        """Level 2: word/pattern-triggered linguistic rules with weights."""
        self.linguistic_rules = {
            'усилители': {
                'words': ['очень', 'сильно', 'крайне', 'чрезвычайно', 'невероятно', 'абсолютно'],
                'effect': 'increase_arousal',
                'weight': 0.3,
                'learnable': True
            },
            'ослабители': {
                'words': ['слегка', 'немного', 'чуть-чуть', 'отчасти', 'несколько'],
                'effect': 'decrease_arousal',
                'weight': -0.2,
                'learnable': True
            },
            'отрицания': {
                'words': ['не', 'ни', 'нет', 'нельзя', 'невозможно'],
                'effect': 'negation',
                'weight': -0.5,
                'learnable': True
            },
            'восклицания': {
                'patterns': [r'!+', r'\?+'],
                'effect': 'increase_arousal',
                'weight': 0.4,
                'learnable': True
            },
            'вопросительные': {
                'patterns': [r'\?+'],
                'effect': 'uncertainty',
                'weight': 0.2,
                'learnable': True
            },
            'сарказм_маркеры': {
                'words': ['ага', 'да что вы', 'ну да', 'кто бы мог подумать'],
                'effect': 'sarcasm',
                'weight': 0.6,
                'learnable': True
            }
        }

    def add_empirical_knowledge(self, text: str, emotion: str, confidence: float):
        """Append one observed (text, confidence) example; keep last 1000 per emotion."""
        self.empirical_base[emotion].append({'text': text, 'confidence': confidence})
        if len(self.empirical_base[emotion]) > 1000:
            self.empirical_base[emotion] = self.empirical_base[emotion][-1000:]

    def formulate_hypothesis(self, text: str, model_prediction: Dict, rule_based_prediction: Dict) -> Dict:
        """Record a disagreement between model and rules as a pending hypothesis."""
        # NOTE(review): ids derive from current db size, so deleting entries
        # could produce duplicate ids — acceptable while nothing deletes.
        hypothesis_id = f"hyp_{len(self.hypotheses_db) + 1:06d}"
        hypothesis = {
            'id': hypothesis_id, 'text': text,
            'model_prediction': model_prediction,
            'rule_based_prediction': rule_based_prediction,
            'disagreement': self.calculate_disagreement(model_prediction, rule_based_prediction),
            'status': 'pending'
        }
        self.hypotheses_db[hypothesis_id] = hypothesis
        return hypothesis

    def verify_hypothesis(self, hypothesis_id: str, actual_emotion: str = None) -> Dict:
        """Resolve a hypothesis against the ground-truth emotion.

        Returns the updated hypothesis record, or None for an unknown id.
        Without actual_emotion the record is returned unchanged.
        """
        if hypothesis_id not in self.hypotheses_db:
            return None
        hypothesis = self.hypotheses_db[hypothesis_id]
        if actual_emotion:
            model_correct = hypothesis['model_prediction']['emotion'] == actual_emotion
            rule_correct = hypothesis['rule_based_prediction']['emotion'] == actual_emotion
            if model_correct and not rule_correct:
                hypothesis['status'] = 'model_superior'
            elif rule_correct and not model_correct:
                hypothesis['status'] = 'rule_superior'
            elif model_correct and rule_correct:
                hypothesis['status'] = 'both_correct'
            else:
                hypothesis['status'] = 'both_incorrect'
        return hypothesis

    def apply_linguistic_rules(self, text: str) -> Dict:
        """Run the lexicon and rule layer over *text*.

        Returns a dict with the fired rule descriptions ('rules_applied'),
        numeric adjustments (valence/arousal/uncertainty/sarcasm) and the
        lemmatized tokens ('lemmas').
        """
        rules_applied = []
        adjustments = {'valence': 0, 'arousal': 0, 'uncertainty': 0, 'sarcasm': 0}
        words = text.lower().split()
        parsed = [self.morph.parse(w)[0] for w in words]
        lemmas = [p.normal_form for p in parsed]
        pos_tags = [p.tag.POS for p in parsed]

        # 1) Lexicon pass: shift valence per sentiment-bearing lemma.
        for lemma in lemmas:
            sentiment = self.sentiment_lexicon.get(lemma, 'neutral')
            if sentiment == 'радость':
                rules_applied.append(f"позитивное слово: {lemma}")
                adjustments['valence'] += 0.2
            elif sentiment in ('грусть', 'злость', 'страх'):
                rules_applied.append(f"негативное слово: {lemma}")
                adjustments['valence'] -= 0.2

        # 2) Generic word/pattern rules from init_ontology_level2.
        for category, rule in self.linguistic_rules.items():
            if 'words' in rule:
                for word in rule['words']:
                    if word in lemmas:
                        rules_applied.append(f"{category}: {word}")
                        effect = rule['effect']
                        weight = rule['weight']
                        if effect == 'increase_arousal':
                            adjustments['arousal'] += weight
                        elif effect == 'decrease_arousal':
                            # Weakener weights are negative, so '+=' lowers arousal.
                            adjustments['arousal'] += weight
                        elif effect == 'negation':
                            adjustments['valence'] += weight
                        elif effect == 'sarcasm':
                            adjustments['sarcasm'] += weight
            if 'patterns' in rule:
                for pattern in rule['patterns']:
                    if re.search(pattern, text):
                        rules_applied.append(f"{category}: {pattern}")
                        weight = rule['weight']
                        if rule['effect'] == 'increase_arousal':
                            adjustments['arousal'] += weight
                        elif rule['effect'] == 'uncertainty':
                            adjustments['uncertainty'] += weight

        # 3) Negation handling around the first 'не'.
        if 'не' in lemmas:
            idx = lemmas.index('не')
            if idx + 1 < len(lemmas) and lemmas[idx+1] == 'очень':
                adjustments['arousal'] -= 0.2
                adjustments['valence'] -= 0.3
                rules_applied.append("сочетание: не очень")
            else:
                # Invert polarity of the nearest adjective/adverb within 3 tokens.
                for j in range(idx+1, min(idx+4, len(lemmas))):
                    if pos_tags[j] in ('ADJF', 'ADJS', 'ADVB'):
                        target_word = lemmas[j]
                        sentiment = self.sentiment_lexicon.get(target_word, 'neutral')
                        if sentiment in ('грусть', 'злость', 'страх'):
                            adjustments['valence'] += 1.0
                            rules_applied.append(f"инверсия негатива: не {target_word}")
                        elif sentiment == 'радость':
                            adjustments['valence'] -= 1.0
                            rules_applied.append(f"инверсия позитива: не {target_word}")
                        break

        # 4) Mixed positive+negative vocabulary is treated as a sarcasm signal.
        pos_words = [w for w in lemmas if self.sentiment_lexicon.get(w) == 'радость']
        neg_words = [w for w in lemmas if self.sentiment_lexicon.get(w) in ('грусть', 'злость', 'страх')]
        if pos_words and neg_words:
            adjustments['sarcasm'] += 0.5
            rules_applied.append(f"контраст тональности: позитив {pos_words[:2]} vs негатив {neg_words[:2]}")

        # 5) Explicit sarcastic phrases (substring match on the raw text).
        sarcasm_phrases = ['ага', 'ну да', 'да что вы', 'кто бы мог подумать']
        for phrase in sarcasm_phrases:
            if phrase in text.lower():
                adjustments['sarcasm'] += 0.6
                rules_applied.append(f"саркастическая фраза: {phrase}")

        if adjustments['sarcasm'] > 0.5:
            rules_applied.append("обнаружен сарказм")

        # 6) Disappointment pattern: expectation verb + high-expectation adjective.
        disappointment_verbs = ['ожидать', 'надеяться', 'думать']
        disappointment_adjs = ['большой', 'лучший', 'хороший', 'много', 'высокий']
        found_disappointment = False
        for v in disappointment_verbs:
            if v in lemmas:
                for a in disappointment_adjs:
                    if a in lemmas:
                        adjustments['valence'] -= 0.8
                        rules_applied.append(f"разочарование: {v} {a}")
                        found_disappointment = True
                        break
            if found_disappointment:
                break
        if 'оправдать' in lemmas and 'ожидание' in lemmas:
            adjustments['valence'] -= 0.7
            rules_applied.append("разочарование: не оправдал ожиданий")

        return {'rules_applied': rules_applied, 'adjustments': adjustments, 'lemmas': lemmas}

    def calculate_disagreement(self, pred1: Dict, pred2: Dict) -> float:
        """Return a [0, 0.7] disagreement score between two predictions.

        0.0 for identical emotions, 0.5 when either emotion is unknown to
        the ontology, otherwise 0.7 scaled by index distance.
        """
        if pred1['emotion'] == pred2['emotion']:
            return 0.0
        emotions = list(self.emotion_definitions.keys())
        idx1 = emotions.index(pred1['emotion']) if pred1['emotion'] in emotions else -1
        idx2 = emotions.index(pred2['emotion']) if pred2['emotion'] in emotions else -1
        if idx1 == -1 or idx2 == -1:
            return 0.5
        distance = abs(idx1 - idx2) / len(emotions)
        return 0.7 * distance

    def explain_transition(self, from_emotion: str, to_emotion: str) -> List[str]:
        """Shortest ontology-graph path between two emotions, or [] when none exists."""
        try:
            return nx.shortest_path(self.ontology_graph, source=from_emotion, target=to_emotion)
        except Exception:  # no path / unknown node; was a bare except
            return []

    def adjust_prediction_with_rules(self, prediction: Dict, rule_analysis: Dict) -> Dict:
        """Post-process a model prediction with the fired rules.

        May relabel the emotion (fear/joy/sadness/sarcasm corrections, or
        'не определено' when no sentiment words fired) and rescale the
        confidence.  Returns {'emotion', 'confidence', 'rules_applied'}.
        """
        original_emotion = prediction['emotion']
        original_confidence = prediction['confidence']
        adj = rule_analysis['adjustments']
        rules = rule_analysis['rules_applied']

        original_confidence_value = original_confidence
        was_corrected = len(rules) > 0

        # Confidence multiplier from arousal/uncertainty boosts minus
        # penalty for large valence swings, clipped to [0.5, 1.5].
        conf_mult = 1.0 + adj['arousal'] * 0.2 + adj['uncertainty'] * 0.1 - abs(adj['valence']) * 0.1
        conf_mult = np.clip(conf_mult, 0.5, 1.5)
        new_confidence = original_confidence * conf_mult
        new_emotion = original_emotion

        has_negative = (any('негативное слово' in r for r in rules) or
                        any('разочарование' in r for r in rules) or
                        any('инверсия негатива' in r for r in rules) or
                        any('сочетание: не очень' in r for r in rules))
        has_positive = any('позитивное слово' in r for r in rules) or any('радость' in r for r in rules)

        fear_keywords = ['страшно', 'бояться', 'опасно', 'угроза', 'тревога', 'паника']
        has_fear_word = any(any(kw in r for kw in fear_keywords) for r in rules)
        if has_fear_word and original_emotion in ('грусть', 'злость', 'радость'):
            new_emotion = 'страх'
            new_confidence *= 0.9
            rules.append("коррекция: обнаружены слова страха → страх")

        # NOTE(review): the block below keys off original_emotion, so it can
        # override the fear correction above (e.g. радость + fear word ends up
        # 'грусть') — looks unintended; confirm before changing behavior.
        if not has_positive and not has_negative:
            new_emotion = 'не определено'
            new_confidence = 1.0
            rules.append("нет эмоциональных слов → не определено")
        else:
            if has_negative and not has_positive:
                if original_emotion == 'радость':
                    new_emotion = 'грусть'
                    new_confidence *= 0.8
                    rules.append("коррекция: негативные слова без позитивных")
                elif original_emotion == 'сарказм':
                    new_emotion = 'грусть'
                    new_confidence *= 0.9
            elif has_positive and not has_negative and original_emotion in ('грусть', 'злость', 'страх'):
                new_emotion = 'радость'
                rules.append("коррекция: позитивные слова")

        # Negation inversions take precedence over the lexicon-balance corrections.
        for rule in rules:
            if rule.startswith("инверсия негатива:"):
                new_emotion = 'радость'
                break
            elif rule.startswith("инверсия позитива:"):
                if adj['arousal'] > 0.3:
                    new_emotion = 'злость'
                else:
                    new_emotion = 'грусть'
                break

        # Sarcasm overrides everything else when its score passes the threshold.
        sarcasm_flag = adj['sarcasm'] > 0.5
        if sarcasm_flag:
            new_emotion = 'сарказм'
            new_confidence = min(new_confidence * 0.8, 0.9)
            if "саркастическая фраза" in str(rules):
                new_confidence = min(new_confidence * 1.1, 0.95)

        if any('восклицание' in r for r in rules):
            new_confidence = min(new_confidence * 1.2, 1.0)

        # Mild boost when no rule fired and the model itself was unsure.
        if not was_corrected and original_confidence_value < 0.9:
            new_confidence = min(new_confidence * 1.10, 1.0)

        new_confidence = min(new_confidence, 1.0)

        return {
            'emotion': new_emotion,
            'confidence': new_confidence,
            'rules_applied': rules
        }

    def get_ontology_analysis(self, text: str, model_prediction: Dict) -> Dict:
        """Full pipeline: rules -> adjusted prediction -> disagreement -> hypothesis.

        A hypothesis is only recorded when the disagreement exceeds 0.2.
        """
        rule_analysis = self.apply_linguistic_rules(text)
        adjusted = self.adjust_prediction_with_rules(model_prediction, rule_analysis)
        disagreement = self.calculate_disagreement(model_prediction, adjusted)
        hypothesis = self.formulate_hypothesis(text, model_prediction, adjusted) if disagreement > 0.2 else None
        return {
            'rule_analysis': rule_analysis,
            'adjusted_prediction': adjusted,
            'disagreement': disagreement,
            'hypothesis': hypothesis
        }

    def get_statistics(self) -> Dict:
        """Summary counters for monitoring/debug endpoints."""
        return {
            'ontology_nodes': len(self.ontology_graph.nodes),
            'ontology_edges': len(self.ontology_graph.edges),
            'linguistic_rules': len(self.linguistic_rules),
            'emotions_covered': len(self.emotions),
            'pending_hypotheses': len([h for h in self.hypotheses_db.values() if h['status'] == 'pending'])
        }
|
|
class EmotionLSTM(nn.Module):
    """Bidirectional LSTM classifier over padded token-id sequences."""

    def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_classes=3, dropout=0.3, num_layers=2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(dropout)
        head_layers = [
            nn.Linear(hidden_dim * 2, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x, return_confidence=False):
        """Classify a (batch, seq_len) LongTensor of token ids.

        Returns logits of shape (batch, num_classes); with
        return_confidence=True also returns the max softmax probability.
        """
        token_vectors = self.embedding(x)
        _, (final_hidden, _) = self.lstm(token_vectors)
        # Concatenate forward/backward final hidden states of the top layer.
        pooled = torch.cat((final_hidden[-2, :, :], final_hidden[-1, :, :]), dim=1)
        logits = self.classifier(self.dropout(pooled))
        if not return_confidence:
            return logits
        probabilities = torch.softmax(logits, dim=1)
        confidence = probabilities.max(dim=1).values
        return logits, confidence
|
|
class EmotionBERT(nn.Module):
    """Pretrained BERT encoder with an MLP head for emotion classification."""

    def __init__(self, bert_model_name, num_classes, dropout=0.3):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        bert_dim = self.bert.config.hidden_size
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(bert_dim, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )

    def forward(self, input_ids, attention_mask, return_confidence=False):
        """Encode with BERT and classify from the [CLS] token embedding.

        Returns logits; with return_confidence=True also returns the max
        softmax probability per sample.
        """
        encoder_out = self.bert(input_ids, attention_mask, return_dict=True)
        cls_embedding = encoder_out.last_hidden_state[:, 0, :]
        logits = self.classifier(cls_embedding)
        if not return_confidence:
            return logits
        probabilities = torch.softmax(logits, dim=1)
        confidence = probabilities.max(dim=1).values
        return logits, confidence
|
|
class CascadeEmotionClassifier:
    """Two-stage emotion classifier with ontology post-processing and caching.

    A fast LSTM handles inputs it is confident about (>= threshold); the
    rest are escalated to BERT.  Every prediction is then adjusted by the
    ontology/rule layer.  Results are cached keyed by the cleaned text.
    """
    def __init__(self, lstm_model, bert_model, vocab, tokenizer, label_encoder, ontology_model, threshold=0.95, device='cpu', max_length_lstm=100, max_length_bert=128, enable_cache=True, cache_maxsize=10000):
        self.lstm_model = lstm_model
        self.bert_model = bert_model
        self.vocab = vocab                    # word -> id mapping for the LSTM
        self.tokenizer = tokenizer            # HuggingFace tokenizer for BERT
        self.label_encoder = label_encoder
        self.ontology_model = ontology_model
        self.threshold = threshold            # LSTM confidence needed to skip BERT
        self.device = device
        self.max_length_lstm = max_length_lstm
        self.max_length_bert = max_length_bert
        # Inference-only: switch both models to eval mode and move to device.
        self.lstm_model.eval()
        self.bert_model.eval()
        self.lstm_model.to(device)
        self.bert_model.to(device)
        self.stats = {'total': 0, 'lstm': 0, 'bert': 0, 'corrections': 0}

        # LRU cache keyed by cleaned text; hit/miss counters for metrics.
        self.enable_cache = enable_cache
        self.cache = LRUCache(maxsize=cache_maxsize) if enable_cache else None
        self.cache_hits = 0
        self.cache_misses = 0

    def text_to_sequence(self, text):
        """Convert text to a fixed-length id sequence for the LSTM.

        Unknown words map to <UNK> (id 1 fallback); the sequence is padded
        with <PAD> (id 0 fallback) / truncated to max_length_lstm.
        """
        words = str(text).split()[:self.max_length_lstm]
        sequence = [self.vocab.get(word, self.vocab.get('<UNK>', 1)) for word in words]
        if len(sequence) < self.max_length_lstm:
            sequence += [self.vocab.get('<PAD>', 0)] * (self.max_length_lstm - len(sequence))
        return sequence[:self.max_length_lstm]

    def predict(self, text):
        """Predict the emotion for a single text, using the cache when enabled.

        NOTE(review): cache entries are keyed by the cleaned text, so a hit
        returns a result whose 'text' field may be a different raw input
        that cleaned to the same string — confirm this is acceptable.
        """
        if self.enable_cache:
            text_clean = clean_russian_text(text)
            cached = self.cache.get(text_clean)
            if cached is not None:
                self.cache_hits += 1
                return cached
            self.cache_misses += 1

        result = self.predict_batch([text])[0]

        if self.enable_cache:
            self.cache.put(text_clean, result)
        return result

    def predict_batch(self, texts: List[str]) -> List[Dict]:
        """Classify a batch of texts through the LSTM->BERT cascade.

        Texts without Cyrillic letters after cleaning are labelled
        'не определено' without touching the models.  Returns one result
        dict per input, in input order.
        """
        self.stats['total'] += len(texts)
        texts_clean = [clean_russian_text(t) for t in texts]

        # Only texts containing at least one Russian letter go to the models.
        valid_mask = [bool(re.search(r'[а-яё]', t)) for t in texts_clean]
        valid_indices = [i for i, v in enumerate(valid_mask) if v]

        results = [None] * len(texts)

        if not valid_indices:
            for i in range(len(texts)):
                results[i] = {
                    'text': texts[i],
                    'predicted_emotion': 'не определено',
                    'confidence': 1.0,
                    'used_model': '',
                    'rules_applied': ['текст не содержит русских букв']
                }
            return results

        # Stage 1: LSTM over all valid texts.
        lstm_inputs = [self.text_to_sequence(texts_clean[i]) for i in valid_indices]
        lstm_inputs_tensor = torch.LongTensor(lstm_inputs).to(self.device)

        with torch.no_grad():
            lstm_logits, lstm_conf = self.lstm_model(lstm_inputs_tensor, return_confidence=True)
            lstm_probs = torch.softmax(lstm_logits, dim=1)
            lstm_preds = lstm_probs.argmax(dim=1).cpu().numpy()
            lstm_confs = lstm_conf.cpu().numpy()

        # Split by confidence: confident -> keep LSTM, otherwise escalate to BERT.
        lstm_group = []
        bert_group = []
        for idx, conf in enumerate(lstm_confs):
            if conf >= self.threshold:
                lstm_group.append(idx)
            else:
                bert_group.append(idx)

        # Finalize confident LSTM predictions through the ontology layer.
        for idx_in_batch in lstm_group:
            orig_idx = valid_indices[idx_in_batch]
            text_clean = texts_clean[orig_idx]
            lstm_emo = self.label_encoder.inverse_transform([lstm_preds[idx_in_batch]])[0]
            lstm_pred_dict = {
                'emotion': lstm_emo,
                'confidence': lstm_confs[idx_in_batch],
                'probabilities': lstm_probs[idx_in_batch].cpu().numpy().tolist()
            }
            lstm_onto = self.ontology_model.get_ontology_analysis(text_clean, lstm_pred_dict)
            final = lstm_onto['adjusted_prediction']
            results[orig_idx] = {
                'text': texts[orig_idx],
                'predicted_emotion': final['emotion'],
                'confidence': final['confidence'],
                'used_model': "LSTM + онтология",
                'rules_applied': lstm_onto['rule_analysis']['rules_applied']
            }
            self.stats['lstm'] += 1

        # Stage 2: BERT for the low-confidence remainder, as one batch.
        if bert_group:
            bert_indices_orig = [valid_indices[idx] for idx in bert_group]
            bert_texts_clean = [texts_clean[i] for i in bert_indices_orig]
            enc = self.tokenizer(bert_texts_clean, truncation=True, padding=True, max_length=self.max_length_bert, return_tensors='pt').to(self.device)
            with torch.no_grad():
                bert_logits, bert_conf = self.bert_model(enc['input_ids'], enc['attention_mask'], return_confidence=True)
                bert_probs = torch.softmax(bert_logits, dim=1)
                bert_preds = bert_probs.argmax(dim=1).cpu().numpy()
                bert_confs = bert_conf.cpu().numpy()

            for j, orig_idx in enumerate(bert_indices_orig):
                text_clean = bert_texts_clean[j]
                bert_emo = self.label_encoder.inverse_transform([bert_preds[j]])[0]
                bert_pred_dict = {
                    'emotion': bert_emo,
                    'confidence': bert_confs[j],
                    'probabilities': bert_probs[j].cpu().numpy().tolist()
                }
                bert_onto = self.ontology_model.get_ontology_analysis(text_clean, bert_pred_dict)
                final = bert_onto['adjusted_prediction']
                results[orig_idx] = {
                    'text': texts[orig_idx],
                    'predicted_emotion': final['emotion'],
                    'confidence': final['confidence'],
                    'used_model': "BERT + онтология",
                    'rules_applied': bert_onto['rule_analysis']['rules_applied']
                }
                self.stats['bert'] += 1

        # Fill the slots skipped by the Cyrillic filter.
        for i in range(len(texts)):
            if results[i] is None:
                results[i] = {
                    'text': texts[i],
                    'predicted_emotion': 'не определено',
                    'confidence': 1.0,
                    'used_model': '',
                    'rules_applied': ['текст не содержит русских букв']
                }

        return results

    def predict_batch_with_cache(self, texts):
        """Batch prediction that serves cache hits and only runs the models
        on misses; results for misses are written back to the cache."""
        if not self.enable_cache:
            return self.predict_batch(texts)

        results = [None] * len(texts)
        texts_to_predict = []
        indices_to_predict = []

        for i, text in enumerate(texts):
            text_clean = clean_russian_text(text)
            cached = self.cache.get(text_clean)
            if cached is not None:
                results[i] = cached
                self.cache_hits += 1
            else:
                texts_to_predict.append(text)
                indices_to_predict.append(i)

        if texts_to_predict:
            batch_results = self.predict_batch(texts_to_predict)
            for idx, orig_idx in enumerate(indices_to_predict):
                result = batch_results[idx]
                results[orig_idx] = result
                self.cache.put(clean_russian_text(texts[orig_idx]), result)
                self.cache_misses += 1

        return results
|
|
def load_model():
    """Load all artifacts from ./model and build the cascade classifier.

    Reads model_info.json (hyperparameters/classes) and vocab.json,
    reconstructs the label encoder, ontology, LSTM and BERT models, loads
    their weights, and wires everything into a CascadeEmotionClassifier.

    Returns:
        (cascade, model_info) tuple.
    """
    print("Загрузка модели...")
    model_dir = 'model'

    with open(f'{model_dir}/model_info.json', 'r', encoding='utf-8') as f:
        model_info = json.load(f)

    with open(f'{model_dir}/vocab.json', 'r', encoding='utf-8') as f:
        vocab = json.load(f)

    print("📂 Создание label_encoder...")
    # Rebuild the encoder directly from the saved class list (no fitting).
    label_encoder = LabelEncoder()
    label_encoder.classes_ = np.array(model_info['classes'])
    print(f"✅ label_encoder создан, классы: {list(label_encoder.classes_)}")

    print("📂 Создание онтологии...")
    # No training corpus at serving time -> statistical lexicon is skipped.
    ontology_model = OntologyEmotionModel(
        emotions=list(label_encoder.classes_),
        train_texts=None,
        train_labels=None
    )
    print("✅ Онтология создана")

    print("📂 Загрузка LSTM...")
    lstm_model = EmotionLSTM(
        vocab_size=len(vocab),
        embed_dim=model_info.get('embed_dim', 300),
        hidden_dim=256,
        num_classes=model_info['num_classes'],
        dropout=0.3,
        num_layers=2
    )
    # SECURITY NOTE: weights_only=False unpickles arbitrary objects — only
    # load checkpoint files from a trusted source.
    lstm_state = torch.load(f'{model_dir}/lstm_model.pth', map_location=device, weights_only=False)
    lstm_model.load_state_dict(lstm_state)
    print("✅ LSTM загружена")

    print("📂 Загрузка BERT...")
    bert_model = EmotionBERT(
        bert_model_name=model_info['bert_model_name'],
        num_classes=model_info['num_classes'],
        dropout=0.3
    )
    # SECURITY NOTE: same trusted-source caveat as the LSTM checkpoint above.
    bert_state = torch.load(f'{model_dir}/bert_model.pth', map_location=device, weights_only=False)
    bert_model.load_state_dict(bert_state)
    print("✅ BERT загружена")

    print("📂 Загрузка токенизатора...")
    # Prefer the tokenizer bundled with the model; fall back to the hub.
    try:
        tokenizer = BertTokenizer.from_pretrained(model_dir)
        print("✅ Токенизатор загружен из model_dir")
    except Exception as e:
        print(f"⚠️ Ошибка: {e}")
        print("🔄 Загружаем токенизатор из Hugging Face...")
        tokenizer = BertTokenizer.from_pretrained('DeepPavlov/rubert-base-cased')
        print("✅ Токенизатор загружен из Hugging Face")

    print("📂 Создание каскадного классификатора с кэшем...")
    cascade = CascadeEmotionClassifier(
        lstm_model=lstm_model,
        bert_model=bert_model,
        vocab=vocab,
        tokenizer=tokenizer,
        label_encoder=label_encoder,
        ontology_model=ontology_model,
        threshold=model_info.get('threshold', 0.95),
        device=device,
        max_length_lstm=model_info.get('max_length_lstm', 100),
        max_length_bert=model_info.get('max_length_bert', 128),
        enable_cache=True,
        cache_maxsize=10000
    )

    print("✅ Модель успешно загружена!")
    return cascade, model_info
|
|
class ModelLoader:
    """Holds the lazily-loaded cascade classifier and its loading state."""

    def __init__(self):
        self.classifier = None   # CascadeEmotionClassifier once loaded
        self.model_info = None   # metadata dict from model_info.json
        self.loading = True      # True until the first load attempt finishes
        self.error = None        # last load error message, if any
        self._task = None

    async def load_async(self):
        """Run load_model() in the default executor so the event loop stays responsive.

        On failure the exception is logged and stored in self.error instead
        of propagating; self.loading is always cleared afterwards.
        """
        self.loading = True
        self.error = None
        try:
            # get_running_loop() is the modern API inside a coroutine;
            # get_event_loop() is deprecated for this use since Python 3.10.
            loop = asyncio.get_running_loop()
            self.classifier, self.model_info = await loop.run_in_executor(None, load_model)
            logger.info("Модели успешно загружены!")
        except Exception as e:
            logger.exception("Ошибка загрузки модели")
            self.error = str(e)
        finally:
            self.loading = False

    def is_ready(self):
        """True once a classifier is available and loading has finished."""
        return self.classifier is not None and not self.loading
|
|
class Monitor:
    """In-memory request, emotion, system and cache metrics for the service."""

    def __init__(self):
        self.request_count = 0            # total recorded requests
        self.total_prediction_time = 0.0  # accumulated request duration, ms
        self.emotion_counter = defaultdict(int)  # emotion label -> hit count
        self.error_count = 0              # total failed requests
        self.start_time = time.time()     # for uptime reporting

    def record_request(self, duration_ms: float, emotion: Optional[str] = None):
        """Record one finished request and, optionally, its predicted emotion."""
        self.request_count += 1
        self.total_prediction_time += duration_ms
        if emotion:
            self.emotion_counter[emotion] += 1

    def record_error(self):
        """Record one failed request."""
        self.error_count += 1

    def get_metrics(self):
        """Return a snapshot dict of request, emotion, system and cache metrics."""
        avg_time_ms = (self.total_prediction_time / self.request_count) if self.request_count > 0 else 0
        uptime = time.time() - self.start_time
        # interval=None is non-blocking: it reports CPU usage since the last
        # call instead of sleeping 100 ms inside the (async) request handler,
        # which previously stalled the whole event loop on every /metrics hit.
        # The very first call returns 0.0, which is an acceptable trade-off.
        cpu_percent = psutil.cpu_percent(interval=None)
        memory = psutil.virtual_memory()
        cache_stats = {}
        # model_loader is the module-level singleton created below this class.
        if model_loader.classifier and model_loader.classifier.cache:
            cache_stats = {
                "hits": model_loader.classifier.cache_hits,
                "misses": model_loader.classifier.cache_misses,
                "size": model_loader.classifier.cache.size()
            }
        return {
            "requests": {
                "total": self.request_count,
                "errors": self.error_count,
                "avg_prediction_time_ms": round(avg_time_ms, 2)
            },
            "emotions": dict(self.emotion_counter),
            "system": {
                "cpu_percent": cpu_percent,
                "memory_percent": memory.percent,
                "memory_used_mb": memory.used // (1024*1024),
                "uptime_seconds": round(uptime, 1)
            },
            "cache": cache_stats
        }
class TaskStatus(BaseModel):
    """Progress/result record for a (possibly background) processing task."""
    task_id: str
    status: str  # one of "pending" | "processing" | "completed" | "failed"
    progress: int  # completion percentage, 0-100
    total: int  # number of texts to process
    processed: int  # number of texts processed so far
    result: Optional[Dict] = None  # aggregated results once status == "completed"
    error: Optional[str] = None  # error message when status == "failed"
    created_at: float  # unix timestamp of task creation
    updated_at: float  # unix timestamp of last state change
|
|
| task_storage = {} |
|
|
async def process_file_async(task_id: str, file_content: bytes, filename: str, classifier):
    """Background worker: parse a '|'-delimited CSV and classify every row.

    Progress and the final aggregated result are written into
    ``task_storage[task_id]`` so clients can poll ``/status/{task_id}``.
    ``filename`` is currently unused but kept for the caller's interface.
    """
    task = task_storage[task_id]
    task.status = "processing"
    task.updated_at = time.time()

    try:
        # Decode as UTF-8 first, fall back to the common Windows Cyrillic codepage.
        try:
            text_content = file_content.decode('utf-8')
        except UnicodeDecodeError:
            text_content = file_content.decode('cp1251')

        reader = csv.reader(text_content.splitlines(), delimiter='|')
        rows = list(reader)
        if not rows:
            raise ValueError("Файл пуст")

        # Locate the 'text' column (case-insensitive); default to column 0.
        header = rows[0]
        text_col_idx = None
        for i, col in enumerate(header):
            if col.strip().lower() == 'text':
                text_col_idx = i
                break
        if text_col_idx is None:
            text_col_idx = 0

        # Skip the header row and any short/malformed rows.
        texts = [row[text_col_idx] for row in rows[1:] if len(row) > text_col_idx]

        task.total = len(texts)

        batch_size = 100
        all_results = []
        loop = asyncio.get_running_loop()
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            # Model inference is synchronous and CPU/GPU-bound: run it in a
            # worker thread so the event loop keeps serving other requests.
            # (Previously each batch blocked the loop, with only a 10 ms
            # sleep between batches to yield control.)
            batch_results = await loop.run_in_executor(
                None, classifier.predict_batch_with_cache, batch
            )
            all_results.extend(batch_results)
            task.processed = min(i + batch_size, len(texts))
            task.progress = int(task.processed / task.total * 100)
            task.updated_at = time.time()

        # Aggregate counts and collect per-emotion examples, skipping the
        # sentinel "не определено" label.
        emotion_counts = defaultdict(int)
        examples = defaultdict(list)
        for r in all_results:
            emotion = r['predicted_emotion']
            if emotion == 'не определено':
                continue
            emotion_counts[emotion] += 1
            examples[emotion].append((r['confidence'], r['text']))

        # Keep only the 5 highest-confidence examples per emotion.
        examples_top = {}
        for emotion, lst in examples.items():
            lst.sort(key=lambda x: x[0], reverse=True)
            examples_top[emotion] = [{'text': t, 'confidence': f"{c*100:.1f}%"} for c, t in lst[:5]]

        task.result = {
            "emotion_counts": dict(emotion_counts),
            "examples": examples_top,
            "results": all_results
        }
        task.status = "completed"
        task.progress = 100
        logger.info(f"Task {task_id} completed")
    except Exception as e:
        logger.exception(f"Task {task_id} failed")
        task.status = "failed"
        task.error = str(e)
    finally:
        task.updated_at = time.time()
|
|
# FastAPI application and Jinja2 templating (templates/ directory must exist).
app = FastAPI(title="Emotion Analysis with BERT, Ontology, Cache and Async")
templates = Jinja2Templates(directory="templates")


# Module-level singletons shared by the endpoints and middleware below.
model_loader = ModelLoader()
monitor = Monitor()
|
|
@app.on_event("startup")
async def startup_event():
    """Kick off model loading in the background so startup is not blocked."""
    # Keep a reference to the task: the event loop holds only weak references
    # to tasks, so an unreferenced task can be garbage-collected mid-flight.
    model_loader._task = asyncio.create_task(model_loader.load_async())
|
|
@app.middleware("http")
async def add_monitoring(request: Request, call_next):
    """Record duration for each request and count unhandled errors.

    ``/metrics`` and ``/health`` are excluded to keep probes out of the
    stats.  ``/predict`` is excluded because its handler records itself
    (together with the predicted emotion); counting it here as well would
    double both the request count and the accumulated duration.
    """
    start_time = time.time()
    try:
        response = await call_next(request)
    except Exception:
        monitor.record_error()
        raise
    duration_ms = (time.time() - start_time) * 1000
    if request.url.path not in ("/metrics", "/health", "/predict"):
        monitor.record_request(duration_ms)
    return response
|
|
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Serve the single-page UI rendered from the index template."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
@app.post("/predict")
async def predict(text: str = Form(...)):
    """Classify a single text; return emotion, confidence and rule tags.

    Returns 503 while the model is still loading and 400 for inputs
    shorter than 3 characters.
    """
    import html  # local import: only used for escaping rule tags below

    if not model_loader.is_ready():
        raise HTTPException(status_code=503, detail="Модель ещё загружается")
    if not text or len(text.strip()) < 3:
        return JSONResponse({"error": "Введите хотя бы 3 символа."}, status_code=400)
    try:
        start = time.time()
        result = model_loader.classifier.predict(text)
        duration = (time.time() - start) * 1000
        monitor.record_request(duration, result['predicted_emotion'])
        # Build HTML tags for up to 10 applied rules.  Rule text may echo
        # fragments derived from the user's input, so escape it to avoid
        # reflected XSS when the frontend injects these spans into the DOM.
        rules_display = []
        for rule in result['rules_applied'][:10]:
            if ':' in rule:
                cat, val = rule.split(':', 1)
                rules_display.append(f"<span class='rule-tag'>{html.escape(cat)}: {html.escape(val)}</span>")
            else:
                rules_display.append(f"<span class='rule-tag'>{html.escape(rule)}</span>")
        return JSONResponse({
            "success": True,
            "emotion": result['predicted_emotion'],
            "confidence": f"{result['confidence']*100:.1f}%",
            "used_model": result['used_model'],
            "rules": "".join(rules_display) if rules_display else "Нет правил"
        })
    except Exception as e:
        monitor.record_error()
        return JSONResponse({"error": str(e)}, status_code=500)
|
|
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Synchronously analyse a '|'-delimited CSV and return aggregated results.

    The full results are also stored in ``task_storage`` under a fresh
    session id so they can be fetched later via ``/download/{session_id}``.
    """
    if not model_loader.is_ready():
        raise HTTPException(status_code=503, detail="Модель ещё загружается")
    if not file.filename.endswith('.csv'):
        return JSONResponse({"error": "Файл должен быть в формате CSV"}, status_code=400)

    try:
        content = await file.read()
        # Decode as UTF-8 first, fall back to the Windows Cyrillic codepage.
        try:
            text_content = content.decode('utf-8')
        except UnicodeDecodeError:
            text_content = content.decode('cp1251')

        reader = csv.reader(text_content.splitlines(), delimiter='|')
        rows = list(reader)
        if not rows:
            return JSONResponse({"error": "Файл пуст"}, status_code=400)

        # Locate the 'text' column (case-insensitive); default to column 0.
        header = rows[0]
        text_col_idx = None
        for i, col in enumerate(header):
            if col.strip().lower() == 'text':
                text_col_idx = i
                break
        if text_col_idx is None:
            text_col_idx = 0

        # Skip the header row and any short/malformed rows.
        texts = [row[text_col_idx] for row in rows[1:] if len(row) > text_col_idx]

        if not texts:
            return JSONResponse({"error": "В файле нет данных для анализа"}, status_code=400)

        # Inference is synchronous and heavy: run it in a worker thread so
        # the event loop stays responsive (previously the whole batch call
        # blocked the loop for the duration of the file).
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            None, model_loader.classifier.predict_batch_with_cache, texts
        )

        # Aggregate counts and per-emotion examples, skipping "не определено".
        emotion_counts = defaultdict(int)
        examples = defaultdict(list)
        for r in results:
            emotion = r['predicted_emotion']
            if emotion == 'не определено':
                continue
            emotion_counts[emotion] += 1
            examples[emotion].append((r['confidence'], r['text']))

        # Keep only the 5 highest-confidence examples per emotion.
        examples_top = {}
        for emotion, lst in examples.items():
            lst.sort(key=lambda x: x[0], reverse=True)
            examples_top[emotion] = [{'text': t, 'confidence': f"{c*100:.1f}%"} for c, t in lst[:5]]

        session_id = str(uuid.uuid4())
        task_storage[session_id] = TaskStatus(
            task_id=session_id, status="completed", progress=100,
            total=len(texts), processed=len(texts),
            # Store a plain dict (not a defaultdict) so the persisted result
            # is a regular, serialization-friendly mapping.
            result={"results": results, "emotion_counts": dict(emotion_counts), "examples": examples_top},
            created_at=time.time(), updated_at=time.time()
        )

        return JSONResponse({
            "success": True,
            "emotion_counts": dict(emotion_counts),
            "examples": examples_top,
            "session_id": session_id
        })
    except Exception as e:
        return JSONResponse({"error": f"Ошибка обработки файла: {str(e)}"}, status_code=500)
|
|
# Strong references to in-flight background tasks: the event loop holds only
# weak references to tasks, so an unreferenced task can be garbage-collected
# before (or while) it runs.
_background_tasks = set()


@app.post("/upload_async")
async def upload_file_async(file: UploadFile = File(...)):
    """Accept a CSV for background processing; return a pollable task id.

    The actual work happens in ``process_file_async``; clients track it via
    ``/status/{task_id}`` and fetch results from ``/result/{task_id}``.
    """
    if not model_loader.is_ready():
        raise HTTPException(status_code=503, detail="Модель ещё загружается")
    if not file.filename.endswith('.csv'):
        raise HTTPException(status_code=400, detail="Файл должен быть в формате CSV")

    content = await file.read()
    task_id = str(uuid.uuid4())
    task_status = TaskStatus(
        task_id=task_id,
        status="pending",
        progress=0,
        total=0,
        processed=0,
        created_at=time.time(),
        updated_at=time.time()
    )
    task_storage[task_id] = task_status

    task = asyncio.create_task(process_file_async(task_id, content, file.filename, model_loader.classifier))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return JSONResponse({
        "task_id": task_id,
        "status": "pending",
        "message": "Файл принят в обработку. Используйте /status/{task_id} для отслеживания прогресса."
    })
|
|
@app.get("/status/{task_id}")
async def get_task_status(task_id: str):
    """Return the current state of a background task as JSON."""
    task = task_storage.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Задача не найдена")
    return JSONResponse(task.dict())
|
|
@app.get("/result/{task_id}")
async def get_task_result(task_id: str):
    """Return the aggregated results of a completed background task."""
    task = task_storage.get(task_id)
    if task is None:
        raise HTTPException(status_code=404, detail="Задача не найдена")
    if task.status != "completed":
        raise HTTPException(status_code=400, detail=f"Задача ещё не завершена (статус: {task.status})")
    payload = {
        "emotion_counts": task.result["emotion_counts"],
        "examples": task.result["examples"],
        "task_id": task_id
    }
    return JSONResponse(payload)
|
|
def _results_to_csv_bytes(results):
    """Serialize per-text prediction results to CSV bytes (UTF-8 with BOM,
    so Excel opens the Cyrillic content correctly)."""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["text", "predicted_emotion", "confidence", "used_model", "rules_applied"])
    for r in results:
        writer.writerow([
            r['text'],
            r['predicted_emotion'],
            f"{r['confidence']:.4f}",
            r['used_model'],
            "|".join(r['rules_applied'])
        ])
    return output.getvalue().encode('utf-8-sig')


@app.get("/download/{session_id}")
async def download_results(session_id: str):
    """Download the full per-text results of a completed session as CSV.

    CSV serialization lives in ``_results_to_csv_bytes`` (previously this
    logic was duplicated verbatim in ``download_async_results``).
    """
    task = task_storage.get(session_id)
    if not task or task.status != "completed":
        raise HTTPException(status_code=404, detail="Результаты не найдены или задача не завершена")
    results = task.result.get("results", [])
    return StreamingResponse(
        iter([_results_to_csv_bytes(results)]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=emotion_results_{session_id}.csv"}
    )
|
|
@app.get("/download_async/{task_id}")
async def download_async_results(task_id: str):
    """Download the full per-text results of a completed async task as CSV."""
    task = task_storage.get(task_id)
    if not task or task.status != "completed":
        raise HTTPException(status_code=404, detail="Результаты не найдены или задача не завершена")
    rows = task.result.get("results", [])
    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(["text", "predicted_emotion", "confidence", "used_model", "rules_applied"])
    for row in rows:
        csv_writer.writerow([
            row['text'],
            row['predicted_emotion'],
            f"{row['confidence']:.4f}",
            row['used_model'],
            "|".join(row['rules_applied'])
        ])
    # UTF-8 BOM so spreadsheet software detects the encoding correctly.
    payload = buffer.getvalue().encode('utf-8-sig')
    return StreamingResponse(
        iter([payload]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=emotion_results_{task_id}.csv"}
    )
|
|
@app.get("/health")
async def health_check():
    """Liveness/readiness probe reporting the model loading state."""
    ready = model_loader.is_ready()
    return {
        "status": "healthy" if ready else "loading",
        "model_loaded": ready,
        "loading": model_loader.loading,
        "error": model_loader.error
    }
|
|
@app.get("/metrics")
async def get_metrics():
    """Expose runtime metrics collected by the Monitor singleton."""
    metrics = monitor.get_metrics()
    return JSONResponse(metrics)
|
|
if __name__ == "__main__":
    # Listen on PORT if set in the environment, otherwise default to 7860.
    server_port = int(os.environ.get("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=server_port)