Geo-Lab / engine /mass_movement.py
HANSOL
Geo-Lab v4.1 - Clean deployment
2afa69c
"""
Mass Movement Kernel (๋งค์Šค๋ฌด๋ธŒ๋จผํŠธ)
์ค‘๋ ฅ์— ์˜ํ•œ ์‚ฌ๋ฉด ์ด๋™ ํ”„๋กœ์„ธ์Šค
- ์‚ฐ์‚ฌํƒœ (Landslide)
- ์Šฌ๋Ÿผํ”„ (Slump)
- ๋‚™์„ (Rockfall)
ํ•ต์‹ฌ ์›๋ฆฌ:
๊ฒฝ์‚ฌ(Slope) > ์ž„๊ณ„ ๊ฒฝ์‚ฌ(Critical Angle) โ†’ ๋ฌผ์งˆ ์ด๋™
"""
import numpy as np
from .grid import WorldGrid
class MassMovementKernel:
"""
๋งค์Šค๋ฌด๋ธŒ๋จผํŠธ ์ปค๋„
๊ฒฝ์‚ฌ ์•ˆ์ •์„ฑ์„ ๊ฒ€์‚ฌํ•˜๊ณ , ๋ถˆ์•ˆ์ •ํ•œ ๊ณณ์—์„œ ๋ฌผ์งˆ ์ด๋™์„ ์‹œ๋ฎฌ๋ ˆ์ด์…˜.
"""
def __init__(self, grid: WorldGrid,
friction_angle: float = 35.0, # ๋‚ด๋ถ€ ๋งˆ์ฐฐ๊ฐ (๋„)
cohesion: float = 0.0): # ์ ์ฐฉ๋ ฅ (Pa, ๊ฐ„์†Œํ™”)
self.grid = grid
self.friction_angle = friction_angle
self.cohesion = cohesion
# ์ž„๊ณ„ ๊ฒฝ์‚ฌ (ํƒ„์  ํŠธ ๊ฐ’)
self.critical_slope = np.tan(np.radians(friction_angle))
def check_stability(self) -> np.ndarray:
"""
๊ฒฝ์‚ฌ ์•ˆ์ •์„ฑ ๊ฒ€์‚ฌ
Returns:
unstable_mask: ๋ถˆ์•ˆ์ •ํ•œ ์…€ ๋งˆ์Šคํฌ (True = ๋ถˆ์•ˆ์ •)
"""
slope, _ = self.grid.get_gradient()
# ๊ฒฝ์‚ฌ > ์ž„๊ณ„ ๊ฒฝ์‚ฌ โ†’ ๋ถˆ์•ˆ์ •
unstable = slope > self.critical_slope
return unstable
def trigger_landslide(self, unstable_mask: np.ndarray,
efficiency: float = 0.5) -> np.ndarray:
"""
์‚ฐ์‚ฌํƒœ ๋ฐœ์ƒ
๋ถˆ์•ˆ์ •ํ•œ ์…€์—์„œ ๋ฌผ์งˆ์„ ๋‚ฎ์€ ๊ณณ์œผ๋กœ ์ด๋™.
Args:
unstable_mask: ๋ถˆ์•ˆ์ • ๋งˆ์Šคํฌ
efficiency: ์ด๋™ ํšจ์œจ (0.0~1.0, 1.0์ด๋ฉด ์™„์ „ ์ด๋™)
Returns:
change: ์ง€ํ˜• ๋ณ€ํ™”๋Ÿ‰
"""
h, w = self.grid.height, self.grid.width
change = np.zeros((h, w), dtype=np.float64)
if not np.any(unstable_mask):
return change
# D8 ๋ฐฉํ–ฅ
dr = np.array([-1, -1, -1, 0, 0, 1, 1, 1])
dc = np.array([-1, 0, 1, -1, 1, -1, 0, 1])
elev = self.grid.elevation
slope, _ = self.grid.get_gradient()
# ๋ถˆ์•ˆ์ • ์…€ ์ขŒํ‘œ
unstable_coords = np.argwhere(unstable_mask)
for r, c in unstable_coords:
# ํ˜„์žฌ ๊ฒฝ์‚ฌ ์ดˆ๊ณผ๋ถ„ ๊ณ„์‚ฐ
excess_slope = slope[r, c] - self.critical_slope
if excess_slope <= 0:
continue
# ์ด๋™๋Ÿ‰ = ์ดˆ๊ณผ ๊ฒฝ์‚ฌ์— ๋น„๋ก€
# ํ‡ด์ ์ธต ๋จผ์ € ์ด๋™, ๋ถ€์กฑํ•˜๋ฉด ๊ธฐ๋ฐ˜์•”
available = self.grid.sediment[r, c] + self.grid.bedrock[r, c] * 0.1
move_amount = min(excess_slope * efficiency * 5.0, available)
if move_amount <= 0:
continue
# ๊ฐ€์žฅ ๋‚ฎ์€ ์ด์›ƒ ์ฐพ๊ธฐ
min_z = elev[r, c]
target = None
for k in range(8):
nr, nc = r + dr[k], c + dc[k]
if 0 <= nr < h and 0 <= nc < w:
if elev[nr, nc] < min_z:
min_z = elev[nr, nc]
target = (nr, nc)
if target is None:
continue
tr, tc = target
# ๋ฌผ์งˆ ์ด๋™
change[r, c] -= move_amount
change[tr, tc] += move_amount * 0.9 # ์ผ๋ถ€ ์†์‹ค (๋ถ„์‚ฐ)
# ์ง€ํ˜• ์—…๋ฐ์ดํŠธ
# ์†์‹ค๋ถ„: sediment์—์„œ ์ œ๊ฑฐ
loss_mask = change < 0
loss = -change[loss_mask]
sed_loss = np.minimum(loss, self.grid.sediment[loss_mask])
rock_loss = loss - sed_loss
self.grid.sediment[loss_mask] -= sed_loss
self.grid.bedrock[loss_mask] -= rock_loss
# ํ‡ด์ ๋ถ„: sediment์— ์ถ”๊ฐ€
self.grid.sediment += np.maximum(change, 0)
self.grid.update_elevation()
return change
def step(self, dt: float = 1.0) -> np.ndarray:
"""
1๋‹จ๊ณ„ ๋งค์Šค๋ฌด๋ธŒ๋จผํŠธ ์‹คํ–‰
Args:
dt: ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ (์‚ฌ์šฉํ•˜์ง€ ์•Š์ง€๋งŒ ์ธํ„ฐํŽ˜์ด์Šค ์ผ๊ด€์„ฑ)
Returns:
change: ์ง€ํ˜• ๋ณ€ํ™”๋Ÿ‰
"""
# 1. ์•ˆ์ •์„ฑ ๊ฒ€์‚ฌ
unstable = self.check_stability()
# 2. ๋ถˆ์•ˆ์ • ์ง€์ ์—์„œ ์‚ฐ์‚ฌํƒœ ๋ฐœ์ƒ
change = self.trigger_landslide(unstable)
return change