| """ |
| Geo-Lab AI: ์ง์ง ๋ฌผ๋ฆฌ ์๋ฎฌ๋ ์ด์
์์ง |
| Stream Power Law ๊ธฐ๋ฐ ์ค์ ์นจ์ ์๋ฎฌ๋ ์ด์
|
| """ |
| import numpy as np |
| from dataclasses import dataclass, field |
| from typing import List, Tuple, Optional |
| from scipy.ndimage import gaussian_filter, uniform_filter |
|
|
|
|
| @dataclass |
| class TerrainGrid: |
| """2D ์งํ ๊ทธ๋ฆฌ๋""" |
| width: int = 100 |
| height: int = 100 |
| cell_size: float = 10.0 |
| |
| elevation: np.ndarray = field(default=None) |
| bedrock: np.ndarray = field(default=None) |
| rock_hardness: np.ndarray = field(default=None) |
| |
| def __post_init__(self): |
| if self.elevation is None: |
| self.elevation = np.zeros((self.height, self.width)) |
| if self.bedrock is None: |
| self.bedrock = np.full((self.height, self.width), -100.0) |
| if self.rock_hardness is None: |
| self.rock_hardness = np.full((self.height, self.width), 0.5) |
| |
| def get_slope(self) -> np.ndarray: |
| """๊ฒฝ์ฌ๋ ๊ณ์ฐ (m/m)""" |
| dy, dx = np.gradient(self.elevation, self.cell_size) |
| return np.sqrt(dx**2 + dy**2) |
| |
| def get_slope_direction(self) -> Tuple[np.ndarray, np.ndarray]: |
| """์ต๋ ๊ฒฝ์ฌ ๋ฐฉํฅ (๋จ์ ๋ฒกํฐ)""" |
| dy, dx = np.gradient(self.elevation, self.cell_size) |
| magnitude = np.sqrt(dx**2 + dy**2) + 1e-10 |
| return -dx / magnitude, -dy / magnitude |
|
|
|
|
| @dataclass |
| class WaterFlow: |
| """์๋ฌธ ์๋ฎฌ๋ ์ด์
""" |
| terrain: TerrainGrid |
| |
| |
| discharge: np.ndarray = field(default=None) |
| |
| velocity: np.ndarray = field(default=None) |
| |
| depth: np.ndarray = field(default=None) |
| |
| shear_stress: np.ndarray = field(default=None) |
| |
| manning_n: float = 0.03 |
| |
| def __post_init__(self): |
| shape = (self.terrain.height, self.terrain.width) |
| if self.discharge is None: |
| self.discharge = np.zeros(shape) |
| if self.velocity is None: |
| self.velocity = np.zeros(shape) |
| if self.depth is None: |
| self.depth = np.zeros(shape) |
| if self.shear_stress is None: |
| self.shear_stress = np.zeros(shape) |
| |
| def flow_accumulation_d8(self, precipitation: float = 0.001): |
| """D8 ์๊ณ ๋ฆฌ์ฆ ๊ธฐ๋ฐ ์ ๋ ๋์ """ |
| h, w = self.terrain.height, self.terrain.width |
| elev = self.terrain.elevation |
| |
| |
| acc = np.full((h, w), precipitation) |
| |
| |
| flat_elev = elev.ravel() |
| sorted_indices = np.argsort(flat_elev)[::-1] |
| |
| |
| neighbors = [(-1,-1), (-1,0), (-1,1), (0,-1), (0,1), (1,-1), (1,0), (1,1)] |
| |
| for idx in sorted_indices: |
| y, x = idx // w, idx % w |
| current_elev = elev[y, x] |
| |
| |
| min_elev = current_elev |
| min_neighbor = None |
| |
| for dy, dx in neighbors: |
| ny, nx = y + dy, x + dx |
| if 0 <= ny < h and 0 <= nx < w: |
| if elev[ny, nx] < min_elev: |
| min_elev = elev[ny, nx] |
| min_neighbor = (ny, nx) |
| |
| |
| if min_neighbor is not None: |
| acc[min_neighbor] += acc[y, x] |
| |
| self.discharge = acc |
| return acc |
| |
| def calculate_hydraulics(self): |
| """Manning ๋ฐฉ์ ์ ๊ธฐ๋ฐ ์๋ฆฌํ ๊ณ์ฐ""" |
| slope = self.terrain.get_slope() + 0.0001 |
| |
| |
| channel_width = 2 * np.power(self.discharge + 0.01, 0.4) |
| |
| |
| |
| |
| |
| |
| self.depth = np.power( |
| self.discharge * self.manning_n / (channel_width * np.sqrt(slope) + 0.01), |
| 0.6 |
| ) |
| self.depth = np.clip(self.depth, 0, 50) |
| |
| |
| hydraulic_radius = self.depth |
| self.velocity = (1 / self.manning_n) * np.power(hydraulic_radius, 2/3) * np.sqrt(slope) |
| self.velocity = np.clip(self.velocity, 0, 10) |
| |
| |
| rho_water = 1000 |
| g = 9.81 |
| self.shear_stress = rho_water * g * self.depth * slope |
|
|
|
|
| class StreamPowerErosion: |
| """Stream Power Law ๊ธฐ๋ฐ ์นจ์ |
| |
| E = K * A^m * S^n |
| - E: ์นจ์๋ฅ (m/yr) |
| - K: ์นจ์ ๊ณ์ (์์ ํน์ฑ ๋ฐ์) |
| - A: ์ ์ญ ๋ฉด์ (โ ์ ๋) |
| - S: ๊ฒฝ์ฌ |
| - m: ๋ฉด์ ์ง์ (typically 0.3-0.6) |
| - n: ๊ฒฝ์ฌ ์ง์ (typically 1.0-2.0) |
| """ |
| |
| def __init__(self, K: float = 1e-5, m: float = 0.5, n: float = 1.0): |
| self.K = K |
| self.m = m |
| self.n = n |
| |
| def calculate_erosion(self, terrain: TerrainGrid, water: WaterFlow, dt: float = 1.0) -> np.ndarray: |
| """์นจ์๋ ๊ณ์ฐ""" |
| slope = terrain.get_slope() |
| |
| |
| |
| effective_K = self.K * (1 - terrain.rock_hardness * 0.9) |
| |
| erosion_rate = effective_K * np.power(water.discharge, self.m) * np.power(slope + 0.001, self.n) |
| |
| erosion = erosion_rate * dt |
| |
| |
| max_erosion = terrain.elevation - terrain.bedrock |
| erosion = np.minimum(erosion, np.maximum(max_erosion, 0)) |
| |
| return np.clip(erosion, 0, 5.0) |
|
|
|
|
| class HillslopeProcess: |
| """์ฌ๋ฉด ํ๋ก์ธ์ค (Mass Wasting) |
| |
| V์๊ณก ํ์ฑ์ ํต์ฌ - ํ๋ฐฉ ์นจ์ ํ ์ฌ๋ฉด ๋ถ๊ดด |
| """ |
| |
| def __init__(self, critical_slope: float = 0.7, diffusion_rate: float = 0.01): |
| self.critical_slope = critical_slope |
| self.diffusion_rate = diffusion_rate |
| |
| def mass_wasting(self, terrain: TerrainGrid, dt: float = 1.0) -> np.ndarray: |
| """์ฌ๋ฉด ๋ถ๊ดด (๊ธ๊ฒฝ์ฌ โ ๋ฌผ์ง ์ด๋)""" |
| h, w = terrain.height, terrain.width |
| change = np.zeros((h, w)) |
| |
| elev = terrain.elevation |
| slope = terrain.get_slope() |
| |
| |
| unstable = slope > self.critical_slope |
| |
| |
| for y in range(1, h-1): |
| for x in range(1, w-1): |
| if not unstable[y, x]: |
| continue |
| |
| current = elev[y, x] |
| excess = (slope[y, x] - self.critical_slope) * terrain.cell_size |
| |
| |
| neighbors = [(y-1,x), (y+1,x), (y,x-1), (y,x+1)] |
| lower_neighbors = [(ny, nx) for ny, nx in neighbors |
| if elev[ny, nx] < current] |
| |
| if lower_neighbors: |
| transfer = excess * 0.2 * dt |
| change[y, x] -= transfer |
| per_neighbor = transfer / len(lower_neighbors) |
| for ny, nx in lower_neighbors: |
| change[ny, nx] += per_neighbor |
| |
| return change |
| |
| def soil_creep(self, terrain: TerrainGrid, dt: float = 1.0) -> np.ndarray: |
| """ํ ์ ํฌ๋ฆฌํ (๋๋ฆฐ ํ์ฐ)""" |
| |
| laplacian = ( |
| np.roll(terrain.elevation, 1, axis=0) + |
| np.roll(terrain.elevation, -1, axis=0) + |
| np.roll(terrain.elevation, 1, axis=1) + |
| np.roll(terrain.elevation, -1, axis=1) - |
| 4 * terrain.elevation |
| ) |
| |
| return self.diffusion_rate * laplacian * dt |
|
|
|
|
| class VValleySimulation: |
| """V์๊ณก ์๋ฎฌ๋ ์ด์
- ์ค์ ๋ฌผ๋ฆฌ ๊ธฐ๋ฐ |
| |
| ํ๋ก์ธ์ค: |
| 1. ๊ฐ์ โ ์ ์ถ (D8 flow accumulation) |
| 2. Stream Power Law ์นจ์ |
| 3. ์ฌ๋ฉด ๋ถ๊ดด (Mass Wasting) |
| 4. ํ ์ ํฌ๋ฆฌํ |
| """ |
| |
| def __init__(self, width: int = 100, height: int = 100): |
| self.terrain = TerrainGrid(width=width, height=height) |
| self.water = WaterFlow(terrain=self.terrain) |
| self.erosion = StreamPowerErosion() |
| self.hillslope = HillslopeProcess() |
| |
| self.history: List[np.ndarray] = [] |
| self.time = 0.0 |
| |
| def initialize_terrain(self, max_elevation: float = 500.0, |
| initial_channel_depth: float = 10.0, |
| rock_hardness: float = 0.5): |
| """์ด๊ธฐ ์งํ ์ค์ """ |
| h, w = self.terrain.height, self.terrain.width |
| |
| |
| for y in range(h): |
| base = max_elevation * (1 - y / h) |
| self.terrain.elevation[y, :] = base |
| |
| |
| center = w // 2 |
| for x in range(center - 3, center + 4): |
| if 0 <= x < w: |
| depth = initial_channel_depth * (1 - abs(x - center) / 4) |
| self.terrain.elevation[:, x] -= depth |
| |
| |
| self.terrain.rock_hardness[:] = rock_hardness |
| |
| |
| self.terrain.bedrock[:] = self.terrain.elevation.min() - 200 |
| |
| self.history = [self.terrain.elevation.copy()] |
| self.time = 0.0 |
| |
| def step(self, dt: float = 1.0, precipitation: float = 0.001): |
| """1 ํ์์คํ
์งํ""" |
| |
| self.water.flow_accumulation_d8(precipitation) |
| self.water.calculate_hydraulics() |
| |
| |
| erosion = self.erosion.calculate_erosion(self.terrain, self.water, dt) |
| self.terrain.elevation -= erosion |
| |
| |
| wasting = self.hillslope.mass_wasting(self.terrain, dt) |
| self.terrain.elevation += wasting |
| |
| |
| creep = self.hillslope.soil_creep(self.terrain, dt) |
| self.terrain.elevation += creep |
| |
| self.time += dt |
| |
| def run(self, total_time: float, save_interval: float = 100.0, dt: float = 1.0): |
| """์๋ฎฌ๋ ์ด์
์คํ ๋ฐ ํ์คํ ๋ฆฌ ์ ์ฅ""" |
| steps = int(total_time / dt) |
| save_every = int(save_interval / dt) |
| |
| for i in range(steps): |
| self.step(dt) |
| if (i + 1) % save_every == 0: |
| self.history.append(self.terrain.elevation.copy()) |
| |
| return self.history |
| |
| def get_cross_section(self, y_position: int = None) -> Tuple[np.ndarray, np.ndarray]: |
| """๋จ๋ฉด ์ถ์ถ""" |
| if y_position is None: |
| y_position = self.terrain.height // 2 |
| |
| x = np.arange(self.terrain.width) * self.terrain.cell_size |
| z = self.terrain.elevation[y_position, :] |
| |
| return x, z |
| |
| def measure_valley_depth(self) -> float: |
| """V์๊ณก ๊น์ด ์ธก์ """ |
| center = self.terrain.width // 2 |
| y_mid = self.terrain.height // 2 |
| |
| |
| left = self.terrain.elevation[y_mid, max(0, center-20)] |
| right = self.terrain.elevation[y_mid, min(self.terrain.width-1, center+20)] |
| center_elev = self.terrain.elevation[y_mid, center] |
| |
| return max(0, (left + right) / 2 - center_elev) |
|
|
|
|
| |
| def precompute_v_valley(max_time: int = 10000, |
| rock_hardness: float = 0.5, |
| K: float = 1e-5, |
| precipitation: float = 0.001, |
| save_every: int = 100) -> List[np.ndarray]: |
| """V์๊ณก ์๋ฎฌ๋ ์ด์
ํ๋ฆฌ์ปดํจํ
""" |
| sim = VValleySimulation(width=100, height=100) |
| sim.erosion.K = K |
| sim.initialize_terrain(rock_hardness=rock_hardness) |
| |
| history = sim.run( |
| total_time=max_time, |
| save_interval=save_every, |
| dt=1.0 |
| ) |
| |
| return history |
|
|
|
|
| if __name__ == "__main__": |
| print("V์๊ณก ๋ฌผ๋ฆฌ ์๋ฎฌ๋ ์ด์
ํ
์คํธ") |
| print("=" * 50) |
| |
| sim = VValleySimulation() |
| sim.initialize_terrain(rock_hardness=0.3) |
| |
| print(f"์ด๊ธฐ ์ํ: ๊น์ด = {sim.measure_valley_depth():.1f}m") |
| |
| for year in [1000, 2000, 5000, 10000]: |
| sim.run(1000, save_interval=1000) |
| depth = sim.measure_valley_depth() |
| print(f"Year {year}: ๊น์ด = {depth:.1f}m") |
| |
| print("=" * 50) |
| print("ํ
์คํธ ์๋ฃ!") |
|
|