Source code for s3prl.problem.common.hear_maestro

import json
from collections import defaultdict
from copy import deepcopy
from pathlib import Path

import pandas as pd
import torchaudio
from omegaconf import MISSING

from ._hear_util import resample_hear_corpus
from .hear_dcase_2016_task2 import HearDcase2016Task2

MAESTRO_NUM_FOLDS = 5

__all__ = ["HearMaestro"]


def prepare_maestro(
    target_dir: str,
    cache_dir: str,
    dataset_root: str,
    test_fold: int = 0,
    get_path_only: bool = False,
):
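    """Prepare the 5-fold MAESTRO corpus as train/valid/test CSV files.

    One fold (``test_fold``) is held out for testing, the next fold
    (cyclically) for validation, and the remaining folds are concatenated
    for training. Returns ``(train_csv, valid_csv, [test_csv])``.
    """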
    target_dir: Path = Path(target_dir)
    train_csv = target_dir / "train.csv"
    valid_csv = target_dir / "valid.csv"
    test_csv = target_dir / "test.csv"
    if get_path_only:
        return train_csv, valid_csv, [test_csv]

    assert test_fold < MAESTRO_NUM_FOLDS, (
        f"MAESTRO only has {MAESTRO_NUM_FOLDS} folds, but 'test_fold' was "
        f"given as {test_fold}"
    )

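    # resample the corpus to 16 kHz; the resampled waveforms are expected
    # under <dataset_root>/16000 (see wav_root below)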
    resample_hear_corpus(dataset_root, target_sr=16000)

    dataset_root = Path(dataset_root)
    wav_root = dataset_root / "16000"

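    # hold out one fold for testing, the next (cyclic) fold for validation,
    # and train on the remaining folds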
    test_id = test_fold
    valid_id = (test_fold + 1) % MAESTRO_NUM_FOLDS
    train_ids = [
        idx for idx in range(MAESTRO_NUM_FOLDS) if idx not in (test_id, valid_id)
    ]

    fold_metas = []
    fold_dfs = []
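    # flatten each fold's JSON metadata into a long-format table with one
    # row per labeled segment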
    for fold_id in range(MAESTRO_NUM_FOLDS):
        with open(dataset_root / f"fold{fold_id:02d}.json") as f:
            metadata = json.load(f)
            fold_metas.append(metadata)

        data = defaultdict(list)
        for utt in metadata:
            wav_path = (wav_root / f"fold{fold_id:02d}" / utt).resolve()
            info = torchaudio.info(wav_path)
            baseinfo = {
                "record_id": utt,
                "wav_path": str(wav_path),
                "duration": info.num_frames / info.sample_rate,
            }
            for segment in metadata[utt]:
                fullinfo = deepcopy(baseinfo)
                fullinfo["utt_id"] = (
                    f"{baseinfo['record_id']}-{int(segment['start'])}-{int(segment['end'])}"
                )
                fullinfo["labels"] = segment["label"]
                fullinfo["start_sec"] = segment["start"] / 1000
                fullinfo["end_sec"] = segment["end"] / 1000

                for key, value in fullinfo.items():
                    data[key].append(value)

        fold_dfs.append(pd.DataFrame(data=data))

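    # assemble the splits: the test and valid folds are kept intact, and the
    # training folds are concatenated into a single dataframe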
    test_meta, test_data = fold_metas[test_id], fold_dfs[test_id]
    valid_meta, valid_data = fold_metas[valid_id], fold_dfs[valid_id]
    train_meta, train_data = {}, []
    for idx in train_ids:
        train_meta.update(fold_metas[idx])
        train_data.append(fold_dfs[idx])
    train_data: pd.DataFrame = pd.concat(train_data)

    train_data.to_csv(train_csv, index=False)
    valid_data.to_csv(valid_csv, index=False)
    test_data.to_csv(test_csv, index=False)

    return train_csv, valid_csv, [test_csv]


class HearMaestro(HearDcase2016Task2):
    def default_config(self) -> dict:
        return dict(
            start=0,
            stop=None,
            target_dir=MISSING,
            cache_dir=None,
            remove_all_cache=False,
            prepare_data=dict(
                dataset_root=MISSING,
                test_fold=MISSING,
            ),
            build_batch_sampler=dict(
                train=dict(
                    batch_size=5,
                    shuffle=True,
                ),
                valid=dict(
                    item="record_id",
                ),
                test=dict(
                    item="record_id",
                ),
            ),
            build_upstream=dict(
                name=MISSING,
            ),
            build_featurizer=dict(
                layer_selections=None,
                normalize=False,
            ),
            build_downstream=dict(
                hidden_layers=2,
            ),
            build_model=dict(
                upstream_trainable=False,
            ),
            build_task=dict(
                prediction_type="multilabel",
                scores=[
                    "event_onset_50ms_fms",
                    "event_onset_offset_50ms_20perc_fms",
                ],
                postprocessing_grid={
                    "median_filter_ms": [150],
                    "min_duration": [50],
                },
            ),
            build_optimizer=dict(
                name="Adam",
                conf=dict(
                    lr=1.0e-3,
                ),
            ),
            build_scheduler=dict(
                name="ExponentialLR",
                gamma=0.9,
            ),
            save_model=dict(),
            save_task=dict(),
            train=dict(
                total_steps=15000,
                log_step=100,
                eval_step=500,
                save_step=500,
                gradient_clipping=1.0,
                gradient_accumulate=1,
                valid_metric="event_onset_50ms_fms",
                valid_higher_better=True,
                auto_resume=True,
                resume_ckpt_dir=None,
            ),
            evaluate=dict(),
        )

    def prepare_data(
        self,
        prepare_data: dict,
        target_dir: str,
        cache_dir: str,
        get_path_only: bool = False,
    ):
        return prepare_maestro(
            **self._get_current_arguments(flatten_dict="prepare_data")
        )
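
# A minimal usage sketch (the paths and fold choice below are illustrative
# assumptions, not values prescribed by s3prl): given a HEAR-preprocessed
# MAESTRO corpus at ./hear_maestro, the fold CSVs can be produced directly:
#
#     train_csv, valid_csv, test_csvs = prepare_maestro(
#         target_dir="./exp/hear_maestro",
#         cache_dir="./cache",
#         dataset_root="./hear_maestro",
#         test_fold=0,
#     )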