Geo-Lab / engine /meander_physics.py
HANSOL
Geo-Lab v4.1 - Clean deployment
2afa69c
"""
Geo-Lab AI: ๊ณก๋ฅ˜ ํ•˜์ฒœ ๋ฌผ๋ฆฌ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
Helical Flow ๊ธฐ๋ฐ˜ ์ธก๋ฐฉ ์นจ์‹/ํ‡ด์ 
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple
@dataclass
class MeanderChannel:
"""๊ณก๋ฅ˜ ํ•˜์ฒœ ์ฑ„๋„ ํ‘œํ˜„
1D ์ค‘์‹ฌ์„  ๊ธฐ๋ฐ˜์œผ๋กœ ์ฑ„๋„์„ ํ‘œํ˜„ํ•˜๊ณ ,
๊ฐ ์ง€์ ์—์„œ์˜ ๊ณก๋ฅ , ํญ, ๊นŠ์ด๋ฅผ ์ถ”์ 
"""
# ์ฑ„๋„ ์ค‘์‹ฌ์„  ์ขŒํ‘œ
x: np.ndarray = field(default=None)
y: np.ndarray = field(default=None)
# ๊ฐ ์ง€์ ์˜ ์†์„ฑ
width: np.ndarray = field(default=None) # ์ฑ„๋„ ํญ (m)
depth: np.ndarray = field(default=None) # ์ฑ„๋„ ๊นŠ์ด (m)
# ์œ ๋Ÿ‰ ๋ฐ ์œ ์†
discharge: float = 100.0 # mยณ/s
velocity: np.ndarray = field(default=None) # m/s
def __post_init__(self):
if self.x is not None and self.width is None:
n = len(self.x)
self.width = np.full(n, 20.0)
self.depth = np.full(n, 3.0)
self.velocity = np.full(n, 1.5)
@classmethod
def create_initial(cls, length: float = 1000.0,
initial_sinuosity: float = 1.2,
n_points: int = 200,
discharge: float = 100.0):
"""์ดˆ๊ธฐ ๊ณก๋ฅ˜ ํ•˜์ฒœ ์ƒ์„ฑ"""
s = np.linspace(0, 1, n_points) # ์ •๊ทœํ™”๋œ ๊ฑฐ๋ฆฌ
# ์‚ฌ์ธํŒŒ ๊ธฐ๋ฐ˜ ์ดˆ๊ธฐ ๊ณก๋ฅ˜
amplitude = length * 0.1 * (initial_sinuosity - 1)
frequency = 3 # ๊ตฝ์ด ์ˆ˜
x = s * length
y = amplitude * np.sin(2 * np.pi * frequency * s)
return cls(x=x, y=y, discharge=discharge)
def calculate_curvature(self) -> np.ndarray:
"""๊ณก๋ฅ  ๊ณ„์‚ฐ (1/m)
ฮบ = (x'y'' - y'x'') / (x'^2 + y'^2)^(3/2)
"""
dx = np.gradient(self.x)
dy = np.gradient(self.y)
ddx = np.gradient(dx)
ddy = np.gradient(dy)
denominator = np.power(dx**2 + dy**2, 1.5) + 1e-10
curvature = (dx * ddy - dy * ddx) / denominator
return curvature
def calculate_sinuosity(self) -> float:
"""๊ตด๊ณก๋„ = ํ•˜์ฒœ ๊ธธ์ด / ์ง์„  ๊ฑฐ๋ฆฌ"""
# ๊ฒฝ๋กœ ๊ธธ์ด
ds = np.sqrt(np.diff(self.x)**2 + np.diff(self.y)**2)
path_length = np.sum(ds)
# ์ง์„  ๊ฑฐ๋ฆฌ
straight_length = np.sqrt(
(self.x[-1] - self.x[0])**2 +
(self.y[-1] - self.y[0])**2
) + 1e-10
return path_length / straight_length
class HelicalFlowErosion:
"""Helical Flow ๊ธฐ๋ฐ˜ ๊ณก๋ฅ˜ ์นจ์‹/ํ‡ด์ 
๊ณก๋ฅ˜์—์„œ ๋ฌผ์€ ๋‚˜์„ ํ˜•(helical)์œผ๋กœ ํ๋ฆ„:
- ํ‘œ๋ฉด: ๋ฐ”๊นฅ์ชฝ์œผ๋กœ (์›์‹ฌ๋ ฅ)
- ๋ฐ”๋‹ฅ: ์•ˆ์ชฝ์œผ๋กœ (์••๋ ฅ ๊ตฌ๋ฐฐ)
๊ฒฐ๊ณผ:
- ๋ฐ”๊นฅ์ชฝ (๊ณต๊ฒฉ์‚ฌ๋ฉด): ์นจ์‹
- ์•ˆ์ชฝ (ํ‡ด์ ์‚ฌ๋ฉด): ํ‡ด์ 
"""
def __init__(self,
bank_erosion_rate: float = 0.5, # m/yr per unit stress
deposition_rate: float = 0.3):
self.bank_erosion_rate = bank_erosion_rate
self.deposition_rate = deposition_rate
def calculate_bank_migration(self, channel: MeanderChannel,
dt: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
"""ํ•˜์•ˆ ์ด๋™ ๊ณ„์‚ฐ
๊ณก๋ฅ ์ด ํฐ ๊ณณ์—์„œ ๋ฐ”๊นฅ์ชฝ์œผ๋กœ ์นจ์‹ โ†’ ์ฑ„๋„ ์ด๋™
"""
curvature = channel.calculate_curvature()
# ์นจ์‹/ํ‡ด์  ๋น„๋Œ€์นญ
# ๊ณก๋ฅ  > 0: ์ขŒ์ธก์ด ๋ฐ”๊นฅ (์นจ์‹)
# ๊ณก๋ฅ  < 0: ์šฐ์ธก์ด ๋ฐ”๊นฅ (์นจ์‹)
# ์ด๋™ ๋ฒกํ„ฐ (์ฑ„๋„์— ์ˆ˜์ง)
dx = np.gradient(channel.x)
dy = np.gradient(channel.y)
path_length = np.sqrt(dx**2 + dy**2) + 1e-10
# ์ˆ˜์ง ๋ฐฉํ–ฅ (์™ผ์ชฝ์œผ๋กœ 90๋„ ํšŒ์ „)
normal_x = -dy / path_length
normal_y = dx / path_length
# ์ด๋™๋Ÿ‰ = ๊ณก๋ฅ  ร— ์œ ๋Ÿ‰ ร— erosion rate
migration_rate = curvature * channel.discharge * self.bank_erosion_rate * dt
# ์ด๋™๋Ÿ‰ ์ œํ•œ
migration_rate = np.clip(migration_rate, -2.0, 2.0)
delta_x = migration_rate * normal_x
delta_y = migration_rate * normal_y
return delta_x, delta_y
def check_cutoff(self, channel: MeanderChannel,
threshold_distance: float = 30.0) -> List[Tuple[int, int]]:
"""์šฐ๊ฐํ˜ธ ํ˜•์„ฑ ์กฐ๊ฑด ์ฒดํฌ (์œ ๋กœ ์ ˆ๋‹จ)"""
n = len(channel.x)
cutoffs = []
# ๊ฐ€๊นŒ์šด ๋‘ ์  ์ฐพ๊ธฐ (๊ฒฝ๋กœ์ƒ ๋ฉ€๋ฆฌ ๋–จ์–ด์กŒ์ง€๋งŒ ๊ณต๊ฐ„์ ์œผ๋กœ ๊ฐ€๊นŒ์šด)
for i in range(n):
for j in range(i + 30, n): # ์ตœ์†Œ 30์  ๊ฐ„๊ฒฉ
dist = np.sqrt(
(channel.x[i] - channel.x[j])**2 +
(channel.y[i] - channel.y[j])**2
)
if dist < threshold_distance:
cutoffs.append((i, j))
break # ์ฒซ ๋ฒˆ์งธ cutoff๋งŒ
return cutoffs
class MeanderSimulation:
"""๊ณก๋ฅ˜ ํ•˜์ฒœ ์‹œ๋ฎฌ๋ ˆ์ด์…˜"""
def __init__(self, length: float = 1000.0, initial_sinuosity: float = 1.3):
self.channel = MeanderChannel.create_initial(
length=length,
initial_sinuosity=initial_sinuosity
)
self.erosion = HelicalFlowErosion()
self.history: List[Tuple[np.ndarray, np.ndarray]] = []
self.oxbow_lakes: List[Tuple[np.ndarray, np.ndarray]] = []
self.time = 0.0
def step(self, dt: float = 1.0):
"""1 ํƒ€์ž„์Šคํ…"""
# 1. ํ•˜์•ˆ ์ด๋™
dx, dy = self.erosion.calculate_bank_migration(self.channel, dt)
self.channel.x += dx
self.channel.y += dy
# 2. ์šฐ๊ฐํ˜ธ ์ฒดํฌ
cutoffs = self.erosion.check_cutoff(self.channel)
for start, end in cutoffs:
# ์šฐ๊ฐํ˜ธ ์ €์žฅ
ox = self.channel.x[start:end+1].copy()
oy = self.channel.y[start:end+1].copy()
self.oxbow_lakes.append((ox, oy))
# ์ฑ„๋„ ๋‹จ์ถ•
self.channel.x = np.concatenate([
self.channel.x[:start+1],
self.channel.x[end:]
])
self.channel.y = np.concatenate([
self.channel.y[:start+1],
self.channel.y[end:]
])
# ์†์„ฑ ๋ฐฐ์—ด๋„ ์กฐ์ •
n_new = len(self.channel.x)
self.channel.width = np.full(n_new, 20.0)
self.channel.depth = np.full(n_new, 3.0)
self.channel.velocity = np.full(n_new, 1.5)
self.time += dt
def run(self, total_time: float, save_interval: float = 100.0, dt: float = 1.0):
"""์‹œ๋ฎฌ๋ ˆ์ด์…˜ ์‹คํ–‰"""
steps = int(total_time / dt)
save_every = max(1, int(save_interval / dt))
self.history = [(self.channel.x.copy(), self.channel.y.copy())]
for i in range(steps):
self.step(dt)
if (i + 1) % save_every == 0:
self.history.append((self.channel.x.copy(), self.channel.y.copy()))
return self.history
def get_cross_section(self, position: float = 0.5) -> Tuple[np.ndarray, np.ndarray]:
"""๊ณก๋ฅ˜ ๋‹จ๋ฉด (๋น„๋Œ€์นญ)
position: ๊ณก๋ฅ˜ ๋‚ด ์œ„์น˜ (0=์ง์„ ๋ถ€, 1=์ตœ๋Œ€ ๊ตฝ์ด)
"""
curvature = self.channel.calculate_curvature()
max_curve = np.abs(curvature).max() + 1e-10
asymmetry = np.abs(curvature[int(len(curvature) * 0.5)]) / max_curve
asymmetry = min(1.0, asymmetry * position * 2)
# ๋‹จ๋ฉด ์ƒ์„ฑ
x = np.linspace(-30, 30, 100)
# ๋น„๋Œ€์นญ ๊ณก์„ 
left_depth = 5 + asymmetry * 3 # ๊ณต๊ฒฉ์‚ฌ๋ฉด (๊นŠ์Œ)
right_depth = 3 - asymmetry * 1 # ํ‡ด์ ์‚ฌ๋ฉด (์–•์Œ)
y = np.where(
x < 0,
-left_depth * (1 - np.power(x / -30, 2)),
-right_depth * (1 - np.power(x / 30, 2))
)
return x, y
# ํ”„๋ฆฌ์ปดํ“จํŒ…
def precompute_meander(max_time: int = 10000,
initial_sinuosity: float = 1.3,
save_every: int = 100) -> dict:
"""๊ณก๋ฅ˜ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ ํ”„๋ฆฌ์ปดํ“จํŒ…"""
sim = MeanderSimulation(initial_sinuosity=initial_sinuosity)
history = sim.run(
total_time=max_time,
save_interval=save_every,
dt=1.0
)
return {
'history': history,
'oxbow_lakes': sim.oxbow_lakes,
'final_sinuosity': sim.channel.calculate_sinuosity()
}
if __name__ == "__main__":
print("๊ณก๋ฅ˜ ํ•˜์ฒœ ๋ฌผ๋ฆฌ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ ํ…Œ์ŠคํŠธ")
print("=" * 50)
sim = MeanderSimulation(initial_sinuosity=1.3)
print(f"์ดˆ๊ธฐ ๊ตด๊ณก๋„: {sim.channel.calculate_sinuosity():.2f}")
sim.run(5000, save_interval=1000)
print(f"5000๋…„ ํ›„ ๊ตด๊ณก๋„: {sim.channel.calculate_sinuosity():.2f}")
print(f"ํ˜•์„ฑ๋œ ์šฐ๊ฐํ˜ธ: {len(sim.oxbow_lakes)}๊ฐœ")
print("=" * 50)
print("ํ…Œ์ŠคํŠธ ์™„๋ฃŒ!")