| import streamlit as st |
| import google.generativeai as genai |
| import requests |
| import subprocess |
| import os |
| import pylint |
| import pandas as pd |
| from sklearn.model_selection import train_test_split |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score |
| import git |
| import spacy |
| from spacy.lang.en import English |
| import boto3 |
| import unittest |
| import docker |
| import sympy as sp |
| from scipy.optimize import minimize, differential_evolution |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import seaborn as sns |
| from IPython.display import display |
| from tenacity import retry, stop_after_attempt, wait_fixed |
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from transformers import AutoTokenizer, AutoModel |
| import networkx as nx |
| from sklearn.cluster import KMeans |
| from scipy.stats import ttest_ind |
| from statsmodels.tsa.arima.model import ARIMA |
| import nltk |
| from nltk.sentiment import SentimentIntensityAnalyzer |
| import cv2 |
| from PIL import Image |
| import tensorflow as tf |
| from tensorflow.keras.applications import ResNet50 |
| from tensorflow.keras.preprocessing import image |
| from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions |
|
|
| # Configure the Gemini API |
| genai.configure(api_key=st.secrets["GOOGLE_API_KEY"]) |
|
|
| # Create the model with optimized parameters and enhanced system instructions |
| generation_config = { |
| "temperature": 0.4, |
| "top_p": 0.8, |
| "top_k": 50, |
| "max_output_tokens": 4096, |
| } |
|
|
| model = genai.GenerativeModel( |
| model_name="gemini-1.5-pro", |
| generation_config=generation_config, |
| system_instruction=""" |
| You are Ath, an ultra-advanced AI code assistant with expertise across multiple domains including machine learning, data science, web development, cloud computing, and more. Your responses should showcase cutting-edge techniques, best practices, and innovative solutions. |
| """ |
| ) |
| chat_session = model.start_chat(history=[]) |
|
|
| @retry(stop=stop_after_attempt(5), wait=wait_fixed(2)) |
| def generate_response(user_input): |
| try: |
| response = chat_session.send_message(user_input) |
| return response.text |
| except Exception as e: |
| return f"Error: {e}" |
|
|
| def optimize_code(code): |
| with open("temp_code.py", "w") as file: |
| file.write(code) |
| result = subprocess.run(["pylint", "temp_code.py"], capture_output=True, text=True) |
| os.remove("temp_code.py") |
| return code |
|
|
| def fetch_from_github(query): |
| # Implement GitHub API interaction here |
| pass |
|
|
| def interact_with_api(api_url): |
| response = requests.get(api_url) |
| return response.json() |
|
|
| def train_advanced_ml_model(X, y): |
| X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) |
| models = { |
| 'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42), |
| 'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42) |
| } |
| results = {} |
| for name, model in models.items(): |
| model.fit(X_train, y_train) |
| y_pred = model.predict(X_test) |
| results[name] = { |
| 'accuracy': accuracy_score(y_test, y_pred), |
| 'precision': precision_score(y_test, y_pred, average='weighted'), |
| 'recall': recall_score(y_test, y_pred, average='weighted'), |
| 'f1': f1_score(y_test, y_pred, average='weighted') |
| } |
| return results |
|
|
| def handle_error(error): |
| st.error(f"An error occurred: {error}") |
| # Implement advanced error logging and notification system here |
|
|
| def initialize_git_repo(repo_path): |
| if not os.path.exists(repo_path): |
| os.makedirs(repo_path) |
| if not os.path.exists(os.path.join(repo_path, '.git')): |
| repo = git.Repo.init(repo_path) |
| else: |
| repo = git.Repo(repo_path) |
| return repo |
|
|
| def integrate_with_git(repo_path, code): |
| repo = initialize_git_repo(repo_path) |
| with open(os.path.join(repo_path, "generated_code.py"), "w") as file: |
| file.write(code) |
| repo.index.add(["generated_code.py"]) |
| repo.index.commit("Added generated code") |
|
|
| def process_user_input(user_input): |
| nlp = spacy.load("en_core_web_sm") |
| doc = nlp(user_input) |
| return doc |
|
|
| def interact_with_cloud_services(service_name, action, params): |
| client = boto3.client(service_name) |
| response = getattr(client, action)(**params) |
| return response |
|
|
| def run_tests(): |
| tests_dir = os.path.join(os.getcwd(), 'tests') |
| if not os.path.exists(tests_dir): |
| os.makedirs(tests_dir) |
| init_file = os.path.join(tests_dir, '__init__.py') |
| if not os.path.exists(init_file): |
| with open(init_file, 'w') as f: |
| f.write('') |
| |
| test_suite = unittest.TestLoader().discover(tests_dir) |
| test_runner = unittest.TextTestRunner() |
| test_result = test_runner.run(test_suite) |
| return test_result |
|
|
| def execute_code_in_docker(code): |
| client = docker.from_env() |
| try: |
| container = client.containers.run( |
| image="python:3.9", |
| command=f"python -c '{code}'", |
| detach=True, |
| remove=True |
| ) |
| result = container.wait() |
| logs = container.logs().decode('utf-8') |
| return logs, result['StatusCode'] |
| except Exception as e: |
| return f"Error: {e}", 1 |
|
|
| def solve_complex_equation(equation): |
| x, y, z = sp.symbols('x y z') |
| eq = sp.Eq(eval(equation)) |
| solution = sp.solve(eq) |
| return solution |
|
|
| def advanced_optimization(function, bounds): |
| result = differential_evolution(lambda x: eval(function), bounds) |
| return result.x, result.fun |
|
|
| def visualize_complex_data(data): |
| df = pd.DataFrame(data) |
| fig, axs = plt.subplots(2, 2, figsize=(16, 12)) |
| |
| sns.heatmap(df.corr(), annot=True, cmap='coolwarm', ax=axs[0, 0]) |
| axs[0, 0].set_title('Correlation Heatmap') |
| |
| sns.pairplot(df, diag_kind='kde', ax=axs[0, 1]) |
| axs[0, 1].set_title('Pairplot') |
| |
| df.plot(kind='box', ax=axs[1, 0]) |
| axs[1, 0].set_title('Box Plot') |
| |
| sns.violinplot(data=df, ax=axs[1, 1]) |
| axs[1, 1].set_title('Violin Plot') |
| |
| plt.tight_layout() |
| return fig |
|
|
| def analyze_complex_data(data): |
| df = pd.DataFrame(data) |
| summary = df.describe() |
| correlation = df.corr() |
| skewness = df.skew() |
| kurtosis = df.kurtosis() |
| return { |
| 'summary': summary, |
| 'correlation': correlation, |
| 'skewness': skewness, |
| 'kurtosis': kurtosis |
| } |
|
|
| def train_deep_learning_model(X, y): |
| class DeepNN(nn.Module): |
| def __init__(self, input_size): |
| super(DeepNN, self).__init__() |
| self.fc1 = nn.Linear(input_size, 64) |
| self.fc2 = nn.Linear(64, 32) |
| self.fc3 = nn.Linear(32, 1) |
| |
| def forward(self, x): |
| x = torch.relu(self.fc1(x)) |
| x = torch.relu(self.fc2(x)) |
| x = torch.sigmoid(self.fc3(x)) |
| return x |
|
|
| X_tensor = torch.FloatTensor(X.values) |
| y_tensor = torch.FloatTensor(y.values) |
|
|
| model = DeepNN(X.shape[1]) |
| criterion = nn.BCELoss() |
| optimizer = optim.Adam(model.parameters()) |
|
|
| epochs = 100 |
| for epoch in range(epochs): |
| optimizer.zero_grad() |
| outputs = model(X_tensor) |
| loss = criterion(outputs, y_tensor.unsqueeze(1)) |
| loss.backward() |
| optimizer.step() |
|
|
| return model |
|
|
| def perform_nlp_analysis(text): |
| nlp = spacy.load("en_core_web_sm") |
| doc = nlp(text) |
| |
| entities = [(ent.text, ent.label_) for ent in doc.ents] |
| tokens = [token.text for token in doc] |
| pos_tags = [(token.text, token.pos_) for token in doc] |
| |
| sia = SentimentIntensityAnalyzer() |
| sentiment = sia.polarity_scores(text) |
| |
| return { |
| 'entities': entities, |
| 'tokens': tokens, |
| 'pos_tags': pos_tags, |
| 'sentiment': sentiment |
| } |
|
|
| def perform_image_analysis(image_path): |
| img = cv2.imread(image_path) |
| img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
| |
| # Perform object detection |
| model = ResNet50(weights='imagenet') |
| img_resized = cv2.resize(img_rgb, (224, 224)) |
| img_array = image.img_to_array(img_resized) |
| img_array = np.expand_dims(img_array, axis=0) |
| img_array = preprocess_input(img_array) |
| |
| predictions = model.predict(img_array) |
| decoded_predictions = decode_predictions(predictions, top=3)[0] |
| |
| # Perform edge detection |
| edges = cv2.Canny(img, 100, 200) |
| |
| return { |
| 'predictions': decoded_predictions, |
| 'edges': edges |
| } |
|
|
| def perform_time_series_analysis(data): |
| df = pd.DataFrame(data) |
| model = ARIMA(df, order=(1, 1, 1)) |
| results = model.fit() |
| forecast = results.forecast(steps=5) |
| return { |
| 'model_summary': results.summary(), |
| 'forecast': forecast |
| } |
|
|
| def perform_graph_analysis(nodes, edges): |
| G = nx.Graph() |
| G.add_nodes_from(nodes) |
| G.add_edges_from(edges) |
| |
| centrality = nx.degree_centrality(G) |
| clustering = nx.clustering(G) |
| shortest_paths = dict(nx.all_pairs_shortest_path_length(G)) |
| |
| return { |
| 'centrality': centrality, |
| 'clustering': clustering, |
| 'shortest_paths': shortest_paths |
| } |
|
|
| # Streamlit UI setup |
| st.set_page_config(page_title="Ultra AI Code Assistant", page_icon="🚀", layout="wide") |
|
|
| # ... (Keep the existing CSS styles) |
|
|
| st.markdown('<div class="main-container">', unsafe_allow_html=True) |
| st.title("🚀 Ultra AI Code Assistant") |
| st.markdown('<p class="subtitle">Powered by Advanced AI and Domain Expertise</p>', unsafe_allow_html=True) |
|
|
| task_type = st.selectbox("Select Task Type", [ |
| "Code Generation", |
| "Machine Learning", |
| "Data Analysis", |
| "Natural Language Processing", |
| "Image Analysis", |
| "Time Series Analysis", |
| "Graph Analysis" |
| ]) |
|
|
| prompt = st.text_area("Enter your task description or code:", height=120) |
|
|
| if st.button("Execute Task"): |
| if prompt.strip() == "": |
| st.error("Please enter a valid prompt.") |
| else: |
| with st.spinner("Processing your request..."): |
| try: |
| if task_type == "Code Generation": |
| processed_input = process_user_input(prompt) |
| completed_text = generate_response(processed_input.text) |
| if "Error" in completed_text: |
| handle_error(completed_text) |
| else: |
| optimized_code = optimize_code(completed_text) |
| st.success("Code generated and optimized successfully!") |
| |
| st.markdown('<div class="output-container">', unsafe_allow_html=True) |
| st.markdown('<div class="code-block">', unsafe_allow_html=True) |
| st.code(optimized_code) |
| st.markdown('</div>', unsafe_allow_html=True) |
| st.markdown('</div>', unsafe_allow_html=True) |
| |
| repo_path = "./repo" |
| integrate_with_git(repo_path, optimized_code) |
| |
| test_result = run_tests() |
| if test_result.wasSuccessful(): |
| st.success("All tests passed successfully!") |
| else: |
| st.error("Some tests failed. Please check the code.") |
| |
| execution_result, status_code = execute_code_in_docker(optimized_code) |
| if status_code == 0: |
| st.success("Code executed successfully in Docker!") |
| st.text(execution_result) |
| else: |
| st.error(f"Code execution failed: {execution_result}") |
| |
| elif task_type == "Machine Learning": |
| # For demonstration, we'll use a sample dataset |
| from sklearn.datasets import make_classification |
| X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42) |
| results = train_advanced_ml_model(X, y) |
| st.write("Machine Learning Model Performance:") |
| st.json(results) |
| |
| st.write("Deep Learning Model:") |
| deep_model = train_deep_learning_model(pd.DataFrame(X), pd.Series(y)) |
| st.write(deep_model) |
| |
| elif task_type == "Data Analysis": |
| # For demonstration, we'll use a sample dataset |
| data = pd.DataFrame(np.random.randn(100, 5), columns=['A', 'B', 'C', 'D', 'E']) |
| analysis_results = analyze_complex_data(data) |
| st.write("Data Analysis Results:") |
| st.write(analysis_results['summary']) |
| st.write("Correlation Matrix:") |
| st.write(analysis_results['correlation']) |
| |
| fig = visualize_complex_data(data) |
| st.pyplot(fig) |
| |
| elif task_type == "Natural Language Processing": |
| nlp_results = perform_nlp_analysis(prompt) |
| st.write("NLP Analysis Results:") |
| st.json(nlp_results) |
| elif task_type == "Image Analysis": |
| uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "png", "jpeg"]) |
| if uploaded_file is not None: |
| image = Image.open(uploaded_file) |
| st.image(image, caption='Uploaded Image', use_column_width=True) |
| |
| # Save the uploaded image temporarily |
| with open("temp_image.jpg", "wb") as f: |
| f.write(uploaded_file.getbuffer()) |
| |
| analysis_results = perform_image_analysis("temp_image.jpg") |
| |
| st.write("Image Analysis Results:") |
| st.write("Top 3 predictions:") |
| for i, (imagenet_id, label, score) in enumerate(analysis_results['predictions']): |
| st.write(f"{i + 1}: {label} ({score:.2f})") |
| |
| st.write("Edge Detection:") |
| st.image(analysis_results['edges'], caption='Edge Detection', use_column_width=True) |
| |
| # Remove the temporary image file |
| os.remove("temp_image.jpg") |
| |
| elif task_type == "Time Series Analysis": |
| # For demonstration, we'll use a sample time series dataset |
| dates = pd.date_range(start='1/1/2020', end='1/1/2021', freq='D') |
| values = np.random.randn(len(dates)).cumsum() |
| ts_data = pd.Series(values, index=dates) |
| |
| st.line_chart(ts_data) |
| |
| analysis_results = perform_time_series_analysis(ts_data) |
| st.write("Time Series Analysis Results:") |
| st.write(analysis_results['model_summary']) |
| st.write("Forecast for the next 5 periods:") |
| st.write(analysis_results['forecast']) |
| |
| elif task_type == "Graph Analysis": |
| # For demonstration, we'll use a sample graph |
| nodes = range(1, 11) |
| edges = [(1, 2), (1, 3), (2, 4), (2, 5), (3, 6), (3, 7), (4, 8), (5, 9), (6, 10)] |
| |
| analysis_results = perform_graph_analysis(nodes, edges) |
| st.write("Graph Analysis Results:") |
| st.write("Centrality:") |
| st.json(analysis_results['centrality']) |
| st.write("Clustering Coefficient:") |
| st.json(analysis_results['clustering']) |
| |
| # Visualize the graph |
| G = nx.Graph() |
| G.add_nodes_from(nodes) |
| G.add_edges_from(edges) |
| fig, ax = plt.subplots(figsize=(10, 8)) |
| nx.draw(G, with_labels=True, node_color='lightblue', node_size=500, font_size=16, font_weight='bold', ax=ax) |
| st.pyplot(fig) |
| |
| except Exception as e: |
| handle_error(e) |
|
|
| st.markdown(""" |
| <div style='text-align: center; margin-top: 2rem; color: #4a5568;'> |
| Created with ❤️ by Your Ultra AI Code Assistant |
| </div> |
| """, unsafe_allow_html=True) |
|
|
| st.markdown('</div>', unsafe_allow_html=True) |
|
|
| # Additional helper functions |
|
|
| def explain_code(code): |
| """Generate an explanation for the given code using NLP techniques.""" |
| explanation = generate_response(f"Explain the following code:\n\n{code}") |
| return explanation |
|
|
| def generate_unit_tests(code): |
| """Generate unit tests for the given code.""" |
| unit_tests = generate_response(f"Generate unit tests for the following code:\n\n{code}") |
| return unit_tests |
|
|
| def suggest_optimizations(code): |
| """Suggest optimizations for the given code.""" |
| optimizations = generate_response(f"Suggest optimizations for the following code:\n\n{code}") |
| return optimizations |
|
|
| def generate_documentation(code): |
| """Generate documentation for the given code.""" |
| documentation = generate_response(f"Generate documentation for the following code:\n\n{code}") |
| return documentation |
|
|
| # Add these new functions to the Streamlit UI |
| if task_type == "Code Generation": |
| st.sidebar.header("Code Analysis Tools") |
| if st.sidebar.button("Explain Code"): |
| explanation = explain_code(optimized_code) |
| st.sidebar.subheader("Code Explanation") |
| st.sidebar.write(explanation) |
| |
| if st.sidebar.button("Generate Unit Tests"): |
| unit_tests = generate_unit_tests(optimized_code) |
| st.sidebar.subheader("Generated Unit Tests") |
| st.sidebar.code(unit_tests) |
| |
| if st.sidebar.button("Suggest Optimizations"): |
| optimizations = suggest_optimizations(optimized_code) |
| st.sidebar.subheader("Suggested Optimizations") |
| st.sidebar.write(optimizations) |
| |
| if st.sidebar.button("Generate Documentation"): |
| documentation = generate_documentation(optimized_code) |
| st.sidebar.subheader("Generated Documentation") |
| st.sidebar.write(documentation) |
|
|
| # Add more advanced features |
| def perform_security_analysis(code): |
| """Perform a basic security analysis on the given code.""" |
| security_analysis = generate_response(f"Perform a security analysis on the following code and suggest improvements:\n\n{code}") |
| return security_analysis |
|
|
| def generate_api_documentation(code): |
| """Generate API documentation for the given code.""" |
| api_docs = generate_response(f"Generate API documentation for the following code:\n\n{code}") |
| return api_docs |
|
|
| def suggest_design_patterns(code): |
| """Suggest appropriate design patterns for the given code.""" |
| design_patterns = generate_response(f"Suggest appropriate design patterns for the following code:\n\n{code}") |
| return design_patterns |
|
|
| # Add these new functions to the Streamlit UI |
| if task_type == "Code Generation": |
| st.sidebar.header("Advanced Code Analysis") |
| if st.sidebar.button("Security Analysis"): |
| security_analysis = perform_security_analysis(optimized_code) |
| st.sidebar.subheader("Security Analysis") |
| st.sidebar.write(security_analysis) |
| |
| if st.sidebar.button("Generate API Documentation"): |
| api_docs = generate_api_documentation(optimized_code) |
| st.sidebar.subheader("API Documentation") |
| st.sidebar.write(api_docs) |
| |
| if st.sidebar.button("Suggest Design Patterns"): |
| design_patterns = suggest_design_patterns(optimized_code) |
| st.sidebar.subheader("Suggested Design Patterns") |
| st.sidebar.write(design_patterns) |
|
|
| # Add a feature to generate a complete project structure |
| def generate_project_structure(project_description): |
| """Generate a complete project structure based on the given description.""" |
| project_structure = generate_response(f"Generate a complete project structure for the following project description:\n\n{project_description}") |
| return project_structure |
|
|
| # Add this new function to the Streamlit UI |
| if st.sidebar.button("Generate Project Structure"): |
| project_description = st.sidebar.text_area("Enter project description:") |
| if project_description: |
| project_structure = generate_project_structure(project_description) |
| st.sidebar.subheader("Generated Project Structure") |
| st.sidebar.code(project_structure) |
|
|
| # Add a feature to suggest relevant libraries and frameworks |
| def suggest_libraries(code): |
| """Suggest relevant libraries and frameworks for the given code.""" |
| suggestions = generate_response(f"Suggest relevant libraries and frameworks for the following code:\n\n{code}") |
| return suggestions |
|
|
| # Add this new function to the Streamlit UI |
| if task_type == "Code Generation": |
| if st.sidebar.button("Suggest Libraries"): |
| library_suggestions = suggest_libraries(optimized_code) |
| st.sidebar.subheader("Suggested Libraries and Frameworks") |
| st.sidebar.write(library_suggestions) |
|
|
| # Add a feature to generate code in multiple programming languages |
| def translate_code(code, target_language): |
| """Translate the given code to the specified target language.""" |
| translated_code = generate_response(f"Translate the following code to {target_language}:\n\n{code}") |
| return translated_code |
|
|
| # Add this new function to the Streamlit UI |
| if task_type == "Code Generation": |
| target_language = st.sidebar.selectbox("Select target language for translation", ["Python", "JavaScript", "Java", "C++", "Go"]) |
| if st.sidebar.button("Translate Code"): |
| translated_code = translate_code(optimized_code, target_language) |
| st.sidebar.subheader(f"Translated Code ({target_language})") |
| st.sidebar.code(translated_code) |
|
|
| # Add a feature to generate a README file for the project |
| def generate_readme(project_description, code): |
| """Generate a README file for the project based on the description and code.""" |
| readme_content = generate_response(f"Generate a README.md file for the following project:\n\nDescription: {project_description}\n\nCode:\n{code}") |
| return readme_content |
|
|
| # Add this new function to the Streamlit UI |
| if task_type == "Code Generation": |
| if st.sidebar.button("Generate README"): |
| project_description = st.sidebar.text_area("Enter project description:") |
| if project_description: |
| readme_content = generate_readme(project_description, optimized_code) |
| st.sidebar.subheader("Generated README.md") |
| st.sidebar.markdown(readme_content) |
|
|
| # Add a feature to suggest code refactoring |
| def suggest_refactoring(code): |
| """Suggest code refactoring improvements for the given code.""" |
| refactoring_suggestions = generate_response(f"Suggest code refactoring improvements for the following code:\n\n{code}") |
| return refactoring_suggestions |
|
|
| # Add this new function to the Streamlit UI |
| if task_type == "Code Generation": |
| if st.sidebar.button("Suggest Refactoring"): |
| refactoring_suggestions = suggest_refactoring(optimized_code) |
| st.sidebar.subheader("Refactoring Suggestions") |
| st.sidebar.write(refactoring_suggestions) |
|
|
| # Add a feature to generate sample test data |
| def generate_test_data(code): |
| """Generate sample test data for the given code.""" |
| test_data = generate_response(f"Generate sample test data for the following code:\n\n{code}") |
| return test_data |
|
|
| # Add this new function to the Streamlit UI |
| if task_type == "Code Generation": |
| if st.sidebar.button("Generate Test Data"): |
| test_data = generate_test_data(optimized_code) |
| st.sidebar.subheader("Generated Test Data") |
| st.sidebar.code(test_data) |
|
|
| # Main execution |
| if __name__ == "__main__": |
| st.sidebar.header("About") |
| st.sidebar.info("This Ultra AI Code Assistant is powered by advanced AI models and incorporates expertise across multiple domains including software development, machine learning, data analysis, and more.") |
| |
| st.sidebar.header("Feedback") |
| feedback = st.sidebar.text_area("Please provide any feedback or suggestions:") |
| if st.sidebar.button("Submit Feedback"): |
| # Here you would typically send this feedback to a database or email |
| st.sidebar.success("Thank you for your feedback!") |