Source code for s3prl.problem.asv.superb_asv

"""
The setting of Superb ASV

Authors
  * Po-Han Chi 2021
  * Leo 2021
  * Haibin Wu 2022
  * Leo 2022
"""

import logging
import pickle
from dataclasses import dataclass
from pathlib import Path

import pandas as pd
from omegaconf import MISSING
from torch.utils.data import Subset

from s3prl.dataio.corpus.voxceleb1sv import VoxCeleb1SV
from s3prl.dataio.dataset import EncodeCategory, LoadAudio, get_info
from s3prl.dataio.encoder.category import CategoryEncoder
from s3prl.dataio.sampler import FixedBatchSizeBatchSampler
from s3prl.nn.speaker_model import SuperbXvector

from .run import ASV

# Sampling rate in Hz; used in this module to turn a waveform length in
# samples into a duration in seconds.
SAMPLE_RATE = 16_000

# Effect chain handed to ``LoadAudio`` through its ``sox_effects`` argument.
# Every entry is one effect name followed by that effect's string arguments.
EFFECTS = [
    ["channels", "1"],
    ["rate", "16000"],
    ["gain", "-3.0"],
    [
        "silence",
        "1",
        "0.1",
        "0.1%",
        "-1",
        "0.1",
        "0.1%",
    ],
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Public API of this module: the names exported by a star-import.
__all__ = [
    "prepare_voxceleb1_for_sv",
    "SuperbASV",
]


def prepare_voxceleb1_for_sv(
    target_dir: str,
    cache_dir: str,
    get_path_only: bool,
    dataset_root: str,
    force_download: bool = False,
):
    """
    Prepare VoxCeleb1 for speaker verification
    following :obj:`SuperbASV.prepare_data` format.

    Args:
        target_dir (str): The directory in which :code:`train.csv` and
            :code:`test_trial.csv` are written. A :obj:`pathlib.Path` is
            accepted as well.
        cache_dir (str): Forwarded to :obj:`VoxCeleb1SV` as its second argument
        get_path_only (bool): Directly return the csv filepaths, without
            parsing the corpus and without writing anything
        dataset_root (str): The root path of VoxCeleb1
        force_download (bool): always re-download the metadata for VoxCeleb1

    Returns:
        tuple

        1. train_path (Path): the csv with the columns :code:`id`,
           :code:`wav_path` and :code:`spk`
        2. test_trial_paths (List[Path]): a single-element list holding the csv
           with the columns :code:`id1`, :code:`id2`, :code:`wav_path1`,
           :code:`wav_path2` and :code:`label`
    """
    # The argument is documented as a string, so coerce it before using the
    # ``/`` path operator (which is undefined between two ``str`` objects).
    target_dir = Path(target_dir)

    train_path = target_dir / "train.csv"
    test_trial_path = target_dir / "test_trial.csv"

    if get_path_only:
        return train_path, [test_trial_path]

    corpus = VoxCeleb1SV(dataset_root, cache_dir, force_download)
    train_data, valid_data, test_data, test_trials = corpus.all_data

    # The corpus' train and valid splits are merged into one training csv.
    all_data = {**train_data, **valid_data}
    ids = sorted(all_data.keys())
    wav_paths = [all_data[idx]["wav_path"] for idx in ids]
    labels = [all_data[idx]["label"] for idx in ids]
    pd.DataFrame({"id": ids, "wav_path": wav_paths, "spk": labels}).to_csv(
        train_path, index=False
    )

    # Each trial is a (label, id1, id2) triple; both ids index into the test split.
    labels, id1s, id2s = zip(*test_trials)
    wav_path1 = [test_data[idx]["wav_path"] for idx in id1s]
    wav_path2 = [test_data[idx]["wav_path"] for idx in id2s]
    pd.DataFrame(
        {
            "id1": id1s,
            "id2": id2s,
            "wav_path1": wav_path1,
            "wav_path2": wav_path2,
            "label": labels,
        }
    ).to_csv(test_trial_path, index=False)

    return train_path, [test_trial_path]
class SuperbASV(ASV):
    """
    The SUPERB automatic speaker verification (ASV) recipe.

    By default the data comes from VoxCeleb1 (see :obj:`prepare_data`). Only the
    train and test modes are supported; there is no validation mode.
    """

    def default_config(self):
        """
        Return the default configuration as a nested dictionary.

        Each top-level key named after a method of this class (e.g.
        :code:`build_dataset`) holds the dictionary given to that method's
        same-named argument. Entries set to :code:`MISSING` have no default.
        """
        return dict(
            target_dir=MISSING,
            cache_dir=None,
            test_ckpt_steps=None,  # eval all saved checkpoints
            prepare_data=dict(
                dataset_root=MISSING,
            ),
            build_dataset=dict(
                train=dict(
                    min_secs=2.0,
                    max_secs=8.0,
                ),
            ),
            build_batch_sampler=dict(
                train=dict(
                    batch_size=10,
                    shuffle=True,
                ),
                test=dict(
                    batch_size=1,
                ),
            ),
            build_upstream=dict(
                name=MISSING,
            ),
            build_featurizer=dict(
                layer_selections=None,
                normalize=False,
            ),
            build_model=dict(
                upstream_trainable=False,
            ),
            build_task=dict(
                loss_type="amsoftmax",
                loss_conf=dict(
                    margin=0.4,
                    scale=30,
                ),
            ),
            build_optimizer=dict(
                name="AdamW",
                conf=dict(
                    lr=1.0e-4,
                ),
            ),
            build_scheduler=dict(
                name="ExponentialLR",
                gamma=0.9,
            ),
            train=dict(
                total_steps=200000,
                log_step=500,
                eval_step=1e20,
                save_step=10000,
                gradient_clipping=1.0e3,
                gradient_accumulate=5,
                valid_metric=None,
                valid_higher_better=None,
                auto_resume=True,
                resume_ckpt_dir=None,
                keep_num_ckpts=None,
            ),
        )

    def prepare_data(
        self, prepare_data: dict, target_dir: str, cache_dir: str, get_path_only: bool
    ):
        """
        Prepare the task-specific data metadata (path, labels...).
        By default call :obj:`prepare_voxceleb1_for_sv` with :code:`**prepare_data`

        Args:
            prepare_data (dict): same in :obj:`default_config`, support arguments in :obj:`prepare_voxceleb1_for_sv`
            target_dir (str): Parse your corpus and save the csv file into this directory
            cache_dir (str): If the parsing or preprocessing takes too long time, you can save
                the temporary files into this directory. This directory is expected to be shared
                across different training sessions (different hypers and :code:`target_dir`)
            get_path_only (bool): Directly return the filepaths no matter they exist or not.

        Returns:
            tuple

            1. train_path (str)
            2. test_trial_paths (List[str])

            The :code:`train_path` should be a csv file containing the following columns:

            ====================  ====================
            column                description
            ====================  ====================
            id                    (str) - the unique id for this utterance
            wav_path              (str) - the absolute path of the waveform file
            spk                   (str) - a string speaker label
            ====================  ====================

            Each :code:`test_trial_path` should be a csv file containing the following columns:

            ====================  ====================
            column                description
            ====================  ====================
            id1                   (str) - the unique id of the first utterance
            id2                   (str) - the unique id of the second utterance
            wav_path1             (str) - the absolute path of the first utterance
            wav_path2             (str) - the absolute path of the second utterance
            label                 (int) - 0 when two utterances are from different speakers, \
                                  1 when same speaker
            ====================  ====================
        """
        # NOTE(review): ``_get_current_arguments`` is defined outside this
        # class; presumably it collects this call's arguments and merges the
        # entries of ``prepare_data`` into them - confirm against the base class.
        return prepare_voxceleb1_for_sv(
            **self._get_current_arguments(flatten_dict="prepare_data")
        )

    def build_encoder(
        self,
        build_encoder: dict,
        target_dir: str,
        cache_dir: str,
        train_csv: str,
        test_csvs: list,
        get_path_only: bool,
    ):
        """
        Build the encoder (for the labels) given the data metadata, and return the saved encoder path.
        By default generate and save a :obj:`s3prl.dataio.encoder.CategoryEncoder` from the :code:`spk`
        column of the train csv.

        Args:
            build_encoder (dict): same in :obj:`default_config`, no argument supported for now
            target_dir (str): Save your encoder into this directory
            cache_dir (str): If the preprocessing takes too long time, you can save
                the temporary files into this directory. This directory is expected to be shared
                across different training sessions (different hypers and :code:`target_dir`)
            train_csv (str): the train path from :obj:`prepare_data`
            test_csvs (List[str]): the test paths from :obj:`prepare_data`
                (not read by this default implementation)
            get_path_only (bool): Directly return the filepaths no matter they exist or not

        Returns:
            str

            encoder_path: The encoder should be saved in the pickle format
        """
        encoder_path = Path(target_dir) / "spk2int.pkl"
        if get_path_only:
            return encoder_path

        csv = pd.read_csv(train_csv)
        # Sort the distinct speakers so the encoder is built from a
        # deterministic ordering.
        all_spk = sorted(set(csv["spk"]))

        spk2int = CategoryEncoder(all_spk)
        with open(encoder_path, "wb") as f:
            pickle.dump(spk2int, f)

        return encoder_path

    def build_dataset(
        self,
        build_dataset: dict,
        target_dir: str,
        cache_dir: str,
        mode: str,
        data_csv: str,
        encoder_path: str,
    ):
        """
        Build the dataset for train/test (validation is not supported).

        Args:
            build_dataset (dict): same in :obj:`default_config`, have :code:`train` and :code:`test` keys, each is a dictionary,
                for :code:`train` dictionary:

                ====================  ====================
                key                   description
                ====================  ====================
                min_secs              (float) - Drop a waveform if it is not longer than :code:`min_secs`. \
                                      Default: None, no dropping
                max_secs              (float) - If a waveform is longer than :code:`max_secs` seconds, \
                                      randomly crop the waveform into :code:`max_secs` seconds. \
                                      Default: None, no cropping
                ====================  ====================

                for :code:`test` dictionary, no argument supported yet

            target_dir (str): Current experiment directory
            cache_dir (str): If the preprocessing takes too long time, you can save
                the temporary files into this directory. This directory is expected to be shared
                across different training sessions (different hypers and :code:`target_dir`)
            mode (str): train/test
            data_csv (str): The metadata csv file for the specific :code:`mode`
            encoder_path (str): The pickled encoder path for encoding the labels

        Returns:
            torch Dataset

            For train mode, the dataset should return each item as a dictionary containing the following keys:

            ====================  ====================
            key                   description
            ====================  ====================
            x                     (torch.FloatTensor) - the waveform in (seq_len, 1)
            x_len                 (int) - the waveform length :code:`seq_len`
            class_id              (int) - the label class id encoded by :code:`encoder_path`
            unique_name           (str) - the unique id for this datapoint
            ====================  ====================

            For test mode:

            ====================  ====================
            key                   description
            ====================  ====================
            x                     (torch.FloatTensor) - the waveform in (seq_len, 1)
            x_len                 (int) - the waveform length :code:`seq_len`
            unique_name           (str) - the unique id for this datapoint
            ====================  ====================
        """
        assert mode in [
            "train",
            "test",
        ], "Only support train & test mode (no validation)"

        if mode == "train":

            @dataclass
            class Config:
                min_secs: float = None
                max_secs: float = None

            # ``or {}`` also covers a ``train`` key that is present but None.
            conf = Config(**(build_dataset.get("train") or {}))

            csv = pd.read_csv(data_csv)
            wav_paths = csv["wav_path"].tolist()
            audio_loader = LoadAudio(
                wav_paths, sox_effects=EFFECTS, max_secs=conf.max_secs
            )

            labels = csv["spk"].tolist()
            # SECURITY: unpickling executes arbitrary code. ``encoder_path``
            # must only point at a trusted file, such as the one written by
            # :obj:`build_encoder`.
            with open(encoder_path, "rb") as f:
                encoder = pickle.load(f)
            label_encoder = EncodeCategory(labels, encoder)
            ids = csv["id"].tolist()

            class SVTrainDataset:
                """Join the audio loader, label encoder and ids by index."""

                def __len__(self):
                    return len(audio_loader)

                def __getitem__(self, index: int):
                    audio = audio_loader[index]
                    label = label_encoder[index]
                    return {
                        "x": audio["wav"],
                        "x_len": audio["wav_len"],
                        "class_id": label["class_id"],
                        "unique_name": ids[index],
                    }

            dataset = SVTrainDataset()

            if conf.min_secs is not None:
                # ``target_dir`` is documented as a string, so coerce it before
                # using the ``/`` path operator.
                # NOTE(review): the third argument of ``get_info`` looks like a
                # cache location for the collected fields - confirm.
                x_lens, unique_names = get_info(
                    dataset,
                    ["x_len", "unique_name"],
                    Path(target_dir) / "train_utt_len",
                )

                indices = []
                removed_indices = []
                for idx, (x_len, unique_name) in enumerate(zip(x_lens, unique_names)):
                    secs = x_len / SAMPLE_RATE
                    # Keep only utterances strictly longer than ``min_secs``.
                    if secs <= conf.min_secs:
                        logger.info(
                            f"Remove utt {unique_name} since too short after sox effects: {secs} secs"
                        )
                        removed_indices.append(idx)
                    else:
                        indices.append(idx)

                if len(removed_indices) > 0:
                    logger.info(f"Remove in total {len(removed_indices)} utts")

                dataset = Subset(dataset, indices)

        elif mode == "test":
            csv = pd.read_csv(data_csv)
            ids = pd.concat([csv["id1"], csv["id2"]], ignore_index=True).tolist()
            wav_paths = pd.concat(
                [csv["wav_path1"], csv["wav_path2"]], ignore_index=True
            ).tolist()
            # An utterance can take part in many trials; keep each distinct
            # (id, path) pair once, in a deterministic order.
            data_list = sorted(set(zip(ids, wav_paths)))
            ids, wav_paths = zip(*data_list)

            # Use the same sox effect chain as in training so that the test
            # audio is preprocessed identically to the training audio.
            audio_loader = LoadAudio(wav_paths, sox_effects=EFFECTS)

            class SVTestDataset:
                """Join the audio loader and ids by index (no labels)."""

                def __len__(self):
                    return len(audio_loader)

                def __getitem__(self, index: int):
                    audio = audio_loader[index]
                    return {
                        "x": audio["wav"],
                        "x_len": audio["wav_len"],
                        "unique_name": ids[index],
                    }

            dataset = SVTestDataset()

        return dataset

    def build_batch_sampler(
        self,
        build_batch_sampler: dict,
        target_dir: str,
        cache_dir: str,
        mode: str,
        data_csv: str,
        dataset,
    ):
        """
        Return the batch sampler for torch DataLoader.

        Args:
            build_batch_sampler (dict): same in :obj:`default_config`

                ====================  ====================
                key                   description
                ====================  ====================
                train                 (dict) - arguments for :obj:`FixedBatchSizeBatchSampler`
                test                  (dict) - arguments for :obj:`FixedBatchSizeBatchSampler`
                ====================  ====================

                Note that ASV does not support valid

            target_dir (str): Current experiment directory
            cache_dir (str): If the preprocessing takes too long time, save
                the temporary files into this directory. This directory is expected to be shared
                across different training sessions (different hypers and :code:`target_dir`)
            mode (str): train/test
            data_csv (str): the :code:`mode` specific csv from :obj:`prepare_data`
            dataset: the dataset from :obj:`build_dataset`

        Returns:
            batch sampler for torch DataLoader

        Raises:
            ValueError: if :code:`mode` is neither :code:`train` nor :code:`test`
        """
        if mode not in ("train", "test"):
            raise ValueError("ASV only supports train/test modes")

        # ``or {}`` also covers a key that is present but None.
        sampler_conf = build_batch_sampler.get(mode) or {}
        return FixedBatchSizeBatchSampler(dataset, **sampler_conf)

    def build_downstream(
        self,
        build_downstream: dict,
        downstream_input_size: int,
        downstream_output_size: int,
        downstream_input_stride: int,
    ):
        """
        Return the task-specific downstream model.
        By default build the :obj:`SuperbXvector` model

        Args:
            build_downstream (dict): same in :obj:`default_config`, support arguments of :obj:`SuperbXvector`
            downstream_input_size (int): the required input size of the model
            downstream_output_size (int): the required output size of the model
                (not used by this default implementation)
            downstream_input_stride (int): the input feature's stride (from 16 KHz)
                (not used by this default implementation)

        Returns:
            :obj:`s3prl.nn.interface.AbsUtteranceModel`
        """
        model = SuperbXvector(downstream_input_size, **build_downstream)
        return model