AI-Digital-Library-Assistant / services /elevenlabs_service.py
Nihal2000's picture
Initial deployment of AI Digital Library Assistant
86aa5e4
raw
history blame
11.9 kB
import logging
import asyncio
from typing import Optional, Dict, Any, List
import json
try:
from elevenlabs.client import ElevenLabs
from elevenlabs.conversational_ai.conversation import Conversation, ClientTools
from elevenlabs.conversational_ai.default_audio_interface import DefaultAudioInterface
ELEVENLABS_AVAILABLE = True
except ImportError:
ELEVENLABS_AVAILABLE = False
logger = logging.getLogger(__name__)
logger.warning("ElevenLabs SDK not available. Voice features will be disabled.")
import config
from services.llamaindex_service import LlamaIndexService
logger = logging.getLogger(__name__)
class ElevenLabsService:
"""
Service for integrating ElevenLabs Conversational AI with RAG capabilities.
Provides voice-based interaction with the document library.
"""
def __init__(self, llamaindex_service: LlamaIndexService):
"""
Initialize ElevenLabs service with RAG integration
Args:
llamaindex_service: LlamaIndex service for document queries
"""
self.config = config.config
self.llamaindex_service = llamaindex_service
self.client = None
self.client_tools = None
self.active_conversations: Dict[str, Conversation] = {}
if not ELEVENLABS_AVAILABLE:
logger.error("ElevenLabs SDK not installed. Run: pip install elevenlabs")
return
if not self.config.ELEVENLABS_API_KEY:
logger.warning("ELEVENLABS_API_KEY not configured. Voice features will be limited.")
return
try:
# Initialize ElevenLabs client
self.client = ElevenLabs(api_key=self.config.ELEVENLABS_API_KEY)
logger.info("ElevenLabs client initialized successfully")
# Initialize client tools for custom tool registration
self.client_tools = ClientTools()
# Register RAG tool
self._register_rag_tool()
logger.info("ElevenLabs service initialized with RAG tool")
except Exception as e:
logger.error(f"Error initializing ElevenLabs service: {str(e)}")
def _register_rag_tool(self):
"""Register RAG query tool with ElevenLabs agent"""
if not self.client_tools:
return
try:
# Register the query_documents tool
# Modern ElevenLabs SDK: register(tool_name, handler=callable)
self.client_tools.register("query_documents", handler=self._rag_query_tool)
logger.info("RAG tool 'query_documents' registered successfully")
except Exception as e:
logger.error(f"Error registering RAG tool: {str(e)}")
async def _rag_query_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Custom tool for querying documents using LlamaIndex agentic RAG
Args:
params: Dictionary containing the query
- query (str): The user's question or search query
Returns:
Dictionary with answer and metadata
"""
try:
query = params.get("query", "")
if not query:
return {
"error": "No query provided",
"answer": "I didn't receive a question to search for."
}
logger.info(f"RAG tool called with query: '{query}'")
# Query the LlamaIndex agentic RAG system
try:
result = await asyncio.wait_for(
self.llamaindex_service.query(query),
timeout=self.config.CONVERSATION_TIMEOUT
)
logger.info(f"RAG query successful")
return {
"answer": result,
"source": "document_library",
"confidence": "high"
}
except asyncio.TimeoutError:
logger.error("RAG query timeout")
return {
"error": "timeout",
"answer": "The search took too long. Please try a simpler question."
}
except Exception as e:
logger.error(f"Error in RAG query tool: {str(e)}")
return {
"error": str(e),
"answer": f"I encountered an error searching the documents: {str(e)}"
}
def create_conversation(
self,
agent_id: Optional[str] = None,
session_id: Optional[str] = None
) -> Optional[Conversation]:
"""
Create a new conversation session
Args:
agent_id: ElevenLabs agent ID (uses config default if not provided)
session_id: Optional session ID for tracking
Returns:
Conversation object or None if initialization fails
"""
if not self.client:
logger.error("ElevenLabs client not initialized")
return None
try:
agent_id = agent_id or self.config.ELEVENLABS_AGENT_ID
if not agent_id:
logger.error("No agent ID provided or configured")
return None
# Create audio interface for real-time audio
audio_interface = DefaultAudioInterface()
# Create conversation with RAG tool
conversation = Conversation(
client=self.client,
agent_id=agent_id,
requires_auth=True,
audio_interface=audio_interface,
client_tools=self.client_tools
)
# Store conversation if session ID provided
if session_id:
self.active_conversations[session_id] = conversation
logger.info(f"Created conversation for agent: {agent_id}")
return conversation
except Exception as e:
logger.error(f"Error creating conversation: {str(e)}")
return None
async def start_conversation(self, session_id: Optional[str] = None) -> Dict[str, Any]:
"""
Start a new conversation session (async wrapper for UI)
Args:
session_id: Optional session ID for tracking
Returns:
Dictionary with success status and conversation info
"""
try:
conversation = self.create_conversation(session_id=session_id)
if conversation:
return {
"success": True,
"session_id": session_id,
"message": "Conversation started successfully"
}
else:
return {
"success": False,
"error": "Failed to create conversation"
}
except Exception as e:
logger.error(f"Error starting conversation: {str(e)}")
return {
"success": False,
"error": str(e)
}
async def process_voice_query(
self,
audio_file_path: str,
agent_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Process a voice query file and return response
Args:
audio_file_path: Path to audio file
agent_id: Optional agent ID
Returns:
Dictionary with transcription, answer, and metadata
"""
try:
# For now, this is a placeholder for file-based processing
# ElevenLabs Conversational AI is primarily WebSocket-based
# This would be used for async/batch processing
logger.info(f"Processing voice query from: {audio_file_path}")
# This would require additional implementation for file upload
# and processing through ElevenLabs API
return {
"status": "pending",
"message": "Voice query processing requires WebSocket connection",
"file": audio_file_path
}
except Exception as e:
logger.error(f"Error processing voice query: {str(e)}")
return {
"status": "error",
"error": str(e)
}
async def end_conversation(self, session_id: str) -> bool:
"""
End an active conversation session
Args:
session_id: Session identifier
Returns:
True if conversation ended successfully
"""
try:
if session_id in self.active_conversations:
conversation = self.active_conversations[session_id]
# Try to end the session gracefully
try:
conversation.end_session()
except AttributeError as ae:
# Handle cases where DefaultAudioInterface doesn't have expected methods
logger.warning(f"Could not cleanly end session: {str(ae)}")
except Exception as e:
logger.warning(f"Error during session cleanup: {str(e)}")
# Always remove from active conversations
del self.active_conversations[session_id]
logger.info(f"Ended conversation: {session_id}")
return True
return False
except Exception as e:
logger.error(f"Error ending conversation: {str(e)}")
return False
def get_available_voices(self) -> List[Dict[str, str]]:
"""
Get list of available voice models
Returns:
List of voice model information
"""
try:
if not self.client:
return []
# Get voices from ElevenLabs API
voices = self.client.voices.get_all()
return [
{
"voice_id": voice.voice_id,
"name": voice.name,
"category": voice.category if hasattr(voice, 'category') else "general"
}
for voice in voices.voices
]
except Exception as e:
logger.error(f"Error getting voices: {str(e)}")
return []
def is_available(self) -> bool:
"""Check if ElevenLabs service is available and configured"""
return ELEVENLABS_AVAILABLE and self.client is not None
async def test_connection(self) -> Dict[str, Any]:
"""
Test ElevenLabs API connection
Returns:
Dictionary with test results
"""
try:
if not self.client:
return {
"status": "error",
"message": "Client not initialized"
}
# Try to fetch user info or voices as a connection test
voices = self.get_available_voices()
return {
"status": "success",
"message": "ElevenLabs API connected",
"voices_available": len(voices),
"rag_tool_registered": self.client_tools is not None
}
except Exception as e:
logger.error(f"Connection test failed: {str(e)}")
return {
"status": "error",
"message": str(e)
}