| import os |
| from dotenv import load_dotenv |
| import streamlit as st |
| import pandas as pd |
| import plotly.express as px |
| import cloudscraper |
| import warnings |
| import logging |
| |
| load_dotenv() |
| API_KEY = os.environ.get("API_KEY") |
| headers = { |
| "Authorization": f"Bearer {API_KEY}", |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/115.0.0.0 Safari/537.36" |
| } |
|
|
| url = "https://archeanvision.com/api/signals/available" |
|
|
| |
| scraper = cloudscraper.create_scraper() |
| response = scraper.get(url, headers=headers) |
|
|
| print(response.status_code) |
| print(response.text) |
|
|
| |
| load_dotenv() |
|
|
| |
| warnings.filterwarnings( |
| "ignore", |
| message="Please replace `st.experimental_get_query_params` with `st.query_params`" |
| ) |
| warnings.filterwarnings( |
| "ignore", |
| message="Please replace `st.experimental_set_query_params` with `st.query_params`" |
| ) |
| warnings.filterwarnings("ignore", category=DeprecationWarning) |
|
|
| |
| logging.getLogger("streamlit.deprecation").setLevel(logging.ERROR) |
| logging.getLogger("streamlit.runtime.scriptrunner").setLevel(logging.ERROR) |
|
|
|
|
| |
| |
| |
| st.set_page_config( |
| page_title="Dashboard Auto-Refresh", |
| layout="wide" |
| ) |
|
|
| REFRESH_INTERVAL = 260 |
| st.markdown(f"<meta http-equiv='refresh' content='{REFRESH_INTERVAL}'>", unsafe_allow_html=True) |
| |
|
|
| LOGO_IMAGE_URL = "https://cdn.discordapp.com/attachments/1276553391748812800/1374489683769163827/image.png?ex=682e3cc5&is=682ceb45&hm=ca258b6323ea40faafe307c00e48a3841450ff34b05de452e3a0fb544909615f&" |
| st.sidebar.image(LOGO_IMAGE_URL, use_container_width=True, caption="FrameWorx") |
|
|
| |
| if not API_KEY: |
| st.error("API_KEY is not set. Please add it to your environment (e.g. .env file or Hugging Face Secrets).") |
| st.stop() |
|
|
| |
|
|
| def get_active_markets_cloudscraper(api_key): |
| """Retrieves the list of active markets using cloudscraper to bypass Cloudflare.""" |
| headers = { |
| "Authorization": f"Bearer {api_key}", |
| "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/115.0.0.0 Safari/537.36") |
| } |
| url = "https://archeanvision.com/api/signals/available" |
| scraper = cloudscraper.create_scraper() |
| response = scraper.get(url, headers=headers) |
| response.raise_for_status() |
| return response.json() |
|
|
| def get_market_data_cloudscraper(api_key, market): |
| """Retrieves market data for the given market using cloudscraper.""" |
| headers = { |
| "Authorization": f"Bearer {api_key}", |
| "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/115.0.0.0 Safari/537.36") |
| } |
| |
| url = f"https://archeanvision.com/api/signals/{market}/data" |
| scraper = cloudscraper.create_scraper() |
| response = scraper.get(url, headers=headers) |
| response.raise_for_status() |
| return response.json() |
|
|
| def get_market_signals_cloudscraper(api_key, market): |
| """Retrieves market signals for the given market using cloudscraper.""" |
| headers = { |
| "Authorization": f"Bearer {api_key}", |
| "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/115.0.0.0 Safari/537.36") |
| } |
| url = f"https://archeanvision.com/api/signals/{market}/signals" |
| scraper = cloudscraper.create_scraper() |
| response = scraper.get(url, headers=headers) |
| response.raise_for_status() |
| return response.json() |
|
|
| |
|
|
| def get_selected_market(market_list): |
| """ |
| Retourne le marché sélectionné à partir des paramètres d'URL ou, par défaut, le premier élément. |
| Met à jour le paramètre de l'URL si l'utilisateur choisit un marché différent. |
| """ |
| |
| params = st.query_params |
|
|
| |
| default_market = params.get("market", market_list[0]) |
| |
| if isinstance(default_market, list): |
| default_market = default_market[0] |
|
|
| |
| default_index = market_list.index(default_market) if default_market in market_list else 0 |
|
|
| |
| selected = st.selectbox("Select a market:", market_list, index=default_index) |
|
|
| |
| if selected != default_market: |
| st.query_params.market = selected |
| |
|
|
| return selected |
|
|
|
|
| def main(): |
| st.title("Active AI Crypto Markets - frameWorxVision") |
|
|
| st.markdown(""" |
| ### What is frameWorxVision? |
| **frameWorx** is an autonomous multi-market trading agent. |
| It operates simultaneously on multiple crypto assets, monitoring price movements |
| in real time and delivering **data** as well as **signals** (BUY, SELL, etc.) |
| to automate and optimize decision-making. |
| - **AI Agent**: Continuously analyzes crypto markets. |
| - **Multi-Market**: Manages multiple assets at once. |
| - **Live Data**: Access to streaming data feeds (SSE). |
| - **Buy/Sell Signals**: Generated in real-time to seize market opportunities. |
| Below is a dashboard showcasing the active markets, their 24h data |
| (1,440 most recent data points), and their associated signals. |
| --- |
| **Join our Platform as a beta tester** to help improve the agent and the system. |
| - Official platform: [https://frameworx.site](https://frameworx.fun) |
| |
| """) |
|
|
| |
| try: |
| active_markets = get_active_markets_cloudscraper(API_KEY) |
| except Exception as e: |
| st.error(f"Error fetching active markets: {e}") |
| return |
|
|
| if not active_markets: |
| st.error("No active markets found through the API.") |
| return |
|
|
| |
| market_list = [] |
| if isinstance(active_markets, list): |
| for item in active_markets: |
| |
| if isinstance(item, dict) and "market" in item: |
| market_list.append(item["market"]) |
| elif isinstance(item, str): |
| market_list.append(item) |
| else: |
| st.warning(f"Item missing 'market' key: {item}") |
| else: |
| st.error("The structure of 'active_markets' is not a list as expected.") |
| return |
|
|
| if not market_list: |
| st.error("The market list is empty or 'market' keys not found.") |
| return |
|
|
| selected_market = get_selected_market(market_list) |
| if not selected_market: |
| st.error("No market selected.") |
| return |
|
|
| st.subheader(f"Selected Market: {selected_market}") |
| st.write(f"Fetching data for **{selected_market}** ...") |
|
|
| |
| try: |
| market_data = get_market_data_cloudscraper(API_KEY, selected_market) |
| except Exception as e: |
| st.error(f"Error fetching market data for {selected_market}: {e}") |
| return |
|
|
| if not market_data: |
| st.error(f"No data found for market {selected_market}.") |
| return |
|
|
| df = pd.DataFrame(market_data) |
| if "close_time" in df.columns: |
| df['close_time'] = pd.to_datetime(df['close_time'], unit='ms', errors='coerce') |
| else: |
| st.error("The 'close_time' column is missing from the retrieved data.") |
| return |
|
|
| st.write("### Market Data Overview") |
| st.dataframe(df.head()) |
|
|
| required_cols = {"close", "last_predict_15m", "last_predict_1h"} |
| if not required_cols.issubset(df.columns): |
| st.error( |
| f"The required columns {required_cols} are not all present. " |
| f"Available columns: {list(df.columns)}" |
| ) |
| return |
|
|
| fig = px.line( |
| df, |
| x='close_time', |
| y=['close', 'last_predict_15m', 'last_predict_1h'], |
| title=f"{selected_market} : Close Price & Predictions", |
| labels={ |
| 'close_time': 'Time', |
| 'value': 'Price', |
| 'variable': 'Metric' |
| } |
| ) |
| st.plotly_chart(fig, use_container_width=True) |
|
|
| st.write(f"### Signals for {selected_market}") |
| try: |
| signals = get_market_signals_cloudscraper(API_KEY, selected_market) |
| except Exception as e: |
| st.error(f"Error fetching signals for {selected_market}: {e}") |
| return |
|
|
| if not signals: |
| st.warning(f"No signals found for market {selected_market}.") |
| else: |
| df_signals = pd.DataFrame(signals) |
| if 'date' in df_signals.columns: |
| df_signals['date'] = pd.to_datetime(df_signals['date'], unit='s', errors='coerce') |
| for col in df_signals.columns: |
| if df_signals[col].apply(lambda x: isinstance(x, dict)).any(): |
| df_signals[col] = df_signals[col].apply(lambda x: str(x) if isinstance(x, dict) else x) |
| if 'date' in df_signals.columns: |
| df_signals = df_signals.sort_values('date', ascending=False) |
| st.write("Total number of signals:", len(df_signals)) |
| st.write("Preview of the last 4 signals:") |
| st.dataframe(df_signals.head(4)) |
|
|
| if __name__ == "__main__": |
| main() |
|
|
|
|