Skip to content

Commit

Permalink
Dep updates, simplifications in client and core, use_cache parameter,…
Browse files Browse the repository at this point in the history
… CI caching (#34)

Updated to latest working scalapb and scalapb-circe versions
Split utils and implicits that were all in the same class into separate classes
Added typeclasses to simplify overloaded client and dsl methods
Added caching to github ci build
Added an experimental use_cache parameter that keeps a guava cache of vectors. This makes the dockerized single-core exact nearest neighbor ann-benchmarks take about 55 seconds and the simplest lsh take about 20 seconds. Otherwise you spend about that same amount of time just deserializing ElastiKnnVector objects from Lucene.
Bumped pre- version to reflect the client changes and caching.
  • Loading branch information
alexklibisz authored Feb 13, 2020
1 parent d553fb7 commit de009de
Show file tree
Hide file tree
Showing 40 changed files with 527 additions and 309 deletions.
49 changes: 45 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,53 @@ jobs:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- name: Setup Java
uses: actions/setup-java@v1
with:
java-version: 12.0.2

# Secrets used for publishing.
- name: Setup Secrets
env:
GPG_SECRET_B64: ${{ secrets.GPG_SECRET_B64 }}
GRADLE_PROPERTIES_B64: ${{ secrets.GRADLE_PROPERTIES_B64 }}
PYPIRC_B64: ${{ secrets.PYPIRC_B64 }}
run: ./.github/scripts/setup-secrets.sh

# Language Setup
- name: Setup Python
run: |
python3 --version
sudo apt-get install -y python3-setuptools
- name: Setup Java
uses: actions/setup-java@v1
with:
java-version: 12.0.2

# Caching Setup
- name: Cache Gradle
uses: actions/cache@v1
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: |
${{ runner.os }}-gradle-
- name: Cache Pip
uses: actions/cache@v1
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip
- name: Cache SBT ivy cache
uses: actions/cache@v1
with:
path: ~/.ivy2/cache
key: ${{ runner.os }}-sbt-ivy-cache-${{ hashFiles('**/build.sbt') }}
- name: Cache SBT
uses: actions/cache@v1
with:
path: ~/.sbt
key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}

# Actual Build
- name: Compile JVM
run: make compile/gradle
- name: Compile Python
Expand All @@ -43,6 +80,8 @@ jobs:
run: make test/python
- name: Run Examples
run: make examples

# Snapshot Releases
- name: Set Version (PR)
if: github.event_name == 'pull_request'
run: |
Expand All @@ -68,5 +107,7 @@ jobs:
cp version-sonatype version
sudo snap install hub --classic
make publish/snapshot/plugin
# Cleanup
- name: Clean
run: make clean
10 changes: 10 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,21 @@ jobs:
- name: Will release run?
if: env.GET_RELEASE == '404'
run: echo Yes

# Language Setup
- name: Setup Python
if: github.event_name == 'push' && env.GET_RELEASE == '404'
run: |
python3 --version
sudo apt-get install -y python3-setuptools
- name: Setup Java
uses: actions/setup-java@v1
if: github.event_name == 'push' && env.GET_RELEASE == '404'
with:
java-version: 12.0.2

# Release
- name: Release if version changed (on push to master)
env:
GPG_SECRET_B64: ${{ secrets.GPG_SECRET_B64 }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ import com.sksamuel.elastic4s.requests.searches.SearchResponse
import com.sksamuel.elastic4s.{ElasticClient, ElasticDsl, Executor, Handler}
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import com.klibisz.elastiknn.KNearestNeighborsQuery._
import com.klibisz.elastiknn.{ElastiKnnVector, ProcessorOptions}
import com.klibisz.elastiknn._

import scala.concurrent.{ExecutionContext, Future}

/**
* Client used to prepare, store, and search vectors using the ElastiKnn plugin.
* This uses elastic4s under the hood and is slightly more "opinionated" than the methods in [[ElastiKnnDsl]].
* So if you want lower-level methods, see [[ElastiKnnDsl]].
*
* @param elastic4sClient A client provided by the elastic4s library.
* @param executionContext The execution context where [[Future]]s are executed.
*/
final class ElastiKnnClient()(implicit elastic4sClient: ElasticClient, executionContext: ExecutionContext) extends AutoCloseable {

import ElastiKnnDsl._
Expand Down Expand Up @@ -47,24 +54,26 @@ final class ElastiKnnClient()(implicit elastic4sClient: ElasticClient, execution
))

/**
* Index a set of vectors.
* @param index
* @param pipelineId
* @param rawField
* @param vectors
* Index a set of vectors. It's better to use this with batches of vectors rather than single vectors.
*
* @param index The index where vectors are stored.
* @param pipelineId The pipeline used to process the vectors. Corresponds to the pipelineId used for [[ElastiKnnClient.createPipeline]].
* @param rawField The name of the field where raw vector data is stored in each document.
* @param vectors A seq of vector-like objects.
* @param ids optional list of ids. There should be one per vector, otherwise they'll be ignored.
* @param refresh if you want to immediately query for the vectors, set this to [[RefreshPolicy.Immediate]].
* @return
* @tparam V A vector-like type implementing the [[ElastiKnnVectorLike]] typeclass.
* @return Returns the elastic4s [[BulkResponse]] resulting from indexing the vectors.
*/
def indexVectors(index: String,
pipelineId: String,
rawField: String,
vectors: Seq[ElastiKnnVector],
ids: Option[Seq[String]] = None,
refresh: RefreshPolicy = RefreshPolicy.None): Future[BulkResponse] = {
def indexVectors[V: ElastiKnnVectorLike](index: String,
pipelineId: String,
rawField: String,
vectors: Seq[V],
ids: Option[Seq[String]] = None,
refresh: RefreshPolicy = RefreshPolicy.None): Future[BulkResponse] = {
val reqs = vectors.map(v => indexVector(index = index, rawField = rawField, vector = v, pipeline = Some(pipelineId)))
val withIds: Seq[IndexRequest] = ids match {
case Some(idsSeq) if (idsSeq.length == reqs.length) =>
case Some(idsSeq) if idsSeq.length == reqs.length =>
reqs.zip(idsSeq).map {
case (req, id) => req.id(id)
}
Expand All @@ -73,17 +82,27 @@ final class ElastiKnnClient()(implicit elastic4sClient: ElasticClient, execution
execute(bulk(withIds).refresh(refresh))
}

def knnQuery(index: String, options: ExactQueryOptions, vector: ElastiKnnVector, k: Int): Future[SearchResponse] =
execute(search(index).query(ElastiKnnDsl.knnQuery(options, vector)).size(k))

def knnQuery(index: String, options: ExactQueryOptions, vector: IndexedQueryVector, k: Int): Future[SearchResponse] =
execute(search(index).query(ElastiKnnDsl.knnQuery(options, vector)).size(k))

def knnQuery(index: String, options: LshQueryOptions, vector: ElastiKnnVector, k: Int): Future[SearchResponse] =
execute(search(index).query(ElastiKnnDsl.knnQuery(QueryOptions.Lsh(options), QueryVector.Given(vector))).size(k))

def knnQuery(index: String, options: LshQueryOptions, vector: IndexedQueryVector, k: Int): Future[SearchResponse] =
execute(search(index).query(ElastiKnnDsl.knnQuery(QueryOptions.Lsh(options), QueryVector.Indexed(vector))).size(k))
/**
* Run a K-Nearest-Neighbor query.
*
* @param index The index against which you're searching.
* @param options An query-option-like object implementing the [[QueryOptionsLike]] typeclass. This defines some
* elastiknn-specific options about your search request, like whether it's an exact or LSH search.
* @param queryVector A query-vector-like object implementing the [[QueryVectorLike]] typeclass. This is typically
* a vector given explicitly (using [[KNearestNeighborsQuery.QueryVector.Given]] or a reference to
* an already-indexed vector (using [[KNearestNeighborsQuery.QueryVector.Indexed]].
* @param k The number of search hits to return.
* @param useInMemoryCache Correspons to [[KNearestNeighborsQuery.useInMemoryCache]].
* @tparam O A query-option-like type implementing the [[QueryOptionsLike]] typeclass.
* @tparam V A query-vector-like type implementing the [[QueryVectorLike]] typeclass.
* @return Returns the elastic4s [[SearchResponse]].
*/
def knnQuery[O: QueryOptionsLike, V: QueryVectorLike](index: String,
options: O,
queryVector: V,
k: Int,
useInMemoryCache: Boolean = false): Future[SearchResponse] =
execute(search(index).query(ElastiKnnDsl.knnQuery(options, queryVector, useInMemoryCache)).size(k))

def close(): Unit = elastic4sClient.close()
}
Expand All @@ -98,6 +117,6 @@ object ElastiKnnClient {
new ElastiKnnClient()
}

def apply(hostname: String, port: Int)(implicit ec: ExecutionContext): ElastiKnnClient =
def apply(hostname: String = "localhost", port: Int = 9200)(implicit ec: ExecutionContext): ElastiKnnClient =
ElastiKnnClient(new HttpHost(hostname, port))
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.klibisz.elastiknn.client

import com.klibisz.elastiknn
import com.klibisz.elastiknn.KNearestNeighborsQuery.{ExactQueryOptions, IndexedQueryVector}
import com.klibisz.elastiknn.{ElastiKnnVector, KNearestNeighborsQuery}
import com.klibisz.elastiknn._
import com.klibisz.elastiknn.KNearestNeighborsQuery._
import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.requests.indexes.IndexRequest
import com.sksamuel.elastic4s.requests.mappings.BasicField
Expand Down Expand Up @@ -52,38 +51,30 @@ object ElastiKnnDsl {
}
}

def indexVector(index: String,
rawField: String,
vector: ElastiKnnVector,
id: Option[String] = None,
pipeline: Option[String] = None): IndexRequest = {
val xcb = XContentFactory.jsonBuilder
.rawField(rawField, JsonFormat.toJsonString(vector))
def indexVector[V: ElastiKnnVectorLike](index: String,
rawField: String,
vector: V,
id: Option[String] = None,
pipeline: Option[String] = None): IndexRequest = {
val ekv = implicitly[ElastiKnnVectorLike[V]].apply(vector)
val xcb = XContentFactory.jsonBuilder.rawField(rawField, JsonFormat.toJsonString(ekv))
IndexRequest(index, source = Some(xcb.string()), id = id, pipeline = pipeline)
}

private def knnQuery(knnq: KNearestNeighborsQuery): CustomQuery =
def knnQuery(knnq: KNearestNeighborsQuery): CustomQuery =
() => {
val json = JsonFormat.toJsonString(knnq)
XContentFactory.jsonBuilder.rawField("elastiknn_knn", json)
}

def knnQuery(options: ExactQueryOptions, vector: IndexedQueryVector): CustomQuery =
def knnQuery[O: QueryOptionsLike, V: QueryVectorLike](options: O, vector: V, useInMemoryCache: Boolean = false): CustomQuery =
knnQuery(
elastiknn.KNearestNeighborsQuery(
KNearestNeighborsQuery.QueryOptions.Exact(options),
KNearestNeighborsQuery.QueryVector.Indexed(vector)
))

def knnQuery(options: ExactQueryOptions, vector: ElastiKnnVector): CustomQuery =
knnQuery(
elastiknn.KNearestNeighborsQuery(
KNearestNeighborsQuery.QueryOptions.Exact(options),
KNearestNeighborsQuery.QueryVector.Given(vector)
))

def knnQuery(options: KNearestNeighborsQuery.QueryOptions, vector: KNearestNeighborsQuery.QueryVector): CustomQuery =
knnQuery(elastiknn.KNearestNeighborsQuery(options, vector))
KNearestNeighborsQuery(
useInMemoryCache,
implicitly[QueryOptionsLike[O]].apply(options),
implicitly[QueryVectorLike[V]].apply(vector)
)
)

def scriptScoreQuery(script: Script, filter: Option[Query] = None): CustomQuery =
() =>
Expand Down
5 changes: 3 additions & 2 deletions client-python/elastiknn/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def knn_query(self, index: str,
options: Union[KNearestNeighborsQuery.ExactQueryOptions, KNearestNeighborsQuery.LshQueryOptions],
vector: Union[ElastiKnnVector, KNearestNeighborsQuery.IndexedQueryVector],
n_neighbors: int = 10,
source: List[str] = None):
source: List[str] = None,
use_cache: bool = False):
exact, lsh, given, indexed = None, None, None, None
if isinstance(options, KNearestNeighborsQuery.ExactQueryOptions):
exact = options
Expand All @@ -80,7 +81,7 @@ def knn_query(self, index: str,
given = vector
elif isinstance(vector, KNearestNeighborsQuery.IndexedQueryVector):
indexed = vector
query = KNearestNeighborsQuery(exact=exact, lsh=lsh, given=given, indexed=indexed)
query = KNearestNeighborsQuery(exact=exact, lsh=lsh, given=given, indexed=indexed, use_cache=use_cache)
body = dict(query=dict(elastiknn_knn=MessageToDict(query)))
if source:
body["_source"] = source
Expand Down
8 changes: 4 additions & 4 deletions client-python/elastiknn/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, n_neighbors: int = None, algorithm: str = 'lsh', metric: str
self._algorithm_params = algorithm_params
self._index = index
self._n_jobs = n_jobs
self._tpex = ThreadPoolExecutor(self._n_jobs)
self._field_proc = "vec_proc"
self._dataset_index_key = "dataset_index"
self._logger = Logger(self.__class__.__name__)
Expand Down Expand Up @@ -101,15 +102,14 @@ def fit(self, X: Union[np.ndarray, csr_matrix, List[ElastiKnnVector], List[Spars
return self

def kneighbors(self, X: Union[np.ndarray, csr_matrix, List[ElastiKnnVector], List[SparseBoolVector], List[FloatVector]] = None,
n_neighbors: int = None, return_distance: bool = True) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
n_neighbors: int = None, return_distance: bool = True, use_cache: bool = False) -> Union[Tuple[np.ndarray, np.ndarray], np.ndarray]:
X = list(canonical_vectors_to_elastiknn(X))
if n_neighbors is None:
n_neighbors = self.n_neighbors
qopts = self._query_opts()
futures = []
with ThreadPoolExecutor(self._n_jobs) as tpex:
for x in X:
futures.append(tpex.submit(self._eknn.knn_query, self._index, qopts, x, n_neighbors, [self._dataset_index_key]))
for x in X:
futures.append(self._tpex.submit(self._eknn.knn_query, self._index, qopts, x, n_neighbors, [self._dataset_index_key], use_cache))
indices, dists = np.zeros((len(X), n_neighbors), dtype=np.uint32), np.zeros((len(X), n_neighbors), dtype=np.float)
wait(futures) # To ensure same order.
for i, future in enumerate(futures):
Expand Down
13 changes: 8 additions & 5 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,29 @@ plugins {
id 'signing'
}


dependencies {
runtime "org.scala-lang:scala-library:${scalaVersion}"
implementation "org.scala-lang:scala-library:${scalaVersion}"
runtime "com.thesamet.scalapb:scalapb-runtime_${scalaVersion}:${scalapbVersion}"
implementation "com.thesamet.scalapb:scalapb-runtime_${scalaVersion}:${scalapbVersion}"
implementation "io.github.scalapb-json:scalapb-circe_${scalaVersion}:0.5.1"
runtime "io.github.scalapb-json:scalapb-circe_${scalaVersion}:0.5.1"
implementation "io.circe:circe-generic_${scalaVersion}:0.12.2"
implementation "io.github.scalapb-json:scalapb-circe_${scalaVersion}:${scalapbCirceVersion}"
runtime "io.github.scalapb-json:scalapb-circe_${scalaVersion}:${scalapbCirceVersion}"
implementation "io.circe:circe-generic_${scalaVersion}:${circeVersion}"
implementation "com.google.guava:guava:28.1-jre"
runtime "com.google.guava:guava:28.1-jre"
implementation "com.typesafe.scala-logging:scala-logging_${scalaVersion}:3.9.2"
}

protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.7.1'
artifact = 'com.google.protobuf:protoc:3.10.0'
}
plugins {
scalapb {
artifact = "com.thesamet.scalapb:protoc-gen-scalapb:${scalapbVersion}:unix@sh"
artifact = (org.gradle.nativeplatform.platform.internal.DefaultNativePlatform.getCurrentOperatingSystem().isWindows()) ?
"com.thesamet.scalapb:protoc-gen-scala:${scalapbVersion}:windows@bat" :
"com.thesamet.scalapb:protoc-gen-scala:${scalapbVersion}:unix@sh"
}
}

Expand Down
12 changes: 11 additions & 1 deletion core/src/main/proto/elastiknn/elastiknn.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ enum Similarity {
message ProcessorOptions {
// Path to the field where the raw vector is stored.
string field_raw = 1;

// Dimensionality of the raw floating-point vector.
uint32 dimension = 2;

// The model options which will determine how the vector gets preprocessed and stored.
// TODO: switch to sealed oneof. It simplifies the scala code a lot and doesn't really affect the other langs.
oneof model_options {
ExactModelOptions exact = 3;
JaccardLshOptions jaccard = 4;
Expand Down Expand Up @@ -52,6 +55,7 @@ message SparseBoolVector {

// Generic representation of a vector.
message ElastiKnnVector {
// TODO: switch to sealed oneof. It simplifies the scala code a lot and doesn't really affect the other langs.
oneof vector {
FloatVector float_vector = 1;
SparseBoolVector sparse_bool_vector = 2;
Expand Down Expand Up @@ -82,13 +86,19 @@ message KNearestNeighborsQuery {
string id = 3;
}

// Options defining how the exact or LSH query should be executed.
oneof query_options {
ExactQueryOptions exact = 1;
LshQueryOptions lsh = 2;
}

// The query vector, either given explicitly or as a reference to an already-indexed vector.
oneof query_vector {
ElastiKnnVector given = 3;
IndexedQueryVector indexed = 4;
}
}

// When true, maintain and use a cache of vectors in memory that avoids re-reading from the Lucene index.
// Useful when querying against data which won't change. DO NOT use if your data changes between queries.
bool use_cache = 5;
}
Loading

0 comments on commit de009de

Please sign in to comment.