Spaces:
Running
Running
| """ Utility functions for Google Earth Engine data extraction and processing. """ | |
| from datetime import datetime, timedelta | |
| import json | |
| import os | |
| import tempfile | |
| import time | |
| import ee | |
| import geopandas as gpd | |
| from shapely.geometry import Point | |
| import pandas as pd | |
| from indices import add_s2_indices, add_s1_indices | |
| from variables import s2_bands, s1_bands | |
def check_inside_civ(lat: float, lon: float):
    """Tell whether a coordinate falls within Côte d'Ivoire.

    The country outline is read from ``data/CIV_0.json`` on every call.

    Args:
        lat: Latitude of the location to test.
        lon: Longitude of the location to test.

    Returns:
        A truthy value when at least one geometry of the boundary file
        contains the point, a falsy value otherwise.
    """
    # Shapely points are built as (x, y), hence (lon, lat).
    location = Point(lon, lat)
    country_boundary = gpd.read_file("data/CIV_0.json")
    inside_per_feature = country_boundary.contains(location)
    return inside_per_feature.any()
def initialize_ee():
    """Initialize the Google Earth Engine client.

    Credentials are resolved in this order:

    1. The ``EE_SERVICE_ACCOUNT`` and ``EE_SERVICE_KEY`` environment
       variables (service-account e-mail and JSON key content). The key is
       written to ``ee-key.json`` in the system temp directory because the
       credentials object is built from a key file path.
    2. A local key file at ``secrets/gcp-sa-key.json``.
    3. A bare ``ee.Initialize()`` call with no explicit credentials.

    Raises:
        RuntimeError: If any step of the initialization fails; the original
            exception is chained as the cause.
    """
    sa_email = os.getenv("EE_SERVICE_ACCOUNT")
    sa_key_json = os.getenv("EE_SERVICE_KEY")
    try:
        # Hugging Face: service account supplied through the environment.
        if sa_email and sa_key_json:
            key_path = os.path.join(tempfile.gettempdir(), "ee-key.json")
            # The key is a secret living in a shared temp directory: create
            # the file with owner-only permissions instead of the default
            # umask-dependent mode.
            fd = os.open(
                key_path,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                # The mode passed to os.open() only applies to newly created
                # files; tighten a pre-existing file before writing the key.
                os.chmod(key_path, 0o600)
                f.write(sa_key_json)
            creds = ee.ServiceAccountCredentials(sa_email, key_path)
            ee.Initialize(creds)
            print(f"[INFO] GEE initialized with service account {sa_email}")
            return

        # Local: key file checked out next to the code.
        local_key = "secrets/gcp-sa-key.json"
        if os.path.exists(local_key):
            with open(local_key, "r", encoding="utf-8") as f:
                key_data = json.load(f)
            creds = ee.ServiceAccountCredentials(
                key_data["client_email"], local_key
            )
            ee.Initialize(creds)
            print(f"[INFO] GEE initialized from {local_key}")
            return

        # Neither HF nor Local: initialize without explicit credentials.
        ee.Initialize()
        print("[INFO] GEE initialized")
    except Exception as e:
        raise RuntimeError(f"GEE initialization failed : {e}") from e
def mask_s2_clouds(image):
    """Mask cloudy and cirrus pixels of a Sentinel-2 image using ``QA60``.

    Args:
        image: Image exposing a ``QA60`` band.

    Returns:
        The image with flagged pixels masked out and every band divided by
        10000, carrying over the acquisition time, cloud percentage and
        spacecraft name properties of the input.
    """
    qa_band = image.select('QA60')

    # In QA60, bit 10 flags clouds and bit 11 flags cirrus.
    cloud_flag = 1 << 10
    cirrus_flag = 1 << 11

    # A pixel is kept only when both flags are zero (clear conditions).
    no_cloud = qa_band.bitwiseAnd(cloud_flag).eq(0)
    no_cirrus = qa_band.bitwiseAnd(cirrus_flag).eq(0)
    clear_sky = no_cloud.And(no_cirrus)

    scaled = image.updateMask(clear_sky).divide(10000)

    # Re-attach the properties that are read downstream on the result.
    kept_properties = [
        "system:time_start",
        "system:time_end",
        "CLOUDY_PIXEL_PERCENTAGE",
        "SPACECRAFT_NAME"
    ]
    return scaled.copyProperties(source=image, properties=kept_properties)
def mask_edge(image):
    """Mask the edge pixels of a Sentinel-1 image.

    Pixels with a value below -30.0 are treated as edge pixels and removed,
    on top of whatever the image's existing mask already hides.

    Args:
        image: Image to clean.

    Returns:
        The image with its mask updated.
    """
    below_threshold = image.lt(-30.0)
    # Keep a pixel only if it is currently valid AND not below the threshold.
    keep = image.mask().And(below_threshold.Not())
    return image.updateMask(keep)
def days_since_utc(utc_iso: str) -> int:
    """Compute the number of whole days elapsed since a UTC timestamp.

    Args:
        utc_iso: ISO-8601 timestamp expressed in UTC. Only the first 19
            characters (``YYYY-MM-DDTHH:MM:SS``) are parsed; anything after
            them (fractional seconds, ``Z`` suffix) is ignored.

    Returns:
        Number of complete 24-hour periods between the timestamp and now.

    Raises:
        ValueError: If the first 19 characters do not match the expected
            ``%Y-%m-%dT%H:%M:%S`` layout.
    """
    # Local import: calendar is not among the module-level imports.
    import calendar

    parsed = time.strptime(utc_iso[:19], "%Y-%m-%dT%H:%M:%S")
    # timegm() interprets the struct_time as UTC. mktime() would interpret it
    # as local time and skew the result by the host's UTC offset.
    t_img = calendar.timegm(parsed)
    return int((time.time() - t_img) // 86400)
def lonlat_to_utm_epsg(lon: float, lat: float) -> int:
    """Return the EPSG code of the WGS84 / UTM zone containing a point.

    Args:
        lon: Longitude in decimal degrees. Values outside ``[-180, 180)``
            are wrapped around, so ``180`` maps to the same zone as ``-180``.
        lat: Latitude in decimal degrees; its sign selects the hemisphere.

    Returns:
        ``32600 + zone`` for the northern hemisphere (``lat >= 0``) or
        ``32700 + zone`` for the southern one, with ``zone`` in ``1..60``.
    """
    # Wrap the longitude into [0, 360) before splitting into 6-degree zones;
    # without this, lon == 180 would produce the non-existent zone 61.
    wrapped = (lon + 180.0) % 360.0
    # The clamp guards against float rounding pushing `wrapped` up to 360.0.
    zone = min(int(wrapped // 6) + 1, 60)
    if lat >= 0:
        return 32600 + zone  # WGS84 UTM N
    return 32700 + zone  # WGS84 UTM S
def projected_xy(lon: float, lat: float):
    """Project a lon/lat position into its UTM zone through Earth Engine.

    The target EPSG code comes from ``lonlat_to_utm_epsg``; the transform is
    evaluated by Earth Engine, so this call performs a ``getInfo()`` request.

    Args:
        lon: Longitude of the point.
        lat: Latitude of the point.

    Returns:
        A ``(easting, northing)`` tuple of floats.
    """
    location = ee.Geometry.Point([lon, lat])
    utm_projection = ee.Projection(f"EPSG:{lonlat_to_utm_epsg(lon, lat)}")
    # The second argument of transform() is the maximum error allowed.
    projected = location.transform(utm_projection, 1)
    coords = ee.List(projected.coordinates()).getInfo()
    easting = float(coords[0])
    northing = float(coords[1])
    return easting, northing
def extract_from_gee(lat: float, lon: float, radius_m: int = 30):
    """Extract Sentinel-1, Sentinel-2 and elevation features around a point.

    The most recent Sentinel-2 (cloud-masked, < 40 % cloudy) and Sentinel-1
    (IW mode, VV+VH) images of the last 31 days are stacked with the SRTM
    elevation and averaged over the bounding box of a ``radius_m`` buffer
    around the point.

    Args:
        lat: Latitude of the point.
        lon: Longitude of the point.
        radius_m: Buffer radius around the point, passed to ``buffer()``.

    Returns:
        A ``(result, error)`` tuple.

        * On success ``error`` is ``None`` and ``result`` is a dict with:
          ``"X"`` (one-row DataFrame of band means, coordinates and indices,
          or ``None`` if the reduction returned nothing), ``"cloud"``
          (cloud percentage of the Sentinel-2 image), ``"estimation_date"``
          (``YYYY-MM-DD`` of the most recent of the two acquisitions) and
          ``"ndvi_mean"``.
        * On failure ``result`` is ``None`` and ``error`` is
          ``{"error": <message>}``.
    """
    time_key = "system:time_start"

    pt = ee.Geometry.Point([float(lon), float(lat)])
    roi = pt.buffer(radius_m).bounds()

    # NOTE(review): the window is built from the local clock and its end is
    # today's date; confirm whether same-day acquisitions should be included.
    end_date = datetime.now()
    start_date = end_date - timedelta(days=31)
    start = start_date.strftime('%Y-%m-%d')
    end = end_date.strftime('%Y-%m-%d')

    # --- Sentinel-2 -------------------------------------------------------
    s2 = (
        ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED")
        .filterBounds(roi)
        .filterDate(start, end)
        .filter(ee.Filter.lt("CLOUDY_PIXEL_PERCENTAGE", 40))
        .map(mask_s2_clouds)
        .select(s2_bands)
    )
    # first() always returns a client-side proxy object, never Python None,
    # so emptiness has to be checked on the collection size.
    if s2.size().getInfo() == 0:
        return None, {"error": "No Sentinel-2 image available on ROI/time window."}
    img_s2 = s2.sort(time_key, False).first()

    cloud_cover = ee.Number(img_s2.get("CLOUDY_PIXEL_PERCENTAGE")).getInfo()
    acq_iso_s2 = ee.Date(img_s2.get(time_key)).format().getInfo()
    days_s2 = days_since_utc(acq_iso_s2)

    # --- Sentinel-1 -------------------------------------------------------
    s1 = (
        ee.ImageCollection("COPERNICUS/S1_GRD")
        .filterBounds(roi)
        .filterDate(start, end)
        .filter(ee.Filter.eq('instrumentMode', 'IW'))
        .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
        .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VH'))
        .map(mask_edge)
        .select(s1_bands)
    )
    # Same guard as for Sentinel-2: report an empty collection as an error
    # tuple instead of failing inside a later getInfo() call.
    if s1.size().getInfo() == 0:
        return None, {"error": "No Sentinel-1 image available on ROI/time window."}
    img_s1 = s1.sort(time_key, False).first()

    acq_iso_s1 = ee.Date(img_s1.get(time_key)).format().getInfo()
    days_s1 = days_since_utc(acq_iso_s1)

    # Keep the acquisition with the fewest elapsed days (ties go to S1).
    estimation_date = acq_iso_s2 if days_s1 > days_s2 else acq_iso_s1

    # --- DEM, reprojected onto the Sentinel-1 projection -------------------
    dem = ee.Image("USGS/SRTMGL1_003")
    elevation = dem.reproject(img_s1.projection()).clip(roi)

    s1_s2_dem_image = img_s1.addBands(img_s2).addBands(elevation)

    # Mean of every band over the ROI, fetched as a plain dict.
    image_dict = s1_s2_dem_image.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=roi,
        scale=10,
        maxPixels=1e8
    ).getInfo()

    if image_dict:
        wanted_keys = s2_bands + s1_bands + ['elevation']
        init_dict = {k: image_dict.get(k) for k in wanted_keys}
        data = pd.DataFrame([init_dict])
        data["lon"] = lon
        data["lat"] = lat
        easting, northing = projected_xy(lon, lat)
        data["latitude_proj"] = northing
        data["longitude_proj"] = easting
        data = add_s2_indices(data)
        data = add_s1_indices(data)
        ndvi_mean = data["NDVI"].values[0] if "NDVI" in data else None
    else:
        data = None
        ndvi_mean = None

    return {
        "X": data,
        "cloud": float(cloud_cover),
        "estimation_date": estimation_date.split("T")[0],
        "ndvi_mean": ndvi_mean
    }, None