import os
import random
import multiprocessing
import numpy as np
import librosa
from .datasets import Dataset


def normalise_amplitude(y, amplitude_factor):
    """Scale a signal to a peak amplitude of 2**(amplitude_factor - 1)."""
    return 2**(amplitude_factor - 1) * y / max(np.max(np.abs(y)), 1e-16)


def pitch_shift(y, shift_factor, sr):
    """Shift pitch by shift_factor octaves (12 semitone bins per octave)."""
    bins_per_octave = 12
    n_steps = int(shift_factor * bins_per_octave)
    # keyword arguments for compatibility with librosa >= 0.10
    return librosa.effects.pitch_shift(y, sr=sr, n_steps=n_steps)


def time_stretch(y, stretch_factor):
    """Stretch time by a rate of 2**stretch_factor (rate > 1 shortens)."""
    # keyword argument for compatibility with librosa >= 0.10
    return librosa.effects.time_stretch(y, rate=2**stretch_factor)
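

# DatasetSampler.load and SyncSampler.load call a module-level ``transform``
# that is missing from this listing. A minimal reconstruction is sketched
# below; the composition order (stretch, then shift, then normalise) is an
# assumption.
def transform(y, sr, amplitude_factor=0, shift_factor=0, stretch_factor=0):
    y = time_stretch(y, stretch_factor)
    y = pitch_shift(y, shift_factor, sr)
    y = normalise_amplitude(y, amplitude_factor)
    return y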


class Sampler:
    def __init__(self):
        self.duration = 2.
        self.samplerate = 44100
        self.amplitude_factor = (0, 0)
        self.stretch_factor = (0, 0)
        self.shift_factor = (0, 0)

    @property
    def duration(self):
        return self._duration

    @duration.setter
    def duration(self, value):
        self._duration = float(value)

    @property
    def samplerate(self):
        return self._samplerate

    @samplerate.setter
    def samplerate(self, value):
        self._samplerate = int(value)

    @property
    def amplitude_factor(self):
        return self._amplitude_factor

    @amplitude_factor.setter
    def amplitude_factor(self, value):
        # accept a scalar or a sequence of length 1 or 2; store a pair
        try:
            value = float(value)
            value = (value, value)
        except TypeError:
            if not hasattr(value, '__len__') or not 0 < len(value) <= 2:
                raise TypeError('expected a scalar or a sequence of length 1 or 2')
            if len(value) == 1:
                value = next(iter(value))
                value = (value, value)
        self._amplitude_factor = value

    @property
    def stretch_factor(self):
        return self._stretch_factor

    @stretch_factor.setter
    def stretch_factor(self, value):
        # accept a scalar or a sequence of length 1 or 2; store a pair
        try:
            value = float(value)
            value = (value, value)
        except TypeError:
            if not hasattr(value, '__len__') or not 0 < len(value) <= 2:
                raise TypeError('expected a scalar or a sequence of length 1 or 2')
            if len(value) == 1:
                value = next(iter(value))
                value = (value, value)
        self._stretch_factor = value

    @property
    def shift_factor(self):
        return self._shift_factor

    @shift_factor.setter
    def shift_factor(self, value):
        # accept a scalar or a sequence of length 1 or 2; store a pair
        try:
            value = float(value)
            value = (value, value)
        except TypeError:
            if not hasattr(value, '__len__') or not 0 < len(value) <= 2:
                raise TypeError('expected a scalar or a sequence of length 1 or 2')
            if len(value) == 1:
                value = next(iter(value))
                value = (value, value)
        self._shift_factor = value

    def dataset_size(self):
        """Number of items available; overridden by subclasses."""
        return 0

    def sample(self, n_samples=1, n_jobs=1):
        """Draw n_samples items; overridden by subclasses."""
        return [None] * n_samples

    def generate(self, batch_size=1, n_batch_per_round=1, n_jobs=1):
        # draw one round of samples at a time, then yield it batch by batch
        while True:
            rand_idx = np.arange(batch_size * n_batch_per_round)
            np.random.shuffle(rand_idx)
            samples = np.asarray(self.sample(
                batch_size * n_batch_per_round, n_jobs))[rand_idx]
            for i in range(n_batch_per_round):
                yield samples[i*batch_size:(i+1)*batch_size]


class DatasetSampler(Sampler):
    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def dataset_size(self):
        return len(self.dataset)

    def load(self, index, **kwargs):
        # randomise any parameter not supplied by the caller
        offset = kwargs.get('offset', random.uniform(0, 1))
        amplitude_factor = kwargs.get(
            'amplitude_factor', random.uniform(*self.amplitude_factor))
        shift_factor = kwargs.get(
            'shift_factor', random.uniform(*self.shift_factor))
        stretch_factor = kwargs.get(
            'stretch_factor', random.uniform(*self.stretch_factor))
        # load enough audio that, after stretching by 2**stretch_factor,
        # the clip still covers self.duration seconds
        load_duration = self.duration * (2**stretch_factor)
        y = self.dataset.load(
            index, self.samplerate, offset=offset, duration=load_duration)
        y = transform(
            y, self.samplerate,
            amplitude_factor=amplitude_factor,
            shift_factor=shift_factor,
            stretch_factor=stretch_factor
        )
        # zero-pad or truncate to exactly the target length
        samples = librosa.time_to_samples(self.duration, sr=self.samplerate)
        if y.size < samples:
            y = np.hstack((y, np.zeros(samples - y.size)))
        elif y.size > samples:
            y = y[:samples]
        return y[None, :]

    def sample(self, n_samples=1, n_jobs=1):
        m = self.dataset_size()
        # every item is used n_samples // m times; the remaining
        # n_samples % m slots come from a shuffled index
        idx = np.arange(m)
        np.random.shuffle(idx)
        idx = np.hstack((
            np.arange(m).repeat(n_samples // m),
            idx[:n_samples % m]
        ))
        np.random.shuffle(idx)
        # load, optionally in parallel
        if n_jobs <= 0:
            n_jobs = os.cpu_count()
        if n_jobs > 1:
            p = multiprocessing.Pool(n_jobs, maxtasksperchild=1)
            samples = np.array(list(p.map(self.load, idx)))
            p.close()
            p.join()
        else:
            samples = np.array(list(map(self.load, idx)))
        return samples
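

# Usage sketch: ``ds`` stands for any Dataset from .datasets and is
# illustrative, not part of this module.
#
#     sampler = DatasetSampler(ds)
#     sampler.stretch_factor = (-0.5, 0.5)
#     batch = sampler.sample(8)   # shape (8, 1, samples)
#     gen = sampler.generate(batch_size=8, n_batch_per_round=4)
#     first = next(gen)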


class AggregateSampler(Sampler):
    def __init__(self, *samplers):
        self.samplers = []
        for s in samplers:
            if isinstance(s, Dataset):
                s = DatasetSampler(s)
            if not isinstance(s, Sampler):
                raise TypeError('expected a Sampler or Dataset instance')
            self.samplers.append(s)
        # set samplers first: the property setters below delegate to them
        super().__init__()

    @property
    def duration(self):
        durations = tuple(s.duration for s in self.samplers)
        duration = durations[0]
        if any(duration != d for d in durations):
            raise ValueError('sub-samplers disagree on duration')
        return duration

    @duration.setter
    def duration(self, value):
        for s in self.samplers:
            s.duration = value

    @property
    def samplerate(self):
        samplerates = tuple(s.samplerate for s in self.samplers)
        samplerate = samplerates[0]
        if any(samplerate != r for r in samplerates):
            raise ValueError('sub-samplers disagree on samplerate')
        return samplerate

    @samplerate.setter
    def samplerate(self, value):
        for s in self.samplers:
            s.samplerate = value

    # transform factors are left to the individual sub-samplers
    @property
    def amplitude_factor(self):
        return None

    @amplitude_factor.setter
    def amplitude_factor(self, value):
        pass

    @property
    def stretch_factor(self):
        return None

    @stretch_factor.setter
    def stretch_factor(self, value):
        pass

    @property
    def shift_factor(self):
        return None

    @shift_factor.setter
    def shift_factor(self, value):
        pass

    def __getitem__(self, key):
        return self.samplers[key]


class AsyncSampler(AggregateSampler):
    def dataset_size(self):
        return max(s.dataset_size() for s in self.samplers)

    def sample(self, n_samples=1, n_jobs=1):
        # each sub-sampler draws independently; stack along the channel axis
        return np.hstack([s.sample(n_samples, n_jobs) for s in self.samplers])


class SyncSampler(AggregateSampler):
    @property
    def amplitude_factor(self):
        # intersect the sub-samplers' ranges: max of minima, min of maxima
        i = iter(self.samplers)
        min_factor, max_factor = next(i).amplitude_factor
        for s in i:
            f = s.amplitude_factor
            if min_factor < f[0]:
                min_factor = f[0]
            if max_factor > f[1]:
                max_factor = f[1]
        if min_factor > max_factor:
            raise RuntimeError('no overlap among sub-samplers')
        return min_factor, max_factor

    @amplitude_factor.setter
    def amplitude_factor(self, value):
        pass

    @property
    def stretch_factor(self):
        # intersect the sub-samplers' ranges: max of minima, min of maxima
        i = iter(self.samplers)
        min_factor, max_factor = next(i).stretch_factor
        for s in i:
            f = s.stretch_factor
            if min_factor < f[0]:
                min_factor = f[0]
            if max_factor > f[1]:
                max_factor = f[1]
        if min_factor > max_factor:
            raise RuntimeError('no overlap among sub-samplers')
        return min_factor, max_factor

    @stretch_factor.setter
    def stretch_factor(self, value):
        pass

    @property
    def shift_factor(self):
        # intersect the sub-samplers' ranges: max of minima, min of maxima
        i = iter(self.samplers)
        min_factor, max_factor = next(i).shift_factor
        for s in i:
            f = s.shift_factor
            if min_factor < f[0]:
                min_factor = f[0]
            if max_factor > f[1]:
                max_factor = f[1]
        if min_factor > max_factor:
            raise RuntimeError('no overlap among sub-samplers')
        return min_factor, max_factor

    @shift_factor.setter
    def shift_factor(self, value):
        pass

    def load(self, index, **kwargs):
        # draw one set of augmentation parameters and reuse it for every
        # sub-sampler, so the channels stay synchronised
        offset = kwargs.get('offset', random.uniform(0, 1))
        amplitude_factor = kwargs.get(
            'amplitude_factor', random.uniform(*self.amplitude_factor))
        shift_factor = kwargs.get(
            'shift_factor', random.uniform(*self.shift_factor))
        stretch_factor = kwargs.get(
            'stretch_factor', random.uniform(*self.stretch_factor))
        ys = np.array([
            s.load(
                index, offset=offset,
                amplitude_factor=amplitude_factor,
                shift_factor=shift_factor,
                stretch_factor=stretch_factor
            )
            for s in self.samplers
        ])
        return ys

    def dataset_size(self):
        return min(s.dataset_size() for s in self.samplers)

    def sample(self, n_samples=1, n_jobs=1):
        m = self.dataset_size()
        # every item is used n_samples // m times; the remaining
        # n_samples % m slots come from a shuffled index
        idx = np.arange(m)
        np.random.shuffle(idx)
        idx = np.hstack((
            np.arange(m).repeat(n_samples // m),
            idx[:n_samples % m]
        ))
        np.random.shuffle(idx)
        # load, optionally in parallel
        if n_jobs <= 0:
            n_jobs = os.cpu_count()
        if n_jobs > 1:
            p = multiprocessing.Pool(n_jobs, maxtasksperchild=1)
            samples = np.hstack(list(p.map(self.load, idx)))
            p.close()
            p.join()
        else:
            samples = np.hstack(list(map(self.load, idx)))
        # (n_samplers, n_samples, length) -> (n_samples, n_samplers, length)
        return samples.transpose((1, 0, 2))
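

# Usage sketch: ``ds_a`` and ``ds_b`` are illustrative Dataset instances, not
# part of this module. SyncSampler reuses one item index and one set of
# augmentation parameters across both datasets; AsyncSampler draws them
# independently.
#
#     sync = SyncSampler(ds_a, ds_b)
#     pairs = sync.sample(4)      # shape (4, 2, samples)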