# run_pipeline.py
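"""End-to-end RAG pipeline over PDF/DOCX documents.

For each input file the script shells out to extract_text.py,
chunk_and_contextualize.py, and build_index.py, then loads the resulting
FAISS index, chunk texts, and BM25 index from ./output and opens a
Tkinter chat window. Queries are answered with hybrid retrieval
(dense embeddings + BM25), Voyage AI reranking, and Gemini 1.5 Pro.
"""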
import sys
import os
import subprocess
import json
import faiss
import numpy as np
import voyageai
import pickle
from rank_bm25 import BM25Okapi
import google.generativeai as genai
from config import Config
import tiktoken
import logging
import tkinter as tk
from tkinter import scrolledtext
# Initialize clients
config = Config()
voyage_client = voyageai.Client(api_key=config.VOYAGE_API_KEY)
genai.configure(api_key=config.GEMINI_API_KEY)
# Initialize tokenizer
encoder = tiktoken.encoding_for_model("gpt-3.5-turbo")
# Define maximum tokens.
# Gemini 1.5 Pro accepts a ~2M-token context window but caps output at
# 8192 tokens, so the response budget is sized to that limit and the
# remainder of the window is reserved for the prompt.
MAX_TOTAL_TOKENS = 2097152
MAX_RESPONSE_TOKENS = 8192
MAX_PROMPT_TOKENS = MAX_TOTAL_TOKENS - MAX_RESPONSE_TOKENS
# Global variables for index and texts
index = None
texts = None
bm25 = None
# Set up logging
logging.basicConfig(level=logging.INFO)
def process_pdf(file_path):
    """Process a PDF or DOCX file by extracting text, chunking, and building an index."""
    print(f"Processing {file_path}...")
    # Sanitize filename to replace spaces and special characters
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    safe_base_name = base_name.replace(' ', '_').replace("'", '').replace('"', '')
    extracted_text_file = os.path.join('output', f"{safe_base_name}_extracted.txt")
    chunks_file = os.path.join('output', f"{safe_base_name}_chunks.json")
    # Ensure the 'output' directory exists
    os.makedirs('output', exist_ok=True)
    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))
    try:
        # Extract text from the file
        print(f"Extracting text from {file_path}...")
        subprocess.run(['python', os.path.join(script_dir, 'extract_text.py'), file_path, extracted_text_file], check=True)
        # Chunk and contextualize the text
        print("Chunking and contextualizing the text...")
        subprocess.run(['python', os.path.join(script_dir, 'chunk_and_contextualize.py'), extracted_text_file, chunks_file], check=True)
        # Build the index
        print("Building the index...")
        subprocess.run(['python', os.path.join(script_dir, 'build_index.py'), chunks_file], check=True)
    except subprocess.CalledProcessError as e:
        print(f"An error occurred during processing: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
def embed_query(query):
    """Embed a query using the Voyage AI client."""
    response = voyage_client.embed(
        texts=[query],
        model="voyage-3",
        input_type="query"
    )
    return np.array(response.embeddings[0]).astype('float32')
def embed_texts(texts):
    """Embed a list of texts using the Voyage AI client."""
    response = voyage_client.embed(
        texts=texts,
        model="voyage-3",
        input_type="document"
    )
    return np.array(response.embeddings).astype('float32')
def retrieve_chunks(query, index, texts, bm25, k=20):
    """Retrieve relevant chunks for a query using both embedding and BM25 methods."""
    query_vec = embed_query(query)
    D, I = index.search(np.array([query_vec]), k)
    embedding_results = [texts[idx] for idx in I[0]]
    # BM25 retrieval
    bm25_scores = bm25.get_scores(query.split(" "))
    bm25_top_indices = np.argsort(bm25_scores)[::-1][:k]
    bm25_results = [texts[idx] for idx in bm25_top_indices]
    # Combine and deduplicate results
    combined_results = list(dict.fromkeys(embedding_results + bm25_results))[:150]
    # Use Voyage AI reranker
    reranking = voyage_client.rerank(query, combined_results, model="rerank-v3.5")
    reranked_results = [result.document for result in reranking.results]
    return reranked_results
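# Example (hypothetical query), assuming the indexes are already loaded:
#   top_chunks = retrieve_chunks("What does clause 7 cover?", index, texts, bm25, k=20)
# returns deduplicated FAISS + BM25 candidates reordered by the Voyage reranker.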
def generate_answer(query, chunks):
    """Generate an answer to the query using the provided chunks as context."""
    if not chunks:
        return "The answer to your question was not found in the provided documents."
    # Detailed system prompt
    system_prompt = "You are an extremely thoughtful and verbose assistant who can help the user extremely well with complicated problems that require sophisticated reasoning. You are mathematically enlightened and rely heavily on mathematical and statistical reasoning. You think aloud generally. You use tags strictly as instructed. You do not express your own views or beliefs beyond what's strictly necessary to follow the instruction. Your life depends on strictly following the user instruction."
    # Build the context, ensuring the total tokens stay within limits.
    # tiktoken's GPT-3.5 encoding only approximates Gemini's tokenizer, hence the buffer.
    context_chunks = []
    total_tokens = len(encoder.encode(system_prompt)) + len(encoder.encode(query)) + 500  # Buffer for prompt and instructions
    for chunk in chunks:
        chunk_tokens = len(encoder.encode(chunk))
        if total_tokens + chunk_tokens > MAX_PROMPT_TOKENS:
            break
        context_chunks.append(chunk)
        total_tokens += chunk_tokens
    context = "\n\n".join(context_chunks)
    user_content = f"Using the information below, answer the following question:\n\nQuestion: {query}\n\nContext:\n{context}\n\nPlease provide a detailed answer."
    try:
        # Pass the system prompt as a system instruction so the model receives it.
        model = genai.GenerativeModel("models/gemini-1.5-pro", system_instruction=system_prompt)
        response = model.generate_content(
            user_content,
            generation_config=genai.types.GenerationConfig(
                max_output_tokens=MAX_RESPONSE_TOKENS,
                temperature=0.5  # Adjusted for more creative responses
            )
        )
        if not response.candidates:
            logging.warning(f"Prompt blocked or no response generated for query: {query}")
            return "The prompt was blocked or no response was generated. Please try a different query."
        answer = response.text.strip()
        return answer
    except Exception as e:
        logging.error(f"Error generating answer: {e}")
        return "An error occurred while generating the answer."
def start_chat_interface():
    """Start a Tkinter-based chat interface."""
    def send_query(event=None):
        query = query_entry.get()
        if query.strip() == '':
            return
        chat_display.insert(tk.END, f"You: {query}\n", "user")
        query_entry.delete(0, tk.END)
        # Retrieve relevant chunks
        chunks = retrieve_chunks(query, index, texts, bm25, k=20)  # Reduced k from 50 to 20
        # Generate answer
        answer = generate_answer(query, chunks)
        chat_display.insert(tk.END, f"Bot: {answer}\n", "bot")

    root = tk.Tk()
    root.title("Interactive Chatbot")
    root.geometry("800x600")
    root.configure(bg="#f5f5f5")
    root.resizable(True, True)
    chat_display = scrolledtext.ScrolledText(root, wrap=tk.WORD, width=60, height=20, font=("San Francisco", 14), bg="#ffffff", fg="#333333", relief=tk.FLAT)
    chat_display.tag_configure("user", foreground="#007aff")
    chat_display.tag_configure("bot", foreground="#34c759")
    chat_display.pack(padx=20, pady=20, fill=tk.BOTH, expand=True)
    query_entry = tk.Entry(root, width=50, font=("San Francisco", 14), bg="#ffffff", fg="#333333", relief=tk.FLAT)
    query_entry.pack(side=tk.LEFT, padx=20, pady=20, fill=tk.X, expand=True)
    query_entry.bind("<Return>", send_query)
    send_button = tk.Button(root, text="Send", command=send_query, font=("San Francisco", 14, "bold"), bg="#e0e0e0", fg="#333333", relief=tk.FLAT)
    send_button.pack(side=tk.LEFT, padx=20, pady=20)
    root.mainloop()
def main(input_path):
    global index, texts, bm25
    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))
    if os.path.isfile(input_path) and input_path.lower().endswith(('.pdf', '.docx')):
        # Process single PDF or DOCX file
        process_pdf(input_path)
    elif os.path.isdir(input_path):
        # Process all PDF and DOCX files in the directory
        for root_dir, dirs, files in os.walk(input_path):
            for file in files:
                if file.lower().endswith(('.pdf', '.docx')):
                    file_path = os.path.join(root_dir, file)
                    process_pdf(file_path)
    else:
        print(f"Invalid path or no PDF or DOCX files found at: {input_path}")
        sys.exit(1)
    # Load indexes and texts
    try:
        index = faiss.read_index(os.path.join('output', 'faiss_index.index'))
        with open(os.path.join('output', 'texts.json'), 'r', encoding='utf-8') as f:
            texts = json.load(f)
        with open(os.path.join('output', 'bm25_index.pkl'), 'rb') as f:
            bm25 = pickle.load(f)
    except Exception as e:
        print(f"Error loading indexes and texts: {e}")
        sys.exit(1)
    # Start the chat interface
    start_chat_interface()
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: python run_pipeline.py <pdf_or_docx_path_or_directory>")
        sys.exit(1)
    input_path = sys.argv[1]
    main(input_path)
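# Example invocations (hypothetical paths):
#   python run_pipeline.py contracts/master_agreement.pdf
#   python run_pipeline.py contracts/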