import glob
import json
import os
from pathlib import Path

import numpy as np
import pandas as pd
import scipy
import scipy.ndimage
import soundfile
import torch
from thop import profile, clever_format

from desed_task.evaluation.evaluation_measures import compute_sed_eval_metrics
from sed_scores_eval.base_modules.scores import create_score_dataframe
| |
|
| |
|
def batched_decode_preds(
    strong_preds, filenames, encoder, thresholds=(0.5,), median_filter=7, pad_indx=None,
):
    """ Decode a batch of frame-level predictions to dataframes. Each threshold gives a
    different dataframe and is stored in a dictionary.

    Args:
        strong_preds: torch.Tensor, batch of strong predictions; one
            (n_classes, n_frames) score matrix per clip (time is the last axis,
            as shown by the `shape[-1]` padding math and `transpose(0, 1)` below).
        filenames: list, the list of filenames of the current batch.
        encoder: ManyHotEncoder object, object used to decode predictions.
        thresholds: iterable of float, the thresholds to be used for predictions.
        median_filter: int, the number of frames for which to apply median window (smoothing).
        pad_indx: list, per-clip fraction of valid (non-padded) frames; None if no padding.

    Returns:
        tuple: (scores_raw, scores_postprocessed, prediction_dfs) — raw and
        median-filtered score dataframes keyed by audio id, and a dict mapping
        each threshold to a DataFrame of decoded events.
    """
    scores_raw = {}
    scores_postprocessed = {}
    prediction_dfs = {threshold: pd.DataFrame() for threshold in thresholds}

    for j in range(strong_preds.shape[0]):
        audio_id = Path(filenames[j]).stem
        filename = audio_id + ".wav"
        c_scores = strong_preds[j]
        if pad_indx is not None:
            true_len = int(c_scores.shape[-1] * pad_indx[j].item())
            # Slice the TIME (last) axis. The previous `c_scores[:true_len]`
            # sliced the class axis instead of removing padded frames.
            c_scores = c_scores[..., :true_len]
        # (n_classes, n_frames) -> (n_frames, n_classes) numpy array.
        c_scores = c_scores.transpose(0, 1).detach().cpu().numpy()
        scores_raw[audio_id] = create_score_dataframe(
            scores=c_scores,
            timestamps=encoder._frame_to_time(np.arange(len(c_scores) + 1)),
            event_classes=encoder.labels,
        )
        # Median smoothing along time only (window 1 on the class axis).
        # scipy.ndimage.filters is deprecated/removed; use scipy.ndimage.
        c_scores = scipy.ndimage.median_filter(c_scores, (median_filter, 1))
        scores_postprocessed[audio_id] = create_score_dataframe(
            scores=c_scores,
            timestamps=encoder._frame_to_time(np.arange(len(c_scores) + 1)),
            event_classes=encoder.labels,
        )
        for c_th in thresholds:
            pred = c_scores > c_th
            pred = encoder.decode_strong(pred)
            pred = pd.DataFrame(pred, columns=["event_label", "onset", "offset"])
            pred["filename"] = filename
            prediction_dfs[c_th] = pd.concat([prediction_dfs[c_th], pred], ignore_index=True)

    return scores_raw, scores_postprocessed, prediction_dfs
| |
|
| |
|
def convert_to_event_based(weak_dataframe):
    """ Convert a weakly labeled DataFrame ('filename', 'event_labels') to a DataFrame strongly labeled
    ('filename', 'onset', 'offset', 'event_label').

    Each comma-separated label in 'event_labels' becomes one row with a dummy
    [0, 1] event span.

    Args:
        weak_dataframe: pd.DataFrame, the dataframe to be converted.

    Returns:
        pd.DataFrame, the dataframe strongly labeled.
    """
    records = [
        {"filename": row["filename"], "event_label": label, "onset": 0, "offset": 1}
        for _, row in weak_dataframe.iterrows()
        for label in row["event_labels"].split(",")
    ]
    return pd.DataFrame(records)
| |
|
| |
|
def log_sedeval_metrics(predictions, ground_truth, save_dir=None):
    """ Return the set of metrics from sed_eval.

    Args:
        predictions: pd.DataFrame, the dataframe of predictions.
        ground_truth: path to a tab-separated groundtruth file readable by pandas.
        save_dir: str, path to the folder where to save the event and segment based metrics outputs.

    Returns:
        tuple, event-based macro-F1 and micro-F1, segment-based macro-F1 and micro-F1
        (all zeros when there are no predictions).
    """
    # Nothing to score: short-circuit with zero F1 everywhere.
    if predictions.empty:
        return 0.0, 0.0, 0.0, 0.0

    groundtruth_df = pd.read_csv(ground_truth, sep="\t")
    event_res, segment_res = compute_sed_eval_metrics(predictions, groundtruth_df)

    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)
        # Dump the full sed_eval reports as plain text, one file per granularity.
        for out_name, result in (("event_f1.txt", event_res), ("segment_f1.txt", segment_res)):
            with open(os.path.join(save_dir, out_name), "w") as f:
                f.write(str(result))

    event_results = event_res.results()
    segment_results = segment_res.results()
    return (
        event_results["class_wise_average"]["f_measure"]["f_measure"],
        event_results["overall"]["f_measure"]["f_measure"],
        segment_results["class_wise_average"]["f_measure"]["f_measure"],
        segment_results["overall"]["f_measure"]["f_measure"],
    )
| |
|
| |
|
def parse_jams(jams_list, encoder, out_json):
    """Collect background and foreground source metadata from Scaper JAMS files.

    For every JAMS file, each annotated sound is mapped to its isolated-event
    wav file (in the sibling "<stem>_events" folder). Background sounds are
    listed by path only; foreground sounds get onset/offset/label entries.
    Labels unknown to the encoder are normalized for the "Frying" and
    "Vacuum_cleaner" families; anything else raises NotImplementedError.
    The result is written to out_json as
    {"backgrounds": [...], "sources": [...]}.

    Args:
        jams_list: list of paths to JAMS files (must be non-empty).
        encoder: object exposing `labels`, the list of known event labels.
        out_json: path of the JSON file to write (parent dirs are created).
    """
    if not jams_list:
        raise IndexError("jams list is empty ! Wrong path ?")

    backgrounds = []
    sources = []
    for jamfile in jams_list:
        with open(jamfile, "r") as f:
            jdata = json.load(f)

        annotations = jdata["annotations"]
        event_paths = annotations[-1]["sandbox"]["scaper"]["isolated_events_audio_path"]
        # One isolated-event path per annotated sound, in the same order.
        assert len(annotations[0]["data"]) == len(event_paths)

        # Isolated events live next to the JAMS file in "<stem>_events".
        events_dir = os.path.join(Path(jamfile).parent, Path(jamfile).stem + "_events")
        for sound, event_path in zip(annotations[0]["data"], event_paths):
            source_file = os.path.join(events_dir, Path(event_path).stem + ".wav")
            value = sound["value"]

            if value["role"] == "background":
                backgrounds.append(source_file)
                continue

            label = value["label"]
            if label not in encoder.labels:
                # Collapse Scaper sub-labels onto the encoder's coarse classes.
                if label.startswith("Frying"):
                    label = "Frying"
                elif label.startswith("Vacuum_cleaner"):
                    label = "Vacuum_cleaner"
                else:
                    raise NotImplementedError

            sources.append(
                {
                    "filename": source_file,
                    "onset": value["event_time"],
                    "offset": value["event_time"] + value["event_duration"],
                    "event_label": label,
                }
            )

    os.makedirs(Path(out_json).parent, exist_ok=True)
    with open(out_json, "w") as f:
        json.dump({"backgrounds": backgrounds, "sources": sources}, f, indent=4)
| |
|
| |
|
def generate_tsv_wav_durations(audio_dir, out_tsv):
    """
    Generate a dataframe with filename and duration of each wav file.

    Args:
        audio_dir: str, the path of the folder where audio files are (used by glob.glob)
        out_tsv: str, the path of the output tsv file (skipped if None)

    Returns:
        pd.DataFrame: the dataframe containing filenames and durations
    """
    wav_paths = glob.glob(os.path.join(audio_dir, "*.wav"))
    rows = [[os.path.basename(p), soundfile.info(p).duration] for p in wav_paths]
    durations_df = pd.DataFrame(rows, columns=["filename", "duration"])
    if out_tsv is not None:
        durations_df.to_csv(out_tsv, sep="\t", index=False, float_format="%.1f")
    return durations_df
| |
|
| |
|
def calculate_macs(model, config, dataset=None):
    """
    Calculate the multiply-accumulate operations (MACs) of the model given as input.

    Args:
        model: deep learning model to calculate the macs for
        config: config used to train the model (feature/batch settings are read from it)
        dataset: dataset used to train the model; only required when the network
            consumes pretrained embeddings (config["net"]["use_embeddings"])

    Returns:
        tuple: human-readable MACs and parameter count strings from thop
    """
    feats_cfg = config["feats"]
    total_batch = sum(config["training"]["batch_size"])
    # Number of feature frames for a max-length clip (+1 for the final frame).
    n_frames = int(
        (feats_cfg["sample_rate"] * config["data"]["audio_max_len"]) / feats_cfg["hop_length"] + 1
    )
    dummy_input = torch.randn([total_batch, feats_cfg["n_mels"], n_frames])

    if config["net"].get("use_embeddings"):
        # Borrow one sample's embeddings and tile them across the batch.
        _, _, _, _, embeddings = dataset[0]
        embeddings = embeddings.repeat(total_batch, 1, 1)
        macs, params = profile(model, inputs=(dummy_input, None, embeddings))
    else:
        macs, params = profile(model, inputs=(dummy_input,))

    macs, params = clever_format([macs, params], "%.3f")
    return macs, params
| |
|