| """Audio loading utils.""" | |
| import os | |
| import numpy as np | |
| import torch | |
| import torchaudio | |
| import decord | |
| import librosa | |
| import einops | |
| import PIL | |
| import matplotlib.pyplot as plt | |
| # Add serif font | |
| plt.rcParams['font.family'] = 'serif' | |
| from PIL import Image, ImageOps | |
| import librosa.display | |
| import shared.utils as su | |


def read_info(path):
    """
    Reads stream metadata of the given media file using ffprobe.

    Args:
        path (str): path to the audio/video file

    Returns:
        dict: with keys 'video' and 'audio', each holding the corresponding
            ffprobe stream info (or None if that stream is absent).
    """
    import ffmpeg
    probe = ffmpeg.probe(path)
    audio_info = next(
        (s for s in probe['streams'] if s['codec_type'] == 'audio'),
        None,
    )
    video_info = next(
        (s for s in probe['streams'] if s['codec_type'] == 'video'),
        None,
    )
    return dict(video=video_info, audio=audio_info)
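
# Illustrative usage (not part of the original file; the path is hypothetical).
# The fields shown are the ones this module relies on in load_audio_clips below:
#   info = read_info("media_assets/sample.mp4")
#   sr = int(info["audio"]["sample_rate"])   # e.g. 44100
#   nf = info["audio"]["duration_ts"]        # number of audio sample frames
#   duration = nf / sr                       # duration in seconds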


def load_audio_clips(
    audio_path,
    clips,
    sr,
    clip_len,
    backend='decord',
    load_entire=False,
    cut_to_clip_len=True,
):
    """
    Loads audio clips from the given audio file.

    Args:
        audio_path (str): path to the audio file
        clips (np.ndarray): sized [T, 2], where T is the number of clips
            and each row is a pair of (start, end) times of a clip in seconds
        sr (int): target sample rate
        clip_len (float): length of each audio clip in seconds
        backend (str): backend to use for loading audio clips
            ('torchaudio' or 'decord')
        load_entire (bool): whether to load the entire audio file as one clip
        cut_to_clip_len (bool): whether to cut each audio clip to clip_len
    """
    if backend == 'torchaudio':
        audio_info = read_info(audio_path)["audio"]
        true_sr = int(audio_info["sample_rate"])
        true_nf = audio_info["duration_ts"]
        audio_duration = true_nf / true_sr
        # metadata = torchaudio.info(audio_path)
        # true_sr = metadata.sample_rate
        # true_nf = metadata.num_frames
    elif backend == "decord":
        # duration = librosa.get_duration(filename=audio_path)
        ar = decord.AudioReader(audio_path, sample_rate=sr, mono=True)
        # mono=False gives NaNs in the output. This
        # (https://gist.github.com/nateraw/fcc2bdb9c8738224957c8617c3360445)
        # might be a related issue; ignoring for now. Use the torchaudio
        # backend if stereo is needed.
        true_nf = ar.shape[1]
        audio_duration = ar.shape[1] / sr
    else:
        raise ValueError(f"Unknown backend: {backend}")

    if load_entire:
        # Load the entire audio as a single clip and return
        if backend == 'torchaudio':
            y, _ = torchaudio.load(audio_path)
            if y.shape[0] > 1:
                # Convert to a single channel
                y = y.mean(dim=0, keepdim=True)
            resampler = torchaudio.transforms.Resample(true_sr, sr)
            y = resampler(y)
            audio = y
        elif backend == "decord":
            audio = ar.get_batch(np.arange(true_nf)).asnumpy()
            audio = torch.from_numpy(audio)
        return [audio]
    else:
        # Clip the clip boundaries to avoid going out of bounds
        clips = np.clip(clips, 0, audio_duration)

        audio_clips = []
        for st, et in clips:
            if backend == 'torchaudio':
                # Load audio within the given time range
                sf = max(int(true_sr * st), 0)
                ef = min(int(true_sr * et), true_nf)
                nf = ef - sf
                y, _ = torchaudio.load(audio_path, frame_offset=sf, num_frames=nf)
                # Stereo to mono
                if y.shape[0] > 1:
                    # Convert to a single channel
                    y = y.mean(dim=0, keepdim=True)
                # Resample to the given sample rate
                resampler = torchaudio.transforms.Resample(true_sr, sr)
                y = resampler(y)
                audio = y
            elif backend == "decord":
                # Load audio within the given time range
                sf = max(int(st * sr), 0)
                ef = min(int(et * sr), true_nf)
                audio = ar.get_batch(np.arange(sf, ef)).asnumpy()
                audio = torch.from_numpy(audio)
                # No need to convert to mono since we are using mono=True
                # No need to resample since we are using sample_rate=sr
            else:
                raise ValueError(f"Unknown backend: {backend}")

            # Pad the clip to clip_len
            nf_reqd = int(clip_len * sr)
            nf_curr = audio.shape[1]
            npad_side = max(0, nf_reqd - nf_curr)
            if nf_curr < nf_reqd:
                audio = torch.nn.functional.pad(audio, (0, npad_side))
            elif (nf_curr > nf_reqd) and cut_to_clip_len:
                audio = audio[:, :nf_reqd]
            audio_clips.append(audio)

        return audio_clips
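
# Illustrative usage (not part of the original file; the path is hypothetical):
#   clips = np.array([[0.0, 2.0], [2.0, 4.0]])   # two 2-second windows
#   audio_clips = load_audio_clips(
#       "media_assets/sample.mp4", clips, sr=16000, clip_len=2.0,
#       backend="decord",
#   )
#   # -> list of 2 tensors, each roughly of shape [1, 32000] (clip_len * sr samples)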


def show_audio_clips_waveform(
    audio_clips, clips, title=None, show=True, figsize=(10, 2),
):
    """
    Visualizes the waveforms of the given audio clips.

    Args:
        audio_clips (list): list of audio clips
        clips (np.ndarray): sized [T, 2], where T is the number of clips
            and each row is a pair of (start, end) times of a clip in seconds
        title (str): title of the plot
        show (bool): whether to show the plot (otherwise it is saved to disk)
        figsize (tuple): figure size
    """
    clip_centers = (clips[:, 0] + clips[:, 1]) / 2
    clip_durations = clips[:, 1] - clips[:, 0]

    fig, ax = plt.subplots(1, len(audio_clips), figsize=figsize)
    if len(audio_clips) == 1:
        ax = [ax]
    for i, audio in enumerate(audio_clips):
        # Timestamps spanning [start, end] of the clip
        timestamps = np.linspace(
            clip_centers[i] - clip_durations[i] / 2,
            clip_centers[i] + clip_durations[i] / 2,
            audio.shape[-1],
        )
        ax[i].plot(timestamps, audio.squeeze().numpy(), alpha=0.5)
        ax[i].set_title(f'$t=$ {clip_centers[i]:.2f}')
        ax[i].grid(alpha=0.4)
    if title is not None:
        fig.suptitle(title)
    plt.tight_layout()
    if show:
        plt.show()
    else:
        plt.savefig('audio_clips_waveform.png')


# TODO: preprocess audio clips (e.g., wav-to-spectrogram, etc.)
# Note that this is different from transforms applied as augmentation
# during training. This is more like a preprocessing step that is applied
# to the entire audio before sampling the clips.
import torchaudio.functional as TAF
import torchaudio.transforms as TAT


def load_audio(path, sr=16000, **kwargs):
    """Loads audio with torchaudio, converts it to mono and resamples to sr."""
    y, true_sr = torchaudio.load(path, **kwargs)
    y = y.mean(dim=0, keepdim=True)
    resampler = torchaudio.transforms.Resample(true_sr, sr)
    y = resampler(y)
    return y, sr


def load_audio_librosa(path, sr=16000, **kwargs):
    """Loads audio with librosa (resampled to sr) as a [1, T] torch tensor."""
    y, true_sr = librosa.load(path, sr=sr, **kwargs)
    y = torch.from_numpy(y).unsqueeze(0)
    return y, sr
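
# Illustrative: both loaders return a mono [1, T] float tensor plus the target
# sample rate. Assuming a hypothetical 3-second file (any source rate):
#   y, sr = load_audio("media_assets/sample.wav", sr=16000)
#   # y.shape is roughly [1, 48000] after resampling (3 s * 16000 Hz)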


def librosa_harmonic_spectrogram_db(
    y, sr=16000, n_fft=512, hop_length=256, margin=16., n_mels=64,
):
    """Computes a dB spectrogram of the harmonic component of the waveform."""
    if isinstance(y, torch.Tensor):
        y = y.numpy()
    if len(y.shape) == 2:
        y = y.mean(axis=0)
    # center=True outputs 1 more frame than center=False
    # Currently, using just center=False
    D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length, center=False)
    # Harmonic-percussive source separation: keep the harmonic component
    DH, DP = librosa.decompose.hpss(D, margin=margin)
    amplitude_h = np.sqrt(2) * np.abs(DH)
    if n_mels is None:
        # Usual dB spectrogram
        SH = librosa.amplitude_to_db(amplitude_h, ref=np.max)
    else:
        # Mel-scaled dB spectrogram
        S = librosa.amplitude_to_db(amplitude_h)
        SH = librosa.feature.melspectrogram(S=S, n_mels=n_mels, sr=sr)
    return SH
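
# Illustrative shape check (assuming the defaults above): with center=False,
# librosa.stft yields 1 + (T - n_fft) // hop_length frames, so a 2-second clip
# at sr=16000 (T = 32000 samples) gives 1 + (32000 - 512) // 256 = 124 frames,
# i.e. an output of shape (n_mels, 124) = (64, 124).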


def show_logmelspectrogram(
    S,
    sr,
    n_fft=512,
    hop_length=256,
    figsize=(10, 3),
    ax=None,
    show=True,
    title="LogMelSpectrogram",
    xlabel="Time (s)",
    ylabel="Mel bins (Hz)",
    return_as_image=False,
):
    """Displays a log-mel spectrogram, optionally returning it as a PIL image."""
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)
    else:
        # Needed when return_as_image=True and the caller supplies an axis
        fig = ax.figure
    librosa.display.specshow(
        S,
        sr=sr,
        hop_length=hop_length,
        n_fft=n_fft,
        y_axis='mel',
        x_axis='time',
        ax=ax,
        auto_aspect=True,
    )
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if return_as_image:
        fig.canvas.draw()
        image = PIL.Image.frombytes(
            'RGB', fig.canvas.get_width_height(), fig.canvas.tostring_rgb(),
        )
        plt.close(fig)
        return image
    if show:
        plt.show()


def show_logspectrogram(
    S, sr, n_fft=512, hop_length=256, figsize=(10, 3), ax=None, show=True,
):
    """Displays a log (dB) spectrogram with a linear frequency axis."""
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=figsize)
    librosa.display.specshow(
        S,
        sr=sr,
        hop_length=hop_length,
        n_fft=n_fft,
        y_axis='linear',
        x_axis='time',
        ax=ax,
    )
    ax.set_title("LogSpectrogram")
    if show:
        plt.show()


def audio_clips_wav_to_spec(
    audio_clips, n_fft=512, hop_length=256, margin=16., n_mels=None,
):
    """
    Converts the given audio clips to spectrograms.

    Args:
        audio_clips (list): list of audio clips
        n_fft (int): number of FFT points
        hop_length (int): hop length
        margin (float): margin for harmonic-percussive source separation
        n_mels (int): number of mel bands (optional; if None, a dB
            spectrogram is returned)
    """
    audio_specs = []
    for audio in audio_clips:
        spec = librosa_harmonic_spectrogram_db(
            audio,
            n_fft=n_fft,
            hop_length=hop_length,
            margin=margin,
            n_mels=n_mels,
        )
        spec = torch.from_numpy(spec).unsqueeze(0)
        audio_specs.append(spec)
    return audio_specs


def show_audio_clips_spec(
    audio_specs,
    clips,
    sr,
    n_fft=512,
    hop_length=256,
    margin=16.,
    cmap='magma',
    n_mels=None,
    show=True,
):
    """
    Visualizes the given audio clips as spectrograms.

    Args:
        audio_specs (list): list of audio spectrograms
        clips (np.ndarray): sized [T, 2], where T is the number of clips
            and each row is a pair of (start, end) times of a clip in seconds
        show (bool): whether to show the plot (otherwise it is saved to disk)
    """
    clip_centers = (clips[:, 0] + clips[:, 1]) / 2
    clip_durations = clips[:, 1] - clips[:, 0]

    fig, ax = plt.subplots(1, len(audio_specs), figsize=(10, 4))
    if len(audio_specs) == 1:
        ax = [ax]
    for i, spec in enumerate(audio_specs):
        clip_start = clips[i][0]
        # ax[i].imshow(spec, aspect='auto', origin='lower')
        if isinstance(spec, torch.Tensor):
            spec = spec.numpy()
        if len(spec.shape) == 3:
            spec = spec[0]
        args = dict(
            data=spec,
            sr=sr,
            n_fft=n_fft,
            hop_length=hop_length,
            ax=ax[i],
            x_axis="time",
            cmap=cmap,
        )
        if n_mels is None:
            args.update(dict(y_axis="linear"))
        else:
            args.update(dict(y_axis="mel"))
        librosa.display.specshow(**args)
        # Shift xtick labels by clip_start so the time axis is absolute
        xticks = ax[i].get_xticks()
        ax[i].set_xticks(xticks)
        ax[i].set_xticklabels([f'{x + clip_start:.1f}' for x in xticks])
        ax[i].set_title(f'$t=$ {clip_centers[i]:.2f}')
    plt.tight_layout()
    if show:
        plt.show()
    else:
        plt.savefig('audio_clips_spec.png')


def basic_pipeline_audio_clips(
    audio_clips,
    spec_args=None,
    audio_transform=None,
    stack=True,
):
    """Applies wave-level transforms, optional wav-to-spec conversion, and spec-level transforms."""
    wave_transform = None
    spec_transform = None
    if audio_transform is not None:
        wave_transform = audio_transform.get('wave', None)
        spec_transform = audio_transform.get('spec', None)

    # Apply transforms to raw waveforms
    if wave_transform is not None:
        audio_clips = wave_transform(audio_clips)

    if spec_args is not None:
        # Convert waveforms to spectrograms
        audio_clips = audio_clips_wav_to_spec(audio_clips, **spec_args)

    # Apply transforms to spectrograms
    if spec_transform is not None:
        audio_clips = spec_transform(audio_clips)

    if stack:
        audio_clips = torch.stack(audio_clips)

    return audio_clips
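
# Illustrative call (assumptions: `audio_transform` is a dict of callables, as
# built by at.define_audio_transforms in the __main__ block below, where each
# callable maps a list of clips to a list of transformed clips; the transform
# name is hypothetical):
#   audio_transform = {"wave": my_wave_transform, "spec": None}
#   batch = basic_pipeline_audio_clips(
#       audio_clips, spec_args=dict(n_mels=64), audio_transform=audio_transform,
#   )
#   # batch: stacked spectrograms of shape [num_clips, 1, n_mels, num_frames]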


def load_and_process_audio(
    audio_path,
    clips,
    cut_to_clip_len=True,
    load_entire=False,
    audio_transform=None,
    aload_args=dict(),
    apipe_args=dict(),
):
    """Loads and preprocesses audio."""

    # [C1] Load audio clips: List[torch.Tensor]
    audio_clips = load_audio_clips(
        audio_path=audio_path,
        clips=clips,
        load_entire=load_entire,
        cut_to_clip_len=cut_to_clip_len,
        **aload_args,
    )

    # [C2] Pipeline: [Preprocessing -> Transform]
    audio_clips = basic_pipeline_audio_clips(
        audio_clips=audio_clips,
        audio_transform=audio_transform,
        **apipe_args,
    )

    return audio_clips


def crop_height(image, height):
    """Crops image from the top and bottom to the desired height."""
    width, curr_height = image.size
    if curr_height < height:
        raise ValueError(f"Height of the image is less than {height}")
    top = (curr_height - height) // 2
    bottom = top + height
    return image.crop((0, top, width, bottom))


def pad_to_height(image, height):
    """Pads image with black strips at the top and bottom."""
    width, curr_height = image.size
    if curr_height > height:
        raise ValueError(f"Height of the image is already greater than {height}")
    top = (height - curr_height) // 2
    bottom = height - curr_height - top
    return ImageOps.expand(image, (0, top, 0, bottom), fill="black")


def crop_width(image, width):
    """Crops image from the left and right to the desired width."""
    curr_width, height = image.size
    if curr_width < width:
        raise ValueError(f"Width of the image is less than {width}")
    left = (curr_width - width) // 2
    right = left + width
    return image.crop((left, 0, right, height))


def crop_or_pad_height(image, height):
    """Crops or pads image to the desired height."""
    width, curr_height = image.size
    if curr_height < height:
        return pad_to_height(image, height)
    elif curr_height > height:
        return crop_height(image, height)
    return image


def crop_or_pad_width(image, width):
    """Crops or pads image to the desired width."""
    curr_width, height = image.size
    if curr_width < width:
        return pad_to_width(image, width)
    elif curr_width > width:
        return crop_width(image, width)
    return image


def pad_to_width(image, width):
    """Pads image with black strips at the left and right."""
    curr_width, height = image.size
    if curr_width > width:
        raise ValueError(f"Width of the image is already greater than {width}")
    left = (width - curr_width) // 2
    right = width - curr_width - left
    return ImageOps.expand(image, (left, 0, right, 0), fill="black")


def crop_or_pad_to_size(image, size=(270, 480)):
    """Crops or pads image to the desired size, given as (width, height)."""
    image = crop_or_pad_height(image, size[1])
    image = crop_or_pad_width(image, size[0])
    return image
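
# Illustrative (hypothetical sizes): a 640x360 frame with the default
# size=(270, 480) is first padded vertically to 480 pixels, then center-cropped
# horizontally to 270 pixels, giving a 270x480 image.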


if __name__ == "__main__":

    import decord
    import sound_of_water.data.audio_transforms as at

    # Testing on a sample file
    file_path = "media_assets/ayNzH0uygFw_9.0_21.0.mp4"
    assert os.path.exists(file_path), f"File not found: {file_path}"

    # Define audio transforms
    cfg_transform = {
        "audio": {
            "wave": [
                {
                    "name": "AddNoise",
                    "args": {
                        "noise_level": 0.001
                    },
                    "augmentation": True,
                },
                {
                    "name": "ChangeVolume",
                    "args": {
                        "volume_factor": [0.8, 1.2]
                    },
                    "augmentation": True,
                },
                {
                    "name": "Wav2Vec2WaveformProcessor",
                    "args": {
                        "model_name": "facebook/wav2vec2-base-960h",
                        "sr": 16000
                    }
                }
            ],
            "spec": None,
        }
    }
    audio_transform = at.define_audio_transforms(
        cfg_transform, augment=False,
    )

    # Define audio load arguments
    aload_args = {
        "sr": 16000,
        "clip_len": None,
        "backend": "decord",
    }

    # Define audio pipeline arguments
    apipe_args = {
        "spec_args": None,
        "stack": True,
    }

    # Run the pipeline (this is used to pass to the model)
    audio = load_and_process_audio(
        audio_path=file_path,
        clips=None,
        load_entire=True,
        cut_to_clip_len=False,
        audio_transform=audio_transform,
        aload_args=aload_args,
        apipe_args=apipe_args,
    )[0]

    # This will be used to visualise
    visualise_args = {
        "sr": 16000,
        "n_fft": 400,
        "hop_length": 320,
        "n_mels": 64,
        "margin": 16.,
        "C": 340 * 100.,
        "audio_output_fps": 49.,
    }
    y = load_audio_clips(
        audio_path=file_path,
        clips=None,
        load_entire=True,
        cut_to_clip_len=False,
        **aload_args,
    )[0]
    S = librosa_harmonic_spectrogram_db(
        y,
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
        n_mels=visualise_args['n_mels'],
    )

    # Load video frame
    vr = decord.VideoReader(file_path, num_threads=1)
    frame = PIL.Image.fromarray(vr[0].asnumpy())

    """
    # Cut to desired width
    new_width, new_height = 270, 480
    width, height = frame.size
    if width > new_width:
        # Crop the width
        left = (width - new_width) // 2
        right = left + new_width
        frame = frame.crop((left, 0, right, height))
    else:
        # Resize along width to have the desired width
        frame = su.visualize.resize_width(frame, new_width)
    assert frame.size[0] == new_width, \
        f"Width mismatch: {frame.size[0]} != {new_width}"

    # Now pad/crop to desired height
    if height > new_height:
        # Crop the height
        top = (height - new_height) // 2
        bottom = top + new_height
        frame = frame.crop((0, top, new_width, bottom))
    else:
        # Pad the height
        frame = pad_to_height(frame, new_height)
    assert frame.size[1] == new_height, \
        f"Height mismatch: {frame.size[1]} != {new_height}"
    """
    frame = crop_or_pad_to_size(frame)
    # frame.save("1.png")

    # Visualise
    fig, axes = plt.subplots(
        1, 2, figsize=(13, 4), width_ratios=[0.25, 0.75],
    )
    ax = axes[0]
    ax.imshow(frame, aspect="auto")
    ax.set_title("Example frame")
    ax.set_xticks([])
    ax.set_yticks([])
    ax = axes[1]
    show_logmelspectrogram(
        S=S,
        ax=ax,
        show=False,
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
    )
    plt.savefig("./media_assets/audio_visualisation.png", bbox_inches="tight")
    plt.close()