import os
import time

import chromadb
import gradio as gr
import PyPDF2
from chromadb.utils import embedding_functions
from openai import OpenAI

from theme import CustomTheme

CONTEXT_SIZE = 19
CHUNK_SIZE = 400
CHUNK_OVERLAP = 100
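# CHUNK_SIZE and CHUNK_OVERLAP are measured in characters (the text is sliced
# directly), not tokens; CONTEXT_SIZE is the number of chunks fetched per query.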

# OpenAI client used for the chat completions
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Persistent vector store on disk, embedded with OpenAI's text-embedding-3-small
chroma_client = chromadb.PersistentClient(path="./chroma_db")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=os.environ.get("OPENAI_API_KEY"),
    model_name="text-embedding-3-small"
)
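# The same embedding function has to be supplied when the collection is created
# and when it is reopened via get_collection; otherwise query embeddings would
# not match the stored document embeddings.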


def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Splits text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        if chunk.strip():
            chunks.append(chunk)
        start = end - overlap
    return chunks
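# Example: with the defaults, a 1000-character text yields chunks covering
# [0:400], [300:700], [600:1000] and [900:1000]; each chunk repeats the last
# 100 characters of the previous one so sentences are not cut off at chunk borders.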


def load_documents_to_vectordb(path="./Database/"):
    """Loads documents from `path` and stores them as chunks in ChromaDB."""
    print("Document folder:", path)

    # Drop any existing collection so a reload always starts from a clean state
    try:
        chroma_client.delete_collection(name="documents")
    except Exception:
        pass  # the collection did not exist yet

    collection = chroma_client.create_collection(
        name="documents",
        embedding_function=openai_ef
    )

    doc_id = 0

    for filename in os.listdir(path):
        filepath = os.path.join(path, filename)

        if os.path.isdir(filepath):
            continue

        print(f"Processing: {filename}")

        try:
            if filename.endswith('.pdf'):
                with open(filepath, 'rb') as f:
                    pdf_reader = PyPDF2.PdfReader(f)

                    # Chunk each page separately so page numbers can be cited
                    for page_num, page in enumerate(pdf_reader.pages, 1):
                        page_text = page.extract_text()
                        if page_text.strip():
                            page_chunks = chunk_text(page_text)

                            for i, chunk in enumerate(page_chunks):
                                collection.add(
                                    documents=[chunk],
                                    metadatas=[{
                                        "filename": filename,
                                        "page": page_num,
                                        "chunk_id": i,
                                        "total_chunks": len(page_chunks),
                                        "source_type": "pdf"
                                    }],
                                    ids=[f"doc_{doc_id}"]
                                )
                                doc_id += 1

                    print(f"  → processed PDF with {len(pdf_reader.pages)} pages")

            elif filename.endswith(('.txt', '.md')):
                with open(filepath, 'r', encoding='utf-8') as f:
                    text = f.read()

                if text.strip():
                    chunks = chunk_text(text)

                    for i, chunk in enumerate(chunks):
                        collection.add(
                            documents=[chunk],
                            metadatas=[{
                                "filename": filename,
                                # no "page" key: ChromaDB rejects None metadata
                                # values, and plain text files have no pages
                                "chunk_id": i,
                                "total_chunks": len(chunks),
                                "source_type": "text"
                            }],
                            ids=[f"doc_{doc_id}"]
                        )
                        doc_id += 1

                    print(f"  → created {len(chunks)} chunks")

        except Exception as e:
            print(f"Error loading {filename}: {e}")

    print(f"\nTotal: {doc_id} chunks stored in the vector DB")
    return collection
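# Note: collection.add also accepts full lists, so the chunks of a page or file
# could be added in a single batched call to cut down on embedding round-trips.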


def get_relevant_context(query, collection, n_results=CONTEXT_SIZE):
    """Retrieves the most relevant document chunks for a query."""
    results = collection.query(
        query_texts=[query],
        n_results=n_results
    )

    context = ""
    if results['documents']:
        for i, (doc, metadata) in enumerate(zip(results['documents'][0], results['metadatas'][0])):
            # source labels stay in German because they are fed into the
            # German-language system prompt
            if metadata.get('page'):
                source_info = f"{metadata['filename']}, Seite {metadata['page']}"
            else:
                source_info = f"{metadata['filename']}"

            context += f"\n--- Quelle {i+1}: {source_info} (Chunk {metadata['chunk_id']+1}/{metadata['total_chunks']}) ---\n"
            context += doc + "\n"

    return context
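# The assembled context block looks like this (filename purely illustrative):
# --- Quelle 1: fair_travel_guide.pdf, Seite 3 (Chunk 2/5) ---
# <chunk text>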


# Reuse an existing collection if present; otherwise index ./Database/ from scratch
print("Initializing vector DB...")
try:
    collection = chroma_client.get_collection(
        name="documents",
        embedding_function=openai_ef
    )
    print(f"Vector DB loaded with {collection.count()} chunks")
except Exception:
    print("Creating new vector DB and loading documents...")
    collection = load_documents_to_vectordb()
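# Because the collection persists in ./chroma_db across runs, files added to
# ./Database/ later are only picked up once the collection (or the ./chroma_db
# folder) is deleted, which forces a full re-index on the next start.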


def response(message, history):
    """Generates a streamed, retrieval-augmented answer via the OpenAI API."""
    context = get_relevant_context(message, collection, n_results=CONTEXT_SIZE)

    system_prompt = f"""Task: You are the Fair Travel Food Guide chatbot. You recommend restaurants and cafés from a small curated dataset in a lightly quirky, friendly, relaxed travel-buddy tone: locker, warm und natürlich, ohne übertrieben zu wirken. Kleine spontane Reaktionen wie "Oh nice!" oder "Klingt nach euch!" sind erlaubt, aber nichts Cringiges. Your pronouns mirror the user: "ich" → "du", "wir" → "ihr".

Specifics:
1. Recommend only cafés and restaurants from the dataset; never invent locations.
2. Use a relaxed, playful tone with subtle charm and small natural reactions.
3. Add short, friendly hints why each place fits fair-travel values.
4. If nothing fits, say so openly and casually and suggest the closest alternatives.

KONTEXT:
---------------------
{context}
---------------------

Beantworte die Frage basierend NUR auf dem bereitgestellten Kontext (z. B. PDF-, CSV- oder Textinformationen über Lokale).
Wenn mehrere Quellen Informationen liefern, fasse sie zusammen.
Strukturiere längere Antworten mit Aufzählungen für bessere Lesbarkeit.
Wenn die Informationen im Kontext nicht vorhanden sind, sage das klar und stelle ggf. Rückfragen an die Nutzer:innen. Sage ihnen auch, wonach sie fragen können, um bessere Antworten zu erhalten.

SPRACHE: Deutsch, freundlich, klar und verständlich.

Beantworte die folgende Frage:
"""

    messages = [{"role": "system", "content": system_prompt}]

    # Replay prior turns so the model sees the full conversation
    for msg in history:
        if msg["role"] == "user":
            messages.append({"role": "user", "content": msg["content"]})
        elif msg["role"] == "assistant":
            messages.append({"role": "assistant", "content": msg["content"]})

    messages.append({"role": "user", "content": message})

    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.1,
        stream=True,
        stream_options={"include_usage": True}
    )

    answer = ""
    usage_info = None

    for chunk in stream:
        # with include_usage, the final chunk carries the token counts
        if hasattr(chunk, 'usage') and chunk.usage:
            usage_info = chunk.usage

        if chunk.choices and chunk.choices[0].delta.content:
            text = chunk.choices[0].delta.content
            time.sleep(0.05)  # slow the stream down slightly for a typing effect
            answer += text
            yield answer

    if usage_info:
        print("\n📊 Token usage for this request:")
        print(f"  Input tokens: {usage_info.prompt_tokens}")
        print(f"  Output tokens: {usage_info.completion_tokens}")
        print(f"  Total tokens: {usage_info.total_tokens}")
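# Gradio replaces the displayed bot message with every yielded value, so the
# generator yields the accumulated answer rather than only the newest delta.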


theme = CustomTheme()


def main():
    chatbot = gr.Chatbot(
        value=[{"role": "assistant", "content": "Wie kann ich dir helfen?"}],
        type="messages",
        show_label=False,
        avatar_images=("./avatar_images/human.svg", "./avatar_images/robot.svg"),
        elem_id="CHATBOT"
    )

    chatinterface = gr.ChatInterface(
        fn=response,
        chatbot=chatbot,
        type="messages",
        theme=theme,
        css_paths="./style.css"
    )

    chatinterface.launch(inbrowser=True)


if __name__ == "__main__":
    main()