| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import matplotlib.pyplot as plt |
| import tensorflow as tf |
| from sklearn.preprocessing import MinMaxScaler |
| import os |
|
|
| |
| TIME_STEP = 100 |
| MODEL_PATH = 'stock_prediction_model.h5' |
|
|
| |
| def create_lstm_model(): |
| """Defines the Bidirectional LSTM model architecture used for training.""" |
| model = tf.keras.models.Sequential() |
| model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(128, return_sequences=True), input_shape=(TIME_STEP, 1))) |
| model.add(tf.keras.layers.Dropout(0.3)) |
| model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, return_sequences=True))) |
| model.add(tf.keras.layers.Dropout(0.3)) |
| model.add(tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32))) |
| model.add(tf.keras.layers.Dense(64, activation='relu')) |
| model.add(tf.keras.layers.Dense(1)) |
| model.compile(loss='mse', optimizer='adam') |
| return model |
|
|
| |
| try: |
| |
| model = tf.keras.models.load_model(MODEL_PATH) |
| print(f"Successfully loaded model from {MODEL_PATH}") |
| except Exception as e: |
| |
| |
| print(f"Warning: Could not load {MODEL_PATH}. Error: {e}") |
| print("Initializing a dummy model. Please ensure your 'stock_prediction_model.h5' is uploaded.") |
| model = create_lstm_model() |
| |
|
|
|
|
| |
| def forecast_stock(csv_file, days_to_predict=30): |
| """ |
| Takes an uploaded CSV file containing stock data, extracts 'Close' prices, |
| and forecasts the next 'days_to_predict' using the loaded LSTM model. |
| """ |
| if csv_file is None: |
| return None, "Error: Please upload a CSV file.", None |
|
|
| try: |
| |
| df = pd.read_csv(csv_file.name) |
| |
| |
| if 'Close' not in df.columns: |
| return None, "Error: CSV must contain a 'Close' price column.", None |
|
|
| |
| ds_close = df.reset_index()['Close'].values.reshape(-1, 1) |
| scaler = MinMaxScaler(feature_range=(0, 1)) |
| ds_close_scaled = scaler.fit_transform(ds_close) |
| |
| |
| if len(ds_close_scaled) < TIME_STEP: |
| return None, f"Error: Dataset must contain at least {TIME_STEP} entries for initial prediction.", None |
|
|
| |
| x_input = ds_close_scaled[-TIME_STEP:].reshape(1, -1) |
| temp_input = list(x_input[0]) |
|
|
| lst_output = [] |
| i = 0 |
| |
| |
| while i < days_to_predict: |
| if len(temp_input) > TIME_STEP: |
| |
| x_input = np.array(temp_input[1:]) |
| x_input = x_input.reshape(1, TIME_STEP, 1) |
| temp_input = temp_input[1:] |
| else: |
| x_input = np.array(temp_input).reshape(1, TIME_STEP, 1) |
|
|
| |
| yhat = model.predict(x_input, verbose=0) |
| |
| |
| temp_input.extend(yhat[0].tolist()) |
| lst_output.extend(yhat.tolist()) |
| i = i + 1 |
|
|
| |
| predicted_prices = scaler.inverse_transform(lst_output) |
| |
| |
| plt.figure(figsize=(10, 6)) |
| |
| |
| actual_prices = scaler.inverse_transform(ds_close_scaled) |
| day_actual = np.arange(len(actual_prices) - TIME_STEP, len(actual_prices)) |
| |
| plt.plot(day_actual, actual_prices[-TIME_STEP:], label='Last 100 Actual Days', color='blue') |
| |
| |
| day_pred = np.arange(TIME_STEP, TIME_STEP + days_to_predict) |
| plt.plot(day_pred, predicted_prices, label=f'Forecasted {days_to_predict} Days', color='red', linestyle='--') |
| |
| |
| plt.plot([day_actual[-1], day_pred[0]], [actual_prices[-1], predicted_prices[0]], color='red', linestyle='--') |
|
|
| plt.title('Stock Price Forecast (LSTM)') |
| plt.xlabel('Days') |
| plt.ylabel('Close Price') |
| plt.legend() |
| plt.grid(True) |
| plot_output = plt |
| |
| |
| |
| last_date = pd.to_datetime(df.iloc[-1]['Date']) if 'Date' in df.columns else pd.to_datetime(df.index[-1]) |
| |
| |
| future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=days_to_predict) |
| |
| df_forecast = pd.DataFrame({ |
| 'Date': future_dates.strftime('%Y-%m-%d'), |
| 'Forecasted Price': np.round(predicted_prices.flatten(), 2) |
| }) |
|
|
| return plot_output, "Forecast successful!", df_forecast |
| |
| except Exception as e: |
| return None, f"An unexpected error occurred: {e}", None |
|
|
|
|
| |
|
|
| |
| input_csv = gr.File(label="1. Upload Historical Stock CSV (must include 'Date' and 'Close' columns)") |
| input_days = gr.Slider(minimum=10, maximum=180, value=30, step=1, label="2. Number of days to forecast") |
|
|
| |
| output_plot = gr.Plot(label="Price Forecast Visualization") |
| output_message = gr.Textbox(label="Status / Notes", value="Waiting for file upload...") |
| output_df = gr.Dataframe(label="Forecasted Prices Table") |
|
|
| |
| iface = gr.Interface( |
| fn=forecast_stock, |
| inputs=[input_csv, input_days], |
| outputs=[output_plot, output_message, output_df], |
| title="LSTM Stock Price Prediction", |
| description="Upload a CSV file of historical stock data and use the pre-trained Bidirectional LSTM model to forecast future closing prices. The model requires the latest 100 data points to make the initial forecast.", |
| allow_flagging='never' |
| ) |
|
|
| |
| if __name__ == "__main__": |
| iface.launch() |
|
|