| import os |
| import json |
| import random |
| from sklearn.ensemble import IsolationForest |
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import OneHotEncoder |
| from sklearn.neural_network import MLPClassifier |
| from deap import base, creator, tools, algorithms |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| import torch.nn.functional as F |
| import datetime |
| import time |
| import threading |
| import logging |
| import multiprocessing |
| from collections import deque |
|
|
| |
| logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') |
|
|
| |
| class MemoryModel: |
| def __init__(self, memory_file='memory.json', max_memory=1000): |
| self.memory_file = memory_file |
| self.max_memory = max_memory |
| self.memory = self.load_memory() |
|
|
| def load_memory(self): |
| if os.path.exists(self.memory_file): |
| with open(self.memory_file, 'r') as file: |
| return json.load(file) |
| return [] |
|
|
| def save_memory(self): |
| with open(self.memory_file, 'w') as file: |
| json.dump(self.memory, file) |
|
|
| def add_entry(self, context, response, emotion_state, timestamp=None): |
| timestamp = timestamp or datetime.datetime.now().isoformat() |
| entry = { |
| 'timestamp': timestamp, |
| 'context': context, |
| 'response': response, |
| 'emotion_state': emotion_state |
| } |
| self.memory.append(entry) |
| if len(self.memory) > self.max_memory: |
| self.memory.pop(0) |
| self.save_memory() |
|
|
| def retrieve_memory(self, query, context_window=5): |
| relevant_entries = [entry for entry in self.memory if query.lower() in entry['context'].lower()] |
| if relevant_entries: |
| sorted_entries = sorted(relevant_entries, key=lambda x: x['timestamp'], reverse=True) |
| return sorted_entries[:context_window] |
| return None |
|
|
| |
| class TemporalAwareness: |
| def __init__(self, context_window=5): |
| self.start_time = datetime.datetime.now() |
| self.last_event_time = None |
| self.event_sequence = deque(maxlen=context_window) |
| self.context_window = context_window |
|
|
| def update_event_time(self, event): |
| current_time = datetime.datetime.now() |
| if self.last_event_time: |
| duration = (current_time - self.last_event_time).total_seconds() |
| self.event_sequence.append({ |
| 'event': event, |
| 'timestamp': current_time.isoformat(), |
| 'duration_since_last': duration |
| }) |
| else: |
| self.event_sequence.append({ |
| 'event': event, |
| 'timestamp': current_time.isoformat(), |
| 'duration_since_last': None |
| }) |
| self.last_event_time = current_time |
|
|
| def estimate_duration(self, event): |
| recent_events = list(self.event_sequence) |
| durations = [ |
| seq['duration_since_last'] for seq in recent_events if seq['event'] == event and seq['duration_since_last'] is not None |
| ] |
| return sum(durations) / len(durations) if durations else None |
|
|
| |
| class HRLNeuron(nn.Module): |
| def __init__(self, input_dim, output_dim): |
| super(HRLNeuron, self).__init__() |
| self.fc1 = nn.Linear(input_dim, 128) |
| self.fc2 = nn.Linear(128, output_dim) |
|
|
| def forward(self, x): |
| x = F.relu(self.fc1(x)) |
| x = self.fc2(x) |
| return x |
|
|
| class HRLAgent: |
| def __init__(self, input_dim, output_dim, lr=0.001): |
| self.model = HRLNeuron(input_dim, output_dim) |
| self.optimizer = optim.Adam(self.model.parameters(), lr=lr) |
| self.criterion = nn.MSELoss() |
|
|
| def act(self, state): |
| state = torch.FloatTensor(state) |
| q_values = self.model(state) |
| return q_values |
|
|
| def learn(self, state, action, reward, next_state, gamma=0.99): |
| state = torch.FloatTensor(state) |
| next_state = torch.FloatTensor(next_state) |
| reward = torch.FloatTensor([reward]) |
| action = torch.LongTensor([action]) |
|
|
| q_values = self.model(state) |
| next_q_values = self.model(next_state) |
| target_q_value = reward + gamma * torch.max(next_q_values) |
|
|
| loss = self.criterion(q_values[action], target_q_value) |
|
|
| self.optimizer.zero_grad() |
| loss.backward() |
| self.optimizer.step() |
|
|
| |
| data = { |
| 'context': [ |
| 'I am happy', 'I am sad', 'I am angry', 'I am excited', 'I am calm', |
| 'I am feeling joyful', 'I am grieving', 'I am feeling peaceful', 'I am frustrated', |
| 'I am determined', 'I feel resentment', 'I am feeling glorious', 'I am motivated', |
| 'I am surprised', 'I am fearful', 'I am trusting', 'I feel disgust', 'I am optimistic', |
| 'I am pessimistic', 'I feel bored', 'I am envious' |
| ], |
| 'emotion': [ |
| 'joy', 'sadness', 'anger', 'joy', 'calmness', 'joy', 'grief', 'calmness', 'anger', |
| 'determination', 'resentment', 'glory', 'motivation', 'surprise', 'fear', 'trust', |
| 'disgust', 'optimism', 'pessimism', 'boredom', 'envy' |
| ] |
| } |
| df = pd.DataFrame(data) |
|
|
| |
| encoder = OneHotEncoder(handle_unknown='ignore') |
| contexts_encoded = encoder.fit_transform(df[['context']]).toarray() |
|
|
| |
| emotions_target = df['emotion'].astype('category').cat.codes |
| emotion_classes = df['emotion'].astype('category').cat.categories |
|
|
| |
| X_train, X_test, y_train, y_test = train_test_split(contexts_encoded, emotions_target, test_size=0.2, random_state=42) |
| model = MLPClassifier(hidden_layer_sizes=(10, 10), max_iter=1000, random_state=42) |
| model.fit(X_train, y_train) |
|
|
| |
| historical_data = np.array([model.predict(contexts_encoded)]).T |
| isolation_forest = IsolationForest(contamination=0.1, random_state=42) |
| isolation_forest.fit(historical_data) |
|
|
| |
| emotions = { |
| 'joy': {'percentage': 10, 'motivation': 'positive'}, |
| 'pleasure': {'percentage': 10, 'motivation': 'selfish'}, |
| 'sadness': {'percentage': 10, 'motivation': 'negative'}, |
| 'grief': {'percentage': 10, 'motivation': 'negative'}, |
| 'anger': {'percentage': 10, 'motivation': 'traumatic or strong'}, |
| 'calmness': {'percentage': 10, 'motivation': 'neutral'}, |
| 'determination': {'percentage': 10, 'motivation': 'positive'}, |
| 'resentment': {'percentage': 10, 'motivation': 'negative'}, |
| 'glory': {'percentage': 10, 'motivation': 'positive'}, |
| 'motivation': {'percentage': 10, 'motivation': 'positive'}, |
| 'ideal_state': {'percentage': 100, 'motivation': 'balanced'}, |
| 'fear': {'percentage': 10, 'motivation': 'defensive'}, |
| 'surprise': {'percentage': 10, 'motivation': 'unexpected'}, |
| 'anticipation': {'percentage': 10, 'motivation': 'predictive'}, |
| 'trust': {'percentage': 10, 'motivation': 'reliable'}, |
| 'disgust': {'percentage': 10, 'motivation': 'repulsive'}, |
| 'optimism': {'percentage': 10, 'motivation': 'hopeful'}, |
| 'pessimism': {'percentage': 10, 'motivation': 'doubtful'}, |
| 'boredom': {'percentage': 10, 'motivation': 'indifferent'}, |
| 'envy': {'percentage': 10, 'motivation': 'jealous'} |
| } |
|
|
| |
| total_percentage = 200 |
| default_percentage = total_percentage / len(emotions) |
| for emotion in emotions: |
| emotions[emotion]['percentage'] = default_percentage |
|
|
| emotion_history_file = 'emotion_history.json' |
|
|
| |
| def load_historical_data(file_path=emotion_history_file): |
| if os.path.exists(file_path): |
| with open(file_path, 'r') as file: |
| return json.load(file) |
| return [] |
|
|
| |
| def save_historical_data(historical_data, file_path=emotion_history_file): |
| with open(file_path, 'w') as file: |
| json.dump(historical_data, file) |
|
|
| |
| emotion_history = load_historical_data() |
|
|
| |
| def update_emotion(emotion, percentage): |
| emotions['ideal_state']['percentage'] -= percentage |
| emotions[emotion]['percentage'] += percentage |
|
|
| |
| total_current = sum(e['percentage'] for e in emotions.values()) |
| adjustment = total_percentage - total_current |
| emotions['ideal_state']['percentage'] += adjustment |
|
|
| |
| def normalize_context(context): |
| return context.lower().strip() |
|
|
| |
| def evolve_emotions(): |
| def evaluate(individual): |
| ideal_state = individual[-1] |
| other_emotions = individual[:-1] |
| return abs(ideal_state - 100), sum(other_emotions) |
|
|
| creator.create("FitnessMin", base.Fitness, weights=(-1.0, -1.0)) |
| creator.create("Individual", list, fitness=creator.FitnessMin) |
|
|
| toolbox = base.Toolbox() |
| toolbox.register("attribute", lambda: random.uniform(0, 20)) |
| toolbox.register("individual", tools.initCycle, creator.Individual, toolbox.attribute, n=(len(emotions) - 1)) |
| toolbox.register("ideal_state", lambda: random.uniform(80, 120)) |
| toolbox.register("complete_individual", tools.initConcat, creator.Individual, toolbox.individual, toolbox.ideal_state) |
| toolbox.register("population", tools.initRepeat, list, toolbox.complete_individual) |
|
|
| toolbox.register("evaluate", evaluate) |
| toolbox.register("mate", tools.cxTwoPoint) |
| toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.2) |
| toolbox.register("select", tools.selTournament, tournsize=3) |
|
|
| population = toolbox.population(n=100) |
|
|
| for gen in range(100): |
| offspring = algorithms.varAnd(population, toolbox, cxpb=0.5, mutpb=0.2) |
| fits = toolbox.map(toolbox.evaluate, offspring) |
| for fit, ind in zip(fits, offspring): |
| ind.fitness.values = fit |
| population = toolbox.select(offspring, k=len(population)) |
|
|
| if gen % 20 == 0: |
| toolbox.register("mate", tools.cxBlend, alpha=random.uniform(0.1, 0.9)) |
| toolbox.register("mutate", tools.mutPolynomialBounded, eta=random.uniform(0.5, 1.5), low=0, up=20, indpb=0.2) |
|
|
| best_ind = tools.selBest(population, k=1)[0] |
| return best_ind[:-1], best_ind[-1] |
|
|
| |
| def evolve_language_model(): |
| def evaluate_language(individual): |
| return random.random(), |
|
|
| creator.create("FitnessMax", base.Fitness, weights=(1.0,)) |
| creator.create("LanguageIndividual", list, fitness=creator.FitnessMax) |
|
|
| toolbox = base.Toolbox() |
| toolbox.register("language_gene", lambda: random.randint(0, 1)) |
| toolbox.register("language_individual", tools.initRepeat, creator.LanguageIndividual, toolbox.language_gene, n=100) |
| toolbox.register("language_population", tools.initRepeat, list, toolbox.language_individual) |
|
|
| toolbox.register("evaluate", evaluate_language) |
| toolbox.register("mate", tools.cxTwoPoint) |
| toolbox.register("mutate", tools.mutFlipBit, indpb=0.05) |
| toolbox.register("select", tools.selTournament, tournsize=3) |
|
|
| population = toolbox.language_population(n=50) |
|
|
| for gen in range(100): |
| offspring = algorithms.varAnd(population, toolbox, cxpb=0.5, mutpb=0.1) |
| fits = toolbox.map(toolbox.evaluate, offspring) |
| for fit, ind in zip(fits, offspring): |
| ind.fitness.values = fit |
| population = toolbox.select(offspring, k=len(population)) |
|
|
| best_language_model = tools.selBest(population, k=1)[0] |
| return best_language_model |
|
|
| def evolve_emotion_recognition(): |
| def evaluate_emotion_recognition(individual): |
| return random.random(), |
|
|
| creator.create("FitnessMax", base.Fitness, weights=(1.0,)) |
| creator.create("EmotionRecognitionIndividual", list, fitness=creator.FitnessMax) |
|
|
| toolbox = base.Toolbox() |
| toolbox.register("emotion_gene", lambda: random.randint(0, 1)) |
| toolbox.register("emotion_individual", tools.initRepeat, creator.EmotionRecognitionIndividual, toolbox.emotion_gene, n=100) |
| toolbox.register("emotion_population", tools.initRepeat, list, toolbox.emotion_individual) |
|
|
| toolbox.register("evaluate", evaluate_emotion_recognition) |
| toolbox.register("mate", tools.cxTwoPoint) |
| toolbox.register("mutate", tools.mutFlipBit, indpb=0.05) |
| toolbox.register("select", tools.selTournament, tournsize=3) |
|
|
| population = toolbox.emotion_population(n=50) |
|
|
| for gen in range(100): |
| offspring = algorithms.varAnd(population, toolbox, cxpb=0.5, mutpb=0.1) |
| fits = toolbox.map(toolbox.evaluate, offspring) |
| for fit, ind in zip(fits, offspring): |
| ind.fitness.values = fit |
| population = toolbox.select(offspring, k=len(population)) |
|
|
| best_emotion_recognition = tools.selBest(population, k=1)[0] |
| return best_emotion_recognition |
|
|
| |
|
|
| DNA_LENGTH = 10 |
| POPULATION_SIZE = 50 |
| GENERATIONS = 100 |
| NUM_ALGORITHMS = 3 |
|
|
| |
| def generate_random_dna(): |
| return [random.uniform(0, 1) for _ in range(DNA_LENGTH)] |
|
|
| |
| populations = [[generate_random_dna() for _ in range(POPULATION_SIZE)] for _ in range(NUM_ALGORITHMS)] |
|
|
| |
| def fitness_function_1(dna): |
| return sum(dna) |
|
|
| def fitness_function_2(dna): |
| return np.prod(dna) |
|
|
| def fitness_function_3(dna): |
| return np.mean(dna) |
|
|
| fitness_functions = [fitness_function_1, fitness_function_2, fitness_function_3] |
|
|
| |
| def tournament_selection(population, fitness_fn): |
| tournament_size = 5 |
| selected = random.sample(population, tournament_size) |
| selected.sort(key=fitness_fn, reverse=True) |
| return selected[0] |
|
|
| def crossover(parent1, parent2): |
| point = random.randint(0, DNA_LENGTH - 1) |
| child1 = parent1[:point] + parent2[point:] |
| child2 = parent2[:point] + parent1[point:] |
| return child1, child2 |
|
|
| def mutate(dna, mutation_rate=0.01): |
| return [gene if random.random() > mutation_rate else random.uniform(0, 1) for gene in dna] |
|
|
| def evolve(population, fitness_fn, generations=GENERATIONS): |
| for _ in range(generations): |
| new_population = [] |
| for _ in range(POPULATION_SIZE // 2): |
| parent1 = tournament_selection(population, fitness_fn) |
| parent2 = tournament_selection(population, fitness_fn) |
| child1, child2 = crossover(parent1, parent2) |
| new_population.append(mutate(child1)) |
| new_population.append(mutate(child2)) |
| population = sorted(new_population, key=fitness_fn, reverse=True)[:POPULATION_SIZE] |
| return population |
|
|
| |
| for i in range(NUM_ALGORITHMS): |
| populations[i] = evolve(populations[i], fitness_functions[i]) |
|
|
| |
| def create_hybrid_population(populations, num_best=10): |
| hybrid_population = [] |
| for pop in populations: |
| hybrid_population.extend(sorted(pop, key=lambda dna: sum([fn(dna) for fn in fitness_functions]), reverse=True)[:num_best]) |
| return hybrid_population |
|
|
| hybrid_population = create_hybrid_population(populations) |
|
|
| |
| def evolve_fitness_criteria(hybrid_population): |
| average_gene = np.mean([np.mean(dna) for dna in hybrid_population]) |
| if average_gene > 0.5: |
| return lambda dna: sum(dna) * 1.1 |
| else: |
| return lambda dna: sum(dna) * 0.9 |
|
|
| |
| new_fitness_fn = evolve_fitness_criteria(hybrid_population) |
| fitness_functions = [new_fitness_fn] * NUM_ALGORITHMS |
|
|
| |
| hybrid_population = evolve(hybrid_population, new_fitness_fn) |
|
|
| |
| logging.info("Initial populations evolved independently.") |
| logging.info("Hybrid population created and evolved with new fitness criteria.") |
|
|