import os
from typing import Iterator, Optional

import gradio as gr
import numpy as np
import requests
import torch
import torch.nn as nn
from transformers import AutoTokenizer, VitsModel

from genai_chat_ai import AI, create_chat_session
def _inference_forward_stream(
    self,
    input_ids: Optional[torch.Tensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    speaker_embeddings: Optional[torch.Tensor] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    padding_mask: Optional[torch.Tensor] = None,
    chunk_size: int = 32,  # number of spectrogram frames decoded per yielded chunk
) -> Iterator[torch.Tensor]:
    """Generates speech waveforms in a streaming fashion."""
    if attention_mask is not None:
        padding_mask = attention_mask.unsqueeze(-1).float()
    else:
        padding_mask = torch.ones_like(input_ids).unsqueeze(-1).float()

    text_encoder_output = self.text_encoder(
        input_ids=input_ids,
        padding_mask=padding_mask,
        attention_mask=attention_mask,
        output_attentions=output_attentions,
        output_hidden_states=output_hidden_states,
        return_dict=return_dict,
    )
    hidden_states = text_encoder_output[0] if not return_dict else text_encoder_output.last_hidden_state
    hidden_states = hidden_states.transpose(1, 2)
    input_padding_mask = padding_mask.transpose(1, 2)
    prior_means = text_encoder_output[1] if not return_dict else text_encoder_output.prior_means
    prior_log_variances = text_encoder_output[2] if not return_dict else text_encoder_output.prior_log_variances

    # Predict per-token durations (stochastic or deterministic, per model config).
    if self.config.use_stochastic_duration_prediction:
        log_duration = self.duration_predictor(
            hidden_states,
            input_padding_mask,
            speaker_embeddings,
            reverse=True,
            noise_scale=self.noise_scale_duration,
        )
    else:
        log_duration = self.duration_predictor(hidden_states, input_padding_mask, speaker_embeddings)
    length_scale = 1.0 / self.speaking_rate
    duration = torch.ceil(torch.exp(log_duration) * input_padding_mask * length_scale)
    predicted_lengths = torch.clamp_min(torch.sum(duration, [1, 2]), 1).long()

    # Create a padding mask for the output lengths of shape (batch, 1, max_output_length).
    indices = torch.arange(predicted_lengths.max(), dtype=predicted_lengths.dtype, device=predicted_lengths.device)
    output_padding_mask = indices.unsqueeze(0) < predicted_lengths.unsqueeze(1)
    output_padding_mask = output_padding_mask.unsqueeze(1).to(input_padding_mask.dtype)

    # Reconstruct an attention tensor of shape (batch, 1, out_length, in_length).
    attn_mask = torch.unsqueeze(input_padding_mask, 2) * torch.unsqueeze(output_padding_mask, -1)
    batch_size, _, output_length, input_length = attn_mask.shape
    cum_duration = torch.cumsum(duration, -1).view(batch_size * input_length, 1)
    indices = torch.arange(output_length, dtype=duration.dtype, device=duration.device)
    valid_indices = indices.unsqueeze(0) < cum_duration
    valid_indices = valid_indices.to(attn_mask.dtype).view(batch_size, input_length, output_length)
    padded_indices = valid_indices - nn.functional.pad(valid_indices, [0, 0, 1, 0, 0, 0])[:, :-1]
    attn = padded_indices.unsqueeze(1).transpose(2, 3) * attn_mask

    # Expand the prior distribution to the predicted output length.
    prior_means = torch.matmul(attn.squeeze(1), prior_means).transpose(1, 2)
    prior_log_variances = torch.matmul(attn.squeeze(1), prior_log_variances).transpose(1, 2)

    prior_latents = prior_means + torch.randn_like(prior_means) * torch.exp(prior_log_variances) * self.noise_scale
    latents = self.flow(prior_latents, output_padding_mask, speaker_embeddings, reverse=True)
    spectrogram = latents * output_padding_mask

    # Decode the spectrogram in chunks so playback can start before the
    # whole utterance has been synthesized.
    for i in range(0, spectrogram.size(-1), chunk_size):
        yield self.decoder(spectrogram[:, :, i : i + chunk_size], speaker_embeddings)
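# A small helper (not part of the original app) showing one way to consume
# the stream above: collect the chunks and concatenate them into a single
# waveform. This assumes each yielded chunk has shape (batch, 1, samples),
# matching the decoder output. Note that decoding a convolutional vocoder
# chunk-by-chunk can produce audible seams at chunk boundaries; overlapping
# chunks with crossfading is a common mitigation.
def collect_streamed_waveform(model, input_ids, attention_mask, chunk_size=32):
    chunks = [
        chunk.cpu()
        for chunk in _inference_forward_stream(
            model,
            input_ids=input_ids,
            attention_mask=attention_mask,
            chunk_size=chunk_size,
        )
    ]
    # Concatenate along the time axis; squeeze the batch/channel dims for playback.
    return torch.cat(chunks, dim=-1).squeeze().numpy()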
api_key = os.environ.get("Id_mode_vits")
headers = {"Authorization": f"Bearer {api_key}"}

models = {}  # cache of loaded VITS models, keyed by model name
tokenizer = AutoTokenizer.from_pretrained("asg2024/vits-ar-sa-huba", token=api_key)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def get_model(name_model):
    """Load a VITS model once and reuse the cached instance on later calls."""
    if name_model in models:
        return models[name_model]
    models[name_model] = VitsModel.from_pretrained(name_model, token=api_key).to(device)
    return models[name_model]
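# Usage sketch: the first call for a given name downloads the weights and
# caches the model; subsequent calls return the same instance.
#
#   model = get_model("asg2024/vits-ar-sa-huba")
#   get_model("asg2024/vits-ar-sa-huba") is model  # True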
def generate_speech(text, name_model):
    """Synthesize `text` with the chosen model; returns (sampling_rate, waveform)."""
    inputs = tokenizer(text, return_tensors="pt")
    model = get_model(name_model)
    with torch.no_grad():
        wav = model(
            input_ids=inputs.input_ids.to(device),
            attention_mask=inputs.attention_mask.to(device),
            speaker_id=0,
        ).waveform.cpu().numpy().reshape(-1)
    return model.config.sampling_rate, wav
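# Sketch of persisting the output to disk (assumption: scipy is available in
# the Space's environment; it is not imported by the original app):
def save_speech_to_wav(text, name_model, path="speech.wav"):
    from scipy.io import wavfile  # local import; scipy is an assumption here

    sampling_rate, wav = generate_speech(text, name_model)
    wavfile.write(path, sampling_rate, wav)  # float32 waveform in [-1, 1]
    return path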
def generate_audio(text, name_model, speaker_id=None):
    """Yield (sampling_rate, chunk) tuples so Gradio can stream audio as it is generated."""
    inputs = tokenizer(text, return_tensors="pt")
    speaker_embeddings = None
    model = get_model(name_model)
    with torch.no_grad():
        for chunk in _inference_forward_stream(
            model,
            input_ids=inputs.input_ids.to(device),
            attention_mask=inputs.attention_mask.to(device),
            speaker_embeddings=speaker_embeddings,
            chunk_size=64,
        ):
            yield model.config.sampling_rate, chunk.squeeze().cpu().numpy()
def remove_extra_spaces(text):
    """Collapse runs of whitespace between words into single spaces.

    Args:
        text: The string to process.

    Returns:
        The string with extra whitespace removed.
    """
    return " ".join(text.split())
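# Example: remove_extra_spaces("  hello \t world\n") == "hello world"
# (str.split with no arguments splits on any whitespace run, so tabs and
# newlines are normalized as well.)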
def query(text, API_URL):
    """TTS via the hosted Inference API (kept as an alternative; unused below)."""
    payload = {"inputs": text}
    response = requests.post(API_URL, headers=headers, json=payload)
    return response.content
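# `query` returns the raw response body. For TTS models the Inference API
# responds with encoded audio bytes (e.g. FLAC/WAV), not a NumPy array.
# A minimal decode sketch, assuming the optional `soundfile` package is
# installed (an assumption, not a dependency of the original app):
def decode_api_audio(audio_bytes):
    import io

    import soundfile as sf  # assumption: soundfile available

    wav, sampling_rate = sf.read(io.BytesIO(audio_bytes))
    return sampling_rate, wav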
def get_answer_ai(text):
    """Send `text` to the chat model, recreating the session once on failure."""
    global AI
    try:
        response = AI.send_message(text)
        return response.text
    except Exception:
        # The session may have expired or been invalidated; start a new one and retry.
        AI = create_chat_session()
        response = AI.send_message(text)
        return response.text
with gr.Blocks() as demo:  # gr.Blocks wraps the entire interface
    with gr.Column():
        model_choices = gr.Dropdown(
            choices=[
                "asg2024/vits-ar-sa",
                "asg2024/vits-ar-sa-huba",
                "asg2024/vits-ar-sa-ms",
                "asg2024/vits-ar-sa-magd",
                "asg2024/vits-ar-sa-fahd",
            ],
            label="Choose the model",
            value="asg2024/vits-ar-sa",
        )
| with gr.Tab("Chat AI "): | |
| gr.Markdown("## AI: محادثة صوتية بالذكاء الاصطناعي باللهجة السعودية") | |
| with gr.Row(): # Arrange input/output components side-by-side | |
| with gr.Column(): | |
| text_input = gr.Textbox(label="أدخل أي نص") | |
| user_audio = gr.Audio(label="صوتك") | |
| with gr.Row(): | |
| btn = gr.Button("إرسال") | |
| btn_ai_only = gr.Button("توليد رد الذكاء الاصطناعي فقط") | |
| ai_audio = gr.Audio(label="رد الذكاء الاصطناعي الصوتي") | |
| ai_text = gr.Textbox(label="رد الذكاء الاصطناعي النصي") | |
        # One handler serves both buttons: it always synthesizes the AI reply,
        # and optionally the user's own text as audio too.
        def process_audio(text, model_choice, generate_user_audio=True):
            text_answer = get_answer_ai(text)
            text_answer = remove_extra_spaces(text_answer)
            # Hosted Inference API alternative:
            # query(text_answer, f"https://api-inference.huggingface.co/models/{model_choice}")
            data_ai = generate_speech(text_answer, model_choice)
            if generate_user_audio:
                data_user = generate_speech(text, model_choice)
                return data_user, data_ai, text_answer
            return data_ai  # AI audio only
        btn.click(
            process_audio,
            inputs=[text_input, model_choices],
            outputs=[user_audio, ai_audio, ai_text],
        )
        # Second button: generate only the AI audio.
        btn_ai_only.click(
            lambda text, model_choice: process_audio(text, model_choice, False),
            inputs=[text_input, model_choices],
            outputs=[ai_audio],
        )
| with gr.Tab("Live "): | |
| gr.Markdown("## VITS: تحويل النص إلى كلام") | |
| with gr.Row(): | |
| text_input = gr.Textbox(label="أدخل النص هنا") | |
| speaker_id_input = gr.Number(label="معرّف المتحدث (اختياري)", interactive=True) | |
| generate_button = gr.Button("توليد وتشغيل الصوت") | |
| audio_player = gr.Audio(label="أ audio",streaming=True) | |
| # Update the event binding | |
| generate_button.click(generate_audio, inputs=[text_input,model_choice], outputs=audio_player) | |
if __name__ == "__main__":
    demo.launch()