Geo-Lab / engine /erosion_process.py
HANSOL
Geo-Lab v4.1 - Clean deployment
2afa69c
import numpy as np
from .grid import WorldGrid
class ErosionProcess:
"""
์ง€ํ˜• ๋ณ€๊ฒฝ ์ปค๋„ (Erosion/Deposition Kernel)
๋ฌผ๋ฆฌ ๊ณต์‹์„ ์ ์šฉํ•˜์—ฌ ์ง€ํ˜•(๊ณ ๋„)์„ ๋ณ€๊ฒฝํ•ฉ๋‹ˆ๋‹ค.
1. Stream Power Law: ํ•˜์ฒœ ์นจ์‹ (E = K * A^m * S^n)
2. Hillslope Diffusion: ์‚ฌ๋ฉด ๋ถ•๊ดด/ํ™•์‚ฐ (dz/dt = D * del^2 z)
"""
def __init__(self, grid: WorldGrid, K: float = 1e-4, m: float = 0.5, n: float = 1.0, D: float = 0.01):
self.grid = grid
self.K = K # ์นจ์‹ ๊ณ„์ˆ˜
self.m = m # ์œ ๋Ÿ‰ ์ง€์ˆ˜
self.n = n # ๊ฒฝ์‚ฌ ์ง€์ˆ˜
self.D = D # ํ™•์‚ฐ ๊ณ„์ˆ˜ (์‚ฌ๋ฉด)
def stream_power_erosion(self, discharge: np.ndarray, dt: float = 1.0) -> np.ndarray:
"""Stream Power Law ๊ธฐ๋ฐ˜ ํ•˜์ฒœ ์นจ์‹"""
slope, _ = self.grid.get_gradient()
# E = K * Q^m * S^n
# (์œ ๋Ÿ‰ Q๋ฅผ ์œ ์—ญ๋ฉด์  A ๋Œ€์‹  ์‚ฌ์šฉ)
erosion_rate = self.K * np.power(discharge, self.m) * np.power(slope, self.n)
# ์‹ค์ œ ์นจ์‹๋Ÿ‰ = rate * time
erosion_amount = erosion_rate * dt
# ๊ธฐ๋ฐ˜์•” ์ดํ•˜๋กœ๋Š” ์นจ์‹ ๋ถˆ๊ฐ€ (available sediment first, then bedrock)
# ์—ฌ๊ธฐ์„œ๋Š” ๋‹จ์ˆœํ™”๋ฅผ ์œ„ํ•ด Topography(elevation)์„ ๋ฐ”๋กœ ๊นŽ์Œ.
# ๋‹จ, ํ•ด์ˆ˜๋ฉด ์•„๋ž˜๋Š” ์นจ์‹ ์ž‘์šฉ ๊ฐ์†Œ (๋ฌผ ์†์—์„œ๋Š” Stream Power๊ฐ€ ์•„๋‹˜)
underwater = self.grid.is_underwater()
erosion_amount[underwater] *= 0.1
# ์ง€ํ˜• ์—…๋ฐ์ดํŠธ
self.grid.elevation -= erosion_amount
# ๋‹จ์ˆœํ™”: ๊นŽ์ธ ๋งŒํผ ํ‡ด์ ๋ฌผ๋กœ ๋ณ€ํ™˜๋˜์–ด ์–ด๋”˜๊ฐ€๋กœ ๊ฐ€์•ผ ํ•˜์ง€๋งŒ,
# Stream Power Model(SPL)์€ ๋ณดํ†ต Detachment-limited ๋ชจ๋ธ์ด๋ผ ํ‡ด์ ์„ ๋ช…์‹œ์ ์œผ๋กœ ๋‹ค๋ฃจ์ง€ ์•Š์Œ.
# ํ†ตํ•ฉ ๋ชจ๋ธ์„ ์œ„ํ•ด, ์นจ์‹๋œ ์–‘์„ ํ‡ด์ ๋ฌผ ํ”Œ๋Ÿญ์Šค์— ๋”ํ•ด์ค„ ์ˆ˜ ์žˆ์Œ (๊ตฌํ˜„ ์˜ˆ์ •)
return erosion_amount
def hillslope_diffusion(self, dt: float = 1.0) -> np.ndarray:
"""์‚ฌ๋ฉด ํ™•์‚ฐ ํ”„๋กœ์„ธ์Šค (Linear Diffusion)"""
elev = self.grid.elevation
# Laplacian calculation (์ด์‚ฐํ™”)
# del^2 z = (z_up + z_down + z_left + z_right - 4*z) / dx^2
# Numpy roll์„ ์ด์šฉํ•œ ๋น ๋ฅธ ๊ณ„์‚ฐ
up = np.roll(elev, -1, axis=0)
down = np.roll(elev, 1, axis=0)
left = np.roll(elev, -1, axis=1)
right = np.roll(elev, 1, axis=1)
dx2 = self.grid.cell_size ** 2
laplacian = (up + down + left + right - 4 * elev) / dx2
# ๊ฒฝ๊ณ„ ์กฐ๊ฑด ์ฒ˜๋ฆฌ (๊ฐ€์žฅ์ž๋ฆฌ๋Š” ๊ณ„์‚ฐ ์ œ์™ธ or 0)
laplacian[0, :] = 0
laplacian[-1, :] = 0
laplacian[:, 0] = 0
laplacian[:, -1] = 0
# dz/dt = D * del^2 z
change = self.D * laplacian * dt
self.grid.elevation += change
return change
def overbank_deposition(self, discharge: np.ndarray,
bankfull_capacity: float = 100.0,
decay_rate: float = 0.1,
dt: float = 1.0) -> np.ndarray:
"""
๋ฒ”๋žŒ์› ํ‡ด์  (Overbank Deposition)
ํ•˜์ฒœ ์šฉ๋Ÿ‰ ์ดˆ๊ณผ ์‹œ ๋ฒ”๋žŒํ•˜์—ฌ ์ฃผ๋ณ€์— ์„ธ๋ฆฝ์งˆ ํ‡ด์ .
ํ•˜๋„์—์„œ ๋ฉ€์–ด์งˆ์ˆ˜๋ก ํ‡ด์ ๋Ÿ‰ ๊ฐ์†Œ (์ž์—ฐ์ œ๋ฐฉ ํ˜•์„ฑ).
Args:
discharge: ์œ ๋Ÿ‰ ๋ฐฐ์—ด
bankfull_capacity: ํ•˜๋„ ์šฉ๋Ÿ‰ (์ดˆ๊ณผ ์‹œ ๋ฒ”๋žŒ)
decay_rate: ๊ฑฐ๋ฆฌ์— ๋”ฐ๋ฅธ ํ‡ด์  ๊ฐ์‡ ์œจ
dt: ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ
Returns:
deposition: ํ‡ด์ ๋Ÿ‰ ๋ฐฐ์—ด
"""
from scipy.ndimage import distance_transform_edt
h, w = self.grid.height, self.grid.width
# 1. ๋ฒ”๋žŒ ์ง€์  ์‹๋ณ„ (์šฉ๋Ÿ‰ ์ดˆ๊ณผ)
overflow = np.maximum(0, discharge - bankfull_capacity)
flood_mask = overflow > 0
if not np.any(flood_mask):
return np.zeros((h, w))
# 2. ํ•˜๋„๋กœ๋ถ€ํ„ฐ์˜ ๊ฑฐ๋ฆฌ ๊ณ„์‚ฐ
# flood_mask๊ฐ€ ์žˆ๋Š” ๊ณณ์ด ํ•˜๋„
channel_mask = discharge > bankfull_capacity * 0.5
if not np.any(channel_mask):
return np.zeros((h, w))
# Distance Transform (ํ•˜๋„๋กœ๋ถ€ํ„ฐ์˜ ๊ฑฐ๋ฆฌ)
distance = distance_transform_edt(~channel_mask) * self.grid.cell_size
# 3. ํ‡ด์ ๋Ÿ‰ ๊ณ„์‚ฐ (์ง€์ˆ˜ ๊ฐ์‡ )
# Deposition = overflow * exp(-k * distance)
# ํ•˜๋„ ๊ทผ์ฒ˜(์ž์—ฐ์ œ๋ฐฉ)์— ๋งŽ์ด, ๋ฉ€์ˆ˜๋ก(๋ฐฐํ›„์Šต์ง€) ์ ๊ฒŒ
max_overflow = overflow.max()
if max_overflow <= 0:
return np.zeros((h, w))
normalized_overflow = overflow / max_overflow
# ๋ฒ”๋žŒ ์˜ํ–ฅ ๋ฒ”์œ„ (์ตœ๋Œ€ 50 ์…€)
max_distance = 50 * self.grid.cell_size
influence = np.exp(-decay_rate * distance / self.grid.cell_size)
influence[distance > max_distance] = 0
# ํ‡ด์ ๋Ÿ‰
deposition = normalized_overflow.max() * influence * 0.1 * dt
# ํ•ด์ˆ˜๋ฉด ์•„๋ž˜๋Š” ์ œ์™ธ
underwater = self.grid.is_underwater()
deposition[underwater] = 0
# ํ•˜๋„ ์ž์ฒด๋Š” ์ œ์™ธ (ํ•˜๋„๋Š” ์นจ์‹์ด ์šฐ์„ธ)
deposition[channel_mask] = 0
# 4. ํ‡ด์ ์ธต์— ์ถ”๊ฐ€
self.grid.add_sediment(deposition)
return deposition
def transport_and_deposit(self, discharge: np.ndarray, dt: float = 1.0, Kf: float = 0.01) -> np.ndarray:
"""
ํ‡ด์ ๋ฌผ ์šด๋ฐ˜ ๋ฐ ํ‡ด์  (Sediment Transport & Deposition)
Transport Capacity Law:
Q_cap = Kf * Q^m * S^n
- Q_cap > Q_sed: ์นจ์‹ (Erosion) -> ํ‡ด์ ๋ฌผ ์ฆ๊ฐ€
- Q_cap < Q_sed: ํ‡ด์  (Deposition) -> ํ‡ด์ ๋ฌผ ๊ฐ์†Œ, ์ง€ํ˜• ์ƒ์Šน
Args:
discharge: ์œ ๋Ÿ‰
dt: ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ
Kf: ์šด๋ฐ˜ ํšจ์œจ ๊ณ„์ˆ˜ (Transport Efficiency)
"""
slope, _ = self.grid.get_gradient()
# ๊ฒฝ์‚ฌ๊ฐ€ 0์ด๋ฉด ๋ฌดํ•œ ํ‡ด์  ๋ฐฉ์ง€๋ฅผ ์œ„ํ•ด ์ตœ์†Œ๊ฐ’ ์„ค์ •
slope = np.maximum(slope, 0.001)
# 1. ์šด๋ฐ˜ ๋Šฅ๋ ฅ (Transport Capacity) ๊ณ„์‚ฐ
# ์นจ์‹ ๊ณ„์ˆ˜ K ๋Œ€์‹  ์šด๋ฐ˜ ๊ณ„์ˆ˜ Kf ์‚ฌ์šฉ (์ผ๋ฐ˜์ ์œผ๋กœ K๋ณด๋‹ค ํผ)
capacity = Kf * np.power(discharge, self.m) * np.power(slope, self.n)
# 2. ํ˜„์žฌ ๋ถ€์œ ์‚ฌ(Suspended Sediment) ๊ฐ€์ •
# ์ƒ๋ฅ˜์—์„œ ๋“ค์–ด์˜ค๋Š” ์œ ์‚ฌ๋Ÿ‰์€ ์ด์ „ ๋‹จ๊ณ„์˜ ์นจ์‹๋Ÿ‰์ด๋‚˜ ๊ธฐ์œ ์ž…๋Ÿ‰์— ์˜์กด
# ์—ฌ๊ธฐ์„œ๋Š” ๋‹จ์ˆœํ™”๋ฅผ ์œ„ํ•ด 'Local Equilibrium'์„ ๊ฐ€์ •ํ•˜์ง€ ์•Š๊ณ ,
# ์œ ๋Ÿ‰์— ๋น„๋ก€ํ•˜๋Š” ์ดˆ๊ธฐ ์œ ์‚ฌ๋Ÿ‰์„ ๊ฐ€์ •ํ•˜๊ฑฐ๋‚˜,
# ์ด์ „ ์Šคํ…์˜ ์นจ์‹ ๊ฒฐ๊ณผ๋ฅผ ์ด์šฉํ•ด์•ผ ํ•จ.
# ํ†ตํ•ฉ ๋ชจ๋ธ์„ ์œ„ํ•ด: "Erosion" ํ•จ์ˆ˜๊ฐ€ ๊นŽ์•„๋‚ธ ํ™์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ํ•˜๊ณ ,
# ์ด๋ฅผ capacity์™€ ๋น„๊ตํ•˜์—ฌ ์žฌํ‡ด์ ์‹œํ‚ค๊ฑฐ๋‚˜ ํ•˜๋ฅ˜๋กœ ๋ณด๋ƒ„.
# ํ•˜์ง€๋งŒ D8 ์•Œ๊ณ ๋ฆฌ์ฆ˜ ์ƒ ํ•˜๋ฅ˜๋กœ์˜ '์ „๋‹ฌ(Routing)'์ด ํ•„์š”ํ•จ.
# ์—ฌ๊ธฐ์„œ๋Š” Simplified Landform Evolution Model (SLEM) ๋ฐฉ์‹ ์ ์šฉ:
# dZs/dt = U - E + D
# D = (Q_cap - Q_sed) / Length_scale (if Q_sed > Q_cap)
# E = Stream Power (detachment limited)
# Delta ์‹œ๋ฎฌ๋ ˆ์ด์…˜์„ ์œ„ํ•œ ์ ‘๊ทผ:
# ํ‡ด์ ๋ฌผ ํ”Œ๋Ÿญ์Šค(Flux)๋ฅผ ํ•˜๋ฅ˜๋กœ ๋ฐ€์–ด๋‚ด๋Š” ๋กœ์ง ์ถ”๊ฐ€.
h, w = self.grid.height, self.grid.width
sediment_flux = np.zeros((h, w))
# ์œ ๋Ÿ‰ ์ˆœ์„œ๋Œ€๋กœ ์ฒ˜๋ฆฌ (Upstream -> Downstream)
# discharge๊ฐ€ ๋‚ฎ์€ ๊ณณ(์ƒ๋ฅ˜)์—์„œ ๋†’์€ ๊ณณ(ํ•˜๋ฅ˜)์œผ๋กœ?
# D8 ํ๋ฆ„ ๋ฐฉํ–ฅ์„ ๋‹ค์‹œ ์ถ”์ ํ•ด์•ผ ์ •ํ™•ํ•จ.
# ์—ฌ๊ธฐ์„œ๋Š” ๊ฐ„๋‹จํžˆ 'Capacity ์ดˆ๊ณผ๋ถ„ ํ‡ด์ '๋งŒ ๊ตฌํ˜„ํ•˜๊ณ ,
# Flux Routing์€ HydroKernel๊ณผ ์—ฐ๋™๋˜์–ด์•ผ ํ•จ.
# ์ž„์‹œ: Capacity based deposition only (Local)
# ํ‡ด์ ๋Ÿ‰ = (ํ˜„์žฌ ์œ ์‚ฌ๋Ÿ‰ - ์šฉ๋Ÿ‰) * ๋น„์œจ
# ํ˜„์žฌ ์œ ์‚ฌ๋Ÿ‰์ด ์—†์œผ๋ฏ€๋กœ, ์นจ์‹๋œ ํ™(Stream Power ๊ฒฐ๊ณผ)์ด
# ํ•ด๋‹น ์…€์˜ Capacity๋ฅผ ๋„˜์œผ๋ฉด ์ฆ‰์‹œ ํ‡ด์ ๋œ๋‹ค๊ณ  ๊ฐ€์ •.
pass
# TODO: Flux Routing ๊ตฌํ˜„ ํ•„์š”. ํ˜„์žฌ ๊ตฌ์กฐ์—์„œ๋Š” ์–ด๋ ค์›€.
# ErosionProcess๋ฅผ ์ˆ˜์ •ํ•˜์—ฌ 'simulate_transport' ๋ฉ”์„œ๋“œ๋กœ ํ†ตํ•ฉ.
return capacity
def simulate_transport(self, discharge: np.ndarray, dt: float = 1.0,
sediment_influx_map: np.ndarray = None) -> np.ndarray:
"""
ํ†ตํ•ฉ ํ‡ด์ ๋ฌผ ์ด์†ก ์‹œ๋ฎฌ๋ ˆ์ด์…˜ (Flux-based)
1. ์ƒ๋ฅ˜์—์„œ ํ‡ด์ ๋ฌผ ์œ ์ž… (Flux In)
2. ๋กœ์ปฌ ์นจ์‹/ํ‡ด์  (Erosion/Deposition)
3. ํ•˜๋ฅ˜๋กœ ๋ฐฐ์ถœ (Flux Out)
"""
h, w = self.grid.height, self.grid.width
elev = self.grid.elevation
# 1. ์ •๋ ฌ (๋†’์€ ๊ณณ -> ๋‚ฎ์€ ๊ณณ)
indices = np.argsort(elev.ravel())[::-1]
# ํ‡ด์ ๋ฌผ ํ”Œ๋Ÿญ์Šค ์ดˆ๊ธฐํ™” (์œ ์ž…์› ๋ฐ˜์˜)
flux = np.zeros((h, w))
if sediment_influx_map is not None:
flux += sediment_influx_map
slope, _ = self.grid.get_gradient()
slope = np.maximum(slope, 0.001)
change = np.zeros((h, w))
# D8 Neighbors (Lookup for flow_dir)
d8_dr = [-1, -1, -1, 0, 0, 1, 1, 1]
d8_dc = [-1, 0, 1, -1, 1, -1, 0, 1]
# Check if flow_dir is available
use_flow_dir = (self.grid.flow_dir is not None)
for idx in indices:
r, c = idx // w, idx % w
# ํ•ด์ˆ˜๋ฉด ์•„๋ž˜ ๊นŠ์€ ๊ณณ์€ ํ‡ด์  ์œ„์ฃผ
underwater = self.grid.is_underwater()[r, c]
# A. ์šด๋ฐ˜ ๋Šฅ๋ ฅ (Capacity)
# ๋ฌผ ์†์—์„œ๋Š” ์œ ์†์ด ๊ธ‰๊ฐํ•œ๋‹ค๊ณ  ๊ฐ€์ • -> Capacity ๊ฐ์†Œ
# eff_slope calculation fix: slope can be very small on flat land
eff_slope = slope[r, c] if not underwater else slope[r, c] * 0.01
# Kf (Transportation efficiency) should be high enough
# Use 'self.K * 100' or similar
qs_cap = self.K * 500 * np.power(discharge[r, c], self.m) * np.power(eff_slope, self.n)
# B. ํ˜„์žฌ ํ”Œ๋Ÿญ์Šค (์ƒ๋ฅ˜์—์„œ ๋“ค์–ด์˜จ ๊ฒƒ + ๋กœ์ปฌ ์นจ์‹ ์ž ์žฌ๋Ÿ‰)
qs_in = flux[r, c]
# C. ์นจ์‹ vs ํ‡ด์  ๊ฒฐ์ •
# ๊ธฐ๊ณ„์  ์นจ์‹ (Stream Power)
potential_erosion = self.K * np.power(discharge[r, c], self.m) * np.power(slope[r, c], self.n) * dt
# ๋งŒ์•ฝ ๋“ค์–ด์˜จ ํ™(qs_in)์ด ์šฉ๋Ÿ‰(qs_cap)๋ณด๋‹ค ๋งŽ์œผ๋ฉด -> ํ‡ด์ 
if qs_in > qs_cap:
# ํ‡ด์ ๋Ÿ‰ = ์ดˆ๊ณผ๋ถ„ * 1.0 (์ผ๋‹จ 100% ํ‡ด์  ๊ฐ€์ •ํ•˜์—ฌ ํšจ๊ณผ ํ™•์ธ)
deposition_amount = (qs_in - qs_cap) * 1.0
change[r, c] += deposition_amount
qs_out = qs_cap # ๋‚˜๋จธ์ง€๋Š” ํ•˜๋ฅ˜๋กœ? ์•„๋‹ˆ, ํ‡ด์  ํ›„ ๋‚จ์€๊ฑด qs_cap์ž„ (Transport-limited)
else:
# ์šฉ๋Ÿ‰์ด ๋‚จ์œผ๋ฉด -> ์นจ์‹ํ•˜์—ฌ ํ™์„ ๋” ์‹ฃ๊ณ ๊ฐ
# ์‹ค์ œ ์นจ์‹ = ์ž ์žฌ ์นจ์‹ (๊ธฐ๋ฐ˜์•”๋„ ์นจ์‹ ๊ฐ€๋Šฅ)
erosion_amount = potential_erosion
change[r, c] -= erosion_amount
qs_out = qs_in + erosion_amount
# D. ํ•˜๋ฅ˜๋กœ ์ „๋‹ฌ (Qs Out Routing)
# Use pre-calculated flow direction if available
target_r, target_c = -1, -1
if use_flow_dir:
k = self.grid.flow_dir[r, c]
# k could be default 0 even if no flow?
# Usually sink nodes have special value (e.g. -1 or point to self)
# But here we initialized to 0.
# Need to check constraints.
# If discharge[r,c] > 0, flow_dir should be valid.
if discharge[r, c] > 0:
nr = r + d8_dr[k]
nc = c + d8_dc[k]
if 0 <= nr < h and 0 <= nc < w:
target_r, target_c = nr, nc
else:
# Fallback: Local Seek (Slow)
min_z = elev[r, c]
for k in range(8):
nr = r + d8_dr[k]
nc = c + d8_dc[k]
if 0 <= nr < h and 0 <= nc < w:
if elev[nr, nc] < min_z:
min_z = elev[nr, nc]
target_r, target_c = nr, nc
if target_r != -1:
flux[target_r, target_c] += qs_out
else:
# ๊ฐ‡ํžŒ ๊ณณ(Sink) -> ๊ทธ ์ž๋ฆฌ์— ํ‡ด์ 
# ์นจ์‹์ด ๋ฐœ์ƒํ–ˆ๋‹ค๋ฉด ๋˜๋Œ๋ ค๋†“๊ณ  ํ‡ด์ 
change[r, c] += qs_out
# ์ง€ํ˜• ์—…๋ฐ์ดํŠธ
# ์นจ์‹์€ elevation ๊ฐ์†Œ, ํ‡ด์ ์€ sediment ์ฆ๊ฐ€์ด์ง€๋งŒ
# ์—ฌ๊ธฐ์„œ๋Š” ํ†ตํ•ฉํ•˜์—ฌ elevation/sediment ์กฐ์ •
# ํ‡ด์ ๋ถ„: sediment ์ธต์— ์ถ”๊ฐ€
self.grid.add_sediment(np.maximum(change, 0))
# ์นจ์‹๋ถ„: elevation ๊ฐ์†Œ (grid.add_sediment๊ฐ€ ์Œ์ˆ˜๋„ ์ฒ˜๋ฆฌํ•˜๋‚˜? ์•„๋‹˜)
# ์นจ์‹์€ bedrock์ด๋‚˜ sediment๋ฅผ ๊นŽ์•„์•ผ ํ•จ.
# erosion_process.py์˜ ์—ญํ• ์ƒ ์ง์ ‘ grid ์ˆ˜์ •์„ ํ•ด๋„ ๋จ.
erosion_mask = change < 0
loss = -change[erosion_mask]
# ํ‡ด์ ์ธต ๋จผ์ € ๊นŽ๊ณ  ๊ธฐ๋ฐ˜์•” ๊นŽ๊ธฐ
sed_thickness = self.grid.sediment[erosion_mask]
sed_loss = np.minimum(loss, sed_thickness)
rock_loss = loss - sed_loss
self.grid.sediment[erosion_mask] -= sed_loss
self.grid.bedrock[erosion_mask] -= rock_loss
self.grid.update_elevation()
return change