Added working AudioShake API implementation for alternate transcription option
beveradb committed Jul 18, 2024
1 parent 4c64e58 commit 4750e7b
Showing 2 changed files with 98 additions and 40 deletions.
108 changes: 83 additions & 25 deletions lyrics_transcriber/audioshake_transcriber.py
@@ -1,35 +1,93 @@
 import logging
+import requests
+import time
+import os
+import json


 class AudioShakeTranscriber:
-    def __init__(self, api_token, log_level=logging.DEBUG):
+    def __init__(self, api_token, logger):
         self.api_token = api_token
-        self.logger = logging.getLogger(__name__)
-        self.logger.setLevel(log_level)
+        self.base_url = "https://groovy.audioshake.ai"
+        self.logger = logger

     def transcribe(self, audio_filepath):
-        # This is a placeholder for the actual AudioShake API implementation
         self.logger.info(f"Transcribing {audio_filepath} using AudioShake API")

-        self.logger.debug(f"AudioShake API token: {self.api_token}")
-        # TODO: Implement the actual API call to AudioShake
-        # For now, we'll return a dummy result
-        return {
-            "transcription_data_dict": {
-                "segments": [
-                    {
-                        "start": 0,
-                        "end": 5,
-                        "text": "This is a dummy transcription",
-                        "words": [
-                            {"text": "This", "start": 0, "end": 1},
-                            {"text": "is", "start": 1, "end": 2},
-                            {"text": "a", "start": 2, "end": 3},
-                            {"text": "dummy", "start": 3, "end": 4},
-                            {"text": "transcription", "start": 4, "end": 5},
-                        ],
-                    }
-                ]
-            }
-        }
+        # Step 1: Upload the audio file
+        asset_id = self._upload_file(audio_filepath)
+        self.logger.debug(f"File uploaded successfully. Asset ID: {asset_id}")
+
+        # Step 2: Create a job for transcription and alignment
+        job_id = self._create_job(asset_id)
+        self.logger.debug(f"Job created successfully. Job ID: {job_id}")
+
+        # Step 3: Wait for the job to complete and get the results
+        result = self._get_job_result(job_id)
+        self.logger.debug("Job completed. Processing results...")
+
+        # Step 4: Process the result and return it in the required format
+        return self._process_result(result)
+
+    def _upload_file(self, filepath):
+        self.logger.debug(f"Uploading {filepath} to AudioShake")
+        url = f"{self.base_url}/upload"
+        headers = {"Authorization": f"Bearer {self.api_token}"}
+        with open(filepath, "rb") as file:
+            files = {"file": (os.path.basename(filepath), file)}
+            response = requests.post(url, headers=headers, files=files)
+
+        self.logger.debug(f"Upload response status code: {response.status_code}")
+        self.logger.debug(f"Upload response content: {response.text}")
+
+        response.raise_for_status()
+        return response.json()["id"]
+
+    def _create_job(self, asset_id):
+        self.logger.debug(f"Creating job for asset {asset_id}")
+        url = f"{self.base_url}/job/"
+        headers = {"Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json"}
+        data = {
+            "metadata": {"format": "json", "name": "alignment", "language": "en"},
+            "callbackUrl": "https://example.com/webhook/alignment",
+            "assetId": asset_id,
+        }
+        response = requests.post(url, headers=headers, json=data)
+        response.raise_for_status()
+        return response.json()["job"]["id"]
+
+    def _get_job_result(self, job_id):
+        self.logger.debug(f"Getting job result for job {job_id}")
+        url = f"{self.base_url}/job/{job_id}"
+        headers = {"Authorization": f"Bearer {self.api_token}", "Content-Type": "application/json"}
+        while True:
+            response = requests.get(url, headers=headers)
+            response.raise_for_status()
+            job_data = response.json()["job"]
+            if job_data["status"] == "completed":
+                return job_data
+            elif job_data["status"] == "failed":
+                raise Exception("Job failed")
+            time.sleep(5)  # Wait 5 seconds before checking again
+
+    def _process_result(self, job_data):
+        self.logger.debug(f"Processing result for job {job_data}")
+        output_asset = next((asset for asset in job_data["outputAssets"] if asset["name"] == "transcription.json"), None)
+
+        if not output_asset:
+            raise Exception("Transcription output not found in job results")
+
+        transcription_url = output_asset["link"]
+        response = requests.get(transcription_url)
+        response.raise_for_status()
+        transcription_data = response.json()
+
+        transcription_data = {"segments": transcription_data.get("lines", []), "text": transcription_data.get("text", "")}
+
+        # Ensure each segment has the required fields
+        for segment in transcription_data["segments"]:
+            if "words" not in segment:
+                segment["words"] = []
+            if "text" not in segment:
+                segment["text"] = " ".join(word["text"] for word in segment["words"])
+
+        return transcription_data
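
For reference, a minimal sketch of driving the new class standalone, outside transcriber.py. Only AudioShakeTranscriber(api_token, logger) and transcribe(audio_filepath) come from the code above; the AUDIOSHAKE_API_TOKEN environment variable name and the audio path are placeholders, not anything this commit defines:

# Hypothetical standalone usage (not part of this commit).
import logging
import os

from lyrics_transcriber.audioshake_transcriber import AudioShakeTranscriber

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# AUDIOSHAKE_API_TOKEN is an assumed env var name, for illustration only.
transcriber = AudioShakeTranscriber(os.environ["AUDIOSHAKE_API_TOKEN"], logger=logger)

# transcribe() blocks: _get_job_result polls every 5 seconds until the job
# status reaches "completed" (or raises if it reaches "failed").
result = transcriber.transcribe("song.wav")
for segment in result["segments"]:
    print(segment["text"])  # "text" is guaranteed by _process_result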
30 changes: 15 additions & 15 deletions lyrics_transcriber/transcriber.py
@@ -944,39 +944,39 @@ def transcribe(self):
         transcription_cache_suffix = "-audioshake" if self.audioshake_api_token else "-whisper"
         self.outputs["transcription_data_filepath"] = self.get_cache_filepath(f"{transcription_cache_suffix}.json")

-        whisper_cache_filepath = self.outputs["transcription_data_filepath"]
-        if os.path.isfile(whisper_cache_filepath):
-            self.logger.debug(f"transcribe found existing file at whisper_cache_filepath, reading: {whisper_cache_filepath}")
-            with open(whisper_cache_filepath, "r") as cache_file:
+        transcription_cache_filepath = self.outputs["transcription_data_filepath"]
+        if os.path.isfile(transcription_cache_filepath):
+            self.logger.debug(f"transcribe found existing file at transcription_cache_filepath, reading: {transcription_cache_filepath}")
+            with open(transcription_cache_filepath, "r") as cache_file:
                 self.outputs["transcription_data_dict"] = json.load(cache_file)
                 return

         if self.audioshake_api_token:
             self.logger.debug(f"Using AudioShake API for transcription")
             from .audioshake_transcriber import AudioShakeTranscriber

-            audioshake = AudioShakeTranscriber(self.audioshake_api_token, log_level=self.log_level)
-            result = audioshake.transcribe(self.audio_filepath)
+            audioshake = AudioShakeTranscriber(self.audioshake_api_token, logger=self.logger)
+            transcription_data = audioshake.transcribe(self.audio_filepath)
         else:
             self.logger.debug(f"Using Whisper for transcription with model: {self.transcription_model}")
             audio = whisper.load_audio(self.audio_filepath)
             model = whisper.load_model(self.transcription_model, device="cpu")
-            result = whisper.transcribe(model, audio, language="en", vad="auditok", beam_size=5, temperature=0.2, best_of=5)
+            transcription_data = whisper.transcribe(model, audio, language="en", vad="auditok", beam_size=5, temperature=0.2, best_of=5)

         # Remove segments with no words, only music
-        result["segments"] = [segment for segment in result["segments"] if segment["text"].strip() != "Music"]
-        self.logger.debug(f"Removed 'Music' segments. Remaining segments: {len(result['segments'])}")
+        transcription_data["segments"] = [segment for segment in transcription_data["segments"] if segment["text"].strip() != "Music"]
+        self.logger.debug(f"Removed 'Music' segments. Remaining segments: {len(transcription_data['segments'])}")

         # Split long segments
         self.logger.debug("Starting to split long segments")
-        result["segments"] = self.split_long_segments(result["segments"], max_length=36)
-        self.logger.debug(f"Finished splitting segments. Total segments after splitting: {len(result['segments'])}")
+        transcription_data["segments"] = self.split_long_segments(transcription_data["segments"], max_length=36)
+        self.logger.debug(f"Finished splitting segments. Total segments after splitting: {len(transcription_data['segments'])}")

-        self.logger.debug(f"writing transcription data JSON to cache file: {whisper_cache_filepath}")
-        with open(whisper_cache_filepath, "w") as cache_file:
-            json.dump(result, cache_file, indent=4)
+        self.logger.debug(f"writing transcription data JSON to cache file: {transcription_cache_filepath}")
+        with open(transcription_cache_filepath, "w") as cache_file:
+            json.dump(transcription_data, cache_file, indent=4)

-        self.outputs["transcription_data_dict"] = result
+        self.outputs["transcription_data_dict"] = transcription_data

     def get_cache_filepath(self, extension):
         filename = os.path.split(self.audio_filepath)[1]
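
Both branches now feed the same downstream code (the "Music" segment filter, split_long_segments, and the JSON cache), which is why _process_result coerces AudioShake's output into the shape Whisper already returns. A sketch of that shape, based on the placeholder data this commit removes; AudioShake's real per-line fields may differ:

# Approximate dict shape consumed downstream; values are illustrative only.
transcription_data = {
    "text": "This is a dummy transcription",
    "segments": [
        {
            "start": 0,
            "end": 5,
            "text": "This is a dummy transcription",  # required: used by the "Music" filter
            "words": [  # backfilled with [] by _process_result if missing
                {"text": "This", "start": 0, "end": 1},
                {"text": "is", "start": 1, "end": 2},
            ],
        }
    ],
}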
