| | import numpy as np |
| | import pandas as pd |
| | from dcase_util.data import DecisionEncoder |
| |
|
| |
|
| | class ManyHotEncoder: |
| | """" |
| | Adapted after DecisionEncoder.find_contiguous_regions method in |
| | https://github.com/DCASE-REPO/dcase_util/blob/master/dcase_util/data/decisions.py |
| | |
| | Encode labels into numpy arrays where 1 correspond to presence of the class and 0 absence. |
| | Multiple 1 can appear on the same line, it is for multi label problem. |
| | Args: |
| | labels: list, the classes which will be encoded |
| | n_frames: int, (Default value = None) only useful for strong labels. The number of frames of a segment. |
| | Attributes: |
| | labels: list, the classes which will be encoded |
| | n_frames: int, only useful for strong labels. The number of frames of a segment. |
| | """ |
| |
|
| | def __init__( |
| | self, labels, audio_len, frame_len, frame_hop, net_pooling=1, fs=16000 |
| | ): |
| | if type(labels) in [np.ndarray, np.array]: |
| | labels = labels.tolist() |
| | self.labels = labels |
| | self.audio_len = audio_len |
| | self.frame_len = frame_len |
| | self.frame_hop = frame_hop |
| | self.fs = fs |
| | self.net_pooling = net_pooling |
| | n_frames = self.audio_len * self.fs |
| | |
| | |
| | |
| | self.n_frames = int(int((n_frames / self.frame_hop)) / self.net_pooling) |
| |
|
| | def encode_weak(self, labels): |
| | """ Encode a list of weak labels into a numpy array |
| | |
| | Args: |
| | labels: list, list of labels to encode (to a vector of 0 and 1) |
| | |
| | Returns: |
| | numpy.array |
| | A vector containing 1 for each label, and 0 everywhere else |
| | """ |
| | |
| | if type(labels) is str: |
| | if labels == "empty": |
| | y = np.zeros(len(self.labels)) - 1 |
| | return y |
| | else: |
| | labels = labels.split(",") |
| | if type(labels) is pd.DataFrame: |
| | if labels.empty: |
| | labels = [] |
| | elif "event_label" in labels.columns: |
| | labels = labels["event_label"] |
| | y = np.zeros(len(self.labels)) |
| | for label in labels: |
| | if not pd.isna(label): |
| | i = self.labels.index(label) |
| | y[i] = 1 |
| | return y |
| |
|
| | def _time_to_frame(self, time): |
| | samples = time * self.fs |
| | frame = (samples) / self.frame_hop |
| | return np.clip(frame / self.net_pooling, a_min=0, a_max=self.n_frames) |
| |
|
| | def _frame_to_time(self, frame): |
| | frame = frame * self.net_pooling / (self.fs / self.frame_hop) |
| | return np.clip(frame, a_min=0, a_max=self.audio_len) |
| |
|
| | def encode_strong_df(self, label_df): |
| | """Encode a list (or pandas Dataframe or Serie) of strong labels, they correspond to a given filename |
| | |
| | Args: |
| | label_df: pandas DataFrame or Series, contains filename, onset (in frames) and offset (in frames) |
| | If only filename (no onset offset) is specified, it will return the event on all the frames |
| | onset and offset should be in frames |
| | Returns: |
| | numpy.array |
| | Encoded labels, 1 where the label is present, 0 otherwise |
| | """ |
| |
|
| | assert any( |
| | [x is not None for x in [self.audio_len, self.frame_len, self.frame_hop]] |
| | ) |
| |
|
| | samples_len = self.n_frames |
| | if type(label_df) is str: |
| | if label_df == "empty": |
| | y = np.zeros((samples_len, len(self.labels))) - 1 |
| | return y |
| | y = np.zeros((samples_len, len(self.labels))) |
| | if type(label_df) is pd.DataFrame: |
| | if {"onset", "offset", "event_label"}.issubset(label_df.columns): |
| | for _, row in label_df.iterrows(): |
| | if not pd.isna(row["event_label"]): |
| | i = self.labels.index(row["event_label"]) |
| | onset = int(self._time_to_frame(row["onset"])) |
| | offset = int(np.ceil(self._time_to_frame(row["offset"]))) |
| | y[ |
| | onset:offset, i |
| | ] = 1 |
| |
|
| | elif type(label_df) in [ |
| | pd.Series, |
| | list, |
| | np.ndarray, |
| | ]: |
| | if type(label_df) is pd.Series: |
| | if {"onset", "offset", "event_label"}.issubset( |
| | label_df.index |
| | ): |
| | if not pd.isna(label_df["event_label"]): |
| | i = self.labels.index(label_df["event_label"]) |
| | onset = int(self._time_to_frame(label_df["onset"])) |
| | offset = int(np.ceil(self._time_to_frame(label_df["offset"]))) |
| | y[onset:offset, i] = 1 |
| | return y |
| |
|
| | for event_label in label_df: |
| | |
| | if type(event_label) is str: |
| | if event_label != "": |
| | i = self.labels.index(event_label) |
| | y[:, i] = 1 |
| |
|
| | |
| | elif len(event_label) == 3: |
| | if event_label[0] != "": |
| | i = self.labels.index(event_label[0]) |
| | onset = int(self._time_to_frame(event_label[1])) |
| | offset = int(np.ceil(self._time_to_frame(event_label[2]))) |
| | y[onset:offset, i] = 1 |
| |
|
| | else: |
| | raise NotImplementedError( |
| | "cannot encode strong, type mismatch: {}".format( |
| | type(event_label) |
| | ) |
| | ) |
| |
|
| | else: |
| | raise NotImplementedError( |
| | "To encode_strong, type is pandas.Dataframe with onset, offset and event_label" |
| | "columns, or it is a list or pandas Series of event labels, " |
| | "type given: {}".format(type(label_df)) |
| | ) |
| | return y |
| |
|
| | def decode_weak(self, labels): |
| | """ Decode the encoded weak labels |
| | Args: |
| | labels: numpy.array, the encoded labels to be decoded |
| | |
| | Returns: |
| | list |
| | Decoded labels, list of string |
| | |
| | """ |
| | result_labels = [] |
| | for i, value in enumerate(labels): |
| | if value == 1: |
| | result_labels.append(self.labels[i]) |
| | return result_labels |
| |
|
| | def decode_strong(self, labels): |
| | """ Decode the encoded strong labels |
| | Args: |
| | labels: numpy.array, the encoded labels to be decoded |
| | Returns: |
| | list |
| | Decoded labels, list of list: [[label, onset offset], ...] |
| | |
| | """ |
| | result_labels = [] |
| | for i, label_column in enumerate(labels.T): |
| | change_indices = DecisionEncoder().find_contiguous_regions(label_column) |
| |
|
| | |
| | for row in change_indices: |
| | result_labels.append( |
| | [ |
| | self.labels[i], |
| | self._frame_to_time(row[0]), |
| | self._frame_to_time(row[1]), |
| | ] |
| | ) |
| | return result_labels |
| |
|
| | def state_dict(self): |
| | return { |
| | "labels": self.labels, |
| | "audio_len": self.audio_len, |
| | "frame_len": self.frame_len, |
| | "frame_hop": self.frame_hop, |
| | "net_pooling": self.net_pooling, |
| | "fs": self.fs, |
| | } |
| |
|
| | @classmethod |
| | def load_state_dict(cls, state_dict): |
| | labels = state_dict["labels"] |
| | audio_len = state_dict["audio_len"] |
| | frame_len = state_dict["frame_len"] |
| | frame_hop = state_dict["frame_hop"] |
| | net_pooling = state_dict["net_pooling"] |
| | fs = state_dict["fs"] |
| | return cls(labels, audio_len, frame_len, frame_hop, net_pooling, fs) |
| |
|