import os

import numpy as np
import pandas as pd
import psds_eval
import sed_eval
from psds_eval import PSDSEval, plot_psd_roc

def get_event_list_current_file(df, fname):
    """Get the list of events for a given filename.

    Args:
        df: pd.DataFrame, the dataframe to search in
        fname: str, the filename whose events are extracted from the dataframe

    Returns:
        list of events (dictionaries) for the given filename
    """
    event_file = df[df["filename"] == fname]
    if len(event_file) == 1 and pd.isna(event_file["event_label"].iloc[0]):
        # The file has no annotated event: keep a placeholder entry so the
        # file is still counted as evaluated.
        event_list_for_current_file = [{"filename": fname}]
    else:
        event_list_for_current_file = event_file.to_dict("records")

    return event_list_for_current_file
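
# Illustrative sketch (not part of the original evaluation code): how
# get_event_list_current_file behaves on a hypothetical annotation dataframe,
# including a file whose single row carries a NaN event_label.
def _example_get_event_list():  # hypothetical helper, for illustration only
    df = pd.DataFrame(
        {
            "filename": ["a.wav", "a.wav", "b.wav"],
            "onset": [0.5, 2.0, np.nan],
            "offset": [1.5, 3.0, np.nan],
            "event_label": ["Speech", "Dog", np.nan],
        }
    )
    print(get_event_list_current_file(df, "a.wav"))  # two event dicts
    print(get_event_list_current_file(df, "b.wav"))  # [{"filename": "b.wav"}]
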
def psds_results(psds_obj):
    """Compute and print PSDS scores for three standard (alpha_ct, alpha_st) settings.

    Args:
        psds_obj: psds_eval.PSDSEval object with operating points already added.

    Returns:
        None, the scores are printed to stdout.
    """
    try:
        psds_score = psds_obj.psds(alpha_ct=0, alpha_st=0, max_efpr=100)
        print(f"\nPSD-Score (0, 0, 100): {psds_score.value:.5f}")
        psds_score = psds_obj.psds(alpha_ct=1, alpha_st=0, max_efpr=100)
        print(f"\nPSD-Score (1, 0, 100): {psds_score.value:.5f}")
        psds_score = psds_obj.psds(alpha_ct=0, alpha_st=1, max_efpr=100)
        print(f"\nPSD-Score (0, 1, 100): {psds_score.value:.5f}")
    except psds_eval.psds.PSDSEvalError as e:
        print("PSDS computation failed, make sure operating points have been added.")
        raise EnvironmentError("Could not compute PSDS") from e
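
# Illustrative sketch (hypothetical paths): printing the three PSDS variants for
# an evaluator holding at least one operating point. "groundtruth.tsv" and
# "durations.tsv" are placeholder paths in the DCASE tab-separated format; the
# groundtruth itself is reused here as a trivially perfect operating point.
def _example_psds_results():  # hypothetical helper, for illustration only
    gt = pd.read_csv("groundtruth.tsv", sep="\t")
    durations = pd.read_csv("durations.tsv", sep="\t")
    psds_obj = PSDSEval(ground_truth=gt, metadata=durations)
    psds_obj.add_operating_point(gt.copy())
    psds_results(psds_obj)
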
def event_based_evaluation_df(
    reference, estimated, t_collar=0.200, percentage_of_length=0.2
):
    """Calculate EventBasedMetrics given a reference and an estimated dataframe.

    Args:
        reference: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            reference events
        estimated: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            estimated events to be compared with the reference
        t_collar: float, in seconds, the temporal tolerance allowed on onsets and offsets
        percentage_of_length: float, between 0 and 1, the percentage of the event length allowed as tolerance on the
            offset

    Returns:
        sed_eval.sound_event.EventBasedMetrics with the scores
    """
    evaluated_files = reference["filename"].unique()

    # Class list: union of the labels seen in the reference and the estimates.
    classes = []
    classes.extend(reference.event_label.dropna().unique())
    classes.extend(estimated.event_label.dropna().unique())
    classes = list(set(classes))

    event_based_metric = sed_eval.sound_event.EventBasedMetrics(
        event_label_list=classes,
        t_collar=t_collar,
        percentage_of_length=percentage_of_length,
        empty_system_output_handling="zero_score",
    )

    for fname in evaluated_files:
        reference_event_list_for_current_file = get_event_list_current_file(
            reference, fname
        )
        estimated_event_list_for_current_file = get_event_list_current_file(
            estimated, fname
        )

        event_based_metric.evaluate(
            reference_event_list=reference_event_list_for_current_file,
            estimated_event_list=estimated_event_list_for_current_file,
        )

    return event_based_metric
def segment_based_evaluation_df(reference, estimated, time_resolution=1.0):
    """Calculate SegmentBasedMetrics given a reference and an estimated dataframe.

    Args:
        reference: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            reference events
        estimated: pd.DataFrame containing "filename", "onset", "offset" and "event_label" columns which describe the
            estimated events to be compared with the reference
        time_resolution: float, in seconds, the time resolution of the segment based metric

    Returns:
        sed_eval.sound_event.SegmentBasedMetrics with the scores
    """
    evaluated_files = reference["filename"].unique()

    classes = []
    classes.extend(reference.event_label.dropna().unique())
    classes.extend(estimated.event_label.dropna().unique())
    classes = list(set(classes))

    segment_based_metric = sed_eval.sound_event.SegmentBasedMetrics(
        event_label_list=classes, time_resolution=time_resolution
    )

    for fname in evaluated_files:
        reference_event_list_for_current_file = get_event_list_current_file(
            reference, fname
        )
        estimated_event_list_for_current_file = get_event_list_current_file(
            estimated, fname
        )

        segment_based_metric.evaluate(
            reference_event_list=reference_event_list_for_current_file,
            estimated_event_list=estimated_event_list_for_current_file,
        )

    return segment_based_metric
def compute_sed_eval_metrics(predictions, groundtruth):
    """Compute sed_eval event-based and segment-based metrics with the default parameters used in the task.

    Args:
        predictions: pd.DataFrame, the predictions dataframe
        groundtruth: pd.DataFrame, the groundtruth dataframe

    Returns:
        tuple, (sed_eval.sound_event.EventBasedMetrics, sed_eval.sound_event.SegmentBasedMetrics)
    """
    metric_event = event_based_evaluation_df(
        groundtruth, predictions, t_collar=0.200, percentage_of_length=0.2
    )
    metric_segment = segment_based_evaluation_df(
        groundtruth, predictions, time_resolution=1.0
    )

    return metric_event, metric_segment
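
# Illustrative sketch (hypothetical data): computing the sed_eval metrics on
# two toy dataframes, which exercises both event_based_evaluation_df and
# segment_based_evaluation_df above. The column layout ("filename", "onset",
# "offset", "event_label") follows the docstrings above.
def _example_sed_eval_metrics():  # hypothetical helper, for illustration only
    groundtruth = pd.DataFrame(
        {
            "filename": ["a.wav", "a.wav"],
            "onset": [0.0, 4.0],
            "offset": [2.0, 6.0],
            "event_label": ["Speech", "Dog"],
        }
    )
    predictions = pd.DataFrame(
        {
            "filename": ["a.wav", "a.wav"],
            "onset": [0.1, 4.5],
            "offset": [2.1, 5.8],
            "event_label": ["Speech", "Dog"],
        }
    )
    metric_event, metric_segment = compute_sed_eval_metrics(predictions, groundtruth)
    print(metric_event)  # sed_eval metric objects print a full report
    print(metric_segment)
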
def compute_per_intersection_macro_f1(
    prediction_dfs,
    ground_truth_file,
    durations_file,
    dtc_threshold=0.5,
    gtc_threshold=0.5,
    cttc_threshold=0.3,
):
    """Compute the intersection-based macro F1-score, averaged over the given thresholds.

    Args:
        prediction_dfs: dict, a dictionary with thresholds as keys and prediction dataframes as values
        ground_truth_file: str, path to the groundtruth .tsv file
        durations_file: str, path to the .tsv file with the audio file durations
        dtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for groundtruth intersection
            with predictions
        gtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for predictions intersection
            with groundtruth
        cttc_threshold: float, the parameter used in PSDSEval, the percentage of intersection needed to count a FP
            as a cross-trigger

    Returns:
        float, the macro F1-score averaged over the thresholds
    """
    gt = pd.read_csv(ground_truth_file, sep="\t")
    durations = pd.read_csv(durations_file, sep="\t")

    psds = PSDSEval(
        ground_truth=gt,
        metadata=durations,
        dtc_threshold=dtc_threshold,
        gtc_threshold=gtc_threshold,
        cttc_threshold=cttc_threshold,
    )
    psds_macro_f1 = []
    for threshold in prediction_dfs.keys():
        if not prediction_dfs[threshold].empty:
            threshold_f1, _ = psds.compute_macro_f_score(prediction_dfs[threshold])
        else:
            threshold_f1 = 0
        if np.isnan(threshold_f1):
            threshold_f1 = 0.0
        psds_macro_f1.append(threshold_f1)
    psds_macro_f1 = np.mean(psds_macro_f1)
    return psds_macro_f1
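
# Illustrative sketch (hypothetical paths and data): prediction_dfs maps each
# decision threshold to the dataframe of events obtained with that threshold.
# "groundtruth.tsv" and "durations.tsv" are assumed to exist in the DCASE
# tab-separated format ("filename"/"onset"/"offset"/"event_label" and
# "filename"/"duration" respectively).
def _example_intersection_macro_f1():  # hypothetical helper, for illustration only
    prediction_dfs = {
        0.5: pd.DataFrame(
            {
                "filename": ["a.wav"],
                "onset": [0.1],
                "offset": [2.1],
                "event_label": ["Speech"],
            }
        )
    }
    macro_f1 = compute_per_intersection_macro_f1(
        prediction_dfs, "groundtruth.tsv", "durations.tsv"
    )
    print(f"Intersection-based macro F1: {macro_f1:.3f}")
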
def compute_psds_from_operating_points(
    prediction_dfs,
    ground_truth_file,
    durations_file,
    dtc_threshold=0.5,
    gtc_threshold=0.5,
    cttc_threshold=0.3,
    alpha_ct=0,
    alpha_st=0,
    max_efpr=100,
    save_dir=None,
):
    """Compute the PSDS from a set of operating points (one prediction dataframe per threshold).

    Args:
        prediction_dfs: dict, a dictionary with thresholds as keys and prediction dataframes as values
        ground_truth_file: str, path to the groundtruth .tsv file
        durations_file: str, path to the .tsv file with the audio file durations
        dtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for groundtruth intersection
            with predictions
        gtc_threshold: float, the parameter used in PSDSEval, percentage of tolerance for predictions intersection
            with groundtruth
        cttc_threshold: float, the parameter used in PSDSEval, the percentage of intersection needed to count a FP
            as a cross-trigger
        alpha_ct: float, the weight of the cross-trigger cost
        alpha_st: float, the weight of the cost for instability across classes
        max_efpr: float, the maximum effective FPR (per hour) over which the PSD-ROC is integrated
        save_dir: str or None, if given, the directory where the predictions and the PSD-ROC plot are saved

    Returns:
        float, the PSDS value
    """
    gt = pd.read_csv(ground_truth_file, sep="\t")
    durations = pd.read_csv(durations_file, sep="\t")
    psds_eval_obj = PSDSEval(
        ground_truth=gt,
        metadata=durations,
        dtc_threshold=dtc_threshold,
        gtc_threshold=gtc_threshold,
        cttc_threshold=cttc_threshold,
    )

    for i, k in enumerate(prediction_dfs.keys()):
        # Work on a copy so the caller's dataframe is not modified by the
        # reindexing below.
        det = prediction_dfs[k].copy()
        det["index"] = range(1, len(det) + 1)
        det = det.set_index("index")
        psds_eval_obj.add_operating_point(
            det, info={"name": f"Op {i + 1:02d}", "threshold": k}
        )

    psds_score = psds_eval_obj.psds(
        alpha_ct=alpha_ct, alpha_st=alpha_st, max_efpr=max_efpr
    )

    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)

        pred_dir = os.path.join(
            save_dir,
            f"predictions_dtc{dtc_threshold}_gtc{gtc_threshold}_cttc{cttc_threshold}",
        )
        os.makedirs(pred_dir, exist_ok=True)
        for k in prediction_dfs.keys():
            prediction_dfs[k].to_csv(
                os.path.join(pred_dir, f"predictions_th_{k:.2f}.tsv"),
                sep="\t",
                index=False,
            )

        plot_psd_roc(
            psds_score,
            filename=os.path.join(
                save_dir, f"PSDS_ct{alpha_ct}_st{alpha_st}_{max_efpr}.png"
            ),
        )

    return psds_score.value
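
# Illustrative sketch (hypothetical paths and data): computing the PSDS from a
# few operating points, one set of predictions per decision threshold. The
# 0.1/0.5/0.9 thresholds, the toy events and the .tsv paths are placeholders.
def _example_psds():  # hypothetical helper, for illustration only
    prediction_dfs = {
        th: pd.DataFrame(
            {
                "filename": ["a.wav"],
                "onset": [0.1],
                "offset": [2.0 - th],  # tighter events at higher thresholds
                "event_label": ["Speech"],
            }
        )
        for th in (0.1, 0.5, 0.9)
    }
    psds = compute_psds_from_operating_points(
        prediction_dfs,
        "groundtruth.tsv",
        "durations.tsv",
        alpha_ct=0,
        alpha_st=0,
        max_efpr=100,
        save_dir="./psds_outputs",
    )
    print(f"PSDS (0, 0, 100): {psds:.5f}")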