preprocess.py 11 KB


  1. import math
  2. from typing import Optional, Tuple
  3. from tinygrad import Tensor, dtypes
  4. import librosa
  5. import soundfile
  6. import numpy as np
  7. import parselmouth
  8. class PMF0Predictor: # from https://github.com/svc-develop-team/so-vits-svc/
  9. def __init__(self,hop_length=512,f0_min=50,f0_max=1100,sampling_rate=44100):
  10. self.hop_length, self.f0_min, self.f0_max, self.sampling_rate, self.name = hop_length, f0_min, f0_max, sampling_rate, "pm"
  11. def interpolate_f0(self,f0):
  12. vuv_vector = np.zeros_like(f0, dtype=np.float32)
  13. vuv_vector[f0 > 0.0] = 1.0
  14. vuv_vector[f0 <= 0.0] = 0.0
  15. nzindex = np.nonzero(f0)[0]
  16. data = f0[nzindex]
  17. nzindex = nzindex.astype(np.float32)
  18. time_org = self.hop_length / self.sampling_rate * nzindex
  19. time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
  20. if data.shape[0] <= 0: return np.zeros(f0.shape[0], dtype=np.float32),vuv_vector
  21. if data.shape[0] == 1: return np.ones(f0.shape[0], dtype=np.float32) * f0[0],vuv_vector
  22. f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
  23. return f0,vuv_vector
  24. def compute_f0(self,wav,p_len=None):
  25. x = wav
  26. if p_len is None: p_len = x.shape[0]//self.hop_length
  27. else: assert abs(p_len-x.shape[0]//self.hop_length) < 4, "pad length error"
  28. time_step = self.hop_length / self.sampling_rate * 1000
  29. f0 = parselmouth.Sound(x, self.sampling_rate) \
  30. .to_pitch_ac(time_step=time_step / 1000, voicing_threshold=0.6,pitch_floor=self.f0_min, pitch_ceiling=self.f0_max) \
  31. .selected_array['frequency']
  32. pad_size=(p_len - len(f0) + 1) // 2
  33. if(pad_size>0 or p_len - len(f0) - pad_size>0):
  34. f0 = np.pad(f0,[[pad_size,p_len - len(f0) - pad_size]], mode='constant')
  35. f0,uv = self.interpolate_f0(f0)
  36. return f0
  37. def compute_f0_uv(self,wav,p_len=None):
  38. x = wav
  39. if p_len is None: p_len = x.shape[0]//self.hop_length
  40. else: assert abs(p_len-x.shape[0]//self.hop_length) < 4, "pad length error"
  41. time_step = self.hop_length / self.sampling_rate * 1000
  42. f0 = parselmouth.Sound(x, self.sampling_rate).to_pitch_ac(
  43. time_step=time_step / 1000, voicing_threshold=0.6,
  44. pitch_floor=self.f0_min, pitch_ceiling=self.f0_max).selected_array['frequency']
  45. pad_size=(p_len - len(f0) + 1) // 2
  46. if(pad_size>0 or p_len - len(f0) - pad_size>0):
  47. f0 = np.pad(f0,[[pad_size,p_len - len(f0) - pad_size]], mode='constant')
  48. f0,uv = self.interpolate_f0(f0)
  49. return f0,uv
  50. class Slicer: # from https://github.com/svc-develop-team/so-vits-svc/
  51. def __init__(self, sr: int, threshold: float = -40., min_length: int = 5000, min_interval: int = 300, hop_size: int = 20, max_sil_kept: int = 5000):
  52. if not min_length >= min_interval >= hop_size:
  53. raise ValueError('The following condition must be satisfied: min_length >= min_interval >= hop_size')
  54. if not max_sil_kept >= hop_size:
  55. raise ValueError('The following condition must be satisfied: max_sil_kept >= hop_size')
  56. min_interval = sr * min_interval / 1000
  57. self.threshold = 10 ** (threshold / 20.)
  58. self.hop_size = round(sr * hop_size / 1000)
  59. self.win_size = min(round(min_interval), 4 * self.hop_size)
  60. self.min_length = round(sr * min_length / 1000 / self.hop_size)
  61. self.min_interval = round(min_interval / self.hop_size)
  62. self.max_sil_kept = round(sr * max_sil_kept / 1000 / self.hop_size)
  63. def _apply_slice(self, waveform, begin, end):
  64. if len(waveform.shape) > 1: return waveform[:, begin * self.hop_size: min(waveform.shape[1], end * self.hop_size)]
  65. else: return waveform[begin * self.hop_size: min(waveform.shape[0], end * self.hop_size)]
  66. def slice(self, waveform):
  67. samples = librosa.to_mono(waveform) if len(waveform.shape) > 1 else waveform
  68. if samples.shape[0] <= self.min_length: return {"0": {"slice": False, "split_time": f"0,{len(waveform)}"}}
  69. rms_list = librosa.feature.rms(y=samples, frame_length=self.win_size, hop_length=self.hop_size).squeeze(0)
  70. sil_tags, silence_start, clip_start = [], None, 0
  71. for i, rms in enumerate(rms_list):
  72. if rms < self.threshold: # Keep looping while frame is silent.
  73. if silence_start is None: # Record start of silent frames.
  74. silence_start = i
  75. continue
  76. if silence_start is None: continue # Keep looping while frame is not silent and silence start has not been recorded.
  77. # Clear recorded silence start if interval is not enough or clip is too short
  78. is_leading_silence = silence_start == 0 and i > self.max_sil_kept
  79. need_slice_middle = i - silence_start >= self.min_interval and i - clip_start >= self.min_length
  80. if not is_leading_silence and not need_slice_middle:
  81. silence_start = None
  82. continue
  83. if i - silence_start <= self.max_sil_kept: # Need slicing. Record the range of silent frames to be removed.
  84. pos = rms_list[silence_start: i + 1].argmin() + silence_start
  85. sil_tags.append((0, pos) if silence_start == 0 else (pos, pos))
  86. clip_start = pos
  87. elif i - silence_start <= self.max_sil_kept * 2:
  88. pos = rms_list[i - self.max_sil_kept: silence_start + self.max_sil_kept + 1].argmin()
  89. pos += i - self.max_sil_kept
  90. pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
  91. pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
  92. if silence_start == 0:
  93. sil_tags.append((0, pos_r))
  94. clip_start = pos_r
  95. else:
  96. sil_tags.append((min(pos_l, pos), max(pos_r, pos)))
  97. clip_start = max(pos_r, pos)
  98. else:
  99. pos_l = rms_list[silence_start: silence_start + self.max_sil_kept + 1].argmin() + silence_start
  100. pos_r = rms_list[i - self.max_sil_kept: i + 1].argmin() + i - self.max_sil_kept
  101. sil_tags.append((0, pos_r) if silence_start == 0 else (pos_l, pos_r))
  102. clip_start = pos_r
  103. silence_start = None
  104. total_frames = rms_list.shape[0]
  105. if silence_start is not None and total_frames - silence_start >= self.min_interval: # Deal with trailing silence.
  106. silence_end = min(total_frames, silence_start + self.max_sil_kept)
  107. pos = rms_list[silence_start: silence_end + 1].argmin() + silence_start
  108. sil_tags.append((pos, total_frames + 1))
  109. if len(sil_tags) == 0: return {"0": {"slice": False, "split_time": f"0,{len(waveform)}"}} # Apply and return slices.
  110. chunks = []
  111. if sil_tags[0][0]:
  112. chunks.append({"slice": False, "split_time": f"0,{min(waveform.shape[0], sil_tags[0][0] * self.hop_size)}"})
  113. for i in range(0, len(sil_tags)):
  114. if i: chunks.append({"slice": False, "split_time": f"{sil_tags[i - 1][1] * self.hop_size},{min(waveform.shape[0], sil_tags[i][0] * self.hop_size)}"})
  115. chunks.append({"slice": True, "split_time": f"{sil_tags[i][0] * self.hop_size},{min(waveform.shape[0], sil_tags[i][1] * self.hop_size)}"})
  116. if sil_tags[-1][1] * self.hop_size < len(waveform):
  117. chunks.append({"slice": False, "split_time": f"{sil_tags[-1][1] * self.hop_size},{len(waveform)}"})
  118. chunk_dict = {}
  119. for i in range(len(chunks)): chunk_dict[str(i)] = chunks[i]
  120. return chunk_dict
  121. # sinc_interp_hann audio resampling
  122. class Resample:
  123. def __init__(self, orig_freq:int=16000, new_freq:int=16000, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None, dtype:Optional[dtypes]=None):
  124. self.orig_freq, self.new_freq, self.lowpass_filter_width, self.rolloff, self.beta = orig_freq, new_freq, lowpass_filter_width, rolloff, beta
  125. self.gcd = math.gcd(int(self.orig_freq), int(self.new_freq))
  126. self.kernel, self.width = self._get_sinc_resample_kernel(dtype) if self.orig_freq != self.new_freq else (None, None)
  127. def __call__(self, waveform:Tensor) -> Tensor:
  128. if self.orig_freq == self.new_freq: return waveform
  129. return self._apply_sinc_resample_kernel(waveform)
  130. def _apply_sinc_resample_kernel(self, waveform:Tensor):
  131. if not waveform.is_floating_point(): raise TypeError(f"Waveform tensor expected to be of type float, but received {waveform.dtype}.")
  132. orig_freq, new_freq = (int(self.orig_freq) // self.gcd), (int(self.new_freq) // self.gcd)
  133. shape = waveform.shape
  134. waveform = waveform.reshape(-1, shape[-1]) # pack batch
  135. num_wavs, length = waveform.shape
  136. target_length = int(math.ceil(new_freq * length / orig_freq))
  137. waveform = waveform.pad2d((self.width, self.width + orig_freq))
  138. resampled = waveform[:, None].conv2d(self.kernel, stride=orig_freq)
  139. resampled = resampled.transpose(1, 2).reshape(num_wavs, -1)
  140. resampled = resampled[..., :target_length]
  141. resampled = resampled.reshape(shape[:-1] + resampled.shape[-1:]) # unpack batch
  142. return resampled
  143. def _get_sinc_resample_kernel(self, dtype=None):
  144. orig_freq, new_freq = (int(self.orig_freq) // self.gcd), (int(self.new_freq) // self.gcd)
  145. if self.lowpass_filter_width <= 0: raise ValueError("Low pass filter width should be positive.")
  146. base_freq = min(orig_freq, new_freq)
  147. base_freq *= self.rolloff
  148. width = math.ceil(self.lowpass_filter_width * orig_freq / base_freq)
  149. idx = Tensor.arange(-width, width + orig_freq, dtype=(dtype if dtype is not None else dtypes.float32))[None, None] / orig_freq
  150. t = Tensor.arange(0, -new_freq, -1, dtype=dtype)[:, None, None] / new_freq + idx
  151. t *= base_freq
  152. t = t.clip(-self.lowpass_filter_width, self.lowpass_filter_width)
  153. window = (t * math.pi / self.lowpass_filter_width / 2).cos() ** 2
  154. t *= math.pi
  155. scale = base_freq / orig_freq
  156. kernels = Tensor.where(t == 0, Tensor(1.0, dtype=t.dtype).to(t.device), t.sin() / t)
  157. kernels *= window * scale
  158. if dtype is None: kernels = kernels.cast(dtype=dtypes.float32)
  159. return kernels, width
  160. def sinc_interp_resample(x:Tensor, orig_freq:int=16000, new_freq:int=1600, lowpass_filter_width:int=6, rolloff:float=0.99, beta:Optional[float]=None):
  161. resamp = Resample(orig_freq, new_freq, lowpass_filter_width, rolloff, beta, x.dtype)
  162. return resamp(x)
  163. def cut(audio_path, db_thresh=-30, min_len=5000):
  164. audio, sr = librosa.load(audio_path, sr=None)
  165. slicer = Slicer(sr=sr, threshold=db_thresh, min_length=min_len)
  166. chunks = slicer.slice(audio)
  167. return chunks
  168. def chunks2audio(audio_path, chunks):
  169. chunks = dict(chunks)
  170. audio, sr = load_audiofile(audio_path)
  171. if len(audio.shape) == 2 and audio.shape[1] >= 2:
  172. audio = audio.mean(0).unsqueeze(0)
  173. audio = audio.numpy()[0]
  174. result = []
  175. for k, v in chunks.items():
  176. tag = v["split_time"].split(",")
  177. if tag[0] != tag[1]:
  178. result.append((v["slice"], audio[int(tag[0]):int(tag[1])]))
  179. return result, sr
  180. def load_audiofile(filepath:str, frame_offset:int=0, num_frames:int=-1, channels_first:bool=True):
  181. with soundfile.SoundFile(filepath, "r") as file_:
  182. frames = file_._prepare_read(frame_offset, None, num_frames)
  183. waveform = file_.read(frames, "float32", always_2d=True)
  184. sample_rate = file_.samplerate
  185. waveform = Tensor(waveform)
  186. if channels_first: waveform = waveform.transpose(0, 1)
  187. return waveform, sample_rate
  188. def get_unit_f0(wav:Tensor, tran, hop_length, target_sample, f0_filter=False) -> Tuple[Tensor,Tensor,Tensor]:
  189. f0_predictor = PMF0Predictor(hop_length, sampling_rate=target_sample)
  190. f0, uv = f0_predictor.compute_f0_uv(wav.numpy())
  191. if f0_filter and sum(f0) == 0: raise RuntimeError("No voice detected")
  192. f0 = Tensor(f0.astype(np.float32)).float()
  193. f0 = (f0 * 2 ** (tran / 12)).unsqueeze(0)
  194. uv = Tensor(uv.astype(np.float32)).float().unsqueeze(0)
  195. wav16k = sinc_interp_resample(wav[None,:], target_sample, 16000)[0]
  196. return wav16k.realize(), f0.realize(), uv.realize()