import math
from typing import Optional, Tuple
from tinygrad import Tensor, dtypes
import librosa
import soundfile
import numpy as np
import parselmouth
class PMF0Predictor:  # from https://github.com/svc-develop-team/so-vits-svc/
  def __init__(self, hop_length=512, f0_min=50, f0_max=1100, sampling_rate=44100):
    self.hop_length, self.f0_min, self.f0_max, self.sampling_rate, self.name = hop_length, f0_min, f0_max, sampling_rate, "pm"
  def interpolate_f0(self, f0):
    # fill unvoiced (zero) frames by linear interpolation over time; also return a voiced/unvoiced mask
    vuv_vector = np.zeros_like(f0, dtype=np.float32)
    vuv_vector[f0 > 0.0] = 1.0
    vuv_vector[f0 <= 0.0] = 0.0
    nzindex = np.nonzero(f0)[0]
    data = f0[nzindex]
    nzindex = nzindex.astype(np.float32)
    time_org = self.hop_length / self.sampling_rate * nzindex
    time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
    if data.shape[0] <= 0: return np.zeros(f0.shape[0], dtype=np.float32), vuv_vector
    if data.shape[0] == 1: return np.ones(f0.shape[0], dtype=np.float32) * f0[0], vuv_vector
    f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
    return f0, vuv_vector
  def compute_f0(self, wav, p_len=None):
    # same as compute_f0_uv, but discards the voiced/unvoiced mask
    f0, _ = self.compute_f0_uv(wav, p_len)
    return f0
  def compute_f0_uv(self, wav, p_len=None):
    x = wav
    if p_len is None: p_len = x.shape[0] // self.hop_length
    else: assert abs(p_len - x.shape[0] // self.hop_length) < 4, "pad length error"
    time_step = self.hop_length / self.sampling_rate * 1000
    f0 = parselmouth.Sound(x, self.sampling_rate).to_pitch_ac(
      time_step=time_step / 1000, voicing_threshold=0.6,
      pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array['frequency']
    # pad the contour symmetrically so it matches the expected frame count p_len
    pad_size = (p_len - len(f0) + 1) // 2
    if pad_size > 0 or p_len - len(f0) - pad_size > 0:
      f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode='constant')
    f0, uv = self.interpolate_f0(f0)
    return f0, uv
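
# Illustrative usage sketch (not part of the original file; parameter values are
# arbitrary examples): extract an f0 contour and voiced/unvoiced mask from one
# second of a synthetic 220 Hz tone.
#
#   sr = 44100
#   tone = np.sin(2 * np.pi * 220 * np.arange(sr) / sr).astype(np.float32)
#   predictor = PMF0Predictor(hop_length=512, sampling_rate=sr)
#   f0, uv = predictor.compute_f0_uv(tone)  # both have ~sr // 512 frames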
class Slicer:  # from https://github.com/svc-develop-team/so-vits-svc/
  def __init__(self, sr: int, threshold: float = -40., min_length: int = 5000, min_interval: int = 300, hop_size: int = 20, max_sil_kept: int = 5000):
    if not min_length >= min_interval >= hop_size:
      raise ValueError('The following condition must be satisfied: min_length >= min_interval >= hop_size')
    if not max_sil_kept >= hop_size:
      raise ValueError('The following condition must be satisfied: max_sil_kept >= hop_size')
    min_interval = sr * min_interval / 1000
    self.threshold = 10 ** (threshold / 20.)  # dB -> linear amplitude
    self.hop_size = round(sr * hop_size / 1000)
    self.win_size = min(round(min_interval), 4 * self.hop_size)
    self.min_length = round(sr * min_length / 1000 / self.hop_size)
    self.min_interval = round(min_interval / self.hop_size)
    self.max_sil_kept = round(sr * max_sil_kept / 1000 / self.hop_size)
  def _apply_slice(self, waveform, begin, end):
    if len(waveform.shape) > 1: return waveform[:, begin * self.hop_size: min(waveform.shape[1], end * self.hop_size)]
    return waveform[begin * self.hop_size: min(waveform.shape[0], end * self.hop_size)]
  def slice(self, waveform):
    samples = librosa.to_mono(waveform) if len(waveform.shape) > 1 else waveform
    if samples.shape[0] <= self.min_length: return {"0": {"slice": False, "split_time": f"0,{len(waveform)}"}}
    rms_list = librosa.feature.rms(y=samples, frame_length=self.win_size, hop_length=self.hop_size).squeeze(0)
    sil_tags, silence_start, clip_start = [], None, 0
    for i, rms in enumerate(rms_list):
      if rms < self.threshold:  # keep looping while frame is silent
        if silence_start is None: silence_start = i  # record start of silent frames
        continue
      if silence_start is None: continue  # keep looping while frame is not silent and silence start has not been recorded
      # clear recorded silence start if interval is not enough or clip is too short
      is_leading_silence = silence_start == 0 and i > self.max_sil_kept
      need_slice_middle = i - silence_start >= self.min_interval and i - clip_start >= self.min_length
      if not is_leading_silence and not need_slice_middle:
        silence_start = None
        continue
      if i - silence_start <= self.max_sil_kept:  # need slicing: record the range of silent frames to be removed
        pos = rms_list[silence_start: i + 1].argmin() + silence_start
        sil_tags.append((0, pos) if silence_start == 0 else (pos, pos))
        clip_start = pos
      elif i - silence_start <= self.max_sil_kept * 2:
        pos = rms_list[i - self.max_sil_kept: silence_start + self.max_sil_kept + 1].argmin()
        pos += i - self.max_sil_kept
        pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
        pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
        if silence_start == 0:
          sil_tags.append((0, pos_r))
          clip_start = pos_r
        else:
          sil_tags.append((min(pos_l, pos), max(pos_r, pos)))
          clip_start = max(pos_r, pos)
      else:
        pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
        pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
        sil_tags.append((0, pos_r) if silence_start == 0 else (pos_l, pos_r))
        clip_start = pos_r
      silence_start = None
    total_frames = rms_list.shape[0]
    if silence_start is not None and total_frames - silence_start >= self.min_interval:  # deal with trailing silence
      silence_end = min(total_frames, silence_start + self.max_sil_kept)
      pos = rms_list[silence_start: silence_end + 1].argmin() + silence_start
      sil_tags.append((pos, total_frames + 1))
    if len(sil_tags) == 0: return {"0": {"slice": False, "split_time": f"0,{len(waveform)}"}}  # no silence found: return the whole clip
    # apply and return slices
    chunks = []
    if sil_tags[0][0]:
      chunks.append({"slice": False, "split_time": f"0,{min(waveform.shape[0], sil_tags[0][0] * self.hop_size)}"})
    for i in range(0, len(sil_tags)):
      if i: chunks.append({"slice": False, "split_time": f"{sil_tags[i - 1][1] * self.hop_size},{min(waveform.shape[0], sil_tags[i][0] * self.hop_size)}"})
      chunks.append({"slice": True, "split_time": f"{sil_tags[i][0] * self.hop_size},{min(waveform.shape[0], sil_tags[i][1] * self.hop_size)}"})
    if sil_tags[-1][1] * self.hop_size < len(waveform):
      chunks.append({"slice": False, "split_time": f"{sil_tags[-1][1] * self.hop_size},{len(waveform)}"})
    return {str(i): chunk for i, chunk in enumerate(chunks)}
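
# Illustrative usage sketch (assumption: `path` names a readable audio file):
# slice on silence and count the detected silence regions. In the returned dict,
# "slice": True marks a silent range, "slice": False an audible one.
#
#   audio, sr = librosa.load(path, sr=None)
#   chunks = Slicer(sr=sr, threshold=-40., min_length=5000).slice(audio)
#   n_silent = sum(1 for c in chunks.values() if c["slice"])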
# sinc_interp_hann audio resampling
class Resample:
  def __init__(self, orig_freq:int=16000, new_freq:int=16000, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None, dtype:Optional[dtypes]=None):
    # beta is accepted for API parity (a Kaiser-window variant would use it) but is unused by this Hann-windowed kernel
    self.orig_freq, self.new_freq, self.lowpass_filter_width, self.rolloff, self.beta = orig_freq, new_freq, lowpass_filter_width, rolloff, beta
    self.gcd = math.gcd(int(self.orig_freq), int(self.new_freq))
    self.kernel, self.width = self._get_sinc_resample_kernel(dtype) if self.orig_freq != self.new_freq else (None, None)
  def __call__(self, waveform:Tensor) -> Tensor:
    if self.orig_freq == self.new_freq: return waveform
    return self._apply_sinc_resample_kernel(waveform)
  def _apply_sinc_resample_kernel(self, waveform:Tensor):
    if not waveform.is_floating_point(): raise TypeError(f"Waveform tensor expected to be of type float, but received {waveform.dtype}.")
    orig_freq, new_freq = (int(self.orig_freq) // self.gcd), (int(self.new_freq) // self.gcd)
    shape = waveform.shape
    waveform = waveform.reshape(-1, shape[-1])  # pack batch
    num_wavs, length = waveform.shape
    target_length = int(math.ceil(new_freq * length / orig_freq))
    waveform = waveform.pad2d((self.width, self.width + orig_freq))
    resampled = waveform[:, None].conv2d(self.kernel, stride=orig_freq)
    resampled = resampled.transpose(1, 2).reshape(num_wavs, -1)
    resampled = resampled[..., :target_length]
    return resampled.reshape(shape[:-1] + resampled.shape[-1:])  # unpack batch
  def _get_sinc_resample_kernel(self, dtype=None):
    orig_freq, new_freq = (int(self.orig_freq) // self.gcd), (int(self.new_freq) // self.gcd)
    if self.lowpass_filter_width <= 0: raise ValueError("Low pass filter width should be positive.")
    base_freq = min(orig_freq, new_freq) * self.rolloff  # rolloff keeps the cutoff slightly below Nyquist
    width = math.ceil(self.lowpass_filter_width * orig_freq / base_freq)
    idx = Tensor.arange(-width, width + orig_freq, dtype=(dtype if dtype is not None else dtypes.float32))[None, None] / orig_freq
    t = Tensor.arange(0, -new_freq, -1, dtype=dtype)[:, None, None] / new_freq + idx
    t *= base_freq
    t = t.clip(-self.lowpass_filter_width, self.lowpass_filter_width)
    window = (t * math.pi / self.lowpass_filter_width / 2).cos() ** 2  # Hann window
    t *= math.pi
    scale = base_freq / orig_freq
    kernels = Tensor.where(t == 0, Tensor(1.0, dtype=t.dtype).to(t.device), t.sin() / t)  # sinc, with the t == 0 singularity handled
    kernels *= window * scale
    if dtype is None: kernels = kernels.cast(dtype=dtypes.float32)
    return kernels, width
def sinc_interp_resample(x:Tensor, orig_freq:int=16000, new_freq:int=16000, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None):
  resamp = Resample(orig_freq, new_freq, lowpass_filter_width, rolloff, beta, x.dtype)
  return resamp(x)
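
# Illustrative usage sketch: downsample one second of 44.1 kHz noise to 16 kHz.
# The output length follows ceil(new_freq * length / orig_freq).
#
#   x = Tensor.randn(44100)
#   y = sinc_interp_resample(x, 44100, 16000)
#   assert y.shape[-1] == math.ceil(16000 * 44100 / 44100)  # 16000 samples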
def cut(audio_path, db_thresh=-30, min_len=5000):
  audio, sr = librosa.load(audio_path, sr=None)
  slicer = Slicer(sr=sr, threshold=db_thresh, min_length=min_len)
  return slicer.slice(audio)
def chunks2audio(audio_path, chunks):
  chunks = dict(chunks)
  audio, sr = load_audiofile(audio_path)
  if len(audio.shape) == 2 and audio.shape[0] >= 2:  # waveform is channels-first: downmix multi-channel audio to mono
    audio = audio.mean(0).unsqueeze(0)
  audio = audio.numpy()[0]
  result = []
  for k, v in chunks.items():
    tag = v["split_time"].split(",")
    if tag[0] != tag[1]:
      result.append((v["slice"], audio[int(tag[0]):int(tag[1])]))
  return result, sr
def load_audiofile(filepath:str, frame_offset:int=0, num_frames:int=-1, channels_first:bool=True):
  with soundfile.SoundFile(filepath, "r") as file_:
    frames = file_._prepare_read(frame_offset, None, num_frames)  # private soundfile helper, mirrors torchaudio's soundfile backend
    waveform = file_.read(frames, "float32", always_2d=True)
    sample_rate = file_.samplerate
  waveform = Tensor(waveform)
  if channels_first: waveform = waveform.transpose(0, 1)  # (frames, channels) -> (channels, frames)
  return waveform, sample_rate
def get_unit_f0(wav:Tensor, tran, hop_length, target_sample, f0_filter=False) -> Tuple[Tensor, Tensor, Tensor]:
  f0_predictor = PMF0Predictor(hop_length, sampling_rate=target_sample)
  f0, uv = f0_predictor.compute_f0_uv(wav.numpy())
  if f0_filter and sum(f0) == 0: raise RuntimeError("No voice detected")
  f0 = Tensor(f0.astype(np.float32)).float()
  f0 = (f0 * 2 ** (tran / 12)).unsqueeze(0)  # transpose the pitch by `tran` semitones
  uv = Tensor(uv.astype(np.float32)).float().unsqueeze(0)
  wav16k = sinc_interp_resample(wav[None, :], target_sample, 16000)[0]
  return wav16k.realize(), f0.realize(), uv.realize()
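
# Illustrative end-to-end sketch (assumption: `path` points to a 44.1 kHz file):
# slice the file on silence, then compute the 16 kHz waveform, transposed f0 and
# voiced mask for each audible chunk, as a so-vits-svc style frontend would.
#
#   chunks = cut(path)
#   for is_silence, data in chunks2audio(path, chunks)[0]:
#     if is_silence: continue
#     wav16k, f0, uv = get_unit_f0(Tensor(data), tran=0, hop_length=512, target_sample=44100)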