"""
ULTRA-ROBUST CALL CENTER ANALYTICS
===================================
✅ Multiple gender detection models with voting
✅ Best STT model (Whisper Large-v3 + optimizations)
✅ Enhanced for European accents
✅ Robust pitch analysis with multiple methods
✅ Production-grade accuracy
MODELS USED:
- STT: Whisper Large-v3 (best for accents)
- Gender: 3 models + voting system
- Age: Wav2Vec2 Large + validation
- Diarization: pyannote 3.1 (SOTA)
"""
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer
import os
import sys
import logging
import torch
import librosa
import whisper
import numpy as np
import warnings
import json
import gc
from collections import Counter, defaultdict
from pyannote.audio import Pipeline
from transformers import (
pipeline,
Wav2Vec2Processor,
Wav2Vec2ForSequenceClassification,
AutoModelForAudioClassification,
AutoFeatureExtractor
)
from datetime import datetime
from scipy import signal as scipy_signal
from scipy.stats import mode as scipy_mode
import parselmouth
from parselmouth.praat import call
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
logging.getLogger("pyannote").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer): return int(obj)
if isinstance(obj, np.floating): return float(obj)
if isinstance(obj, np.ndarray): return obj.tolist()
return super(NumpyEncoder, self).default(obj)
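# NumpyEncoder is meant to be passed as the `cls` argument of json.dumps so that numpy
# scalars/arrays inside the report serialize cleanly, e.g. (illustrative):
#     json.dumps({"score": np.float32(0.87)}, cls=NumpyEncoder)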
class UltraRobustCallAnalytics:
def __init__(self, hf_token=None, device=None):
        # Select the compute device before loading models or flushing memory
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        print(f"🚀 Initializing ULTRA-ROBUST Analytics Engine on {self.device}...")
        print("="*70)
        # Free any stale GPU memory before the heavy model loads below
        self._flush_memory()
        # ===== BEST STT MODEL: Whisper Large-v3 =====
        try:
            print(" → Loading Whisper Large-v3 (BEST for accents)...")
            self.stt_model = whisper.load_model("large-v3", device=self.device)
            self.stt_model_name = "large-v3"
            print(" ✓ Whisper Large-v3 loaded")
        except Exception:
            print(" ⚠ Falling back to Large-v2...")
            try:
                self.stt_model = whisper.load_model("large-v2", device=self.device)
                self.stt_model_name = "large-v2"
                print(" ✓ Whisper Large-v2 loaded")
            except Exception:
                print(" ⚠ Final fallback to Medium...")
                self.stt_model = whisper.load_model("medium", device=self.device)
                self.stt_model_name = "medium"
                print(" ✓ Whisper Medium loaded")
# ===== DIARIZATION =====
self.diarization_pipeline = None
if hf_token:
print(f" β†’ Attempting to load Pyannote with token starting: {hf_token[:4]}...")
# Universal Loader: Tries 'token' (New) then 'use_auth_token' (Old)
try:
# Attempt 1: New Syntax
self.diarization_pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
token=hf_token
).to(torch.device(self.device))
print(" βœ“ Diarization loaded (New Syntax)")
except TypeError:
# Attempt 2: Old Syntax (Fallback)
print(" ⚠ New syntax failed, trying legacy syntax...")
try:
self.diarization_pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=hf_token
).to(torch.device(self.device))
print(" βœ“ Diarization loaded (Legacy Syntax)")
except Exception as e:
print(f" ❌ CRITICAL PYANNOTE ERROR (Legacy): {e}")
except Exception as e:
print(f" ❌ CRITICAL PYANNOTE ERROR: {e}")
# ===== EMOTION CLASSIFIER =====
print(" β†’ Loading emotion classifier...")
self.emotion_classifier = pipeline(
"audio-classification",
model="superb/wav2vec2-base-superb-er",
device=0 if self.device == "cuda" else -1
)
print(" βœ“ Emotion classifier loaded")
# ===== MULTIPLE GENDER MODELS FOR VOTING =====
print("\n β†’ Loading MULTIPLE gender detection models...")
self.gender_models = {}
# Model 1: Age-Gender (Primary)
try:
print(" Loading Gender Model 1: audeering/wav2vec2-large...")
self.ag_model_name = "audeering/wav2vec2-large-robust-24-ft-age-gender"
self.ag_processor = Wav2Vec2Processor.from_pretrained(self.ag_model_name)
self.ag_model = Wav2Vec2ForSequenceClassification.from_pretrained(self.ag_model_name)
self.ag_model.to(self.device).eval()
self.gender_models['audeering'] = {
'processor': self.ag_processor,
'model': self.ag_model
}
print(" βœ“ Model 1 loaded")
except Exception as e:
print(f" βœ— Model 1 failed: {e}")
# Model 2: Alefiury Gender Classifier
try:
print(" Loading Gender Model 2: alefiury/wav2vec2-large-xlsr-53-gender...")
model2_name = "alefiury/wav2vec2-large-xlsr-53-gender-recognition-librispeech"
processor2 = AutoFeatureExtractor.from_pretrained(model2_name)
model2 = AutoModelForAudioClassification.from_pretrained(model2_name)
model2.to(self.device).eval()
self.gender_models['alefiury'] = {
'processor': processor2,
'model': model2
}
print(" βœ“ Model 2 loaded")
except Exception as e:
print(f" βœ— Model 2 failed: {e}")
        # Model 3: MIT AST fine-tuned on Speech Commands v2. Note: its label set contains
        # no "male"/"female" classes, so the voting code below skips it unless a
        # gender-like label is present.
try:
print(" Loading Gender Model 3: MIT/ast-finetuned-speech-commands...")
model3_name = "MIT/ast-finetuned-speech-commands-v2"
processor3 = AutoFeatureExtractor.from_pretrained(model3_name)
model3 = AutoModelForAudioClassification.from_pretrained(model3_name)
model3.to(self.device).eval()
self.gender_models['mit'] = {
'processor': processor3,
'model': model3
}
print(" βœ“ Model 3 loaded")
except Exception as e:
print(f" βœ— Model 3 failed: {e}")
print(f" βœ“ Loaded {len(self.gender_models)} gender detection models")
print("\n" + "="*70)
print("βœ… Engine initialized successfully")
print("="*70 + "\n")
print(" β†’ Loading KeyBERT for keyword extraction...")
try:
self.keyword_model = KeyBERT('all-MiniLM-L6-v2')
print(" βœ“ Keyword extractor loaded")
except Exception as e:
print(f" ⚠ Keyword model failed: {e}")
self.keyword_model = None
print(" β†’ Loading zero-shot topic classifier...")
try:
self.topic_classifier = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=0 if self.device == "cuda" else -1
)
self.topic_labels = [
"billing_payment",
"technical_support",
"product_inquiry",
"complaint_issue",
"account_management",
"sales_marketing",
"service_cancellation",
"feedback_survey",
"appointment_scheduling",
"general_inquiry"
]
print(" βœ“ Topic classifier loaded")
except Exception as e:
print(f" ⚠ Topic classifier failed: {e}")
self.topic_classifier = None
def process_call(self, audio_path):
"""Main processing with maximum robustness"""
if not os.path.exists(audio_path):
raise FileNotFoundError(f"Audio file not found: {audio_path}")
self._flush_memory()
print(f"πŸ“ Processing: {audio_path}")
print("="*70)
# Load and preprocess
wav, sr = librosa.load(audio_path, sr=16000, mono=True)
wav = wav.astype(np.float32)
# Audio enhancement for call center quality
wav = self._enhance_audio_for_callcenter(wav, sr)
duration = len(wav) / sr
print(f" βœ“ Audio loaded: {duration:.1f}s @ {sr}Hz")
# Enhanced diarization
print("\n β†’ Running enhanced diarization...")
segments = self._run_enhanced_diarization(wav, sr, audio_path)
print(f" βœ“ Found {len(set(s['speaker'] for s in segments))} speakers, {len(segments)} segments")
# Smart merging
merged = self._merge_segments_smart(segments, min_gap=0.25)
print(f" βœ“ Merged to {len(merged)} segments")
# Process segments
results = []
spk_audio_buffer = defaultdict(list)
        pad = int(0.1 * sr)  # 100 ms of context padding around each segment
print("\n β†’ Transcribing with Whisper Large-v3...")
for i, seg in enumerate(merged):
seg_duration = seg['end'] - seg['start']
if seg_duration < 0.1:
continue
start_idx = max(0, int(seg['start'] * sr) - pad)
end_idx = min(len(wav), int(seg['end'] * sr) + pad)
chunk = wav[start_idx:end_idx]
if self._is_silence(chunk):
continue
# Collect audio for biometrics
if seg_duration > 0.4:
spk_audio_buffer[seg['speaker']].append(chunk)
# ENHANCED TRANSCRIPTION
text = self._transcribe_chunk_robust(chunk, sr)
if not text:
continue
emotion = self._detect_emotion(chunk)
sentiment = self._map_emotion_to_sentiment(emotion)
speech_rate = self._calculate_speech_rate(text, seg_duration)
keywords = self._extract_keywords(text, top_n=5)
topic = self._classify_topic(text)
results.append({
"segment_id": i + 1,
"start": float(f"{seg['start']:.2f}"),
"end": float(f"{seg['end']:.2f}"),
"duration": float(f"{seg_duration:.2f}"),
"speaker": seg['speaker'],
"role": "UNKNOWN",
"text": text,
"emotion": emotion,
"sentiment": sentiment, # NEW
"speech_rate": speech_rate, # NEW
"keywords": keywords, # NEW
"topic": topic, # NEW
"tone": self._calculate_tone_advanced(chunk, sr, text)
})
if (i + 1) % 10 == 0:
print(f" Processed {i + 1}/{len(merged)} segments...")
print(f" βœ“ Transcribed {len(results)} segments with text")
# Assign roles
print("\n β†’ Assigning speaker roles...")
results = self._assign_roles_smart(results)
identification = {}
for r in results:
identification[r['speaker']] = r['role']
print(f" βœ“ Roles: {identification}")
# ULTRA-ROBUST BIOMETRICS WITH VOTING
print("\n β†’ Analyzing biometrics with multi-model voting...")
biometrics = self._analyze_biometrics_ultra_robust(spk_audio_buffer, results, wav, sr)
for spk, bio in biometrics.items():
print(f" {spk}: {bio['gender']} (confidence: {bio['gender_confidence']:.2f}), {bio['age_bracket']}")
# Customer journey
print("\n β†’ Analyzing customer journey...")
cust_metrics = self._analyze_customer_journey(results)
print(f" βœ“ Journey: {cust_metrics['emotional_arc']}")
# Agent KPI
print("\n β†’ Analyzing agent performance...")
agent_metrics = self._analyze_agent_kpi(results, cust_metrics['impact_score'])
print(f" βœ“ Agent score: {agent_metrics.get('overall_score', 'N/A')}/100")
# Compile output
call_summary = self._aggregate_call_insights(results)
final_output = {
"metadata": {
"file": os.path.basename(audio_path),
"duration_seconds": float(f"{duration:.2f}"),
"sample_rate": sr,
"total_segments": len(results),
"stt_model": self.stt_model_name,
"gender_models_used": len(self.gender_models),
"speakers": biometrics,
"call_summary": call_summary # NEW
},
"identification": identification,
"agent_metrics": agent_metrics,
"customer_metrics": cust_metrics,
"transcript": results
}
self._flush_memory()
print("\n" + "="*70)
print("βœ… Processing complete")
print("="*70 + "\n")
return final_output
def _enhance_audio_for_callcenter(self, wav, sr):
"""Enhance audio quality for better transcription"""
# 1. Normalize
wav = wav / (np.max(np.abs(wav)) + 1e-7)
# 2. High-pass filter to remove low-frequency noise
try:
sos = scipy_signal.butter(4, 80, 'hp', fs=sr, output='sos')
wav = scipy_signal.sosfilt(sos, wav)
except:
pass
# 3. Gentle compression to balance volume
wav = np.sign(wav) * np.log1p(np.abs(wav) * 10) / np.log1p(10)
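        # log1p soft compression maps 0 -> 0 and 1 -> 1 while lifting quieter samples
        # (e.g. 0.1 -> ~0.29), evening out level differences between the two speakers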
return wav.astype(np.float32)
def _transcribe_chunk_robust(self, chunk, sr):
"""
ULTRA-ROBUST TRANSCRIPTION
Optimized for:
- European accents
- Call center quality
- Background noise
"""
# Ensure minimum length
if len(chunk) < sr * 0.3:
pad = np.zeros(int(sr * 0.5), dtype=np.float32)
chunk = np.concatenate([pad, chunk, pad])
try:
# BEST SETTINGS FOR CALL CENTER + EUROPEAN ACCENTS
result = self.stt_model.transcribe(
chunk.astype(np.float32),
language="en", # English only
task="transcribe",
# Quality settings
beam_size=5, # Higher = more accurate but slower
best_of=5, # Sample best of 5 runs
temperature=0.0, # Deterministic
# Accent handling
condition_on_previous_text=True, # Use context
# Noise handling
compression_ratio_threshold=2.4, # More lenient
logprob_threshold=-1.0, # More lenient
no_speech_threshold=0.6, # Standard
# Speed vs accuracy
fp16=(self.device == "cuda"), # Use FP16 on GPU
# Word timestamps for quality check
word_timestamps=True
)
text = result['text'].strip()
# Quality filters
if len(text) < 2:
return None
# Filter garbage
garbage = ["you", "thank you", ".", "...", "bye", "okay"]
if text.lower() in garbage:
return None
            # Check that it looks like actual speech (contains at least one vowel)
if not any(c in text.lower() for c in 'aeiou'):
return None
            # Check word-level confidence if available. In openai-whisper the word
            # timestamps live inside result['segments'], not at the top level.
            word_probs = [
                w.get('probability', 1.0)
                for seg in result.get('segments', [])
                for w in seg.get('words', [])
            ]
            if word_probs and np.mean(word_probs) < 0.3:  # Very low confidence
                return None
return text
except Exception as e:
print(f" ⚠ Transcription error: {e}")
return None
def _analyze_biometrics_ultra_robust(self, audio_buffer, transcript, full_wav, sr):
"""
ULTRA-ROBUST GENDER DETECTION
Uses multiple models + voting + pitch + conversation context
"""
profiles = {}
# Collect conversation context
context_gender = self._extract_gender_from_conversation(transcript)
for spk, chunks in audio_buffer.items():
if not chunks:
continue
print(f"\n Analyzing {spk}...")
# Concatenate audio (max 15 seconds from different parts)
raw_audio = self._prepare_audio_for_analysis(chunks, sr)
# ===== METHOD 1: ADVANCED PITCH ANALYSIS =====
pitch_gender, pitch_confidence, pitch_stats = self._analyze_pitch_robust(raw_audio, sr, full_wav, transcript, spk)
print(f" Pitch analysis: {pitch_gender} (conf: {pitch_confidence:.2f})")
# ===== METHOD 2: MULTI-MODEL AI VOTING =====
ai_gender, ai_confidence, all_predictions = self._multi_model_gender_detection(raw_audio, sr)
print(f" AI models: {ai_gender} (conf: {ai_confidence:.2f})")
print(f" Individual: {all_predictions}")
# ===== METHOD 3: CONVERSATION CONTEXT =====
context_gend = context_gender.get(spk, "UNKNOWN")
print(f" Context clues: {context_gend}")
# ===== METHOD 4: FORMANT ANALYSIS =====
formant_gender, formant_confidence = self._analyze_formants(raw_audio, sr)
print(f" Formant analysis: {formant_gender} (conf: {formant_confidence:.2f})")
# ===== VOTING SYSTEM WITH CONFIDENCE WEIGHTING =====
votes = []
# Context vote (HIGHEST priority if available)
if context_gend != "UNKNOWN":
votes.extend([context_gend] * 4) # 4 votes for context
# Pitch vote (HIGH priority)
if pitch_confidence > 0.6:
votes.extend([pitch_gender] * 3) # 3 votes for confident pitch
elif pitch_confidence > 0.4:
votes.append(pitch_gender) # 1 vote for moderate pitch
# AI models vote (MEDIUM priority)
if ai_confidence > 0.7:
votes.extend([ai_gender] * 2) # 2 votes for confident AI
elif ai_confidence > 0.5:
votes.append(ai_gender) # 1 vote for moderate AI
# Formant vote (MEDIUM priority)
if formant_confidence > 0.6:
votes.extend([formant_gender] * 2)
elif formant_confidence > 0.4:
votes.append(formant_gender)
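            # Worked example of the weighting: context=MALE (4 votes), pitch=MALE@0.70
            # (3 votes), AI=FEMALE@0.75 (2 votes), formants=FEMALE@0.50 (1 vote)
            # -> MALE wins 7 of 10 votes, final confidence 0.70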
# Count votes
if votes:
vote_counts = Counter(votes)
final_gender = vote_counts.most_common(1)[0][0]
total_votes = len(votes)
winning_votes = vote_counts[final_gender]
final_confidence = winning_votes / total_votes
else:
# Fallback
final_gender = ai_gender if ai_confidence > 0.5 else "UNKNOWN"
final_confidence = ai_confidence
print(f" FINAL: {final_gender} (confidence: {final_confidence:.2f})")
print(f" Vote breakdown: {dict(Counter(votes))}")
# ===== AGE DETECTION =====
age_bracket = self._detect_age_robust(raw_audio, sr, pitch_stats)
# Get role
role = [r['role'] for r in transcript if r['speaker'] == spk]
role = role[0] if role else "UNKNOWN"
profiles[spk] = {
"gender": final_gender,
"gender_confidence": round(final_confidence, 2),
"gender_methods": {
"context": context_gend,
"pitch": f"{pitch_gender} ({pitch_confidence:.2f})",
"ai_models": f"{ai_gender} ({ai_confidence:.2f})",
"formants": f"{formant_gender} ({formant_confidence:.2f})",
"vote_breakdown": dict(Counter(votes))
},
"age_bracket": age_bracket,
"voice_stats": {
"avg_pitch_hz": pitch_stats['mean'],
"pitch_range": f"{pitch_stats['min']:.0f}-{pitch_stats['max']:.0f}Hz",
"pitch_std": pitch_stats['std']
}
}
return profiles
def _prepare_audio_for_analysis(self, chunks, sr, max_duration=15):
"""Prepare audio by taking samples from different parts"""
raw = np.concatenate(chunks)
# Take samples from beginning, middle, end
if len(raw) > sr * max_duration:
segment_len = sr * 5 # 5 seconds each
total_len = len(raw)
samples = []
# Beginning
samples.append(raw[:segment_len])
# Middle
mid_start = (total_len // 2) - (segment_len // 2)
samples.append(raw[mid_start:mid_start + segment_len])
# End
samples.append(raw[-segment_len:])
raw = np.concatenate(samples)
# Normalize
raw = raw - np.mean(raw)
std = np.std(raw)
if std > 1e-7:
raw = raw / std
return raw
def _analyze_pitch_robust(self, audio, sr, full_wav, transcript, speaker):
"""Advanced pitch analysis using multiple methods"""
# Collect all pitch values from transcript
transcript_pitches = [
t['tone']['pitch_hz']
for t in transcript
if t['speaker'] == speaker and t['tone']['pitch_hz'] > 60
]
# Method 1: YIN algorithm
try:
f0_yin = librosa.yin(audio.astype(np.float64), fmin=60, fmax=400, sr=sr)
f0_yin_valid = f0_yin[f0_yin > 0]
except:
f0_yin_valid = []
# Method 2: PYIN (probabilistic YIN)
try:
f0_pyin, voiced_flag, voiced_probs = librosa.pyin(
audio.astype(np.float64),
fmin=60,
fmax=400,
sr=sr
)
f0_pyin_valid = f0_pyin[~np.isnan(f0_pyin)]
except:
f0_pyin_valid = []
# Combine all pitch measurements
all_pitches = []
if len(f0_yin_valid) > 0:
all_pitches.extend(f0_yin_valid)
if len(f0_pyin_valid) > 0:
all_pitches.extend(f0_pyin_valid)
if len(transcript_pitches) > 0:
all_pitches.extend(transcript_pitches)
if len(all_pitches) == 0:
return "UNKNOWN", 0.0, {'mean': 0, 'std': 0, 'min': 0, 'max': 0}
# Calculate statistics
mean_pitch = np.mean(all_pitches)
std_pitch = np.std(all_pitches)
min_pitch = np.min(all_pitches)
max_pitch = np.max(all_pitches)
pitch_stats = {
'mean': round(mean_pitch, 1),
'std': round(std_pitch, 1),
'min': round(min_pitch, 1),
'max': round(max_pitch, 1)
}
# Gender classification with refined thresholds
# Research-based ranges:
# Male: 85-180 Hz (average ~120 Hz)
# Female: 165-255 Hz (average ~210 Hz)
if mean_pitch < 150:
gender = "MALE"
# Confidence based on how far below 150
confidence = min(1.0, (150 - mean_pitch) / 40)
elif mean_pitch > 180:
gender = "FEMALE"
# Confidence based on how far above 180
confidence = min(1.0, (mean_pitch - 180) / 40)
else:
# Ambiguous range (150-180 Hz)
if mean_pitch < 165:
gender = "MALE"
confidence = 0.5
else:
gender = "FEMALE"
confidence = 0.5
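        # Example: mean_pitch = 130 Hz -> MALE with confidence min(1.0, (150-130)/40) = 0.5;
        # mean_pitch = 210 Hz -> FEMALE with confidence min(1.0, (210-180)/40) = 0.75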
return gender, confidence, pitch_stats
def _multi_model_gender_detection(self, audio, sr):
"""Run multiple AI models and aggregate predictions"""
predictions = []
confidences = []
for model_name, model_dict in self.gender_models.items():
try:
processor = model_dict['processor']
model = model_dict['model']
# Prepare inputs
inputs = processor(
audio,
sampling_rate=sr,
return_tensors="pt",
padding=True
).to(self.device)
# Predict
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()
# Extract gender prediction
labels = model.config.id2label
# Find male/female labels (different models use different names)
male_score = 0
female_score = 0
for idx, label in labels.items():
label_lower = label.lower()
if 'male' in label_lower and 'female' not in label_lower:
male_score = max(male_score, probs[idx])
elif 'female' in label_lower:
female_score = max(female_score, probs[idx])
                # Skip models whose label set contains no gender classes at all
                # (e.g. a keyword-spotting model); otherwise they would default to FEMALE
                if male_score == 0 and female_score == 0:
                    continue
                if male_score > female_score:
                    predictions.append("MALE")
                    confidences.append(male_score)
                else:
                    predictions.append("FEMALE")
                    confidences.append(female_score)
except Exception as e:
print(f" Model {model_name} error: {e}")
continue
if not predictions:
return "UNKNOWN", 0.0, {}
# Aggregate predictions
pred_counter = Counter(predictions)
majority_vote = pred_counter.most_common(1)[0][0]
# Calculate confidence
majority_indices = [i for i, p in enumerate(predictions) if p == majority_vote]
avg_confidence = np.mean([confidences[i] for i in majority_indices])
# Individual predictions
individual = {
f"model_{i+1}": f"{pred} ({conf:.2f})"
for i, (pred, conf) in enumerate(zip(predictions, confidences))
}
return majority_vote, float(avg_confidence), individual
def _extract_gender_from_conversation(self, transcript):
"""Extract gender clues from conversation"""
context_map = {}
# Extended keyword lists
male_keywords = [
"sir", "mr.", "mister", "mr ", "gentleman", "he", "him", "his",
"man", "guy", "male", "father", "dad", "son", "brother", "husband"
]
female_keywords = [
"ma'am", "miss", "mrs", "mrs.", "madam", "madame", "ms", "ms.",
"she", "her", "hers", "woman", "lady", "female", "mother", "mom",
"daughter", "sister", "wife"
]
for line in transcript:
if line['role'] == "AGENT":
txt = line['text'].lower()
# Find who agent is talking to
customers = [x['speaker'] for x in transcript if x['role'] == "CUSTOMER"]
if not customers:
continue
target = customers[0]
                # Check for keywords on word boundaries, so that e.g. "the" or "she"
                # does not spuriously match the male keyword "he"
                tokens = {w.strip(".,!?'\"") for w in txt.split()}
                if any(k.strip(". ") in tokens for k in male_keywords):
                    context_map[target] = "MALE"
                elif any(k.strip(". ") in tokens for k in female_keywords):
                    context_map[target] = "FEMALE"
return context_map
def _analyze_formants(self, audio, sr):
"""Analyze formant frequencies (F1, F2) for gender detection"""
try:
# Use Praat for formant analysis
import parselmouth
from parselmouth.praat import call
snd = parselmouth.Sound(audio, sampling_frequency=sr)
formant = snd.to_formant_burg()
# Extract F1 and F2 for voiced segments
f1_values = []
f2_values = []
duration = snd.get_total_duration()
time_step = 0.01 # 10ms steps
for t in np.arange(0, duration, time_step):
f1 = formant.get_value_at_time(1, t)
f2 = formant.get_value_at_time(2, t)
if not np.isnan(f1) and not np.isnan(f2):
f1_values.append(f1)
f2_values.append(f2)
if len(f1_values) < 10:
return "UNKNOWN", 0.0
avg_f1 = np.mean(f1_values)
avg_f2 = np.mean(f2_values)
            # Gender heuristic on average formant values (F1/F2). These cutoffs are
            # tuned for this pipeline rather than textbook vowel formant averages;
            # F2 is treated as the more reliable separator, hence the fallback below.
if avg_f1 < 170 and avg_f2 < 1650:
gender = "MALE"
confidence = 0.7
elif avg_f1 > 190 and avg_f2 > 1750:
gender = "FEMALE"
confidence = 0.7
else:
# Use F2 as primary indicator
if avg_f2 < 1600:
gender = "MALE"
else:
gender = "FEMALE"
confidence = 0.5
return gender, confidence
except ImportError:
return "UNKNOWN", 0.0
except Exception as e:
return "UNKNOWN", 0.0
def _detect_age_robust(self, audio, sr, pitch_stats):
"""Robust age detection"""
try:
if 'audeering' not in self.gender_models:
return "26-35" # Default
processor = self.gender_models['audeering']['processor']
model = self.gender_models['audeering']['model']
inputs = processor(audio, sampling_rate=sr, return_tensors="pt").to(self.device)
with torch.no_grad():
logits = model(**inputs).logits
probs = torch.softmax(logits, dim=-1)[0].cpu().numpy()
# Map labels to age buckets (aggregating across genders)
# Labels usually look like: 'female_20-29', 'male_20-29', etc.
labels = model.config.id2label
age_scores = defaultdict(float)
for i, score in enumerate(probs):
label = labels[i]
# Extract age part (assuming format gender_age)
parts = label.split('_')
if len(parts) > 1:
age_group = parts[-1] # e.g., "20-29"
age_scores[age_group] += score
# Get best age bracket
if age_scores:
best_age = max(age_scores, key=age_scores.get)
return best_age
return "UNKNOWN"
except Exception as e:
print(f" ⚠ Age detection failed: {e}")
return "UNKNOWN"
def _run_enhanced_diarization(self, wav, sr, file_path):
"""
Run Pyannote diarization or fallback to simple segmentation
"""
if self.diarization_pipeline is None:
print(" ⚠ No auth token provided, using energy-based fallback segmentation")
return self._energy_based_segmentation(wav, sr)
try:
# Run pipeline
diarization = self.diarization_pipeline(file_path, min_speakers=2, max_speakers=2)
segments = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
segments.append({
"start": turn.start,
"end": turn.end,
"speaker": speaker
})
return segments
except Exception as e:
print(f" ⚠ Diarization error: {e}, using fallback")
return self._energy_based_segmentation(wav, sr)
def _energy_based_segmentation(self, wav, sr):
"""Fallback if deep learning diarization fails"""
# Simple energy detection to split speech from silence
# Treating as single speaker (SPEAKER_00)
intervals = librosa.effects.split(wav, top_db=30)
segments = []
for start, end in intervals:
segments.append({
"start": start / sr,
"end": end / sr,
"speaker": "SPEAKER_00"
})
return segments
def _merge_segments_smart(self, segments, min_gap=0.5):
"""Merge segments from same speaker that are close together"""
if not segments:
return []
merged = []
current = segments[0]
for next_seg in segments[1:]:
# If same speaker and gap is small
if (next_seg['speaker'] == current['speaker'] and
(next_seg['start'] - current['end']) < min_gap):
# Extend current segment
current['end'] = next_seg['end']
else:
merged.append(current)
current = next_seg
merged.append(current)
return merged
def _is_silence(self, chunk, threshold=0.005):
"""Check if audio chunk is essentially silence"""
return np.max(np.abs(chunk)) < threshold
def _detect_emotion(self, chunk):
"""Detect emotion from audio chunk"""
try:
# Ensure chunk is long enough for model
if len(chunk) < 16000 * 0.5:
return "neutral"
# Use the pipeline loaded in init
# Note: Pipeline expects file path or numpy array
preds = self.emotion_classifier(chunk, top_k=1)
return preds[0]['label']
except:
return "neutral"
def _calculate_tone_advanced(self, chunk, sr, text):
"""
Calculate pitch, jitter, and shimmer using Parselmouth (Praat)
"""
try:
if len(chunk) < sr * 0.1:
return {"pitch_hz": 0, "jitter": 0, "shimmer": 0}
snd = parselmouth.Sound(chunk, sampling_frequency=sr)
# Pitch
pitch = snd.to_pitch()
pitch_val = pitch.selected_array['frequency']
pitch_val = pitch_val[pitch_val != 0]
avg_pitch = np.mean(pitch_val) if len(pitch_val) > 0 else 0
# Pulses for Jitter/Shimmer
point_process = call(snd, "To PointProcess (periodic, cc)", 75, 500)
try:
jitter = call(point_process, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
except:
jitter = 0
try:
shimmer = call([snd, point_process], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
except:
shimmer = 0
return {
"pitch_hz": round(float(avg_pitch), 1),
"jitter": round(float(jitter * 100), 2), # percentage
"shimmer": round(float(shimmer * 100), 2) # db
}
except:
return {"pitch_hz": 0, "jitter": 0, "shimmer": 0}
def _assign_roles_smart(self, results):
"""
Assign AGENT vs CUSTOMER roles using Golden Phrases and Verbosity.
"""
speakers = list(set(r['speaker'] for r in results))
if len(speakers) == 1:
# If only one speaker found, assume it's the Agent monologuing
for r in results: r['role'] = "AGENT"
return results
speaker_scores = defaultdict(int)
word_counts = defaultdict(int)
# 1. GOLDEN PHRASES (Almost 100% guarantee of Agent)
# These override normal scoring
golden_agent_phrases = [
"my name is", "this is steve", "this is sam", "this is mike", # Common names
"calling from", "on a recorded line", "green solutions",
"energy solutions", "federal government", "rebate program"
]
# 2. STANDARD SCORING KEYWORDS
agent_keywords = [
"manager", "supervisor", "qualified", "eligible",
"whatsapp", "ping you", "verification", "consumption"
]
customer_keywords = [
"who is this", "stop calling", "not interested",
"take me off", "do not call", "why are you asking"
]
agent_found_via_golden = None
for res in results:
text = res['text'].lower()
spk = res['speaker']
# Count words for verbosity check
words = text.split()
word_counts[spk] += len(words)
# Check Golden Phrases (Instant Win)
if agent_found_via_golden is None:
for phrase in golden_agent_phrases:
if phrase in text:
print(f" β˜… Golden Phrase found for {spk}: '{phrase}'")
agent_found_via_golden = spk
break
# Standard Scoring
if any(k in text for k in agent_keywords):
speaker_scores[spk] += 2
if any(k in text for k in customer_keywords):
speaker_scores[spk] -= 3 # Strong negative for objections
# 3. DECISION LOGIC
final_agent = None
if agent_found_via_golden:
# If we found a golden phrase, trust it implicitly
final_agent = agent_found_via_golden
else:
# Fallback: Verbosity Check (Agent usually talks more)
# Get speaker with max words
talkative_spk = max(word_counts, key=word_counts.get)
total_words = sum(word_counts.values())
# If one speaker dominates >60% of conversation, likely the agent
if word_counts[talkative_spk] / max(1, total_words) > 0.60:
speaker_scores[talkative_spk] += 5
            # Validate scores; if nothing scored at all, default to the most talkative speaker
            if speaker_scores:
                final_agent = max(speaker_scores, key=speaker_scores.get)
            else:
                final_agent = talkative_spk
# 4. ASSIGN ROLES
print(f" βœ“ Role Assignment: Identified {final_agent} as AGENT")
identification = {}
for res in results:
if res['speaker'] == final_agent:
res['role'] = "AGENT"
else:
res['role'] = "CUSTOMER"
identification[res['speaker']] = res['role']
return results
def _analyze_customer_journey(self, results):
"""Analyze sentiment flow of the customer"""
cust_segments = [r for r in results if r['role'] == "CUSTOMER"]
if not cust_segments:
return {"emotional_arc": "No customer audio", "impact_score": 0}
# Map emotions to scores
emo_map = {
"happy": 1.0, "joy": 1.0, "neutral": 0.1,
"sad": -0.5, "angry": -1.0, "frustrated": -1.0
}
start_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[:3]) / min(3, len(cust_segments))
end_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[-3:]) / min(3, len(cust_segments))
impact = end_score - start_score
if impact > 0.2: arc = "Positive Resolution"
elif impact < -0.2: arc = "Negative Escalation"
else: arc = "Neutral/Unresolved"
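        # Example: first three emotions [angry, angry, neutral] -> start = -0.63,
        # last three [neutral, happy, happy] -> end = 0.70, impact = 1.33 -> "Positive Resolution"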
return {
"emotional_arc": arc,
"start_sentiment": round(start_score, 2),
"end_sentiment": round(end_score, 2),
"impact_score": round(impact, 2)
}
def _analyze_agent_kpi(self, results, customer_impact):
"""Calculate Agent performance metrics"""
agent_segments = [r for r in results if r['role'] == "AGENT"]
if not agent_segments:
return {"overall_score": 0}
# 1. Politeness (Keyword based)
polite_words = ["please", "thank", "sorry", "apologize", "appreciate"]
total_words = sum(len(s['text'].split()) for s in agent_segments)
polite_count = sum(1 for s in agent_segments if any(w in s['text'].lower() for w in polite_words))
politeness_score = min(100, (polite_count / max(1, len(agent_segments))) * 200)
# 2. Tone Consistency (Jitter/Shimmer variance)
jitter_vals = [s['tone']['jitter'] for s in agent_segments]
tone_stability = 100 - min(100, np.std(jitter_vals) * 10) if jitter_vals else 50
# 3. Resolution Impact (from customer journey)
# Map -1.0 to 1.0 range -> 0 to 100
resolution_score = 50 + (customer_impact * 50)
resolution_score = max(0, min(100, resolution_score))
# Overall Weighted Score
overall = (
(politeness_score * 0.3) +
(tone_stability * 0.2) +
(resolution_score * 0.5)
)
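        # Example: politeness 80, stability 70, resolution 60 -> 0.3*80 + 0.2*70 + 0.5*60 = 68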
return {
"overall_score": int(overall),
"politeness": int(politeness_score),
"tone_stability": int(tone_stability),
"resolution_effectiveness": int(resolution_score)
}
def _flush_memory(self):
"""Aggressive memory cleanup"""
gc.collect()
if self.device == "cuda":
torch.cuda.empty_cache()
def _map_emotion_to_sentiment(self, emotion):
"""Map emotion labels to sentiment with polarity score"""
emotion_lower = emotion.lower()
positive_emotions = {
'happy': 0.8, 'joy': 0.9, 'excited': 0.85,
'pleased': 0.7, 'satisfied': 0.75, 'content': 0.6
}
negative_emotions = {
'sad': -0.6, 'angry': -0.9, 'frustrated': -0.8,
'annoyed': -0.7, 'disappointed': -0.65, 'upset': -0.75
}
if emotion_lower in positive_emotions:
return {
"sentiment": "positive",
"polarity_score": positive_emotions[emotion_lower],
"confidence": "high"
}
if emotion_lower in negative_emotions:
return {
"sentiment": "negative",
"polarity_score": negative_emotions[emotion_lower],
"confidence": "high"
}
return {
"sentiment": "neutral",
"polarity_score": 0.0,
"confidence": "medium"
}
def _calculate_speech_rate(self, text, duration_seconds):
"""Calculate words per minute (WPM) and classify pace"""
if duration_seconds < 0.1:
return {"wpm": 0, "word_count": 0, "speech_pace": "unknown"}
words = text.split()
word_count = len(words)
wpm = (word_count / (duration_seconds / 60.0)) if duration_seconds > 0 else 0
if wpm < 100: pace = "slow"
elif wpm < 140: pace = "normal"
elif wpm < 180: pace = "fast"
else: pace = "very_fast"
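        # Example: 25 words in 12 s -> 25 / (12 / 60) = 125 WPM -> "normal"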
return {
"wpm": round(wpm, 1),
"word_count": word_count,
"speech_pace": pace
}
def _extract_keywords(self, text, top_n=5):
"""Extract keywords/keyphrases using KeyBERT"""
if self.keyword_model is None or len(text.split()) < 3:
return []
try:
keywords = self.keyword_model.extract_keywords(
text,
keyphrase_ngram_range=(1, 2),
stop_words='english',
top_n=top_n,
use_maxsum=True,
nr_candidates=20
)
return [
{"keyword": kw[0], "relevance": round(float(kw[1]), 3)}
for kw in keywords
]
except:
return []
def _classify_topic(self, text):
"""Classify text into call center topics"""
if self.topic_classifier is None or len(text.split()) < 5:
return {"topic": "unknown", "confidence": 0.0}
try:
result = self.topic_classifier(text, self.topic_labels, multi_label=False)
return {
"topic": result['labels'][0],
"confidence": round(float(result['scores'][0]), 3),
"top_3_topics": [
{"topic": label, "score": round(float(score), 3)}
for label, score in zip(result['labels'][:3], result['scores'][:3])
]
}
except:
return {"topic": "unknown", "confidence": 0.0}
def _aggregate_call_insights(self, results):
"""Aggregate keywords and topics at call level"""
if not results:
return {"top_keywords": [], "primary_topic": {"topic": "unknown"}}
all_keywords = {}
for seg in results:
if 'keywords' in seg:
for kw in seg['keywords']:
keyword = kw['keyword']
score = kw['relevance']
all_keywords[keyword] = max(all_keywords.get(keyword, 0), score)
top_keywords = [
{"keyword": k, "relevance": round(v, 3)}
for k, v in sorted(all_keywords.items(), key=lambda x: x[1], reverse=True)[:10]
]
# Aggregate topics
topic_votes = defaultdict(float)
for seg in results:
if 'topic' in seg and seg['topic']['confidence'] > 0.5:
topic_votes[seg['topic']['topic']] += seg['topic']['confidence']
primary_topic = {
"topic": max(topic_votes, key=topic_votes.get) if topic_votes else "unknown",
"confidence": round(topic_votes[max(topic_votes, key=topic_votes.get)] / len(results), 3) if topic_votes else 0.0
}
# Calculate stats
total_words = sum(seg.get('speech_rate', {}).get('word_count', 0) for seg in results)
wpm_values = [seg.get('speech_rate', {}).get('wpm', 0) for seg in results if seg.get('speech_rate', {}).get('wpm', 0) > 0]
average_wpm = round(np.mean(wpm_values), 1) if wpm_values else 0
return {
"top_keywords": top_keywords,
"primary_topic": primary_topic,
"total_words": total_words,
"average_wpm": average_wpm
}
if __name__ == "__main__":
# Example usage
print("Initialize with: analyzer = UltraRobustCallAnalytics(hf_token='YOUR_TOKEN')")
print("Process with: result = analyzer.process_call('path/to/audio.wav')")