| import openai |
| import sqlite3 |
| import numpy as np |
| from sklearn.metrics.pairwise import cosine_similarity |
| import os |
| import gradio as gr |
| from docx import Document |
| from PyPDF2 import PdfFileReader |
| import re |
| from gradio import Interface, components |
|
|
|
|
| |
# The OpenAI API key is read from the "Secret" environment variable; if the
# variable is missing, the lookup raises KeyError while the module loads.
openai.api_key = os.environ["Secret"]
|
|
def find_closest_neighbors(vector1, dictionary_of_vectors, top_k=4):
    """Rank stored embeddings by cosine similarity to an embedded query.

    Despite its name, ``vector1`` is not used as a vector directly: it is
    sent as ``input`` to the ``text-embedding-ada-002`` embedding endpoint
    and the embedding that comes back is used as the query vector.

    Args:
        vector1: Query passed to ``openai.Embedding.create`` as ``input``.
        dictionary_of_vectors: Mapping of key -> 1-D array-like; every value
            is compared against the query embedding.
        top_k: Maximum number of matches to return. Defaults to 4, the
            count this function has always returned.

    Returns:
        A list of ``(key, similarity)`` tuples sorted by descending
        similarity, at most ``top_k`` long (empty for an empty mapping).

    Raises:
        ValueError: If ``top_k`` is negative.
    """
    # Validate before the (billable) embedding request is made.
    if top_k < 0:
        raise ValueError("top_k must be non-negative")

    response = openai.Embedding.create(
        input=vector1,
        engine="text-embedding-ada-002",
    )
    # cosine_similarity works on 2-D inputs; reshape the query once rather
    # than on every loop iteration.
    query = np.array(response['data'][0]['embedding']).reshape(1, -1)

    cosine_similarities = {
        key: cosine_similarity(query, np.asarray(value).reshape(1, -1))[0][0]
        for key, value in dictionary_of_vectors.items()
    }

    ranked = sorted(
        cosine_similarities.items(),
        key=lambda item: item[1],
        reverse=True,
    )
    return ranked[:top_k]
|
|
def extract_words_from_docx(filename):
    """Return the list of words found in the paragraphs of a .docx file.

    The text of every paragraph in the document is joined with newlines and
    the result is split into words with a word-boundary regular expression.

    Args:
        filename: Path of the .docx file, handed to ``Document``.

    Returns:
        A list of word strings in document order.
    """
    word_pattern = re.compile(r'\b\w+\b')
    body = '\n'.join(para.text for para in Document(filename).paragraphs)
    return word_pattern.findall(body)
|
|
def extract_words_from_pdf(filename):
    """Return the list of words found in the extracted text of a PDF file.

    The text of every page is concatenated (no separator) and split into
    words with a word-boundary regular expression.

    Args:
        filename: Path of the PDF file; it is opened in binary mode.

    Returns:
        A list of word strings in page order.
    """
    try:
        # Preferred reader API. The legacy PdfFileReader / getNumPages /
        # getPage / extractText names raise a deprecation error in
        # PyPDF2 3.x, so they are only used when PdfReader is unavailable.
        from PyPDF2 import PdfReader
    except ImportError:
        PdfReader = None

    with open(filename, "rb") as file:
        if PdfReader is not None:
            reader = PdfReader(file)
            # Guard against a page yielding no text at all.
            page_texts = [page.extract_text() or "" for page in reader.pages]
        else:
            reader = PdfFileReader(file)
            page_texts = [
                reader.getPage(page_num).extractText()
                for page_num in range(reader.getNumPages())
            ]

    # Join once instead of growing a string with += inside the page loop.
    text = "".join(page_texts)
    return re.findall(r'\b\w+\b', text)
|
|
def process_file(file_obj):
    """Split an uploaded .docx/.pdf into chunks, embed them and store them.

    The file's words are grouped into chunks of up to 200 words. Each chunk
    is embedded with ``text-embedding-ada-002`` and inserted into the
    ``chunks`` table of the local SQLite database, with the embedding stored
    as a space-separated string of numbers. The transaction is committed
    only after every chunk has been inserted.

    Args:
        file_obj: The uploaded file: either an object exposing the file
            path as ``.name`` or a plain path string. ``None`` means
            nothing was uploaded.

    Returns:
        A human-readable status string.
    """
    if file_obj is None:
        return "No file uploaded."

    path = file_obj if isinstance(file_obj, str) else file_obj.name

    # Compare extensions case-insensitively so "REPORT.PDF" is accepted too.
    lowered = path.lower()
    if lowered.endswith('.docx'):
        words = extract_words_from_docx(path)
    elif lowered.endswith('.pdf'):
        words = extract_words_from_pdf(path)
    else:
        return "Unsupported file type."

    chunks = [" ".join(words[i:i + 200]) for i in range(0, len(words), 200)]

    conn = sqlite3.connect('text_chunks_with_embeddings (1).db')
    try:
        cursor = conn.cursor()
        for chunk in chunks:
            embedding = openai.Embedding.create(
                input=chunk,
                engine="text-embedding-ada-002",
            )['data'][0]['embedding']
            embedding_str = " ".join(map(str, embedding))
            cursor.execute(
                "INSERT INTO chunks (text, embedding) VALUES (?, ?)",
                (chunk, embedding_str),
            )
        conn.commit()
    finally:
        # Always release the connection, even when an embedding request or
        # an insert fails part-way through.
        conn.close()

    return "File processed and added to database."
|
|
def predict(message, history, file_obj=None):
    """Stream a GPT-4 answer to ``message`` using stored chunks as context.

    Steps:
      1. If a file was supplied, ingest it with ``process_file``.
      2. Load every ``(text, embedding)`` row from the ``chunks`` table.
      3. Pick the chunks closest to ``message`` via
         ``find_closest_neighbors`` and concatenate their text, capped at
         1500 characters, as context.
      4. Send the prior conversation plus the context-augmented question to
         the chat completion endpoint and stream the reply.

    Args:
        message: The user's question.
        history: Iterable of ``(user_text, assistant_text)`` pairs from
            earlier turns; ``None`` is treated as no history.
        file_obj: Optional uploaded file forwarded to ``process_file``.

    Yields:
        The answer accumulated so far, growing as streamed pieces arrive.
    """
    if file_obj:
        process_file(file_obj)

    conn = sqlite3.connect('text_chunks_with_embeddings (1).db')
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT text, embedding FROM chunks")
        rows = cursor.fetchall()
    finally:
        # Release the connection even if the query fails.
        conn.close()

    # Embeddings are stored as space-separated numbers; parse them back into
    # arrays keyed by their chunk text.
    dictionary_of_vectors = {
        text: np.fromstring(embedding_str, sep=' ')
        for text, embedding_str in rows
    }

    match_list = find_closest_neighbors(message, dictionary_of_vectors)
    context = ''.join(str(match[0]) for match in match_list)[:1500]

    prep = f"This is an OpenAI model designed to answer questions specific to grant-making applications for an aquarium. Here is some question-specific context: {context}. Q: {message} A: "

    history_openai_format = []
    for human, assistant in history or []:
        history_openai_format.append({"role": "user", "content": human})
        history_openai_format.append({"role": "assistant", "content": assistant})
    history_openai_format.append({"role": "user", "content": prep})

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=history_openai_format,
        temperature=1.0,
        stream=True,
    )

    partial_message = ""
    for chunk in response:
        delta = chunk['choices'][0]['delta']
        # A streamed delta may carry only a role (or be empty on the final
        # chunk); indexing 'content' directly would raise KeyError there.
        content = delta.get('content')
        if content is None:
            continue
        partial_message += content
        yield partial_message
|
|
| |
# Build the Gradio UI around predict: a text box for the question, a list
# input for the conversation history and a file upload for new documents.
# NOTE(review): live=True asks Gradio to re-run predict whenever an input
# changes, and every run calls the OpenAI API -- confirm this is intended.
demo = Interface(
    fn=predict,
    inputs=[
        "text",
        "list",
        components.File(label="Upload PDF or DOCX file"),
    ],
    outputs="textbox",
    live=True,
)
demo.launch()
|
|
|
|