| import warnings |
|
|
| import pandas as pd |
| from geopy.distance import geodesic |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
|
|
|
|
def calculate_distances(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    code_col1: str,
    lat_col1: str,
    long_col1: str,
    code_col2: str,
    lat_col2: str,
    long_col2: str,
    min_distance: float = 1.0,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Compute the geodesic distance between every row of df1 and every row of df2.

    Each output row holds all columns of the df1 row (original names), all
    columns of the df2 row (names suffixed with ``_Dataset2``) and the
    distance in a ``Distance_km`` column.

    Args:
        df1: First DataFrame containing reference points
        df2: Second DataFrame containing points to compare
        code_col1: Column name in df1 containing point identifiers
        lat_col1: Column name in df1 containing latitude
        long_col1: Column name in df1 containing longitude
        code_col2: Column name in df2 containing point identifiers
        lat_col2: Column name in df2 containing latitude
        long_col2: Column name in df2 containing longitude
        min_distance: Distance threshold in kilometers; pairs whose distance
            is strictly below this value are returned in the third frame

    Returns:
        tuple: (all_distances, closest_matches, matches_below_threshold)
            - all_distances: one row per (df1 row, df2 row) pair that could
              be measured
            - closest_matches: for each distinct ``code_col1`` value, the row
              of all_distances with the smallest ``Distance_km``
            - matches_below_threshold: rows of all_distances with
              ``Distance_km < min_distance``

    Raises:
        ValueError: If a required column is missing from df1 or df2, or if
            no coordinate pair could be measured (including empty inputs).

    Warns:
        UserWarning: For every pair on which ``geodesic`` raises ValueError;
            that pair is skipped.
    """
    required_cols_1 = {code_col1, lat_col1, long_col1}
    required_cols_2 = {code_col2, lat_col2, long_col2}

    if not required_cols_1.issubset(df1.columns):
        raise ValueError(
            f"df1 is missing required columns: {required_cols_1 - set(df1.columns)}"
        )
    if not required_cols_2.issubset(df2.columns):
        raise ValueError(
            f"df2 is missing required columns: {required_cols_2 - set(df2.columns)}"
        )

    # (latitude, longitude) tuples, one per row, in positional order.
    coords1 = list(df1[[lat_col1, long_col1]].itertuples(index=False, name=None))
    coords2 = list(df2[[lat_col2, long_col2]].itertuples(index=False, name=None))

    # Convert each row to its output-record form once, instead of once per
    # pair inside the O(len(df1) * len(df2)) loop below.
    records1 = [df1.iloc[i].to_dict() for i in range(len(df1))]
    records2 = []
    for j in range(len(df2)):
        row2 = df2.iloc[j]
        records2.append({f"{col}_Dataset2": row2[col] for col in df2.columns})

    distances = []
    for record1, coord1 in zip(records1, coords1):
        for record2, coord2 in zip(records2, coords2):
            # Only the distance computation is guarded, so an unrelated
            # ValueError is never reported as a coordinate problem.
            try:
                distance_km = geodesic(coord1, coord2).kilometers
            except ValueError as e:
                warnings.warn(
                    f"Skipping invalid coordinates: {coord1} or {coord2}: {e}",
                    stacklevel=2,
                )
                continue
            distances.append({**record1, **record2, "Distance_km": distance_km})

    if not distances:
        raise ValueError("No valid coordinate pairs were processed")

    df_distances = pd.DataFrame(distances)

    # idxmin yields, per code_col1 group, the index label of the nearest pair.
    df_closest = df_distances.loc[
        df_distances.groupby(code_col1)["Distance_km"].idxmin()
    ]

    # Every pair (not only the closest ones) strictly under the threshold.
    df_closest_min_distance = df_distances[df_distances["Distance_km"] < min_distance]

    return df_distances, df_closest, df_closest_min_distance
|
|