Geo-Lab / engine /physics_engine.py
HANSOL
Geo-Lab v4.1 - Clean deployment
2afa69c
"""
Geo-Lab AI: ์ง„์งœ ๋ฌผ๋ฆฌ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ ์—”์ง„
Stream Power Law ๊ธฐ๋ฐ˜ ์‹ค์ œ ์นจ์‹ ์‹œ๋ฎฌ๋ ˆ์ด์…˜
"""
import numpy as np
from dataclasses import dataclass, field
from typing import List, Tuple, Optional
from scipy.ndimage import gaussian_filter, uniform_filter
@dataclass
class TerrainGrid:
"""2D ์ง€ํ˜• ๊ทธ๋ฆฌ๋“œ"""
width: int = 100
height: int = 100
cell_size: float = 10.0 # ๋ฏธํ„ฐ
elevation: np.ndarray = field(default=None)
bedrock: np.ndarray = field(default=None) # ๊ธฐ๋ฐ˜์•” (์นจ์‹ ๋ถˆ๊ฐ€ ๋ ˆ๋ฒจ)
rock_hardness: np.ndarray = field(default=None) # 0-1
def __post_init__(self):
if self.elevation is None:
self.elevation = np.zeros((self.height, self.width))
if self.bedrock is None:
self.bedrock = np.full((self.height, self.width), -100.0)
if self.rock_hardness is None:
self.rock_hardness = np.full((self.height, self.width), 0.5)
def get_slope(self) -> np.ndarray:
"""๊ฒฝ์‚ฌ๋„ ๊ณ„์‚ฐ (m/m)"""
dy, dx = np.gradient(self.elevation, self.cell_size)
return np.sqrt(dx**2 + dy**2)
def get_slope_direction(self) -> Tuple[np.ndarray, np.ndarray]:
"""์ตœ๋Œ€ ๊ฒฝ์‚ฌ ๋ฐฉํ–ฅ (๋‹จ์œ„ ๋ฒกํ„ฐ)"""
dy, dx = np.gradient(self.elevation, self.cell_size)
magnitude = np.sqrt(dx**2 + dy**2) + 1e-10
return -dx / magnitude, -dy / magnitude
@dataclass
class WaterFlow:
"""์ˆ˜๋ฌธ ์‹œ๋ฎฌ๋ ˆ์ด์…˜"""
terrain: TerrainGrid
# ์œ ๋Ÿ‰ (mยณ/s per cell)
discharge: np.ndarray = field(default=None)
# ์œ ์† (m/s)
velocity: np.ndarray = field(default=None)
# ์ˆ˜์‹ฌ (m)
depth: np.ndarray = field(default=None)
# ์ „๋‹จ์‘๋ ฅ (Pa)
shear_stress: np.ndarray = field(default=None)
manning_n: float = 0.03 # Manning ์กฐ๋„๊ณ„์ˆ˜
def __post_init__(self):
shape = (self.terrain.height, self.terrain.width)
if self.discharge is None:
self.discharge = np.zeros(shape)
if self.velocity is None:
self.velocity = np.zeros(shape)
if self.depth is None:
self.depth = np.zeros(shape)
if self.shear_stress is None:
self.shear_stress = np.zeros(shape)
def flow_accumulation_d8(self, precipitation: float = 0.001):
"""D8 ์•Œ๊ณ ๋ฆฌ์ฆ˜ ๊ธฐ๋ฐ˜ ์œ ๋Ÿ‰ ๋ˆ„์ """
h, w = self.terrain.height, self.terrain.width
elev = self.terrain.elevation
# ์ดˆ๊ธฐ ๊ฐ•์ˆ˜
acc = np.full((h, w), precipitation)
# ๋†’์€ ๊ณณ์—์„œ ๋‚ฎ์€ ๊ณณ์œผ๋กœ ์ •๋ ฌ
flat_elev = elev.ravel()
sorted_indices = np.argsort(flat_elev)[::-1]
# D8 ๋ฐฉํ–ฅ (8๋ฐฉํ–ฅ ์ด์›ƒ)
neighbors = [(-1,-1), (-1,0), (-1,1), (0,-1), (0,1), (1,-1), (1,0), (1,1)]
for idx in sorted_indices:
y, x = idx // w, idx % w
current_elev = elev[y, x]
# ๊ฐ€์žฅ ๋‚ฎ์€ ์ด์›ƒ ์ฐพ๊ธฐ
min_elev = current_elev
min_neighbor = None
for dy, dx in neighbors:
ny, nx = y + dy, x + dx
if 0 <= ny < h and 0 <= nx < w:
if elev[ny, nx] < min_elev:
min_elev = elev[ny, nx]
min_neighbor = (ny, nx)
# ํ•˜๋ฅ˜๋กœ ์œ ๋Ÿ‰ ์ „๋‹ฌ
if min_neighbor is not None:
acc[min_neighbor] += acc[y, x]
self.discharge = acc
return acc
def calculate_hydraulics(self):
"""Manning ๋ฐฉ์ •์‹ ๊ธฐ๋ฐ˜ ์ˆ˜๋ฆฌํ•™ ๊ณ„์‚ฐ"""
slope = self.terrain.get_slope() + 0.0001 # 0 ๋ฐฉ์ง€
# ๊ฐ€์ •: ์ฑ„๋„ ํญ = ์œ ๋Ÿ‰์˜ ํ•จ์ˆ˜
channel_width = 2 * np.power(self.discharge + 0.01, 0.4)
# Manning ๋ฐฉ์ •์‹: V = (1/n) * R^(2/3) * S^(1/2)
# ๋‹จ์ˆœํ™”: R โ‰ˆ depth
# Q = V * A, A = width * depth
# depth = (Q * n / (width * S^0.5))^(3/5)
self.depth = np.power(
self.discharge * self.manning_n / (channel_width * np.sqrt(slope) + 0.01),
0.6
)
self.depth = np.clip(self.depth, 0, 50)
# ์œ ์†
hydraulic_radius = self.depth # ๋‹จ์ˆœํ™”
self.velocity = (1 / self.manning_n) * np.power(hydraulic_radius, 2/3) * np.sqrt(slope)
self.velocity = np.clip(self.velocity, 0, 10)
# ์ „๋‹จ์‘๋ ฅ ฯ„ = ฯgRS
rho_water = 1000 # kg/mยณ
g = 9.81
self.shear_stress = rho_water * g * self.depth * slope
class StreamPowerErosion:
"""Stream Power Law ๊ธฐ๋ฐ˜ ์นจ์‹
E = K * A^m * S^n
- E: ์นจ์‹๋ฅ  (m/yr)
- K: ์นจ์‹ ๊ณ„์ˆ˜ (์•”์„ ํŠน์„ฑ ๋ฐ˜์˜)
- A: ์œ ์—ญ ๋ฉด์  (โ‰ˆ ์œ ๋Ÿ‰)
- S: ๊ฒฝ์‚ฌ
- m: ๋ฉด์  ์ง€์ˆ˜ (typically 0.3-0.6)
- n: ๊ฒฝ์‚ฌ ์ง€์ˆ˜ (typically 1.0-2.0)
"""
def __init__(self, K: float = 1e-5, m: float = 0.5, n: float = 1.0):
self.K = K
self.m = m
self.n = n
def calculate_erosion(self, terrain: TerrainGrid, water: WaterFlow, dt: float = 1.0) -> np.ndarray:
"""์นจ์‹๋Ÿ‰ ๊ณ„์‚ฐ"""
slope = terrain.get_slope()
# Stream Power Law
# K๋Š” ์•”์„ ๊ฒฝ๋„์— ๋ฐ˜๋น„๋ก€
effective_K = self.K * (1 - terrain.rock_hardness * 0.9)
erosion_rate = effective_K * np.power(water.discharge, self.m) * np.power(slope + 0.001, self.n)
erosion = erosion_rate * dt
# ๊ธฐ๋ฐ˜์•” ์ดํ•˜๋กœ ์นจ์‹ ๋ถˆ๊ฐ€
max_erosion = terrain.elevation - terrain.bedrock
erosion = np.minimum(erosion, np.maximum(max_erosion, 0))
return np.clip(erosion, 0, 5.0) # ์—ฐ๊ฐ„ ์ตœ๋Œ€ 5m
class HillslopeProcess:
"""์‚ฌ๋ฉด ํ”„๋กœ์„ธ์Šค (Mass Wasting)
V์ž๊ณก ํ˜•์„ฑ์˜ ํ•ต์‹ฌ - ํ•˜๋ฐฉ ์นจ์‹ ํ›„ ์‚ฌ๋ฉด ๋ถ•๊ดด
"""
def __init__(self, critical_slope: float = 0.7, diffusion_rate: float = 0.01):
self.critical_slope = critical_slope # ์ž„๊ณ„ ๊ฒฝ์‚ฌ (tan ฮธ)
self.diffusion_rate = diffusion_rate # ํ™•์‚ฐ ๊ณ„์ˆ˜
def mass_wasting(self, terrain: TerrainGrid, dt: float = 1.0) -> np.ndarray:
"""์‚ฌ๋ฉด ๋ถ•๊ดด (๊ธ‰๊ฒฝ์‚ฌ โ†’ ๋ฌผ์งˆ ์ด๋™)"""
h, w = terrain.height, terrain.width
change = np.zeros((h, w))
elev = terrain.elevation
slope = terrain.get_slope()
# ์ž„๊ณ„ ๊ฒฝ์‚ฌ ์ดˆ๊ณผ ์ง€์ 
unstable = slope > self.critical_slope
# ๋ถˆ์•ˆ์ • ์ง€์ ์—์„œ ์ด์›ƒ์œผ๋กœ ๋ฌผ์งˆ ๋ถ„๋ฐฐ
for y in range(1, h-1):
for x in range(1, w-1):
if not unstable[y, x]:
continue
current = elev[y, x]
excess = (slope[y, x] - self.critical_slope) * terrain.cell_size
# 8๋ฐฉํ–ฅ ์ด์›ƒ ์ค‘ ๋‚ฎ์€ ๊ณณ์œผ๋กœ ๋ถ„๋ฐฐ
neighbors = [(y-1,x), (y+1,x), (y,x-1), (y,x+1)]
lower_neighbors = [(ny, nx) for ny, nx in neighbors
if elev[ny, nx] < current]
if lower_neighbors:
transfer = excess * 0.2 * dt # ์ „๋‹ฌ๋Ÿ‰
change[y, x] -= transfer
per_neighbor = transfer / len(lower_neighbors)
for ny, nx in lower_neighbors:
change[ny, nx] += per_neighbor
return change
def soil_creep(self, terrain: TerrainGrid, dt: float = 1.0) -> np.ndarray:
"""ํ† ์–‘ ํฌ๋ฆฌํ”„ (๋А๋ฆฐ ํ™•์‚ฐ)"""
# ๋ผํ”Œ๋ผ์‹œ์•ˆ ํ™•์‚ฐ
laplacian = (
np.roll(terrain.elevation, 1, axis=0) +
np.roll(terrain.elevation, -1, axis=0) +
np.roll(terrain.elevation, 1, axis=1) +
np.roll(terrain.elevation, -1, axis=1) -
4 * terrain.elevation
)
return self.diffusion_rate * laplacian * dt
class VValleySimulation:
"""V์ž๊ณก ์‹œ๋ฎฌ๋ ˆ์ด์…˜ - ์‹ค์ œ ๋ฌผ๋ฆฌ ๊ธฐ๋ฐ˜
ํ”„๋กœ์„ธ์Šค:
1. ๊ฐ•์ˆ˜ โ†’ ์œ ์ถœ (D8 flow accumulation)
2. Stream Power Law ์นจ์‹
3. ์‚ฌ๋ฉด ๋ถ•๊ดด (Mass Wasting)
4. ํ† ์–‘ ํฌ๋ฆฌํ”„
"""
def __init__(self, width: int = 100, height: int = 100):
self.terrain = TerrainGrid(width=width, height=height)
self.water = WaterFlow(terrain=self.terrain)
self.erosion = StreamPowerErosion()
self.hillslope = HillslopeProcess()
self.history: List[np.ndarray] = []
self.time = 0.0
def initialize_terrain(self, max_elevation: float = 500.0,
initial_channel_depth: float = 10.0,
rock_hardness: float = 0.5):
"""์ดˆ๊ธฐ ์ง€ํ˜• ์„ค์ •"""
h, w = self.terrain.height, self.terrain.width
# ๋ถโ†’๋‚จ ๊ฒฝ์‚ฌ
for y in range(h):
base = max_elevation * (1 - y / h)
self.terrain.elevation[y, :] = base
# ์ค‘์•™์— ์ดˆ๊ธฐ ํ•˜์ฒœ ์ฑ„๋„
center = w // 2
for x in range(center - 3, center + 4):
if 0 <= x < w:
depth = initial_channel_depth * (1 - abs(x - center) / 4)
self.terrain.elevation[:, x] -= depth
# ์•”์„ ๊ฒฝ๋„
self.terrain.rock_hardness[:] = rock_hardness
# ๊ธฐ๋ฐ˜์•”
self.terrain.bedrock[:] = self.terrain.elevation.min() - 200
self.history = [self.terrain.elevation.copy()]
self.time = 0.0
def step(self, dt: float = 1.0, precipitation: float = 0.001):
"""1 ํƒ€์ž„์Šคํ… ์ง„ํ–‰"""
# 1. ์ˆ˜๋ฌธ ๊ณ„์‚ฐ
self.water.flow_accumulation_d8(precipitation)
self.water.calculate_hydraulics()
# 2. Stream Power ์นจ์‹
erosion = self.erosion.calculate_erosion(self.terrain, self.water, dt)
self.terrain.elevation -= erosion
# 3. ์‚ฌ๋ฉด ๋ถ•๊ดด
wasting = self.hillslope.mass_wasting(self.terrain, dt)
self.terrain.elevation += wasting
# 4. ํ† ์–‘ ํฌ๋ฆฌํ”„
creep = self.hillslope.soil_creep(self.terrain, dt)
self.terrain.elevation += creep
self.time += dt
def run(self, total_time: float, save_interval: float = 100.0, dt: float = 1.0):
"""์‹œ๋ฎฌ๋ ˆ์ด์…˜ ์‹คํ–‰ ๋ฐ ํžˆ์Šคํ† ๋ฆฌ ์ €์žฅ"""
steps = int(total_time / dt)
save_every = int(save_interval / dt)
for i in range(steps):
self.step(dt)
if (i + 1) % save_every == 0:
self.history.append(self.terrain.elevation.copy())
return self.history
def get_cross_section(self, y_position: int = None) -> Tuple[np.ndarray, np.ndarray]:
"""๋‹จ๋ฉด ์ถ”์ถœ"""
if y_position is None:
y_position = self.terrain.height // 2
x = np.arange(self.terrain.width) * self.terrain.cell_size
z = self.terrain.elevation[y_position, :]
return x, z
def measure_valley_depth(self) -> float:
"""V์ž๊ณก ๊นŠ์ด ์ธก์ •"""
center = self.terrain.width // 2
y_mid = self.terrain.height // 2
# ์ค‘์•™๊ณผ ์–‘์ชฝ 20์…€ ๋–จ์–ด์ง„ ๊ณณ์˜ ๊ณ ๋„ ์ฐจ์ด
left = self.terrain.elevation[y_mid, max(0, center-20)]
right = self.terrain.elevation[y_mid, min(self.terrain.width-1, center+20)]
center_elev = self.terrain.elevation[y_mid, center]
return max(0, (left + right) / 2 - center_elev)
# ํ”„๋ฆฌ์ปดํ“จํŒ… ํ•จ์ˆ˜
def precompute_v_valley(max_time: int = 10000,
rock_hardness: float = 0.5,
K: float = 1e-5,
precipitation: float = 0.001,
save_every: int = 100) -> List[np.ndarray]:
"""V์ž๊ณก ์‹œ๋ฎฌ๋ ˆ์ด์…˜ ํ”„๋ฆฌ์ปดํ“จํŒ…"""
sim = VValleySimulation(width=100, height=100)
sim.erosion.K = K
sim.initialize_terrain(rock_hardness=rock_hardness)
history = sim.run(
total_time=max_time,
save_interval=save_every,
dt=1.0
)
return history
if __name__ == "__main__":
print("V์ž๊ณก ๋ฌผ๋ฆฌ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ ํ…Œ์ŠคํŠธ")
print("=" * 50)
sim = VValleySimulation()
sim.initialize_terrain(rock_hardness=0.3)
print(f"์ดˆ๊ธฐ ์ƒํƒœ: ๊นŠ์ด = {sim.measure_valley_depth():.1f}m")
for year in [1000, 2000, 5000, 10000]:
sim.run(1000, save_interval=1000)
depth = sim.measure_valley_depth()
print(f"Year {year}: ๊นŠ์ด = {depth:.1f}m")
print("=" * 50)
print("ํ…Œ์ŠคํŠธ ์™„๋ฃŒ!")