class Embedder:
"""Creating embeddings for documents. Optionally store to a vectorstore.
Potential functions to override if implementing a custom Embedder class:
- `set_embedder()`: the logic for how the embedder is initialized.
- `set_vectorstore()`: the logic for how the vectorstore is initialized.
- `embed_docs()`: the logic for how documents are embedded.
- `insert_embeddings()`: the logic for how embeddings are inserted into the
vectorstore.
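
    Example (illustrative usage; the directory path below is a placeholder,
    and the defaults assume the HuggingFace embedder and a FAISS vectorstore):

        embedder = Embedder()
        embedder.embed_and_insert_dataset("data/docs")
        embedder.save_vectorstore_if_applicable()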
"""
ALLOWED_EMBEDDERS = {"HuggingFace", "OpenAI", "custom"}
ALLOWED_VECTORSTORES = {None, "FAISS", "custom"}
def __init__(
self,
documents_dir=None,
embedder="HuggingFace",
embedders_config: dict = DEFAULT_EMBEDDERS_CONFIG,
        vectorstore: Optional[str] = "FAISS",
vectorstore_config: dict = DEFAULT_VECTORSTORES_CONFIG,
) -> None:
"""
Initializes an Embedder instance with specified configuration for
embedding and vector storage.
Args:
documents_dir (str, optional): Directory containing the documents
to embed.
embedder (str): Type of embedder to use, options include
'HuggingFace', 'OpenAI', or 'custom'.
embedders_config (dict): Configuration settings for the embedder.
            vectorstore (Optional[str]): Name of the vector store to use,
options include 'FAISS', 'custom', or None.
vectorstore_config (dict): Configuration settings for the vector
store.
Raises:
ValueError: If the specified embedder or vectorstore is not valid.
"""
self.documents_dir = documents_dir
if embedder not in self.ALLOWED_EMBEDDERS:
raise ValueError(
f"{embedder} is not a valid embedder."
f" Choose from: {self.ALLOWED_EMBEDDERS}"
)
if vectorstore not in self.ALLOWED_VECTORSTORES:
raise ValueError(
f"{vectorstore} is not a valid vectorstore."
f" Choose from: {self.ALLOWED_VECTORSTORES}"
)
self.embedder_name: str = embedder
self.embedders_config = embedders_config
self.set_embedder(embedder, embedders_config)
        # Default to no vectorstore so later checks do not hit missing
        # attributes when no vectorstore is configured.
        self.vectorstore_name: Optional[str] = None
        self.vectorstore_instance: Optional[VectorStore] = None
        if vectorstore is not None:
            self.set_vectorstore(vectorstore, vectorstore_config)
def embed_and_insert_dataset(
self,
input_dir: str,
detailed_progress: bool = False,
num_workers: Optional[int] = None,
embed_batch_size: int = 1000,
chunk_batch_size: int = 200,
) -> None:
"""
Processes, embeds, and writes documents from the specified directory
to the vectorstore in batches.
Args:
input_dir (str): Directory containing documents to embed.
detailed_progress (bool): Whether to show detailed progress during
embedding.
            num_workers (int, optional): Number of worker processes to use;
                currently unused by this implementation.
            embed_batch_size (int): Number of documents to embed per batch.
chunk_batch_size (int): Number of files to process in each batch.
        Note:
            Files are streamed from input_dir and embedded in chunks of
            chunk_batch_size files to bound memory usage.
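
        Example (illustrative; the path and batch sizes are placeholders):

            embedder = Embedder()
            embedder.embed_and_insert_dataset(
                "data/docs", detailed_progress=True, embed_batch_size=512
            )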
"""
num_files = None
if detailed_progress:
num_files = len(list(get_files_from_dir(input_dir)))
with tqdm(
total=num_files, desc="Embedding files", unit=" files", smoothing=0
) as pbar:
            # Create the file iterator once; re-creating it on every pass
            # would yield the same leading files forever.
            file_iter = get_files_from_dir(input_dir)
            while True:
                file_chunk = list(islice(file_iter, chunk_batch_size))
if not file_chunk:
break
self.embed_and_insert_files(file_chunk, embed_batch_size)
pbar.update(len(file_chunk))
def embed_and_insert_files(
self, file_paths: List[str], embed_batch_size: int
) -> Tuple[List[str], List[EnhancedDocument], List[List[float]]]:
"""
Embeds documents from specified file paths and inserts them into the
vector store.
Args:
file_paths (List[str]): File paths to embed and insert.
            embed_batch_size (int): Number of documents to embed per batch.
Returns:
tuple: A tuple containing lists of ids, docs, and their embeddings.
"""
self._verify_vectorstore_instance()
        # NOTE(STP): We're not calling self.embed_and_insert_docs() here
        # in order to allow us to batch embed multiple files.
        all_docs, all_embeddings = self.embed_files(
            file_paths, embed_batch_size
        )
        ids, docs, embeddings = self.insert_embeddings(
            all_docs, all_embeddings
        )
return ids, docs, embeddings
def embed_files(
self, file_paths: List[str], embed_batch_size: int
    ) -> Tuple[List[EnhancedDocument], List[List[float]]]:
"""
Embeds a batch of files specified by their paths.
Args:
file_paths (List[str]): List of file paths to embed.
            embed_batch_size (int): Number of documents to embed per batch.
Returns:
            tuple: A tuple containing the list of docs and the list of their
                embeddings.
"""
# NOTE(STP): We allow passing multiple files to take advantage of
# batching benefits.
logging.debug("Embedding files: %s", file_paths)
docs = []
for file_path in file_paths:
docs.extend(load_docs_from_jsonl(file_path))
embeddings = self.embed_docs(docs, embed_batch_size)
logging.debug("Embedded files: %s", file_paths)
return docs, embeddings
def embed_and_insert_docs(
self, docs: List[EnhancedDocument], embed_batch_size: int
) -> Tuple[List[str], List[EnhancedDocument], List[List[float]]]:
"""
Embeds documents and inserts their embeddings into the vectorstore,
then returns the IDs, documents, and embeddings.
Args:
docs (List[EnhancedDocument]): Documents to embed and insert.
            embed_batch_size (int): Number of documents to embed per batch.
Returns:
tuple: A tuple containing lists of ids, docs, and their embeddings.
Raises:
ValueError: If the vectorstore instance is not set.
"""
self._verify_vectorstore_instance()
embeddings = self.embed_docs(docs, embed_batch_size)
ids, docs, embeddings = self.insert_embeddings(docs, embeddings)
return ids, docs, embeddings
def embed_docs(
self, docs: List[EnhancedDocument], embed_batch_size: int
) -> List[List[float]]:
"""
Generates embeddings for a list of documents.
Args:
docs (List[EnhancedDocument]): Documents to embed.
            embed_batch_size (int): Number of documents to embed per batch.
Returns:
List[List[float]]: List of embeddings for each document.
"""
# NOTE(STP): This ignores metadata. If we want to include metadata in
# the embedding, we would need to combine it with the page content
# and stringify it in some manner.
# TODO(STP): We might want to batch embed documents here if the number
        # of documents exceeds a certain threshold. Would need to look more
        # into if and when that would be useful.
# if and when that would be useful.
logging.debug("Embedding %d docs", len(docs))
page_contents = [doc.page_content for doc in docs]
        # Loop through the docs in steps of embed_batch_size.
        all_embeddings = []
        for i in range(0, len(page_contents), embed_batch_size):
            end_idx = min(i + embed_batch_size, len(page_contents))
            batch = page_contents[i:end_idx]
            embeddings = self.embedder.embed_documents(batch)
            all_embeddings.extend(embeddings)
            logging.debug("Embedded %d/%d docs", end_idx, len(page_contents))
logging.debug("Embedded all %d docs", len(page_contents))
return all_embeddings
def insert_embeddings(
self, docs: List[EnhancedDocument], embeddings: List[List[float]]
) -> Tuple[List[str], List[EnhancedDocument], List[List[float]]]:
"""
Inserts the embeddings of the provided documents into the vectorstore
and ensures all documents are unique based on their content hash.
Args:
docs (List[EnhancedDocument]): Documents whose embeddings are to be
inserted.
embeddings (List[List[float]]): Embeddings corresponding to the
documents.
Returns:
tuple: A tuple containing lists of ids, docs, and their embeddings.
Raises:
ValueError: If the vectorstore instance is not set.
"""
self._verify_vectorstore_instance()
logging.debug("Saving %d embedded docs to vectorstore docs", len(docs))
ids = [doc.document_hash for doc in docs]
if len(ids) != len(set(ids)):
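            # ids are content hashes, so duplicates mean identical content;
            # keep only the first occurrence of each document and embedding.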
# TODO(STP): Improve space efficiency here.
unique_docs = []
unique_embeddings = []
unique_ids = []
seen = set()
for i, curr_id in enumerate(ids):
if curr_id in seen:
logging.debug(
"Found multiple documents from %s with the "
" same content hash. '%s...'",
docs[i].metadata["source"],
docs[i].page_content[:30],
)
else:
unique_ids.append(curr_id)
unique_docs.append(docs[i])
unique_embeddings.append(embeddings[i])
seen.add(curr_id)
docs = unique_docs
ids = unique_ids
embeddings = unique_embeddings
texts = [doc.page_content for doc in docs]
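        # add_embeddings() expects an iterable of (text, embedding) pairs.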
text_embeddings = zip(texts, embeddings)
metadatas = [doc.metadata for doc in docs]
self.vectorstore_instance.add_embeddings(
text_embeddings=text_embeddings, ids=ids, metadatas=metadatas
)
logging.debug("Saved %d embedded docs to vectorstore docs", len(docs))
return ids, docs, embeddings
def save_vectorstore_if_applicable(self) -> None:
"""
Saves the current state of the vector store locally if specified by the
config.
"""
if self.vectorstore_name == "FAISS":
save_local_config = self.vectorstore_config["FAISS"][
"save_local_config"
]
if save_local_config["save_local"]:
self.vectorstore_instance.save_local(
save_local_config["folder_path"],
save_local_config["index_name"],
)
num_records = len(
self.vectorstore_instance.index_to_docstore_id
)
logging.info(
"\nSuccessfully saved vectorstore of length %d to: %s",
num_records,
save_local_config["folder_path"],
)
    def set_embedder(self, name: str, config: Dict) -> None:
"""
Configures and initializes the embedder based on specified name and
configuration.
Args:
name (str): Name of the embedder to configure.
config (dict): Configuration dictionary for the embedder.
Raises:
NotImplementedError: If a 'custom' embedder is specified but
not implemented.
ValueError: If embedder name is not recognized or none provided
when required.
ValueError: If embedder instance is set to None.
"""
if name == "custom":
error_message = """
"If using custom embedder, the Embedder.set_embedder() method
must be overridden.
"""
raise NotImplementedError(error_message)
embedder_config = config[name]
embedder_instance = None
if name == "OpenAI":
embedder_instance = OpenAIEmbeddings(**embedder_config)
elif name == "HuggingFace":
embedder_instance = HuggingFaceEmbeddings(**embedder_config)
else:
raise ValueError("Embedding not recognized: %s", name)
if embedder_instance is None:
raise ValueError("Embedder instance cannot be set to None.")
self.embedder = embedder_instance
    def set_vectorstore(self, name: str, config: Dict) -> None:
"""
Configures and initializes the vector store based on specified name and
configuration.
Args:
name (str): Name of the vector store to configure.
config (dict): Configuration dictionary for the vector store.
Raises:
NotImplementedError: If a 'custom' vector store is specified but
not implemented.
ValueError: If vector store name is not recognized or none provided
when required.
"""
assert name is not None and name in self.ALLOWED_VECTORSTORES
if name == "custom":
error_message = """
"If using custom vectorstore, the Embedder.set_vectorstore() method
must be overridden.
"""
raise NotImplementedError(error_message)
self.vectorstore_name = name
self.vectorstore_config = config
config = config[name]
if name == "FAISS":
if config["load_local"]:
load_local_config = config["load_local_args"]
load_local_config["embeddings"] = self.embedder
vectorstore_instance = FAISS.load_local(**load_local_config)
num_documents = len(vectorstore_instance.index_to_docstore_id)
logging.debug(
"Total number of documents loaded from saved FAISS "
"vectorstore: %d",
num_documents,
)
else:
config = config["init_args"]
config["embedding_function"] = self.embedder
config["index"] = faiss.IndexFlatL2(
self.embedder.client.get_sentence_embedding_dimension()
)
config["docstore"] = InMemoryDocstore()
config["index_to_docstore_id"] = {}
vectorstore_instance = FAISS(**config)
self.vectorstore_instance = vectorstore_instance
def _verify_vectorstore_instance(self) -> None:
"""
Verifies that the vectorstore instance is properly set up.
Raises:
ValueError: If the vectorstore instance is not set when required.
"""
if self.vectorstore_instance is None:
raise ValueError(
"Vectorstore must be set when saving document embeddings."
)