# AI-Digital-Library-Assistant/services/podcast_generator_service.py
import logging
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
import json
import re
import uuid
try:
from elevenlabs.client import ElevenLabs
ELEVENLABS_AVAILABLE = True
except ImportError:
ELEVENLABS_AVAILABLE = False
import config
from services.llamaindex_service import LlamaIndexService
from services.llm_service import LLMService
from services.document_store_service import DocumentStoreService
logger = logging.getLogger(__name__)
@dataclass
class DocumentAnalysis:
"""Analysis results from document(s)"""
key_insights: List[str]
topics: List[str]
complexity_level: str
estimated_words: int
source_documents: List[str]
summary: str
@dataclass
class DialogueLine:
"""Single line of podcast dialogue"""
speaker: str
text: str
pause_after: float = 0.5
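    # Note: pause_after is declared but not yet applied during audio
    # synthesis (see synthesize_audio below).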
@dataclass
class PodcastScript:
"""Complete podcast script"""
dialogue: List[DialogueLine]
total_duration_estimate: float
word_count: int
style: str
def to_text(self) -> str:
lines = []
for line in self.dialogue:
lines.append(f"{line.speaker}: {line.text}")
return "\n\n".join(lines)
@dataclass
class PodcastMetadata:
"""Metadata for generated podcast"""
podcast_id: str
title: str
description: str
source_documents: List[str]
style: str
duration_seconds: float
file_size_mb: float
voices: Dict[str, str]
generated_at: str
generation_cost: Dict[str, float]
key_topics: List[str]
@dataclass
class PodcastResult:
"""Complete podcast generation result"""
podcast_id: str
audio_file_path: str
transcript: str
    metadata: Optional[PodcastMetadata]
generation_time: float
success: bool
error: Optional[str] = None
class PodcastGeneratorService:
"""
Service for generating conversational podcasts from documents.
"""
WORDS_PER_MINUTE = 150
SCRIPT_PROMPTS = {
"conversational": """You are an expert podcast script writer. Create an engaging 2-host podcast discussing the provided documents.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Conversational, friendly, and accessible
- Format: Alternating dialogue between HOST1 and HOST2
- Make the content engaging and easy to understand
- Include natural transitions and enthusiasm
DIALOGUE FORMAT (strictly follow):
HOST1: [What they say]
HOST2: [What they say]
STRUCTURE:
1. Opening Hook (30 seconds): Grab attention
2. Introduction (1 minute): Set context
3. Main Discussion (70% of time): Deep dive into insights
4. Wrap-up (1 minute): Summarize key takeaways
Generate the complete podcast script now:""",
"educational": """Create an educational podcast discussing the provided documents.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Clear, methodical, educational
- HOST1 acts as teacher, HOST2 as curious learner
DIALOGUE FORMAT:
HOST1: [Expert explanation]
HOST2: [Clarifying question]
Generate the educational podcast script now:""",
"technical": """Create a technical podcast for an informed audience.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Professional, detailed, technically accurate
- HOST1 is expert, HOST2 is informed interviewer
DIALOGUE FORMAT:
HOST1: [Technical insight]
HOST2: [Probing question]
Generate the technical podcast script now:""",
"casual": """Create a fun, casual podcast discussing the documents.
DOCUMENT CONTENT:
{document_content}
KEY INSIGHTS:
{key_insights}
REQUIREMENTS:
- Duration: {duration_minutes} minutes (approximately {word_count} words)
- Style: Relaxed, humorous, energetic
- Make it entertaining while informative
DIALOGUE FORMAT:
HOST1: [Casual commentary]
HOST2: [Enthusiastic response]
Generate the casual podcast script now:"""
}
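    # Note: generate_script() fills {document_content} with the analysis
    # summary rather than the full document text (see generate_script below).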
def __init__(
self,
llamaindex_service: LlamaIndexService,
llm_service: LLMService,
elevenlabs_api_key: Optional[str] = None
):
self.config = config.config
self.llamaindex_service = llamaindex_service
self.llm_service = llm_service
# Get document store from llamaindex service
self.document_store = llamaindex_service.document_store
# Initialize ElevenLabs client
self.elevenlabs_client = None
if ELEVENLABS_AVAILABLE:
api_key = elevenlabs_api_key or self.config.ELEVENLABS_API_KEY
if api_key:
try:
self.elevenlabs_client = ElevenLabs(api_key=api_key)
logger.info("ElevenLabs client initialized for podcast generation")
except Exception as e:
logger.error(f"Failed to initialize ElevenLabs client: {e}")
# Create podcast storage directory
self.podcast_dir = Path("./data/podcasts")
self.podcast_dir.mkdir(parents=True, exist_ok=True)
# Metadata database file
self.metadata_file = self.podcast_dir / "metadata_db.json"
self._ensure_metadata_db()
# Voice cache
self._voice_cache = {}
def _ensure_metadata_db(self):
"""Ensure metadata database exists"""
if not self.metadata_file.exists():
self.metadata_file.write_text(json.dumps([], indent=2))
async def generate_podcast(
self,
document_ids: List[str],
style: str = "conversational",
duration_minutes: int = 10,
host1_voice: str = "Rachel",
host2_voice: str = "Adam"
) -> PodcastResult:
"""Generate a complete podcast from documents"""
start_time = datetime.now()
podcast_id = str(uuid.uuid4())
try:
logger.info(f"Starting podcast generation {podcast_id}")
logger.info(f"Documents: {document_ids}, Style: {style}, Duration: {duration_minutes}min")
# Step 1: Retrieve and analyze documents
logger.info("Step 1: Retrieving and analyzing documents...")
analysis = await self.analyze_documents(document_ids)
# Step 2: Generate script
logger.info("Step 2: Generating podcast script...")
script = await self.generate_script(analysis, style, duration_minutes)
# Step 3: Synthesize audio
logger.info("Step 3: Synthesizing audio with voices...")
audio_file_path = await self.synthesize_audio(
podcast_id,
script,
host1_voice,
host2_voice
)
# Calculate generation time
generation_time = (datetime.now() - start_time).total_seconds()
# Step 4: Create metadata
logger.info("Step 4: Creating metadata...")
metadata = self._create_metadata(
podcast_id,
analysis,
script,
audio_file_path,
                (host1_voice, host2_voice),  # ordered tuple; a set would lose host order and collapse duplicate voices
document_ids,
style
)
# Save metadata
self._save_metadata(metadata)
# Save transcript
transcript_path = self.podcast_dir / f"{podcast_id}_transcript.txt"
transcript_path.write_text(script.to_text(), encoding="utf-8")
logger.info(f"Podcast generated successfully: {podcast_id}")
return PodcastResult(
podcast_id=podcast_id,
audio_file_path=str(audio_file_path),
transcript=script.to_text(),
metadata=metadata,
generation_time=generation_time,
success=True
)
except Exception as e:
logger.error(f"Podcast generation failed: {str(e)}", exc_info=True)
return PodcastResult(
podcast_id=podcast_id,
audio_file_path="",
transcript="",
metadata=None,
generation_time=(datetime.now() - start_time).total_seconds(),
success=False,
error=str(e)
)
async def analyze_documents(self, document_ids: List[str]) -> DocumentAnalysis:
"""
Retrieve documents and extract key insights for podcast
FIXED: Now actually retrieves document content from document store
"""
try:
# Step 1: Retrieve actual documents from document store
logger.info(f"Retrieving {len(document_ids)} documents from store...")
documents = []
document_contents = []
for doc_id in document_ids:
doc = await self.document_store.get_document(doc_id)
if doc:
documents.append(doc)
document_contents.append(doc.content)
logger.info(f"Retrieved document: {doc.filename} ({len(doc.content)} chars)")
else:
logger.warning(f"Document {doc_id} not found in store")
if not documents:
raise ValueError(f"No documents found for IDs: {document_ids}")
# Step 2: Combine document content
combined_content = "\n\n---DOCUMENT SEPARATOR---\n\n".join(document_contents)
# Truncate if too long (keep first portion for context)
max_content_length = 15000 # Adjust based on your LLM context window
if len(combined_content) > max_content_length:
logger.warning(f"Content too long ({len(combined_content)} chars), truncating to {max_content_length}")
combined_content = combined_content[:max_content_length] + "\n\n[Content truncated...]"
# Step 3: Use LLM to analyze the content
analysis_prompt = f"""Analyze the following document(s) and provide:
1. The 5-7 most important insights or key points (be specific and detailed)
2. Main themes and topics covered
3. The overall complexity level (beginner/intermediate/advanced)
4. A comprehensive summary suitable for podcast discussion
DOCUMENTS:
{combined_content}
Provide a structured analysis optimized for creating an engaging podcast discussion.
Format your response as:
KEY INSIGHTS:
1. [First key insight]
2. [Second key insight]
...
TOPICS:
- [Topic 1]
- [Topic 2]
...
COMPLEXITY: [beginner/intermediate/advanced]
SUMMARY:
[Your comprehensive summary here]
"""
logger.info("Analyzing content with LLM...")
result = await self.llm_service.generate_text(
analysis_prompt,
max_tokens=2000,
temperature=0.7
)
# Step 4: Parse the structured response
insights = self._extract_insights(result)
topics = self._extract_topics(result)
complexity = self._determine_complexity(result)
summary = self._extract_summary(result)
logger.info(f"Analysis complete: {len(insights)} insights, {len(topics)} topics")
return DocumentAnalysis(
key_insights=insights[:7],
topics=topics,
complexity_level=complexity,
estimated_words=len(combined_content.split()),
source_documents=[doc.filename for doc in documents],
summary=summary or result[:500]
)
except Exception as e:
logger.error(f"Document analysis failed: {str(e)}", exc_info=True)
raise RuntimeError(f"Failed to analyze documents: {str(e)}")
def _extract_summary(self, text: str) -> str:
"""Extract summary section from analysis"""
try:
if "SUMMARY:" in text:
parts = text.split("SUMMARY:")
if len(parts) > 1:
summary = parts[1].strip()
# Take first 500 chars if too long
return summary[:500] if len(summary) > 500 else summary
        except Exception:
            pass
# Fallback: take first few sentences
sentences = text.split('.')
return '. '.join(sentences[:3]) + '.'
def _extract_insights(self, text: str) -> List[str]:
"""Extract key insights from analysis text"""
insights = []
lines = text.split('\n')
in_insights_section = False
for line in lines:
line = line.strip()
if "KEY INSIGHTS:" in line.upper():
in_insights_section = True
continue
elif line.upper().startswith(("TOPICS:", "COMPLEXITY:", "SUMMARY:")):
in_insights_section = False
if in_insights_section and line:
                # Strip a leading "1.", "-", "*", or "•" marker (anchored so
                # punctuation inside the insight text is left untouched)
                insight = re.sub(r'^(?:\d+\.|[-*•])\s*', '', line).strip()
if len(insight) > 20:
insights.append(insight)
# Fallback if no insights found
if not insights:
sentences = text.split('.')
insights = [s.strip() + '.' for s in sentences[:7] if len(s.strip()) > 20]
return insights
def _extract_topics(self, text: str) -> List[str]:
"""Extract main topics from analysis"""
topics = []
lines = text.split('\n')
in_topics_section = False
for line in lines:
line = line.strip()
if "TOPICS:" in line.upper():
in_topics_section = True
continue
elif line.upper().startswith(("KEY INSIGHTS:", "COMPLEXITY:", "SUMMARY:")):
in_topics_section = False
if in_topics_section and line:
                topic = re.sub(r'^[-*•]\s*', '', line).strip()  # anchored bullet strip
if len(topic) > 2:
topics.append(topic)
# Fallback: simple keyword extraction
if not topics:
common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
words = text.lower().split()
word_freq = {}
for word in words:
word = re.sub(r'[^\w\s]', '', word)
if len(word) > 4 and word not in common_words:
word_freq[word] = word_freq.get(word, 0) + 1
top_topics = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:5]
topics = [topic[0].title() for topic in top_topics]
return topics[:5]
def _determine_complexity(self, text: str) -> str:
"""Determine content complexity level"""
text_lower = text.lower()
if "complexity:" in text_lower:
for level in ["beginner", "intermediate", "advanced"]:
if level in text_lower.split("complexity:")[1][:100]:
return level
# Heuristic based on keywords
if any(word in text_lower for word in ['basic', 'introduction', 'beginner', 'simple']):
return "beginner"
elif any(word in text_lower for word in ['advanced', 'complex', 'sophisticated', 'expert']):
return "advanced"
else:
return "intermediate"
async def generate_script(
self,
analysis: DocumentAnalysis,
style: str,
duration_minutes: int
) -> PodcastScript:
"""Generate podcast script from analysis"""
target_words = duration_minutes * self.WORDS_PER_MINUTE
# Prepare context with insights
insights_text = "\n".join(f"{i+1}. {insight}" for i, insight in enumerate(analysis.key_insights))
# Get prompt template
prompt_template = self.SCRIPT_PROMPTS.get(style, self.SCRIPT_PROMPTS["conversational"])
# Fill template
prompt = prompt_template.format(
document_content=analysis.summary,
key_insights=insights_text,
duration_minutes=duration_minutes,
word_count=target_words
)
# Generate script
script_text = await self.llm_service.generate_text(
prompt,
            max_tokens=target_words * 2,  # rough budget: ~2 tokens per target word
temperature=0.8
)
# Parse into dialogue
dialogue = self._parse_script(script_text)
if not dialogue:
raise ValueError("Failed to parse script into dialogue lines")
word_count = sum(len(line.text.split()) for line in dialogue)
duration_estimate = word_count / self.WORDS_PER_MINUTE
return PodcastScript(
dialogue=dialogue,
total_duration_estimate=duration_estimate * 60,
word_count=word_count,
style=style
)
def _parse_script(self, script_text: str) -> List[DialogueLine]:
"""Parse generated script into dialogue lines"""
dialogue = []
lines = script_text.split('\n')
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith('HOST1:'):
text = line[6:].strip()
if text:
dialogue.append(DialogueLine(speaker="HOST1", text=text))
elif line.startswith('HOST2:'):
text = line[6:].strip()
if text:
dialogue.append(DialogueLine(speaker="HOST2", text=text))
return dialogue
def _get_voice_id(self, voice_name: str) -> str:
"""Get voice ID from voice name"""
        try:
            if not self.elevenlabs_client:
                raise RuntimeError("ElevenLabs client not initialized")
            # Populate the voice cache on first use
            if not self._voice_cache:
                voices = self.elevenlabs_client.voices.get_all()
if not voices or not voices.voices:
raise RuntimeError("No voices available")
for voice in voices.voices:
self._voice_cache[voice.name.lower()] = voice.voice_id
# Exact match
if voice_name.lower() in self._voice_cache:
return self._voice_cache[voice_name.lower()]
# Partial match
for name, voice_id in self._voice_cache.items():
if voice_name.lower() in name:
logger.info(f"Partial match for '{voice_name}': {name}")
return voice_id
# Fallback
first_voice_id = list(self._voice_cache.values())[0]
logger.warning(f"Voice '{voice_name}' not found, using default")
return first_voice_id
except Exception as e:
logger.error(f"Could not fetch voices: {e}")
raise RuntimeError(f"Failed to get voice ID: {str(e)}")
async def synthesize_audio(
self,
podcast_id: str,
script: PodcastScript,
host1_voice: str,
host2_voice: str
) -> Path:
"""Synthesize audio with alternating voices"""
if not self.elevenlabs_client:
raise RuntimeError("ElevenLabs client not initialized")
audio_file = self.podcast_dir / f"{podcast_id}.mp3"
try:
# Get voice IDs
host1_voice_id = self._get_voice_id(host1_voice)
host2_voice_id = self._get_voice_id(host2_voice)
logger.info(f"HOST1: {host1_voice}, HOST2: {host2_voice}")
voice_map = {
"HOST1": host1_voice_id,
"HOST2": host2_voice_id
}
audio_chunks = []
# Process each line with correct voice
for i, line in enumerate(script.dialogue):
logger.info(f"Line {i+1}/{len(script.dialogue)}: {line.speaker}")
voice_id = voice_map.get(line.speaker, host1_voice_id)
audio_generator = self.elevenlabs_client.text_to_speech.convert(
voice_id=voice_id,
text=line.text,
model_id="eleven_multilingual_v2"
)
line_chunks = []
for chunk in audio_generator:
if chunk:
line_chunks.append(chunk)
if line_chunks:
audio_chunks.append(b''.join(line_chunks))
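            # Note: concatenating raw MP3 bytes plays in most players, but it
            # ignores DialogueLine.pause_after; inserting real silence would
            # require decoding and re-encoding (e.g., with pydub).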
if not audio_chunks:
raise RuntimeError("No audio chunks generated")
full_audio = b''.join(audio_chunks)
with open(audio_file, 'wb') as f:
f.write(full_audio)
if audio_file.exists() and audio_file.stat().st_size > 1000:
logger.info(f"Audio created: {audio_file} ({audio_file.stat().st_size} bytes)")
return audio_file
else:
raise RuntimeError("Audio file too small or empty")
except Exception as e:
logger.error(f"Audio synthesis failed: {e}", exc_info=True)
raise RuntimeError(f"Failed to generate audio: {str(e)}")
def _create_metadata(
self,
podcast_id: str,
analysis: DocumentAnalysis,
script: PodcastScript,
audio_path: Path,
        voices: tuple,
document_ids: List[str],
style: str
) -> PodcastMetadata:
"""Create podcast metadata"""
title = f"Podcast: {analysis.topics[0] if analysis.topics else 'Document Discussion'}"
description = f"A {style} podcast discussing: {', '.join(analysis.source_documents)}"
file_size_mb = audio_path.stat().st_size / (1024 * 1024) if audio_path.exists() else 0
        # Rough cost estimates, not quoted vendor prices: assume ~$0.01 per
        # 1K words of LLM output and ~$0.30 per 1K TTS characters, with an
        # average of 5 characters per word.
        llm_cost = (script.word_count / 1000) * 0.01
        tts_cost = (script.word_count * 5 / 1000) * 0.30
return PodcastMetadata(
podcast_id=podcast_id,
title=title,
description=description,
source_documents=analysis.source_documents,
style=style,
duration_seconds=script.total_duration_estimate,
file_size_mb=file_size_mb,
voices={"host1": list(voices)[0] if len(voices) > 0 else "Rachel",
"host2": list(voices)[1] if len(voices) > 1 else "Adam"},
generated_at=datetime.now().isoformat(),
generation_cost={"llm_cost": llm_cost, "tts_cost": tts_cost, "total": llm_cost + tts_cost},
key_topics=analysis.topics
)
def _save_metadata(self, metadata: PodcastMetadata):
"""Save metadata to database"""
try:
existing = json.loads(self.metadata_file.read_text())
existing.append(asdict(metadata))
self.metadata_file.write_text(json.dumps(existing, indent=2))
logger.info(f"Metadata saved: {metadata.podcast_id}")
except Exception as e:
logger.error(f"Failed to save metadata: {e}")
def list_podcasts(self, limit: int = 10) -> List[PodcastMetadata]:
"""List generated podcasts"""
try:
data = json.loads(self.metadata_file.read_text())
podcasts = [PodcastMetadata(**item) for item in data[-limit:]]
return list(reversed(podcasts))
except Exception as e:
logger.error(f"Failed to list podcasts: {e}")
return []
def get_podcast(self, podcast_id: str) -> Optional[PodcastMetadata]:
"""Get specific podcast metadata"""
try:
data = json.loads(self.metadata_file.read_text())
for item in data:
if item.get('podcast_id') == podcast_id:
return PodcastMetadata(**item)
return None
except Exception as e:
logger.error(f"Failed to get podcast: {e}")
return None
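

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes LlamaIndexService and
# LLMService can be constructed with no arguments and that the document ID
# below already exists in the document store; adapt to your actual wiring.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        llamaindex_service = LlamaIndexService()  # assumption: no-arg constructor
        llm_service = LLMService()                # assumption: no-arg constructor
        generator = PodcastGeneratorService(llamaindex_service, llm_service)
        result = await generator.generate_podcast(
            document_ids=["your-document-id"],  # hypothetical ID
            style="conversational",
            duration_minutes=5,
        )
        if result.success:
            print(f"Podcast saved to: {result.audio_file_path}")
        else:
            print(f"Generation failed: {result.error}")

    asyncio.run(_demo())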