Spaces:
Running
Running
batuhanozkose
feat: Implement Podcast Persona Framework (PPF) - adaptive conversation persona system for generated podcasts
2bd49fc
| import time | |
| from generation.script_generator import get_generator | |
| from processing.pdf_reader import extract_text_from_pdf | |
| from processing.url_fetcher import fetch_paper_from_url | |
| from synthesis.tts_engine import get_tts_engine | |
| from utils.config import ( | |
| MAX_CONTEXT_CHARS, | |
| ) | |
| from utils.history import save_to_history | |
class PodcastAgent:
    def __init__(
        self,
        provider_mode="own_inference",
        own_base_url=None,
        own_api_key=None,
        own_model=None,
        openai_key=None,
        openai_model=None,
        tts_provider="elevenlabs",
        elevenlabs_key=None,
        host_voice=None,
        guest_voice=None,
        max_tokens=None,
        target_dialogue_count=15,
        context_limit=None,
        persona_mode="friendly_explainer",
    ):
        """
        Create a PodcastAgent configured with user-supplied (BYOK) settings.

        Args:
            provider_mode: "own_inference" or "openai"
            own_base_url: Base URL of a self-hosted inference server
            own_api_key: API key for the self-hosted inference server
            own_model: Model name on the self-hosted inference server
            openai_key: OpenAI API key
            openai_model: OpenAI model name
            tts_provider: "elevenlabs" (ElevenLabs required)
            elevenlabs_key: ElevenLabs API key (required)
            host_voice: Voice ID used for the host speaker
            guest_voice: Voice ID used for the guest speaker
            max_tokens: Maximum tokens for script generation
            target_dialogue_count: Target number of dialogue exchanges (default: 15)
            context_limit: Character budget for multi-paper processing
                (default: MAX_CONTEXT_CHARS)
            persona_mode: Podcast persona mode (default: "friendly_explainer")
        """
        # Accumulated timestamped log entries (see log()).
        self.logs = []
        # LLM provider selection and credentials.
        self.provider_mode = provider_mode  # "own_inference" or "openai"
        self.own_base_url = own_base_url
        self.own_api_key = own_api_key
        self.own_model = own_model
        self.openai_key = openai_key
        self.openai_model = openai_model
        # TTS provider, credentials, and speaker voices.
        self.tts_provider = tts_provider
        self.elevenlabs_key = elevenlabs_key
        self.host_voice = host_voice
        self.guest_voice = guest_voice
        # Generation tuning.
        self.max_tokens = max_tokens
        self.target_dialogue_count = target_dialogue_count
        # Any falsy limit (None, 0) falls back to the configured default,
        # matching the original `x if x else default` behavior.
        self.context_limit = context_limit or MAX_CONTEXT_CHARS
        self.persona_mode = persona_mode
| def log(self, message): | |
| timestamp = time.strftime("%H:%M:%S") | |
| entry = f"[{timestamp}] {message}" | |
| print(entry) | |
| self.logs.append(entry) | |
| return entry | |
| def process(self, url: str = None, pdf_file=None): | |
| """ | |
| Orchestrates the conversion from URL or uploaded PDF to Podcast. | |
| Args: | |
| url: Paper URL (arXiv or medRxiv) | |
| pdf_file: Uploaded PDF file object | |
| """ | |
| # Determine source | |
| if pdf_file: | |
| yield self.log( | |
| f"Received uploaded PDF: {pdf_file.name if hasattr(pdf_file, 'name') else 'file'}" | |
| ) | |
| pdf_path = pdf_file.name if hasattr(pdf_file, "name") else pdf_file | |
| source_ref = "Uploaded PDF" | |
| elif url: | |
| yield self.log(f"Received request for URL: {url}") | |
| # Step 1: Fetch Paper | |
| yield self.log("Thinking: I need to download the paper first.") | |
| yield self.log(f"Tool Call: fetch_paper({url})") | |
| pdf_path = fetch_paper_from_url(url) | |
| if not pdf_path: | |
| yield self.log("Error: Failed to download paper.") | |
| return None, "\n".join(self.logs) | |
| yield self.log(f"Paper downloaded to: {pdf_path}") | |
| source_ref = url | |
| else: | |
| yield self.log( | |
| "Error: No input provided. Please provide either a URL or upload a PDF." | |
| ) | |
| return None, "\n".join(self.logs) | |
| # Step 2: Read PDF | |
| yield self.log("Thinking: Now I need to extract the text content.") | |
| yield self.log(f"Tool Call: read_pdf({pdf_path})") | |
| text = extract_text_from_pdf(pdf_path) | |
| if not text: | |
| yield self.log("Error: Failed to extract text.") | |
| return None, self.logs | |
| text_length = len(text) | |
| yield self.log(f"Extracted {text_length:,} characters.") | |
| # Check context limit | |
| if text_length > self.context_limit: | |
| yield self.log(f"⚠️ Context limit exceeded!") | |
| yield self.log(f"Paper size: {text_length:,} characters") | |
| yield self.log(f"Maximum allowed: {self.context_limit:,} characters") | |
| yield self.log("") | |
| yield self.log("❌ Error: Paper is too large to process with current settings.") | |
| yield self.log("") | |
| yield self.log("📋 How to fix:") | |
| yield self.log(" 1. Open 'Advanced Options' below") | |
| yield self.log(" 2. Enable 'Batch Mode'") | |
| yield self.log(f" 3. Adjust 'Max Context Limit' slider to at least {text_length:,} characters") | |
| yield self.log(" 4. Try again") | |
| yield self.log("") | |
| return None, "\n".join(self.logs) | |
| # Step 3: Generate Script | |
| yield self.log( | |
| "Thinking: The text is ready. I will now generate a podcast script using the LLM." | |
| ) | |
| if self.provider_mode == "demo": | |
| yield self.log("Using Demo Inference") | |
| elif self.provider_mode == "own_inference": | |
| yield self.log(f"Using Own Inference: {self.own_base_url}") | |
| else: | |
| yield self.log(f"Using OpenAI ({self.openai_model or 'gpt-4o-mini'})") | |
| yield self.log("Tool Call: generate_script(...)") | |
| generator = get_generator( | |
| provider_mode=self.provider_mode, | |
| own_base_url=self.own_base_url, | |
| own_api_key=self.own_api_key, | |
| own_model=self.own_model, | |
| openai_key=self.openai_key, | |
| openai_model=self.openai_model, | |
| max_tokens=self.max_tokens, | |
| ) | |
| script = generator.generate_podcast_script(text, target_dialogue_count=self.target_dialogue_count, persona_mode=self.persona_mode) | |
| if not script: | |
| yield self.log("Error: Failed to generate script.") | |
| return None, self.logs | |
| yield self.log(f"Generated script with {len(script)} dialogue turns (target: {self.target_dialogue_count}).") | |
| # Step 4: Synthesize Audio | |
| yield self.log("Thinking: The script looks good. Sending it to the TTS engine.") | |
| tts_name = "ElevenLabs TTS" if self.tts_provider == "elevenlabs" else "Supertonic TTS (CPU)" | |
| yield self.log(f"Using {tts_name}") | |
| yield self.log("Tool Call: synthesize_podcast(...)") | |
| tts = get_tts_engine( | |
| tts_provider=self.tts_provider, | |
| custom_api_key=self.elevenlabs_key if self.tts_provider == "elevenlabs" else None, | |
| host_voice=self.host_voice, | |
| guest_voice=self.guest_voice | |
| ) | |
| audio_path = tts.synthesize_dialogue(script) | |
| if not audio_path: | |
| yield self.log("Error: Failed to synthesize audio.") | |
| return None, self.logs | |
| yield self.log(f"Podcast generated successfully at: {audio_path}") | |
| # Save to history | |
| save_to_history(source_ref, audio_path, len(script)) | |
| yield self.log("✓ Saved to history") | |
| return audio_path, "\n".join(self.logs) | |
| def process_multiple(self, urls: list = None, pdf_files: list = None): | |
| """ | |
| Orchestrates the conversion from multiple URLs or PDFs to a single comprehensive Podcast. | |
| Args: | |
| urls: List of paper URLs (arXiv or medRxiv) | |
| pdf_files: List of uploaded PDF file objects | |
| """ | |
| all_texts = [] | |
| source_refs = [] | |
| total_chars = 0 | |
| # Process URLs | |
| if urls: | |
| yield self.log(f"Received {len(urls)} URLs to process.") | |
| yield self.log(f"Context limit: {self.context_limit:,} characters") | |
| for i, url in enumerate(urls, 1): | |
| yield self.log(f"\n=== Processing Paper {i}/{len(urls)} ===") | |
| yield self.log(f"URL: {url}") | |
| # Step 1: Fetch Paper | |
| yield self.log(f"Tool Call: fetch_paper({url})") | |
| pdf_path = fetch_paper_from_url(url) | |
| if not pdf_path: | |
| yield self.log(f"Warning: Failed to download paper {i}, skipping.") | |
| continue | |
| yield self.log(f"Paper {i} downloaded successfully.") | |
| # Step 2: Read PDF | |
| yield self.log(f"Tool Call: read_pdf({pdf_path})") | |
| text = extract_text_from_pdf(pdf_path) | |
| if not text: | |
| yield self.log( | |
| f"Warning: Failed to extract text from paper {i}, skipping." | |
| ) | |
| continue | |
| text_length = len(text) | |
| yield self.log(f"Extracted {text_length:,} characters from paper {i}.") | |
| # Check context limit | |
| if total_chars + text_length > self.context_limit: | |
| yield self.log(f"⚠️ Context limit reached!") | |
| yield self.log( | |
| f"Current total: {total_chars:,} chars + Paper {i}: {text_length:,} chars = {total_chars + text_length:,} chars" | |
| ) | |
| yield self.log(f"Maximum allowed: {self.context_limit:,} chars") | |
| yield self.log( | |
| f"Stopping at {len(all_texts)} papers. Remaining papers will be skipped." | |
| ) | |
| break | |
| all_texts.append(f"=== PAPER {i} ===\n{text}\n") | |
| source_refs.append(url) | |
| total_chars += text_length | |
| yield self.log( | |
| f"✓ Paper {i} added. Total context: {total_chars:,} chars ({(total_chars / self.context_limit) * 100:.1f}% of limit)" | |
| ) | |
| # Process PDFs | |
| if pdf_files: | |
| yield self.log(f"\nReceived {len(pdf_files)} PDF files to process.") | |
| if not urls: # Only show limit if we didn't already show it for URLs | |
| yield self.log(f"Context limit: {self.context_limit:,} characters") | |
| for i, pdf_file in enumerate(pdf_files, 1): | |
| # Calculate paper number (continues from URL count) | |
| paper_num = (len(urls) if urls else 0) + i | |
| yield self.log(f"\n=== Processing PDF {i}/{len(pdf_files)} (Paper {paper_num}) ===") | |
| pdf_name = pdf_file.name if hasattr(pdf_file, "name") else f"file_{i}" | |
| yield self.log(f"File: {pdf_name}") | |
| pdf_path = pdf_file.name if hasattr(pdf_file, "name") else pdf_file | |
| # Read PDF | |
| yield self.log(f"Tool Call: read_pdf({pdf_path})") | |
| text = extract_text_from_pdf(pdf_path) | |
| if not text: | |
| yield self.log( | |
| f"Warning: Failed to extract text from PDF {i}, skipping." | |
| ) | |
| continue | |
| text_length = len(text) | |
| yield self.log(f"Extracted {text_length:,} characters from PDF {i}.") | |
| # Check context limit | |
| if total_chars + text_length > self.context_limit: | |
| yield self.log(f"⚠️ Context limit reached!") | |
| yield self.log( | |
| f"Current total: {total_chars:,} chars + PDF {i}: {text_length:,} chars = {total_chars + text_length:,} chars" | |
| ) | |
| yield self.log(f"Maximum allowed: {self.context_limit:,} chars") | |
| yield self.log( | |
| f"Stopping at {len(all_texts)} files. Remaining items will be skipped." | |
| ) | |
| break | |
| all_texts.append(f"=== PAPER {paper_num} ===\n{text}\n") | |
| source_refs.append(f"Uploaded PDF: {pdf_name}") | |
| total_chars += text_length | |
| yield self.log( | |
| f"✓ PDF {i} added. Total context: {total_chars:,} chars ({(total_chars / self.context_limit) * 100:.1f}% of limit)" | |
| ) | |
| if not all_texts: | |
| yield self.log("Error: No papers were successfully processed.") | |
| return None, "\n".join(self.logs) | |
| # Combine all texts | |
| yield self.log(f"\n✓ Successfully processed {len(all_texts)} papers") | |
| yield self.log( | |
| f"Total context: {total_chars:,} characters ({(total_chars / self.context_limit) * 100:.1f}% of limit)" | |
| ) | |
| yield self.log( | |
| f"Thinking: Now I'll combine all papers into a comprehensive podcast script." | |
| ) | |
| combined_text = "\n\n".join(all_texts) | |
| # Step 3: Generate Comprehensive Script | |
| yield self.log( | |
| "\nThinking: Creating a comprehensive podcast script covering all papers." | |
| ) | |
| if self.provider_mode == "demo": | |
| yield self.log("Using Demo Inference") | |
| elif self.provider_mode == "own_inference": | |
| yield self.log(f"Using Own Inference: {self.own_base_url}") | |
| else: | |
| yield self.log(f"Using OpenAI ({self.openai_model or 'gpt-4o-mini'})") | |
| yield self.log("Tool Call: generate_script(...)") | |
| generator = get_generator( | |
| provider_mode=self.provider_mode, | |
| own_base_url=self.own_base_url, | |
| own_api_key=self.own_api_key, | |
| own_model=self.own_model, | |
| openai_key=self.openai_key, | |
| openai_model=self.openai_model, | |
| max_tokens=self.max_tokens, | |
| ) | |
| # Add instruction for multi-paper script | |
| multi_paper_prompt = f"[MULTIPLE PAPERS - {len(all_texts)} papers total. Create a comprehensive podcast discussing all papers.]\n\n{combined_text}" | |
| script = generator.generate_podcast_script(multi_paper_prompt, target_dialogue_count=self.target_dialogue_count, persona_mode=self.persona_mode) | |
| if not script: | |
| yield self.log("Error: Failed to generate script.") | |
| return None, self.logs | |
| yield self.log( | |
| f"Generated comprehensive script with {len(script)} dialogue turns." | |
| ) | |
| # Step 4: Synthesize Audio | |
| yield self.log( | |
| "\nThinking: The script looks good. Sending it to the TTS engine." | |
| ) | |
| tts_name = "ElevenLabs TTS" if self.tts_provider == "elevenlabs" else "Supertonic TTS (CPU)" | |
| yield self.log(f"Using {tts_name}") | |
| yield self.log("Tool Call: synthesize_podcast(...)") | |
| tts = get_tts_engine( | |
| tts_provider=self.tts_provider, | |
| custom_api_key=self.elevenlabs_key if self.tts_provider == "elevenlabs" else None, | |
| host_voice=self.host_voice, | |
| guest_voice=self.guest_voice | |
| ) | |
| audio_path = tts.synthesize_dialogue(script) | |
| if not audio_path: | |
| yield self.log("Error: Failed to synthesize audio.") | |
| return None, self.logs | |
| yield self.log(f"Podcast generated successfully at: {audio_path}") | |
| # Save to history | |
| source_ref = f"Multiple papers: {', '.join(source_refs[:3])}{'...' if len(source_refs) > 3 else ''}" | |
| save_to_history(source_ref, audio_path, len(script)) | |
| yield self.log("✓ Saved to history") | |
| return audio_path, "\n".join(self.logs) | |