File size: 10,195 Bytes
938949f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""
Tracker Optimizer: simulate agrivoltaic shading scenarios.
Uses FarquharModel + ShadowModel to compute A at different tracker tilt
angles, then finds the optimal energy/crop tradeoff.
"""

from __future__ import annotations

import sys
from pathlib import Path

import numpy as np
import pandas as pd

PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.farquhar_model import FarquharModel
from src.solar_geometry import ShadowModel

_model = FarquharModel()
_shadow = ShadowModel()


def load_sensor_data() -> pd.DataFrame:
    """Load sensor sample, filter daytime PAR > 50, add helper columns."""
    from src.sensor_data_loader import SensorDataLoader

    loader = SensorDataLoader()
    df = loader.load()
    df = loader.filter_daytime(df)
    df["time"] = pd.to_datetime(df["time"], utc=True)
    df["hour"] = df["time"].dt.hour + df["time"].dt.minute / 60
    df["date"] = df["time"].dt.date
    df["delta_t"] = df["Air1_leafTemperature_ref"] - df["Air1_airTemperature_ref"]
    return df


def compute_stress_heatmap(df: pd.DataFrame) -> pd.DataFrame:
    """Pivot table: hour-of-day (int) vs date, values = mean deltaT.
    Restricted to daytime hours (5:00-19:00 UTC) — no stress at night."""
    tmp = df.copy()
    tmp["hour_int"] = tmp["time"].dt.hour
    # Keep only daytime hours (sunrise ~5 UTC, sunset ~19 UTC for Sde Boker)
    tmp = tmp[(tmp["hour_int"] >= 5) & (tmp["hour_int"] <= 19)]
    pivot = tmp.pivot_table(
        values="delta_t", index="hour_int", columns="date", aggfunc="mean",
    )
    # Ensure all daytime hours are represented even if some have no data
    full_hours = list(range(5, 20))
    pivot = pivot.reindex(full_hours)
    return pivot


def _compute_A_at_par(row: pd.Series, par_factor: float) -> float:
    """Compute A for a single row with PAR scaled by par_factor."""
    par = float(row["Air1_PAR_ref"]) * par_factor
    if par <= 0:
        return 0.0
    return _model.calc_photosynthesis(
        PAR=par,
        Tleaf=float(row["Air1_leafTemperature_ref"]),
        CO2=float(row["Air1_CO2_ref"]),
        VPD=float(row["Air1_VPD_ref"]),
        Tair=float(row["Air1_airTemperature_ref"]),
    )


def _compute_A_at_par_value(row: pd.Series, par_value: float) -> float:
    """Compute A for a single row with an absolute PAR value."""
    if par_value <= 0:
        return 0.0
    return _model.calc_photosynthesis(
        PAR=par_value,
        Tleaf=float(row["Air1_leafTemperature_ref"]),
        CO2=float(row["Air1_CO2_ref"]),
        VPD=float(row["Air1_VPD_ref"]),
        Tair=float(row["Air1_airTemperature_ref"]),
    )


def simulate_tilt_angles(
    df: pd.DataFrame,
    angles: list[int] | None = None,
) -> pd.DataFrame:
    """
    For each tilt angle offset from astronomical, compute mean A and energy
    fraction across the dataset using the shadow model.
    Returns DataFrame with columns: angle, energy_pct, mean_A, A_pct.
    """
    if angles is None:
        angles = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]

    # Precompute solar positions for all timestamps
    times = pd.DatetimeIndex(df["time"])
    solar_pos = _shadow.get_solar_position(times)

    # Baseline A at astronomical tracking (offset=0)
    baseline_A_values = []
    for idx, (_, row) in enumerate(df.iterrows()):
        elev = solar_pos["solar_elevation"].iloc[idx]
        azim = solar_pos["solar_azimuth"].iloc[idx]
        if elev <= 2:
            baseline_A_values.append(0.0)
            continue
        tracker = _shadow.compute_tracker_tilt(azim, elev)
        theta_astro = tracker["tracker_theta"]
        mask = _shadow.project_shadow(elev, azim, theta_astro)
        par_dist = _shadow.compute_par_distribution(
            float(row["Air1_PAR_ref"]), mask,
            solar_elevation=elev, solar_azimuth=azim, tracker_tilt=theta_astro)
        # Canopy-average PAR (weighted by LAI)
        par_avg = float(np.average(par_dist, weights=_shadow.lai_weights, axis=0).mean())
        baseline_A_values.append(_compute_A_at_par_value(row, par_avg))
    baseline_A = np.mean(baseline_A_values)

    results = []
    for angle_offset in angles:
        A_values = []
        energy_factors = []
        for idx, (_, row) in enumerate(df.iterrows()):
            elev = solar_pos["solar_elevation"].iloc[idx]
            azim = solar_pos["solar_azimuth"].iloc[idx]
            if elev <= 2:
                A_values.append(0.0)
                energy_factors.append(1.0)
                continue
            tracker = _shadow.compute_tracker_tilt(azim, elev)
            theta_astro = tracker["tracker_theta"]
            aoi_astro = tracker["aoi"]

            # Apply offset
            theta_shade = theta_astro + angle_offset
            mask = _shadow.project_shadow(elev, azim, theta_shade)
            par_dist = _shadow.compute_par_distribution(
                float(row["Air1_PAR_ref"]), mask,
                solar_elevation=elev, solar_azimuth=azim, tracker_tilt=theta_shade)
            par_avg = float(np.average(par_dist, weights=_shadow.lai_weights, axis=0).mean())
            A_values.append(_compute_A_at_par_value(row, par_avg))

            # Energy: cos(aoi) ratio between offset and astronomical
            cos_astro = max(0.0, np.cos(np.radians(aoi_astro)))
            cos_offset = max(0.0, np.cos(np.radians(aoi_astro + angle_offset)))
            energy_factors.append(
                cos_offset / cos_astro if cos_astro > 0.01 else 1.0)

        mean_A = np.mean(A_values)
        energy_pct = np.mean(energy_factors) * 100
        A_pct = (mean_A / baseline_A * 100) if baseline_A > 0 else 0
        results.append({
            "angle": angle_offset,
            "energy_pct": energy_pct,
            "mean_A": mean_A,
            "A_pct": A_pct,
        })
    return pd.DataFrame(results)


def compute_daily_schedule(
    df: pd.DataFrame,
    stress_threshold: float = 2.0,
    shade_angle: int = 20,
) -> pd.DataFrame:
    """
    For each 15-min slot: compute astronomical tracking angle (pvlib),
    and if deltaT > threshold, offset by shade_angle to shade the vine.
    Computes A for both strategies using the shadow model.
    """
    times = pd.DatetimeIndex(df["time"])
    solar_pos = _shadow.get_solar_position(times)

    records = []
    for idx, (_, row) in enumerate(df.iterrows()):
        dt = float(row["delta_t"]) if pd.notna(row["delta_t"]) else 0.0
        stressed = dt > stress_threshold
        elev = solar_pos["solar_elevation"].iloc[idx]
        azim = solar_pos["solar_azimuth"].iloc[idx]

        if elev <= 2:
            records.append({
                "time": row["time"],
                "hour": row["hour"],
                "delta_t": dt,
                "stressed": stressed,
                "tracker_angle": 0.0,
                "recommended_angle": 0.0,
                "A_baseline": 0.0,
                "A_smart": 0.0,
                "energy_fraction": 1.0,
            })
            continue

        # Astronomical tracking angle (full sun-following)
        tracker = _shadow.compute_tracker_tilt(azim, elev)
        theta_astro = tracker["tracker_theta"]
        aoi_astro = tracker["aoi"]

        # Baseline: full astronomical tracking
        mask_baseline = _shadow.project_shadow(elev, azim, theta_astro)
        par_dist_baseline = _shadow.compute_par_distribution(
            float(row["Air1_PAR_ref"]), mask_baseline,
            solar_elevation=elev, solar_azimuth=azim, tracker_tilt=theta_astro)
        par_avg_baseline = float(
            np.average(par_dist_baseline, weights=_shadow.lai_weights, axis=0).mean())
        A_baseline = _compute_A_at_par_value(row, par_avg_baseline)

        # Smart: offset by shade_angle when stressed
        if stressed:
            theta_smart = theta_astro + shade_angle
            mask_smart = _shadow.project_shadow(elev, azim, theta_smart)
            par_dist_smart = _shadow.compute_par_distribution(
                float(row["Air1_PAR_ref"]), mask_smart,
                solar_elevation=elev, solar_azimuth=azim, tracker_tilt=theta_smart)
            par_avg_smart = float(
                np.average(par_dist_smart, weights=_shadow.lai_weights, axis=0).mean())
            A_smart = _compute_A_at_par_value(row, par_avg_smart)

            cos_astro = max(0.0, np.cos(np.radians(aoi_astro)))
            cos_offset = max(0.0, np.cos(np.radians(aoi_astro + shade_angle)))
            energy_frac = cos_offset / cos_astro if cos_astro > 0.01 else 1.0
        else:
            theta_smart = theta_astro
            A_smart = A_baseline
            energy_frac = 1.0

        records.append({
            "time": row["time"],
            "hour": row["hour"],
            "delta_t": dt,
            "stressed": stressed,
            "tracker_angle": theta_astro,
            "recommended_angle": theta_smart,
            "A_baseline": A_baseline,
            "A_smart": A_smart,
            "energy_fraction": energy_frac,
        })
    return pd.DataFrame(records)


def compute_season_summary(schedule: pd.DataFrame) -> dict:
    """Aggregate season totals from the daily schedule."""
    total_slots = len(schedule)
    stress_slots = schedule["stressed"].sum()
    stress_hours = stress_slots * 0.25  # 15-min slots

    energy_baseline = total_slots  # each slot = 1 unit at full tracking
    energy_smart = schedule["energy_fraction"].sum()
    energy_pct = (energy_smart / energy_baseline * 100) if energy_baseline > 0 else 100

    A_baseline_total = schedule["A_baseline"].sum()
    A_smart_total = schedule["A_smart"].sum()

    A_change_pct = ((A_smart_total - A_baseline_total) / A_baseline_total * 100) if A_baseline_total > 0 else 0

    # Water savings estimate: each stress hour shaded reduces transpiration demand
    water_savings_pct = min(30.0, stress_hours * 0.08)

    return {
        "energy_pct": energy_pct,
        "A_baseline_total": A_baseline_total,
        "A_smart_total": A_smart_total,
        "A_change_pct": A_change_pct,
        "stress_hours": stress_hours,
        "total_hours": total_slots * 0.25,
        "water_savings_pct": water_savings_pct,
    }