| """
|
| This file contains a Copernicus Data Space Ecosystem data extraction class
|
| for downloading satellite data
|
| """
|
|
|
|
|
| import os
|
| import json
|
| import yaml
|
| import inspect
|
| import shutil
|
| import re
|
| import requests
|
| from datetime import datetime, timedelta
|
| from typing import List, Optional, Tuple
|
| from oauthlib.oauth2 import BackendApplicationClient
|
| from requests_oauthlib import OAuth2Session
|
| from sentinelhub import bbox_to_dimensions, BBox
|
| from io import BytesIO
|
| import rasterio
|
| from PIL import Image
|
| import numpy as np
|
|
|
class CopernicusDataExtractor:
    """
    A class for extracting satellite data from Copernicus Data Space Ecosystem.

    This class uses the Copernicus SentinelHub Process API with OAuth2
    authentication to download processed satellite data with custom
    evalscripts.

    Attributes:
        parameters (dict): User input configuration loaded from
            'config/pre_anonym_params.yml'
        oauth_session (OAuth2Session): Authenticated OAuth2 session
        consortium (str): Lower-cased consortium the data is downloaded for
        timespan (list): Start and end date for data request
        bbox (list): Region of Interest coordinates
            [min_lon, min_lat, max_lon, max_lat]
        image_dimensions (tuple): Width and height in pixels, computed with
            sentinelhub's ``bbox_to_dimensions`` so that new imagery keeps the
            dimensions of imagery previously requested through Sentinel Hub
        output_folder (str): Directory where retrieved data is saved
        datetimes (list): Timestamps at which the satellite scanned the ROI
            (filled by ``_get_timestamps``)
        evalscript (str): JavaScript evaluation script for processing
        response_type (str): Type of output (rgb_nir, vi_values, s1_vv)
        obtained_data (list): List of bands/indices in output

    Methods:
        __authenticate_copernicus: Authenticate with Copernicus OAuth2
        _is_sentinel1: Tell whether the configured collection is Sentinel-1
        _calculate_dimensions: Calculate image dimensions from bbox and resolution
        _get_timestamps: Get available acquisition timestamps using Catalog API
        _build_process_request: Build Process API request payload
        _build_filename: Build the output filename for one acquisition
        _download_single_acquisition: Download data for one timestamp
        _save_tiff_with_metadata: Re-write a downloaded TIFF with fixed metadata
        data_request: Main method to download all data
        set_evalscript: Change evalscript type
    """

    # Service endpoints used by this class.
    _TOKEN_URL = 'https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token'
    _CATALOG_URL = "https://sh.dataspace.copernicus.eu/api/v1/catalog/1.0.0/search"
    _PROCESS_URL = "https://sh.dataspace.copernicus.eu/api/v1/process"

    # Collection id -> label embedded in output filenames.
    _COLLECTION_LABELS = {
        'sentinel-2-l2a': 'SENTINEL2_L2A',
        'sentinel-2-l1c': 'SENTINEL2_L1C',
        'sentinel-1-grd': 'SENTINEL1',
    }

    def __init__(self, consortium = 'consortium0', evalscript='default_evalscript.js', crs = 'EPSG:4326'):
        """
        Initialize the Copernicus Data Extractor.

        Loads the YAML configuration, authenticates, derives the region of
        interest, image dimensions and output folder, and loads the evalscript.

        Args:
            consortium (str): Key (case-insensitive) selecting the bbox and
                data folder from the configuration
            evalscript (str): Name of the evalscript file to use for processing
            crs (str): CRS passed to sentinelhub's ``BBox`` when computing the
                image dimensions
        """
        with open('config/pre_anonym_params.yml', 'r') as f:
            self.parameters = yaml.safe_load(f)

        self.oauth_session = self.__authenticate_copernicus()

        self.consortium = consortium.lower()

        self.timespan = [self.parameters['start_date'], self.parameters['end_date']]
        self.bbox = self.parameters['bbox'][self.consortium]

        self.image_dimensions = bbox_to_dimensions(
            BBox(self.bbox, crs=crs),
            resolution=self.parameters['resolution'],
        )

        self.output_folder = os.path.join(
            os.getcwd(),
            "data", "01_raw",
            self.parameters['consortia_data_folders'][self.consortium],
            "satellite_data",
        )

        self.datetimes = []

        self.set_evalscript(evalscript)

    def __authenticate_copernicus(self) -> "OAuth2Session":
        """
        Authenticate with Copernicus Data Space Ecosystem using OAuth2.

        Client id and secret are read from
        'config/copernicus_oauth_config.json'.

        Returns:
            OAuth2Session: Session for which an access token has been fetched

        Raises:
            RuntimeError: If creating the session or fetching the token fails
        """
        with open('config/copernicus_oauth_config.json', 'r') as f:
            oauth_credentials = json.load(f)

        client_id = oauth_credentials['client_id']
        client_secret = oauth_credentials['client_secret']

        try:
            client = BackendApplicationClient(client_id=client_id)
            oauth = OAuth2Session(client=client)
            oauth.fetch_token(
                token_url=self._TOKEN_URL,
                client_secret=client_secret,
                include_client_id=True,
            )
        except Exception as e:
            # Chain the cause so the original traceback stays visible.
            raise RuntimeError(f"Copernicus authentication failed: {e}") from e

        print('✓ Successfully authenticated with Copernicus Data Space Ecosystem.')
        print('✓ Process API endpoint: https://sh.dataspace.copernicus.eu')

        return oauth

    def _is_sentinel1(self) -> bool:
        """
        Tell whether the configured collection is a Sentinel-1 collection.

        Separators are stripped before matching so that both legacy names
        ('SENTINEL1') and hyphenated ids ('sentinel-1-grd') are recognised.

        Returns:
            bool: True for Sentinel-1 collections
        """
        collection = str(self.parameters['collection']).lower()
        return 'sentinel1' in re.sub(r'[-_\s]', '', collection)

    def _calculate_dimensions(self) -> Tuple[int, int]:
        """
        Calculate image dimensions from bbox and resolution.

        Uses fixed metres-per-degree factors, scaling longitude by the cosine
        of the bbox centre latitude. The resolution is read from
        ``parameters['resolution']`` (default 10).

        Returns:
            tuple: (width, height) in pixels, truncated to int
        """
        from math import cos, radians

        resolution = self.parameters.get('resolution', 10)

        min_lon, min_lat, max_lon, max_lat = self.bbox

        center_lat = (min_lat + max_lat) / 2
        width_m = (max_lon - min_lon) * 111320 * cos(radians(center_lat))
        height_m = (max_lat - min_lat) * 110540

        return (int(width_m / resolution), int(height_m / resolution))

    def _get_timestamps(self, timespan: Optional[List[str]] = None) -> List[str]:
        """
        Get available satellite acquisition timestamps using STAC Catalog API.

        Results are paginated through the ``context.next`` token. On success
        the timestamps are also stored in ``self.datetimes``.

        Args:
            timespan (list, optional): [start_date, end_date] in 'YYYY-MM-DD'
                format. A valid two-element value replaces ``self.timespan``;
                anything else is ignored and the stored timespan is used.

        Returns:
            list: Datetime strings (exact duplicates removed, order kept);
                an empty list if the catalog search fails
        """
        if timespan:
            if isinstance(timespan, (list, tuple)) and len(timespan) == 2:
                self.timespan = list(timespan)
                print(f'New timespan: {timespan[0]} to {timespan[1]}')

        # The limit may be configured as a number or as text containing one;
        # str() makes both work instead of silently falling back to 100.
        cloud_cover_max = 100
        if 'cloud_cover' in self.parameters:
            cloud_match = re.search(r'(\d+)', str(self.parameters['cloud_cover']))
            if cloud_match:
                cloud_cover_max = int(cloud_match.group(1))

        stac_request = {
            "collections": [self.parameters['collection']],
            "bbox": self.bbox,
            "datetime": f"{self.timespan[0]}T00:00:00Z/{self.timespan[1]}T23:59:59Z",
            "limit": 100
        }

        # The cloud-cover filter is skipped for Sentinel-1 collections.
        if not self._is_sentinel1():
            stac_request["filter"] = {
                "op": "<",
                "args": [
                    {"property": "eo:cloud_cover"},
                    cloud_cover_max
                ]
            }
            stac_request["filter-lang"] = "cql2-json"

        try:
            all_features = []
            next_token = None

            while True:
                req_payload = dict(stac_request)
                if next_token is not None:
                    req_payload["next"] = next_token

                # Same authenticated session as the Process API call.
                response = self.oauth_session.post(self._CATALOG_URL, json=req_payload, timeout=30)
                response.raise_for_status()
                results = response.json()

                all_features.extend(results.get("features", []))

                next_token = results.get("context", {}).get("next")
                if not next_token:
                    break

            # Identical timestamps map to the same request and output file,
            # so keep only the first occurrence of each.
            self.datetimes = list(dict.fromkeys(
                feature['properties']['datetime'] for feature in all_features
            ))

            print(f"✓ Found {len(self.datetimes)} acquisitions in timespan")
            return self.datetimes

        except requests.exceptions.HTTPError as e:
            print(f'✗ Catalog search failed: {e}')
            body = getattr(e.response, 'text', None)
            if body is not None:
                print(f' Response: {body[:300]}')
            return []
        except Exception as e:
            print(f'✗ Catalog search failed: {e}')
            return []

    def _build_process_request(self, time_range: Tuple[str, str]) -> dict:
        """
        Build Process API request payload.

        Args:
            time_range (tuple): (start_time, end_time) for the request

        Returns:
            dict: Request payload for Process API. For non-Sentinel-1
                collections the data entry also carries
                ``processing.mosaickingOrder = "leastCC"``.
        """
        data_entry = {
            "type": self.parameters['collection'],
            "dataFilter": {
                "timeRange": {
                    "from": time_range[0],
                    "to": time_range[1],
                }
            },
        }

        # Cloud-coverage based mosaicking is only requested for optical data.
        if not self._is_sentinel1():
            data_entry["processing"] = {
                "mosaickingOrder": "leastCC"
            }

        return {
            "input": {
                "bounds": {
                    "properties": {"crs": "http://www.opengis.net/def/crs/OGC/1.3/CRS84"},
                    "bbox": self.bbox,
                },
                "data": [data_entry],
            },
            "output": {
                "width": self.image_dimensions[0],
                "height": self.image_dimensions[1],
                "responses": [
                    {
                        "identifier": self.response_type,
                        "format": {"type": "image/tiff"}
                    }
                ]
            },
            "evalscript": self.evalscript,
        }

    def _build_filename(self, datetime_str: str) -> str:
        """
        Build the output filename for one acquisition.

        Args:
            datetime_str (str): Acquisition datetime as returned by the catalog

        Returns:
            str: '<consortium>_<response_type>_<collection label>_<stamp>.tiff'
                where the stamp has fractional seconds removed, exactly one
                trailing 'Z', and ':' replaced by '_'
        """
        collection = str(self.parameters['collection'])
        label = self._COLLECTION_LABELS.get(collection.lower())
        if label is None:
            # Unknown collection: derive a filesystem-friendly label rather
            # than failing after the data has already been downloaded.
            label = re.sub(r'[^0-9A-Za-z]+', '_', collection).strip('_').upper()

        stamp = datetime_str.split('.')[0]
        # Timestamps without fractional seconds still carry their 'Z' here;
        # drop it so the suffix below does not become 'ZZ'.
        if stamp.endswith('Z'):
            stamp = stamp[:-1]
        stamp = (stamp + 'Z').replace(':', '_')

        return f"{self.consortium}_{self.response_type}_{label}_{stamp}.tiff"

    def _download_single_acquisition(self, datetime_str: str, index: int, total: int) -> Optional[str]:
        """
        Download data for a single acquisition.

        The request covers the whole calendar day of the acquisition
        (00:00:00Z to 00:00:00Z of the following day).

        Args:
            datetime_str (str): Acquisition datetime
            index (int): Current acquisition number
            total (int): Total number of acquisitions

        Returns:
            str: Filename if successful, None otherwise
        """
        print(f'\n[{index}/{total}] Processing: {datetime_str}')

        try:
            dt = datetime.fromisoformat(datetime_str.replace('Z', '+00:00'))
        except (AttributeError, ValueError) as e:
            # One malformed timestamp must not abort the whole batch.
            print(f' ✗ Download failed: {e}')
            return None

        day_start = dt.date()
        day_end = day_start + timedelta(days=1)
        time_range = (
            f"{day_start.strftime('%Y-%m-%d')}T00:00:00Z",
            f"{day_end.strftime('%Y-%m-%d')}T00:00:00Z",
        )

        request_payload = self._build_process_request(time_range)
        filename = self._build_filename(datetime_str)

        try:
            response = self.oauth_session.post(self._PROCESS_URL, json=request_payload, timeout=120)
            response.raise_for_status()

            # With an absolute output_folder (as built in __init__) the join
            # returns output_folder itself; a relative one is resolved against
            # this file's directory.
            output_dir = os.path.join(
                os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))),
                self.output_folder
            )
            os.makedirs(output_dir, exist_ok=True)

            output_path = os.path.join(output_dir, filename)
            self._save_tiff_with_metadata(response.content, output_path)

            print(f' ✓ Saved: {filename}')
            return filename

        except requests.exceptions.HTTPError as e:
            print(f' ✗ HTTP Error: {e}')
            if e.response is not None:
                if hasattr(e.response, 'text'):
                    print(f' Response: {e.response.text}')

                if e.response.status_code == 400:
                    print(f' Request payload was:')
                    print(f' Collection type: {request_payload["input"]["data"][0]["type"]}')
            return None
        except Exception as e:
            print(f' ✗ Download failed: {e}')
            return None

    def _save_tiff_with_metadata(self, content: bytes, output_path: str):
        """
        Save TIFF content with corrected metadata.

        The raster is read from memory and re-written with MINISBLACK
        photometric interpretation, deflate compression and band interleave;
        any 'extra_samples' profile entry is dropped.

        Args:
            content (bytes): Raw TIFF data from API
            output_path (str): Path to save corrected TIFF
        """
        with rasterio.open(BytesIO(content)) as src:
            data = src.read()
            profile = src.profile.copy()

        profile.update({
            'photometric': 'MINISBLACK',
            'compress': 'deflate',
            'interleave': 'band',
        })
        profile.pop('extra_samples', None)

        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(data)

    def data_request(self, timespan: Optional[List[str]] = None):
        """
        Request and download satellite data for all available timestamps.

        Args:
            timespan (list, optional): [start_date, end_date] to override
                configured timespan. When omitted, timestamps from a previous
                catalog search are reused; if there are none, the catalog is
                queried for the configured timespan.
        """
        print("\n" + "="*70)
        print("COPERNICUS DATA EXTRACTION - Process API")
        print("="*70)

        cached = getattr(self, 'datetimes', [])
        if timespan or not cached:
            timestamps = self._get_timestamps(timespan)
        else:
            timestamps = cached

        if not timestamps:
            print('\n✗ No data available for the specified timespan and parameters.')
            return

        successful = 0
        failed = 0

        for i, dt in enumerate(timestamps, 1):
            if self._download_single_acquisition(dt, i, len(timestamps)):
                successful += 1
            else:
                failed += 1

        print("\n" + "="*70)
        print(f"SUMMARY: {successful} successful, {failed} failed out of {len(timestamps)} total")
        print("="*70 + "\n")

    def set_evalscript(self, new_evalscript: str):
        """
        Load and set a new evaluation script for data processing.

        Also sets ``response_type`` and ``obtained_data``. The two known
        scripts have fixed values. For any other script the response type is
        'vi_values' and the output layers are parsed from the first
        '<identifier>: [ ... ]' list found in the script text.

        Args:
            new_evalscript (str): Filename of the evalscript in the
                'config/request_scripts' directory
        """
        evalscript_path = os.path.join('config/request_scripts', new_evalscript)

        with open(evalscript_path, 'r') as evalscript_file:
            self.evalscript = evalscript_file.read()

        if new_evalscript == 'default_evalscript.js':
            self.response_type = "rgb_nir"
            self.obtained_data = ['red', 'green', 'blue', 'nir']
        elif new_evalscript == 'sentinel1_evalscript.js':
            self.response_type = "s1_vv"
            self.obtained_data = ['vv']
        else:
            self.response_type = "vi_values"

            pattern = re.compile(r"(?:rgb_nir|vi_values|s1_vv):\s*\[([^\]]+)\]", re.DOTALL)
            match = pattern.search(self.evalscript)

            if match:
                self.obtained_data = [value.strip() for value in match.group(1).split(",")]
            else:
                self.obtained_data = []

        print(f'✓ Evalscript set to: {new_evalscript}')
        print(f' Response type: {self.response_type}')
        print(f' Output layers: {len(self.obtained_data)}')
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Example run: build an extractor that uses the vegetation-index evalscript.
    extractor = CopernicusDataExtractor(evalscript='vis_evalscript.js')

    # List the acquisitions available for a short test window.
    timestamps = extractor._get_timestamps(timespan=['2023-12-29','2023-12-30'])
    print(f'\nAvailable acquisitions: {len(timestamps)}')

    # Download data for a different window; passing a timespan triggers a new
    # catalog search inside data_request.
    extractor.data_request(['2025-01-29', '2025-01-30'])

    # Report the output layers once (the original printed this line twice).
    print(f'\nObtained data layers: {extractor.obtained_data}')