| import numpy as np
|
| import pandas as pd
|
| import matplotlib.pyplot as plt
|
| from shapely.geometry import Point, Polygon
|
| import random
|
| import datetime
|
| import gradio as gr
|
| import tempfile
|
| import os
|
| import requests
|
| import json
|
| from typing import List, Tuple, Optional, Dict, Any, Union
|
|
|
# Public Overpass API endpoint used for every exclusion-zone lookup.
_OVERPASS_URL = "https://overpass-api.de/api/interpreter"

# Rough number of metres in one degree; buffer widths below are converted to
# degrees with it because the geometries stay in EPSG:4326.
_OSM_METERS_PER_DEGREE = 111320

# Overpass selectors (element type + tag filter) requested for each exclusion
# type offered to the caller. Dict order fixes the order of the query body.
_OSM_SELECTORS = {
    "Water bodies": [
        'way["natural"="water"]',
        'relation["natural"="water"]',
        'way["landuse"="reservoir"]',
        'way["water"="lake"]',
        'way["water"="pond"]',
        'way["waterway"="river"]',
        'way["waterway"="stream"]',
        'way["waterway"="canal"]',
    ],
    "Parks & green spaces": [
        'way["leisure"="park"]',
        'way["landuse"="forest"]',
        'way["landuse"="grass"]',
        'way["natural"="wood"]',
    ],
    "Industrial areas": [
        'way["landuse"="industrial"]',
        'way["landuse"="commercial"]',
    ],
    "Major roads": [
        'way["highway"~"motorway|trunk|primary"]',
    ],
}

# Half-widths (metres) used to turn linear features into polygons.
_WATERWAY_BUFFER_M = {'river': 50, 'canal': 30}
_DEFAULT_WATERWAY_BUFFER_M = 20
_ROAD_BUFFER_M = 25


def _classify_osm_tags(tags):
    """Map an OSM tag dict to the zone-type label used downstream, or None."""
    natural = tags.get('natural')
    landuse = tags.get('landuse')
    if natural == 'water' or landuse == 'reservoir' or 'water' in tags:
        return 'Water'
    if tags.get('waterway') in ('river', 'stream', 'canal'):
        return 'Water'
    if tags.get('leisure') == 'park':
        return 'Park'
    if landuse in ('forest', 'grass'):
        return 'Green space'
    if natural == 'wood':
        return 'Forest'
    if landuse in ('industrial', 'commercial'):
        return 'Industrial/Commercial'
    if 'highway' in tags:
        return 'Major road'
    return None


def _way_to_zone_geometry(tags, coords):
    """Build a polygon for one OSM way, or return None if it is unusable.

    Ways tagged ``waterway`` or ``highway`` are treated as centre lines and
    buffered; every other way is treated as an area outline.
    """
    from shapely.geometry import LineString, Polygon

    if 'waterway' in tags or 'highway' in tags:
        if len(coords) < 2:
            return None
        if 'waterway' in tags:
            width_m = _WATERWAY_BUFFER_M.get(tags['waterway'], _DEFAULT_WATERWAY_BUFFER_M)
        else:
            width_m = _ROAD_BUFFER_M
        # The buffer is applied in degrees, so its east-west width is only
        # approximate away from the equator.
        geometry = LineString(coords).buffer(width_m / _OSM_METERS_PER_DEGREE)
    else:
        if len(coords) <= 2:
            return None
        ring = list(coords)
        if ring[0] != ring[-1]:
            ring.append(ring[0])  # close the outline
        if len(ring) < 4:
            return None
        geometry = Polygon(ring)

    if geometry.is_valid and geometry.area > 0:
        return geometry
    return None


def fetch_osm_exclusion_zones(bounds: Tuple[float, float, float, float],
                              exclusion_types: List[str],
                              timeout: float = 60.0) -> Optional[Any]:
    """
    Fetch exclusion zones from OpenStreetMap using the Overpass API.

    Args:
        bounds: (min_lat, min_lon, max_lat, max_lon) bounding box.
        exclusion_types: Exclusion types to fetch. Recognised values are
            "Water bodies", "Parks & green spaces", "Industrial areas" and
            "Major roads"; anything else is ignored.
        timeout: Seconds to wait for the HTTP response before giving up, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        GeoDataFrame (EPSG:4326) with a ``zone_type`` column and one polygon
        per usable OSM way, or None when nothing was requested, nothing was
        found, a dependency is missing, or the request/processing failed.
        Failures are reported with ``print`` rather than raised.
    """
    requested = exclusion_types or []
    selectors = [
        selector
        for zone_name, zone_selectors in _OSM_SELECTORS.items()
        if zone_name in requested
        for selector in zone_selectors
    ]
    if not selectors:
        return None

    try:
        import geopandas as gpd

        bbox = f"({bounds[0]},{bounds[1]},{bounds[2]},{bounds[3]})"
        statements = "\n".join(f"{selector}{bbox};" for selector in selectors)
        overpass_query = f"[out:json][timeout:25];\n(\n{statements}\n);\nout geom;\n"

        print(f"Fetching OSM data for exclusion zones: {exclusion_types}")

        response = requests.get(_OVERPASS_URL, params={'data': overpass_query}, timeout=timeout)
        response.raise_for_status()

        elements = response.json().get('elements')
        if not elements:
            print("No exclusion zones found in the specified area")
            return None

        polygons = []
        zone_types = []
        for element in elements:
            try:
                # NOTE(review): relation elements (requested for natural=water)
                # are not assembled into polygons here and are skipped.
                if element['type'] != 'way' or 'geometry' not in element:
                    continue
                tags = element.get('tags', {})
                zone_type = _classify_osm_tags(tags)
                if zone_type is None:
                    continue
                coords = [(node['lon'], node['lat']) for node in element['geometry']]
                geometry = _way_to_zone_geometry(tags, coords)
                if geometry is not None:
                    polygons.append(geometry)
                    zone_types.append(zone_type)
            except Exception as e:
                # One malformed element must not discard the whole response.
                print(f"Error processing OSM element: {str(e)}")
                continue

        if not polygons:
            print("No valid polygons found in OSM data")
            return None

        gdf = gpd.GeoDataFrame(
            {'zone_type': zone_types},
            geometry=polygons,
            crs='EPSG:4326'
        )

        print(f"Successfully fetched {len(gdf)} exclusion zones from OpenStreetMap")
        print(f"Zone types found: {gdf['zone_type'].value_counts().to_dict()}")
        return gdf

    except ImportError:
        print("GeoPandas not available for OSM processing")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from OpenStreetMap: {str(e)}")
        return None
    except Exception as e:
        print(f"Error processing OpenStreetMap data: {str(e)}")
        return None
|
|
|
def calculate_bounds_from_points(input_df: pd.DataFrame, buffer_km: float = 2.0) -> Tuple[float, float, float, float]:
    """Calculate a bounding box around the input points, padded by a buffer.

    Args:
        input_df: DataFrame with ``lat`` and ``lon`` columns in decimal degrees.
        buffer_km: Padding added on every side, in kilometres.

    Returns:
        (min_lat, min_lon, max_lat, max_lon), clamped to valid
        latitude/longitude ranges.

    Raises:
        ValueError: If the input holds no usable lat/lon values.
    """
    lats = input_df['lat'].dropna()
    lons = input_df['lon'].dropna()
    if lats.empty or lons.empty:
        raise ValueError("Cannot calculate bounds: input contains no valid lat/lon values")

    min_lat, max_lat = float(lats.min()), float(lats.max())
    min_lon, max_lon = float(lons.min()), float(lons.max())

    lat_buffer_deg = buffer_km / 111.0

    # A degree of longitude shrinks with cos(latitude), so the same distance
    # needs more degrees east-west. Use the latitude farthest from the equator
    # (capped short of the pole to keep the division finite).
    widest_lat = min(max(abs(min_lat), abs(max_lat)), 89.0)
    lon_buffer_deg = float(lat_buffer_deg / np.cos(np.radians(widest_lat)))

    return (
        max(min_lat - lat_buffer_deg, -90.0),
        max(min_lon - lon_buffer_deg, -180.0),
        min(max_lat + lat_buffer_deg, 90.0),
        min(max_lon + lon_buffer_deg, 180.0),
    )
|
|
|
class SpatialDiffuser:
    """
    Performs spatial diffusion: takes points with counts and scatters that many
    individual points around each source according to a chosen distribution
    within a radius, optionally avoiding exclusion-zone polygons.
    """

    # Rough metres per degree of latitude used by the flat-earth conversion
    # from metre offsets to degree offsets.
    _METERS_PER_DEGREE = 111320

    def __init__(self):
        # Maps the public distribution name to the sampler implementing it.
        # Every sampler has the signature (lat, lon, count, radius) -> points.
        self.distribution_methods = {
            "uniform": self._uniform_distribution,
            "normal": self._normal_distribution,
            "exponential_decay": self._exponential_decay,
            "distance_weighted": self._distance_weighted
        }

    def diffuse_points(self,
                       input_data: pd.DataFrame,
                       distribution_type: str = "uniform",
                       global_radius: Optional[float] = None,
                       time_start: Optional[datetime.datetime] = None,
                       time_end: Optional[datetime.datetime] = None,
                       seed: Optional[int] = None,
                       exclusion_zones_gdf: Optional[Any] = None) -> pd.DataFrame:
        """
        Generate diffused points based on input coordinates and counts.

        Args:
            input_data: DataFrame with columns: lat, lon, count, radius (optional)
            distribution_type: Type of spatial distribution to use
            global_radius: Radius in meters used for rows that have no radius
                of their own (missing column or empty cell)
            time_start: Start time for temporal distribution
            time_end: End time for temporal distribution
            seed: Random seed for reproducible results. Note that this seeds
                the global ``random`` and ``numpy.random`` generators.
            exclusion_zones_gdf: GeoDataFrame with polygons to exclude points from

        Returns:
            DataFrame with columns: lat, lon, source_id, and timestamp when
            both time bounds are given. The columns are present even when no
            points were generated.

        Raises:
            ValueError: Unknown distribution type, or a row without a usable
                (finite, non-negative) radius.
        """
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)

        if distribution_type not in self.distribution_methods:
            raise ValueError(f"Distribution type '{distribution_type}' not supported. Choose from: {list(self.distribution_methods.keys())}")

        with_time = time_start is not None and time_end is not None
        columns = ['lat', 'lon', 'source_id']
        if with_time:
            columns.append('timestamp')

        records = []
        for idx, row in input_data.iterrows():
            radius = self._resolve_radius(row, global_radius)

            count = int(row['count'])
            if count <= 0:
                continue

            new_points = self._generate_points_with_exclusions(
                lat=row['lat'],
                lon=row['lon'],
                count=count,
                radius=radius,
                distribution_type=distribution_type,
                exclusion_zones_gdf=exclusion_zones_gdf
            )

            if with_time:
                # Timestamps come back sorted and are paired with the points in order.
                timestamps = self._generate_timestamps(len(new_points), time_start, time_end)
                records.extend(
                    {'lat': point[0], 'lon': point[1], 'source_id': idx, 'timestamp': stamp}
                    for point, stamp in zip(new_points, timestamps)
                )
            else:
                records.extend(
                    {'lat': point[0], 'lon': point[1], 'source_id': idx}
                    for point in new_points
                )

        # Passing the columns explicitly keeps the schema stable for an empty result.
        return pd.DataFrame(records, columns=columns)

    @staticmethod
    def _resolve_radius(row, global_radius: Optional[float]) -> float:
        """Return the row's own radius, or the global one when the row has none."""
        radius = row.get('radius', None)
        if radius is None or pd.isna(radius):
            radius = global_radius
        if radius is None or pd.isna(radius):
            raise ValueError("Radius must be specified either globally or per point")
        radius = float(radius)
        if not np.isfinite(radius) or radius < 0:
            raise ValueError(f"Radius must be a finite, non-negative number of meters, got {radius}")
        return radius

    def _generate_points_with_exclusions(self, lat: float, lon: float, count: int, radius: float,
                                         distribution_type: str, exclusion_zones_gdf: Optional[Any] = None) -> List[Tuple[float, float]]:
        """Generate points while avoiding exclusion zones.

        Candidates are drawn in batches and those intersecting any zone are
        rejected. At most ``count * 10`` candidates are drawn, so fewer than
        ``count`` points may be returned (a warning is printed). If zone
        processing fails, points are generated without exclusions.
        """
        sampler = self.distribution_methods[distribution_type]

        if exclusion_zones_gdf is None or len(exclusion_zones_gdf) == 0:
            return sampler(lat, lon, count, radius)

        try:
            from shapely.geometry import Point
            from shapely.prepared import prep

            # Candidate points are lon/lat degrees, so zones must be too.
            if exclusion_zones_gdf.crs is None:
                exclusion_zones_gdf = exclusion_zones_gdf.set_crs('EPSG:4326')
            elif exclusion_zones_gdf.crs != 'EPSG:4326':
                exclusion_zones_gdf = exclusion_zones_gdf.to_crs('EPSG:4326')

            # Prepared geometries make the repeated intersection tests cheaper;
            # missing/empty geometries cannot exclude anything and are dropped.
            prepared_zones = [
                prep(geometry)
                for geometry in exclusion_zones_gdf.geometry
                if geometry is not None and not geometry.is_empty
            ]

            valid_points = []
            max_attempts = count * 10
            attempts = 0

            while len(valid_points) < count and attempts < max_attempts:
                batch_size = min(count * 2, max_attempts - attempts)
                for point in sampler(lat, lon, batch_size, radius):
                    if len(valid_points) >= count:
                        break
                    point_geom = Point(point[1], point[0])  # shapely expects (x=lon, y=lat)
                    if not any(zone.intersects(point_geom) for zone in prepared_zones):
                        valid_points.append(point)
                attempts += batch_size

            if len(valid_points) < count:
                print(f"Warning: Could only generate {len(valid_points)} valid points out of {count} requested for location ({lat}, {lon}). Exclusion zones may be too restrictive.")

            return valid_points

        except ImportError:
            print("GeoPandas not available for exclusion zone processing. Generating points without exclusions.")
            return sampler(lat, lon, count, radius)
        except Exception as e:
            print(f"Error processing exclusion zones: {str(e)}. Generating points without exclusions.")
            return sampler(lat, lon, count, radius)

    def _offset_to_latlon(self, lat: float, lon: float, x: float, y: float) -> Tuple[float, float]:
        """Shift (lat, lon) by x metres east and y metres north (flat-earth approximation)."""
        lat_offset = y / self._METERS_PER_DEGREE
        # A degree of longitude covers fewer metres away from the equator.
        lon_offset = x / (self._METERS_PER_DEGREE * np.cos(np.radians(lat)))
        return (lat + lat_offset, lon + lon_offset)

    def _uniform_distribution(self, lat: float, lon: float, count: int, radius: float) -> List[Tuple[float, float]]:
        """Generate points uniformly distributed within a circle"""
        points = []
        for _ in range(count):
            angle = random.uniform(0, 2 * np.pi)
            # sqrt keeps the density uniform over the disc's area instead of
            # piling points up near the centre.
            r = radius * np.sqrt(random.uniform(0, 1))
            points.append(self._offset_to_latlon(lat, lon, r * np.cos(angle), r * np.sin(angle)))
        return points

    def _normal_distribution(self, lat: float, lon: float, count: int, radius: float) -> List[Tuple[float, float]]:
        """Generate points with normal distribution (more concentrated near center)"""
        points = []
        # The radius is placed at three standard deviations.
        std_dev = radius / 3
        for _ in range(count):
            # Redraw until the sample falls inside the circle.
            while True:
                x = np.random.normal(0, std_dev)
                y = np.random.normal(0, std_dev)
                if np.sqrt(x**2 + y**2) <= radius:
                    break
            points.append(self._offset_to_latlon(lat, lon, x, y))
        return points

    def _exponential_decay(self, lat: float, lon: float, count: int, radius: float) -> List[Tuple[float, float]]:
        """Generate points with exponential decay from center"""
        if radius <= 0:
            # A zero radius leaves no room to spread and would make the rate
            # below a division by zero; every point sits on the source.
            return [self._offset_to_latlon(lat, lon, 0.0, 0.0) for _ in range(count)]

        points = []
        rate = 3.0 / radius
        for _ in range(count):
            angle = random.uniform(0, 2 * np.pi)
            # Redraw distances that land outside the circle.
            while True:
                r = random.expovariate(rate)
                if r <= radius:
                    break
            points.append(self._offset_to_latlon(lat, lon, r * np.cos(angle), r * np.sin(angle)))
        return points

    def _distance_weighted(self, lat: float, lon: float, count: int, radius: float) -> List[Tuple[float, float]]:
        """
        Generate points with a custom distance-weighted distribution
        (more points at medium distances than at center or edge)
        """
        points = []
        for _ in range(count):
            angle = random.uniform(0, 2 * np.pi)
            # Beta(2, 2) is drawn for the squared, normalised distance.
            r_squared = random.betavariate(2, 2)
            r = np.sqrt(r_squared) * radius
            points.append(self._offset_to_latlon(lat, lon, r * np.cos(angle), r * np.sin(angle)))
        return points

    def _generate_timestamps(self, count: int, start_time: datetime.datetime, end_time: datetime.datetime) -> List[datetime.datetime]:
        """Generate uniformly distributed timestamps, sorted ascending.

        Offsets are added to ``start_time`` directly, so the results keep its
        tzinfo (or stay naive). ``start_time`` and ``end_time`` must be both
        naive or both timezone-aware.
        """
        span_seconds = (end_time - start_time).total_seconds()
        timestamps = [
            start_time + datetime.timedelta(seconds=random.uniform(0.0, span_seconds))
            for _ in range(count)
        ]
        timestamps.sort()
        return timestamps
|
|
|
# Fill colour per zone type; unknown or missing types use 'Other'.
_EXCLUSION_COLORS = {
    'Water': '#4FC3F7',
    'Park': '#66BB6A',
    'Green space': '#81C784',
    'Forest': '#4CAF50',
    'Industrial/Commercial': '#90A4AE',
    'Major road': '#FFD54F',
    'Other': '#FFAB91'
}


def _marker_sizes(counts, min_size=100, max_size=800):
    """Scale source-point marker areas linearly between min_size and max_size by count."""
    if len(counts) <= 1:
        return [max_size]
    lowest = counts.min()
    spread = counts.max() - lowest
    if spread > 0:
        return min_size + (counts - lowest) / spread * (max_size - min_size)
    return [min_size] * len(counts)


def _zone_groups(zones_gdf):
    """Yield (zone_type, subset) pairs; zones without a type are grouped as 'Other'."""
    if 'zone_type' in zones_gdf.columns:
        zone_types = zones_gdf['zone_type'].fillna('Other')
    else:
        zone_types = pd.Series('Other', index=zones_gdf.index)
    for zone_type in zone_types.unique():
        yield zone_type, zones_gdf[zone_types == zone_type]


def _annotate_counts(ax, xs, ys, counts):
    """Label each source point with its count."""
    for x, y, count in zip(xs, ys, counts):
        ax.annotate(f'{int(count)}',
                    (x, y),
                    xytext=(8, 8), textcoords='offset points',
                    fontsize=10, fontweight='bold', color='white',
                    bbox=dict(boxstyle='round,pad=0.3', facecolor='#9C27B0', alpha=0.8),
                    zorder=6)


def _draw_web_mercator(ax, input_df, output_df, zones, sizes):
    """Draw every layer in EPSG:3857 on top of a tile basemap.

    Returns (x_coords, y_coords, zone_legend_handles). Raises on any failure
    other than zone plotting or tile download, so the caller can fall back.
    """
    import contextily as ctx
    import geopandas as gpd
    from matplotlib.patches import Patch
    from shapely.geometry import Point

    def to_mercator(frame):
        if len(frame) == 0:
            return [], []
        points = gpd.GeoSeries(
            [Point(lon, lat) for lon, lat in zip(frame['lon'], frame['lat'])],
            crs='EPSG:4326'
        ).to_crs('EPSG:3857')
        return list(points.x), list(points.y)

    input_x, input_y = to_mercator(input_df)
    output_x, output_y = to_mercator(output_df)

    zone_handles = []
    if zones is not None:
        try:
            zones_merc = zones.to_crs('EPSG:3857')
            for zone_type, subset in _zone_groups(zones_merc):
                color = _EXCLUSION_COLORS.get(zone_type, _EXCLUSION_COLORS['Other'])
                subset.plot(ax=ax, color=color, alpha=0.6, edgecolor='white', linewidth=0.5)
                # Polygon collections drawn by GeoPandas do not show up in the
                # legend on their own, so a proxy patch stands in for them.
                zone_handles.append(Patch(facecolor=color, edgecolor='white', alpha=0.6, label=zone_type))
        except Exception as e:
            print(f"Error plotting exclusion zones: {str(e)}")

    ax.scatter(output_x, output_y,
               alpha=0.7, color='#FF9800', s=12, label=f'Generated Points (n={len(output_df)})',
               edgecolors='white', linewidth=0.3)

    for center_x, center_y, (_, row) in zip(input_x, input_y, input_df.iterrows()):
        radius = row.get('radius', None)
        if radius is None or pd.isna(radius):
            continue
        # Web Mercator stretches ground distances by 1 / cos(latitude), so the
        # radius in metres must be scaled to enclose the plotted points.
        merc_radius = radius / np.cos(np.radians(row['lat']))
        ax.add_patch(plt.Circle((center_x, center_y), merc_radius,
                                fill=False, color='#9C27B0', linestyle='--',
                                alpha=0.5, linewidth=2))

    ax.scatter(input_x, input_y,
               s=sizes, c='#9C27B0', alpha=0.9,
               edgecolors='white', linewidth=2,
               label='Source Points (size = count)', zorder=5)
    _annotate_counts(ax, input_x, input_y, input_df['count'])

    try:
        ctx.add_basemap(ax, crs='EPSG:3857', source=ctx.providers.CartoDB.Positron, alpha=0.8)
    except Exception as e:
        # Tiles are optional: keep the Mercator plot even without them.
        print(f"Could not add basemap: {str(e)}")

    ax.set_xlabel('Easting (Web Mercator)', fontsize=12)
    ax.set_ylabel('Northing (Web Mercator)', fontsize=12)

    return input_x + output_x, input_y + output_y, zone_handles


def _draw_lon_lat(ax, input_df, output_df, zones, sizes):
    """Draw every layer directly in longitude/latitude degrees.

    Returns (x_coords, y_coords, zone_legend_handles).
    """
    from matplotlib.patches import Ellipse, Patch

    zone_handles = []
    if zones is not None:
        try:
            if zones.crs != 'EPSG:4326':
                zones = zones.to_crs('EPSG:4326')
            for zone_type, subset in _zone_groups(zones):
                color = _EXCLUSION_COLORS.get(zone_type, _EXCLUSION_COLORS['Other'])
                drew_any = False
                for geometry in subset.geometry:
                    if geometry is None:
                        continue
                    if geometry.geom_type == 'Polygon':
                        parts = [geometry]
                    elif geometry.geom_type == 'MultiPolygon':
                        parts = list(geometry.geoms)
                    else:
                        parts = []
                    for part in parts:
                        x, y = part.exterior.xy
                        ax.fill(x, y, color=color, alpha=0.6, edgecolor='white', linewidth=0.5)
                        drew_any = True
                if drew_any:
                    zone_handles.append(Patch(facecolor=color, edgecolor='white', alpha=0.6, label=zone_type))
        except Exception as e:
            print(f"Error plotting exclusion zones: {str(e)}")

    ax.scatter(output_df['lon'], output_df['lat'],
               alpha=0.7, color='#FF9800', s=12, label=f'Generated Points (n={len(output_df)})',
               edgecolors='white', linewidth=0.3)

    for _, row in input_df.iterrows():
        radius = row.get('radius', None)
        if radius is None or pd.isna(radius):
            continue
        # A ground circle spans more degrees of longitude than of latitude,
        # so in degree space its outline is an ellipse.
        radius_deg_lat = radius / 111320
        radius_deg_lon = radius / (111320 * np.cos(np.radians(row['lat'])))
        ax.add_patch(Ellipse((row['lon'], row['lat']),
                             width=2 * radius_deg_lon, height=2 * radius_deg_lat,
                             fill=False, color='#9C27B0', linestyle='--',
                             alpha=0.5, linewidth=2))

    ax.scatter(input_df['lon'], input_df['lat'],
               s=sizes, c='#9C27B0', alpha=0.9,
               edgecolors='white', linewidth=2,
               label='Source Points (size = count)', zorder=5)
    _annotate_counts(ax, input_df['lon'], input_df['lat'], input_df['count'])

    ax.set_xlabel('Longitude', fontsize=12)
    ax.set_ylabel('Latitude', fontsize=12)

    x_coords = list(input_df['lon']) + list(output_df['lon'])
    y_coords = list(input_df['lat']) + list(output_df['lat'])
    return x_coords, y_coords, zone_handles


def create_visualization(input_df, output_df, show_basemap=False, exclusion_zones_gdf=None):
    """Create visualization of input and diffused points.

    Args:
        input_df: Source points with ``lat``, ``lon``, ``count`` and optionally
            ``radius`` (meters); rows with a radius get a dashed outline.
        output_df: Generated points with ``lat`` and ``lon``. ``None`` or a
            frame without those columns is treated as "no points".
        show_basemap: Plot in Web Mercator over map tiles. Falls back to a
            plain longitude/latitude plot if that fails for any reason.
        exclusion_zones_gdf: Optional GeoDataFrame of zones to shade. A
            ``zone_type`` column selects the colour; without it every zone is
            drawn as 'Other'.

    Returns:
        The matplotlib Figure. It stays registered with pyplot; the caller is
        responsible for closing it.
    """
    fig, ax = plt.subplots(figsize=(12, 10))
    fig.patch.set_facecolor('white')
    ax.set_facecolor('#f8f9fa')

    if output_df is None or not {'lat', 'lon'}.issubset(output_df.columns):
        output_df = pd.DataFrame({'lat': pd.Series(dtype=float), 'lon': pd.Series(dtype=float)})

    has_zones = exclusion_zones_gdf is not None and len(exclusion_zones_gdf) > 0
    zones = exclusion_zones_gdf if has_zones else None
    sizes = _marker_sizes(input_df['count'])

    drawn = None
    if show_basemap:
        try:
            drawn = _draw_web_mercator(ax, input_df, output_df, zones, sizes)
        except ImportError:
            print("Contextily not available for basemap. Falling back to simple plot.")
        except Exception as e:
            print(f"Error creating basemap: {str(e)}. Falling back to simple plot.")
        if drawn is None:
            show_basemap = False
            # Drop anything already drawn in Mercator metres before replotting
            # in degrees on the same axes.
            ax.clear()
            ax.set_facecolor('#f8f9fa')

    if drawn is None:
        drawn = _draw_lon_lat(ax, input_df, output_df, zones, sizes)
    x_coords, y_coords, zone_handles = drawn

    title = 'Spatial Diffusion Results'
    if show_basemap:
        title += ' (with Basemap)'
    if has_zones:
        title += ' - Exclusion Zones Applied'
    subtitle = 'Purple source points sized by count, orange generated points, dashed circles show diffusion radius'
    ax.set_title(f'{title}\n{subtitle}',
                 fontsize=14, fontweight='bold', pad=20)

    handles, _ = ax.get_legend_handles_labels()
    legend = ax.legend(handles=handles + zone_handles,
                       loc='upper right', bbox_to_anchor=(1, 1),
                       frameon=True, fancybox=True, shadow=True)
    legend.get_frame().set_facecolor('white')
    legend.get_frame().set_alpha(0.9)

    grid_alpha = 0.2 if show_basemap else 0.3
    ax.grid(True, alpha=grid_alpha, linestyle='-', linewidth=0.5)

    ax.set_aspect('equal', 'box')

    if x_coords and y_coords:
        x_margin = (max(x_coords) - min(x_coords)) * 0.1
        y_margin = (max(y_coords) - min(y_coords)) * 0.1

        # A single point has zero extent; use a fixed margin in the axis
        # units (metres for Mercator, degrees otherwise).
        if x_margin == 0:
            x_margin = 1000 if show_basemap else 0.01
        if y_margin == 0:
            y_margin = 1000 if show_basemap else 0.01

        ax.set_xlim(min(x_coords) - x_margin, max(x_coords) + x_margin)
        ax.set_ylim(min(y_coords) - y_margin, max(y_coords) + y_margin)

    plt.tight_layout()

    return fig
|
|
|
def _uploaded_path(upload):
    """Return the filesystem path of an upload given as a path string or an object with ``.name``."""
    return getattr(upload, 'name', upload)


def _largest_radius_m(df, global_radius):
    """Return the largest diffusion radius (meters) that may be used, or 0.0 if none is known."""
    candidates = []
    if global_radius is not None:
        candidates.append(float(global_radius))
    if 'radius' in df.columns:
        per_row_max = pd.to_numeric(df['radius'], errors='coerce').max()
        if pd.notna(per_row_max):
            candidates.append(float(per_row_max))
    return max(candidates, default=0.0)


def process_csv(file_obj, distribution_type, global_radius, show_basemap, auto_exclusions, exclusion_file, include_time, time_start, time_end, seed):
    """Process input CSV and generate diffused points.

    Args:
        file_obj: Uploaded CSV (path string or object with ``.name``) holding
            ``lat``, ``lon``, ``count`` and optionally ``radius`` columns.
        distribution_type: Name of the spatial distribution to use.
        global_radius: Radius in meters as text or number; blank means
            "use the CSV's radius column".
        show_basemap: Whether to draw the result over map tiles.
        auto_exclusions: Exclusion categories to fetch from OpenStreetMap.
        exclusion_file: Optional uploaded zone file (.geojson/.json/.gpkg/.shp);
            when given it is used instead of ``auto_exclusions``.
        include_time: Whether to attach timestamps to the generated points.
        time_start: Start time as ``YYYY-MM-DD HH:MM:SS`` text.
        time_end: End time as ``YYYY-MM-DD HH:MM:SS`` text.
        seed: Optional integer seed as text or number.

    Returns:
        ``(figure, csv_path)`` on success, or ``(None, "Error: ...")`` on
        failure.
    """
    # NOTE(review): on failure the message is returned in the slot that the UI
    # wires to a file component; confirm that this is how errors should be
    # surfaced to the user.
    try:
        if file_obj is None:
            return None, "Error: Please upload an input CSV file"

        df = pd.read_csv(_uploaded_path(file_obj))

        required_cols = ['lat', 'lon', 'count']
        if not all(col in df.columns for col in required_cols):
            return None, f"Error: CSV must contain columns: {', '.join(required_cols)}"

        radius_text = '' if global_radius is None else str(global_radius).strip()
        if radius_text:
            try:
                global_radius = float(radius_text)
            except ValueError:
                return None, "Error: Global radius must be a number"
            if not np.isfinite(global_radius) or global_radius < 0:
                return None, "Error: Global radius must be a non-negative number"
        else:
            global_radius = None
            if 'radius' not in df.columns:
                return None, "Error: Either provide a global radius or include a 'radius' column in the CSV"

        seed_text = '' if seed is None else str(seed).strip()
        if seed_text:
            try:
                seed = int(seed_text)
            except ValueError:
                return None, "Error: Seed must be an integer"
        else:
            seed = None

        exclusion_zones_gdf = None

        if exclusion_file is not None:
            # A custom zone file takes precedence over automatic exclusions.
            try:
                import geopandas as gpd

                exclusion_path = _uploaded_path(exclusion_file)
                file_extension = os.path.splitext(exclusion_path)[1].lower()
                if file_extension not in ('.geojson', '.json', '.gpkg', '.shp'):
                    return None, f"Error: Unsupported exclusion zone file format: {file_extension}"

                exclusion_zones_gdf = gpd.read_file(exclusion_path)

                # Work in lon/lat degrees throughout; files without CRS
                # metadata are taken to be in EPSG:4326 already.
                if exclusion_zones_gdf.crs is None:
                    exclusion_zones_gdf = exclusion_zones_gdf.set_crs('EPSG:4326')
                elif exclusion_zones_gdf.crs != 'EPSG:4326':
                    exclusion_zones_gdf = exclusion_zones_gdf.to_crs('EPSG:4326')

                print(f"Loaded {len(exclusion_zones_gdf)} custom exclusion zones from {exclusion_path}")

            except ImportError:
                return None, "Error: GeoPandas required for exclusion zones processing"
            except Exception as e:
                return None, f"Error reading exclusion zones file: {str(e)}"

        elif auto_exclusions and len(auto_exclusions) > 0:
            try:
                # The fetch area must reach at least as far as points can be
                # scattered, otherwise zones near the rim are never downloaded.
                buffer_km = max(2.0, _largest_radius_m(df, global_radius) / 1000.0)
                bounds = calculate_bounds_from_points(df, buffer_km=buffer_km)
                print(f"Fetching automatic exclusions for bounds: {bounds}")

                exclusion_zones_gdf = fetch_osm_exclusion_zones(bounds, auto_exclusions)

                if exclusion_zones_gdf is not None:
                    print(f"Fetched {len(exclusion_zones_gdf)} exclusion zones from OpenStreetMap")
                else:
                    print("No exclusion zones found in OpenStreetMap for this area")

            except Exception as e:
                # Automatic exclusions are best-effort: carry on without them.
                print(f"Warning: Could not fetch automatic exclusions: {str(e)}")
                exclusion_zones_gdf = None

        if include_time:
            if not time_start or not time_end:
                return None, "Error: If time distribution is enabled, both start and end times must be provided"
            try:
                time_start_dt = datetime.datetime.strptime(time_start, "%Y-%m-%d %H:%M:%S")
                time_end_dt = datetime.datetime.strptime(time_end, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                return None, "Error: Invalid time format. Use YYYY-MM-DD HH:MM:SS"
            if time_start_dt >= time_end_dt:
                return None, "Error: End time must be after start time"
        else:
            time_start_dt = None
            time_end_dt = None

        diffuser = SpatialDiffuser()
        result_df = diffuser.diffuse_points(
            input_data=df,
            distribution_type=distribution_type,
            global_radius=global_radius,
            time_start=time_start_dt,
            time_end=time_end_dt,
            seed=seed,
            exclusion_zones_gdf=exclusion_zones_gdf
        )

        # A unique file per request keeps concurrent sessions from overwriting
        # each other's download. The file is intentionally left on disk so it
        # can be served after this function returns.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', prefix='diffused_points_',
                                         delete=False, newline='', encoding='utf-8') as handle:
            result_df.to_csv(handle, index=False)
            temp_file = handle.name

        # Give the plot a radius for every row so the dashed outlines are
        # drawn even when the radius came from the global setting.
        viz_df = df
        if global_radius is not None:
            if 'radius' in df.columns:
                viz_df = df.assign(radius=df['radius'].fillna(global_radius))
            else:
                viz_df = df.assign(radius=global_radius)

        fig = create_visualization(viz_df, result_df, show_basemap, exclusion_zones_gdf)
        # Unregister the figure from pyplot so repeated requests do not pile
        # up open figures; the Figure object itself remains usable.
        plt.close(fig)

        return fig, temp_file

    except Exception as e:
        return None, f"Error: {str(e)}"
|
|
|
def create_diffusion_interface():
    """Create Gradio interface for the spatial diffusion tool.

    Builds a two-column Blocks layout: inputs and options on the left, the
    plot and the downloadable result on the right. The generate button calls
    ``process_csv``.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks() as diffusion_interface:
        gr.Markdown("## 🗺️ Spatial Diffusion Tool")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("""
                ### About This Tool
                Transform aggregated geographic points with counts into individual points using spatial diffusion methods.

                **Input CSV Format:**
                - `lat`: Latitude of source point
                - `lon`: Longitude of source point
                - `count`: Number of points to generate
                - `radius`: (Optional) Diffusion radius in meters

                **Distribution Types:**
                - **Uniform**: Equal probability throughout circle
                - **Normal**: Higher density near center
                - **Exponential Decay**: Density decreases from center
                - **Distance-Weighted**: More points at medium distances
                """)

                input_file = gr.File(label="Input CSV File", file_types=[".csv"])

                gr.Markdown("### 🎯 Distribution Options")
                with gr.Row():
                    distribution = gr.Dropdown(
                        choices=["uniform", "normal", "exponential_decay", "distance_weighted"],
                        value="uniform",
                        label="Distribution Type",
                        scale=2
                    )
                    seed = gr.Textbox(
                        label="Random Seed (optional)",
                        placeholder="e.g. 42",
                        scale=1
                    )

                global_radius = gr.Textbox(
                    label="Global Radius (meters)",
                    placeholder="Only if radius column not in CSV"
                )

                with gr.Accordion("⏰ Temporal Distribution (Optional)", open=False):
                    include_time = gr.Checkbox(label="Enable Temporal Distribution", value=False)
                    # Start hidden so the group matches the unchecked box; the
                    # change handler below only runs once the box is toggled.
                    with gr.Group(visible=False) as time_group:
                        time_start = gr.Textbox(
                            label="Start Time",
                            placeholder="YYYY-MM-DD HH:MM:SS"
                        )
                        time_end = gr.Textbox(
                            label="End Time",
                            placeholder="YYYY-MM-DD HH:MM:SS"
                        )

                gr.Markdown("### 🗺️ Map & Exclusion Options")
                show_basemap = gr.Checkbox(
                    label="Show underlying map (requires internet)",
                    value=False
                )
                gr.Markdown("*Adds geographic context with street/satellite imagery*")

                auto_exclusions = gr.CheckboxGroup(
                    label="Auto-exclude from OpenStreetMap:",
                    choices=["Water bodies", "Parks & green spaces", "Industrial areas", "Major roads"],
                    value=[]
                )

                with gr.Accordion("🔧 Advanced: Custom Exclusion Zones", open=False):
                    exclusion_file = gr.File(
                        label="Upload custom shapefile (optional)",
                        file_types=[".geojson", ".json", ".gpkg", ".shp"]
                    )
                    gr.Markdown("*Overrides automatic exclusions if provided*")

                process_btn = gr.Button(
                    "🎯 Generate Diffused Points",
                    variant="primary",
                    size="lg"
                )

            with gr.Column(scale=2):
                plot_output = gr.Plot(
                    label="📍 Spatial Diffusion Visualization",
                    show_label=True
                )

                with gr.Row():
                    with gr.Column(scale=2):
                        file_output = gr.File(label="📥 Download Generated Points")
                    with gr.Column(scale=1):
                        gr.Markdown(
                            """
                            **Legend:**
                            🟣 Source points (sized by count)
                            🟠 Generated points
                            ⭕ Diffusion radius
                            🟦 Water bodies
                            🟢 Parks & green spaces
                            ⬜ Industrial areas
                            🟡 Major roads
                            """
                        )

        # The input order here must match process_csv's parameter order.
        process_btn.click(
            fn=process_csv,
            inputs=[input_file, distribution, global_radius, show_basemap, auto_exclusions, exclusion_file, include_time, time_start, time_end, seed],
            outputs=[plot_output, file_output]
        )

        # Show the start/end time fields only while temporal distribution is enabled.
        include_time.change(
            fn=lambda x: gr.update(visible=x),
            inputs=[include_time],
            outputs=[time_group]
        )

    return diffusion_interface
|
|
|
if __name__ == "__main__":
    # Script entry point: build the Gradio Blocks UI and serve it with the
    # default launch settings.
    create_diffusion_interface().launch()