sentimentanalyzer01's picture
Update app.py
457f0b5 verified
import os
import sys
import json
import pickle
import csv
import io
import uuid
import asyncio
import time
import logging
from collections import OrderedDict, defaultdict, Counter
from functools import wraps
from typing import Dict, List, Any, Optional, Tuple
import torch
import torch.nn as nn
import numpy as np
import re
import networkx as nx
import pymorphy3
import requests
import psutil
from fastapi import FastAPI, Request, Form, HTTPException, File, UploadFile, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
import uvicorn
from transformers import BertTokenizer, BertModel
from sklearn.preprocessing import LabelEncoder
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Используется устройство: {device}")
SMILEY_MAPPING = {
':)': 'смайлик_радость', ')': 'смайлик_радость', '🥰': 'смайлик_радость',
'😀': 'смайлик_радость', '😃': 'смайлик_радость', '😄': 'смайлик_радость',
'😁': 'смайлик_радость', '😘': 'смайлик_радость', '😍': 'смайлик_радость',
'😇': 'смайлик_радость', '😊': 'смайлик_радость',
':D': 'смайлик_смех', ';)': 'смайлик_подмигивание',
'❤️': 'смайлик_радость_любовь', '🩷': 'смайлик_радость_любовь',
'🧡': 'смайлик_радость_любовь', '💛': 'смайлик_радость_любовь',
'💚': 'смайлик_радость_любовь', '💙': 'смайлик_радость_любовь',
'🩵': 'смайлик_радость_любовь', '💜': 'смайлик_радость_любовь',
'👍': 'смайлик_радость_класс', '👌': 'смайлик_радость_ок',
':(': 'смайлик_грусть', '(': 'смайлик_грусть', '👎': 'смайлик_грусть',
'🥺': 'смайлик_грусть', '😞': 'смайлик_грусть', '🙁': 'смайлик_грусть',
'😭': 'смайлик_грусть_слезы', '🥲': 'смайлик_грусть', '☹️': 'смайлик_грусть',
'😔': 'смайлик_грусть_слезы', '😓': 'смайлик_грусть', '😢': 'смайлик_грусть_слезы',
'😡': 'смайлик_злость', '👿': 'смайлик_злость', '🤬': 'смайлик_злость', '😈': 'смайлик_злость',
'😠': 'смайлик_злость',
}
def clean_russian_text(text):
if not isinstance(text, str):
return ""
text = text.lower()
text = re.sub(r'http\S+|www\S+|https\S+', '', text)
text = re.sub(r'\S+@\S+', '', text)
# Замена смайликов с использованием глобального словаря
for smiley, replacement in SMILEY_MAPPING.items():
text = text.replace(smiley, f' {replacement} ')
# Удаление прочих символов, кроме букв, цифр, знаков препинания
text = re.sub(r'[^\w\sа-яё.,!?;:)(-]', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
class LRUCache:
def __init__(self, maxsize=10000):
self.cache = OrderedDict()
self.maxsize = maxsize
def get(self, key):
if key not in self.cache:
return None
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.maxsize:
self.cache.popitem(last=False)
def size(self):
return len(self.cache)
class OntologyEmotionModel:
def __init__(self, emotions: List[str], train_texts: List[str] = None, train_labels: List[int] = None):
self.emotions = emotions
self.morph = pymorphy3.MorphAnalyzer()
self.ontology_graph = nx.DiGraph()
self.empirical_base = defaultdict(list)
self.hypotheses_db = {}
self.verified_hypotheses = defaultdict(list)
self.sentiment_lexicon = {}
self.rule_stats = {}
if train_texts is not None and train_labels is not None:
self._build_sentiment_lexicon(train_texts, train_labels)
self._load_rusentilex()
print(f"📊 Всего слов в лексиконе: {len(self.sentiment_lexicon)}")
fear_words = ['бояться', 'страшно', 'опасно', 'угроза', 'тревога', 'паника']
for w in fear_words:
lemma = self.morph.parse(w)[0].normal_form
self.sentiment_lexicon[lemma] = 'страх'
print(f"📊 Добавлено {len(fear_words)} слов для эмоции 'страх'")
explicit_sadness = ['грустно', 'печально', 'тоскливо', 'уныло', 'горестно', 'жалко', 'сожаление', 'обидно', 'печалька']
added_sad = 0
for w in explicit_sadness:
try:
lemma = self.morph.parse(w)[0].normal_form
self.sentiment_lexicon[lemma] = 'грусть'
added_sad += 1
except:
pass
print(f"📊 Добавлено {added_sad} явных слов для эмоции 'грусть'")
disappointment_words = [
'разочарован', 'ожидал', 'надеялся', 'обманулся', 'не оправдал',
'не впечатлило', 'слабо', 'посредственно', 'неудовлетворен'
]
added_dis = 0
for w in disappointment_words:
try:
lemma = self.morph.parse(w)[0].normal_form
self.sentiment_lexicon[lemma] = 'грусть'
added_dis += 1
except:
pass
print(f"📊 Добавлено {added_dis} слов для эмоции 'грусть' (разочарование)")
positive_colloquial = [
'спасибо', 'благодарю', 'благодарить', 'норм', 'нормально', 'ок', 'окей',
'класс', 'супер', 'здорово', 'прекрасно', 'отлично', 'ого', 'вау', 'круто',
'зачет', 'лады', 'добре', 'хорошо', 'неплохо', 'приемлемо', 'удовлетворительно',
'пойдет', 'в порядке', 'без проблем', 'лепота', 'зашибись', 'ого-го'
]
negative_colloquial = [
'плохо', 'ужасно', 'кошмар', 'отвратительно', 'неуд', 'брак', 'сломалось',
'не работает', 'фигня', 'ерунда', 'разочарование', 'недоволен', 'не доволен',
'жаль', 'обидно', 'печаль', 'тоска', 'неудачно', 'провал', 'не годен',
'неприемлемо', 'неудовлетворительно', 'нехорошо', 'не ок', 'не окей', 'не норм'
]
for w in positive_colloquial:
try:
lemma = self.morph.parse(w)[0].normal_form
if lemma not in self.sentiment_lexicon:
self.sentiment_lexicon[lemma] = 'радость'
except:
pass
for w in negative_colloquial:
try:
lemma = self.morph.parse(w)[0].normal_form
if lemma not in self.sentiment_lexicon:
self.sentiment_lexicon[lemma] = 'грусть'
except:
pass
smiley_texts = set(SMILEY_MAPPING.values())
for text in smiley_texts:
if 'радость' in text or 'смех' in text or 'подмигивание' in text or 'класс' in text or 'ок' in text or 'любовь' in text:
emotion = 'радость'
elif 'грусть' in text or 'слезы' in text:
emotion = 'грусть'
elif 'злость' in text:
emotion = 'злость'
else:
emotion = 'радость'
self.sentiment_lexicon[text] = emotion
print(f"📊 Добавлено {len(smiley_texts)} текстовых меток для распознавания смайликов")
self.init_ontology_level1()
self.init_ontology_level2()
def _build_sentiment_lexicon(self, texts: List[str], labels: List[int]):
word_class_counts = defaultdict(lambda: np.zeros(len(self.emotions)))
for text, label in zip(texts, labels):
words = set(clean_russian_text(text).split())
for word in words:
lemma = self.morph.parse(word)[0].normal_form
word_class_counts[lemma][label] += 1
for lemma, counts in word_class_counts.items():
prob = counts / (counts.sum() + 1e-10)
if prob.max() > 0.6 and counts.sum() > 5:
dominant_class = self.emotions[np.argmax(prob)]
self.sentiment_lexicon[lemma] = dominant_class
def _parse_rusentilex(self, content):
lines = content.splitlines()
added = 0
sample_line = None
for line in lines:
line = line.strip()
if line:
sample_line = line
break
if not sample_line:
return
parts = sample_line.split(',')
is_two_column = len(parts) == 2
for line in lines[1:]:
line = line.strip()
if not line:
continue
try:
if is_two_column:
term, tone_str = line.split(',')
term = term.strip().lower()
tone = int(tone_str)
lemma = self.morph.parse(term)[0].normal_form
if tone == 1:
self.sentiment_lexicon[lemma] = 'радость'
added += 1
elif tone == -1:
self.sentiment_lexicon[lemma] = 'грусть'
added += 1
else:
parts = line.split(',')
if len(parts) >= 3:
term = parts[0].strip().lower()
sentiment = parts[2].strip().lower()
lemma = self.morph.parse(term)[0].normal_form
if sentiment == 'positive':
self.sentiment_lexicon[lemma] = 'радость'
added += 1
elif sentiment == 'negative':
self.sentiment_lexicon[lemma] = 'грусть'
added += 1
except Exception:
continue
print(f" Добавлено слов из RuSentiLex: {added}")
def _load_rusentilex(self):
possible_paths = [
'model/rusentilex.csv',
'rusentilex.csv',
'/app/model/rusentilex.csv',
os.path.join(os.path.dirname(__file__), 'model', 'rusentilex.csv')
]
loaded = False
print("📂 Поиск RuSentiLex...")
for path in possible_paths:
if os.path.exists(path):
try:
with open(path, 'r', encoding='utf-8') as f:
content = f.read()
self._parse_rusentilex(content)
print(f"✅ RuSentiLex загружен из файла: {path}")
loaded = True
break
except Exception as e:
print(f"⚠️ Ошибка при загрузке {path}: {e}")
if not loaded:
print("⚠️ RuSentiLex не загружен. Используется только статистический лексикон.")
def init_ontology_level1(self):
self.emotion_definitions = {
'радость': {
'valence': 'positive', 'arousal': 'high',
'definition': 'Позитивное эмоциональное состояние',
'opposite': ['грусть', 'злость']
},
'грусть': {
'valence': 'negative', 'arousal': 'low',
'definition': 'Негативное эмоциональное состояние',
'opposite': ['радость']
},
'злость': {
'valence': 'negative', 'arousal': 'high',
'definition': 'Негативное эмоциональное состояние',
'opposite': ['радость']
},
'страх': {
'valence': 'negative', 'arousal': 'high',
'definition': 'Эмоциональная реакция на угрозу',
'opposite': ['уверенность', 'спокойствие']
},
'сарказм': {
'valence': 'negative', 'arousal': 'high',
'definition': 'Язвительная насмешка',
'opposite': ['радость']
}
}
for emotion in self.emotions:
if emotion in self.emotion_definitions:
self.ontology_graph.add_node(emotion, **self.emotion_definitions[emotion])
else:
self.ontology_graph.add_node(emotion, valence='neutral', arousal='neutral')
for emotion, data in self.emotion_definitions.items():
if 'opposite' in data:
for opposite in data['opposite']:
if opposite in self.emotions:
self.ontology_graph.add_edge(emotion, opposite, relation='opposite')
def init_ontology_level2(self):
self.linguistic_rules = {
'усилители': {
'words': ['очень', 'сильно', 'крайне', 'чрезвычайно', 'невероятно', 'абсолютно'],
'effect': 'increase_arousal',
'weight': 0.3,
'learnable': True
},
'ослабители': {
'words': ['слегка', 'немного', 'чуть-чуть', 'отчасти', 'несколько'],
'effect': 'decrease_arousal',
'weight': -0.2,
'learnable': True
},
'отрицания': {
'words': ['не', 'ни', 'нет', 'нельзя', 'невозможно'],
'effect': 'negation',
'weight': -0.5,
'learnable': True
},
'восклицания': {
'patterns': [r'!+', r'\?+'],
'effect': 'increase_arousal',
'weight': 0.4,
'learnable': True
},
'вопросительные': {
'patterns': [r'\?+'],
'effect': 'uncertainty',
'weight': 0.2,
'learnable': True
},
'сарказм_маркеры': {
'words': ['ага', 'да что вы', 'ну да', 'кто бы мог подумать'],
'effect': 'sarcasm',
'weight': 0.6,
'learnable': True
}
}
def add_empirical_knowledge(self, text: str, emotion: str, confidence: float):
self.empirical_base[emotion].append({'text': text, 'confidence': confidence})
if len(self.empirical_base[emotion]) > 1000:
self.empirical_base[emotion] = self.empirical_base[emotion][-1000:]
def formulate_hypothesis(self, text: str, model_prediction: Dict, rule_based_prediction: Dict) -> Dict:
hypothesis_id = f"hyp_{len(self.hypotheses_db) + 1:06d}"
hypothesis = {
'id': hypothesis_id, 'text': text,
'model_prediction': model_prediction,
'rule_based_prediction': rule_based_prediction,
'disagreement': self.calculate_disagreement(model_prediction, rule_based_prediction),
'status': 'pending'
}
self.hypotheses_db[hypothesis_id] = hypothesis
return hypothesis
def verify_hypothesis(self, hypothesis_id: str, actual_emotion: str = None) -> Dict:
if hypothesis_id not in self.hypotheses_db:
return None
hypothesis = self.hypotheses_db[hypothesis_id]
if actual_emotion:
model_correct = hypothesis['model_prediction']['emotion'] == actual_emotion
rule_correct = hypothesis['rule_based_prediction']['emotion'] == actual_emotion
if model_correct and not rule_correct:
hypothesis['status'] = 'model_superior'
elif rule_correct and not model_correct:
hypothesis['status'] = 'rule_superior'
elif model_correct and rule_correct:
hypothesis['status'] = 'both_correct'
else:
hypothesis['status'] = 'both_incorrect'
return hypothesis
def apply_linguistic_rules(self, text: str) -> Dict:
rules_applied = []
adjustments = {'valence': 0, 'arousal': 0, 'uncertainty': 0, 'sarcasm': 0}
words = text.lower().split()
parsed = [self.morph.parse(w)[0] for w in words]
lemmas = [p.normal_form for p in parsed]
pos_tags = [p.tag.POS for p in parsed]
for lemma in lemmas:
sentiment = self.sentiment_lexicon.get(lemma, 'neutral')
if sentiment == 'радость':
rules_applied.append(f"позитивное слово: {lemma}")
adjustments['valence'] += 0.2
elif sentiment in ('грусть', 'злость', 'страх'):
rules_applied.append(f"негативное слово: {lemma}")
adjustments['valence'] -= 0.2
for category, rule in self.linguistic_rules.items():
if 'words' in rule:
for word in rule['words']:
if word in lemmas:
rules_applied.append(f"{category}: {word}")
effect = rule['effect']
weight = rule['weight']
if effect == 'increase_arousal':
adjustments['arousal'] += weight
elif effect == 'decrease_arousal':
adjustments['arousal'] += weight
elif effect == 'negation':
adjustments['valence'] += weight
elif effect == 'sarcasm':
adjustments['sarcasm'] += weight
if 'patterns' in rule:
for pattern in rule['patterns']:
if re.search(pattern, text):
rules_applied.append(f"{category}: {pattern}")
weight = rule['weight']
if rule['effect'] == 'increase_arousal':
adjustments['arousal'] += weight
elif rule['effect'] == 'uncertainty':
adjustments['uncertainty'] += weight
if 'не' in lemmas:
idx = lemmas.index('не')
if idx + 1 < len(lemmas) and lemmas[idx+1] == 'очень':
adjustments['arousal'] -= 0.2
adjustments['valence'] -= 0.3
rules_applied.append("сочетание: не очень")
else:
for j in range(idx+1, min(idx+4, len(lemmas))):
if pos_tags[j] in ('ADJF', 'ADJS', 'ADVB'):
target_word = lemmas[j]
sentiment = self.sentiment_lexicon.get(target_word, 'neutral')
if sentiment in ('грусть', 'злость', 'страх'):
adjustments['valence'] += 1.0
rules_applied.append(f"инверсия негатива: не {target_word}")
elif sentiment == 'радость':
adjustments['valence'] -= 1.0
rules_applied.append(f"инверсия позитива: не {target_word}")
break
pos_words = [w for w in lemmas if self.sentiment_lexicon.get(w) == 'радость']
neg_words = [w for w in lemmas if self.sentiment_lexicon.get(w) in ('грусть', 'злость', 'страх')]
if pos_words and neg_words:
adjustments['sarcasm'] += 0.5
rules_applied.append(f"контраст тональности: позитив {pos_words[:2]} vs негатив {neg_words[:2]}")
sarcasm_phrases = ['ага', 'ну да', 'да что вы', 'кто бы мог подумать']
for phrase in sarcasm_phrases:
if phrase in text.lower():
adjustments['sarcasm'] += 0.6
rules_applied.append(f"саркастическая фраза: {phrase}")
if adjustments['sarcasm'] > 0.5:
rules_applied.append("обнаружен сарказм")
disappointment_verbs = ['ожидать', 'надеяться', 'думать']
disappointment_adjs = ['большой', 'лучший', 'хороший', 'много', 'высокий']
found_disappointment = False
for v in disappointment_verbs:
if v in lemmas:
for a in disappointment_adjs:
if a in lemmas:
adjustments['valence'] -= 0.8
rules_applied.append(f"разочарование: {v} {a}")
found_disappointment = True
break
if found_disappointment:
break
if 'оправдать' in lemmas and 'ожидание' in lemmas:
adjustments['valence'] -= 0.7
rules_applied.append("разочарование: не оправдал ожиданий")
return {'rules_applied': rules_applied, 'adjustments': adjustments, 'lemmas': lemmas}
def calculate_disagreement(self, pred1: Dict, pred2: Dict) -> float:
if pred1['emotion'] == pred2['emotion']:
return 0.0
emotions = list(self.emotion_definitions.keys())
idx1 = emotions.index(pred1['emotion']) if pred1['emotion'] in emotions else -1
idx2 = emotions.index(pred2['emotion']) if pred2['emotion'] in emotions else -1
if idx1 == -1 or idx2 == -1:
return 0.5
distance = abs(idx1 - idx2) / len(emotions)
return 0.7 * distance
def explain_transition(self, from_emotion: str, to_emotion: str) -> List[str]:
try:
return nx.shortest_path(self.ontology_graph, source=from_emotion, target=to_emotion)
except:
return []
def adjust_prediction_with_rules(self, prediction: Dict, rule_analysis: Dict) -> Dict:
original_emotion = prediction['emotion']
original_confidence = prediction['confidence']
adj = rule_analysis['adjustments']
rules = rule_analysis['rules_applied']
original_confidence_value = original_confidence
was_corrected = len(rules) > 0
conf_mult = 1.0 + adj['arousal'] * 0.2 + adj['uncertainty'] * 0.1 - abs(adj['valence']) * 0.1
conf_mult = np.clip(conf_mult, 0.5, 1.5)
new_confidence = original_confidence * conf_mult
new_emotion = original_emotion
has_negative = (any('негативное слово' in r for r in rules) or
any('разочарование' in r for r in rules) or
any('инверсия негатива' in r for r in rules) or
any('сочетание: не очень' in r for r in rules))
has_positive = any('позитивное слово' in r for r in rules) or any('радость' in r for r in rules)
fear_keywords = ['страшно', 'бояться', 'опасно', 'угроза', 'тревога', 'паника']
has_fear_word = any(any(kw in r for kw in fear_keywords) for r in rules)
if has_fear_word and original_emotion in ('грусть', 'злость', 'радость'):
new_emotion = 'страх'
new_confidence *= 0.9
rules.append("коррекция: обнаружены слова страха → страх")
# !!! ЗАМЕНА "нейтрально" НА "не определено" С УВЕРЕННОСТЬЮ 100% !!!
if not has_positive and not has_negative:
new_emotion = 'не определено'
new_confidence = 1.0 # 100% уверенность
rules.append("нет эмоциональных слов → не определено")
else:
if has_negative and not has_positive:
if original_emotion == 'радость':
new_emotion = 'грусть'
new_confidence *= 0.8
rules.append("коррекция: негативные слова без позитивных")
elif original_emotion == 'сарказм':
new_emotion = 'грусть'
new_confidence *= 0.9
elif has_positive and not has_negative and original_emotion in ('грусть', 'злость', 'страх'):
new_emotion = 'радость'
rules.append("коррекция: позитивные слова")
for rule in rules:
if rule.startswith("инверсия негатива:"):
new_emotion = 'радость'
break
elif rule.startswith("инверсия позитива:"):
if adj['arousal'] > 0.3:
new_emotion = 'злость'
else:
new_emotion = 'грусть'
break
sarcasm_flag = adj['sarcasm'] > 0.5
if sarcasm_flag:
new_emotion = 'сарказм'
new_confidence = min(new_confidence * 0.8, 0.9)
if "саркастическая фраза" in str(rules):
new_confidence = min(new_confidence * 1.1, 0.95)
if any('восклицание' in r for r in rules):
new_confidence = min(new_confidence * 1.2, 1.0)
if not was_corrected and original_confidence_value < 0.9:
new_confidence = min(new_confidence * 1.10, 1.0)
new_confidence = min(new_confidence, 1.0)
return {
'emotion': new_emotion,
'confidence': new_confidence,
'rules_applied': rules
}
def get_ontology_analysis(self, text: str, model_prediction: Dict) -> Dict:
rule_analysis = self.apply_linguistic_rules(text)
adjusted = self.adjust_prediction_with_rules(model_prediction, rule_analysis)
disagreement = self.calculate_disagreement(model_prediction, adjusted)
hypothesis = self.formulate_hypothesis(text, model_prediction, adjusted) if disagreement > 0.2 else None
return {
'rule_analysis': rule_analysis,
'adjusted_prediction': adjusted,
'disagreement': disagreement,
'hypothesis': hypothesis
}
def get_statistics(self) -> Dict:
return {
'ontology_nodes': len(self.ontology_graph.nodes),
'ontology_edges': len(self.ontology_graph.edges),
'linguistic_rules': len(self.linguistic_rules),
'emotions_covered': len(self.emotions),
'pending_hypotheses': len([h for h in self.hypotheses_db.values() if h['status'] == 'pending'])
}
class EmotionLSTM(nn.Module):
def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_classes=3, dropout=0.3, num_layers=2):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers, batch_first=True, bidirectional=True, dropout=dropout)
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Sequential(
nn.Linear(hidden_dim * 2, 128), nn.ReLU(), nn.Dropout(dropout),
nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, num_classes)
)
def forward(self, x, return_confidence=False):
embedded = self.embedding(x)
lstm_out, (hidden, cell) = self.lstm(embedded)
lstm_last = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
features = self.dropout(lstm_last)
logits = self.classifier(features)
if return_confidence:
probs = torch.softmax(logits, dim=1)
conf, _ = torch.max(probs, dim=1)
return logits, conf
return logits
class EmotionBERT(nn.Module):
def __init__(self, bert_model_name, num_classes, dropout=0.3):
super().__init__()
self.bert = BertModel.from_pretrained(bert_model_name)
hidden = self.bert.config.hidden_size
self.classifier = nn.Sequential(
nn.Dropout(dropout), nn.Linear(hidden, 256), nn.ReLU(),
nn.Dropout(dropout), nn.Linear(256, 128), nn.ReLU(),
nn.Linear(128, num_classes)
)
def forward(self, input_ids, attention_mask, return_confidence=False):
out = self.bert(input_ids, attention_mask, return_dict=True)
cls = out.last_hidden_state[:, 0, :]
logits = self.classifier(cls)
if return_confidence:
probs = torch.softmax(logits, dim=1)
conf, _ = torch.max(probs, dim=1)
return logits, conf
return logits
class CascadeEmotionClassifier:
def __init__(self, lstm_model, bert_model, vocab, tokenizer, label_encoder, ontology_model, threshold=0.95, device='cpu', max_length_lstm=100, max_length_bert=128, enable_cache=True, cache_maxsize=10000):
self.lstm_model = lstm_model
self.bert_model = bert_model
self.vocab = vocab
self.tokenizer = tokenizer
self.label_encoder = label_encoder
self.ontology_model = ontology_model
self.threshold = threshold
self.device = device
self.max_length_lstm = max_length_lstm
self.max_length_bert = max_length_bert
self.lstm_model.eval()
self.bert_model.eval()
self.lstm_model.to(device)
self.bert_model.to(device)
self.stats = {'total': 0, 'lstm': 0, 'bert': 0, 'corrections': 0}
# Кэш
self.enable_cache = enable_cache
self.cache = LRUCache(maxsize=cache_maxsize) if enable_cache else None
self.cache_hits = 0
self.cache_misses = 0
def text_to_sequence(self, text):
words = str(text).split()[:self.max_length_lstm]
sequence = [self.vocab.get(word, self.vocab.get('<UNK>', 1)) for word in words]
if len(sequence) < self.max_length_lstm:
sequence += [self.vocab.get('<PAD>', 0)] * (self.max_length_lstm - len(sequence))
return sequence[:self.max_length_lstm]
def predict(self, text):
if self.enable_cache:
text_clean = clean_russian_text(text)
cached = self.cache.get(text_clean)
if cached is not None:
self.cache_hits += 1
return cached
self.cache_misses += 1
result = self.predict_batch([text])[0]
if self.enable_cache:
self.cache.put(text_clean, result)
return result
def predict_batch(self, texts: List[str]) -> List[Dict]:
self.stats['total'] += len(texts)
texts_clean = [clean_russian_text(t) for t in texts]
valid_mask = [bool(re.search(r'[а-яё]', t)) for t in texts_clean]
valid_indices = [i for i, v in enumerate(valid_mask) if v]
results = [None] * len(texts)
if not valid_indices:
for i in range(len(texts)):
results[i] = {
'text': texts[i],
'predicted_emotion': 'не определено',
'confidence': 1.0,
'used_model': '',
'rules_applied': ['текст не содержит русских букв']
}
return results
lstm_inputs = [self.text_to_sequence(texts_clean[i]) for i in valid_indices]
lstm_inputs_tensor = torch.LongTensor(lstm_inputs).to(self.device)
with torch.no_grad():
lstm_logits, lstm_conf = self.lstm_model(lstm_inputs_tensor, return_confidence=True)
lstm_probs = torch.softmax(lstm_logits, dim=1)
lstm_preds = lstm_probs.argmax(dim=1).cpu().numpy()
lstm_confs = lstm_conf.cpu().numpy()
lstm_group = []
bert_group = []
for idx, conf in enumerate(lstm_confs):
if conf >= self.threshold:
lstm_group.append(idx)
else:
bert_group.append(idx)
for idx_in_batch in lstm_group:
orig_idx = valid_indices[idx_in_batch]
text_clean = texts_clean[orig_idx]
lstm_emo = self.label_encoder.inverse_transform([lstm_preds[idx_in_batch]])[0]
lstm_pred_dict = {
'emotion': lstm_emo,
'confidence': lstm_confs[idx_in_batch],
'probabilities': lstm_probs[idx_in_batch].cpu().numpy().tolist()
}
lstm_onto = self.ontology_model.get_ontology_analysis(text_clean, lstm_pred_dict)
final = lstm_onto['adjusted_prediction']
results[orig_idx] = {
'text': texts[orig_idx],
'predicted_emotion': final['emotion'],
'confidence': final['confidence'],
'used_model': "LSTM + онтология",
'rules_applied': lstm_onto['rule_analysis']['rules_applied']
}
self.stats['lstm'] += 1
if bert_group:
bert_indices_orig = [valid_indices[idx] for idx in bert_group]
bert_texts_clean = [texts_clean[i] for i in bert_indices_orig]
enc = self.tokenizer(bert_texts_clean, truncation=True, padding=True, max_length=self.max_length_bert, return_tensors='pt').to(self.device)
with torch.no_grad():
bert_logits, bert_conf = self.bert_model(enc['input_ids'], enc['attention_mask'], return_confidence=True)
bert_probs = torch.softmax(bert_logits, dim=1)
bert_preds = bert_probs.argmax(dim=1).cpu().numpy()
bert_confs = bert_conf.cpu().numpy()
for j, orig_idx in enumerate(bert_indices_orig):
text_clean = bert_texts_clean[j]
bert_emo = self.label_encoder.inverse_transform([bert_preds[j]])[0]
bert_pred_dict = {
'emotion': bert_emo,
'confidence': bert_confs[j],
'probabilities': bert_probs[j].cpu().numpy().tolist()
}
bert_onto = self.ontology_model.get_ontology_analysis(text_clean, bert_pred_dict)
final = bert_onto['adjusted_prediction']
results[orig_idx] = {
'text': texts[orig_idx],
'predicted_emotion': final['emotion'],
'confidence': final['confidence'],
'used_model': "BERT + онтология",
'rules_applied': bert_onto['rule_analysis']['rules_applied']
}
self.stats['bert'] += 1
for i in range(len(texts)):
if results[i] is None:
results[i] = {
'text': texts[i],
'predicted_emotion': 'не определено',
'confidence': 1.0,
'used_model': '',
'rules_applied': ['текст не содержит русских букв']
}
return results
def predict_batch_with_cache(self, texts):
if not self.enable_cache:
return self.predict_batch(texts)
results = [None] * len(texts)
texts_to_predict = []
indices_to_predict = []
for i, text in enumerate(texts):
text_clean = clean_russian_text(text)
cached = self.cache.get(text_clean)
if cached is not None:
results[i] = cached
self.cache_hits += 1
else:
texts_to_predict.append(text)
indices_to_predict.append(i)
if texts_to_predict:
batch_results = self.predict_batch(texts_to_predict)
for idx, orig_idx in enumerate(indices_to_predict):
result = batch_results[idx]
results[orig_idx] = result
self.cache.put(clean_russian_text(texts[orig_idx]), result)
self.cache_misses += 1
return results
def load_model():
print("Загрузка модели...")
model_dir = 'model'
with open(f'{model_dir}/model_info.json', 'r', encoding='utf-8') as f:
model_info = json.load(f)
with open(f'{model_dir}/vocab.json', 'r', encoding='utf-8') as f:
vocab = json.load(f)
print("📂 Создание label_encoder...")
label_encoder = LabelEncoder()
label_encoder.classes_ = np.array(model_info['classes'])
print(f"✅ label_encoder создан, классы: {list(label_encoder.classes_)}")
print("📂 Создание онтологии...")
ontology_model = OntologyEmotionModel(
emotions=list(label_encoder.classes_),
train_texts=None,
train_labels=None
)
print("✅ Онтология создана")
print("📂 Загрузка LSTM...")
lstm_model = EmotionLSTM(
vocab_size=len(vocab),
embed_dim=model_info.get('embed_dim', 300),
hidden_dim=256,
num_classes=model_info['num_classes'],
dropout=0.3,
num_layers=2
)
lstm_state = torch.load(f'{model_dir}/lstm_model.pth', map_location=device, weights_only=False)
lstm_model.load_state_dict(lstm_state)
print("✅ LSTM загружена")
print("📂 Загрузка BERT...")
bert_model = EmotionBERT(
bert_model_name=model_info['bert_model_name'],
num_classes=model_info['num_classes'],
dropout=0.3
)
bert_state = torch.load(f'{model_dir}/bert_model.pth', map_location=device, weights_only=False)
bert_model.load_state_dict(bert_state)
print("✅ BERT загружена")
print("📂 Загрузка токенизатора...")
try:
tokenizer = BertTokenizer.from_pretrained(model_dir)
print("✅ Токенизатор загружен из model_dir")
except Exception as e:
print(f"⚠️ Ошибка: {e}")
print("🔄 Загружаем токенизатор из Hugging Face...")
tokenizer = BertTokenizer.from_pretrained('DeepPavlov/rubert-base-cased')
print("✅ Токенизатор загружен из Hugging Face")
print("📂 Создание каскадного классификатора с кэшем...")
cascade = CascadeEmotionClassifier(
lstm_model=lstm_model,
bert_model=bert_model,
vocab=vocab,
tokenizer=tokenizer,
label_encoder=label_encoder,
ontology_model=ontology_model,
threshold=model_info.get('threshold', 0.95),
device=device,
max_length_lstm=model_info.get('max_length_lstm', 100),
max_length_bert=model_info.get('max_length_bert', 128),
enable_cache=True,
cache_maxsize=10000
)
print("✅ Модель успешно загружена!")
return cascade, model_info
class ModelLoader:
def __init__(self):
self.classifier = None
self.model_info = None
self.loading = True
self.error = None
self._task = None
async def load_async(self):
self.loading = True
self.error = None
try:
loop = asyncio.get_event_loop()
self.classifier, self.model_info = await loop.run_in_executor(None, load_model)
logger.info("Модели успешно загружены!")
except Exception as e:
logger.exception("Ошибка загрузки модели")
self.error = str(e)
finally:
self.loading = False
def is_ready(self):
return self.classifier is not None and not self.loading
class Monitor:
def __init__(self):
self.request_count = 0
self.total_prediction_time = 0.0
self.emotion_counter = defaultdict(int)
self.error_count = 0
self.start_time = time.time()
def record_request(self, duration_ms: float, emotion: str = None):
self.request_count += 1
self.total_prediction_time += duration_ms
if emotion:
self.emotion_counter[emotion] += 1
def record_error(self):
self.error_count += 1
def get_metrics(self):
avg_time_ms = (self.total_prediction_time / self.request_count) if self.request_count > 0 else 0
uptime = time.time() - self.start_time
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
cache_stats = {}
if model_loader.classifier and model_loader.classifier.cache:
cache_stats = {
"hits": model_loader.classifier.cache_hits,
"misses": model_loader.classifier.cache_misses,
"size": model_loader.classifier.cache.size()
}
return {
"requests": {
"total": self.request_count,
"errors": self.error_count,
"avg_prediction_time_ms": round(avg_time_ms, 2)
},
"emotions": dict(self.emotion_counter),
"system": {
"cpu_percent": cpu_percent,
"memory_percent": memory.percent,
"memory_used_mb": memory.used // (1024*1024),
"uptime_seconds": round(uptime, 1)
},
"cache": cache_stats
}
class TaskStatus(BaseModel):
task_id: str
status: str # pending, processing, completed, failed
progress: int
total: int
processed: int
result: Optional[Dict] = None
error: Optional[str] = None
created_at: float
updated_at: float
task_storage = {}
async def process_file_async(task_id: str, file_content: bytes, filename: str, classifier):
task = task_storage[task_id]
task.status = "processing"
task.updated_at = time.time()
try:
try:
text_content = file_content.decode('utf-8')
except UnicodeDecodeError:
text_content = file_content.decode('cp1251')
reader = csv.reader(text_content.splitlines(), delimiter='|')
rows = list(reader)
if not rows:
raise ValueError("Файл пуст")
header = rows[0]
text_col_idx = None
for i, col in enumerate(header):
if col.strip().lower() == 'text':
text_col_idx = i
break
if text_col_idx is None:
text_col_idx = 0
texts = []
for row in rows[1:]:
if len(row) > text_col_idx:
texts.append(row[text_col_idx])
task.total = len(texts)
batch_size = 100
all_results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
batch_results = classifier.predict_batch_with_cache(batch)
all_results.extend(batch_results)
task.processed = min(i + batch_size, len(texts))
task.progress = int(task.processed / task.total * 100)
task.updated_at = time.time()
await asyncio.sleep(0.01)
emotion_counts = defaultdict(int)
examples = defaultdict(list)
for r in all_results:
emotion = r['predicted_emotion']
if emotion == 'не определено':
continue
emotion_counts[emotion] += 1
examples[emotion].append((r['confidence'], r['text']))
examples_top = {}
for emotion, lst in examples.items():
lst.sort(key=lambda x: x[0], reverse=True)
examples_top[emotion] = [{'text': t, 'confidence': f"{c*100:.1f}%"} for c, t in lst[:5]]
task.result = {
"emotion_counts": dict(emotion_counts),
"examples": examples_top,
"results": all_results
}
task.status = "completed"
task.progress = 100
logger.info(f"Task {task_id} completed")
except Exception as e:
logger.exception(f"Task {task_id} failed")
task.status = "failed"
task.error = str(e)
finally:
task.updated_at = time.time()
app = FastAPI(title="Emotion Analysis with BERT, Ontology, Cache and Async")
templates = Jinja2Templates(directory="templates")
model_loader = ModelLoader()
monitor = Monitor()
@app.on_event("startup")
async def startup_event():
asyncio.create_task(model_loader.load_async())
@app.middleware("http")
async def add_monitoring(request: Request, call_next):
start_time = time.time()
try:
response = await call_next(request)
duration_ms = (time.time() - start_time) * 1000
if request.url.path not in ("/metrics", "/health"):
monitor.record_request(duration_ms)
return response
except Exception as e:
monitor.record_error()
raise
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/predict")
async def predict(text: str = Form(...)):
if not model_loader.is_ready():
raise HTTPException(status_code=503, detail="Модель ещё загружается")
if not text or len(text.strip()) < 3:
return JSONResponse({"error": "Введите хотя бы 3 символа."}, status_code=400)
try:
start = time.time()
result = model_loader.classifier.predict(text)
duration = (time.time() - start) * 1000
monitor.record_request(duration, result['predicted_emotion'])
rules_display = []
for rule in result['rules_applied'][:10]:
if ':' in rule:
cat, val = rule.split(':', 1)
rules_display.append(f"<span class='rule-tag'>{cat}: {val}</span>")
else:
rules_display.append(f"<span class='rule-tag'>{rule}</span>")
return JSONResponse({
"success": True,
"emotion": result['predicted_emotion'],
"confidence": f"{result['confidence']*100:.1f}%",
"used_model": result['used_model'],
"rules": "".join(rules_display) if rules_display else "Нет правил"
})
except Exception as e:
monitor.record_error()
return JSONResponse({"error": str(e)}, status_code=500)
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
if not model_loader.is_ready():
raise HTTPException(status_code=503, detail="Модель ещё загружается")
if not file.filename.endswith('.csv'):
return JSONResponse({"error": "Файл должен быть в формате CSV"}, status_code=400)
try:
content = await file.read()
try:
text_content = content.decode('utf-8')
except UnicodeDecodeError:
text_content = content.decode('cp1251')
reader = csv.reader(text_content.splitlines(), delimiter='|')
rows = list(reader)
if not rows:
return JSONResponse({"error": "Файл пуст"}, status_code=400)
header = rows[0]
text_col_idx = None
for i, col in enumerate(header):
if col.strip().lower() == 'text':
text_col_idx = i
break
if text_col_idx is None:
text_col_idx = 0
texts = []
for row in rows[1:]:
if len(row) > text_col_idx:
texts.append(row[text_col_idx])
if not texts:
return JSONResponse({"error": "В файле нет данных для анализа"}, status_code=400)
results = model_loader.classifier.predict_batch_with_cache(texts)
emotion_counts = defaultdict(int)
examples = defaultdict(list)
for r in results:
emotion = r['predicted_emotion']
if emotion == 'не определено':
continue
emotion_counts[emotion] += 1
examples[emotion].append((r['confidence'], r['text']))
examples_top = {}
for emotion, lst in examples.items():
lst.sort(key=lambda x: x[0], reverse=True)
examples_top[emotion] = [{'text': t, 'confidence': f"{c*100:.1f}%"} for c, t in lst[:5]]
session_id = str(uuid.uuid4())
task_storage[session_id] = TaskStatus(
task_id=session_id, status="completed", progress=100,
total=len(texts), processed=len(texts),
result={"results": results, "emotion_counts": emotion_counts, "examples": examples_top},
created_at=time.time(), updated_at=time.time()
)
return JSONResponse({
"success": True,
"emotion_counts": dict(emotion_counts),
"examples": examples_top,
"session_id": session_id
})
except Exception as e:
return JSONResponse({"error": f"Ошибка обработки файла: {str(e)}"}, status_code=500)
@app.post("/upload_async")
async def upload_file_async(file: UploadFile = File(...)):
if not model_loader.is_ready():
raise HTTPException(status_code=503, detail="Модель ещё загружается")
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="Файл должен быть в формате CSV")
content = await file.read()
task_id = str(uuid.uuid4())
task_status = TaskStatus(
task_id=task_id,
status="pending",
progress=0,
total=0,
processed=0,
created_at=time.time(),
updated_at=time.time()
)
task_storage[task_id] = task_status
asyncio.create_task(process_file_async(task_id, content, file.filename, model_loader.classifier))
return JSONResponse({
"task_id": task_id,
"status": "pending",
"message": "Файл принят в обработку. Используйте /status/{task_id} для отслеживания прогресса."
})
@app.get("/status/{task_id}")
async def get_task_status(task_id: str):
task = task_storage.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Задача не найдена")
return JSONResponse(task.dict())
@app.get("/result/{task_id}")
async def get_task_result(task_id: str):
task = task_storage.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="Задача не найдена")
if task.status != "completed":
raise HTTPException(status_code=400, detail=f"Задача ещё не завершена (статус: {task.status})")
return JSONResponse({
"emotion_counts": task.result["emotion_counts"],
"examples": task.result["examples"],
"task_id": task_id
})
@app.get("/download/{session_id}")
async def download_results(session_id: str):
task = task_storage.get(session_id)
if not task or task.status != "completed":
raise HTTPException(status_code=404, detail="Результаты не найдены или задача не завершена")
results = task.result.get("results", [])
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["text", "predicted_emotion", "confidence", "used_model", "rules_applied"])
for r in results:
writer.writerow([
r['text'],
r['predicted_emotion'],
f"{r['confidence']:.4f}",
r['used_model'],
"|".join(r['rules_applied'])
])
output.seek(0)
return StreamingResponse(
iter([output.getvalue().encode('utf-8-sig')]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=emotion_results_{session_id}.csv"}
)
@app.get("/download_async/{task_id}")
async def download_async_results(task_id: str):
task = task_storage.get(task_id)
if not task or task.status != "completed":
raise HTTPException(status_code=404, detail="Результаты не найдены или задача не завершена")
results = task.result.get("results", [])
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["text", "predicted_emotion", "confidence", "used_model", "rules_applied"])
for r in results:
writer.writerow([
r['text'],
r['predicted_emotion'],
f"{r['confidence']:.4f}",
r['used_model'],
"|".join(r['rules_applied'])
])
output.seek(0)
return StreamingResponse(
iter([output.getvalue().encode('utf-8-sig')]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=emotion_results_{task_id}.csv"}
)
@app.get("/health")
async def health_check():
return {
"status": "healthy" if model_loader.is_ready() else "loading",
"model_loaded": model_loader.is_ready(),
"loading": model_loader.loading,
"error": model_loader.error
}
@app.get("/metrics")
async def get_metrics():
return JSONResponse(monitor.get_metrics())
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
uvicorn.run(app, host="0.0.0.0", port=port)