diff --git a/test/assets/100Hz_44100Hz_16bit_05sec.wav b/test/assets/100Hz_44100Hz_16bit_05sec.wav deleted file mode 100644 index c297fded30..0000000000 Binary files a/test/assets/100Hz_44100Hz_16bit_05sec.wav and /dev/null differ diff --git a/test/assets/440Hz_44100Hz_16bit_05sec.wav b/test/assets/440Hz_44100Hz_16bit_05sec.wav deleted file mode 100644 index a746f73df9..0000000000 Binary files a/test/assets/440Hz_44100Hz_16bit_05sec.wav and /dev/null differ diff --git a/test/assets/whitenoise.mp3 b/test/assets/whitenoise.mp3 deleted file mode 100644 index d6fe9f44b9..0000000000 Binary files a/test/assets/whitenoise.mp3 and /dev/null differ diff --git a/test/assets/whitenoise.wav b/test/assets/whitenoise.wav deleted file mode 100644 index b95ce95776..0000000000 Binary files a/test/assets/whitenoise.wav and /dev/null differ diff --git a/test/common_utils/data_utils.py b/test/common_utils/data_utils.py index 321e24ce0a..de19d8174f 100644 --- a/test/common_utils/data_utils.py +++ b/test/common_utils/data_utils.py @@ -13,6 +13,28 @@ def get_asset_path(*paths): return os.path.join(_TEST_DIR_PATH, 'assets', *paths) +def convert_tensor_encoding( + tensor: torch.tensor, + dtype: torch.dtype, +): + """Convert input tensor with values between -1 and 1 to integer encoding + Args: + tensor: input tensor, assumed between -1 and 1 + dtype: desired output tensor dtype + Returns: + Tensor: shape of (n_channels, sample_rate * duration) + """ + if dtype == torch.int32: + tensor *= (tensor > 0) * 2147483647 + (tensor < 0) * 2147483648 + if dtype == torch.int16: + tensor *= (tensor > 0) * 32767 + (tensor < 0) * 32768 + if dtype == torch.uint8: + tensor *= (tensor > 0) * 127 + (tensor < 0) * 128 + tensor += 128 + tensor = tensor.to(dtype) + return tensor + + def get_whitenoise( *, sample_rate: int = 16000, @@ -43,25 +65,17 @@ def get_whitenoise( if dtype not in [torch.float32, torch.int32, torch.int16, torch.uint8]: raise NotImplementedError(f'dtype {dtype} is not supported.') # According to the doc, folking rng on all CUDA devices is slow when there are many CUDA devices, - # so we only folk on CPU, generate values and move the data to the given device + # so we only fork on CPU, generate values and move the data to the given device with torch.random.fork_rng([]): torch.random.manual_seed(seed) - tensor = torch.randn([sample_rate * duration], dtype=torch.float32, device='cpu') + tensor = torch.randn([int(sample_rate * duration)], dtype=torch.float32, device='cpu') tensor /= 2.0 tensor *= scale_factor tensor.clamp_(-1.0, 1.0) - if dtype == torch.int32: - tensor *= (tensor > 0) * 2147483647 + (tensor < 0) * 2147483648 - if dtype == torch.int16: - tensor *= (tensor > 0) * 32767 + (tensor < 0) * 32768 - if dtype == torch.uint8: - tensor *= (tensor > 0) * 127 + (tensor < 0) * 128 - tensor += 128 - tensor = tensor.to(dtype) tensor = tensor.repeat([n_channels, 1]) if not channels_first: tensor = tensor.t() - return tensor.to(device=device) + return convert_tensor_encoding(tensor, dtype) def get_sinusoid( @@ -91,8 +105,8 @@ def get_sinusoid( dtype = getattr(torch, dtype) pie2 = 2 * 3.141592653589793 end = pie2 * frequency * duration - theta = torch.linspace(0, end, sample_rate * duration, dtype=dtype, device=device) - sin = torch.sin(theta, out=None).repeat([n_channels, 1]) + theta = torch.linspace(0, end, int(sample_rate * duration), dtype=torch.float32, device=device) + tensor = torch.sin(theta, out=None).repeat([n_channels, 1]) if not channels_first: - sin = sin.t() - return sin + tensor = tensor.t() + return convert_tensor_encoding(tensor, dtype) diff --git a/test/functional_cpu_test.py b/test/functional_cpu_test.py index ab5fdaed95..7b531932e8 100644 --- a/test/functional_cpu_test.py +++ b/test/functional_cpu_test.py @@ -4,6 +4,7 @@ import torch import torchaudio import torchaudio.functional as F +from parameterized import parameterized import pytest from . import common_utils @@ -299,24 +300,18 @@ def test_linearity_of_istft4(self): class TestDetectPitchFrequency(common_utils.TorchaudioTestCase): - def test_pitch(self): - test_filepath_100 = common_utils.get_asset_path("100Hz_44100Hz_16bit_05sec.wav") - test_filepath_440 = common_utils.get_asset_path("440Hz_44100Hz_16bit_05sec.wav") - - # Files from https://www.mediacollege.com/audio/tone/download/ - tests = [ - (test_filepath_100, 100), - (test_filepath_440, 440), - ] - - for filename, freq_ref in tests: - waveform, sample_rate = common_utils.load_wav(filename) - - freq = torchaudio.functional.detect_pitch_frequency(waveform, sample_rate) - - threshold = 1 - s = ((freq - freq_ref).abs() > threshold).sum() - self.assertFalse(s) + @parameterized.expand([(100,), (440,)]) + def test_pitch(self, frequency): + sample_rate = 44100 + test_sine_waveform = common_utils.get_sinusoid( + frequency=frequency, sample_rate=sample_rate, duration=5, + ) + + freq = torchaudio.functional.detect_pitch_frequency(test_sine_waveform, sample_rate) + + threshold = 1 + s = ((freq - frequency).abs() > threshold).sum() + self.assertFalse(s) class TestDB_to_amplitude(common_utils.TorchaudioTestCase): diff --git a/test/test_batch_consistency.py b/test/test_batch_consistency.py index 2a88598f88..03352ad2b0 100644 --- a/test/test_batch_consistency.py +++ b/test/test_batch_consistency.py @@ -1,5 +1,7 @@ """Test numerical consistency among single input and batched input.""" import unittest +import itertools +from parameterized import parameterized import torch import torchaudio @@ -47,17 +49,15 @@ def test_griffinlim(self): F.griffinlim, tensor, window, n_fft, hop, ws, power, normalize, n_iter, momentum, length, 0, atol=5e-5 ) - def test_detect_pitch_frequency(self): - filenames = [ - 'steam-train-whistle-daniel_simon.wav', # 2ch 44100Hz - # Files from https://www.mediacollege.com/audio/tone/download/ - '100Hz_44100Hz_16bit_05sec.wav', # 1ch - '440Hz_44100Hz_16bit_05sec.wav', # 1ch - ] - for filename in filenames: - filepath = common_utils.get_asset_path(filename) - waveform, sample_rate = torchaudio.load(filepath) - self.assert_batch_consistencies(F.detect_pitch_frequency, waveform, sample_rate) + @parameterized.expand(list(itertools.product( + [100, 440], + [8000, 16000, 44100], + [1, 2], + )), name_func=lambda f, _, p: f'{f.__name__}_{"_".join(str(arg) for arg in p.args)}') + def test_detect_pitch_frequency(self, frequency, sample_rate, n_channels): + waveform = common_utils.get_sinusoid(frequency=frequency, sample_rate=sample_rate, + n_channels=n_channels, duration=5) + self.assert_batch_consistencies(F.detect_pitch_frequency, waveform, sample_rate) def test_istft(self): stft = torch.tensor([ @@ -80,8 +80,10 @@ def test_overdrive(self): self.assert_batch_consistencies(F.overdrive, waveform, gain=45, colour=30) def test_phaser(self): - filepath = common_utils.get_asset_path("whitenoise.wav") - waveform, sample_rate = torchaudio.load(filepath) + sample_rate = 44100 + waveform = common_utils.get_whitenoise( + sample_rate=sample_rate, duration=5, + ) self.assert_batch_consistencies(F.phaser, waveform, sample_rate) def test_flanger(self): diff --git a/test/test_compliance_kaldi.py b/test/test_compliance_kaldi.py index f9e097e09c..d58e5f9159 100644 --- a/test/test_compliance_kaldi.py +++ b/test/test_compliance_kaldi.py @@ -47,14 +47,25 @@ def first_sample_of_frame(frame, window_size, window_shift, snip_edges): @common_utils.skipIfNoSoxBackend -class Test_Kaldi(common_utils.TorchaudioTestCase): +class Test_Kaldi(common_utils.TempDirMixin, common_utils.TorchaudioTestCase): backend = 'sox' - test_filepath = common_utils.get_asset_path('kaldi_file.wav') - test_8000_filepath = common_utils.get_asset_path('kaldi_file_8000.wav') kaldi_output_dir = common_utils.get_asset_path('kaldi') + test_filepath = common_utils.get_asset_path('kaldi_file.wav') test_filepaths = {prefix: [] for prefix in compliance_utils.TEST_PREFIX} + def setUp(self): + super().setUp() + + # 1. test signal for testing resampling + self.test1_signal_sr = 16000 + self.test1_signal = common_utils.get_whitenoise( + sample_rate=self.test1_signal_sr, duration=0.5, + ) + + # 2. test audio file corresponding to saved kaldi ark files + self.test2_filepath = common_utils.get_asset_path('kaldi_file_8000.wav') + # separating test files by their types (e.g 'spec', 'fbank', etc.) for f in os.listdir(kaldi_output_dir): dash_idx = f.find('-') @@ -94,7 +105,6 @@ def test_get_strided(self): def _create_data_set(self): # used to generate the dataset to test on. this is not used in testing (offline procedure) - test_filepath = common_utils.get_asset_path('kaldi_file.wav') sr = 16000 x = torch.arange(0, 20).float() # between [-6,6] @@ -103,8 +113,8 @@ def _create_data_set(self): y = (y / 6 * (1 << 30)).long() # clear the last 16 bits because they aren't used anyways y = ((y >> 16) << 16).float() - torchaudio.save(test_filepath, y, sr) - sound, sample_rate = torchaudio.load(test_filepath, normalization=False) + torchaudio.save(self.test_filepath, y, sr) + sound, sample_rate = torchaudio.load(self.test_filepath, normalization=False) print(y >> 16) self.assertTrue(sample_rate == sr) torch.testing.assert_allclose(y, sound) @@ -123,7 +133,7 @@ def _print_diagnostic(self, output, expect_output): print('relative_mse:', relative_mse.item(), 'relative_max_error:', relative_max_error.item()) def _compliance_test_helper(self, sound_filepath, filepath_key, expected_num_files, - expected_num_args, get_output_fn, atol=1e-5, rtol=1e-8): + expected_num_args, get_output_fn, atol=1e-5, rtol=1e-7): """ Inputs: sound_filepath (str): The location of the sound file @@ -135,7 +145,7 @@ def _compliance_test_helper(self, sound_filepath, filepath_key, expected_num_fil atol (float): absolute tolerance rtol (float): relative tolerance """ - sound, sample_rate = torchaudio.load_wav(sound_filepath) + sound, sr = torchaudio.load_wav(sound_filepath) files = self.test_filepaths[filepath_key] assert len(files) == expected_num_files, ('number of kaldi %s file changed to %d' % (filepath_key, len(files))) @@ -170,22 +180,19 @@ def get_output_fn(sound, args): output = kaldi.resample_waveform(sound, args[1], args[2]) return output - self._compliance_test_helper(self.test_8000_filepath, 'resample', 32, 3, get_output_fn, atol=1e-2, rtol=1e-5) + self._compliance_test_helper(self.test2_filepath, 'resample', 32, 3, get_output_fn, atol=1e-2, rtol=1e-5) def test_resample_waveform_upsample_size(self): - sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath) - upsample_sound = kaldi.resample_waveform(sound, sample_rate, sample_rate * 2) - self.assertTrue(upsample_sound.size(-1) == sound.size(-1) * 2) + upsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr * 2) + self.assertTrue(upsample_sound.size(-1) == self.test1_signal.size(-1) * 2) def test_resample_waveform_downsample_size(self): - sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath) - downsample_sound = kaldi.resample_waveform(sound, sample_rate, sample_rate // 2) - self.assertTrue(downsample_sound.size(-1) == sound.size(-1) // 2) + downsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr // 2) + self.assertTrue(downsample_sound.size(-1) == self.test1_signal.size(-1) // 2) def test_resample_waveform_identity_size(self): - sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath) - downsample_sound = kaldi.resample_waveform(sound, sample_rate, sample_rate) - self.assertTrue(downsample_sound.size(-1) == sound.size(-1)) + downsample_sound = kaldi.resample_waveform(self.test1_signal, self.test1_signal_sr, self.test1_signal_sr) + self.assertTrue(downsample_sound.size(-1) == self.test1_signal.size(-1)) def _test_resample_waveform_accuracy(self, up_scale_factor=None, down_scale_factor=None, atol=1e-1, rtol=1e-4): @@ -226,19 +233,19 @@ def test_resample_waveform_upsample_accuracy(self): def test_resample_waveform_multi_channel(self): num_channels = 3 - sound, sample_rate = torchaudio.load_wav(self.test_8000_filepath) # (1, 8000) - multi_sound = sound.repeat(num_channels, 1) # (num_channels, 8000) + multi_sound = self.test1_signal.repeat(num_channels, 1) # (num_channels, 8000 smp) for i in range(num_channels): multi_sound[i, :] *= (i + 1) * 1.5 - multi_sound_sampled = kaldi.resample_waveform(multi_sound, sample_rate, sample_rate // 2) + multi_sound_sampled = kaldi.resample_waveform(multi_sound, self.test1_signal_sr, self.test1_signal_sr // 2) # check that sampling is same whether using separately or in a tensor of size (c, n) for i in range(num_channels): - single_channel = sound * (i + 1) * 1.5 - single_channel_sampled = kaldi.resample_waveform(single_channel, sample_rate, sample_rate // 2) - torch.testing.assert_allclose(multi_sound_sampled[i, :], single_channel_sampled[0], rtol=1e-4, atol=1e-8) + single_channel = self.test1_signal * (i + 1) * 1.5 + single_channel_sampled = kaldi.resample_waveform(single_channel, self.test1_signal_sr, + self.test1_signal_sr // 2) + torch.testing.assert_allclose(multi_sound_sampled[i, :], single_channel_sampled[0], rtol=1e-4, atol=1e-7) if __name__ == '__main__': diff --git a/test/test_sox_effects.py b/test/test_sox_effects.py index 6b12d7cd06..e459cd7b01 100644 --- a/test/test_sox_effects.py +++ b/test/test_sox_effects.py @@ -45,8 +45,8 @@ def test_lowpass_speed(self): E.append_effect_to_chain("speed", speed) E.append_effect_to_chain("rate", si.rate) x, sr = E.sox_build_flow_effects() - # check if effects worked - self.assertEqual(x.size(1), int((si.length / si.channels) / speed)) + # check if effects worked, add small tolerance for rounding effects + self.assertEqual(x.size(1), int((si.length / si.channels) / speed), atol=1, rtol=1e-8) def test_ulaw_and_siginfo(self): si_out = torchaudio.sox_signalinfo_t()