Geo-Lab / engine /fluids.py
HANSOL
Geo-Lab v4.1 - Clean deployment
2afa69c
import numpy as np
from .grid import WorldGrid
try:
from numba import jit
HAS_NUMBA = True
except ImportError:
HAS_NUMBA = False
# Dummy decorator if numba is missing
def jit(*args, **kwargs):
def decorator(func):
return func
return decorator
@jit(nopython=True)
def _d8_flow_kernel(elev, discharge, flow_dir, underwater, h, w):
"""
Numba-optimized D8 Flow Routing
"""
# Flatten elevation to sort
flat_elev = elev.ravel()
# Sort indices descending (Source -> Sink)
# Note: argsort in numba supports 1D array
flat_indices = np.argsort(flat_elev)[::-1]
# 8-neighbor offsets
# Numba doesn't like list of tuples in loops sometimes, simple arrays are better
dr = np.array([-1, -1, -1, 0, 0, 1, 1, 1])
dc = np.array([-1, 0, 1, -1, 1, -1, 0, 1])
for i in range(len(flat_indices)):
idx = flat_indices[i]
r = idx // w
c = idx % w
# Check underwater
if underwater[r, c]:
continue
current_z = elev[r, c]
min_z = current_z
target_r = -1
target_c = -1
target_k = -1
# Find steepest descent
for k in range(8):
nr = r + dr[k]
nc = c + dc[k]
if 0 <= nr < h and 0 <= nc < w:
n_elev = elev[nr, nc]
if n_elev < min_z:
min_z = n_elev
target_r = nr
target_c = nc
target_k = k
# Pass flow to lowest neighbor
if target_r != -1:
discharge[target_r, target_c] += discharge[r, c]
flow_dir[r, c] = target_k # Store direction (0-7)
class HydroKernel:
"""
์ˆ˜๋ ฅํ•™ ์ปค๋„ (Hydro Kernel)
๋ฌผ์˜ ํ๋ฆ„๊ณผ ๋ถ„ํฌ๋ฅผ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ํ•ฉ๋‹ˆ๋‹ค.
- D8 ์•Œ๊ณ ๋ฆฌ์ฆ˜: ํ•˜์ฒœ ๋„คํŠธ์›Œํฌ ํ˜•์„ฑ (Numba ๊ฐ€์† ์ ์šฉ)
- Shallow Water (๊ฐ„์†Œํ™”): ํ™์ˆ˜ ๋ฐ ํ•ด์ˆ˜๋ฉด ์นจ์ˆ˜
"""
def __init__(self, grid: WorldGrid):
self.grid = grid
def route_flow_d8(self, precipitation: float = 0.001) -> np.ndarray:
"""
D8 ์•Œ๊ณ ๋ฆฌ์ฆ˜์œผ๋กœ ์œ ๋Ÿ‰(Discharge) ๊ณ„์‚ฐ (Numba ๊ฐ€์†)
"""
h, w = self.grid.height, self.grid.width
elev = self.grid.elevation
# 1. ์ดˆ๊ธฐ ๊ฐ•์ˆ˜ ๋ถ„ํฌ
discharge = np.full((h, w), precipitation * (self.grid.cell_size ** 2), dtype=np.float64)
# 2. ํ•ด์ˆ˜๋ฉด ๋งˆ์Šคํฌ
underwater = self.grid.is_underwater()
# 3. Numba Kernel ํ˜ธ์ถœ
if HAS_NUMBA:
_d8_flow_kernel(elev, discharge, self.grid.flow_dir, underwater, h, w)
else:
# Fallback (Slow Python) if numba somehow fails to import
self._route_flow_d8_python(discharge, self.grid.flow_dir, elev, underwater, h, w)
return discharge
def route_flow_mfd(self, precipitation: float = 0.001, p: float = 1.1) -> np.ndarray:
"""
MFD (Multiple Flow Direction) ์œ ๋Ÿ‰ ๋ถ„๋ฐฐ
D8๊ณผ ๋‹ฌ๋ฆฌ ๋‚ฎ์€ ๋ชจ๋“  ์ด์›ƒ์—๊ฒŒ ๊ฒฝ์‚ฌ ๋น„๋ก€๋กœ ์œ ๋Ÿ‰ ๋ถ„๋ฐฐ.
๋ง๋ฅ˜(Braided Stream) ๋ฐ ๋ถ„๊ธฐ๋ฅ˜ ํ‘œํ˜„์— ์ ํ•ฉ.
Args:
precipitation: ๊ฐ•์ˆ˜๋Ÿ‰
p: ๋ถ„๋ฐฐ ์ง€์ˆ˜ (1.0=์„ ํ˜•, >1.0=๊ฐ€ํŒŒ๋ฅธ ๊ณณ์— ์ง‘์ค‘)
Returns:
discharge: ์œ ๋Ÿ‰ ๋ฐฐ์—ด
"""
h, w = self.grid.height, self.grid.width
elev = self.grid.elevation
# ์ดˆ๊ธฐ ๊ฐ•์ˆ˜
discharge = np.full((h, w), precipitation * (self.grid.cell_size ** 2), dtype=np.float64)
# ํ•ด์ˆ˜๋ฉด ๋งˆ์Šคํฌ
underwater = self.grid.is_underwater()
# D8 ๋ฐฉํ–ฅ ๋ฒกํ„ฐ
dr = np.array([-1, -1, -1, 0, 0, 1, 1, 1])
dc = np.array([-1, 0, 1, -1, 1, -1, 0, 1])
dist = np.array([1.414, 1.0, 1.414, 1.0, 1.0, 1.414, 1.0, 1.414]) # ๋Œ€๊ฐ ๊ฑฐ๋ฆฌ
# ์ •๋ ฌ (๋†’์€ ๊ณณ -> ๋‚ฎ์€ ๊ณณ)
flat_indices = np.argsort(elev.ravel())[::-1]
for idx in flat_indices:
r, c = idx // w, idx % w
if underwater[r, c]:
continue
current_z = elev[r, c]
current_q = discharge[r, c]
if current_q <= 0:
continue
# ๋‚ฎ์€ ์ด์›ƒ๋“ค์˜ ๊ฒฝ์‚ฌ ๊ณ„์‚ฐ
slopes = []
targets = []
for k in range(8):
nr, nc = r + dr[k], c + dc[k]
if 0 <= nr < h and 0 <= nc < w:
dz = current_z - elev[nr, nc]
if dz > 0: # ํ•˜๊ฐ•ํ•˜๋Š” ๋ฐฉํ–ฅ๋งŒ
slope = dz / (dist[k] * self.grid.cell_size)
slopes.append(slope ** p)
targets.append((nr, nc))
if not slopes:
continue
# ๊ฒฝ์‚ฌ ๋น„๋ก€ ๋ถ„๋ฐฐ
total_slope = sum(slopes)
for i, (nr, nc) in enumerate(targets):
fraction = slopes[i] / total_slope
discharge[nr, nc] += current_q * fraction
return discharge
def _route_flow_d8_python(self, discharge, flow_dir, elev, underwater, h, w):
"""Legacy Python implementation for fallback"""
flat_indices = np.argsort(elev.ravel())[::-1]
neighbors = [(-1,-1), (-1,0), (-1,1), (0,-1), (0,1), (1,-1), (1,0), (1,1)]
for idx in flat_indices:
r, c = idx // w, idx % w
if underwater[r, c]: continue
min_z = elev[r, c]
target = None
target_k = -1
for k, (dr, dc) in enumerate(neighbors):
nr, nc = r + dr, c + dc
if 0 <= nr < h and 0 <= nc < w:
if elev[nr, nc] < min_z:
min_z = elev[nr, nc]
target = (nr, nc)
target_k = k
if target:
tr, tc = target
discharge[tr, tc] += discharge[r, c]
flow_dir[r, c] = target_k
def calculate_water_depth(self, discharge: np.ndarray, manning_n: float = 0.03) -> np.ndarray:
"""
Manning ๊ณต์‹์„ ์ด์šฉํ•œ ํ•˜์ฒœ ์ˆ˜์‹ฌ ์ถ”์ • (์ •์ƒ ๋“ฑ๋ฅ˜ ๊ฐ€์ •)
Depth = (Q * n / (Width * S^0.5))^(3/5)
* ๊ฒฝ์‚ฌ(S)๊ฐ€ 0์ธ ๊ฒฝ์šฐ ์ตœ์†Œ ๊ฒฝ์‚ฌ ์ ์šฉ
* ํ•˜ํญ(W)์€ ์œ ๋Ÿ‰(Q)์˜ ํ•จ์ˆ˜๋กœ ๊ฐ€์ • (W ~ Q^0.5)
"""
slope, _ = self.grid.get_gradient()
slope = np.maximum(slope, 0.001) # ์ตœ์†Œ ๊ฒฝ์‚ฌ ์„ค์ •
# ํ•˜ํญ ์ถ”์ •: W = 5 * Q^0.5 (๊ฒฝํ—˜์‹)
# Q๊ฐ€ ๋งค์šฐ ์ž‘์œผ๋ฉด W๋„ ์ž‘์•„์ง
width = 5.0 * np.sqrt(discharge)
width = np.maximum(width, 1.0) # ์ตœ์†Œ ํญ 1m
# ์ˆ˜์‹ฌ ๊ณ„์‚ฐ
# Q = V * Area = (1/n * R^(2/3) * S^(1/2)) * (W * D)
# ์ง์‚ฌ๊ฐํ˜• ๋‹จ๋ฉด ๊ฐ€์ • ์‹œ R approx D (๋„“์€ ํ•˜์ฒœ)
# Q = (1/n) * D^(5/3) * W * S^(1/2)
# D = (Q * n / (W * S^0.5)) ^ (3/5)
val = (discharge * manning_n) / (width * np.sqrt(slope))
depth = np.power(val, 0.6)
return depth
def simulate_inundation(self):
"""ํ•ด์ˆ˜๋ฉด ์ƒ์Šน์— ๋”ฐ๋ฅธ ์นจ์ˆ˜ ์‹œ๋ฎฌ๋ ˆ์ด์…˜"""
# ํ•ด์ˆ˜๋ฉด๋ณด๋‹ค ๋‚ฎ์€ ๊ณณ์€ ๋ฐ”๋‹ค๋กœ ๊ฐ„์ฃผํ•˜๊ณ  ์ˆ˜์‹ฌ์„ ์ฑ„์›€
underwater = self.grid.is_underwater()
# ๋ฐ”๋‹ค ์ˆ˜์‹ฌ = ํ•ด์ˆ˜๋ฉด - ์ง€ํ‘œ๋ฉด๊ณ ๋„
sea_depth = np.maximum(0, self.grid.sea_level - self.grid.elevation)
# ๊ธฐ์กด ์ˆ˜์‹ฌ(ํ•˜์ฒœ)๊ณผ ๋ฐ”๋‹ค ์ˆ˜์‹ฌ ์ค‘ ํฐ ๊ฐ’ ์„ ํƒ
# (ํ•˜์ฒœ์ด ๋ฐ”๋‹ค๋กœ ๋“ค์–ด๊ฐ€๋ฉด ๋ฐ”๋‹ค ์ˆ˜์‹ฌ์— ๋ฌปํž˜)
self.grid.water_depth = np.where(underwater, sea_depth, self.grid.water_depth)
def fill_sinks(self, max_iterations: int = 100, tolerance: float = 0.001):
"""
์‹ฑํฌ(์›…๋ฉ์ด) ์ฑ„์šฐ๊ธฐ - ํ˜ธ์ˆ˜ ํ˜•์„ฑ
๋ฌผ์ด ๊ฐ‡ํžˆ๋Š” ๊ณณ์„ ์ฐพ์•„ ์ฑ„์›Œ์„œ ์›”๋ฅ˜(Overflow)๊ฐ€ ๊ฐ€๋Šฅํ•˜๋„๋ก ํ•จ.
๊ฐ„๋‹จํ•œ ๋ฐ˜๋ณต ์Šค๋ฌด๋”ฉ ๋ฐฉ์‹ (Priority-Flood ๊ทผ์‚ฌ)
Args:
max_iterations: ์ตœ๋Œ€ ๋ฐ˜๋ณต ํšŸ์ˆ˜
tolerance: ์ˆ˜๋ ด ํ—ˆ์šฉ ์˜ค์ฐจ
"""
h, w = self.grid.height, self.grid.width
elev = self.grid.elevation.copy()
# ๊ฒฝ๊ณ„๋Š” ๊ณ ์ • (๋ฌผ์ด ๋น ์ ธ๋‚˜๊ฐ)
# ๋‚ด๋ถ€ ์‹ฑํฌ๋งŒ ์ฑ„์›€
dr = [-1, -1, -1, 0, 0, 1, 1, 1]
dc = [-1, 0, 1, -1, 1, -1, 0, 1]
for iteration in range(max_iterations):
changed = False
new_elev = elev.copy()
for r in range(1, h - 1):
for c in range(1, w - 1):
current = elev[r, c]
# ์ด์›ƒ ์ค‘ ์ตœ์†Œ๊ฐ’ ์ฐพ๊ธฐ
min_neighbor = current
for k in range(8):
nr, nc = r + dr[k], c + dc[k]
if 0 <= nr < h and 0 <= nc < w:
min_neighbor = min(min_neighbor, elev[nr, nc])
# ๋ชจ๋“  ์ด์›ƒ๋ณด๋‹ค ๋‚ฎ์œผ๋ฉด (์‹ฑํฌ) โ†’ ์ตœ์†Œ ์ด์›ƒ ๋†’์ด๋กœ ๋งž์ถค
if current < min_neighbor:
# ์‚ด์ง ๋†’์—ฌ์„œ ํ๋ฆ„ ์œ ๋„
new_elev[r, c] = min_neighbor + tolerance
changed = True
elev = new_elev
if not changed:
break
# ์ฑ„์›Œ์ง„ ์–‘ = ์ƒˆ ๊ณ ๋„ - ๊ธฐ์กด ๊ณ ๋„
fill_amount = elev - self.grid.elevation
# ๋ฌผ๋กœ ์ฑ„์›Œ์ง„ ๊ฒƒ์œผ๋กœ ์ฒ˜๋ฆฌ (water_depth ์ฆ๊ฐ€)
self.grid.water_depth += np.maximum(fill_amount, 0)
# bedrock์€ ๊ทธ๋Œ€๋กœ, sediment๋„ ๊ทธ๋Œ€๋กœ (๋ฌผ๋งŒ ์ฑ„์›€)
# ๋˜๋Š” sediment๋กœ ์ฑ„์šธ ์ˆ˜๋„ ์žˆ์Œ (ํ˜ธ์ˆ˜ ํ‡ด์ )
return fill_amount