from flask import Flask, jsonify, request
from flask_cors import CORS
from datasets import load_dataset, Audio
import pandas as pd
import os
import re
import threading
import timeit
from dotenv import load_dotenv
from datetime import datetime
import pytz

from utils.load_csv import upload_csv, download_csv
from utils.generate_results import generateResults
from utils.generate_box_plot import box_plot_data
from utils.model_validity import is_valid_asr_model
from utils.send_email import send_email

load_dotenv()

# Set the cache directory for Hugging Face datasets
os.environ["HF_HOME"] = "/tmp/huggingface"

# ASR_model = "openai/whisper-tiny"  # Replace with your ASR model

# Check CPU speed with a small benchmark
cpu_score = timeit.timeit("sum(range(1000000))", number=5)
print(f"🧠 CPU benchmark score: {cpu_score:.2f}")
# Vars for stopping a running transcription:
stop_transcription_flag = {"active": False}
current_thread = {"thread": None}

job_status = {
    "running": False,
    "model": None,
    "completed": None,
    "%_completed": None,
    "message": "No Transcription in progress",
    "total": None,
    "error": None,
    "error_trace": None,
}

csv_path = "test.csv"
# csv_transcript = f'test_with_{ASR_model.replace("/", "_")}.csv'
# csv_result = f'test_with_{ASR_model.replace("/", "_")}_WER.csv'

df = pd.read_csv(csv_path)
print(f"CSV Loaded with {len(df)} rows")

# Load dataset without decoding audio (required!)
dataset = load_dataset("satyamr196/asr_fairness_audio", split="train")
# dataset = dataset.with_format("python", decode_audio=False)
dataset = dataset.cast_column("audio", Audio(decode=False))
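# With Audio(decode=False), each sample's "audio" field stays undecoded (a
# dict carrying the file path / raw bytes); the waveform is only read later
# via soundfile. A minimal sketch of inspecting one sample:
#   sample = dataset[0]
#   print(sample["audio"]["path"])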
def generateTranscript(ASR_model):
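    """Transcribe the full test set with the given Hugging Face ASR model.

    Skips work if a transcript CSV for this model already exists in the
    dataset repo; otherwise runs the model over every audio file, tracks
    progress in the module-level job_status dict, and uploads the resulting
    CSV (transcript + RTFX per row).
    """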
    import time
    import tqdm
    import soundfile as sf
    from transformers import pipeline
    import traceback

    stop_transcription_flag["active"] = False
    job_status.update({
        "running": True,
        "model": ASR_model,
        "completed": 0,
        "%_completed": 0,
        "message": "Starting transcription...",
        "total": None,
        "error": None,  # Clear any old error state
        "error_trace": None,
    })

    csv_transcript = f'test_with_{ASR_model.replace("/", "_")}.csv'
    csv_result = f'test_with_{ASR_model.replace("/", "_")}_WER.csv'
    try:
        # Check if a transcript already exists
        df_transcript = download_csv(csv_transcript)
        if df_transcript is None:
            print("CSV not found in the dataset repo. Proceeding to generate transcript.")
            # Get current IST time
            ist = pytz.timezone("Asia/Kolkata")
            current_time = datetime.now(ist).strftime("%H:%M:%S %d %b %Y")
            send_email(
                to_email="raianand.1991@gmail.com",
                subject=f"Audit Started for ASR model {ASR_model}",
                message_body=f"Audit started at {current_time} for ASR model {ASR_model}.",
                bcc_emails=["pedanticsatoshi0@getsafesurfer.com"],
            )
        else:
            print("Transcript already exists for this model. Skipping transcription.")
            job_status.update({
                "running": False,
                "model": None,
                "completed": None,
                "%_completed": None,
                "message": "No Transcription in progress",
                "total": None,
                "error": None,
                "error_trace": None,
            })
            return
        # # Load test.csv (already loaded at module level)
        # df = pd.read_csv(csv_path)
        # print(f"CSV Loaded with {len(df)} rows")
        total = len(df)
        job_status["total"] = total

        # Initialize ASR pipeline
        pipe = pipeline("automatic-speech-recognition", model=ASR_model)
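        # Note: on first use of a given model, pipeline() downloads the
        # model weights from the Hugging Face Hub (cached afterwards), so
        # the first request for a new model can take a while before any
        # transcription starts.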
        # Column with filenames in the CSV
        filename_column = df.columns[0]
        df[filename_column] = df[filename_column].str.strip().str.lower()

        # Build a map from filename -> dataset sample (without decoding audio)
        # print("Creating dataset map from filenames...")
        dataset_map = {
            os.path.basename(sample["audio"]["path"]).lower(): sample
            for sample in dataset
        }

        transcripts = []
        rtfx_score = []
        for idx, row in tqdm.tqdm(df.iterrows(), total=len(df)):
            # ------------------------------------------------------------
            # Allow the admin to cancel a long-running job mid-loop
            if stop_transcription_flag.get("active", False):
                job_status.update({
                    "running": False,
                    "model": None,
                    "message": "Transcription cancelled by user.",
                    "error": None,
                    "error_trace": None,
                    "completed": idx,
                    "%_completed": idx * 100 / total,
                    "total": total,
                })
                print("🛑 Transcription cancelled by user request.")
                return
            # ------------------------------------------------------------

            filename = row[filename_column] + ".wav"
            if filename in dataset_map:
                sample = dataset_map[filename]
                try:
                    # Decode audio only when needed
                    file_path = sample["audio"]["path"]
                    audio_array, sample_rate = sf.read(file_path)

                    start_time = time.time()
                    result = pipe({"array": audio_array, "sampling_rate": sample_rate})
                    end_time = time.time()

                    transcript = result["text"]
                    duration = len(audio_array) / sample_rate
                    rtfx = (end_time - start_time) / duration if duration > 0 else 0
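                    # As computed here, this is the real-time factor (RTF):
                    # processing time divided by audio duration, so values
                    # below 1 mean faster than real time. (The inverse
                    # convention, audio time / processing time, is what is
                    # usually labelled RTFx.)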
                    transcripts.append(transcript)
                    rtfx_score.append(rtfx)
                    print(f"✅ {filename}: RTFX = {rtfx:.2f}, Progress: {(idx + 1) * 100 / total:.2f} %")
                except Exception as e:
                    print(f"❌ Error with {filename}: {e}")
                    transcripts.append("")
                    rtfx_score.append(0)
            else:
                print(f"❌ File not found in dataset: {filename}")
                transcripts.append("")
                rtfx_score.append(0)

            job_status["completed"] = idx + 1
            job_status["message"] = f"Processing {idx + 1}/{total}"
            job_status["%_completed"] = (idx + 1) * 100 / total
        # Save results
        df["transcript"] = transcripts
        df["rtfx"] = rtfx_score
        job_status.update({
            "running": False,
            "model": None,
            "completed": None,
            "%_completed": None,
            "message": "No Transcription in progress",
            "total": None,
            "error": None,  # Clear any old error state
            "error_trace": None,
        })
        # df.to_csv(csv_result, index=False)
        upload_csv(df, csv_transcript)
        print(f"\n📄 Transcripts uploaded to: {csv_transcript}")
    except Exception as e:
        tb = traceback.format_exc()
        print(f"❌ Exception during transcription:\n{tb}")
        job_status.update({
            "running": False,
            "model": None,
            "completed": None,
            "%_completed": None,
            "message": f"Transcription failed: {str(e)}",
            "total": None,
            "error": str(e),  # Error string for the client
            "error_trace": tb[:1000],  # First part of the traceback
        })
        # Get current IST time
        ist = pytz.timezone("Asia/Kolkata")
        current_time = datetime.now(ist).strftime("%H:%M:%S %d %b %Y")
        send_email(
            to_email="satyamrahangdale196@kgpian.iitkgp.ac.in",
            subject=f"{ASR_model} : Error Occurred while generating Transcript",
            message_body=f"Error : {str(e)} \n Error trace : {tb[:1000]}",
            bcc_emails=["pedanticsatoshi0@getsafesurfer.com"],
        )
# generateTranscript(ASR_model)
# print(generateResults(ASR_model))
# print(box_plot_data(ASR_model))

# ! FLASK SERVER CODE :-
app = Flask(__name__)
CORS(app, origins="*")
# NOTE: the @app.route decorators below were evidently lost when this file
# was extracted; they are restored here. "/api" is confirmed by the
# "endpoint" field in its own response; the other paths are assumptions.
@app.route("/")
def home():
    return jsonify(
        {
            "message": "Welcome to the ASR Server! Please use the App link to access the ASR-FairBench application.",
            "App link": "https://huggingface.co/spaces/satyamr196/ASR-FairBench",
        }
    )
@app.route("/asr_models")  # assumed path
def asr_models():
    models = [
        "DeepSpeech",
        "Wav2Vec",
        "Jasper",
        "QuartzNet",
        "Conformer",
        "whisper",
        "Kaldi",
        "SpeechBrain",
    ]

    def background_job():
        generateTranscript("openai/whisper-tiny")

    # Start the background job in a separate thread
    threading.Thread(target=background_job).start()
    print("Transcription started in background")

    return jsonify({"asr_models": models})
@app.route("/status")  # assumed path
def get_status():
    return jsonify(job_status)
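# Example status poll (assumed local dev URL and path):
#   curl "http://localhost:5000/status"
# returns the job_status dict as JSON, e.g. {"running": false, ...}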
@app.route("/stop_transcription")  # assumed path
def stop_transcription():
    if not job_status["running"]:
        return jsonify({"message": "No transcription is running currently"})

    admin_pass = os.environ.get('ADMIN_PASSWORD')
    passkey = request.args.get('passkey', default="", type=str)
    if passkey == admin_pass:
        global stop_transcription_flag
        stop_transcription_flag["active"] = True
        return jsonify({"message": "Stop requested. The running transcription will terminate as soon as possible."})
    else:
        return jsonify({"message": f"{passkey} : Wrong admin password (this feature is only for the platform admin; users should not try to use it)"})
@app.route("/api")  # path confirmed by the "endpoint" field below
def api():
    model = request.args.get('ASR_model', default="", type=str)
    # model = re.sub(r"\s+", "", model)
    model = re.sub(r"[^a-zA-Z0-9/_\-.]", "", model)  # sanitize the model ID
    csv_transcript = f'test_with_{model.replace("/", "_")}.csv'
    csv_result = f'test_with_{model.replace("/", "_")}_WER.csv'

    if not model:
        return jsonify({'error': 'ASR_model parameter is required'})
    elif not is_valid_asr_model(model):
        # Return 400 if the model is invalid
        return jsonify({'message': 'Invalid ASR model ID, please check if your model is available on Hugging Face'}), 400
    elif download_csv(csv_transcript) is not None:
        # Transcript already exists on the Hugging Face Hub: compute and return results
        Results = generateResults(model)
        wer_Gender, wer_SEG, wer_Ethnicity, wer_Language = box_plot_data(model)
        return jsonify({
            'message': f'{model} has been evaluated and results are shown below',
            'endpoint': "/api",
            'model': model,
            'greet': "Welcome to ASR-FairBench",
            **Results,
            'wer_Gender': wer_Gender,
            'wer_SEG': wer_SEG,
            'wer_Ethnicity': wer_Ethnicity,
            'wer_Language': wer_Language,
        })
    else:
        # Check if `generateTranscript` is already running for another model
        if job_status["running"]:
            return jsonify({
                'message': 'Transcription for a previously submitted ASR model is in progress. Please wait for it to complete, then submit your model again.',
                'status': job_status,
            })

        # Check if there is an error to return
        if job_status.get("error"):
            error_msg = job_status["error"]
            error_trace = job_status.get("error_trace")
            # Clear error info immediately after reading
            job_status["error"] = None
            job_status["error_trace"] = None
            return jsonify({
                'message': f"Transcription failed (kindly do not run this model again until we fix the issue): {error_msg}",
                'error': error_msg,
                'error_trace': error_trace,
                'status': job_status,
            })

        response = jsonify({
            'message': f'Given model {model} is being evaluated. Please come back after a few hours and run the query again; usually it completes within an hour.'
        })

        # Start the transcript generation in a separate thread
        # thread = threading.Thread(target=generateTranscript, args=(model,), daemon=True)
        thread = threading.Thread(target=generateTranscript, args=(model,))
        current_thread["thread"] = thread
        thread.start()
        return response
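# Example evaluation request (assumed local dev URL; the model ID must exist
# on the Hugging Face Hub):
#   curl "http://localhost:5000/api?ASR_model=openai/whisper-tiny"
# The first call kicks off transcription in the background; later calls
# return the fairness results once the transcript CSV is available.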
@app.route("/insert", methods=["POST"])  # assumed path and method
def insert_document():
    try:
        data = request.json  # Get JSON data from the request
        model_name = data.get("Model")
        csv_filename = "leaderboard.csv"

        # Try to download the leaderboard CSV from the HF dataset
        df = download_csv(csv_filename)

        if df is None:
            # If not found, create a new DataFrame with this single entry
            df = pd.DataFrame([data])
        else:
            # Check if the model already exists in the leaderboard
            if model_name in df["Model"].values:
                return jsonify({"exists": True})
            # Append the new row
            df = pd.concat([df, pd.DataFrame([data])], ignore_index=True)

        # Upload the updated CSV back to the Hugging Face dataset
        success = upload_csv(df, csv_filename)
        if not success:
            return jsonify({"exists": "Error", "error": "Upload to Hugging Face failed"})

        return jsonify({"exists": False, "message": "Data inserted into leaderboard successfully!"})
    except Exception as e:
        return jsonify({"exists": "Error", "error": str(e)})
# Fetch all leaderboard documents
@app.route("/fetch")  # assumed path
def fetch_documents():
    try:
        csv_filename = "leaderboard.csv"
        df = download_csv(csv_filename)
        if df is None:
            return jsonify({"error": "Leaderboard CSV not found in Hugging Face dataset."})
        documents = df.to_dict(orient="records")  # Convert DataFrame to a list of dicts
        return jsonify({"data": documents})
    except Exception as e:
        return jsonify({"error": str(e)})
# if __name__ == "__main__":
#     app.run(debug=True)
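# The main guard above is left commented out, presumably because the Space
# serves the app through a WSGI server instead (an assumption); uncomment it
# to run the Flask development server locally with `python app.py`.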