| import os |
| import zipfile |
| from io import BytesIO |
|
|
| import pandas as pd |
| import streamlit as st |
|
|
| |
|
|
|
|
def find_header_row(df, keyword="Dist_Name"):
    """Return the index of the first row (within the first 20) containing *keyword*.

    The match is a case-insensitive substring test against each cell of the
    row, after stripping surrounding whitespace.

    Raises:
        ValueError: if no row in the scanned window contains *keyword*.
    """
    needle = keyword.lower()  # hoisted: constant for the whole scan
    for i in range(min(20, len(df))):
        cells = df.iloc[i].astype(str).str.strip().str.lower()
        # Vectorized substring test instead of a Python-level any() loop.
        if cells.str.contains(needle, regex=False).any():
            return i
    raise ValueError(f"No row with '{keyword}' found.")
|
|
|
|
def read_sheet_fallback(file_bytes, sheet):
    """Rewind *file_bytes* and parse one sheet as raw, header-less data.

    Uses the ``calamine`` engine, which supports the .xlsb format the app
    accepts. Returns the sheet as a DataFrame with default integer columns.
    """
    # The buffer may already have been consumed by a previous read; rewind
    # so every call sees the workbook from the start.
    file_bytes.seek(0)
    frame = pd.read_excel(
        file_bytes,
        sheet_name=sheet,
        header=None,
        engine="calamine",
    )
    return frame
|
|
|
|
def load_clean_df(file_bytes, sheet):
    """Load *sheet* from an in-memory workbook and normalize it for diffing.

    Detects the header row, promotes it to column labels, keeps only the
    rows below it, and returns a DataFrame where every cell is a stripped
    string.

    Raises:
        ValueError: propagated from find_header_row when no header is found.
    """
    df_raw = read_sheet_fallback(file_bytes, sheet)
    header_row = find_header_row(df_raw)
    # Position-based slice: keep everything below the header. Unlike
    # drop(index=range(header_row + 1)), this does not assume the index is
    # the default RangeIndex.
    df = df_raw.iloc[header_row + 1:].copy()
    # Column labels come from the detected header row; normalize whitespace
    # including non-breaking spaces that Excel exports often contain.
    df.columns = [
        str(c).strip().replace("\xa0", " ") for c in df_raw.iloc[header_row]
    ]
    # Compare everything as trimmed text. NOTE(review): astype(str) turns
    # missing cells into the literal string "nan", so downstream .notna()
    # checks will never filter them — confirm whether that is intended.
    df = df.astype(str).apply(lambda col: col.str.strip())
    return df
|
|
|
|
def detect_dist_col(columns):
    """Return the first column whose name contains both 'dist' and 'name'.

    The check is case-insensitive, so variants like ``Dist_Name`` or
    ``district name`` all match.

    Raises:
        ValueError: if no column name qualifies.
    """
    match = next(
        (
            candidate
            for candidate in columns
            if "dist" in candidate.lower() and "name" in candidate.lower()
        ),
        None,
    )
    if match is None:
        raise ValueError("Dist_Name column not found.")
    return match
|
|
|
|
| |
|
|
# --- Page header -------------------------------------------------------------
st.title(":material/compare_arrows: Dump Compare Tool")
st.markdown(
    ":blue[**Upload the old and new dumps, then input the object class (comma-separated) to compare**]"
)


# --- Inputs: the two .xlsb dumps to diff, plus the sheets to compare ---------
old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old")
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new")


# Sheet names double as "object classes" here; parsed as comma-separated text.
sheet_list_input = st.text_input(
    "Enter object class (comma-separated)", placeholder="e.g. BCF, BTS, CELL"
)
|
|
if st.button("Run Comparison", type="primary", use_container_width=True):
    if not all([old_file, new_file, sheet_list_input.strip()]):
        st.warning("Please upload both files and provide at least one sheet name.")
    else:
        # Split on commas, dropping empty entries ("BCF,,BTS" -> ["BCF", "BTS"]).
        sheet_names = [s.strip() for s in sheet_list_input.split(",") if s.strip()]
        # Copy the uploads into seekable buffers so each sheet read can rewind.
        old_bytes = BytesIO(old_file.read())
        new_bytes = BytesIO(new_file.read())


        logs = []         # human-readable status lines shown after the run
        total = 0         # total number of difference rows across all sheets
        all_results = {}  # sheet name -> DataFrame of differences


        for sheet in sheet_names:
            try:
                # Rewind after each load so the next sheet read starts clean.
                df_old = load_clean_df(old_bytes, sheet)
                old_bytes.seek(0)
                df_new = load_clean_df(new_bytes, sheet)
                new_bytes.seek(0)


                # The Dist_Name-like column is the row key used for alignment.
                dist_col_old = detect_dist_col(df_old.columns)
                dist_col_new = detect_dist_col(df_new.columns)


                # NOTE(review): load_clean_df casts every cell to str, so
                # .notna() is always True here — missing keys survive as the
                # literal string "nan". Confirm whether they should be dropped.
                df_old = df_old[df_old[dist_col_old].notna()].set_index(dist_col_old)
                df_new = df_new[df_new[dist_col_new].notna()].set_index(dist_col_new)


                # Row keys present in only one of the two dumps.
                missing_in_new = df_old.index.difference(df_new.index)
                missing_in_old = df_new.index.difference(df_old.index)


                if not missing_in_new.empty:
                    logs.append(
                        f"{len(missing_in_new)} entrées présentes dans l'ancien dump mais absentes dans le nouveau pour '{sheet}'."
                    )
                if not missing_in_old.empty:
                    logs.append(
                        f"{len(missing_in_old)} entrées présentes dans le nouveau dump mais absentes dans l'ancien pour '{sheet}'."
                    )


                # Cell-level comparison only makes sense on shared keys/columns.
                common = df_old.index.intersection(df_new.index)
                if common.empty:
                    logs.append(f"Aucun Dist_Name commun trouvé pour '{sheet}'.")
                    continue


                common_cols = df_old.columns.intersection(df_new.columns)
                if common_cols.empty:
                    logs.append(
                        f"Aucune colonne commune entre les dumps pour '{sheet}'."
                    )
                    continue


                # Align both frames to the shared (rows x columns) grid.
                df_old_common = df_old.loc[common, common_cols]
                df_new_common = df_new.loc[common, common_cols]


                # True where the cells differ; cells NaN on both sides are
                # excluded (after astype(str) the isna branch is effectively
                # inert, but it is harmless to keep).
                mask = (df_old_common != df_new_common) & ~(
                    df_old_common.isna() & df_new_common.isna()
                )


                # Build one record per difference. Column headers are the
                # uploaded file names, so the output reads old-vs-new.
                changes = []
                # Rows that vanished between old and new.
                for dist in missing_in_new:
                    changes.append(
                        {
                            "Dist_Name": dist,
                            "Parameter": "Présence ligne",
                            os.path.basename(old_file.name): "Présent",
                            os.path.basename(new_file.name): "Manquant",
                        }
                    )
                # Rows that appeared in the new dump only.
                for dist in missing_in_old:
                    changes.append(
                        {
                            "Dist_Name": dist,
                            "Parameter": "Présence ligne",
                            os.path.basename(old_file.name): "Manquant",
                            os.path.basename(new_file.name): "Présent",
                        }
                    )
                # Cell-level changes on shared rows; the File_Name column is
                # expected to differ between dumps, so it is skipped.
                for dist in mask.index:
                    for param in mask.columns[mask.loc[dist]]:
                        if param.strip().lower() == "file_name":
                            continue
                        changes.append(
                            {
                                "Dist_Name": dist,
                                "Parameter": param,
                                os.path.basename(old_file.name): df_old_common.loc[
                                    dist, param
                                ],
                                os.path.basename(new_file.name): df_new_common.loc[
                                    dist, param
                                ],
                            }
                        )


                df_changes = pd.DataFrame(changes)
                if not df_changes.empty:
                    all_results[sheet] = df_changes
                    logs.append(f"{len(df_changes)} changes in '{sheet}'")
                    total += len(df_changes)
                else:
                    logs.append(f"No changes in '{sheet}'")


            except Exception as e:
                # Best-effort per sheet: a bad sheet name or parse failure is
                # logged and the remaining sheets are still compared.
                logs.append(f"Error in '{sheet}': {e}")


        st.success(f"Comparison completed. Total changes: {total}")
        for log in logs:
            st.write(log)


        # Package one xlsx per sheet into a single downloadable zip.
        if all_results:
            output_buffer = BytesIO()
            with zipfile.ZipFile(output_buffer, mode="w") as zf:
                for sheet, df in all_results.items():
                    file_buffer = BytesIO()
                    df.to_excel(file_buffer, index=False)
                    zf.writestr(f"{sheet}_differences.xlsx", file_buffer.getvalue())


            # NOTE(review): on_click="ignore" (skip the rerun on download) is
            # only accepted by recent Streamlit releases — confirm the pinned
            # version supports it.
            st.download_button(
                "Download Results (.zip)",
                data=output_buffer.getvalue(),
                file_name="differences.zip",
                mime="application/zip",
                type="primary",
                on_click="ignore",
            )
|
|