import gradio as gr
import requests
import io
from pypdf import PdfReader
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer, CrossEncoder
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from accelerate import Accelerator
from langchain.text_splitter import NLTKTextSplitter
from rank_bm25 import BM25Okapi
import os
import pickle
import nltk

# Sentence tokenizer data for NLTKTextSplitter ('punkt' covers older NLTK releases).
nltk.download('punkt_tab', quiet=True)
nltk.download('punkt', quiet=True)
# --- Global Variables for Caching ---
index = None
chunks = None
embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
generator = None
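# `index`, `chunks`, and `generator` stay None until initialize_app() is run from the
# Gradio "Initialize" button; only the embedding model is loaded at import time.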
# --- PDF Processing and Embedding ---
def download_pdf(url):
    """Downloads a PDF and returns its raw bytes."""
    response = requests.get(url, stream=True)
    response.raise_for_status()
    return response.content

def custom_chunking(text, delimiter="\n\n"):
    """Splits text based on a specified delimiter."""
    return text.split(delimiter)
def extract_text_from_pdf(pdf_bytes, document_id):
    """Extracts text from a PDF, page by page, and then chunks each page."""
    pdf_file = io.BytesIO(pdf_bytes)
    reader = PdfReader(pdf_file)
    nltk_splitter = NLTKTextSplitter(chunk_size=500)
    extracted_data = []
    for page_num, page in enumerate(reader.pages):
        page_text = page.extract_text() or ""
        clean_text = " ".join(page_text.split())
        if clean_text:
            words = clean_text.split()
            section_header = " ".join(words[:20]) if words else "No Section Name Found"
            # Split the raw page text (before whitespace is collapsed) so that
            # blank-line paragraph breaks are still present for the "\n\n" delimiter.
            custom_chunks = custom_chunking(page_text)
            for custom_chunk in custom_chunks:
                clean_custom_chunk = " ".join(custom_chunk.split())
                if clean_custom_chunk:
                    nltk_chunks = nltk_splitter.split_text(clean_custom_chunk)
                    for nltk_chunk in nltk_chunks:
                        clean_nltk_chunk = " ".join(nltk_chunk.split())
                        if clean_nltk_chunk:
                            extracted_data.append({
                                "document_id": document_id,
                                "section_header": section_header,
                                "text": clean_nltk_chunk
                            })
    return extracted_data
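# Each chunk carries its document_id and a pseudo section header (the first 20 words of the
# page), so retrieved passages can be attributed to a specific report and page context.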
def process_single_pdf(url, doc_id):
    """Processes a single PDF."""
    pdf_bytes = download_pdf(url)
    return extract_text_from_pdf(pdf_bytes, doc_id)

def process_pdfs_parallel(pdf_urls, document_ids):
    """Processes multiple PDFs in parallel."""
    all_data = []
    with ThreadPoolExecutor() as pdf_executor:
        pdf_futures = [pdf_executor.submit(process_single_pdf, url, doc_id)
                       for url, doc_id in zip(pdf_urls, document_ids)]
        for future in as_completed(pdf_futures):
            all_data.extend(future.result())
    return all_data
def create_embeddings_and_index(data):
    """Embeds all chunks and builds a FAISS index over them."""
    texts = [item['text'] for item in data]
    embeddings = embedding_model.encode(texts, convert_to_numpy=True)
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    # FAISS expects float32 vectors.
    index.add(np.asarray(embeddings, dtype="float32"))
    return index, data
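# IndexFlatL2 does exact (brute-force) L2 search over every chunk embedding, which is fine
# at this scale (two annual reports); an approximate index would only matter for much
# larger corpora.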
# --- Retrieval Functions ---
def bm25_retrieval(query, documents, top_k=10):
    """Lexical retrieval: scores chunks against the query with BM25 and returns the top_k."""
    tokenized_docs = [doc['text'].split() for doc in documents]
    bm25 = BM25Okapi(tokenized_docs)
    doc_scores = bm25.get_scores(query.split())
    top_indices = np.argsort(doc_scores)[::-1][:top_k]
    return [documents[i] for i in top_indices]
def adaptive_retrieval(query, index, chunks, top_k=10):
    """Hybrid retrieval: combines dense (FAISS) and lexical (BM25) results, deduplicated by text."""
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    _, indices = index.search(query_embedding, top_k)
    vector_results = [chunks[i] for i in indices[0]]
    bm25_results = bm25_retrieval(query, chunks, top_k)
    combined_results = vector_results + bm25_results
    unique_results = []
    seen_texts = set()
    for result in combined_results:
        if result['text'] not in seen_texts:
            unique_results.append(result)
            seen_texts.add(result['text'])
    return unique_results
def rerank(query, results, keyword_weight=0.3, cross_encoder_weight=0.7):
    """Combines keyword-based and cross-encoder reranking."""
    # Keyword-based scoring
    keywords = query.lower().split()
    def score_chunk_keywords(chunk):
        text = chunk['text'].lower()
        return sum(1 for keyword in keywords if keyword in text)
    keyword_scores = [score_chunk_keywords(chunk) for chunk in results]
    # Cross-encoder scoring
    rerank_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')
    query_results = [[query, f"Document: {result['document_id']}, Section: {result['section_header']}, Text: {result['text']}"]
                     for result in results]
    cross_encoder_scores = rerank_model.predict(query_results)
    # Combine scores
    combined_scores = [(keyword_scores[i] * keyword_weight) + (cross_encoder_scores[i] * cross_encoder_weight)
                       for i in range(len(results))]
    # Rank and select top 3
    ranked_results = [results[i] for i in np.argsort(combined_scores)[::-1]]
    return ranked_results[:3]
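# Note: keyword scores are raw match counts while cross-encoder scores are model logits on a
# different scale, so the 0.3 / 0.7 weights are heuristic rather than calibrated. The
# CrossEncoder is also re-instantiated on every call; caching it globally would speed up queries.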
def merge_chunks(retrieved_chunks):
    """Merges chunks based on their original order, including metadata."""
    merged_text = " ".join([
        f"Document: {chunk['document_id']}, Section: {chunk['section_header']}, Text: {chunk['text']}"
        for chunk in retrieved_chunks
    ])
    return merged_text
# --- Confidence Calculation ---
def calculate_confidence(query, context, answer):
    """Calculates confidence score based on question-context and context-answer similarity."""
    # Normalize embeddings so the dot products below are cosine similarities.
    query_embedding = embedding_model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
    context_embedding = embedding_model.encode([context], convert_to_numpy=True, normalize_embeddings=True)
    answer_embedding = embedding_model.encode([answer], convert_to_numpy=True, normalize_embeddings=True)
    query_context_similarity = np.dot(query_embedding, context_embedding.T).item()
    context_answer_similarity = np.dot(context_embedding, answer_embedding.T).item()
    confidence = (query_context_similarity + context_answer_similarity) / 2.0  # Equal weights
    return confidence
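# The resulting score is the mean of two cosine similarities (roughly 0-1 for related texts).
# It reflects topical overlap between question, context, and answer, not factual correctness.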
# --- Response Generation ---
def generate_response(query, context):
    prompt = f"""Your task is to analyze the given Context and answer the Question with a clear, relevant answer in plain English.
**Guidelines:**
- JUST PROVIDE ONLY THE ANSWER.
- Provide an elaborate, factual answer based strictly on the Context.
- Avoid generating Python code, solutions, or any irrelevant information.
Context: {context}
Question: {query}
Answer:"""
    # return_full_text=False keeps the prompt out of the returned completion.
    response = generator(prompt, max_new_tokens=500, num_return_sequences=1,
                         return_full_text=False)[0]['generated_text']
    return response
# --- Guardrail ---
def is_sensitive_query(query):
    """Two-stage guardrail: a keyword blocklist for personal data plus a toxicity classifier."""
    sensitive_keywords = ["personal", "address", "phone", "ssn", "credit card", "bank account",
                          "password", "social security", "private", "location"]
    query_lower = query.lower()
    if any(keyword in query_lower for keyword in sensitive_keywords):
        return True
    classifier = pipeline("text-classification", model="unitary/toxic-bert")
    result = classifier(query)[0]
    if result["label"] == "toxic" and result["score"] > 0.7:
        return True
    return False
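# A possible optimization (not part of the original flow): load the toxicity classifier once at
# module level, e.g. `toxicity_classifier = pipeline("text-classification", model="unitary/toxic-bert")`,
# and reuse it inside is_sensitive_query() instead of re-instantiating it on every query.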
# --- Process Query ---
def process_query(query):
    """Full RAG pipeline: guardrail -> hybrid retrieval -> rerank -> merge -> generate -> confidence."""
    if index is None or generator is None:
        return "Please click 'Initialize' first to load the documents and the model."
    if is_sensitive_query(query):
        return "I cannot answer questions that involve sensitive or personal information, or that are toxic in nature."
    retrieved_chunks = adaptive_retrieval(query, index, chunks)
    reranked_chunks = rerank(query, retrieved_chunks)
    final_chunks = reranked_chunks[:3]
    merged_result = merge_chunks(final_chunks)
    answer = generate_response(query, merged_result)
    # DeepSeek-R1 models emit their reasoning before "</think>"; keep only the final answer.
    if "</think>" in answer:
        answer = answer.split("</think>", 1)[-1].strip()
    confidence = calculate_confidence(query, merged_result, answer)
    full_response = f"{answer}\n\nConfidence: {confidence:.2f}"
    return full_response
# --- Initialization ---
def initialize_app():
    global index, chunks, generator
    pdf_urls = ["https://www.latentview.com/wp-content/uploads/2023/07/LatentView-Annual-Report-2022-23.pdf",
                "https://www.latentview.com/wp-content/uploads/2024/08/LatentView-Annual-Report-2023-24.pdf"]
    document_ids = ["LatentView-Annual-Report-2022-23", "LatentView-Annual-Report-2023-24"]
    if os.path.exists('vector_cache.pkl'):
        with open('vector_cache.pkl', 'rb') as f:
            serialized_index, chunks = pickle.load(f)
        index = faiss.deserialize_index(serialized_index)
    else:
        extracted_data = process_pdfs_parallel(pdf_urls, document_ids)
        index, chunks = create_embeddings_and_index(extracted_data)
        with open('vector_cache.pkl', 'wb') as f:
            # FAISS indexes are not reliably picklable; serialize to a numpy buffer first.
            pickle.dump((faiss.serialize_index(index), chunks), f)
    accelerator = Accelerator()
    accelerator.free_memory()
    MODEL_NAME = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, device_map="auto")
    model = accelerator.prepare(model)
    generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
    return "Initialization Complete!"
# --- Gradio Interface ---
def gradio_interface(query):
    return process_query(query)

iface = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(lines=2, placeholder="Enter your question here..."),
    outputs=gr.Textbox(lines=5, placeholder="Answer will appear here..."),
    title="Annual Report Q&A Chatbot (LatentView Analytics)",
    description="Ask questions about the company's annual reports (2022-23 & 2023-24).",
    examples=[
        ["What is the total revenue from operations for 2023-24?"],
        ["Who is the CEO of LatentView Analytics?"],
        ["Summarize the key financial highlights in 2023-24"],
        ["What were the total expenses for 2022-23?"],
    ],
    cache_examples=False,
)
with gr.Blocks() as demo:
    gr.Markdown("# Annual Report Q&A Chatbot (LatentView Analytics)")
    init_button = gr.Button("Initialize")
    init_output = gr.Textbox(label="Initialization Status")
    init_button.click(
        fn=initialize_app,
        inputs=[],
        outputs=init_output,
    )
    iface.render()

demo.launch()