import os
from typing import Any, Callable, Optional, Tuple, Union, Iterator

import gradio as gr
import noisereduce as nr
import numpy as np
import plotly.express as px
import requests
import torch
import torch.nn as nn
from transformers import AutoTokenizer, VitsModel

from genai_chat_ai import AI, create_chat_session


def remove_noise_nr(audio_data, sr=16000):
    """Remove noise from ``audio_data`` using the noisereduce library.

    Args:
        audio_data: Audio samples, forwarded as ``y`` to ``nr.reduce_noise``.
        sr: Sampling rate of ``audio_data`` in Hz.

    Returns:
        Whatever ``nr.reduce_noise`` returns for the given input.
    """
    return nr.reduce_noise(y=audio_data, sr=sr)


def _inference_forward_stream(
    self,
    input_ids: Optional[torch.Tensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
    speaker_embeddings: Optional[torch.Tensor] = None,
    output_attentions: Optional[bool] = None,
    output_hidden_states: Optional[bool] = None,
    return_dict: Optional[bool] = None,
    padding_mask: Optional[torch.Tensor] = None,
    chunk_size: int = 32,  # Chunk size for streaming output
) -> Iterator[np.ndarray]:
    """Generate a speech waveform chunk by chunk.

    Runs the text encoder, duration predictor and flow of ``self`` once, then
    decodes the resulting spectrogram in slices of ``chunk_size`` frames and
    yields each decoded slice.

    Args:
        self: Model object providing ``text_encoder``, ``duration_predictor``,
            ``flow``, ``decoder``, ``config``, ``speaking_rate``,
            ``noise_scale`` and ``noise_scale_duration`` (called here with a
            ``VitsModel`` instance).
        input_ids: Token ids; must already be on the model's device.
        attention_mask: Optional mask; when ``None`` every position of
            ``input_ids`` is treated as valid.
        speaker_embeddings: Passed through to the duration predictor, flow
            and decoder.
        output_attentions: Forwarded to the text encoder.
        output_hidden_states: Forwarded to the text encoder.
        return_dict: Forwarded to the text encoder; when falsy the encoder
            output is read by position, otherwise by attribute.
        padding_mask: Ignored; it is always recomputed from
            ``attention_mask`` / ``input_ids``. Kept for signature
            compatibility.
        chunk_size: Number of spectrogram frames decoded per yielded chunk.

    Yields:
        The decoder output for each slice, squeezed and converted to a CPU
        NumPy array.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    # Everything here is inference-only. Disabling autograd inside the
    # generator (and never across a ``yield``) means callers do not have to
    # keep a ``torch.no_grad()`` context open while the generator is suspended.
    with torch.no_grad():
        if attention_mask is not None:
            padding_mask = attention_mask.unsqueeze(-1).float()
        else:
            padding_mask = torch.ones_like(input_ids).unsqueeze(-1).float()

        text_encoder_output = self.text_encoder(
            input_ids=input_ids,
            padding_mask=padding_mask,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = text_encoder_output[0] if not return_dict else text_encoder_output.last_hidden_state
        hidden_states = hidden_states.transpose(1, 2)
        input_padding_mask = padding_mask.transpose(1, 2)
        prior_means = text_encoder_output[1] if not return_dict else text_encoder_output.prior_means
        prior_log_variances = text_encoder_output[2] if not return_dict else text_encoder_output.prior_log_variances

        if self.config.use_stochastic_duration_prediction:
            log_duration = self.duration_predictor(
                hidden_states,
                input_padding_mask,
                speaker_embeddings,
                reverse=True,
                noise_scale=self.noise_scale_duration,
            )
        else:
            log_duration = self.duration_predictor(hidden_states, input_padding_mask, speaker_embeddings)

        length_scale = 1.0 / self.speaking_rate
        duration = torch.ceil(torch.exp(log_duration) * input_padding_mask * length_scale)
        predicted_lengths = torch.clamp_min(torch.sum(duration, [1, 2]), 1).long()

        # Create a padding mask for the output lengths of shape (batch, 1, max_output_length)
        indices = torch.arange(predicted_lengths.max(), dtype=predicted_lengths.dtype, device=predicted_lengths.device)
        output_padding_mask = indices.unsqueeze(0) < predicted_lengths.unsqueeze(1)
        output_padding_mask = output_padding_mask.unsqueeze(1).to(input_padding_mask.dtype)

        # Reconstruct an attention tensor of shape (batch, 1, out_length, in_length)
        attn_mask = torch.unsqueeze(input_padding_mask, 2) * torch.unsqueeze(output_padding_mask, -1)
        batch_size, _, output_length, input_length = attn_mask.shape
        cum_duration = torch.cumsum(duration, -1).view(batch_size * input_length, 1)
        indices = torch.arange(output_length, dtype=duration.dtype, device=duration.device)
        valid_indices = indices.unsqueeze(0) < cum_duration
        valid_indices = valid_indices.to(attn_mask.dtype).view(batch_size, input_length, output_length)
        padded_indices = valid_indices - nn.functional.pad(valid_indices, [0, 0, 1, 0, 0, 0])[:, :-1]
        attn = padded_indices.unsqueeze(1).transpose(2, 3) * attn_mask

        # Expand prior distribution
        prior_means = torch.matmul(attn.squeeze(1), prior_means).transpose(1, 2)
        prior_log_variances = torch.matmul(attn.squeeze(1), prior_log_variances).transpose(1, 2)

        prior_latents = prior_means + torch.randn_like(prior_means) * torch.exp(prior_log_variances) * self.noise_scale
        latents = self.flow(prior_latents, output_padding_mask, speaker_embeddings, reverse=True)

        spectrogram = latents * output_padding_mask

    for start in range(0, spectrogram.size(-1), chunk_size):
        with torch.no_grad():
            wav = self.decoder(spectrogram[:, :, start : start + chunk_size], speaker_embeddings)
        yield wav.squeeze().cpu().numpy()


# Access token read from the environment; used both for the Hugging Face Hub
# downloads below and for the HTTP ``Authorization`` header used by ``query``.
api_key = os.environ.get("Id_mode_vits")
headers = {"Authorization": f"Bearer {api_key}"}

# Cache of loaded models keyed by model name (filled lazily by ``get_model``).
models = {}
tokenizer = AutoTokenizer.from_pretrained("wasmdashai/vits-ar-sa-huba", token=api_key)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def get_model(name_model):
    """Return the model for ``name_model``, loading and caching it on first use.

    A newly loaded model is moved to ``device`` and has weight normalization
    applied to its decoder and to ``conv_pre`` / ``conv_post`` of every flow.
    The model is cached only after that setup succeeded, so a failure cannot
    leave a half-initialised entry in ``models``.

    Args:
        name_model: Model identifier passed to ``VitsModel.from_pretrained``.

    Returns:
        The cached model instance.
    """
    if name_model in models:
        return models[name_model]

    model = VitsModel.from_pretrained(name_model, token=api_key).to(device)
    model.decoder.apply_weight_norm()
    for flow in model.flow.flows:
        torch.nn.utils.weight_norm(flow.conv_pre)
        torch.nn.utils.weight_norm(flow.conv_post)

    models[name_model] = model
    return model


def genrate_speech(text, name_model):
    """Synthesize ``text`` in one shot.

    Args:
        text: Text to speak.
        name_model: Model identifier, resolved through ``get_model``.

    Returns:
        Tuple ``(sampling_rate, waveform)`` where ``waveform`` is the model
        output flattened to a 1-D NumPy array.
    """
    inputs = tokenizer(text, return_tensors="pt")
    model = get_model(name_model)
    with torch.no_grad():
        wav = model(
            input_ids=inputs.input_ids.to(device),
            attention_mask=inputs.attention_mask.to(device),
            speaker_id=0,
        ).waveform.cpu().numpy().reshape(-1)
    return model.config.sampling_rate, wav


def _stream_speech(text, name_model):
    """Yield ``(sampling_rate, chunk)`` pairs for ``text`` as they are decoded."""
    inputs = tokenizer(text, return_tensors="pt")
    model = get_model(name_model)
    sampling_rate = model.config.sampling_rate
    # The model lives on ``device``, so the token tensors must be moved there
    # as well (as ``genrate_speech`` does); otherwise CUDA runs fail with a
    # device mismatch.
    chunks = _inference_forward_stream(
        model,
        input_ids=inputs.input_ids.to(device),
        attention_mask=inputs.attention_mask.to(device),
        speaker_embeddings=None,
        chunk_size=256,
    )
    for chunk in chunks:
        yield sampling_rate, chunk


def generate_audio(text, name_model, speaker_id=None):
    """Stream synthesized speech for ``text``.

    Args:
        text: Text to speak.
        name_model: Model identifier, resolved through ``get_model``.
        speaker_id: Accepted for interface compatibility; currently unused.

    Yields:
        Tuples ``(sampling_rate, chunk)`` with ``chunk`` a NumPy array.
    """
    yield from _stream_speech(text, name_model)


def generate_audio_ai(text, name_model):
    """Ask the chat backend about ``text`` and stream its answer as speech.

    The answer text is whitespace-normalized before synthesis and every audio
    chunk is passed through ``remove_noise_nr``.

    Args:
        text: User prompt sent to the chat backend.
        name_model: Model identifier, resolved through ``get_model``.

    Yields:
        Tuples ``(sampling_rate, denoised_chunk)``.
    """
    text_answer = remove_extra_spaces(get_answer_ai(text))
    for sampling_rate, chunk in _stream_speech(text_answer, name_model):
        yield sampling_rate, remove_noise_nr(chunk, sr=sampling_rate)


def remove_extra_spaces(text):
    """Collapse every run of whitespace in ``text`` into a single space."""
    return ' '.join(text.split())


def query(text, API_URL):
    """POST ``text`` as ``{"inputs": text}`` to ``API_URL`` and return the raw response body."""
    payload = {"inputs": text}
    response = requests.post(API_URL, headers=headers, json=payload)
    return response.content


def get_answer_ai(text):
    """Send ``text`` to the chat session and return the reply text.

    If the first attempt raises, a fresh session is created with
    ``create_chat_session`` and the message is sent once more; an error from
    the second attempt propagates to the caller.
    """
    global AI
    try:
        response = AI.send_message(text)
        return response.text
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not swallowed by the retry.
        AI = create_chat_session()
        response = AI.send_message(text)
        return response.text


def get_answer_ai_stream(text):
    """Send ``text`` to the chat session with ``stream=True`` and return the response.

    Uses the same retry-with-new-session policy as ``get_answer_ai``.
    """
    global AI
    try:
        return AI.send_message(text, stream=True)
    except Exception:
        AI = create_chat_session()
        return AI.send_message(text, stream=True)


def t2t(text):
    """Return the chat backend's text answer for ``text``."""
    return get_answer_ai(text)


def t2tstream(text):
    """Yield the answer accumulated so far each time a streamed chunk arrives."""
    st = ''
    response = get_answer_ai_stream(text)
    for chk in response:
        st += chk.text
        yield st


# Chatbot demo with multimodal input (text, markdown, LaTeX, code blocks, image, audio, & video). Plus shows support for streaming text.
import tempfile

import soundfile as sf
from gradio.data_classes import FileData
from gradio_client import Client
from gradio_multimodalchatbot import MultimodalChatbot

# Model used to voice messages in the "ChatBot" tab.
_CHAT_TTS_MODEL = 'wasmdashai/vits-ar-sa-huba'

# Models offered by the model dropdowns.
_MODEL_CHOICES = [
    "wasmdashai/vits-ar-sa",
    "wasmdashai/vits-ar-sa-huba",
    "wasmdashai/vits-ar-sa-ms",
    "wasmdashai/vits-ar-sa-magd",
    "wasmdashai/vits-ar-sa-fahd",
]


def random_plot():
    """Return a Plotly scatter figure built from the iris sample dataset."""
    df = px.data.iris()
    fig = px.scatter(
        df,
        x="sepal_width",
        y="sepal_length",
        color="species",
        size='petal_length',
        hover_data=['petal_width'],
    )
    return fig


def print_like_dislike(x: gr.LikeData):
    """Print the index, value and liked flag of a chatbot like/dislike event."""
    print(x.index, x.value, x.liked)


def add_message(history, message):
    """Append the user's files, text and a spoken version of the text to ``history``.

    Args:
        history: Chat history list; mutated in place and returned.
        message: Mapping with a ``"files"`` iterable and a ``"text"`` entry.

    Returns:
        The updated ``history``.
    """
    for x in message["files"]:
        history.append(((x,), None))
    # Skip empty text as well as ``None``: there is nothing to synthesize for
    # a message that only carries files.
    if message["text"]:
        history.append((message["text"], None))
        response_audio = genrate_speech(message["text"], _CHAT_TTS_MODEL)
        history.append((gr.Audio(response_audio, scale=1, streaming=True), None))
    return history


def bot(history, message):
    """Answer the user's text and append the spoken answer to ``history``.

    The text answer is stored as the reply of the last history entry and the
    synthesized audio is appended as a new bot-only entry.

    Args:
        history: Chat history; the last entry must support item assignment.
        message: Mapping with a ``"text"`` entry.

    Returns:
        Tuple of the updated history and a cleared, non-interactive
        ``gr.MultimodalTextbox``.
    """
    # Same guard as ``add_message``: do not query the chat backend with an
    # empty prompt.
    if message["text"]:
        txt_ai = get_answer_ai(message["text"])
        history[-1][1] = txt_ai
        response_audio = genrate_speech(txt_ai, _CHAT_TTS_MODEL)
        history.append((None, gr.Audio(response_audio, scale=1, streaming=True)))
    return history, gr.MultimodalTextbox(value=None, interactive=False)


fig = random_plot()

# Variable for storing the conversation history
# NOTE(review): the nesting of the layout containers below was reconstructed
# from a source whose indentation was lost — confirm against the intended UI.
with gr.Blocks() as demo:  # Use gr.Blocks to wrap the entire interface
    with gr.Tab("ChatBot "):
        chatbot = gr.Chatbot(
            elem_id="chatbot",
            bubble_full_width=False,
            scale=1,
        )

        chat_input = gr.MultimodalTextbox(
            interactive=True,
            file_count="single",
            placeholder="Enter message or upload file...",
            show_label=False,
        )

        # Submit -> record the user's message -> produce the bot answer ->
        # re-enable the input box.
        chat_msg = chat_input.submit(add_message, [chatbot, chat_input], [chatbot])
        bot_msg = chat_msg.then(bot, [chatbot, chat_input], [chatbot, chat_input], api_name="bot_response")
        bot_msg.then(lambda: gr.MultimodalTextbox(interactive=True), None, [chat_input])

        chatbot.like(print_like_dislike, None, None)

    with gr.Tab("Chat AI "):
        gr.Markdown("## AI: محادثة صوتية بالذكاء الاصطناعي باللهجة السعودية")

        with gr.Row():  # Arrange input/output components side-by-side
            with gr.Column():
                text_input = gr.Textbox(label="أدخل أي نص")
            with gr.Column():
                model_choices = gr.Dropdown(
                    choices=list(_MODEL_CHOICES),
                    label="اختر النموذج",
                    value="wasmdashai/vits-ar-sa-huba",
                )

        with gr.Row():
            btn = gr.Button("إرسال")
            btn_ai_only = gr.Button("توليد رد الذكاء الاصطناعي فقط")

        with gr.Row():
            user_audio = gr.Audio(label="صوت المدخل")
            ai_audio = gr.Audio(label="رد AI الصوتي")
            ai_text = gr.Textbox(label="رد AI النصي")
            ai_audio2 = gr.Audio(label="2رد AI الصوتي", streaming=True)

        # Use a single button to trigger both functionalities
        def process_audio(text, model_choice, generate_user_audio=True):
            """Synthesize the AI answer to ``text`` and optionally ``text`` itself.

            Always returns ``(user_audio, ai_audio, ai_text)`` so the result
            matches the three bound outputs; ``user_audio`` is ``None`` when
            ``generate_user_audio`` is false.
            """
            text_answer = remove_extra_spaces(get_answer_ai(text))
            data_ai = genrate_speech(text_answer, model_choice)
            data_user = genrate_speech(text, model_choice) if generate_user_audio else None
            return data_user, data_ai, text_answer

        btn.click(
            process_audio,  # Call the combined function
            inputs=[text_input, model_choices],
            outputs=[user_audio, ai_audio, ai_text],
        )

        btn_ai_only.click(
            generate_audio_ai,
            inputs=[text_input, model_choices],
            outputs=[ai_audio2],
        )

    with gr.Tab("Live "):
        gr.Markdown("## VITS: تحويل النص إلى كلام")

        with gr.Row():
            # NOTE(review): this field is not passed to any event handler.
            speaker_id_input = gr.Number(label="معرّف المتحدث (اختياري)", interactive=True)
            with gr.Column():
                model_choices2 = gr.Dropdown(
                    choices=list(_MODEL_CHOICES),
                    label="اختر النموذج",
                    value="wasmdashai/vits-ar-sa-huba",
                )

        # Rebinds ``text_input``; the "Chat AI" handlers above already hold
        # their own reference to the earlier textbox.
        text_input = gr.Textbox(label="أدخل النص هنا")
        generate_button = gr.Button("توليد وتشغيل الصوت")
        audio_player = gr.Audio(label="أ audio", streaming=True)

        # Update the event binding
        generate_button.click(generate_audio, inputs=[text_input, model_choices2], outputs=audio_player)

    with gr.Tab("T2T "):
        gr.Markdown("## T2T")
        text_inputk = gr.Textbox(label="أدخل النص هنا")
        text_out = gr.Textbox()
        text_inputk.submit(t2t, [text_inputk], [text_out])

    with gr.Tab("T2TSTREAM "):
        gr.Markdown("## T2TSTREAM ")
        text_inputk2 = gr.Textbox(label="أدخل النص هنا")
        text_out1 = gr.Textbox()
        text_inputk2.submit(t2tstream, [text_inputk2], [text_out1])


if __name__ == "__main__":
    demo.launch(show_error=True)