|
|
""" |
|
|
ULTRA-ROBUST CALL CENTER ANALYTICS |
|
|
=================================== |
|
|
β
Multiple gender detection models with voting |
|
|
β
Best STT model (Whisper Large-v3 + optimizations) |
|
|
β
Enhanced for European accents |
|
|
β
Robust pitch analysis with multiple methods |
|
|
β
Production-grade accuracy |
|
|
|
|
|
MODELS USED: |
|
|
- STT: Whisper Large-v3 (best for accents) |
|
|
- Gender: 3 models + voting system |
|
|
- Age: Wav2Vec2 Large + validation |
|
|
- Diarization: pyannote 3.1 (SOTA) |
|
|
""" |
|
|
|
|
|
from keybert import KeyBERT |
|
|
from sentence_transformers import SentenceTransformer |
|
|
import os |
|
|
import sys |
|
|
import logging |
|
|
import torch |
|
|
import librosa |
|
|
import whisper |
|
|
import numpy as np |
|
|
import warnings |
|
|
import json |
|
|
import gc |
|
|
from collections import Counter, defaultdict |
|
|
from pyannote.audio import Pipeline |
|
|
from transformers import ( |
|
|
pipeline, |
|
|
Wav2Vec2Processor, |
|
|
Wav2Vec2ForSequenceClassification, |
|
|
AutoModelForAudioClassification, |
|
|
AutoFeatureExtractor |
|
|
) |
|
|
from datetime import datetime |
|
|
from scipy import signal as scipy_signal |
|
|
from scipy.stats import mode as scipy_mode |
|
|
import parselmouth |
|
|
from parselmouth.praat import call |
|
|
|
|
|
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" |
|
|
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True" |
|
|
logging.getLogger("pyannote").setLevel(logging.ERROR) |
|
|
logging.getLogger("transformers").setLevel(logging.ERROR) |
|
|
warnings.filterwarnings("ignore") |
|
|
|
|
|
class NumpyEncoder(json.JSONEncoder): |
|
|
def default(self, obj): |
|
|
if isinstance(obj, np.integer): return int(obj) |
|
|
if isinstance(obj, np.floating): return float(obj) |
|
|
if isinstance(obj, np.ndarray): return obj.tolist() |
|
|
return super(NumpyEncoder, self).default(obj) |
|
|
|
|
|
|
|
|
class UltraRobustCallAnalytics: |
|
|
def __init__(self, hf_token=None, device=None): |
|
|
|
|
|
self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu") |
|
|
print(f"π Initializing ULTRA-ROBUST Analytics Engine on {self.device}...") |
|
|
print("="*70) |
|
|
|
|
|
|
|
|
self._flush_memory() |
|
|
|
|
|
|
|
|
try: |
|
|
print(" β Loading Whisper Large-v3 (BEST for accents)...") |
|
|
self.stt_model = whisper.load_model("large-v3", device=self.device) |
|
|
self.stt_model_name = "large-v3" |
|
|
print(" β Whisper Large-v3 loaded") |
|
|
except: |
|
|
print(" β Falling back to Large-v2...") |
|
|
try: |
|
|
self.stt_model = whisper.load_model("large-v2", device=self.device) |
|
|
self.stt_model_name = "large-v2" |
|
|
print(" β Whisper Large-v2 loaded") |
|
|
except: |
|
|
print(" β Final fallback to Medium...") |
|
|
self.stt_model = whisper.load_model("medium", device=self.device) |
|
|
self.stt_model_name = "medium" |
|
|
print(" β Whisper Medium loaded") |
|
|
|
|
|
|
|
|
self.diarization_pipeline = None |
|
|
if hf_token: |
|
|
print(f" β Attempting to load Pyannote with token starting: {hf_token[:4]}...") |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
self.diarization_pipeline = Pipeline.from_pretrained( |
|
|
"pyannote/speaker-diarization-3.1", |
|
|
token=hf_token |
|
|
).to(torch.device(self.device)) |
|
|
print(" β Diarization loaded (New Syntax)") |
|
|
except TypeError: |
|
|
|
|
|
print(" β New syntax failed, trying legacy syntax...") |
|
|
try: |
|
|
self.diarization_pipeline = Pipeline.from_pretrained( |
|
|
"pyannote/speaker-diarization-3.1", |
|
|
use_auth_token=hf_token |
|
|
).to(torch.device(self.device)) |
|
|
print(" β Diarization loaded (Legacy Syntax)") |
|
|
except Exception as e: |
|
|
print(f" β CRITICAL PYANNOTE ERROR (Legacy): {e}") |
|
|
except Exception as e: |
|
|
print(f" β CRITICAL PYANNOTE ERROR: {e}") |
|
|
|
|
|
|
|
|
print(" β Loading emotion classifier...") |
|
|
self.emotion_classifier = pipeline( |
|
|
"audio-classification", |
|
|
model="superb/wav2vec2-base-superb-er", |
|
|
device=0 if self.device == "cuda" else -1 |
|
|
) |
|
|
print(" β Emotion classifier loaded") |
|
|
|
|
|
|
|
|
print("\n β Loading MULTIPLE gender detection models...") |
|
|
self.gender_models = {} |
|
|
|
|
|
|
|
|
try: |
|
|
print(" Loading Gender Model 1: audeering/wav2vec2-large...") |
|
|
self.ag_model_name = "audeering/wav2vec2-large-robust-24-ft-age-gender" |
|
|
self.ag_processor = Wav2Vec2Processor.from_pretrained(self.ag_model_name) |
|
|
self.ag_model = Wav2Vec2ForSequenceClassification.from_pretrained(self.ag_model_name) |
|
|
self.ag_model.to(self.device).eval() |
|
|
self.gender_models['audeering'] = { |
|
|
'processor': self.ag_processor, |
|
|
'model': self.ag_model |
|
|
} |
|
|
print(" β Model 1 loaded") |
|
|
except Exception as e: |
|
|
print(f" β Model 1 failed: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
print(" Loading Gender Model 2: alefiury/wav2vec2-large-xlsr-53-gender...") |
|
|
model2_name = "alefiury/wav2vec2-large-xlsr-53-gender-recognition-librispeech" |
|
|
processor2 = AutoFeatureExtractor.from_pretrained(model2_name) |
|
|
model2 = AutoModelForAudioClassification.from_pretrained(model2_name) |
|
|
model2.to(self.device).eval() |
|
|
self.gender_models['alefiury'] = { |
|
|
'processor': processor2, |
|
|
'model': model2 |
|
|
} |
|
|
print(" β Model 2 loaded") |
|
|
except Exception as e: |
|
|
print(f" β Model 2 failed: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
print(" Loading Gender Model 3: MIT/ast-finetuned-speech-commands...") |
|
|
model3_name = "MIT/ast-finetuned-speech-commands-v2" |
|
|
processor3 = AutoFeatureExtractor.from_pretrained(model3_name) |
|
|
model3 = AutoModelForAudioClassification.from_pretrained(model3_name) |
|
|
model3.to(self.device).eval() |
|
|
self.gender_models['mit'] = { |
|
|
'processor': processor3, |
|
|
'model': model3 |
|
|
} |
|
|
print(" β Model 3 loaded") |
|
|
except Exception as e: |
|
|
print(f" β Model 3 failed: {e}") |
|
|
|
|
|
print(f" β Loaded {len(self.gender_models)} gender detection models") |
|
|
|
|
|
print("\n" + "="*70) |
|
|
print("β
Engine initialized successfully") |
|
|
print("="*70 + "\n") |
|
|
|
|
|
print(" β Loading KeyBERT for keyword extraction...") |
|
|
try: |
|
|
self.keyword_model = KeyBERT('all-MiniLM-L6-v2') |
|
|
print(" β Keyword extractor loaded") |
|
|
except Exception as e: |
|
|
print(f" β Keyword model failed: {e}") |
|
|
self.keyword_model = None |
|
|
|
|
|
print(" β Loading zero-shot topic classifier...") |
|
|
try: |
|
|
self.topic_classifier = pipeline( |
|
|
"zero-shot-classification", |
|
|
model="facebook/bart-large-mnli", |
|
|
device=0 if self.device == "cuda" else -1 |
|
|
) |
|
|
self.topic_labels = [ |
|
|
"billing_payment", |
|
|
"technical_support", |
|
|
"product_inquiry", |
|
|
"complaint_issue", |
|
|
"account_management", |
|
|
"sales_marketing", |
|
|
"service_cancellation", |
|
|
"feedback_survey", |
|
|
"appointment_scheduling", |
|
|
"general_inquiry" |
|
|
] |
|
|
print(" β Topic classifier loaded") |
|
|
except Exception as e: |
|
|
print(f" β Topic classifier failed: {e}") |
|
|
self.topic_classifier = None |
|
|
|
|
|
def process_call(self, audio_path): |
|
|
"""Main processing with maximum robustness""" |
|
|
if not os.path.exists(audio_path): |
|
|
raise FileNotFoundError(f"Audio file not found: {audio_path}") |
|
|
|
|
|
self._flush_memory() |
|
|
|
|
|
print(f"π Processing: {audio_path}") |
|
|
print("="*70) |
|
|
|
|
|
|
|
|
wav, sr = librosa.load(audio_path, sr=16000, mono=True) |
|
|
wav = wav.astype(np.float32) |
|
|
|
|
|
|
|
|
wav = self._enhance_audio_for_callcenter(wav, sr) |
|
|
|
|
|
duration = len(wav) / sr |
|
|
print(f" β Audio loaded: {duration:.1f}s @ {sr}Hz") |
|
|
|
|
|
|
|
|
print("\n β Running enhanced diarization...") |
|
|
segments = self._run_enhanced_diarization(wav, sr, audio_path) |
|
|
print(f" β Found {len(set(s['speaker'] for s in segments))} speakers, {len(segments)} segments") |
|
|
|
|
|
|
|
|
merged = self._merge_segments_smart(segments, min_gap=0.25) |
|
|
print(f" β Merged to {len(merged)} segments") |
|
|
|
|
|
|
|
|
results = [] |
|
|
spk_audio_buffer = defaultdict(list) |
|
|
pad = int(0.1 * sr) |
|
|
|
|
|
print("\n β Transcribing with Whisper Large-v3...") |
|
|
for i, seg in enumerate(merged): |
|
|
seg_duration = seg['end'] - seg['start'] |
|
|
if seg_duration < 0.1: |
|
|
continue |
|
|
|
|
|
start_idx = max(0, int(seg['start'] * sr) - pad) |
|
|
end_idx = min(len(wav), int(seg['end'] * sr) + pad) |
|
|
chunk = wav[start_idx:end_idx] |
|
|
|
|
|
if self._is_silence(chunk): |
|
|
continue |
|
|
|
|
|
|
|
|
if seg_duration > 0.4: |
|
|
spk_audio_buffer[seg['speaker']].append(chunk) |
|
|
|
|
|
|
|
|
text = self._transcribe_chunk_robust(chunk, sr) |
|
|
if not text: |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
emotion = self._detect_emotion(chunk) |
|
|
sentiment = self._map_emotion_to_sentiment(emotion) |
|
|
speech_rate = self._calculate_speech_rate(text, seg_duration) |
|
|
keywords = self._extract_keywords(text, top_n=5) |
|
|
topic = self._classify_topic(text) |
|
|
results.append({ |
|
|
"segment_id": i + 1, |
|
|
"start": float(f"{seg['start']:.2f}"), |
|
|
"end": float(f"{seg['end']:.2f}"), |
|
|
"duration": float(f"{seg_duration:.2f}"), |
|
|
"speaker": seg['speaker'], |
|
|
"role": "UNKNOWN", |
|
|
"text": text, |
|
|
"emotion": emotion, |
|
|
"sentiment": sentiment, |
|
|
"speech_rate": speech_rate, |
|
|
"keywords": keywords, |
|
|
"topic": topic, |
|
|
"tone": self._calculate_tone_advanced(chunk, sr, text) |
|
|
}) |
|
|
|
|
|
if (i + 1) % 10 == 0: |
|
|
print(f" Processed {i + 1}/{len(merged)} segments...") |
|
|
|
|
|
print(f" β Transcribed {len(results)} segments with text") |
|
|
|
|
|
|
|
|
print("\n β Assigning speaker roles...") |
|
|
results = self._assign_roles_smart(results) |
|
|
|
|
|
identification = {} |
|
|
for r in results: |
|
|
identification[r['speaker']] = r['role'] |
|
|
print(f" β Roles: {identification}") |
|
|
|
|
|
|
|
|
print("\n β Analyzing biometrics with multi-model voting...") |
|
|
biometrics = self._analyze_biometrics_ultra_robust(spk_audio_buffer, results, wav, sr) |
|
|
for spk, bio in biometrics.items(): |
|
|
print(f" {spk}: {bio['gender']} (confidence: {bio['gender_confidence']:.2f}), {bio['age_bracket']}") |
|
|
|
|
|
|
|
|
print("\n β Analyzing customer journey...") |
|
|
cust_metrics = self._analyze_customer_journey(results) |
|
|
print(f" β Journey: {cust_metrics['emotional_arc']}") |
|
|
|
|
|
|
|
|
print("\n β Analyzing agent performance...") |
|
|
agent_metrics = self._analyze_agent_kpi(results, cust_metrics['impact_score']) |
|
|
print(f" β Agent score: {agent_metrics.get('overall_score', 'N/A')}/100") |
|
|
|
|
|
|
|
|
call_summary = self._aggregate_call_insights(results) |
|
|
final_output = { |
|
|
"metadata": { |
|
|
"file": os.path.basename(audio_path), |
|
|
"duration_seconds": float(f"{duration:.2f}"), |
|
|
"sample_rate": sr, |
|
|
"total_segments": len(results), |
|
|
"stt_model": self.stt_model_name, |
|
|
"gender_models_used": len(self.gender_models), |
|
|
"speakers": biometrics, |
|
|
"call_summary": call_summary |
|
|
}, |
|
|
"identification": identification, |
|
|
"agent_metrics": agent_metrics, |
|
|
"customer_metrics": cust_metrics, |
|
|
"transcript": results |
|
|
} |
|
|
|
|
|
self._flush_memory() |
|
|
print("\n" + "="*70) |
|
|
print("β
Processing complete") |
|
|
print("="*70 + "\n") |
|
|
|
|
|
return final_output |
|
|
|
|
|
def _enhance_audio_for_callcenter(self, wav, sr): |
|
|
"""Enhance audio quality for better transcription""" |
|
|
|
|
|
wav = wav / (np.max(np.abs(wav)) + 1e-7) |
|
|
|
|
|
|
|
|
try: |
|
|
sos = scipy_signal.butter(4, 80, 'hp', fs=sr, output='sos') |
|
|
wav = scipy_signal.sosfilt(sos, wav) |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
wav = np.sign(wav) * np.log1p(np.abs(wav) * 10) / np.log1p(10) |
|
|
|
|
|
return wav.astype(np.float32) |
|
|
|
|
|
def _transcribe_chunk_robust(self, chunk, sr): |
|
|
""" |
|
|
ULTRA-ROBUST TRANSCRIPTION |
|
|
Optimized for: |
|
|
- European accents |
|
|
- Call center quality |
|
|
- Background noise |
|
|
""" |
|
|
|
|
|
if len(chunk) < sr * 0.3: |
|
|
pad = np.zeros(int(sr * 0.5), dtype=np.float32) |
|
|
chunk = np.concatenate([pad, chunk, pad]) |
|
|
|
|
|
try: |
|
|
|
|
|
result = self.stt_model.transcribe( |
|
|
chunk.astype(np.float32), |
|
|
language="en", |
|
|
task="transcribe", |
|
|
|
|
|
|
|
|
beam_size=5, |
|
|
best_of=5, |
|
|
temperature=0.0, |
|
|
|
|
|
|
|
|
condition_on_previous_text=True, |
|
|
|
|
|
|
|
|
compression_ratio_threshold=2.4, |
|
|
logprob_threshold=-1.0, |
|
|
no_speech_threshold=0.6, |
|
|
|
|
|
|
|
|
fp16=(self.device == "cuda"), |
|
|
|
|
|
|
|
|
word_timestamps=True |
|
|
) |
|
|
|
|
|
text = result['text'].strip() |
|
|
|
|
|
|
|
|
if len(text) < 2: |
|
|
return None |
|
|
|
|
|
|
|
|
garbage = ["you", "thank you", ".", "...", "bye", "okay"] |
|
|
if text.lower() in garbage: |
|
|
return None |
|
|
|
|
|
|
|
|
if not any(c in text.lower() for c in 'aeiou'): |
|
|
return None |
|
|
|
|
|
|
|
|
if 'words' in result and result['words']: |
|
|
avg_prob = np.mean([w.get('probability', 1.0) for w in result['words']]) |
|
|
if avg_prob < 0.3: |
|
|
return None |
|
|
|
|
|
return text |
|
|
|
|
|
except Exception as e: |
|
|
print(f" β Transcription error: {e}") |
|
|
return None |
|
|
|
|
|
def _analyze_biometrics_ultra_robust(self, audio_buffer, transcript, full_wav, sr): |
|
|
""" |
|
|
ULTRA-ROBUST GENDER DETECTION |
|
|
Uses multiple models + voting + pitch + conversation context |
|
|
""" |
|
|
profiles = {} |
|
|
|
|
|
|
|
|
context_gender = self._extract_gender_from_conversation(transcript) |
|
|
|
|
|
for spk, chunks in audio_buffer.items(): |
|
|
if not chunks: |
|
|
continue |
|
|
|
|
|
print(f"\n Analyzing {spk}...") |
|
|
|
|
|
|
|
|
raw_audio = self._prepare_audio_for_analysis(chunks, sr) |
|
|
|
|
|
|
|
|
pitch_gender, pitch_confidence, pitch_stats = self._analyze_pitch_robust(raw_audio, sr, full_wav, transcript, spk) |
|
|
print(f" Pitch analysis: {pitch_gender} (conf: {pitch_confidence:.2f})") |
|
|
|
|
|
|
|
|
ai_gender, ai_confidence, all_predictions = self._multi_model_gender_detection(raw_audio, sr) |
|
|
print(f" AI models: {ai_gender} (conf: {ai_confidence:.2f})") |
|
|
print(f" Individual: {all_predictions}") |
|
|
|
|
|
|
|
|
context_gend = context_gender.get(spk, "UNKNOWN") |
|
|
print(f" Context clues: {context_gend}") |
|
|
|
|
|
|
|
|
formant_gender, formant_confidence = self._analyze_formants(raw_audio, sr) |
|
|
print(f" Formant analysis: {formant_gender} (conf: {formant_confidence:.2f})") |
|
|
|
|
|
|
|
|
votes = [] |
|
|
|
|
|
|
|
|
if context_gend != "UNKNOWN": |
|
|
votes.extend([context_gend] * 4) |
|
|
|
|
|
|
|
|
if pitch_confidence > 0.6: |
|
|
votes.extend([pitch_gender] * 3) |
|
|
elif pitch_confidence > 0.4: |
|
|
votes.append(pitch_gender) |
|
|
|
|
|
|
|
|
if ai_confidence > 0.7: |
|
|
votes.extend([ai_gender] * 2) |
|
|
elif ai_confidence > 0.5: |
|
|
votes.append(ai_gender) |
|
|
|
|
|
|
|
|
if formant_confidence > 0.6: |
|
|
votes.extend([formant_gender] * 2) |
|
|
elif formant_confidence > 0.4: |
|
|
votes.append(formant_gender) |
|
|
|
|
|
|
|
|
if votes: |
|
|
vote_counts = Counter(votes) |
|
|
final_gender = vote_counts.most_common(1)[0][0] |
|
|
total_votes = len(votes) |
|
|
winning_votes = vote_counts[final_gender] |
|
|
final_confidence = winning_votes / total_votes |
|
|
else: |
|
|
|
|
|
final_gender = ai_gender if ai_confidence > 0.5 else "UNKNOWN" |
|
|
final_confidence = ai_confidence |
|
|
|
|
|
print(f" FINAL: {final_gender} (confidence: {final_confidence:.2f})") |
|
|
print(f" Vote breakdown: {dict(Counter(votes))}") |
|
|
|
|
|
|
|
|
age_bracket = self._detect_age_robust(raw_audio, sr, pitch_stats) |
|
|
|
|
|
|
|
|
role = [r['role'] for r in transcript if r['speaker'] == spk] |
|
|
role = role[0] if role else "UNKNOWN" |
|
|
|
|
|
profiles[spk] = { |
|
|
"gender": final_gender, |
|
|
"gender_confidence": round(final_confidence, 2), |
|
|
"gender_methods": { |
|
|
"context": context_gend, |
|
|
"pitch": f"{pitch_gender} ({pitch_confidence:.2f})", |
|
|
"ai_models": f"{ai_gender} ({ai_confidence:.2f})", |
|
|
"formants": f"{formant_gender} ({formant_confidence:.2f})", |
|
|
"vote_breakdown": dict(Counter(votes)) |
|
|
}, |
|
|
"age_bracket": age_bracket, |
|
|
"voice_stats": { |
|
|
"avg_pitch_hz": pitch_stats['mean'], |
|
|
"pitch_range": f"{pitch_stats['min']:.0f}-{pitch_stats['max']:.0f}Hz", |
|
|
"pitch_std": pitch_stats['std'] |
|
|
} |
|
|
} |
|
|
|
|
|
return profiles |
|
|
|
|
|
def _prepare_audio_for_analysis(self, chunks, sr, max_duration=15): |
|
|
"""Prepare audio by taking samples from different parts""" |
|
|
raw = np.concatenate(chunks) |
|
|
|
|
|
|
|
|
if len(raw) > sr * max_duration: |
|
|
segment_len = sr * 5 |
|
|
total_len = len(raw) |
|
|
|
|
|
samples = [] |
|
|
|
|
|
samples.append(raw[:segment_len]) |
|
|
|
|
|
mid_start = (total_len // 2) - (segment_len // 2) |
|
|
samples.append(raw[mid_start:mid_start + segment_len]) |
|
|
|
|
|
samples.append(raw[-segment_len:]) |
|
|
|
|
|
raw = np.concatenate(samples) |
|
|
|
|
|
|
|
|
raw = raw - np.mean(raw) |
|
|
std = np.std(raw) |
|
|
if std > 1e-7: |
|
|
raw = raw / std |
|
|
|
|
|
return raw |
|
|
|
|
|
def _analyze_pitch_robust(self, audio, sr, full_wav, transcript, speaker): |
|
|
"""Advanced pitch analysis using multiple methods""" |
|
|
|
|
|
|
|
|
transcript_pitches = [ |
|
|
t['tone']['pitch_hz'] |
|
|
for t in transcript |
|
|
if t['speaker'] == speaker and t['tone']['pitch_hz'] > 60 |
|
|
] |
|
|
|
|
|
|
|
|
try: |
|
|
f0_yin = librosa.yin(audio.astype(np.float64), fmin=60, fmax=400, sr=sr) |
|
|
f0_yin_valid = f0_yin[f0_yin > 0] |
|
|
except: |
|
|
f0_yin_valid = [] |
|
|
|
|
|
|
|
|
try: |
|
|
f0_pyin, voiced_flag, voiced_probs = librosa.pyin( |
|
|
audio.astype(np.float64), |
|
|
fmin=60, |
|
|
fmax=400, |
|
|
sr=sr |
|
|
) |
|
|
f0_pyin_valid = f0_pyin[~np.isnan(f0_pyin)] |
|
|
except: |
|
|
f0_pyin_valid = [] |
|
|
|
|
|
|
|
|
all_pitches = [] |
|
|
if len(f0_yin_valid) > 0: |
|
|
all_pitches.extend(f0_yin_valid) |
|
|
if len(f0_pyin_valid) > 0: |
|
|
all_pitches.extend(f0_pyin_valid) |
|
|
if len(transcript_pitches) > 0: |
|
|
all_pitches.extend(transcript_pitches) |
|
|
|
|
|
if len(all_pitches) == 0: |
|
|
return "UNKNOWN", 0.0, {'mean': 0, 'std': 0, 'min': 0, 'max': 0} |
|
|
|
|
|
|
|
|
mean_pitch = np.mean(all_pitches) |
|
|
std_pitch = np.std(all_pitches) |
|
|
min_pitch = np.min(all_pitches) |
|
|
max_pitch = np.max(all_pitches) |
|
|
|
|
|
pitch_stats = { |
|
|
'mean': round(mean_pitch, 1), |
|
|
'std': round(std_pitch, 1), |
|
|
'min': round(min_pitch, 1), |
|
|
'max': round(max_pitch, 1) |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if mean_pitch < 150: |
|
|
gender = "MALE" |
|
|
|
|
|
confidence = min(1.0, (150 - mean_pitch) / 40) |
|
|
elif mean_pitch > 180: |
|
|
gender = "FEMALE" |
|
|
|
|
|
confidence = min(1.0, (mean_pitch - 180) / 40) |
|
|
else: |
|
|
|
|
|
if mean_pitch < 165: |
|
|
gender = "MALE" |
|
|
confidence = 0.5 |
|
|
else: |
|
|
gender = "FEMALE" |
|
|
confidence = 0.5 |
|
|
|
|
|
return gender, confidence, pitch_stats |
|
|
|
|
|
def _multi_model_gender_detection(self, audio, sr): |
|
|
"""Run multiple AI models and aggregate predictions""" |
|
|
predictions = [] |
|
|
confidences = [] |
|
|
|
|
|
for model_name, model_dict in self.gender_models.items(): |
|
|
try: |
|
|
processor = model_dict['processor'] |
|
|
model = model_dict['model'] |
|
|
|
|
|
|
|
|
inputs = processor( |
|
|
audio, |
|
|
sampling_rate=sr, |
|
|
return_tensors="pt", |
|
|
padding=True |
|
|
).to(self.device) |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = model(**inputs) |
|
|
logits = outputs.logits |
|
|
probs = torch.softmax(logits, dim=-1)[0].cpu().numpy() |
|
|
|
|
|
|
|
|
labels = model.config.id2label |
|
|
|
|
|
|
|
|
male_score = 0 |
|
|
female_score = 0 |
|
|
|
|
|
for idx, label in labels.items(): |
|
|
label_lower = label.lower() |
|
|
if 'male' in label_lower and 'female' not in label_lower: |
|
|
male_score = max(male_score, probs[idx]) |
|
|
elif 'female' in label_lower: |
|
|
female_score = max(female_score, probs[idx]) |
|
|
|
|
|
if male_score > female_score: |
|
|
predictions.append("MALE") |
|
|
confidences.append(male_score) |
|
|
else: |
|
|
predictions.append("FEMALE") |
|
|
confidences.append(female_score) |
|
|
|
|
|
except Exception as e: |
|
|
print(f" Model {model_name} error: {e}") |
|
|
continue |
|
|
|
|
|
if not predictions: |
|
|
return "UNKNOWN", 0.0, {} |
|
|
|
|
|
|
|
|
pred_counter = Counter(predictions) |
|
|
majority_vote = pred_counter.most_common(1)[0][0] |
|
|
|
|
|
|
|
|
majority_indices = [i for i, p in enumerate(predictions) if p == majority_vote] |
|
|
avg_confidence = np.mean([confidences[i] for i in majority_indices]) |
|
|
|
|
|
|
|
|
individual = { |
|
|
f"model_{i+1}": f"{pred} ({conf:.2f})" |
|
|
for i, (pred, conf) in enumerate(zip(predictions, confidences)) |
|
|
} |
|
|
|
|
|
return majority_vote, float(avg_confidence), individual |
|
|
|
|
|
def _extract_gender_from_conversation(self, transcript): |
|
|
"""Extract gender clues from conversation""" |
|
|
context_map = {} |
|
|
|
|
|
|
|
|
male_keywords = [ |
|
|
"sir", "mr.", "mister", "mr ", "gentleman", "he", "him", "his", |
|
|
"man", "guy", "male", "father", "dad", "son", "brother", "husband" |
|
|
] |
|
|
|
|
|
female_keywords = [ |
|
|
"ma'am", "miss", "mrs", "mrs.", "madam", "madame", "ms", "ms.", |
|
|
"she", "her", "hers", "woman", "lady", "female", "mother", "mom", |
|
|
"daughter", "sister", "wife" |
|
|
] |
|
|
|
|
|
for line in transcript: |
|
|
if line['role'] == "AGENT": |
|
|
txt = line['text'].lower() |
|
|
|
|
|
|
|
|
customers = [x['speaker'] for x in transcript if x['role'] == "CUSTOMER"] |
|
|
if not customers: |
|
|
continue |
|
|
|
|
|
target = customers[0] |
|
|
|
|
|
|
|
|
if any(keyword in txt for keyword in male_keywords): |
|
|
context_map[target] = "MALE" |
|
|
elif any(keyword in txt for keyword in female_keywords): |
|
|
context_map[target] = "FEMALE" |
|
|
|
|
|
return context_map |
|
|
|
|
|
def _analyze_formants(self, audio, sr): |
|
|
"""Analyze formant frequencies (F1, F2) for gender detection""" |
|
|
try: |
|
|
|
|
|
import parselmouth |
|
|
from parselmouth.praat import call |
|
|
snd = parselmouth.Sound(audio, sampling_frequency=sr) |
|
|
formant = snd.to_formant_burg() |
|
|
|
|
|
|
|
|
f1_values = [] |
|
|
f2_values = [] |
|
|
|
|
|
duration = snd.get_total_duration() |
|
|
time_step = 0.01 |
|
|
|
|
|
for t in np.arange(0, duration, time_step): |
|
|
f1 = formant.get_value_at_time(1, t) |
|
|
f2 = formant.get_value_at_time(2, t) |
|
|
|
|
|
if not np.isnan(f1) and not np.isnan(f2): |
|
|
f1_values.append(f1) |
|
|
f2_values.append(f2) |
|
|
|
|
|
if len(f1_values) < 10: |
|
|
return "UNKNOWN", 0.0 |
|
|
|
|
|
avg_f1 = np.mean(f1_values) |
|
|
avg_f2 = np.mean(f2_values) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if avg_f1 < 170 and avg_f2 < 1650: |
|
|
gender = "MALE" |
|
|
confidence = 0.7 |
|
|
elif avg_f1 > 190 and avg_f2 > 1750: |
|
|
gender = "FEMALE" |
|
|
confidence = 0.7 |
|
|
else: |
|
|
|
|
|
if avg_f2 < 1600: |
|
|
gender = "MALE" |
|
|
else: |
|
|
gender = "FEMALE" |
|
|
confidence = 0.5 |
|
|
|
|
|
return gender, confidence |
|
|
except ImportError: |
|
|
return "UNKNOWN", 0.0 |
|
|
except Exception as e: |
|
|
return "UNKNOWN", 0.0 |
|
|
|
|
|
def _detect_age_robust(self, audio, sr, pitch_stats): |
|
|
"""Robust age detection""" |
|
|
try: |
|
|
if 'audeering' not in self.gender_models: |
|
|
return "26-35" |
|
|
|
|
|
processor = self.gender_models['audeering']['processor'] |
|
|
model = self.gender_models['audeering']['model'] |
|
|
|
|
|
inputs = processor(audio, sampling_rate=sr, return_tensors="pt").to(self.device) |
|
|
|
|
|
with torch.no_grad(): |
|
|
logits = model(**inputs).logits |
|
|
probs = torch.softmax(logits, dim=-1)[0].cpu().numpy() |
|
|
|
|
|
|
|
|
|
|
|
labels = model.config.id2label |
|
|
age_scores = defaultdict(float) |
|
|
|
|
|
for i, score in enumerate(probs): |
|
|
label = labels[i] |
|
|
|
|
|
parts = label.split('_') |
|
|
if len(parts) > 1: |
|
|
age_group = parts[-1] |
|
|
age_scores[age_group] += score |
|
|
|
|
|
|
|
|
if age_scores: |
|
|
best_age = max(age_scores, key=age_scores.get) |
|
|
return best_age |
|
|
|
|
|
return "UNKNOWN" |
|
|
|
|
|
except Exception as e: |
|
|
print(f" β Age detection failed: {e}") |
|
|
return "UNKNOWN" |
|
|
|
|
|
def _run_enhanced_diarization(self, wav, sr, file_path): |
|
|
""" |
|
|
Run Pyannote diarization or fallback to simple segmentation |
|
|
""" |
|
|
if self.diarization_pipeline is None: |
|
|
print(" β No auth token provided, using energy-based fallback segmentation") |
|
|
return self._energy_based_segmentation(wav, sr) |
|
|
|
|
|
try: |
|
|
|
|
|
diarization = self.diarization_pipeline(file_path, min_speakers=2, max_speakers=2) |
|
|
|
|
|
segments = [] |
|
|
for turn, _, speaker in diarization.itertracks(yield_label=True): |
|
|
segments.append({ |
|
|
"start": turn.start, |
|
|
"end": turn.end, |
|
|
"speaker": speaker |
|
|
}) |
|
|
return segments |
|
|
|
|
|
except Exception as e: |
|
|
print(f" β Diarization error: {e}, using fallback") |
|
|
return self._energy_based_segmentation(wav, sr) |
|
|
|
|
|
def _energy_based_segmentation(self, wav, sr): |
|
|
"""Fallback if deep learning diarization fails""" |
|
|
|
|
|
|
|
|
intervals = librosa.effects.split(wav, top_db=30) |
|
|
segments = [] |
|
|
for start, end in intervals: |
|
|
segments.append({ |
|
|
"start": start / sr, |
|
|
"end": end / sr, |
|
|
"speaker": "SPEAKER_00" |
|
|
}) |
|
|
return segments |
|
|
|
|
|
def _merge_segments_smart(self, segments, min_gap=0.5): |
|
|
"""Merge segments from same speaker that are close together""" |
|
|
if not segments: |
|
|
return [] |
|
|
|
|
|
merged = [] |
|
|
current = segments[0] |
|
|
|
|
|
for next_seg in segments[1:]: |
|
|
|
|
|
if (next_seg['speaker'] == current['speaker'] and |
|
|
(next_seg['start'] - current['end']) < min_gap): |
|
|
|
|
|
current['end'] = next_seg['end'] |
|
|
else: |
|
|
merged.append(current) |
|
|
current = next_seg |
|
|
|
|
|
merged.append(current) |
|
|
return merged |
|
|
|
|
|
def _is_silence(self, chunk, threshold=0.005): |
|
|
"""Check if audio chunk is essentially silence""" |
|
|
return np.max(np.abs(chunk)) < threshold |
|
|
|
|
|
def _detect_emotion(self, chunk): |
|
|
"""Detect emotion from audio chunk""" |
|
|
try: |
|
|
|
|
|
if len(chunk) < 16000 * 0.5: |
|
|
return "neutral" |
|
|
|
|
|
|
|
|
|
|
|
preds = self.emotion_classifier(chunk, top_k=1) |
|
|
return preds[0]['label'] |
|
|
except: |
|
|
return "neutral" |
|
|
|
|
|
def _calculate_tone_advanced(self, chunk, sr, text): |
|
|
""" |
|
|
Calculate pitch, jitter, and shimmer using Parselmouth (Praat) |
|
|
""" |
|
|
try: |
|
|
if len(chunk) < sr * 0.1: |
|
|
return {"pitch_hz": 0, "jitter": 0, "shimmer": 0} |
|
|
|
|
|
snd = parselmouth.Sound(chunk, sampling_frequency=sr) |
|
|
|
|
|
|
|
|
pitch = snd.to_pitch() |
|
|
pitch_val = pitch.selected_array['frequency'] |
|
|
pitch_val = pitch_val[pitch_val != 0] |
|
|
avg_pitch = np.mean(pitch_val) if len(pitch_val) > 0 else 0 |
|
|
|
|
|
|
|
|
point_process = call(snd, "To PointProcess (periodic, cc)", 75, 500) |
|
|
|
|
|
try: |
|
|
jitter = call(point_process, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3) |
|
|
except: |
|
|
jitter = 0 |
|
|
|
|
|
try: |
|
|
shimmer = call([snd, point_process], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6) |
|
|
except: |
|
|
shimmer = 0 |
|
|
|
|
|
return { |
|
|
"pitch_hz": round(float(avg_pitch), 1), |
|
|
"jitter": round(float(jitter * 100), 2), |
|
|
"shimmer": round(float(shimmer * 100), 2) |
|
|
} |
|
|
except: |
|
|
return {"pitch_hz": 0, "jitter": 0, "shimmer": 0} |
|
|
|
|
|
def _assign_roles_smart(self, results): |
|
|
""" |
|
|
Assign AGENT vs CUSTOMER roles using Golden Phrases and Verbosity. |
|
|
""" |
|
|
speakers = list(set(r['speaker'] for r in results)) |
|
|
if len(speakers) == 1: |
|
|
|
|
|
for r in results: r['role'] = "AGENT" |
|
|
return results |
|
|
|
|
|
speaker_scores = defaultdict(int) |
|
|
word_counts = defaultdict(int) |
|
|
|
|
|
|
|
|
|
|
|
golden_agent_phrases = [ |
|
|
"my name is", "this is steve", "this is sam", "this is mike", |
|
|
"calling from", "on a recorded line", "green solutions", |
|
|
"energy solutions", "federal government", "rebate program" |
|
|
] |
|
|
|
|
|
|
|
|
agent_keywords = [ |
|
|
"manager", "supervisor", "qualified", "eligible", |
|
|
"whatsapp", "ping you", "verification", "consumption" |
|
|
] |
|
|
|
|
|
customer_keywords = [ |
|
|
"who is this", "stop calling", "not interested", |
|
|
"take me off", "do not call", "why are you asking" |
|
|
] |
|
|
|
|
|
agent_found_via_golden = None |
|
|
|
|
|
for res in results: |
|
|
text = res['text'].lower() |
|
|
spk = res['speaker'] |
|
|
|
|
|
|
|
|
words = text.split() |
|
|
word_counts[spk] += len(words) |
|
|
|
|
|
|
|
|
if agent_found_via_golden is None: |
|
|
for phrase in golden_agent_phrases: |
|
|
if phrase in text: |
|
|
print(f" β
Golden Phrase found for {spk}: '{phrase}'") |
|
|
agent_found_via_golden = spk |
|
|
break |
|
|
|
|
|
|
|
|
if any(k in text for k in agent_keywords): |
|
|
speaker_scores[spk] += 2 |
|
|
|
|
|
if any(k in text for k in customer_keywords): |
|
|
speaker_scores[spk] -= 3 |
|
|
|
|
|
|
|
|
final_agent = None |
|
|
|
|
|
if agent_found_via_golden: |
|
|
|
|
|
final_agent = agent_found_via_golden |
|
|
else: |
|
|
|
|
|
|
|
|
talkative_spk = max(word_counts, key=word_counts.get) |
|
|
total_words = sum(word_counts.values()) |
|
|
|
|
|
|
|
|
if word_counts[talkative_spk] / max(1, total_words) > 0.60: |
|
|
speaker_scores[talkative_spk] += 5 |
|
|
|
|
|
|
|
|
final_agent = max(speaker_scores, key=speaker_scores.get) |
|
|
|
|
|
|
|
|
print(f" β Role Assignment: Identified {final_agent} as AGENT") |
|
|
|
|
|
identification = {} |
|
|
for res in results: |
|
|
if res['speaker'] == final_agent: |
|
|
res['role'] = "AGENT" |
|
|
else: |
|
|
res['role'] = "CUSTOMER" |
|
|
identification[res['speaker']] = res['role'] |
|
|
|
|
|
return results |
|
|
|
|
|
def _analyze_customer_journey(self, results): |
|
|
"""Analyze sentiment flow of the customer""" |
|
|
cust_segments = [r for r in results if r['role'] == "CUSTOMER"] |
|
|
|
|
|
if not cust_segments: |
|
|
return {"emotional_arc": "No customer audio", "impact_score": 0} |
|
|
|
|
|
|
|
|
emo_map = { |
|
|
"happy": 1.0, "joy": 1.0, "neutral": 0.1, |
|
|
"sad": -0.5, "angry": -1.0, "frustrated": -1.0 |
|
|
} |
|
|
|
|
|
start_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[:3]) / min(3, len(cust_segments)) |
|
|
end_score = sum(emo_map.get(s['emotion'], 0) for s in cust_segments[-3:]) / min(3, len(cust_segments)) |
|
|
|
|
|
impact = end_score - start_score |
|
|
|
|
|
if impact > 0.2: arc = "Positive Resolution" |
|
|
elif impact < -0.2: arc = "Negative Escalation" |
|
|
else: arc = "Neutral/Unresolved" |
|
|
|
|
|
return { |
|
|
"emotional_arc": arc, |
|
|
"start_sentiment": round(start_score, 2), |
|
|
"end_sentiment": round(end_score, 2), |
|
|
"impact_score": round(impact, 2) |
|
|
} |
|
|
|
|
|
def _analyze_agent_kpi(self, results, customer_impact): |
|
|
"""Calculate Agent performance metrics""" |
|
|
agent_segments = [r for r in results if r['role'] == "AGENT"] |
|
|
|
|
|
if not agent_segments: |
|
|
return {"overall_score": 0} |
|
|
|
|
|
|
|
|
polite_words = ["please", "thank", "sorry", "apologize", "appreciate"] |
|
|
total_words = sum(len(s['text'].split()) for s in agent_segments) |
|
|
polite_count = sum(1 for s in agent_segments if any(w in s['text'].lower() for w in polite_words)) |
|
|
|
|
|
politeness_score = min(100, (polite_count / max(1, len(agent_segments))) * 200) |
|
|
|
|
|
|
|
|
jitter_vals = [s['tone']['jitter'] for s in agent_segments] |
|
|
tone_stability = 100 - min(100, np.std(jitter_vals) * 10) if jitter_vals else 50 |
|
|
|
|
|
|
|
|
|
|
|
resolution_score = 50 + (customer_impact * 50) |
|
|
resolution_score = max(0, min(100, resolution_score)) |
|
|
|
|
|
|
|
|
overall = ( |
|
|
(politeness_score * 0.3) + |
|
|
(tone_stability * 0.2) + |
|
|
(resolution_score * 0.5) |
|
|
) |
|
|
|
|
|
return { |
|
|
"overall_score": int(overall), |
|
|
"politeness": int(politeness_score), |
|
|
"tone_stability": int(tone_stability), |
|
|
"resolution_effectiveness": int(resolution_score) |
|
|
} |
|
|
|
|
|
def _flush_memory(self): |
|
|
"""Aggressive memory cleanup""" |
|
|
gc.collect() |
|
|
if self.device == "cuda": |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
def _map_emotion_to_sentiment(self, emotion): |
|
|
"""Map emotion labels to sentiment with polarity score""" |
|
|
emotion_lower = emotion.lower() |
|
|
|
|
|
positive_emotions = { |
|
|
'happy': 0.8, 'joy': 0.9, 'excited': 0.85, |
|
|
'pleased': 0.7, 'satisfied': 0.75, 'content': 0.6 |
|
|
} |
|
|
negative_emotions = { |
|
|
'sad': -0.6, 'angry': -0.9, 'frustrated': -0.8, |
|
|
'annoyed': -0.7, 'disappointed': -0.65, 'upset': -0.75 |
|
|
} |
|
|
if emotion_lower in positive_emotions: |
|
|
return { |
|
|
"sentiment": "positive", |
|
|
"polarity_score": positive_emotions[emotion_lower], |
|
|
"confidence": "high" |
|
|
} |
|
|
|
|
|
if emotion_lower in negative_emotions: |
|
|
return { |
|
|
"sentiment": "negative", |
|
|
"polarity_score": negative_emotions[emotion_lower], |
|
|
"confidence": "high" |
|
|
} |
|
|
return { |
|
|
"sentiment": "neutral", |
|
|
"polarity_score": 0.0, |
|
|
"confidence": "medium" |
|
|
} |
|
|
|
|
|
def _calculate_speech_rate(self, text, duration_seconds): |
|
|
"""Calculate words per minute (WPM) and classify pace""" |
|
|
if duration_seconds < 0.1: |
|
|
return {"wpm": 0, "word_count": 0, "speech_pace": "unknown"} |
|
|
words = text.split() |
|
|
word_count = len(words) |
|
|
wpm = (word_count / (duration_seconds / 60.0)) if duration_seconds > 0 else 0 |
|
|
if wpm < 100: pace = "slow" |
|
|
elif wpm < 140: pace = "normal" |
|
|
elif wpm < 180: pace = "fast" |
|
|
else: pace = "very_fast" |
|
|
|
|
|
return { |
|
|
"wpm": round(wpm, 1), |
|
|
"word_count": word_count, |
|
|
"speech_pace": pace |
|
|
} |
|
|
|
|
|
def _extract_keywords(self, text, top_n=5): |
|
|
"""Extract keywords/keyphrases using KeyBERT""" |
|
|
if self.keyword_model is None or len(text.split()) < 3: |
|
|
return [] |
|
|
|
|
|
try: |
|
|
keywords = self.keyword_model.extract_keywords( |
|
|
text, |
|
|
keyphrase_ngram_range=(1, 2), |
|
|
stop_words='english', |
|
|
top_n=top_n, |
|
|
use_maxsum=True, |
|
|
nr_candidates=20 |
|
|
) |
|
|
return [ |
|
|
{"keyword": kw[0], "relevance": round(float(kw[1]), 3)} |
|
|
for kw in keywords |
|
|
] |
|
|
except: |
|
|
return [] |
|
|
|
|
|
|
|
|
def _classify_topic(self, text): |
|
|
"""Classify text into call center topics""" |
|
|
if self.topic_classifier is None or len(text.split()) < 5: |
|
|
return {"topic": "unknown", "confidence": 0.0} |
|
|
try: |
|
|
result = self.topic_classifier(text, self.topic_labels, multi_label=False) |
|
|
return { |
|
|
"topic": result['labels'][0], |
|
|
"confidence": round(float(result['scores'][0]), 3), |
|
|
"top_3_topics": [ |
|
|
{"topic": label, "score": round(float(score), 3)} |
|
|
for label, score in zip(result['labels'][:3], result['scores'][:3]) |
|
|
] |
|
|
} |
|
|
except: |
|
|
return {"topic": "unknown", "confidence": 0.0} |
|
|
|
|
|
|
|
|
def _aggregate_call_insights(self, results): |
|
|
"""Aggregate keywords and topics at call level""" |
|
|
if not results: |
|
|
return {"top_keywords": [], "primary_topic": {"topic": "unknown"}} |
|
|
all_keywords = {} |
|
|
for seg in results: |
|
|
if 'keywords' in seg: |
|
|
for kw in seg['keywords']: |
|
|
keyword = kw['keyword'] |
|
|
score = kw['relevance'] |
|
|
all_keywords[keyword] = max(all_keywords.get(keyword, 0), score) |
|
|
|
|
|
top_keywords = [ |
|
|
{"keyword": k, "relevance": round(v, 3)} |
|
|
for k, v in sorted(all_keywords.items(), key=lambda x: x[1], reverse=True)[:10] |
|
|
] |
|
|
|
|
|
|
|
|
topic_votes = defaultdict(float) |
|
|
for seg in results: |
|
|
if 'topic' in seg and seg['topic']['confidence'] > 0.5: |
|
|
topic_votes[seg['topic']['topic']] += seg['topic']['confidence'] |
|
|
|
|
|
primary_topic = { |
|
|
"topic": max(topic_votes, key=topic_votes.get) if topic_votes else "unknown", |
|
|
"confidence": round(topic_votes[max(topic_votes, key=topic_votes.get)] / len(results), 3) if topic_votes else 0.0 |
|
|
} |
|
|
|
|
|
|
|
|
total_words = sum(seg.get('speech_rate', {}).get('word_count', 0) for seg in results) |
|
|
wpm_values = [seg.get('speech_rate', {}).get('wpm', 0) for seg in results if seg.get('speech_rate', {}).get('wpm', 0) > 0] |
|
|
average_wpm = round(np.mean(wpm_values), 1) if wpm_values else 0 |
|
|
|
|
|
return { |
|
|
"top_keywords": top_keywords, |
|
|
"primary_topic": primary_topic, |
|
|
"total_words": total_words, |
|
|
"average_wpm": average_wpm |
|
|
} |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
print("Initialize with: analyzer = UltraRobustCallAnalytics(hf_token='YOUR_TOKEN')") |
|
|
print("Process with: result = analyzer.process_call('path/to/audio.wav')") |