Skip to content

Commit

Permalink
More memory-efficient implementation of ElastiknnModel in python client (#153)
Browse files Browse the repository at this point in the history

This uses iterators to produce the elastiknn vectors lazily, instead of keeping them all in memory at once.
Seems to fix some memory issues I was having with the ann-benchmarks project, where the containers would spike over 10GB of memory and crash.
  • Loading branch information
alexklibisz authored Sep 14, 2020
1 parent da6ee6a commit 134017b
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 12 deletions.
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- More memory-efficient implementation of Python ElastiknnModel.fit method. Uses an iterator over the vectors instead of a list of the vectors.
---
- Renamed parameter `r` in L2Lsh mapping to `w`, which is more appropriate and common for "width".
- Updates and fixes in the Python client based on usage for ann-benchmarks. Mainly adding/fixing data classes in `elastiknn.api`.
---
Expand Down
9 changes: 3 additions & 6 deletions client-python/elastiknn/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def put_mapping(self, index: str, vec_field: str, mapping: Mapping.Base, stored_
}
return self.es.transport.perform_request("PUT", f"/{index}/_mapping", body=body)

def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_field: str, ids: List[str], refresh: bool = False) -> Tuple[int, List[Dict]]:
def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_field: str, ids: Iterable[str], refresh: bool = False) -> Tuple[int, List[Dict]]:
"""Index (i.e. store) the given vectors at the given index and field with the optional ids.
Parameters
Expand All @@ -70,8 +70,8 @@ def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_
Field containing the vector in each document.
vecs : List of `Vec.Base`
Vectors that should be indexed.
ids : List of strings
List of ids associated with the given vectors. Must have same length as vecs.
ids : Iterable of strings
Ids associated with the given vectors. Should have same length as vecs.
stored_id_field:
Field containing the document ID. Uses `store: true` setting as an optimization for faster id-only queries.
refresh : bool
Expand All @@ -85,9 +85,6 @@ def index(self, index: str, vec_field: str, vecs: Iterable[Vec.Base], stored_id_
Error responses for the vectors that were not indexed.
"""

assert len(vecs) == len(ids), \
f"List of vecs has length [{len(vecs)}], list of ids has length [{len(ids)}]. The two should be equal."

def gen():
for vec, _id in zip(vecs, ids):
yield { "_op_type": "index", "_index": index, vec_field: vec.to_dict(), stored_id_field: str(_id), "_id": str(_id) }
Expand Down
11 changes: 7 additions & 4 deletions client-python/elastiknn/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import itertools

from concurrent.futures import wait
from concurrent.futures.thread import ThreadPoolExecutor
from elasticsearch import Elasticsearch
Expand Down Expand Up @@ -36,8 +38,9 @@ def __init__(self, algorithm: str, metric: str, es: Elasticsearch = None, mappin
self._dims, self._query = None, None

def fit(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.DenseFloat]], shards: int = 1):
vecs = list(canonical_vectors_to_elastiknn(X))
self._dims = len(vecs[0])
self._dims = len(X[0])
vecs = canonical_vectors_to_elastiknn(X)

mapping, self._query = self._mk_mapping_query(self._query_params)

if self._index is None:
Expand All @@ -49,8 +52,8 @@ def fit(self, X: Union[np.ndarray, csr_matrix, List[Vec.SparseBool], List[Vec.De
self._eknn.es.indices.create(self._index, body=json.dumps(body))
self._eknn.put_mapping(self._index, self._vec_field, mapping, self._stored_id_field)

self._logger.info(f"indexing {len(vecs)} vectors into index {self._index}")
ids = [str(i + 1) for i in range(len(vecs))] # Add one because 0 is an invalid id in ES.
self._logger.info(f"indexing {len(X)} vectors into index {self._index}")
ids = map(lambda i: str(i + 1), range(len(X))) # Add one because 0 is an invalid id in ES.
self._eknn.index(self._index, self._vec_field, vecs, self._stored_id_field, ids, refresh=True)
self._eknn.es.indices.forcemerge(self._index, params=dict(max_num_segments=1))
self._eknn.index(self._index, self._vec_field, [], self._stored_id_field, [], refresh=True)
Expand Down
20 changes: 18 additions & 2 deletions examples/ann-benchmarks/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,25 @@ Plot results for a specific dataset:
python plot.py --dataset glove-100-angular --recompute --count 100 --y-log -o out.png
```

When debugging, it's useful to restrict the size of `X_train` and `X_test` in the `run` method in `runner.py`, e.g.
When debugging, restrict the size of `X_train` and `X_test` in the `run` method in `runner.py`, e.g.

```
X_train = numpy.array(D['train'])
X_test = numpy.array(D['test'][:500])
```
```

When debugging, use a local copy of the elastiknn client.

```
# Run in the ann-benchmarks project.
rsync -av --exclude={'venv','build','target','.minio','.git','.idea','.terraform'} ../elastiknn elastiknn
```

```
# Add these lines to the dockerfile.
COPY elastiknn /tmp/
RUN python3 -m pip install -e /tmp/elastiknn/client-python
```

Run a script that copies Elasticsearch logs into the local filesystem. Useful for inspecting logs of containers that crashed.

15 changes: 15 additions & 0 deletions examples/ann-benchmarks/slurp-logs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/bash
# Continuously copy Elasticsearch logs out of every running Docker container
# into /tmp/ann-benchmarks-<container-id>/, so the logs remain available on
# the host even after a container crashes. Polls once per second; runs
# forever — stop with Ctrl-C.

while true;
do
    # Container IDs from `docker ps -q` contain no whitespace, but quote
    # every expansion anyway (ShellCheck SC2086) so unusual log filenames
    # with spaces or glob characters cannot break the copy.
    for c in $(docker ps -q);
    do
        echo "$c"
        mkdir -p "/tmp/ann-benchmarks-$c"
        for f in $(docker exec "$c" ls /var/log/elasticsearch);
        do
            docker exec "$c" cat "/var/log/elasticsearch/$f" > "/tmp/ann-benchmarks-$c/$f"
        done
    done
    sleep 1
done

0 comments on commit 134017b

Please sign in to comment.