"""
Document Intelligence RAG End-to-End Example

Demonstrates the complete RAG workflow:
1. Parse documents into semantic chunks
2. Index chunks into vector store
3. Semantic retrieval with filters
4. Grounded question answering with evidence
5. Evidence visualization

Requirements:
- ChromaDB: pip install chromadb
- Ollama running with nomic-embed-text model: ollama pull nomic-embed-text
- PyMuPDF: pip install pymupdf
"""
|
|
| import sys |
| from pathlib import Path |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
def check_dependencies():
    """Check that required dependencies are available.

    Returns:
        bool: False when a required package (chromadb or pymupdf) cannot be
        imported; True otherwise. Ollama connectivity problems only print
        warnings and never fail the check, because the example can fall
        back to mock embeddings.
    """
    missing = []

    try:
        import chromadb  # noqa: F401 -- availability probe only
    except ImportError:
        missing.append("chromadb")

    try:
        import fitz  # noqa: F401 -- PyMuPDF installs under the name `fitz`
    except ImportError:
        missing.append("pymupdf")

    if missing:
        print("Missing dependencies:")
        for dep in missing:
            print(f" - {dep}")
        print("\nInstall with: pip install " + " ".join(missing))
        return False

    # Probe the local Ollama server. Non-fatal: we only warn on failure.
    try:
        import requests
        response = requests.get("http://localhost:11434/api/tags", timeout=2)
        if response.status_code != 200:
            print("Warning: Ollama server not responding")
            print("Start Ollama with: ollama serve")
            print("Then pull the embedding model: ollama pull nomic-embed-text")
    # Bug fix: was a bare `except:`, which also swallowed SystemExit and
    # KeyboardInterrupt; `except Exception` still covers ImportError for
    # `requests` and all network errors.
    except Exception:
        print("Warning: Could not connect to Ollama server")
        print("The example will still work but with mock embeddings")

    return True
|
|
|
|
def demo_parse_and_index(doc_paths: list):
    """
    Demo: Parse documents and index into vector store.

    Args:
        doc_paths: List of document file paths
    """
    banner = "=" * 60
    print("\n" + banner)
    print("STEP 1: PARSE AND INDEX DOCUMENTS")
    print(banner)

    from src.document_intelligence import DocumentParser, ParserConfig
    from src.document_intelligence.tools import get_rag_tool

    index_tool = get_rag_tool("index_document")

    indexed = []
    for doc_path in doc_paths:
        print(f"\nProcessing: {doc_path}")

        # Fresh parser per document, capped at 10 pages for the demo.
        parser = DocumentParser(config=ParserConfig(render_dpi=200, max_pages=10))

        try:
            parse_result = parser.parse(doc_path)
            print(f" Parsed: {len(parse_result.chunks)} chunks, {parse_result.num_pages} pages")

            result = index_tool.execute(parse_result=parse_result)

            if not result.success:
                print(f" Error: {result.error}")
                continue

            print(f" Indexed: {result.data['chunks_indexed']} chunks")
            print(f" Document ID: {result.data['document_id']}")
            indexed.append({
                "path": doc_path,
                "doc_id": result.data["document_id"],
                "chunks": result.data["chunks_indexed"],
            })
        except Exception as e:
            print(f" Failed: {e}")

    return indexed
|
|
|
|
def demo_semantic_retrieval(query: str, document_id: str = None):
    """
    Demo: Semantic retrieval from vector store.

    Args:
        query: Search query
        document_id: Optional document filter
    """
    divider = "=" * 60
    print("\n" + divider)
    print("STEP 2: SEMANTIC RETRIEVAL")
    print(divider)

    from src.document_intelligence.tools import get_rag_tool

    retrieve_tool = get_rag_tool("retrieve_chunks")

    print(f"\nQuery: \"{query}\"")
    if document_id:
        print(f"Document filter: {document_id}")

    result = retrieve_tool.execute(
        query=query,
        top_k=5,
        document_id=document_id,
        include_evidence=True,
    )

    # Guard clause: bail out early on failure instead of nesting the
    # success path inside an if/else.
    if not result.success:
        print(f"Error: {result.error}")
        return []

    chunks = result.data.get("chunks", [])
    print(f"\nFound {len(chunks)} relevant chunks:\n")

    for rank, chunk in enumerate(chunks, start=1):
        print(f"{rank}. [similarity={chunk['similarity']:.3f}]")
        print(f" Page {chunk.get('page', '?')}, Type: {chunk.get('chunk_type', 'unknown')}")
        print(f" Text: {chunk['text'][:150]}...")
        print()

    if result.evidence:
        print("Evidence references:")
        # Show at most the first three evidence entries.
        for ev in result.evidence[:3]:
            print(f" - Chunk {ev['chunk_id'][:12]}... Page {ev.get('page', '?')}")

    return chunks
|
|
|
def demo_grounded_qa(question: str, document_id: str = None):
    """
    Demo: Grounded question answering with evidence.

    Args:
        question: Question to answer
        document_id: Optional document filter
    """
    divider = "=" * 60
    print("\n" + divider)
    print("STEP 3: GROUNDED QUESTION ANSWERING")
    print(divider)

    from src.document_intelligence.tools import get_rag_tool

    qa_tool = get_rag_tool("rag_answer")

    print(f"\nQuestion: \"{question}\"")

    result = qa_tool.execute(
        question=question,
        document_id=document_id,
        top_k=5,
    )

    # Guard clause: report failure and return early.
    if not result.success:
        print(f"Error: {result.error}")
        return None

    data = result.data
    print(f"\nAnswer: {data.get('answer', 'No answer')}")
    print(f"Confidence: {data.get('confidence', 0):.2f}")

    if data.get('abstained'):
        print("Note: System abstained due to low confidence")

    citations = data.get('citations', [])
    if citations:
        print("\nCitations:")
        for cit in citations:
            print(f" [{cit['index']}] {cit.get('text', '')[:80]}...")

    if result.evidence:
        print("\nEvidence locations:")
        for ev in result.evidence:
            print(f" - Page {ev.get('page', '?')}: {ev.get('snippet', '')[:60]}...")

    return data
|
|
|
|
def demo_filtered_retrieval():
    """
    Demo: Retrieval with various filters.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("STEP 4: FILTERED RETRIEVAL")
    print(divider)

    from src.document_intelligence.tools import get_rag_tool

    retrieve_tool = get_rag_tool("retrieve_chunks")

    # Filter 1: restrict results to table chunks only.
    print("\n--- Retrieving only table chunks ---")
    result = retrieve_tool.execute(query="data values", top_k=3, chunk_types=["table"])
    if result.success:
        chunks = result.data.get("chunks", [])
        print(f"Found {len(chunks)} table chunks")
        for chunk in chunks:
            print(f" - Page {chunk.get('page', '?')}: {chunk['text'][:80]}...")

    # Filter 2: restrict results to a page range.
    print("\n--- Retrieving from pages 1-3 only ---")
    result = retrieve_tool.execute(query="content", top_k=3, page_range=(1, 3))
    if result.success:
        chunks = result.data.get("chunks", [])
        print(f"Found {len(chunks)} chunks from pages 1-3")
        for chunk in chunks:
            print(f" - Page {chunk.get('page', '?')}: {chunk['text'][:80]}...")
|
|
|
|
def demo_index_stats():
    """
    Demo: Show index statistics.
    """
    divider = "=" * 60
    print("\n" + divider)
    print("INDEX STATISTICS")
    print(divider)

    from src.document_intelligence.tools import get_rag_tool

    stats_tool = get_rag_tool("get_index_stats")
    result = stats_tool.execute()

    if not result.success:
        print(f"Error: {result.error}")
        return

    stats = result.data
    print(f"\nTotal chunks indexed: {stats.get('total_chunks', 0)}")
    print(f"Embedding model: {stats.get('embedding_model', 'unknown')}")
    print(f"Embedding dimension: {stats.get('embedding_dimension', 'unknown')}")
|
|
|
|
def main():
    """Run the complete RAG demo.

    Document selection order:
      1. Paths supplied on the command line (``sys.argv[1:]``).
      2. Otherwise, the first bundled sample PDF that exists on disk.

    Exits early (returning None) when required dependencies are missing or
    no document can be found.
    """
    print("=" * 60)
    print("SPARKNET Document Intelligence RAG Demo")
    print("=" * 60)

    if not check_dependencies():
        print("\nPlease install missing dependencies and try again.")
        return

    # Bug fix: explicit command-line paths now take precedence. The
    # original code only consulted sys.argv when no bundled sample was
    # found, so user-supplied paths were silently ignored whenever a
    # sample PDF existed -- contradicting the printed usage message.
    if len(sys.argv) > 1:
        doc_paths = sys.argv[1:]
    else:
        sample_paths = [
            Path("Dataset/Patent_1.pdf"),
            Path("data/sample.pdf"),
            Path("tests/fixtures/sample.pdf"),
        ]

        # Use only the first sample that exists.
        doc_paths = []
        for path in sample_paths:
            if path.exists():
                doc_paths.append(str(path))
                break

        if not doc_paths:
            print("\nNo sample documents found.")
            print("Please provide a PDF file path as argument.")
            print("\nUsage: python document_rag_end_to_end.py [path/to/document.pdf]")
            return

    print(f"\nUsing documents: {doc_paths}")

    try:
        # Step 1: parse and index.
        indexed_docs = demo_parse_and_index(doc_paths)

        if not indexed_docs:
            print("\nNo documents were indexed. Exiting.")
            return

        # Scope the remaining demos to the first indexed document.
        first_doc_id = indexed_docs[0]["doc_id"]

        # Step 2: semantic retrieval.
        demo_semantic_retrieval(
            query="main topic content",
            document_id=first_doc_id,
        )

        # Step 3: grounded QA.
        demo_grounded_qa(
            question="What is this document about?",
            document_id=first_doc_id,
        )

        # Step 4: filtered retrieval.
        demo_filtered_retrieval()

        # Index statistics.
        demo_index_stats()

        print("\n" + "=" * 60)
        print("Demo complete!")
        print("=" * 60)

        print("\nNext steps:")
        print(" 1. Try the CLI: sparknet docint index your_document.pdf")
        print(" 2. Query the index: sparknet docint retrieve 'your query'")
        print(" 3. Ask questions: sparknet docint ask doc.pdf 'question' --use-rag")

    except ImportError as e:
        print(f"\nImport error: {e}")
        print("Make sure all dependencies are installed:")
        print(" pip install pymupdf pillow numpy pydantic chromadb")

    except Exception as e:
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
|
|