| import numpy as np |
| from .grid import WorldGrid |
|
|
# Numba is optional: use its JIT compiler when installed, otherwise fall back
# to a no-op decorator so decorated functions run as ordinary Python.
try:
    from numba import jit
    HAS_NUMBA = True
except ImportError:
    HAS_NUMBA = False

    def jit(*args, **kwargs):
        """No-op stand-in for ``numba.jit`` used when Numba is not installed.

        Supports both spellings that ``numba.jit`` accepts:

        * bare: ``@jit`` -- the decorated function is passed directly and
          is returned unchanged;
        * parameterized: ``@jit(nopython=True)`` -- the options are ignored
          and a pass-through decorator is returned.
        """
        # Bare usage: the single positional argument is the function itself.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]

        def decorator(func):
            return func

        return decorator
|
|
@jit(nopython=True)
def _d8_flow_kernel(elev, discharge, flow_dir, underwater, h, w):
    """
    Route flow with the D8 scheme (compiled by Numba when it is available).

    Cells are visited from the highest elevation to the lowest. Every cell
    that is not flagged as underwater hands its whole accumulated discharge
    to the lowest of its eight neighbours, provided that neighbour is
    strictly lower than the cell itself. ``discharge`` and ``flow_dir`` are
    updated in place; nothing is returned.

    Args:
        elev: 2-D elevation array indexed as ``[row, col]``.
        discharge: 2-D per-cell discharge array, accumulated in place.
        flow_dir: 2-D array that receives, for each draining cell, the index
            (0-7, in the order of the offset tables below) of the neighbour
            it drains into. Cells without a lower neighbour are left as is.
        underwater: 2-D mask; cells with a truthy value are skipped.
        h: Number of rows.
        w: Number of columns.
    """
    # Neighbour k lies at (row + row_step[k], col + col_step[k]).
    row_step = np.array([-1, -1, -1, 0, 0, 1, 1, 1])
    col_step = np.array([-1, 0, 1, -1, 1, -1, 0, 1])

    # Flat cell indices ordered from the highest elevation to the lowest, so
    # a cell's discharge is complete before it is passed downstream.
    order = np.argsort(elev.ravel())[::-1]

    for idx in order:
        r = idx // w
        c = idx % w

        # Underwater cells do not route flow any further.
        if underwater[r, c]:
            continue

        lowest = elev[r, c]
        best_k = -1

        for k in range(8):
            nr = r + row_step[k]
            nc = c + col_step[k]
            if nr < 0 or nr >= h or nc < 0 or nc >= w:
                continue
            candidate = elev[nr, nc]
            if candidate < lowest:
                lowest = candidate
                best_k = k

        # best_k stays -1 for pits and flats: nothing is routed from them.
        if best_k >= 0:
            discharge[r + row_step[best_k], c + col_step[best_k]] += discharge[r, c]
            flow_dir[r, c] = best_k
|
|
class HydroKernel:
    """
    Hydrology kernel: simulates how water flows and spreads over a grid.

    - D8 routing: single-direction flow accumulation that forms river
      networks (Numba-accelerated when Numba is installed).
    - MFD routing: slope-weighted flow shared among all lower neighbours.
    - Simplified shallow water: river depth, sea-level inundation and
      sink filling.

    The wrapped grid object is expected to expose ``height``, ``width``,
    ``cell_size``, ``elevation``, ``flow_dir``, ``water_depth``,
    ``sea_level``, ``is_underwater()`` and ``get_gradient()``; these are the
    members this class reads or writes.
    """

    def __init__(self, grid: "WorldGrid"):
        """Store the grid that every routine of this kernel operates on."""
        self.grid = grid

    def route_flow_d8(self, precipitation: float = 0.001) -> np.ndarray:
        """
        Compute discharge with the D8 algorithm (Numba-accelerated if possible).

        Every cell starts with ``precipitation * cell_size ** 2`` and passes
        its accumulated discharge to its lowest strictly-lower neighbour.
        ``self.grid.flow_dir`` is updated in place with the chosen neighbour
        index of each draining cell.

        Args:
            precipitation: Precipitation applied uniformly to every cell.

        Returns:
            The ``(height, width)`` float64 discharge array.
        """
        h, w = self.grid.height, self.grid.width
        elev = self.grid.elevation

        # Initial discharge: local runoff of each cell.
        discharge = np.full(
            (h, w), precipitation * (self.grid.cell_size ** 2), dtype=np.float64
        )

        # Cells flagged here do not route flow any further.
        underwater = self.grid.is_underwater()

        if HAS_NUMBA:
            _d8_flow_kernel(elev, discharge, self.grid.flow_dir, underwater, h, w)
        else:
            self._route_flow_d8_python(
                discharge, self.grid.flow_dir, elev, underwater, h, w
            )

        return discharge

    def route_flow_mfd(self, precipitation: float = 0.001, p: float = 1.1) -> np.ndarray:
        """
        Distribute discharge with MFD (Multiple Flow Direction) routing.

        Unlike D8, the discharge of a cell is shared among *all* strictly
        lower neighbours, weighted by ``slope ** p``. This suits braided and
        branching streams.

        Args:
            precipitation: Precipitation applied uniformly to every cell.
            p: Distribution exponent (1.0 = proportional to slope; larger
                values concentrate the flow on the steepest neighbours).

        Returns:
            The ``(height, width)`` float64 discharge array.
        """
        h, w = self.grid.height, self.grid.width
        elev = self.grid.elevation
        cell_size = self.grid.cell_size

        discharge = np.full((h, w), precipitation * (cell_size ** 2), dtype=np.float64)
        underwater = self.grid.is_underwater()

        # (row step, column step, centre-to-centre distance in cell units).
        diagonal = float(np.sqrt(2.0))
        neighbours = (
            (-1, -1, diagonal), (-1, 0, 1.0), (-1, 1, diagonal),
            (0, -1, 1.0), (0, 1, 1.0),
            (1, -1, diagonal), (1, 0, 1.0), (1, 1, diagonal),
        )

        # Visit cells from high to low so a cell's discharge is complete
        # before it is handed downstream.
        for idx in np.argsort(elev.ravel())[::-1]:
            r, c = divmod(int(idx), w)

            if underwater[r, c]:
                continue

            outflow = discharge[r, c]
            if outflow <= 0:
                continue

            here = elev[r, c]
            weights = []
            targets = []
            for d_row, d_col, step in neighbours:
                nr, nc = r + d_row, c + d_col
                if 0 <= nr < h and 0 <= nc < w:
                    drop = here - elev[nr, nc]
                    if drop > 0:
                        weights.append((drop / (step * cell_size)) ** p)
                        targets.append((nr, nc))

            total = sum(weights)
            # No lower neighbour (or every weight underflowed to zero):
            # nothing to distribute, and dividing by zero must be avoided.
            if total <= 0:
                continue

            for (nr, nc), weight in zip(targets, weights):
                discharge[nr, nc] += outflow * (weight / total)

        return discharge

    def _route_flow_d8_python(self, discharge, flow_dir, elev, underwater, h, w):
        """
        Pure-Python D8 routing, used as the fallback when Numba is missing.

        Updates ``discharge`` and ``flow_dir`` in place: every cell that is
        not underwater adds its discharge to its lowest strictly-lower
        neighbour and records that neighbour's index (0-7) in ``flow_dir``.
        """
        offsets = (
            (-1, -1), (-1, 0), (-1, 1),
            (0, -1), (0, 1),
            (1, -1), (1, 0), (1, 1),
        )

        for idx in np.argsort(elev.ravel())[::-1]:
            r, c = divmod(int(idx), w)
            if underwater[r, c]:
                continue

            lowest = elev[r, c]
            best_k = -1
            for k, (d_row, d_col) in enumerate(offsets):
                nr, nc = r + d_row, c + d_col
                if 0 <= nr < h and 0 <= nc < w and elev[nr, nc] < lowest:
                    lowest = elev[nr, nc]
                    best_k = k

            # best_k stays -1 for pits and flats: nothing is routed from them.
            if best_k >= 0:
                d_row, d_col = offsets[best_k]
                discharge[r + d_row, c + d_col] += discharge[r, c]
                flow_dir[r, c] = best_k

    def calculate_water_depth(self, discharge: np.ndarray, manning_n: float = 0.03) -> np.ndarray:
        """
        Estimate river depth with Manning's formula (steady uniform flow).

            depth = (Q * n / (W * S ** 0.5)) ** (3 / 5)

        * The slope ``S`` is the first item returned by
          ``grid.get_gradient()``, clamped to at least 0.001 so flat cells
          do not divide by zero.
        * The channel width ``W`` is modelled as ``5 * sqrt(Q)``, clamped to
          at least 1.0.

        Args:
            discharge: Discharge ``Q`` per cell.
            manning_n: Manning roughness coefficient ``n``.

        Returns:
            The estimated water depth per cell.
        """
        slope, _ = self.grid.get_gradient()
        slope = np.maximum(slope, 0.001)

        width = np.maximum(5.0 * np.sqrt(discharge), 1.0)

        ratio = (discharge * manning_n) / (width * np.sqrt(slope))
        return np.power(ratio, 0.6)

    def simulate_inundation(self):
        """
        Flood the cells that the grid reports as underwater.

        For those cells ``grid.water_depth`` is replaced by
        ``max(0, sea_level - elevation)``; all other cells keep their
        current water depth.
        """
        underwater = self.grid.is_underwater()
        sea_depth = np.maximum(0, self.grid.sea_level - self.grid.elevation)
        self.grid.water_depth = np.where(underwater, sea_depth, self.grid.water_depth)

    def fill_sinks(self, max_iterations: int = 100, tolerance: float = 0.001):
        """
        Fill sinks (pits) so that trapped water can overflow, forming lakes.

        Simple iterative scheme (a rough approximation of Priority-Flood):
        on every pass, each interior cell that is strictly lower than all
        eight of its neighbours is raised to ``lowest neighbour + tolerance``.
        All cells of a pass are evaluated against the elevations from the
        previous pass. Border cells are never modified. Passes stop as soon
        as no pit is left or after ``max_iterations``; wide or deep
        depressions may therefore be only partially filled.

        ``grid.elevation`` is left untouched; the positive part of the fill
        is added to ``grid.water_depth`` in place.

        Args:
            max_iterations: Maximum number of passes.
            tolerance: Height added above the lowest neighbour when a pit is
                raised.

        Returns:
            Array of filled height per cell (filled elevation minus the
            original elevation).
        """
        h, w = self.grid.height, self.grid.width
        elev = self.grid.elevation.copy()

        offsets = (
            (-1, -1), (-1, 0), (-1, 1),
            (0, -1), (0, 1),
            (1, -1), (1, 0), (1, 1),
        )

        # Grids without interior cells have nothing that can be filled.
        if h >= 3 and w >= 3:
            for _ in range(max_iterations):
                interior = elev[1:-1, 1:-1]  # view into ``elev``

                # Lowest of the eight neighbours of every interior cell. The
                # cell itself is deliberately excluded: including it would
                # make the pit test below impossible to satisfy.
                lowest_neighbour = np.minimum.reduce(
                    [elev[1 + dr:h - 1 + dr, 1 + dc:w - 1 + dc] for dr, dc in offsets]
                )

                pits = interior < lowest_neighbour
                if not pits.any():
                    break

                # ``lowest_neighbour`` was computed before this write, so the
                # update is synchronous even though it happens in place.
                interior[pits] = lowest_neighbour[pits] + tolerance

        fill_amount = elev - self.grid.elevation

        # Only positive fill represents standing water.
        self.grid.water_depth += np.maximum(fill_amount, 0)

        return fill_amount
|
|
|
|