Skip to content

Commit

Permalink
Fixed combined backing instrumental normalization, added anti-duplica…
Browse files Browse the repository at this point in the history
…te effort existing file checks
  • Loading branch information
beveradb committed Dec 8, 2024
1 parent 2c1e9f3 commit 4151650
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 88 deletions.
293 changes: 206 additions & 87 deletions karaoke_prep/karaoke_prep.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
from PIL import Image, ImageDraw, ImageFont
from lyrics_transcriber import LyricsTranscriber
from karaoke_lyrics_processor import KaraokeLyricsProcessor
import json
import subprocess
from pydub import AudioSegment
import numpy as np


class KaraokePrep:
Expand Down Expand Up @@ -290,6 +294,10 @@ def convert_to_wav(self, input_filename, output_filename_no_extension):
return output_filename

def write_lyrics_from_genius(self, artist, title, filename):
if self._file_exists(filename):
with open(filename, "r") as f:
return f.read().split("\n")

genius = lyricsgenius.Genius(access_token=os.environ["GENIUS_API_TOKEN"], verbose=False, remove_section_headers=True)
song = genius.search_song(title, artist)
if song:
Expand Down Expand Up @@ -430,6 +438,9 @@ def transcribe_lyrics(self, input_audio_wav, track_output_dir):
self.logger.info(f"MidiCo LRC output file: {transcriber_outputs['midico_lrc_filepath']}")

def write_processed_lyrics(self, lyrics_file, processed_lyrics_file):
if self._file_exists(processed_lyrics_file):
return processed_lyrics_file

self.logger.info(f"Processing lyrics from {lyrics_file} and writing to {processed_lyrics_file}")

if not self.dry_run:
Expand Down Expand Up @@ -582,6 +593,9 @@ def create_video(
extra_text_padding=600,
fixed_gap=150,
):
if self._file_exists(output_video_filepath):
return output_video_filepath

resolution = (3840, 2160) # 4K resolution
self.logger.info(f"Creating video with format: {format}")
self.logger.info(f"extra_text: {extra_text}, artist_text: {artist_text}, title_text: {title_text}")
Expand Down Expand Up @@ -669,11 +683,40 @@ def create_video(
self.logger.debug(f"Running command: {ffmpeg_command}")
os.system(ffmpeg_command)

return output_video_filepath

def create_title_video(self, artist, title, format, output_image_filepath_noext, output_video_filepath):
if self._file_exists(output_video_filepath):
return output_video_filepath

title_text = title.upper()
artist_text = artist.upper()
return self.create_video(
extra_text=None,
title_text=title_text,
artist_text=artist_text,
format=format,
output_image_filepath_noext=output_image_filepath_noext,
output_video_filepath=output_video_filepath,
existing_image=self.existing_title_image,
title_color=format["title_color"],
artist_color=format["artist_color"],
duration=self.title_video_duration,
initial_font_size=self.title_initial_font_size,
top_padding=self.title_top_padding,
title_padding=self.title_title_padding,
artist_padding=self.title_artist_padding,
fixed_gap=self.title_fixed_gap,
)

def create_end_video(self, artist, title, format, output_image_filepath_noext, output_video_filepath):
if self._file_exists(output_video_filepath):
return output_video_filepath

extra_text = self.end_extra_text
title_text = title.upper()
artist_text = artist.upper()
self.create_video(
return self.create_video(
extra_text=extra_text,
title_text=title_text,
artist_text=artist_text,
Expand All @@ -693,27 +736,6 @@ def create_end_video(self, artist, title, format, output_image_filepath_noext, o
fixed_gap=self.end_fixed_gap,
)

def create_title_video(self, artist, title, format, output_image_filepath_noext, output_video_filepath):
title_text = title.upper()
artist_text = artist.upper()
self.create_video(
extra_text=None,
title_text=title_text,
artist_text=artist_text,
format=format,
output_image_filepath_noext=output_image_filepath_noext,
output_video_filepath=output_video_filepath,
existing_image=self.existing_title_image,
title_color=format["title_color"],
artist_color=format["artist_color"],
duration=self.title_video_duration,
initial_font_size=self.title_initial_font_size,
top_padding=self.title_top_padding,
title_padding=self.title_title_padding,
artist_padding=self.title_artist_padding,
fixed_gap=self.title_fixed_gap,
)

def hex_to_rgb(self, hex_color):
"""Convert hex color to RGB tuple."""
hex_color = hex_color.lstrip("#")
Expand All @@ -731,87 +753,187 @@ def process_audio_separation(self, audio_file, artist_title, track_output_dir):
output_format=self.lossless_output_format,
)

# Create a "stems" subfolder
stems_dir = self._create_stems_directory(track_output_dir)
result = {"clean_instrumental": {}, "other_stems": {}, "backing_vocals": {}, "combined_instrumentals": {}}

result["clean_instrumental"] = self._separate_clean_instrumental(separator, audio_file, artist_title, track_output_dir, stems_dir)
result["other_stems"] = self._separate_other_stems(separator, audio_file, artist_title, stems_dir)
result["backing_vocals"] = self._separate_backing_vocals(separator, result["clean_instrumental"]["vocals"], artist_title, stems_dir)
result["combined_instrumentals"] = self._generate_combined_instrumentals(
result["clean_instrumental"]["instrumental"], result["backing_vocals"], artist_title, track_output_dir
)
self._normalize_audio_files(result, artist_title, track_output_dir)

self.logger.info("Audio separation, combination, and normalization process completed")
return result

def _create_stems_directory(self, track_output_dir):
stems_dir = os.path.join(track_output_dir, "stems")
os.makedirs(stems_dir, exist_ok=True)
self.logger.info(f"Created stems directory: {stems_dir}")
return stems_dir

result = {"clean_instrumental": {}, "other_stems": {}, "backing_vocals": {}}

# Step 1: Separate using clean_instrumental_model
self.logger.info(f"Step 1: Separating using clean instrumental model: {self.clean_instrumental_model}")
def _separate_clean_instrumental(self, separator, audio_file, artist_title, track_output_dir, stems_dir):
self.logger.info(f"Separating using clean instrumental model: {self.clean_instrumental_model}")
instrumental_path = os.path.join(
track_output_dir, f"{artist_title} (Instrumental {self.clean_instrumental_model}).{self.lossless_output_format}"
)
vocals_path = os.path.join(stems_dir, f"{artist_title} (Vocals {self.clean_instrumental_model}).{self.lossless_output_format}")

separator.load_model(model_filename=self.clean_instrumental_model)
clean_output_files = separator.separate(audio_file)
result = {}
if not self._file_exists(instrumental_path) or not self._file_exists(vocals_path):
separator.load_model(model_filename=self.clean_instrumental_model)
clean_output_files = separator.separate(audio_file)

for file in clean_output_files:
if "(Vocals)" in file and not self._file_exists(vocals_path):
os.rename(file, vocals_path)
result["vocals"] = vocals_path
elif "(Instrumental)" in file and not self._file_exists(instrumental_path):
os.rename(file, instrumental_path)
result["instrumental"] = instrumental_path
else:
result["vocals"] = vocals_path
result["instrumental"] = instrumental_path

for file in clean_output_files:
if "(Vocals)" in file:
os.rename(file, vocals_path)
result["clean_instrumental"]["vocals"] = vocals_path
elif "(Instrumental)" in file:
os.rename(file, instrumental_path)
result["clean_instrumental"]["instrumental"] = instrumental_path
return result

# Step 2: Separate using other_stems_models
self.logger.info(f"Step 2: Separating using other stems models: {self.other_stems_models}")
def _separate_other_stems(self, separator, audio_file, artist_title, stems_dir):
self.logger.info(f"Separating using other stems models: {self.other_stems_models}")
result = {}
for model in self.other_stems_models:
self.logger.info(f"Processing with model: {model}")
separator.load_model(model_filename=model)
other_stems_output = separator.separate(audio_file)

result["other_stems"][model] = {}
for file in other_stems_output:
file_name = os.path.basename(file)
stem_name = file_name[file_name.rfind("_(") + 2 : file_name.rfind(")_")]
new_filename = f"{artist_title} ({stem_name} {model}).{self.lossless_output_format}"
other_stem_path = os.path.join(stems_dir, new_filename)
os.rename(file, other_stem_path)
result["other_stems"][model][stem_name] = other_stem_path
result[model] = {}

# Step 3: Separate clean vocals using backing_vocals_models
self.logger.info(f"Step 3: Separating clean vocals using backing vocals models: {self.backing_vocals_models}")
for model in self.backing_vocals_models:
self.logger.info(f"Processing with model: {model}")
separator.load_model(model_filename=model)
backing_vocals_output = separator.separate(vocals_path)

result["backing_vocals"][model] = {}
for file in backing_vocals_output:
if "(Vocals)" in file:
lead_vocals_path = os.path.join(stems_dir, f"{artist_title} (Lead Vocals {model}).{self.lossless_output_format}")
os.rename(file, lead_vocals_path)
result["backing_vocals"][model]["lead_vocals"] = lead_vocals_path
elif "(Instrumental)" in file:
backing_vocals_path = os.path.join(stems_dir, f"{artist_title} (Backing Vocals {model}).{self.lossless_output_format}")
os.rename(file, backing_vocals_path)
result["backing_vocals"][model]["backing_vocals"] = backing_vocals_path

# Step 4: Generate combined instrumental tracks with backing vocals
self.logger.info("Step 4: Generating combined instrumental tracks with backing vocals")
result["combined_instrumentals"] = {}
# Check if any stem files for this model already exist
existing_stems = glob.glob(os.path.join(stems_dir, f"{artist_title} (*{model}).{self.lossless_output_format}"))

if existing_stems:
self.logger.info(f"Found existing stem files for model {model}, skipping separation")
for stem_file in existing_stems:
stem_name = os.path.basename(stem_file).split("(")[1].split(")")[0].strip()
result[model][stem_name] = stem_file
else:
separator.load_model(model_filename=model)
other_stems_output = separator.separate(audio_file)

for file in other_stems_output:
file_name = os.path.basename(file)
stem_name = file_name[file_name.rfind("_(") + 2 : file_name.rfind(")_")]
new_filename = f"{artist_title} ({stem_name} {model}).{self.lossless_output_format}"
other_stem_path = os.path.join(stems_dir, new_filename)
if not self._file_exists(other_stem_path):
os.rename(file, other_stem_path)
result[model][stem_name] = other_stem_path

return result

def _separate_backing_vocals(self, separator, vocals_path, artist_title, stems_dir):
self.logger.info(f"Separating clean vocals using backing vocals models: {self.backing_vocals_models}")
result = {}
for model in self.backing_vocals_models:
backing_vocals_path = result["backing_vocals"][model]["backing_vocals"]
combined_path = os.path.join(track_output_dir, f"{artist_title} (Instrumental with Backing {model}).{self.lossless_output_format}")
self.logger.info(f"Processing with model: {model}")
result[model] = {}
lead_vocals_path = os.path.join(stems_dir, f"{artist_title} (Lead Vocals {model}).{self.lossless_output_format}")
backing_vocals_path = os.path.join(stems_dir, f"{artist_title} (Backing Vocals {model}).{self.lossless_output_format}")

if not self._file_exists(lead_vocals_path) or not self._file_exists(backing_vocals_path):
separator.load_model(model_filename=model)
backing_vocals_output = separator.separate(vocals_path)

for file in backing_vocals_output:
if "(Vocals)" in file and not self._file_exists(lead_vocals_path):
os.rename(file, lead_vocals_path)
result[model]["lead_vocals"] = lead_vocals_path
elif "(Instrumental)" in file and not self._file_exists(backing_vocals_path):
os.rename(file, backing_vocals_path)
result[model]["backing_vocals"] = backing_vocals_path
else:
result[model]["lead_vocals"] = lead_vocals_path
result[model]["backing_vocals"] = backing_vocals_path
return result

ffmpeg_command = (
f'{self.ffmpeg_base_command} -i "{instrumental_path}" -i "{backing_vocals_path}" '
f'-filter_complex "[0:a][1:a]amix=inputs=2:duration=longest:weights=1 1" '
f'-c:a {self.lossless_output_format.lower()} "{combined_path}"'
def _generate_combined_instrumentals(self, instrumental_path, backing_vocals_result, artist_title, track_output_dir):
self.logger.info("Generating combined instrumental tracks with backing vocals")
result = {}
for model, paths in backing_vocals_result.items():
backing_vocals_path = paths["backing_vocals"]
combined_path = os.path.join(
track_output_dir, f"{artist_title} (Instrumental with Backing {model}).{self.lossless_output_format}"
)

self.logger.debug(f"Running command: {ffmpeg_command}")
os.system(ffmpeg_command)
if not self._file_exists(combined_path):
ffmpeg_command = (
f'{self.ffmpeg_base_command} -i "{instrumental_path}" -i "{backing_vocals_path}" '
f'-filter_complex "[0:a][1:a]amix=inputs=2:duration=longest:weights=1 1" '
f'-c:a {self.lossless_output_format.lower()} "{combined_path}"'
)

result["combined_instrumentals"][model] = combined_path
self.logger.debug(f"Running command: {ffmpeg_command}")
os.system(ffmpeg_command)

self.logger.info("Audio separation and combination process completed")
result[model] = combined_path
return result

def _normalize_audio_files(self, separation_result, artist_title, track_output_dir):
self.logger.info("Normalizing clean instrumental and combined instrumentals")

files_to_normalize = [
("clean_instrumental", separation_result["clean_instrumental"]["instrumental"]),
] + [("combined_instrumentals", path) for path in separation_result["combined_instrumentals"].values()]

for key, file_path in files_to_normalize:
if self._file_exists(file_path):
try:
self._normalize_audio(file_path, file_path) # Normalize in-place

# Verify the normalized file
if os.path.getsize(file_path) > 0:
self.logger.info(f"Successfully normalized: {file_path}")
else:
raise Exception("Normalized file is empty")

except Exception as e:
self.logger.error(f"Error during normalization of {file_path}: {e}")
self.logger.warning(f"Normalization failed for {file_path}. Original file remains unchanged.")
else:
self.logger.warning(f"File not found for normalization: {file_path}")

self.logger.info("Audio normalization process completed")

def _normalize_audio(self, input_path, output_path, target_level=0.0):
self.logger.info(f"Normalizing audio file: {input_path}")

# Load audio file
audio = AudioSegment.from_file(input_path, format=self.lossless_output_format.lower())

# Calculate the peak amplitude
peak_amplitude = float(audio.max_dBFS)

# Calculate the necessary gain
gain_db = target_level - peak_amplitude

# Apply gain
normalized_audio = audio.apply_gain(gain_db)

# Ensure the audio is not completely silent
if normalized_audio.rms == 0:
self.logger.warning(f"Normalized audio is silent for {input_path}. Using original audio.")
normalized_audio = audio

# Export normalized audio, overwriting the original file
normalized_audio.export(output_path, format=self.lossless_output_format.lower())

self.logger.info(f"Normalized audio saved, replacing: {output_path}")
self.logger.debug(f"Original peak: {peak_amplitude} dB, Applied gain: {gain_db} dB")

def _file_exists(self, file_path):
"""Check if a file exists and log the result."""
exists = os.path.isfile(file_path)
if exists:
self.logger.info(f"File already exists, skipping creation: {file_path}")
return exists

def prep_single_track(self):
self.logger.info(f"Preparing single track: {self.artist} - {self.title}")

Expand Down Expand Up @@ -920,9 +1042,7 @@ def prep_single_track(self):
processed_track["title_image_jpg"] = f"{output_image_filepath_noext}.jpg"
processed_track["title_video"] = os.path.join(track_output_dir, f"{artist_title} (Title).mov")

if os.path.exists(processed_track["title_video"]):
self.logger.debug(f"Title video already exists, skipping render: {processed_track['title_video']}")
else:
if not self._file_exists(processed_track["title_video"]):
self.logger.info(f"Creating title video...")
self.create_title_video(self.artist, self.title, self.title_format, output_image_filepath_noext, processed_track["title_video"])

Expand All @@ -931,9 +1051,7 @@ def prep_single_track(self):
processed_track["end_image_jpg"] = f"{output_image_filepath_noext}.jpg"
processed_track["end_video"] = os.path.join(track_output_dir, f"{artist_title} (End).mov")

if os.path.exists(processed_track["end_video"]):
self.logger.debug(f"End screen video already exists, skipping render: {processed_track['end_video']}")
else:
if not self._file_exists(processed_track["end_video"]):
self.logger.info(f"Creating end screen video...")
self.create_end_video(self.artist, self.title, self.end_format, output_image_filepath_noext, processed_track["end_video"])

Expand All @@ -943,7 +1061,8 @@ def prep_single_track(self):

instrumental_path = os.path.join(track_output_dir, f"{artist_title} (Instrumental Custom){existing_instrumental_extension}")

shutil.copy2(self.existing_instrumental, instrumental_path)
if not self._file_exists(instrumental_path):
shutil.copy2(self.existing_instrumental, instrumental_path)

processed_track["separated_audio"]["Custom"] = {
"instrumental": instrumental_path,
Expand Down
Loading

0 comments on commit 4151650

Please sign in to comment.