-
Notifications
You must be signed in to change notification settings - Fork 546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Batch generation using Gradio API #119
Comments
Try this out. You won't need to "batch" a bunch of text files; you can just input text of any length.
import torch
import torchaudio
import gradio as gr
from os import getenv
import re
import numpy as np
from pathlib import Path
from zonos.model import Zonos, DEFAULT_BACKBONE_CLS as ZonosBackbone
from zonos.conditioning import make_cond_dict, supported_language_codes
from zonos.utils import DEFAULT_DEVICE as device
# Module-level caches shared by the functions below.
# Identifier of the model currently held in CURRENT_MODEL (set by load_model_if_needed).
CURRENT_MODEL_TYPE = None
# The loaded model instance, or None before the first load.
CURRENT_MODEL = None
# Most recently computed speaker embedding (written by the audio generation functions).
SPEAKER_EMBEDDING = None
# File path of the speaker audio that SPEAKER_EMBEDDING was computed from.
SPEAKER_AUDIO_PATH = None
def load_model_if_needed(model_choice: str):
    """Return the Zonos model for ``model_choice``, loading it only on a change.

    The loaded model and its identifier are cached in the module-level
    ``CURRENT_MODEL`` / ``CURRENT_MODEL_TYPE`` globals, so repeated calls with
    the same ``model_choice`` reuse the cached instance.

    Args:
        model_choice: Identifier passed to ``Zonos.from_pretrained``.

    Returns:
        The cached model, with gradients disabled and switched to eval mode.

    Raises:
        Any exception raised by ``Zonos.from_pretrained``. In that case the
        cache is left empty (both globals are ``None``) so a later call
        retries the load cleanly.
    """
    global CURRENT_MODEL_TYPE, CURRENT_MODEL
    if CURRENT_MODEL_TYPE != model_choice:
        if CURRENT_MODEL is not None:
            # Rebind to None rather than `del`: deleting the global would make
            # every later call fail with NameError if the load below raises.
            CURRENT_MODEL = None
            CURRENT_MODEL_TYPE = None
            torch.cuda.empty_cache()
        print(f"Loading {model_choice} model...")
        model = Zonos.from_pretrained(model_choice, device=device)
        model.requires_grad_(False).eval()
        # Publish to the cache only once the model is fully prepared.
        CURRENT_MODEL = model
        CURRENT_MODEL_TYPE = model_choice
        print(f"{model_choice} model loaded successfully!")
    return CURRENT_MODEL
def update_ui(model_choice):
    """
    Compute visibility updates for the UI widgets from the model's conditioners.

    Loads (or reuses) the selected model, reads the names of its prefix
    conditioners and returns one ``gr.update`` per widget, in the order the
    event handlers list their outputs: text, language, speaker audio, prefix
    audio, the eight emotion sliders, VQ score, fmax, pitch std, speaking
    rate, DNSMOS, the speaker-noised checkbox and finally the choices of the
    unconditional-keys group (every conditioner name except 'espeak' and
    'language_id').
    """
    model = load_model_if_needed(model_choice)
    cond_names = [conditioner.name for conditioner in model.prefix_conditioner.conditioners]
    print("Conditioners in this model:", cond_names)

    def shown_if(conditioner_name):
        # A widget is visible only when the model has the matching conditioner.
        return gr.update(visible=(conditioner_name in cond_names))

    emotion_updates = tuple(shown_if("emotion") for _ in range(8))
    selectable_keys = [name for name in cond_names if name not in ("espeak", "language_id")]

    return (
        shown_if("espeak"),  # text box
        shown_if("espeak"),  # language dropdown
        shown_if("speaker"),  # speaker audio
        gr.update(visible=True),  # prefix audio is always shown
        *emotion_updates,
        shown_if("vqscore_8"),
        shown_if("fmax"),
        shown_if("pitch_std"),
        shown_if("speaking_rate"),
        shown_if("dnsmos_ovrl"),
        shown_if("speaker_noised"),
        gr.update(choices=selectable_keys),
    )
# Zero-width sentence boundary: directly after '.', '?' or '!' and before
# whitespace, a capital letter or the end of the text. The two negative
# lookbehinds skip dotted abbreviations such as "e.g." and capitalised
# two-letter abbreviations such as "Dr.".
_SENTENCE_BOUNDARY_RE = re.compile(
    r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!])(?=\s|[A-Z]|$)'
)


def split_into_sentences(text):
    """Split ``text`` into sentences with a simple punctuation heuristic.

    Sentences end at '.', '?' or '!' (exclamation marks were previously not
    recognised, so exclamations were glued to the following sentence).

    Args:
        text: The text to split.

    Returns:
        A list of stripped, non-empty sentence strings; empty for blank input.
    """
    pieces = _SENTENCE_BOUNDARY_RE.split(text)
    return [piece.strip() for piece in pieces if piece.strip()]
def count_words(text):
    """Return the number of whitespace-separated tokens in ``text``."""
    tokens = text.split()
    return len(tokens)
def split_into_chunks(text, word_limit=50):
sentences = split_into_sentences(text)
chunks = []
current_chunk = []
current_word_count = 0
... (468 lines left)
Collapse
message.txt
21 KB
import torch
import torchaudio
import gradio as gr
from os import getenv
import re
import numpy as np
from pathlib import Path
from zonos.model import Zonos, DEFAULT_BACKBONE_CLS as ZonosBackbone
from zonos.conditioning import make_cond_dict, supported_language_codes
from zonos.utils import DEFAULT_DEVICE as device
# Module-level caches shared by the functions below.
# Identifier of the model currently held in CURRENT_MODEL (set by load_model_if_needed).
CURRENT_MODEL_TYPE = None
# The loaded model instance, or None before the first load.
CURRENT_MODEL = None
# Most recently computed speaker embedding (written by the audio generation functions).
SPEAKER_EMBEDDING = None
# File path of the speaker audio that SPEAKER_EMBEDDING was computed from.
SPEAKER_AUDIO_PATH = None
def load_model_if_needed(model_choice: str):
    """Return the Zonos model for ``model_choice``, loading it only on a change.

    The loaded model and its identifier are cached in the module-level
    ``CURRENT_MODEL`` / ``CURRENT_MODEL_TYPE`` globals, so repeated calls with
    the same ``model_choice`` reuse the cached instance.

    Args:
        model_choice: Identifier passed to ``Zonos.from_pretrained``.

    Returns:
        The cached model, with gradients disabled and switched to eval mode.

    Raises:
        Any exception raised by ``Zonos.from_pretrained``. In that case the
        cache is left empty (both globals are ``None``) so a later call
        retries the load cleanly.
    """
    global CURRENT_MODEL_TYPE, CURRENT_MODEL
    if CURRENT_MODEL_TYPE != model_choice:
        if CURRENT_MODEL is not None:
            # Rebind to None rather than `del`: deleting the global would make
            # every later call fail with NameError if the load below raises.
            CURRENT_MODEL = None
            CURRENT_MODEL_TYPE = None
            torch.cuda.empty_cache()
        print(f"Loading {model_choice} model...")
        model = Zonos.from_pretrained(model_choice, device=device)
        model.requires_grad_(False).eval()
        # Publish to the cache only once the model is fully prepared.
        CURRENT_MODEL = model
        CURRENT_MODEL_TYPE = model_choice
        print(f"{model_choice} model loaded successfully!")
    return CURRENT_MODEL
def update_ui(model_choice):
    """
    Compute visibility updates for the UI widgets from the model's conditioners.

    Loads (or reuses) the selected model, reads the names of its prefix
    conditioners and returns one ``gr.update`` per widget, in the order the
    event handlers list their outputs: text, language, speaker audio, prefix
    audio, the eight emotion sliders, VQ score, fmax, pitch std, speaking
    rate, DNSMOS, the speaker-noised checkbox and finally the choices of the
    unconditional-keys group (every conditioner name except 'espeak' and
    'language_id').
    """
    model = load_model_if_needed(model_choice)
    cond_names = [conditioner.name for conditioner in model.prefix_conditioner.conditioners]
    print("Conditioners in this model:", cond_names)

    def shown_if(conditioner_name):
        # A widget is visible only when the model has the matching conditioner.
        return gr.update(visible=(conditioner_name in cond_names))

    emotion_updates = tuple(shown_if("emotion") for _ in range(8))
    selectable_keys = [name for name in cond_names if name not in ("espeak", "language_id")]

    return (
        shown_if("espeak"),  # text box
        shown_if("espeak"),  # language dropdown
        shown_if("speaker"),  # speaker audio
        gr.update(visible=True),  # prefix audio is always shown
        *emotion_updates,
        shown_if("vqscore_8"),
        shown_if("fmax"),
        shown_if("pitch_std"),
        shown_if("speaking_rate"),
        shown_if("dnsmos_ovrl"),
        shown_if("speaker_noised"),
        gr.update(choices=selectable_keys),
    )
# Zero-width sentence boundary: directly after '.', '?' or '!' and before
# whitespace, a capital letter or the end of the text. The two negative
# lookbehinds skip dotted abbreviations such as "e.g." and capitalised
# two-letter abbreviations such as "Dr.".
_SENTENCE_BOUNDARY_RE = re.compile(
    r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!])(?=\s|[A-Z]|$)'
)


def split_into_sentences(text):
    """Split ``text`` into sentences with a simple punctuation heuristic.

    Sentences end at '.', '?' or '!' (exclamation marks were previously not
    recognised, so exclamations were glued to the following sentence).

    Args:
        text: The text to split.

    Returns:
        A list of stripped, non-empty sentence strings; empty for blank input.
    """
    pieces = _SENTENCE_BOUNDARY_RE.split(text)
    return [piece.strip() for piece in pieces if piece.strip()]
def count_words(text):
    """Return the number of whitespace-separated tokens in ``text``."""
    tokens = text.split()
    return len(tokens)
def split_into_chunks(text, word_limit=50):
    """Group the sentences of ``text`` into chunks of at most ``word_limit`` words.

    Whole sentences are packed greedily into a chunk until the next one would
    exceed the limit. A sentence that is itself longer than the limit never
    shares a chunk with its neighbours; it is split after commas/periods and
    those fragments are packed the same way. A fragment that is still longer
    than the limit (a long run without punctuation) is cut into groups of
    ``word_limit`` words, so no chunk exceeds the limit. Previously such a
    fragment was emitted as one oversized chunk.

    Args:
        text: The text to split.
        word_limit: Maximum number of words per chunk. Values below 1 are
            treated as 1 for the word-level fallback split.

    Returns:
        A list of chunk strings, in reading order. Empty for blank text.
    """
    chunks = []
    pending = []
    pending_words = 0

    def flush():
        # Emit the pieces collected so far as one chunk and start a new one.
        nonlocal pending, pending_words
        if pending:
            chunks.append(' '.join(pending))
        pending = []
        pending_words = 0

    def add(piece, piece_words):
        # Append to the open chunk, closing it first if the piece does not fit.
        nonlocal pending_words
        if pending_words + piece_words > word_limit:
            flush()
        pending.append(piece)
        pending_words += piece_words

    hard_split_size = max(1, word_limit)

    for sentence in split_into_sentences(text):
        sentence_words = count_words(sentence)
        if sentence_words <= word_limit:
            add(sentence, sentence_words)
            continue

        # Over-long sentence: keep it apart from the neighbouring sentences.
        flush()
        for part in re.split(r'(?<=[,.])\s+', sentence):
            words = part.split()
            if len(words) <= word_limit:
                add(part, len(words))
                continue
            # No punctuation to split on: fall back to fixed-size word groups.
            for start in range(0, len(words), hard_split_size):
                group = words[start:start + hard_split_size]
                add(' '.join(group), len(group))
        flush()

    flush()
    return chunks
def concatenate_audio(audio_segments, silence_duration=0.2, sample_rate=48000):
    """Join audio segments into one array with silence between them.

    Silence is inserted only *between* consecutive segments (the previous
    version also appended it after the last one), and it is created with the
    segments' own dtype so the result is not silently upcast to float64.

    Args:
        audio_segments: Iterable of 1-D sample arrays.
        silence_duration: Length of each gap in seconds.
        sample_rate: Sample rate used to size the gap. Defaults to 48000 to
            stay compatible with existing callers; pass the real rate of the
            segments when it is known.

    Returns:
        A single 1-D array. An empty float32 array when there are no segments.
    """
    segments = [np.asarray(segment) for segment in audio_segments]
    if not segments:
        return np.zeros(0, dtype=np.float32)

    gap = np.zeros(int(sample_rate * silence_duration), dtype=np.result_type(*segments))
    pieces = []
    for index, segment in enumerate(segments):
        if index:
            pieces.append(gap)
        pieces.append(segment)
    return np.concatenate(pieces)
def generate_audio_chunk(
    model_choice,
    text,
    language,
    speaker_audio,
    prefix_audio,
    e1,
    e2,
    e3,
    e4,
    e5,
    e6,
    e7,
    e8,
    vq_single,
    fmax,
    pitch_std,
    speaking_rate,
    dnsmos_ovrl,
    speaker_noised,
    cfg_scale,
    min_p,
    seed,
    randomize_seed,
    unconditional_keys,
    progress=gr.Progress(),
):
    """
    Synthesize audio for one piece of text.

    Seeds torch, refreshes the cached speaker embedding when a different
    speaker file is supplied, optionally encodes a prefix clip, builds the
    conditioning dictionary and runs the model's generate/decode pipeline.

    When ``speaker_audio`` is None (or "speaker" is in ``unconditional_keys``)
    the module-level ``SPEAKER_EMBEDDING`` is passed on unchanged, so an
    embedding cached by an earlier call is reused.

    ``progress`` is called with a ``(step, estimated_total_steps)`` tuple after
    every generation step.

    Returns ``((sample_rate, samples), seed)`` where ``samples`` is a NumPy
    array and ``seed`` is the seed that was actually used.
    """
    global SPEAKER_AUDIO_PATH, SPEAKER_EMBEDDING

    model = load_model_if_needed(model_choice)

    # Coerce the raw UI values to the numeric types used below.
    use_noised_speaker = bool(speaker_noised)
    fmax = float(fmax)
    pitch_std = float(pitch_std)
    speaking_rate = float(speaking_rate)
    dnsmos_ovrl = float(dnsmos_ovrl)
    cfg_scale = float(cfg_scale)
    min_p = float(min_p)
    seed = int(seed)
    # NOTE(review): presumably 86 codec frames per second for up to 30 s — confirm.
    token_budget = 86 * 30

    if randomize_seed:
        seed = torch.randint(0, 2**32 - 1, (1,)).item()
    torch.manual_seed(seed)

    # Recompute the speaker embedding only when the speaker file changed.
    wants_speaker = speaker_audio is not None and "speaker" not in unconditional_keys
    if wants_speaker and speaker_audio != SPEAKER_AUDIO_PATH:
        print("Recomputed speaker embedding")
        speaker_wav, speaker_sr = torchaudio.load(speaker_audio)
        SPEAKER_EMBEDDING = model.make_speaker_embedding(speaker_wav, speaker_sr)
        SPEAKER_EMBEDDING = SPEAKER_EMBEDDING.to(device, dtype=torch.bfloat16)
        SPEAKER_AUDIO_PATH = speaker_audio

    # Optional prefix clip: downmix to mono, preprocess and encode to codes.
    prefix_codes = None
    if prefix_audio is not None:
        prefix_wav, prefix_sr = torchaudio.load(prefix_audio)
        prefix_wav = prefix_wav.mean(0, keepdim=True)
        prefix_wav = model.autoencoder.preprocess(prefix_wav, prefix_sr)
        prefix_wav = prefix_wav.to(device, dtype=torch.float32)
        prefix_codes = model.autoencoder.encode(prefix_wav.unsqueeze(0))

    emotion_values = [float(value) for value in (e1, e2, e3, e4, e5, e6, e7, e8)]
    emotion_tensor = torch.tensor(emotion_values, device=device)
    # The single VQ slider value is repeated eight times for the vqscore_8 conditioner.
    vq_tensor = torch.tensor([float(vq_single)] * 8, device=device).unsqueeze(0)

    conditioning = model.prepare_conditioning(
        make_cond_dict(
            text=text,
            language=language,
            speaker=SPEAKER_EMBEDDING,
            emotion=emotion_tensor,
            vqscore_8=vq_tensor,
            fmax=fmax,
            pitch_std=pitch_std,
            speaking_rate=speaking_rate,
            dnsmos_ovrl=dnsmos_ovrl,
            speaker_noised=use_noised_speaker,
            device=device,
            unconditional_keys=unconditional_keys,
        )
    )

    # Rough progress-bar estimate derived from the text length.
    estimated_total_steps = int(30 * len(text) / 400 * 86)

    def report_step(_frame: torch.Tensor, step: int, _total_steps: int) -> bool:
        progress((step, estimated_total_steps))
        return True

    codes = model.generate(
        prefix_conditioning=conditioning,
        audio_prefix_codes=prefix_codes,
        max_new_tokens=token_budget,
        cfg_scale=cfg_scale,
        batch_size=1,
        sampling_params=dict(min_p=min_p),
        callback=report_step,
    )

    decoded = model.autoencoder.decode(codes).cpu().detach()
    sample_rate = model.autoencoder.sampling_rate
    # Keep only the first channel when the decoder returns several.
    if decoded.dim() == 2 and decoded.size(0) > 1:
        decoded = decoded[0:1, :]
    return (sample_rate, decoded.squeeze().numpy()), seed
def generate_audio(
    model_choice,
    text,
    language,
    speaker_audio,
    prefix_audio,
    e1,
    e2,
    e3,
    e4,
    e5,
    e6,
    e7,
    e8,
    vq_single,
    fmax,
    pitch_std,
    speaking_rate,
    dnsmos_ovrl,
    speaker_noised,
    cfg_scale,
    min_p,
    seed,
    randomize_seed,
    unconditional_keys,
    progress=gr.Progress(),
):
    """
    Synthesize arbitrarily long text by chunking it and joining the results.

    The text is split with ``split_into_chunks``; every chunk is rendered by
    ``generate_audio_chunk`` and the pieces are joined by
    ``concatenate_audio``. The speaker embedding is computed once for the
    first chunk and reused for the rest; the prefix audio is applied to the
    first chunk only.

    Fixes over the previous version:
      * ``randomize_seed`` is honoured for the first chunk (it was ignored),
        and the seed actually used is returned.
      * The returned sample rate is the one reported by the chunk generator
        instead of a hard-coded 48000.
      * With nothing to synthesize the audio value is ``None`` rather than the
        tuple ``(None, None)``, which is not a valid (sample_rate, data) pair.

    Returns:
        ``((sample_rate, samples), seed)``, or ``(None, seed)`` when there is
        nothing to synthesize.
    """
    global SPEAKER_EMBEDDING, SPEAKER_AUDIO_PATH

    if not text.strip():
        return None, seed

    chunks = split_into_chunks(text)

    def estimated_steps(chunk):
        # Same text-length heuristic the chunk generator uses for its progress bar.
        return int(30 * len(chunk) / 400 * 86)

    total_steps = sum(estimated_steps(chunk) for chunk in chunks)
    steps_so_far = 0

    def chunk_progress(progress_tuple):
        # Translate a chunk-local step count into overall progress.
        progress((steps_so_far + progress_tuple[0], total_steps))
        return True

    use_speaker = speaker_audio is not None and "speaker" not in unconditional_keys
    audio_segments = []
    sample_rate = None
    current_seed = seed

    for i, chunk in enumerate(chunks):
        print(f"Processing chunk {i+1}/{len(chunks)}: {chunk[:50]}...")
        is_first = i == 0

        if use_speaker and is_first:
            # Compute the embedding once; later chunks get speaker_audio=None
            # and fall back to this cached value.
            wav, sr = torchaudio.load(speaker_audio)
            SPEAKER_EMBEDDING = load_model_if_needed(model_choice).make_speaker_embedding(wav, sr)
            SPEAKER_EMBEDDING = SPEAKER_EMBEDDING.to(device, dtype=torch.bfloat16)
            SPEAKER_AUDIO_PATH = speaker_audio

        audio_data, current_seed = generate_audio_chunk(
            model_choice,
            chunk,
            language,
            speaker_audio if is_first else None,
            prefix_audio if is_first else None,
            e1, e2, e3, e4, e5, e6, e7, e8,
            vq_single, fmax, pitch_std, speaking_rate, dnsmos_ovrl,
            speaker_noised, cfg_scale, min_p, current_seed,
            # Randomize at most once; later chunks reuse the seed that was drawn.
            bool(randomize_seed) and is_first,
            unconditional_keys,
            progress=chunk_progress,
        )
        if audio_data is not None:
            sample_rate, samples = audio_data
            audio_segments.append(samples)

        steps_so_far += estimated_steps(chunk)

    if not audio_segments:
        return None, current_seed

    final_audio = concatenate_audio(audio_segments)
    return (sample_rate, final_audio), current_seed
def build_interface():
"""
Build the Gradio Blocks UI and wire up its event handlers.

Creates the model selector, text/language inputs, speaker and prefix audio
inputs, the conditioning and generation sliders, the advanced (unconditional
keys and emotion) controls and the output player, then connects
``update_ui`` to model changes and page load and ``generate_audio`` to the
generate button.

Returns the ``gr.Blocks`` instance (``demo``).

NOTE(review): indentation was lost in this copy, so the nesting of the
``with`` layout blocks cannot be determined from the text here — restore it
from the original file before running.
"""
# Offer only the checkpoints whose architecture the installed backbone supports.
supported_models = []
if "transformer" in ZonosBackbone.supported_architectures:
supported_models.append("Zyphra/Zonos-v0.1-transformer")
if "hybrid" in ZonosBackbone.supported_architectures:
supported_models.append("Zyphra/Zonos-v0.1-hybrid")
else:
print(
"| The current ZonosBackbone does not support the hybrid architecture, meaning only the transformer model will be available in the model selector.\n"
"| This probably means the mamba-ssm library has not been installed."
)
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
model_choice = gr.Dropdown(
choices=supported_models,
# NOTE(review): raises IndexError if neither architecture is supported.
value=supported_models[0],
label="Zonos Model Type",
info="Select the model variant to use.",
)
text = gr.Textbox(
label="Text to Synthesize",
value="Zonos uses eSpeak for text to phoneme conversion!",
lines=4,
# Removed maxlength to allow for longer texts
)
language = gr.Dropdown(
choices=supported_language_codes,
value="en-us",
label="Language Code",
info="Select a language code.",
)
prefix_audio = gr.Audio(
value="assets/silence_100ms.wav",
label="Optional Prefix Audio (continue from this audio)",
type="filepath",
)
with gr.Column():
speaker_audio = gr.Audio(
label="Optional Speaker Audio (for cloning)",
type="filepath",
)
speaker_noised_checkbox = gr.Checkbox(label="Denoise Speaker?", value=False)
with gr.Row():
with gr.Column():
gr.Markdown("## Conditioning Parameters")
dnsmos_slider = gr.Slider(1.0, 5.0, value=4.0, step=0.1, label="DNSMOS Overall")
fmax_slider = gr.Slider(0, 24000, value=24000, step=1, label="Fmax (Hz)")
vq_single_slider = gr.Slider(0.5, 0.8, 0.78, 0.01, label="VQ Score")
pitch_std_slider = gr.Slider(0.0, 300.0, value=45.0, step=1, label="Pitch Std")
speaking_rate_slider = gr.Slider(5.0, 30.0, value=15.0, step=0.5, label="Speaking Rate")
with gr.Column():
gr.Markdown("## Generation Parameters")
cfg_scale_slider = gr.Slider(1.0, 5.0, 2.0, 0.1, label="CFG Scale")
min_p_slider = gr.Slider(0.0, 1.0, 0.15, 0.01, label="Min P")
seed_number = gr.Number(label="Seed", value=420, precision=0)
randomize_seed_toggle = gr.Checkbox(label="Randomize Seed (before generation)", value=True)
with gr.Accordion("Advanced Parameters", open=False):
gr.Markdown(
"### Unconditional Toggles\n"
"Checking a box will make the model ignore the corresponding conditioning value and make it unconditional.\n"
'Practically this means the given conditioning feature will be unconstrained and "filled in automatically".'
)
with gr.Row():
unconditional_keys = gr.CheckboxGroup(
[
"speaker",
"emotion",
"vqscore_8",
"fmax",
"pitch_std",
"speaking_rate",
"dnsmos_ovrl",
"speaker_noised",
],
value=["emotion"],
label="Unconditional Keys",
)
gr.Markdown(
"### Emotion Sliders\n"
"Warning: The way these sliders work is not intuitive and may require some trial and error to get the desired effect.\n"
"Certain configurations can cause the model to become unstable. Setting emotion to unconditional may help."
)
with gr.Row():
emotion1 = gr.Slider(0.0, 1.0, 1.0, 0.05, label="Happiness")
emotion2 = gr.Slider(0.0, 1.0, 0.05, 0.05, label="Sadness")
emotion3 = gr.Slider(0.0, 1.0, 0.05, 0.05, label="Disgust")
emotion4 = gr.Slider(0.0, 1.0, 0.05, 0.05, label="Fear")
with gr.Row():
emotion5 = gr.Slider(0.0, 1.0, 0.05, 0.05, label="Surprise")
emotion6 = gr.Slider(0.0, 1.0, 0.05, 0.05, label="Anger")
emotion7 = gr.Slider(0.0, 1.0, 0.1, 0.05, label="Other")
emotion8 = gr.Slider(0.0, 1.0, 0.2, 0.05, label="Neutral")
with gr.Column():
generate_button = gr.Button("Generate Audio")
output_audio = gr.Audio(label="Generated Audio", type="numpy", autoplay=True)
# Refresh widget visibility whenever a different model is selected.
# The outputs list must stay in the same order as the tuple update_ui returns.
model_choice.change(
fn=update_ui,
inputs=[model_choice],
outputs=[
text,
language,
speaker_audio,
prefix_audio,
emotion1,
emotion2,
emotion3,
emotion4,
emotion5,
emotion6,
emotion7,
emotion8,
vq_single_slider,
fmax_slider,
pitch_std_slider,
speaking_rate_slider,
dnsmos_slider,
speaker_noised_checkbox,
unconditional_keys,
],
)
# On page load, trigger the same UI refresh
demo.load(
fn=update_ui,
inputs=[model_choice],
outputs=[
text,
language,
speaker_audio,
prefix_audio,
emotion1,
emotion2,
emotion3,
emotion4,
emotion5,
emotion6,
emotion7,
emotion8,
vq_single_slider,
fmax_slider,
pitch_std_slider,
speaking_rate_slider,
dnsmos_slider,
speaker_noised_checkbox,
unconditional_keys,
],
)
# Generate audio on button click
# The inputs list must stay in the same order as generate_audio's parameters.
generate_button.click(
fn=generate_audio,
inputs=[
model_choice,
text,
language,
speaker_audio,
prefix_audio,
emotion1,
emotion2,
emotion3,
emotion4,
emotion5,
emotion6,
emotion7,
emotion8,
vq_single_slider,
fmax_slider,
pitch_std_slider,
speaking_rate_slider,
dnsmos_slider,
speaker_noised_checkbox,
cfg_scale_slider,
min_p_slider,
seed_number,
randomize_seed_toggle,
unconditional_keys,
],
outputs=[output_audio, seed_number],
)
return demo
if __name__ == "__main__":
    # GRADIO_SHARE opts in to a public share link; "true", "1" and "t"
    # (case-insensitive) enable it, anything else leaves it off.
    demo = build_interface()
    share = getenv("GRADIO_SHARE", "False").lower() in ("true", "1", "t")
    demo.launch(share=share, server_name="localhost", server_port=7860)
I tested it, and it works, but it has issues. The voice result is not always what you want: sometimes you get undesirable results, or the model adds emotions, such as laughing, that you don't want. In these scenarios, you need to re-run the whole text or use an audio editor to cut out the problem sections. With the segmentation approach, you have more control over the result. I have been using this model heavily for the past few days. Judging by what I have learned, in the current state of the model, text-segmentation approaches give you more control. |
An easy solution for generating audio for several text chunks.
Demo:
https://www.youtube.com/watch?v=3OFkEtTFM84
The text was updated successfully, but these errors were encountered: