"""Gradio demo: Saudi-dialect Arabic text-to-speech (VITS) with a chat-AI backend.

The module loads a shared tokenizer at import time, lazily loads VITS models
on demand, and builds a three-tab Gradio interface (chat bot, AI voice chat,
live streaming TTS).
"""
import os
from typing import Any, Callable, Optional, Tuple, Union, Iterator

import gradio as gr
import noisereduce as nr
import numpy as np
import requests
import torch
import torch.nn as nn
from transformers import AutoTokenizer, VitsModel

from genai_chat_ai import AI, create_chat_session


# Model used for the shared tokenizer, the chat-bot tab and the dropdown default.
DEFAULT_MODEL = "asg2024/vits-ar-sa-huba"

# Models offered in the UI dropdowns.
MODEL_CHOICES = [
    "asg2024/vits-ar-sa",
    "asg2024/vits-ar-sa-huba",
    "asg2024/vits-ar-sa-ms",
    "asg2024/vits-ar-sa-magd",
    "asg2024/vits-ar-sa-fahd",
]


def remove_noise_nr(audio_data, sr=16000):
    """Remove noise from ``audio_data`` using the noisereduce library.

    Args:
        audio_data: Audio samples passed to ``nr.reduce_noise`` as ``y``.
        sr: Sampling rate of ``audio_data`` in Hz.

    Returns:
        The noise-reduced audio returned by ``nr.reduce_noise``.
    """
    return nr.reduce_noise(y=audio_data, sr=sr)


def _inference_forward_stream(
    self,
    input_ids: Optional[torch.Tensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    speaker_embeddings: Optional[torch.Tensor] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    padding_mask: Optional[torch.Tensor] = None,
    chunk_size: int = 32,  # Chunk size for streaming output
) -> Iterator[np.ndarray]:
    """Generate speech waveforms in a streaming fashion.

    Runs the text encoder, duration predictor and flow of the VITS model
    ``self`` once, then decodes the resulting spectrogram in slices of
    ``chunk_size`` frames and yields each decoded slice.

    Args:
        self: Model object exposing ``text_encoder``, ``duration_predictor``,
            ``flow``, ``decoder``, ``config``, ``speaking_rate``,
            ``noise_scale`` and ``noise_scale_duration``.
        input_ids: Token ids of the text to synthesize.
        attention_mask: Optional mask; when omitted every position of
            ``input_ids`` is treated as valid.
        speaker_embeddings: Optional speaker conditioning forwarded to the
            duration predictor, flow and decoder.
        output_attentions: Forwarded to the text encoder.
        output_hidden_states: Forwarded to the text encoder.
        return_dict: Forwarded to the text encoder; also selects whether its
            output is read by index or by attribute.
        padding_mask: Ignored; it is always recomputed from ``attention_mask``
            (or from ``input_ids`` when no mask is given).
        chunk_size: Number of spectrogram frames decoded per yielded chunk.

    Yields:
        NumPy arrays holding the squeezed decoder output for each chunk.
    """
    # Everything before decoding runs without autograd so callers do not need
    # to hold a ``torch.no_grad()`` context open across ``yield`` points.
    with torch.no_grad():
        if attention_mask is not None:
            padding_mask = attention_mask.unsqueeze(-1).float()
        else:
            padding_mask = torch.ones_like(input_ids).unsqueeze(-1).float()

        text_encoder_output = self.text_encoder(
            input_ids=input_ids,
            padding_mask=padding_mask,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = text_encoder_output[0] if not return_dict else text_encoder_output.last_hidden_state
        hidden_states = hidden_states.transpose(1, 2)
        input_padding_mask = padding_mask.transpose(1, 2)
        prior_means = text_encoder_output[1] if not return_dict else text_encoder_output.prior_means
        prior_log_variances = text_encoder_output[2] if not return_dict else text_encoder_output.prior_log_variances

        if self.config.use_stochastic_duration_prediction:
            log_duration = self.duration_predictor(
                hidden_states,
                input_padding_mask,
                speaker_embeddings,
                reverse=True,
                noise_scale=self.noise_scale_duration,
            )
        else:
            log_duration = self.duration_predictor(hidden_states, input_padding_mask, speaker_embeddings)

        length_scale = 1.0 / self.speaking_rate
        duration = torch.ceil(torch.exp(log_duration) * input_padding_mask * length_scale)
        predicted_lengths = torch.clamp_min(torch.sum(duration, [1, 2]), 1).long()

        # Create a padding mask for the output lengths of shape (batch, 1, max_output_length)
        indices = torch.arange(predicted_lengths.max(), dtype=predicted_lengths.dtype, device=predicted_lengths.device)
        output_padding_mask = indices.unsqueeze(0) < predicted_lengths.unsqueeze(1)
        output_padding_mask = output_padding_mask.unsqueeze(1).to(input_padding_mask.dtype)

        # Reconstruct an attention tensor of shape (batch, 1, out_length, in_length)
        attn_mask = torch.unsqueeze(input_padding_mask, 2) * torch.unsqueeze(output_padding_mask, -1)
        batch_size, _, output_length, input_length = attn_mask.shape
        cum_duration = torch.cumsum(duration, -1).view(batch_size * input_length, 1)
        indices = torch.arange(output_length, dtype=duration.dtype, device=duration.device)
        valid_indices = indices.unsqueeze(0) < cum_duration
        valid_indices = valid_indices.to(attn_mask.dtype).view(batch_size, input_length, output_length)
        padded_indices = valid_indices - nn.functional.pad(valid_indices, [0, 0, 1, 0, 0, 0])[:, :-1]
        attn = padded_indices.unsqueeze(1).transpose(2, 3) * attn_mask

        # Expand prior distribution
        prior_means = torch.matmul(attn.squeeze(1), prior_means).transpose(1, 2)
        prior_log_variances = torch.matmul(attn.squeeze(1), prior_log_variances).transpose(1, 2)

        prior_latents = prior_means + torch.randn_like(prior_means) * torch.exp(prior_log_variances) * self.noise_scale
        latents = self.flow(prior_latents, output_padding_mask, speaker_embeddings, reverse=True)

        spectrogram = latents * output_padding_mask

    for start in range(0, spectrogram.size(-1), chunk_size):
        # Decode one slice at a time; the yield stays outside the no_grad
        # context so the grad mode never leaks into the consumer.
        with torch.no_grad():
            wav = self.decoder(spectrogram[:, :, start : start + chunk_size], speaker_embeddings)
        yield wav.squeeze().cpu().numpy()


api_key = os.environ.get("Id_mode_vits")
headers = {"Authorization": f"Bearer {api_key}"}

# Cache of loaded models keyed by model name.
models = {}
tokenizer = AutoTokenizer.from_pretrained(DEFAULT_MODEL, token=api_key)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def get_model(name_model):
    """Return the VITS model ``name_model``, loading and caching it on first use.

    Args:
        name_model: Model identifier passed to ``VitsModel.from_pretrained``.

    Returns:
        The cached model, already moved to ``device``.
    """
    if name_model not in models:
        models[name_model] = VitsModel.from_pretrained(name_model, token=api_key).to(device)
    return models[name_model]


def genrate_speech(text, name_model):
    """Synthesize ``text`` in one shot with the model ``name_model``.

    Args:
        text: Text to synthesize.
        name_model: Model identifier understood by ``get_model``.

    Returns:
        A ``(sampling_rate, waveform)`` tuple where ``waveform`` is a flat
        NumPy array.
    """
    inputs = tokenizer(text, return_tensors="pt")
    model = get_model(name_model)
    with torch.no_grad():
        output = model(
            input_ids=inputs.input_ids.to(device),
            attention_mask=inputs.attention_mask.to(device),
            speaker_id=0,
        )
    wav = output.waveform.cpu().numpy().reshape(-1)
    return model.config.sampling_rate, wav


def _stream_speech(text, name_model, chunk_size=256):
    """Yield ``(sampling_rate, chunk)`` pairs for ``text`` as audio is decoded."""
    inputs = tokenizer(text, return_tensors="pt")
    model = get_model(name_model)
    sampling_rate = model.config.sampling_rate
    # The model lives on ``device``; its inputs must be moved there as well,
    # exactly as genrate_speech does.
    stream = _inference_forward_stream(
        model,
        input_ids=inputs.input_ids.to(device),
        attention_mask=inputs.attention_mask.to(device),
        speaker_embeddings=None,
        chunk_size=chunk_size,
    )
    for chunk in stream:
        yield sampling_rate, chunk


def generate_audio(text, name_model, speaker_id=None):
    """Stream synthesized audio for ``text``.

    Args:
        text: Text to synthesize.
        name_model: Model identifier understood by ``get_model``.
        speaker_id: Accepted for interface compatibility; currently unused.

    Yields:
        ``(sampling_rate, chunk)`` tuples with NumPy audio chunks.
    """
    yield from _stream_speech(text, name_model)


def generate_audio_ai(text, name_model):
    """Ask the chat AI for an answer to ``text`` and stream it as speech.

    Each audio chunk is passed through ``remove_noise_nr`` before it is
    yielded.

    Args:
        text: User prompt sent to the chat AI.
        name_model: Model identifier understood by ``get_model``.

    Yields:
        ``(sampling_rate, chunk)`` tuples with noise-reduced NumPy audio chunks.
    """
    text_answer = remove_extra_spaces(get_answer_ai(text))
    for sampling_rate, chunk in _stream_speech(text_answer, name_model):
        yield sampling_rate, remove_noise_nr(chunk, sr=sampling_rate)


def remove_extra_spaces(text):
    """Collapse every run of whitespace in ``text`` into a single space."""
    return ' '.join(text.split())


def query(text, API_URL):
    """POST ``text`` to ``API_URL`` and return the raw response body.

    Args:
        text: Value sent as the ``"inputs"`` field of the JSON payload.
        API_URL: Endpoint to post to; the module-level ``headers`` are sent.

    Returns:
        The response content as bytes.
    """
    payload = {"inputs": text}
    response = requests.post(API_URL, headers=headers, json=payload)
    return response.content


def get_answer_ai(text):
    """Send ``text`` to the chat session and return the reply text.

    If sending fails, a fresh chat session is created (replacing the global
    ``AI``) and the message is sent once more; a failure of that second
    attempt propagates to the caller.
    """
    global AI
    try:
        response = AI.send_message(text)
    except Exception:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit are not swallowed by the retry.
        AI = create_chat_session()
        response = AI.send_message(text)
    return response.text


chat_history = []  # Stores the conversation log


def chatbot_fn(input_text, input_audio):
    """Append the user's message and the bot's spoken reply to the chat log.

    Args:
        input_text: Text typed by the user.
        input_audio: Recorded audio from the user; not handled yet.

    Returns:
        The module-level ``chat_history`` list.
    """
    if not input_text:
        # TODO: audio-only input is not implemented (would need speech-to-text).
        # Return the history unchanged instead of failing on an unbound
        # ``response_audio``.
        return chat_history

    chat_history.append((input_text, None))  # Add the user's message
    response_text = get_answer_ai(input_text)
    # NOTE(review): response_audio is a (sampling_rate, waveform) tuple;
    # confirm gr.Chatbot can render this as a message.
    response_audio = genrate_speech(response_text, DEFAULT_MODEL)
    chat_history.append((None, response_audio))  # Add the bot's reply
    return chat_history


# NOTE(review): the nesting of rows/columns below was reconstructed from
# statement order; confirm the layout matches the intended UI.
with gr.Blocks() as demo:  # Use gr.Blocks to wrap the entire interface
    with gr.Tab("ChatBot "):
        chatbot = gr.Chatbot(label="محادثة")
        with gr.Row():
            txt = gr.Textbox(label="أدخل رسالتك")
            audio = gr.Audio(sources="microphone", type="filepath")

        txt.change(chatbot_fn, [txt, audio], chatbot)
        audio.change(chatbot_fn, [txt, audio], chatbot)

    with gr.Tab("Chat AI "):
        gr.Markdown("## AI: محادثة صوتية بالذكاء الاصطناعي باللهجة السعودية")
        with gr.Row():  # Arrange input/output components side-by-side
            with gr.Column():
                text_input = gr.Textbox(label="أدخل أي نص")
            with gr.Column():
                model_choices = gr.Dropdown(
                    choices=list(MODEL_CHOICES),
                    label="اختر النموذج",
                    value=DEFAULT_MODEL,
                )
        with gr.Row():
            btn = gr.Button("إرسال")
            btn_ai_only = gr.Button("توليد رد الذكاء الاصطناعي فقط")

        with gr.Row():
            user_audio = gr.Audio(label="صوت المدخل")
            ai_audio = gr.Audio(label="رد AI الصوتي")
            ai_text = gr.Textbox(label="رد AI النصي")
            ai_audio2 = gr.Audio(label="2رد AI الصوتي", streaming=True)

        # Use a single button to trigger both functionalities
        def process_audio(text, model_choice, generate_user_audio=True):
            """Synthesize the AI answer to ``text`` and optionally ``text`` itself.

            Args:
                text: User prompt.
                model_choice: Model identifier understood by ``get_model``.
                generate_user_audio: When true, also synthesize ``text``.

            Returns:
                ``(user_audio, ai_audio, ai_text)``; ``user_audio`` is ``None``
                when ``generate_user_audio`` is false.
            """
            text_answer = remove_extra_spaces(get_answer_ai(text))
            data_ai = genrate_speech(text_answer, model_choice)
            # Always return three values so the result matches the three
            # output components bound in btn.click below.
            data_user = genrate_speech(text, model_choice) if generate_user_audio else None
            return data_user, data_ai, text_answer

        btn.click(
            process_audio,  # Call the combined function
            inputs=[text_input, model_choices],
            outputs=[user_audio, ai_audio, ai_text],
        )

        btn_ai_only.click(
            generate_audio_ai,
            inputs=[text_input, model_choices],
            outputs=[ai_audio2],
        )

    with gr.Tab("Live "):
        gr.Markdown("## VITS: تحويل النص إلى كلام")
        with gr.Row():
            speaker_id_input = gr.Number(label="معرّف المتحدث (اختياري)", interactive=True)
            with gr.Column():
                model_choices2 = gr.Dropdown(
                    choices=list(MODEL_CHOICES),
                    label="اختر النموذج",
                    value=DEFAULT_MODEL,
                )

        text_input = gr.Textbox(label="أدخل النص هنا")
        generate_button = gr.Button("توليد وتشغيل الصوت")
        audio_player = gr.Audio(label="أ audio", streaming=True)

        # Update the event binding
        generate_button.click(generate_audio, inputs=[text_input, model_choices2], outputs=audio_player)


if __name__ == "__main__":
    demo.launch()