Commit

…arch into jparismorgan/avx2-support

jparismorgan committed Oct 15, 2024
2 parents 93e0518 + 36978a6 commit f4c55b1
Showing 366 changed files with 3,406 additions and 1,218 deletions.
17 changes: 15 additions & 2 deletions _quarto.yml
@@ -33,14 +33,27 @@ quartodoc:
package: tiledb.vector_search
dir: "documentation/reference"
sections:
- title: "tiledb.vector_search"
- title: "Vector API"
desc: ""
contents:
- open
- ingestion
- index.Index
- subtitle: "Algorithms"
desc: ""
contents:
- flat_index
- ivf_flat_index
- vamana_index
- ingestion
- ivf_pq_index
- title: "Object API"
desc: ""
contents:
- object_api.create
- object_api.ObjectIndex
- embeddings.ObjectEmbedding
- object_readers.ObjectReader
- object_readers.ObjectPartition

website:
favicon: "documentation/assets/tiledb.ico"
279 changes: 279 additions & 0 deletions apis/python/examples/object_api/multi_modal_pdf_search.ipynb

Large diffs are not rendered by default.

104 changes: 104 additions & 0 deletions apis/python/src/tiledb/vector_search/embeddings/colpali_embedding.py
@@ -0,0 +1,104 @@
from typing import Dict, OrderedDict, Tuple

import numpy as np

from tiledb.vector_search.embeddings import ObjectEmbedding

EMBED_DIM = 128


class ColpaliEmbedding(ObjectEmbedding):
def __init__(
self,
model_name: str = "vidore/colpali-v1.2",
device: str = None,
batch_size: int = 4,
):
self.model_name = model_name
self.device = device
self.batch_size = batch_size
self.model = None
self.processor = None

def init_kwargs(self) -> Dict:
return {
"model_name": self.model_name,
"device": self.device,
"batch_size": self.batch_size,
}

def dimensions(self) -> int:
return EMBED_DIM

def vector_type(self) -> np.dtype:
return np.float32

def load(self) -> None:
import torch
from colpali_engine.models import ColPali
from colpali_engine.models import ColPaliProcessor

if self.device is None:
if torch.cuda.is_available() and torch.cuda.device_count() > 0:
self.device = "cuda"
elif torch.backends.mps.is_available():
self.device = "mps"
else:
self.device = "cpu"

# Load model
self.model = ColPali.from_pretrained(
self.model_name, torch_dtype=torch.bfloat16, device_map=self.device
).eval()
self.processor = ColPaliProcessor.from_pretrained(self.model_name)

    def embed(
        self, objects: OrderedDict, metadata: OrderedDict
    ) -> Tuple[np.ndarray, np.ndarray]:
import torch
from PIL import Image
from torch.utils.data import DataLoader
from tqdm import tqdm

if "image" in objects:
images = []
for i in range(len(objects["image"])):
images.append(
Image.fromarray(
np.reshape(objects["image"][i], objects["shape"][i])
)
)
dataloader = DataLoader(
images,
batch_size=self.batch_size,
shuffle=False,
collate_fn=lambda x: self.processor.process_images(x),
)
elif "text" in objects:
dataloader = DataLoader(
objects["text"],
batch_size=self.batch_size,
shuffle=False,
collate_fn=lambda x: self.processor.process_queries(x),
)

        embeddings = None
        external_ids = None
        object_id = 0  # index into metadata["external_id"]; avoids shadowing builtin id
        for batch in tqdm(dataloader):
            with torch.no_grad():
                batch = {k: v.to(self.model.device) for k, v in batch.items()}
                batch_embeddings = list(torch.unbind(self.model(**batch).to("cpu")))
            for object_embeddings in batch_embeddings:
                object_embeddings_np = object_embeddings.to(torch.float32).numpy()
                # ColPali is a multi-vector model: every patch/token embedding of
                # this object is tagged with the same external_id.
                ext_ids = metadata["external_id"][object_id] * np.ones(
                    object_embeddings_np.shape[0], dtype=np.uint64
                )
                if embeddings is None:
                    external_ids = ext_ids
                    embeddings = object_embeddings_np
                else:
                    external_ids = np.concatenate((external_ids, ext_ids))
                    embeddings = np.vstack((embeddings, object_embeddings_np))
                object_id += 1
        return (embeddings, external_ids)
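A minimal usage sketch of the new embedding class (not part of the commit; assumes `torch`, `colpali_engine`, and access to the `vidore/colpali-v1.2` checkpoint). The `objects`/`metadata` layout mirrors what `embed` expects above:

# Hypothetical usage sketch, not part of this commit.
from collections import OrderedDict

import numpy as np

emb = ColpaliEmbedding(batch_size=2)
emb.load()  # resolves the device (cuda/mps/cpu) and loads the checkpoint

rng = np.random.default_rng(0)
images = [rng.integers(0, 255, size=(32, 32, 3), dtype=np.uint8) for _ in range(2)]
objects = OrderedDict(
    image=[img.ravel() for img in images],  # flattened pixel buffers
    shape=[img.shape for img in images],    # original shapes for np.reshape
)
metadata = OrderedDict(external_id=np.array([10, 11], dtype=np.uint64))

vectors, external_ids = emb.embed(objects, metadata)
# ColPali is a multi-vector model: one 128-dim row per patch/token,
# each tagged with the external_id of the image it came from.
print(vectors.shape, external_ids.shape)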
@@ -1,6 +1,6 @@
from abc import ABC
from abc import abstractmethod
from typing import Dict, OrderedDict
from typing import Dict, OrderedDict, Tuple, Union

import numpy as np

@@ -43,10 +43,12 @@ def load(self) -> None:
raise NotImplementedError

@abstractmethod
def embed(self, objects: OrderedDict, metadata: OrderedDict) -> np.ndarray:
    def embed(
        self, objects: OrderedDict, metadata: OrderedDict
    ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""
Creates embedding vectors for objects. Returns a numpy array of embedding vectors.
There is no enforced restriction on the object format. ObjectReaders and ObjectEmbeddings should use comatible object and metadata formats.
There is no enforced restriction on the object format. ObjectReaders and ObjectEmbeddings should use compatible object and metadata formats.
Parameters
----------
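To illustrate the widened contract, a hedged sketch (not from this commit) of a toy subclass that returns the new `(embeddings, external_ids)` tuple form, one vector per object:

# Hypothetical sketch, not part of this commit.
from typing import Dict, OrderedDict, Tuple

import numpy as np

from tiledb.vector_search.embeddings import ObjectEmbedding


class MeanPixelEmbedding(ObjectEmbedding):
    """Toy embedding: the mean RGB value of each image."""

    def init_kwargs(self) -> Dict:
        return {}

    def dimensions(self) -> int:
        return 3

    def vector_type(self) -> np.dtype:
        return np.float32

    def load(self) -> None:
        pass  # nothing to load for this toy embedding

    def embed(
        self, objects: OrderedDict, metadata: OrderedDict
    ) -> Tuple[np.ndarray, np.ndarray]:
        vectors = np.stack(
            [
                np.reshape(img, shape).mean(axis=(0, 1)).astype(np.float32)
                for img, shape in zip(objects["image"], objects["shape"])
            ]
        )
        external_ids = np.asarray(metadata["external_id"], dtype=np.uint64)
        return vectors, external_ids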
123 changes: 102 additions & 21 deletions apis/python/src/tiledb/vector_search/flat_index.py
@@ -4,7 +4,8 @@
Stores all vectors in a 2D TileDB array, performing exhaustive similarity
search between the query vectors and all dataset vectors.
"""
from typing import Any, Mapping
from threading import Thread
from typing import Any, Mapping, Sequence

import numpy as np

@@ -16,7 +17,7 @@
from tiledb.vector_search.utils import MAX_FLOAT32
from tiledb.vector_search.utils import MAX_INT32
from tiledb.vector_search.utils import MAX_UINT64
from tiledb.vector_search.utils import add_to_group
from tiledb.vector_search.utils import create_array_and_add_to_group

TILE_SIZE_BYTES = 128000000 # 128MB
INDEX_TYPE = "FLAT"
@@ -145,15 +146,41 @@ def query_internal(

return np.transpose(np.array(d)), np.transpose(np.array(i))

def vacuum(self):
"""
        The vacuuming process permanently deletes index files that have been consolidated
        through the consolidation process. TileDB separates consolidation from vacuuming
        in order to make consolidation process-safe in the presence of concurrent reads
        and writes.

        Note:
        1. Vacuuming is not process-safe and you should take extra care when invoking it.
        2. Vacuuming may affect the granularity of the time traveling functionality.

        The FlatIndex class vacuums consolidated fragments, array metadata, and commits
        for the `db` and `ids` arrays.
"""
super().vacuum()
if not self.uri.startswith("tiledb://"):
modes = ["fragment_meta", "commits", "array_meta"]
for mode in modes:
conf = tiledb.Config(self.config)
conf["sm.consolidation.mode"] = mode
conf["sm.vacuum.mode"] = mode
tiledb.vacuum(self.db_uri, config=conf)
tiledb.vacuum(self.ids_uri, config=conf)
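A short usage sketch (hedged: `consolidate_updates` is assumed from the existing `Index` API, and the URI is hypothetical), showing that vacuuming only removes what consolidation has already merged:

# Hypothetical sketch, not part of this commit.
from tiledb.vector_search.flat_index import FlatIndex

index = FlatIndex(uri="/tmp/my_flat_index")
index.consolidate_updates()  # merge fragments first
index.vacuum()               # then permanently delete the consolidated files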


def create(
uri: str,
dimensions: int,
vector_type: np.dtype,
group_exists: bool = False,
group: tiledb.Group = None,
config: Optional[Mapping[str, Any]] = None,
storage_version: str = STORAGE_VERSION,
distance_metric: vspy.DistanceMetric = vspy.DistanceMetric.SUM_OF_SQUARES,
asset_creation_threads: Sequence[Thread] = None,
**kwargs,
) -> FlatIndex:
"""
@@ -179,21 +206,42 @@ def create(
distance_metric: vspy.DistanceMetric
Distance metric to use for the index.
If not provided, use L2 distance.
    group: tiledb.Group
        TileDB group open in write mode.
        Internal: used to avoid opening the group multiple times during
        ingestion.
    asset_creation_threads: Sequence[Thread]
        List of asset creation threads to which new threads are appended.
        Internal: used to parallelize all asset creation during
        ingestion.
"""
validate_storage_version(storage_version)

index.create_metadata(
uri=uri,
dimensions=dimensions,
vector_type=vector_type,
index_type=INDEX_TYPE,
storage_version=storage_version,
distance_metric=distance_metric,
group_exists=group_exists,
config=config,
)
with tiledb.scope_ctx(ctx_or_config=config):
group = tiledb.Group(uri, "w")
if not group_exists:
try:
tiledb.group_create(uri)
except tiledb.TileDBError as err:
raise err
if group is None:
grp = tiledb.Group(uri, "w")
else:
grp = group

if asset_creation_threads is not None:
threads = asset_creation_threads
else:
threads = []

index.create_metadata(
group=grp,
dimensions=dimensions,
vector_type=vector_type,
index_type=INDEX_TYPE,
storage_version=storage_version,
distance_metric=distance_metric,
)

tile_size = TILE_SIZE_BYTES / np.dtype(vector_type).itemsize / dimensions
ids_array_name = storage_formats[storage_version]["IDS_ARRAY_NAME"]
parts_array_name = storage_formats[storage_version]["PARTS_ARRAY_NAME"]
@@ -221,8 +269,17 @@ def create(
cell_order="col-major",
tile_order="col-major",
)
tiledb.Array.create(ids_uri, ids_schema)
add_to_group(group, ids_uri, ids_array_name)
thread = Thread(
target=create_array_and_add_to_group,
kwargs={
"array_uri": ids_uri,
"array_name": ids_array_name,
"group": grp,
"schema": ids_schema,
},
)
thread.start()
threads.append(thread)
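        # Each storage array is now created and registered on a separate thread by
        # create_array_and_add_to_group (defined in tiledb.vector_search.utils).
        # A plausible sketch of what each thread runs -- an assumption, since the
        # helper's body is not shown in this diff:
        #
        #   def create_array_and_add_to_group(array_uri, array_name, group, schema):
        #       tiledb.Array.create(array_uri, schema)  # create the array on storage
        #       group.add(array_uri, name=array_name)   # register it in the index group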

parts_array_rows_dim = tiledb.Dim(
name="rows",
@@ -249,8 +306,17 @@
cell_order="col-major",
tile_order="col-major",
)
tiledb.Array.create(parts_uri, parts_schema)
add_to_group(group, parts_uri, parts_array_name)
thread = Thread(
target=create_array_and_add_to_group,
kwargs={
"array_uri": parts_uri,
"array_name": parts_array_name,
"group": grp,
"schema": parts_schema,
},
)
thread.start()
threads.append(thread)

external_id_dim = tiledb.Dim(
name="external_id",
@@ -265,8 +331,23 @@
attrs=[vector_attr],
allows_duplicates=False,
)
tiledb.Array.create(updates_array_uri, updates_schema)
add_to_group(group, updates_array_uri, updates_array_name)
thread = Thread(
target=create_array_and_add_to_group,
kwargs={
"array_uri": updates_array_uri,
"array_name": updates_array_name,
"group": grp,
"schema": updates_schema,
},
)
thread.start()
threads.append(thread)

group.close()
return FlatIndex(uri=uri, config=config)
if asset_creation_threads is None:
for thread in threads:
thread.join()
if group is None:
grp.close()
return FlatIndex(uri=uri, config=config)
else:
return None
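A usage sketch of the two calling modes (hedged: parameter names are from the diff above; URIs and dimensions are hypothetical):

# Hypothetical sketch, not part of this commit.
import numpy as np
import tiledb

from tiledb.vector_search import flat_index

# Standalone mode: create() owns the group, joins its threads, and returns the index.
index = flat_index.create(
    uri="/tmp/my_flat_index", dimensions=128, vector_type=np.dtype("float32")
)

# Ingestion mode: the caller owns the group and the thread lifecycle; create()
# appends its asset-creation threads and returns None.
uri = "/tmp/my_other_flat_index"
tiledb.group_create(uri)
grp = tiledb.Group(uri, "w")
threads = []
flat_index.create(
    uri=uri,
    dimensions=128,
    vector_type=np.dtype("float32"),
    group_exists=True,
    group=grp,
    asset_creation_threads=threads,
)
for t in threads:
    t.join()
grp.close()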
