"""HTTP client for the Cube.js API of Indicateurs Territoriaux."""

import asyncio
import os
from typing import Any

import httpx
from dotenv import load_dotenv

load_dotenv()

# Payload value Cube.js sends (with HTTP 200) while a query is still being
# processed server-side; the caller is expected to re-send the same request.
_CONTINUE_WAIT = "Continue wait"


class CubeJsClientError(Exception):
    """Base exception for Cube.js client errors."""


class AuthenticationError(CubeJsClientError):
    """Raised when authentication fails (401)."""


class BadRequestError(CubeJsClientError):
    """Raised when the request is malformed (400)."""


class CubeJsClient:
    """HTTP client for the Cube.js REST API.

    This client handles authentication and provides methods to interact with
    the Indicateurs Territoriaux API endpoints. It may be used as an async
    context manager (``async with CubeJsClient() as client: ...``), which
    closes the underlying HTTP client on exit.
    """

    def __init__(
        self,
        base_url: str | None = None,
        token: str | None = None,
        timeout: float = 30.0,
        *,
        continue_wait_retries: int = 10,
        continue_wait_delay: float = 1.0,
    ):
        """Initialize the Cube.js client.

        Args:
            base_url: Base URL of the API. Defaults to env var INDICATEURS_TE_BASE_URL,
                then to "https://api.indicateurs.ecologie.gouv.fr".
            token: JWT authentication token. Defaults to env var INDICATEURS_TE_TOKEN.
            timeout: Request timeout in seconds.
            continue_wait_retries: How many times a request is re-sent when
                Cube.js answers "Continue wait" (query still processing).
                Must be >= 0; 0 disables re-sending.
            continue_wait_delay: Seconds to sleep between "Continue wait" retries.

        Raises:
            ValueError: If no token is available, or continue_wait_retries < 0.
        """
        self.base_url = (
            base_url
            or os.getenv("INDICATEURS_TE_BASE_URL")
            or "https://api.indicateurs.ecologie.gouv.fr"
        )
        self.token = token or os.getenv("INDICATEURS_TE_TOKEN")
        if not self.token:
            raise ValueError(
                "No API token provided. Set INDICATEURS_TE_TOKEN environment variable "
                "or pass token parameter."
            )
        if continue_wait_retries < 0:
            raise ValueError("continue_wait_retries must be >= 0")
        self.timeout = timeout
        self.continue_wait_retries = continue_wait_retries
        self.continue_wait_delay = continue_wait_delay
        # Created lazily by _get_client() so construction performs no I/O.
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> "CubeJsClient":
        """Enter the async context; returns the client itself."""
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        """Exit the async context, closing the underlying HTTP client."""
        await self.close()

    @property
    def headers(self) -> dict[str, str]:
        """HTTP headers for API requests (bearer token + JSON content type)."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the async HTTP client.

        A new client is created when none exists yet or the previous one
        has been closed.
        """
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.base_url,
                headers=self.headers,
                timeout=self.timeout,
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client (no-op if it was never opened or is already closed)."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    async def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
        """Handle API response and raise appropriate errors.

        Args:
            response: The HTTP response object.

        Returns:
            Parsed JSON response.

        Raises:
            AuthenticationError: If the token is invalid or expired (401).
            BadRequestError: If the request is malformed (400).
            CubeJsClientError: For other HTTP errors, or a successful response
                whose body is not valid JSON.
        """
        if response.status_code == 401:
            raise AuthenticationError(
                "Authentication failed. Your API token may be invalid or expired. "
                "Please check your INDICATEURS_TE_TOKEN environment variable."
            )
        if response.status_code == 400:
            try:
                error_detail = response.json()
            except ValueError:
                # Body is not JSON (JSONDecodeError / UnicodeDecodeError are
                # ValueError subclasses): fall back to the raw text.
                error_detail = response.text
            raise BadRequestError(f"Bad request to API. Details: {error_detail}")
        if response.status_code >= 400:
            raise CubeJsClientError(
                f"API request failed with status {response.status_code}: {response.text}"
            )
        try:
            return response.json()
        except ValueError as exc:
            # e.g. an HTML page from an intermediate proxy on a 2xx status.
            raise CubeJsClientError(
                f"API returned a non-JSON response (status {response.status_code}): "
                f"{response.text[:200]}"
            ) from exc

    async def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
        """Send a request and return its parsed JSON, honoring "Continue wait".

        Cube.js replies ``{"error": "Continue wait"}`` with HTTP 200 while a
        query is still being computed; the same request must then be re-sent.
        Without this loop such a reply would be returned as if it were a
        result (and ``.get("data", [])`` would silently give ``[]``).

        Args:
            method: HTTP method, e.g. "GET" or "POST".
            path: Path relative to ``base_url``.
            **kwargs: Forwarded to ``httpx.AsyncClient.request`` (e.g. ``json=``).

        Raises:
            AuthenticationError, BadRequestError, CubeJsClientError: As raised
                by ``_handle_response``.
            CubeJsClientError: If Cube.js still answers "Continue wait" after
                ``continue_wait_retries`` re-sends.
        """
        client = await self._get_client()
        attempts = self.continue_wait_retries + 1
        for attempt in range(attempts):
            response = await client.request(method, path, **kwargs)
            result = await self._handle_response(response)
            still_waiting = (
                isinstance(result, dict) and result.get("error") == _CONTINUE_WAIT
            )
            if not still_waiting:
                return result
            if attempt < attempts - 1:
                await asyncio.sleep(self.continue_wait_delay)
        raise CubeJsClientError(
            f"Query still processing after {attempts} attempts "
            f"(Cube.js kept answering '{_CONTINUE_WAIT}')."
        )

    async def get_meta(self) -> dict[str, Any]:
        """Fetch the API schema metadata.

        Returns the complete schema including all cubes, their measures,
        dimensions, and available filters.

        Returns:
            Dict containing the API metadata with 'cubes' key.

        Raises:
            AuthenticationError: If authentication fails.
            CubeJsClientError: For other API errors.
        """
        return await self._request("GET", "/cubejs-api/v1/meta")

    async def load(self, query: dict[str, Any]) -> dict[str, Any]:
        """Execute a data query against the Cube.js API.

        Args:
            query: The Cube.js query object containing measures, dimensions,
                filters, and other query parameters.

        Returns:
            Dict containing the query results with 'data' key.

        Raises:
            AuthenticationError: If authentication fails.
            BadRequestError: If the query is malformed.
            CubeJsClientError: For other API errors, including a query that is
                still processing after all "Continue wait" retries.

        Example:
            >>> query = {
            ...     "measures": ["indicateur_metadata.count"],
            ...     "dimensions": ["indicateur_metadata.id", "indicateur_metadata.libelle"],
            ...     "limit": 10
            ... }
            >>> result = await client.load(query)
        """
        return await self._request(
            "POST",
            "/cubejs-api/v1/load",
            json={"query": query},
        )

    async def load_indicators_metadata(
        self,
        dimensions: list[str] | None = None,
        filters: list[dict[str, Any]] | None = None,
        limit: int = 500,
    ) -> list[dict[str, Any]]:
        """Load indicator metadata from the indicateur_metadata cube.

        Convenience method for querying the indicator metadata cube.

        Args:
            dimensions: List of dimensions to fetch. Defaults to basic info
                (id, libelle, unite, description, mailles_disponibles,
                thematique_fnv, annees_disponibles).
            filters: Optional list of filters to apply (omitted when empty).
            limit: Maximum number of results.

        Returns:
            List of indicator metadata records (the response's 'data' list,
            or [] if absent).
        """
        if dimensions is None:
            dimensions = [
                "indicateur_metadata.id",
                "indicateur_metadata.libelle",
                "indicateur_metadata.unite",
                "indicateur_metadata.description",
                "indicateur_metadata.mailles_disponibles",
                "indicateur_metadata.thematique_fnv",
                "indicateur_metadata.annees_disponibles",
            ]

        query: dict[str, Any] = {
            "dimensions": dimensions,
            "limit": limit,
        }

        if filters:
            query["filters"] = filters

        result = await self.load(query)
        return result.get("data", [])

    async def load_sources_metadata(
        self,
        indicator_id: int | None = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """Load source metadata from the indicateur_x_source_metadata cube.

        Args:
            indicator_id: Optional indicator ID to filter sources.
            limit: Maximum number of results.

        Returns:
            List of source metadata records (the response's 'data' list,
            or [] if absent).
        """
        dimensions = [
            "indicateur_x_source_metadata.id_indicateur",
            "indicateur_x_source_metadata.nom_source",
            "indicateur_x_source_metadata.libelle",
            "indicateur_x_source_metadata.description",
            "indicateur_x_source_metadata.producteur_source",
            "indicateur_x_source_metadata.distributeur_source",
            "indicateur_x_source_metadata.license_source",
            "indicateur_x_source_metadata.lien_page",
            "indicateur_x_source_metadata.date_derniere_extraction",
        ]

        query: dict[str, Any] = {
            "dimensions": dimensions,
            "limit": limit,
        }

        if indicator_id is not None:
            # Cube.js filter values are sent as strings.
            query["filters"] = [
                {
                    "member": "indicateur_x_source_metadata.id_indicateur",
                    "operator": "equals",
                    "values": [str(indicator_id)],
                }
            ]

        result = await self.load(query)
        return result.get("data", [])

    async def search_indicators_by_libelle(
        self,
        search_term: str,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Search indicators by keyword in libelle using contains filter.

        This uses Cube.js contains operator for server-side filtering.
        Note: Limited to single term, for multi-term use client-side filtering.

        Args:
            search_term: Term to search for in indicator libelle.
            limit: Maximum number of results.

        Returns:
            List of matching indicator metadata records (the response's
            'data' list, or [] if absent).
        """
        query: dict[str, Any] = {
            "dimensions": [
                "indicateur_metadata.id",
                "indicateur_metadata.libelle",
                "indicateur_metadata.description",
                "indicateur_metadata.unite",
                "indicateur_metadata.mailles_disponibles",
                "indicateur_metadata.thematique_fnv",
            ],
            "filters": [
                {
                    "member": "indicateur_metadata.libelle",
                    "operator": "contains",
                    "values": [search_term],
                }
            ],
            "limit": limit,
        }

        result = await self.load(query)
        return result.get("data", [])


# Singleton instance for the application
_client_instance: CubeJsClient | None = None


def get_client() -> CubeJsClient:
    """Get or create the singleton CubeJsClient instance.

    Returns:
        The shared CubeJsClient instance.

    Raises:
        ValueError: If no API token is configured (raised by CubeJsClient()).
    """
    global _client_instance
    if _client_instance is None:
        _client_instance = CubeJsClient()
    return _client_instance


async def close_client() -> None:
    """Close the singleton client instance and forget it.

    A later get_client() call creates a fresh instance.
    """
    global _client_instance
    if _client_instance is not None:
        await _client_instance.close()
    _client_instance = None