Skip to content

Commit

Permalink
Made actually usable syllable count based corrector, passed through u…
Browse files Browse the repository at this point in the history
…nformatted reference words through to GapSequence objects to use in correctors
  • Loading branch information
beveradb committed Jan 20, 2025
1 parent 6e116b1 commit 14337e4
Show file tree
Hide file tree
Showing 12 changed files with 557 additions and 269 deletions.
134 changes: 55 additions & 79 deletions lyrics_transcriber/core/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,68 +184,53 @@ def process(self) -> LyricsControllerResult:
Raises:
Exception: If a critical error occurs during processing.
"""
try:
# Step 1: Fetch lyrics if artist and title are provided
if self.artist and self.title:
self.fetch_lyrics()
# Step 1: Fetch lyrics if artist and title are provided
if self.artist and self.title:
self.fetch_lyrics()

# Step 2: Run transcription
self.transcribe()
# Step 2: Run transcription
self.transcribe()

# Step 3: Process and correct lyrics
self.correct_lyrics()
# Step 3: Process and correct lyrics
self.correct_lyrics()

# Step 4: Generate outputs
self.generate_outputs()
# Step 4: Generate outputs
self.generate_outputs()

self.logger.info("Processing completed successfully")
return self.results

except Exception as e:
self.logger.error(f"Error during processing: {str(e)}")
raise
self.logger.info("Processing completed successfully")
return self.results

def fetch_lyrics(self) -> None:
"""Fetch lyrics from available providers."""
self.logger.info(f"Fetching lyrics for {self.artist} - {self.title}")

try:
for name, provider in self.lyrics_providers.items():
try:
result = provider.fetch_lyrics(self.artist, self.title)
if result:
self.results.lyrics_results.append(result)
self.logger.info(f"Successfully fetched lyrics from {name}")

except Exception as e:
self.logger.error(f"Failed to fetch lyrics from {name}: {str(e)}")
continue
for name, provider in self.lyrics_providers.items():
try:
result = provider.fetch_lyrics(self.artist, self.title)
if result:
self.results.lyrics_results.append(result)
self.logger.info(f"Successfully fetched lyrics from {name}")

if not self.results.lyrics_results:
self.logger.warning("No lyrics found from any source")
except Exception as e:
self.logger.error(f"Failed to fetch lyrics from {name}: {str(e)}")
continue

except Exception as e:
self.logger.error(f"Failed to fetch lyrics: {str(e)}")
# Don't raise - we can continue without lyrics
if not self.results.lyrics_results:
self.logger.warning("No lyrics found from any source")

def transcribe(self) -> None:
"""Run transcription using all available transcribers."""
self.logger.info(f"Starting transcription with providers: {list(self.transcribers.keys())}")

for name, transcriber_info in self.transcribers.items():
self.logger.info(f"Running transcription with {name}")
try:
result = transcriber_info["instance"].transcribe(self.audio_filepath)
if result:
# Add the transcriber name and priority to the result
self.results.transcription_results.append(
TranscriptionResult(name=name, priority=transcriber_info["priority"], result=result)
)
self.logger.debug(f"Transcription completed for {name}")

except Exception as e:
self.logger.error(f"Transcription failed for {name}: {str(e)}", exc_info=True)
continue
result = transcriber_info["instance"].transcribe(self.audio_filepath)
if result:
# Add the transcriber name and priority to the result
self.results.transcription_results.append(
TranscriptionResult(name=name, priority=transcriber_info["priority"], result=result)
)
self.logger.debug(f"Transcription completed for {name}")

if not self.results.transcription_results:
self.logger.warning("No successful transcriptions from any provider")
Expand All @@ -254,44 +239,35 @@ def correct_lyrics(self) -> None:
"""Run lyrics correction using transcription and internet lyrics."""
self.logger.info("Starting lyrics correction process")

try:
# Run correction
corrected_data = self.corrector.run(
transcription_results=self.results.transcription_results, lyrics_results=self.results.lyrics_results
)

# Store corrected results
self.results.transcription_corrected = corrected_data
self.logger.info("Lyrics correction completed")
# Run correction
corrected_data = self.corrector.run(
transcription_results=self.results.transcription_results, lyrics_results=self.results.lyrics_results
)

except Exception as e:
self.logger.error(f"Failed to correct lyrics: {str(e)}", exc_info=True)
# Store corrected results
self.results.transcription_corrected = corrected_data
self.logger.info("Lyrics correction completed")

def generate_outputs(self) -> None:
"""Generate output files."""
self.logger.info("Generating output files")

try:
output_files = self.output_generator.generate_outputs(
transcription_corrected=self.results.transcription_corrected,
lyrics_results=self.results.lyrics_results,
output_prefix=self.output_prefix,
audio_filepath=self.audio_filepath,
artist=self.artist,
title=self.title,
)

# Store all output paths in results
self.results.lrc_filepath = output_files.lrc
self.results.ass_filepath = output_files.ass
self.results.video_filepath = output_files.video
self.results.original_txt = output_files.original_txt
self.results.corrected_txt = output_files.corrected_txt
self.results.corrections_json = output_files.corrections_json
self.results.cdg_filepath = output_files.cdg
self.results.mp3_filepath = output_files.mp3
self.results.cdg_zip_filepath = output_files.cdg_zip

except Exception as e:
self.logger.error(f"Failed to generate outputs: {str(e)}")
raise
output_files = self.output_generator.generate_outputs(
transcription_corrected=self.results.transcription_corrected,
lyrics_results=self.results.lyrics_results,
output_prefix=self.output_prefix,
audio_filepath=self.audio_filepath,
artist=self.artist,
title=self.title,
)

# Store all output paths in results
self.results.lrc_filepath = output_files.lrc
self.results.ass_filepath = output_files.ass
self.results.video_filepath = output_files.video
self.results.original_txt = output_files.original_txt
self.results.corrected_txt = output_files.corrected_txt
self.results.corrections_json = output_files.corrections_json
self.results.cdg_filepath = output_files.cdg
self.results.mp3_filepath = output_files.mp3
self.results.cdg_zip_filepath = output_files.cdg_zip
161 changes: 77 additions & 84 deletions lyrics_transcriber/correction/anchor_sequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,132 +347,125 @@ def _get_reference_words(self, source: str, ref_words: List[str], start_pos: Opt
end_pos = len(ref_words)
return ref_words[start_pos:end_pos]

def _create_initial_gap(
self, words: List[str], first_anchor: Optional[ScoredAnchor], ref_texts_clean: Dict[str, List[str]]
) -> Optional[GapSequence]:
"""Create gap sequence before the first anchor.
def find_gaps(self, transcribed: str, anchors: List[ScoredAnchor], references: Dict[str, str]) -> List[GapSequence]:
"""Find gaps between anchor sequences in the transcribed text."""
cache_key = self._get_cache_key(transcribed, references)
cache_path = self.cache_dir / f"gaps_{cache_key}.json"

Args:
words: Transcribed words
first_anchor: First anchor sequence (or None if no anchors)
ref_texts_clean: Cleaned reference texts
# Try to load from cache
if cached_data := self._load_from_cache(cache_path):
self.logger.info("Loading gaps from cache")
return [GapSequence.from_dict(gap) for gap in cached_data]

Returns:
GapSequence if there are words before first anchor, None otherwise
"""
# If not in cache, perform the computation
self.logger.info("Cache miss - computing gaps")
words = self._clean_text(transcribed).split()
ref_texts_clean = {source: self._clean_text(text).split() for source, text in references.items()}
# Store original reference texts split into words
ref_texts_original = {source: text.split() for source, text in references.items()}

gaps = []
sorted_anchors = sorted(anchors, key=lambda x: x.anchor.transcription_position)

# Handle initial gap
if initial_gap := self._create_initial_gap(
words, sorted_anchors[0] if sorted_anchors else None, ref_texts_clean, ref_texts_original
):
gaps.append(initial_gap)

# Handle gaps between anchors
for i in range(len(sorted_anchors) - 1):
if between_gap := self._create_between_gap(
words, sorted_anchors[i], sorted_anchors[i + 1], ref_texts_clean, ref_texts_original
):
gaps.append(between_gap)

# Handle final gap
if sorted_anchors and (final_gap := self._create_final_gap(words, sorted_anchors[-1], ref_texts_clean, ref_texts_original)):
gaps.append(final_gap)

# Save to cache
self._save_to_cache(cache_path, [gap.to_dict() for gap in gaps])
return gaps

def _create_initial_gap(
self,
words: List[str],
first_anchor: Optional[ScoredAnchor],
ref_texts_clean: Dict[str, List[str]],
ref_texts_original: Dict[str, List[str]],
) -> Optional[GapSequence]:
"""Create gap sequence before the first anchor."""
if not first_anchor:
ref_words = {source: words for source, words in ref_texts_clean.items()}
return GapSequence(words, 0, None, None, ref_words)
ref_words_original = {source: words for source, words in ref_texts_original.items()}
return GapSequence(words, 0, None, None, ref_words, ref_words_original)

if first_anchor.anchor.transcription_position > 0:
ref_words = {}
for source, ref_words_list in ref_texts_clean.items():
ref_words_original = {}
for source in ref_texts_clean:
end_pos = first_anchor.anchor.reference_positions.get(source)
ref_words[source] = self._get_reference_words(source, ref_words_list, None, end_pos)
ref_words[source] = self._get_reference_words(source, ref_texts_clean[source], None, end_pos)
ref_words_original[source] = self._get_reference_words(source, ref_texts_original[source], None, end_pos)

return GapSequence(words[: first_anchor.anchor.transcription_position], 0, None, first_anchor.anchor, ref_words)
return GapSequence(
words[: first_anchor.anchor.transcription_position], 0, None, first_anchor.anchor, ref_words, ref_words_original
)
return None

def _create_between_gap(
self, words: List[str], current_anchor: ScoredAnchor, next_anchor: ScoredAnchor, ref_texts_clean: Dict[str, List[str]]
self,
words: List[str],
current_anchor: ScoredAnchor,
next_anchor: ScoredAnchor,
ref_texts_clean: Dict[str, List[str]],
ref_texts_original: Dict[str, List[str]],
) -> Optional[GapSequence]:
"""Create gap sequence between two anchors.
Args:
words: Transcribed words
current_anchor: Preceding anchor
next_anchor: Following anchor
ref_texts_clean: Cleaned reference texts
Returns:
GapSequence if there are words between anchors, None otherwise
"""
"""Create gap sequence between two anchors."""
gap_start = current_anchor.anchor.transcription_position + current_anchor.anchor.length
gap_end = next_anchor.anchor.transcription_position

if gap_end > gap_start:
ref_words = {}
ref_words_original = {}
shared_sources = set(current_anchor.anchor.reference_positions.keys()) & set(next_anchor.anchor.reference_positions.keys())

# Check for large position differences in next_anchor
if len(next_anchor.anchor.reference_positions) > 1:
positions = list(next_anchor.anchor.reference_positions.values())
max_diff = max(positions) - min(positions)
if max_diff > 20:
# Find source with earliest position
earliest_source = min(next_anchor.anchor.reference_positions.items(), key=lambda x: x[1])[0]
self.logger.warning(
f"Large position difference ({max_diff} words) in next anchor '{' '.join(next_anchor.anchor.words)}'. "
f"Using only earliest source: {earliest_source} at position {next_anchor.anchor.reference_positions[earliest_source]}"
f"Large position difference ({max_diff} words) in next anchor. Using only earliest source: {earliest_source}"
)
# Only consider the earliest source for the gap
shared_sources &= {earliest_source}

for source in shared_sources:
start_pos = current_anchor.anchor.reference_positions[source] + current_anchor.anchor.length
end_pos = next_anchor.anchor.reference_positions[source]
words_list = self._get_reference_words(source, ref_texts_clean[source], start_pos, end_pos)
if words_list: # Only add source if it has words
ref_words[source] = words_list
ref_words[source] = self._get_reference_words(source, ref_texts_clean[source], start_pos, end_pos)
ref_words_original[source] = self._get_reference_words(source, ref_texts_original[source], start_pos, end_pos)

return GapSequence(words[gap_start:gap_end], gap_start, current_anchor.anchor, next_anchor.anchor, ref_words)
return GapSequence(
words[gap_start:gap_end], gap_start, current_anchor.anchor, next_anchor.anchor, ref_words, ref_words_original
)
return None

def _create_final_gap(
self, words: List[str], last_anchor: ScoredAnchor, ref_texts_clean: Dict[str, List[str]]
self, words: List[str], last_anchor: ScoredAnchor, ref_texts_clean: Dict[str, List[str]], ref_texts_original: Dict[str, List[str]]
) -> Optional[GapSequence]:
"""Create gap sequence after the last anchor.
Args:
words: Transcribed words
last_anchor: Last anchor sequence
ref_texts_clean: Cleaned reference texts
Returns:
GapSequence if there are words after last anchor, None otherwise
"""
"""Create gap sequence after the last anchor."""
last_pos = last_anchor.anchor.transcription_position + last_anchor.anchor.length
if last_pos < len(words):
ref_words = {}
for source, ref_words_list in ref_texts_clean.items():
ref_words_original = {}
for source in ref_texts_clean:
if source in last_anchor.anchor.reference_positions:
start_pos = last_anchor.anchor.reference_positions[source] + last_anchor.anchor.length
ref_words[source] = self._get_reference_words(source, ref_words_list, start_pos, None)
ref_words[source] = self._get_reference_words(source, ref_texts_clean[source], start_pos, None)
ref_words_original[source] = self._get_reference_words(source, ref_texts_original[source], start_pos, None)

return GapSequence(words[last_pos:], last_pos, last_anchor.anchor, None, ref_words)
return GapSequence(words[last_pos:], last_pos, last_anchor.anchor, None, ref_words, ref_words_original)
return None

def find_gaps(self, transcribed: str, anchors: List[ScoredAnchor], references: Dict[str, str]) -> List[GapSequence]:
"""Find gaps between anchor sequences in the transcribed text."""
cache_key = self._get_cache_key(transcribed, references)
cache_path = self.cache_dir / f"gaps_{cache_key}.json"

# Try to load from cache
if cached_data := self._load_from_cache(cache_path):
self.logger.info("Loading gaps from cache")
return [GapSequence.from_dict(gap) for gap in cached_data]

# If not in cache, perform the computation
self.logger.info("Cache miss - computing gaps")
words = self._clean_text(transcribed).split()
ref_texts_clean = {source: self._clean_text(text).split() for source, text in references.items()}

gaps = []
sorted_anchors = sorted(anchors, key=lambda x: x.anchor.transcription_position)

# Handle initial gap
if initial_gap := self._create_initial_gap(words, sorted_anchors[0] if sorted_anchors else None, ref_texts_clean):
gaps.append(initial_gap)

# Handle gaps between anchors
for i in range(len(sorted_anchors) - 1):
if between_gap := self._create_between_gap(words, sorted_anchors[i], sorted_anchors[i + 1], ref_texts_clean):
gaps.append(between_gap)

# Handle final gap
if sorted_anchors and (final_gap := self._create_final_gap(words, sorted_anchors[-1], ref_texts_clean)):
gaps.append(final_gap)

# Save to cache
self._save_to_cache(cache_path, [gap.to_dict() for gap in gaps])
return gaps
Loading

0 comments on commit 14337e4

Please sign in to comment.