Skip to content

Commit

Permalink
Performance: index sorting
Browse files Browse the repository at this point in the history
  • Loading branch information
alexklibisz committed Nov 30, 2023
1 parent a74973a commit 1c338cb
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 9 deletions.
3 changes: 2 additions & 1 deletion Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ tasks:
OBJC_DISABLE_INITIALIZE_FORK_SAFETY: "YES"
RESULTS_DIR: ../../docs/pages/performance/fashion-mnist
cmds:
# - venv/bin/pip install ../../client-python
- venv/bin/python run.py --dataset fashion-mnist-784-euclidean --algorithm elastiknn-l2lsh --runs 3 --count 100 --parallelism 1 --force --local
- mkdir -p $RESULTS_DIR
- venv/bin/python plot.py --dataset fashion-mnist-784-euclidean --count 100 --output $RESULTS_DIR/plot.png | venv/bin/python ../parse_results.py > $RESULTS_DIR/results.md
- base64 -b 0 -i $RESULTS_DIR/plot.png > $RESULTS_DIR/plot.b64
- base64 -w 0 -i $RESULTS_DIR/plot.png > $RESULTS_DIR/plot.b64
- cat $RESULTS_DIR/results.md

annbRunOfficialFashionMnist:
Expand Down
5 changes: 5 additions & 0 deletions client-python/elastiknn/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def to_dict(self):
def __len__(self):
return len(self.values)

@staticmethod
def random(length: int, rng: Random = None):
    """Build a dense float vector filled with pseudo-random values.

    Parameters
    ----------
    length : int
        Number of values in the returned vector.
    rng : Random
        Source of randomness; every value is drawn with `rng.random()`.
        If None, a new `Random()` instance is created for this call.

    Returns
    -------
    Vec.DenseFloat
        Vector holding `length` values drawn from `rng`.
    """
    # A default of `Random(time())` would be evaluated once, when the function
    # is defined, and then silently shared (state included) by every call that
    # omits `rng`. Use a None sentinel and build the generator per call instead.
    if rng is None:
        rng = Random()
    values = [rng.random() for _ in range(length)]
    return Vec.DenseFloat(values)

@dataclass(frozen=True)
class Indexed(Base):
index: str
Expand Down
63 changes: 59 additions & 4 deletions client-python/elastiknn/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import Iterable, Tuple, Dict, Optional

import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

Expand All @@ -28,7 +29,38 @@ def __init__(self, es: Elasticsearch = None):
else:
self.es = es

def put_mapping(self, index: str, vec_field: str, mapping: Mapping.Base, stored_id_field: str):
def create_index(self,
                 index: str,
                 vec_field: str,
                 mapping: Mapping.Base,
                 stored_id_field: str,
                 sort_by_distance_field: Optional[str] = None):
    """
    Create an index whose mapping stores an Elastiknn vector and a stored id field.

    Parameters
    ----------
    index : string
        Name of the index to create.
    vec_field : string
        Field containing the vector; mapped with `mapping.to_dict()`.
    mapping : Mapping.Base
        Mapping object defining the vector's storage format.
    stored_id_field : string
        Field containing the document ID; mapped as a `keyword` with `store: true`.
    sort_by_distance_field : string
        If given, this field is mapped as a `double` and the index is configured
        to sort by it in ascending order. If None, no sort setting is applied.

    Returns
    -------
    The response returned by `es.indices.create`.
    """
    properties = {
        vec_field: mapping.to_dict(),
        stored_id_field: {
            "type": "keyword",
            "store": True
        }
    }

    settings = {}
    if sort_by_distance_field is not None:
        # The sort field must be present in the mapping that is sent together
        # with the sort settings, so both are configured in the same branch.
        properties[sort_by_distance_field] = {"type": "double"}
        settings["index"] = {
            "sort.field": [sort_by_distance_field],
            "sort.order": ["asc"]
        }

    return self.es.indices.create(index=index, mappings={"properties": properties}, settings=settings)

def put_mapping(self,
index: str,
vec_field: str,
mapping: Mapping.Base,
stored_id_field: str):
"""
Update the mapping at the given index and field to store an Elastiknn vector.
Expand Down Expand Up @@ -57,7 +89,15 @@ def put_mapping(self, index: str, vec_field: str, mapping: Mapping.Base, stored_
}
return self.es.indices.put_mapping(properties=properties, index=index)

def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_field: str, ids: Iterable[str], refresh: bool = False) -> Tuple[int, List[Dict]]:
def index(self,
index: str,
vec_field: str,
vecs: Iterable[Vec.Base],
stored_id_field: str,
ids: Iterable[str],
refresh: bool = False,
sort_by_distance_field: str = None
) -> Tuple[int, List[Dict]]:
"""Index (i.e. store) the given vectors at the given index and field with the optional ids.
Parameters
Expand All @@ -74,7 +114,10 @@ def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_
Field containing the document ID. Uses `store: true` setting as an optimization for faster id-only queries.
refresh : bool
Whether to refresh before returning. Set to true if you want to immediately run queries after indexing.
sort_by_distance_field:
Field containing the vector's distance from the origin vector (all zeros).
Used as an optimization to co-locate vectors that are close together.
If None, we don't store this value.
Returns
-------
Int
Expand All @@ -85,7 +128,19 @@ def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_

def gen():
    """Yield one bulk "index" action per (vector, id) pair.

    Each action carries the serialized vector, the stored id field and the
    document `_id`. When `sort_by_distance_field` is set and the vector is a
    `Vec.DenseFloat`, the action also carries the vector's L2 distance from
    the origin under that field; other vector types get no such value.
    """
    for vec, _id in zip(vecs, ids):
        doc = {
            "_op_type": "index",
            "_index": index,
            vec_field: vec.to_dict(),
            stored_id_field: str(_id),
            "_id": str(_id)
        }
        if sort_by_distance_field is not None and isinstance(vec, Vec.DenseFloat):
            # The L2 distance from the all-zeros vector is just the Euclidean
            # norm, so there is no need to allocate a zero vector per document.
            npvec = np.asarray(vec.values, dtype=np.float64)
            # Store a builtin float so the document holds only plain JSON types.
            doc[sort_by_distance_field] = float(np.sqrt(np.sum(npvec ** 2)))
        yield doc

res = bulk(self.es, gen(), chunk_size=200, max_retries=9)
if refresh:
Expand Down
29 changes: 25 additions & 4 deletions client-python/elastiknn/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, algorithm: str, metric: str, es: Elasticsearch = None, mappin
self._logger = Logger(self.__class__.__name__)
self._vec_field = "vec"
self._stored_id_field = "id"
self._sort_by_distance_field = "vec_distance_from_origin"
self._index = index
self._mapping_params = mapping_params
self._query_params = query_params
Expand All @@ -43,15 +44,35 @@ def fit(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.De
self._index = f"{ELASTIKNN_NAME}-{int(time())}"
self._logger.warning(f"index was not given, using {self._index} instead")

settings = {
"index": {
"number_of_shards": shards,
"number_of_replicas": 0,
"sort.field": [self._sort_by_distance_field],
"sort.order": ["asc"]
}
}
mappings = {
"properties": {
self._vec_field: mapping.to_dict(),
self._stored_id_field: {
"type": "keyword",
"store": True
},
self._sort_by_distance_field: {
"type": "double"
}
}
}

self._eknn.es.indices.delete(index=self._index, ignore_unavailable=True)
self._eknn.es.indices.create(index=self._index, settings=dict(number_of_shards=shards, elastiknn=True, number_of_replicas=0))
self._eknn.put_mapping(self._index, self._vec_field, mapping, self._stored_id_field)
self._eknn.es.indices.create(index=self._index, settings=settings, mappings=mappings)

self._logger.info(f"indexing {len(X)} vectors into index {self._index}")
ids = map(lambda i: str(i + 1), range(len(X))) # Add one because 0 is an invalid id in ES.
self._eknn.index(self._index, self._vec_field, vecs, self._stored_id_field, ids, refresh=True)
self._eknn.index(self._index, self._vec_field, vecs, self._stored_id_field, ids, refresh=True,
sort_by_distance_field=self._sort_by_distance_field)
self._eknn.es.indices.forcemerge(index=self._index, max_num_segments=1)
self._eknn.index(self._index, self._vec_field, [], self._stored_id_field, [], refresh=True)


def set_query_params(self, query_params: dict = None):
Expand Down
31 changes: 31 additions & 0 deletions client-python/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,34 @@ def test_exact_jaccard(self):
hits = res['hits']['hits']
assert len(hits) == 10
assert hits[0]["_id"] == ids[0]

def test_sorting(self):
    """Index random dense vectors into a sorted index, then check the index
    sort settings and that returned documents carry a positive distance value.
    """
    eknn = ElastiknnClient()
    n, dim = 1200, 42
    index = "py-test-sorting"
    vec_field = "vec"
    id_field = "id"
    sort_by_distance_field = "vec_distance_from_origin"
    mapping = Mapping.DenseFloat(dims=dim)

    # Start from a clean slate so a previous run cannot influence this one.
    eknn.es.indices.delete(index=index, ignore_unavailable=True)
    eknn.es.indices.refresh()
    eknn.create_index(index, vec_field, mapping, id_field, sort_by_distance_field)
    eknn.es.indices.refresh()

    vecs = [Vec.DenseFloat.random(dim) for _ in range(n)]
    ids = [f"vec-{i}" for i in range(n)]
    (n_, errors) = eknn.index(index, vec_field, vecs, id_field, ids, refresh=True,
                              sort_by_distance_field=sort_by_distance_field)
    assert n_ == n
    assert len(errors) == 0

    settings = eknn.es.indices.get_settings(index=index)
    sort_settings = settings.body[index]["settings"]["index"]["sort"]
    assert sort_settings["field"] == [sort_by_distance_field]
    assert sort_settings["order"] == ["asc"]

    results = eknn.es.search(index=index)
    hits = results.body["hits"]["hits"]
    # Without this check the loop below would pass vacuously on an empty result.
    assert len(hits) > 0
    for hit in hits:
        src = hit["_source"]
        assert src[sort_by_distance_field] > 0, src

0 comments on commit 1c338cb

Please sign in to comment.