diff --git a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala index f9940e371..f7c8970f5 100644 --- a/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala +++ b/benchmarks/src/main/scala/com/klibisz/elastiknn/benchmarks/Execute.scala @@ -72,8 +72,9 @@ object Execute extends App { parallelism: Int): ZIO[Logging with Clock with DatasetClient with ElastiknnZioClient, Throwable, BenchmarkResult] = { // Index name is a function of dataset, mapping and holdout so we can check if it already exists and avoid re-indexing. - val trainIndexName = s"ix-${dataset.name}-${MurmurHash3.orderedHash(Seq(dataset, eknnMapping))}".toLowerCase - val testIndexName = s"$trainIndexName-test" + val trainIndex = s"ix-${dataset.name}-${MurmurHash3.orderedHash(Seq(dataset, eknnMapping))}".toLowerCase + val testIndex = s"$trainIndex-test" + val storedIdField = "id" // Create a primary and holdout index with same mappings. // Split stream of vectors into primary and holdout vectors and index them separately. @@ -81,30 +82,30 @@ object Execute extends App { def buildIndex(chunkSize: Int = 500) = { for { eknnClient <- ZIO.access[ElastiknnZioClient](_.get) - _ <- log.info(s"Creating index $trainIndexName with mapping $eknnMapping and parallelism $parallelism") - _ <- eknnClient.execute(createIndex(trainIndexName).replicas(0).shards(parallelism).indexSetting("refresh_interval", "-1")) - _ <- eknnClient.putMapping(trainIndexName, eknnQuery.field, eknnMapping) - _ <- eknnClient.execute(createIndex(testIndexName).replicas(0).shards(parallelism).indexSetting("refresh_interval", "-1")) + _ <- log.info(s"Creating index $trainIndex with mapping $eknnMapping and parallelism $parallelism") + _ <- eknnClient.execute(createIndex(trainIndex).replicas(0).shards(parallelism).indexSetting("refresh_interval", "-1")) + _ <- eknnClient.putMapping(trainIndex, eknnQuery.field, storedIdField, eknnMapping) + _ <- eknnClient.execute(createIndex(testIndex).replicas(0).shards(parallelism).indexSetting("refresh_interval", "-1")) datasets <- ZIO.access[DatasetClient](_.get) _ <- log.info(s"Indexing vectors for dataset $dataset") _ <- datasets.streamTrain(dataset).grouped(chunkSize).zipWithIndex.foreach { case (vecs, batchIndex) => - val ids = Some(vecs.indices.map(i => s"$batchIndex-$i")) + val ids = vecs.indices.map(i => s"$batchIndex-$i") for { - (dur, _) <- eknnClient.index(trainIndexName, eknnQuery.field, vecs, ids = ids).timed - _ <- log.debug(s"Indexed batch $batchIndex to $trainIndexName in ${dur.toMillis} ms") + (dur, _) <- eknnClient.index(trainIndex, eknnQuery.field, vecs, storedIdField, ids).timed + _ <- log.debug(s"Indexed batch $batchIndex to $trainIndex in ${dur.toMillis} ms") } yield () } _ <- datasets.streamTest(dataset).grouped(chunkSize).zipWithIndex.foreach { case (vecs, batchIndex) => - val ids = Some(vecs.indices.map(i => s"$batchIndex-$i")) + val ids = vecs.indices.map(i => s"$batchIndex-$i") for { - (dur, _) <- eknnClient.index(testIndexName, eknnQuery.field, vecs, ids = ids).timed - _ <- log.debug(s"Indexed batch $batchIndex to $testIndexName in ${dur.toMillis} ms") + (dur, _) <- eknnClient.index(testIndex, eknnQuery.field, vecs, storedIdField, ids).timed + _ <- log.debug(s"Indexed batch $batchIndex to $testIndex in ${dur.toMillis} ms") } yield () } - _ <- eknnClient.execute(refreshIndex(trainIndexName, testIndexName)) - _ <- eknnClient.execute(forceMerge(trainIndexName, testIndexName).maxSegments(1)) + _ <- eknnClient.execute(refreshIndex(trainIndex, testIndex)) + _ <- eknnClient.execute(forceMerge(trainIndex, testIndex).maxSegments(1)) } yield () } @@ -130,11 +131,9 @@ object Execute extends App { requests = testVecs.zipWithIndex.mapMPar(parallelism) { case (vec, i) => for { - (dur, res) <- eknnClient.nearestNeighbors(trainIndexName, eknnQuery.withVec(vec), k).timed + (dur, res) <- eknnClient.nearestNeighbors(trainIndex, eknnQuery.withVec(vec), k, storedIdField).timed _ <- if (i % 10 == 0) log.debug(s"Completed query $i in ${dur.toMillis} ms") else ZIO.succeed(()) - } yield { - QueryResult(res.result.hits.hits.map(_.id), res.result.took) - } + } yield QueryResult(res.result.hits.hits.map(_.id), res.result.took) } (dur, responses) <- requests.run(ZSink.collectAll).timed } yield (responses, dur.toMillis) @@ -144,21 +143,21 @@ object Execute extends App { eknnClient <- ZIO.access[ElastiknnZioClient](_.get) // Check if the index already exists. - _ <- log.info(s"Checking for index $trainIndexName with mapping $eknnMapping") - trainExists <- eknnClient.execute(indexExists(trainIndexName)).map(_.result.exists).catchSome { + _ <- log.info(s"Checking for index $trainIndex with mapping $eknnMapping") + trainExists <- eknnClient.execute(indexExists(trainIndex)).map(_.result.exists).catchSome { case _: ElastiknnClient.StrictFailureException => ZIO.succeed(false) } - testExists <- eknnClient.execute(indexExists(testIndexName)).map(_.result.exists).catchSome { + testExists <- eknnClient.execute(indexExists(testIndex)).map(_.result.exists).catchSome { case _: ElastiknnClient.StrictFailureException => ZIO.succeed(false) } // Create the index if primary and holdout don't exist. _ <- if (trainExists && testExists) - log.info(s"Found indices $trainIndexName and $testIndexName") + log.info(s"Found indices $trainIndex and $testIndex") else buildIndex() // Load a stream of vectors from the holdout index. - testVecs <- streamFromIndex(testIndexName) + testVecs <- streamFromIndex(testIndex) // Run searches on the holdout vectors. (singleResults, totalDuration) <- search(testVecs) diff --git a/changelog.md b/changelog.md index 7d411b56e..431c31baa 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,6 @@ +- Switched scala client to store the ID as a doc-value field. This avoids decompressing the document source + when reading results, which is about 40% faster on benchmarks for both exact and approx. search. +--- - Re-implemented LSH and sparse-indexed queries using an optimized custom Lucene query based on the [TermInSetQuery](https://lucene.apache.org/core/8_5_0/core/org/apache/lucene/search/TermInSetQuery.html). This is 3-5x faster on LSH benchmarks. - Updated L1, and L2 similarities such that they're bounded in [0,1]. diff --git a/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala b/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala index 7b5eb13b4..925552ff8 100644 --- a/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala +++ b/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnClient.scala @@ -5,9 +5,7 @@ import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s._ import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.requests.bulk.{BulkResponse, BulkResponseItem} -import com.sksamuel.elastic4s.requests.common.RefreshPolicy import com.sksamuel.elastic4s.requests.indexes.PutMappingResponse -import com.sksamuel.elastic4s.requests.mappings.PutMappingRequest import com.sksamuel.elastic4s.requests.searches.{SearchRequest, SearchResponse} import org.apache.http.HttpHost import org.elasticsearch.client.RestClient @@ -18,35 +16,63 @@ import scala.language.higherKinds trait ElastiknnClient[F[_]] extends AutoCloseable { + /** + * Underlying client from the elastic4s library. + */ val elasticClient: ElasticClient + /** + * Abstract method for executing a request. + */ def execute[T, U](t: T)(implicit handler: Handler[T, U], manifest: Manifest[U]): F[Response[U]] - def putMapping(index: String, field: String, mapping: Mapping): F[Response[PutMappingResponse]] = - execute(ElastiknnRequests.putMapping(index, field, mapping)) - - def index(index: String, - field: String, - vecs: Seq[Vec], - ids: Option[Seq[String]] = None, - refresh: RefreshPolicy = RefreshPolicy.NONE): F[Response[BulkResponse]] = { - val reqs = vecs.map(v => ElastiknnRequests.indexVec(index, field, v)) - val withIds = ids match { - case Some(idSeq) if idSeq.length == reqs.length => - reqs.zip(idSeq).map { - case (req, id) => req.id(id) - } - case _ => reqs + /** + * See [[ElastiknnRequests.putMapping()]]. + */ + def putMapping(index: String, vecField: String, storedIdField: String, vecMapping: Mapping): F[Response[PutMappingResponse]] = + execute(ElastiknnRequests.putMapping(index, vecField, storedIdField, vecMapping)) + + /** + * Index a batch of vectors as new Elasticsearch docs, one doc per vector. + * Also see [[ElastiknnRequests.index()]]. + * + * @param index Index where vectors are stored. + * @param vecField Field in each doc where vector is stored. + * @param vecs Sequence of vectors to store. + * @param storedIdField Field in each doc where ID is stored as a doc value. + * @param ids Sequence of ids. Assumed one-to-one correspondence to given vectors. + * @return [[Response]] containing [[BulkResponse]] containing indexing responses. + */ + def index(index: String, vecField: String, vecs: Seq[Vec], storedIdField: String, ids: Seq[String]): F[Response[BulkResponse]] = { + val reqs = vecs.zip(ids).map { + case (vec, id) => ElastiknnRequests.index(index, vecField, vec, storedIdField, id) } - execute(bulk(withIds).refresh(refresh)) + execute(bulk(reqs)) } - def nearestNeighbors(index: String, - query: NearestNeighborsQuery, - k: Int, - fetchSource: Boolean = false, - preference: Option[String] = None): F[Response[SearchResponse]] = - execute(ElastiknnRequests.nearestNeighborsQuery(index, query, k, fetchSource, preference)) + /** + * See [[ElastiknnRequests.nearestNeighbors()]]. + */ + def nearestNeighbors(index: String, query: NearestNeighborsQuery, k: Int, storedIdField: String): F[Response[SearchResponse]] = { + + // Handler that reads the id from the stored field and places it in the id field. + // Otherwise it will be null since [[ElastiknnRequests.nearestNeighbors]] doesn't return stored fields. + implicit val handler: Handler[SearchRequest, SearchResponse] = new Handler[SearchRequest, SearchResponse] { + override def build(t: SearchRequest): ElasticRequest = SearchHandler.build(t) + override def responseHandler: ResponseHandler[SearchResponse] = (response: HttpResponse) => { + val handled: Either[ElasticError, SearchResponse] = SearchHandler.responseHandler.handle(response) + handled.map { sr: SearchResponse => + val hitsWithIds = sr.hits.hits.map(h => + h.copy(id = h.fields.get(storedIdField) match { + case Some(List(id: String)) => id + case _ => "" + })) + sr.copy(hits = sr.hits.copy(hits = hitsWithIds)) + } + } + } + execute(ElastiknnRequests.nearestNeighbors(index, query, k, storedIdField))(handler, implicitly[Manifest[SearchResponse]]) + } } diff --git a/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnRequests.scala b/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnRequests.scala index c1fffb224..f89678818 100644 --- a/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnRequests.scala +++ b/client-elastic4s/src/main/scala/com/klibisz/elastiknn/client/ElastiknnRequests.scala @@ -1,5 +1,6 @@ package com.klibisz.elastiknn.client +import com.klibisz.elastiknn.ELASTIKNN_NAME import com.klibisz.elastiknn.api.{ElasticsearchCodec, Mapping, NearestNeighborsQuery, Vec} import com.sksamuel.elastic4s.{ElasticDsl, Indexes, XContentBuilder, XContentFactory} import com.sksamuel.elastic4s.requests.indexes.IndexRequest @@ -7,36 +8,76 @@ import com.sksamuel.elastic4s.requests.mappings.PutMappingRequest import com.sksamuel.elastic4s.requests.searches.SearchRequest import com.sksamuel.elastic4s.requests.searches.queries.CustomQuery +/** + * Methods for creating Elastic4s requests for common elastiknn tasks. + * Methods are optimized for documents containing only a vector and running searches as quickly as possible. + * For example, we store the ID as a doc value instead of using the default stored ID which is slower to access. + * I am open to creating less-optimized, more-convenient methods in the future. + */ trait ElastiknnRequests { - def indexVec(indexName: String, fieldName: String, vec: Vec, id: Option[String] = None): IndexRequest = { - val xcb = XContentFactory.jsonBuilder.rawField(fieldName, ElasticsearchCodec.nospaces(vec)) - IndexRequest(indexName, source = Some(xcb.string()), id = id) + /** + * Create a request for indexing a vector. + * + * @param index Name of the index. + * @param vecField Field where vector is stored. + * @param vec Vector to index. + * @param storedIdField Field where document ID is stored. + * @param id Document ID. Stored as the ID known by Elasticsearch, and in the document for faster retrieval. + * @return Instance of a [[com.sksamuel.elastic4s.requests.indexes.IndexRequest]]. + */ + def index(index: String, vecField: String, vec: Vec, storedIdField: String, id: String): IndexRequest = { + val xcb = XContentFactory.jsonBuilder.rawField(vecField, ElasticsearchCodec.nospaces(vec)).field(storedIdField, id) + IndexRequest(index, source = Some(xcb.string()), id = Some(id)) } - def nearestNeighborsQuery(index: String, - query: NearestNeighborsQuery, - k: Int, - fetchSource: Boolean = false, - preference: Option[String] = None): SearchRequest = { + /** + * Create a request for running a nearest neighbors query. + * Optimized for high performance, so it returns the document ID in the body. + * Sets the preference parameter (see: https://www.elastic.co/guide/en/elasticsearch/reference/master/consistent-scoring.html) + * as the hash of the query for more deterministic results. + * + * @param index Index being searched against. + * @param query Constructed query, containing the vector, field, etc. + * @param k Number of results to return. + * @param storedIdField Field containing the document ID. See [[ElastiknnRequests.index()]] method. + * @return Instance of [[com.sksamuel.elastic4s.requests.searches.SearchRequest]]. + */ + def nearestNeighbors(index: String, query: NearestNeighborsQuery, k: Int, storedIdField: String): SearchRequest = { val json = ElasticsearchCodec.nospaces(query) val customQuery = new CustomQuery { - override def buildQueryBody(): XContentBuilder = XContentFactory.jsonBuilder.rawField("elastiknn_nearest_neighbors", json) - } - val request = ElasticDsl.search(index).query(customQuery).fetchSource(fetchSource).size(k) - // https://www.elastic.co/guide/en/elasticsearch/reference/master/consistent-scoring.html - preference match { - case Some(pref) => request.preference(pref) - case None => request + override def buildQueryBody(): XContentBuilder = + XContentFactory.jsonBuilder.rawField(s"${ELASTIKNN_NAME}_nearest_neighbors", json) } + ElasticDsl + .search(index) + .query(customQuery) + .fetchSource(false) + .storedFields("_none_") + .docValues(Seq(storedIdField)) + .preference(query.hashCode.toString) + .size(k) } - def putMapping(index: String, field: String, mapping: Mapping): PutMappingRequest = { + /** + * Create a mapping containing a vector field and a stored ID field. + * + * @param index Index to which this mapping is applied. + * @param vecField Field where vector is stored. + * @param storedIdField Field where ID is stored. + * @param vecMapping Mapping for the stored vector. + * @return Instance of [[com.sksamuel.elastic4s.requests.mappings.PutMappingRequest]]. + */ + def putMapping(index: String, vecField: String, storedIdField: String, vecMapping: Mapping): PutMappingRequest = { val mappingJsonString = s""" |{ | "properties": { - | "$field": ${ElasticsearchCodec.encode(mapping).spaces2} + | "$vecField": ${ElasticsearchCodec.encode(vecMapping).spaces2}, + | "$storedIdField": { + | "type": "keyword", + | "store": true + | } | } |} |""".stripMargin diff --git a/core/build.gradle b/core/build.gradle index 5114f09aa..2e208b3f1 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -1,3 +1,8 @@ +// TODO: split this project into core and api. +// api should just contain the api objects and codecs. +// elastic4s-client and core should both depend on api. +// No reason for the client to have access to the LSH models, etc. + buildscript { repositories { mavenLocal() diff --git a/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala b/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala index 12bd8f8b4..6be6de890 100644 --- a/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala +++ b/core/src/main/scala/com/klibisz/elastiknn/storage/StoredVec.scala @@ -1,5 +1,7 @@ package com.klibisz.elastiknn.storage +import java.util + import com.klibisz.elastiknn.api.Vec import scala.language.implicitConversions @@ -30,6 +32,7 @@ object StoredVec { sealed trait SparseBool extends StoredVec { val trueIndices: Array[Int] + override def hashCode: Int = util.Arrays.hashCode(trueIndices) } object SparseBool { @@ -40,6 +43,7 @@ object StoredVec { sealed trait DenseFloat extends StoredVec { val values: Array[Float] + override def hashCode: Int = util.Arrays.hashCode(values) } object DenseFloat { @@ -77,6 +81,8 @@ object StoredVec { new DenseFloat { override val values: Array[Float] = UnsafeSerialization.readFloats(barr, offset, length) } + implicit def derived[V <: Vec, S <: StoredVec](implicit codec: Codec[V, S]): Decoder[S] = + (barr: Array[Byte], offset: Int, length: Int) => codec.decode(barr, offset, length) } trait Encoder[V <: Vec] { diff --git a/docs/pages/api.md b/docs/pages/api.md index d5b3113ba..d2bafc3ca 100644 --- a/docs/pages/api.md +++ b/docs/pages/api.md @@ -690,3 +690,12 @@ From Elasticsearch's perspective, the `elastiknn_nearest_neighbors` query is no Elasticsearch receives a JSON query containing an `elastiknn_nearest_neighbors` key, passes the JSON to a parser implemented by Elastiknn, the parser produces a Lucene query, and Elasticsearch executes that query on each shard in the index. This means the simplest way to increase query parallelism is to add shards to your index. Obviously this has an upper limit, but the general performance implications of sharding are beyond the scope of this document. + +### Use stored fields for faster queries + +This is a fairly well-known Elasticsearch optimization that applies nicely to some elastiknn use cases. +If you only need to retrieve a small subset of the document source (e.g. only the ID), you can store the +relavant fields as `stored` fields to get a meaningful speedup. +The Elastiknn scala client uses this optimization to store and retrieve document IDs, yielding a ~40% speedup. +The setting [is documented here](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-store.html) +and discussed in detail [in this Github issue.](https://github.com/elastic/elasticsearch/issues/17159) diff --git a/examples/demo/webapp/app/controllers/DemoController.scala b/examples/demo/webapp/app/controllers/DemoController.scala index 55e8482de..777e93c29 100644 --- a/examples/demo/webapp/app/controllers/DemoController.scala +++ b/examples/demo/webapp/app/controllers/DemoController.scala @@ -38,7 +38,7 @@ class DemoController @Inject()(val controllerComponents: ControllerComponents, p case (accF, ex) => for { acc <- accF - q = nearestNeighborsQuery(ex.index, ex.query.withVec(Vec.Indexed(ex.index, queryId, ex.field)), 10, true) + q = nearestNeighbors(ex.index, ex.query.withVec(Vec.Indexed(ex.index, queryId, ex.field)), 10, "id") response <- eknn.execute(q) hits = response.result.hits.hits.toSeq results <- Future.traverse(hits.map(ds.parseHit))(Future.fromTry) diff --git a/examples/demo/webapp/app/models/Dataset.scala b/examples/demo/webapp/app/models/Dataset.scala index 562d16bfa..1a4d9c97f 100644 --- a/examples/demo/webapp/app/models/Dataset.scala +++ b/examples/demo/webapp/app/models/Dataset.scala @@ -31,7 +31,7 @@ object Dataset extends ElastiknnRequests { name, index, "vec", - putMapping(index, "vec", mapping), + putMapping(index, "vec", "id", mapping), mkQuery("vec", Vec.Indexed(index, "1", "vec")) ) diff --git a/plugin/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala b/plugin/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala index f42df300e..bc0aae46e 100644 --- a/plugin/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/ElastiKnnPlugin.scala @@ -4,10 +4,18 @@ import java.util import com.klibisz.elastiknn.mapper.VectorMapper import com.klibisz.elastiknn.query._ -import org.elasticsearch.common.settings.Settings +import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.io.stream.NamedWriteableRegistry +import org.elasticsearch.common.settings.{Setting, Settings} +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.env.{Environment, NodeEnvironment} import org.elasticsearch.index.mapper.Mapper import org.elasticsearch.plugins.SearchPlugin.QuerySpec import org.elasticsearch.plugins._ +import org.elasticsearch.script.ScriptService +import org.elasticsearch.threadpool.ThreadPool +import org.elasticsearch.watcher.ResourceWatcherService class ElastiKnnPlugin(settings: Settings) extends Plugin with IngestPlugin with SearchPlugin with ActionPlugin with MapperPlugin { diff --git a/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala index 36964fbd3..8195b3b40 100644 --- a/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/ExactQuery.scala @@ -6,6 +6,7 @@ import com.klibisz.elastiknn.ELASTIKNN_NAME import com.klibisz.elastiknn.api.Vec import com.klibisz.elastiknn.models.ExactSimilarityFunction import com.klibisz.elastiknn.storage.StoredVec +import com.klibisz.elastiknn.storage.StoredVec.Decoder import org.apache.lucene.document.BinaryDocValuesField import org.apache.lucene.index.{IndexableField, LeafReaderContext} import org.apache.lucene.search.{DocValuesFieldExistsQuery, Explanation} @@ -19,18 +20,14 @@ object ExactQuery { extends ScoreFunction(CombineFunction.REPLACE) { override def getLeafScoreFunction(ctx: LeafReaderContext): LeafScoreFunction = { - val vecDocVals = ctx.reader.getBinaryDocValues(vectorDocValuesField(field)) + val cachedReader = new StoredVecReader[S](ctx, field) new LeafScoreFunction { - override def score(docId: Int, subQueryScore: Float): Double = - if (vecDocVals.advanceExact(docId)) { - val binVal = vecDocVals.binaryValue() - val storedVec = codec.decode(binVal.bytes, binVal.offset, binVal.length) - simFunc(queryVec, storedVec) - } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") - - override def explainScore(docId: Int, subQueryScore: Explanation): Explanation = { - Explanation.`match`(100, s"Elastiknn exact query") + override def score(docId: Int, subQueryScore: Float): Double = { + val storedVec = cachedReader(docId) + simFunc(queryVec, storedVec) } + override def explainScore(docId: Int, subQueryScore: Explanation): Explanation = + Explanation.`match`(100, s"Elastiknn exact query") } } @@ -44,19 +41,41 @@ object ExactQuery { override def doHashCode(): Int = Objects.hash(field, queryVec, simFunc) } + /** + * Helper class that makes it easy to read vectors that were stored using the conventions in this class. + */ + final class StoredVecReader[S <: StoredVec: Decoder](lrc: LeafReaderContext, field: String) { + private val vecDocVals = lrc.reader.getBinaryDocValues(vecDocValuesField(field)) + + def apply(docId: Int): S = + if (vecDocVals.advanceExact(docId)) { + val bytesRef = vecDocVals.binaryValue() + implicitly[StoredVec.Decoder[S]].apply(bytesRef.bytes, bytesRef.offset, bytesRef.length) + } else throw new RuntimeException(s"Couldn't advance to binary doc values for doc with id [$docId]") + + } + + /** + * Instantiate an exact query, implemented as an Elasticsearch [[FunctionScoreQuery]]. + */ def apply[V <: Vec, S <: StoredVec](field: String, queryVec: V, simFunc: ExactSimilarityFunction[V, S])( implicit codec: StoredVec.Codec[V, S]): FunctionScoreQuery = { - val subQuery = new DocValuesFieldExistsQuery(vectorDocValuesField(field)) + val subQuery = new DocValuesFieldExistsQuery(vecDocValuesField(field)) val func = new ExactScoreFunction(field, queryVec, simFunc) new FunctionScoreQuery(subQuery, func) } - // Docvalue fields can have a custom name, but "regular" values (e.g. Terms) must keep the name of the field. - def vectorDocValuesField(field: String): String = s"$field.$ELASTIKNN_NAME.vector" + /** + * Appends to the given field name to produce the name where a vector is stored. + */ + def vecDocValuesField(field: String): String = s"$field.$ELASTIKNN_NAME.vector" + /** + * Creates and returns a single indexable field that stores the vector contents as a [[BinaryDocValuesField]]. + */ def index[V <: Vec: StoredVec.Encoder](field: String, vec: V): Seq[IndexableField] = { - val bytes = implicitly[StoredVec.Encoder[V]].apply(vec) - Seq(new BinaryDocValuesField(vectorDocValuesField(field), new BytesRef(bytes))) + val storedVec = implicitly[StoredVec.Encoder[V]].apply(vec) + Seq(new BinaryDocValuesField(vecDocValuesField(field), new BytesRef(storedVec))) } } diff --git a/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala index 777fa8b75..f1f5e149c 100644 --- a/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala +++ b/plugin/src/main/scala/com/klibisz/elastiknn/query/LshQuery.scala @@ -23,13 +23,10 @@ object LshQuery { val terms = lshFunction(query).map(h => new BytesRef(UnsafeSerialization.writeInt(h))) val scoreFunction = (lrc: LeafReaderContext) => { - val binaryDocValues = lrc.reader.getBinaryDocValues(ExactQuery.vectorDocValuesField(field)) - (docId: Int, _: Int) => - if (binaryDocValues.advanceExact(docId)) { - val bref = binaryDocValues.binaryValue() - val storedVec = codec.decode(bref.bytes, bref.offset, bref.length) - lshFunction.exact(query, storedVec) - } else throw new RuntimeException(s"Couldn't advance to doc with id [$docId]") + val cachedReader = new ExactQuery.StoredVecReader[S](lrc, field) + (docId: Int, approximateScore: Int) => + val storedVec = cachedReader(docId) + lshFunction.exact(query, storedVec) } new MatchTermsAndScoreQuery( diff --git a/plugin/src/main/scala/org/apache/lucene/search/MatchTermsAndScoreQuery.scala b/plugin/src/main/scala/org/apache/lucene/search/MatchTermsAndScoreQuery.scala index d33ad28a3..9ef816b6a 100644 --- a/plugin/src/main/scala/org/apache/lucene/search/MatchTermsAndScoreQuery.scala +++ b/plugin/src/main/scala/org/apache/lucene/search/MatchTermsAndScoreQuery.scala @@ -14,8 +14,9 @@ import scala.collection.mutable.ArrayBuffer /** * Custom query optimized to find the set of candidate documents that contain the greatest number of the given terms. - * Then scores the the top `candidates` candidates using the given ScoreFunction. - * Inspired by the TermInSetQuery. + * Then scores the top `candidates` docs using the given ScoreFunction. + * Inspired by Lucene's TermInSetQuery. + * * @param termsField Field containing tokens. * @param terms Set of tokens, serialized to Bytesrefs. * @param candidates Number of top candidates to pick and score per _segment_. diff --git a/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala index 4fa049b0b..c9515aafc 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/mapper/VectorMapperSuite.scala @@ -20,7 +20,8 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with implicit val rng: Random = new Random(0) test("create index and put mapping") { - val indexName = s"test-${UUID.randomUUID()}" + val index = s"test-${UUID.randomUUID()}" + val storedIdField = "id" val mappings: Seq[(String, Mapping)] = Seq( ("vec_spv", Mapping.SparseBool(100)), ("vec_dfv", Mapping.DenseFloat(100)), @@ -28,16 +29,16 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with ("vec_jcdlsh", Mapping.JaccardLsh(100, 65, 1)) ) for { - createIndexRes <- eknn.execute(createIndex(indexName)) + createIndexRes <- eknn.execute(createIndex(index)) _ = createIndexRes.shouldBeSuccess putMappingReqs = mappings.map { - case (fieldName, mapping) => eknn.putMapping(indexName, fieldName, mapping) + case (vecField, mapping) => eknn.putMapping(index, vecField, storedIdField, mapping) } _ <- Future.sequence(putMappingReqs) getMappingReqs = mappings.map { - case (fieldName, _) => eknn.execute(getMapping(Indexes(indexName), fieldName)) + case (fieldName, _) => eknn.execute(getMapping(Indexes(index), fieldName)) } getMappingRes <- Future.sequence(getMappingReqs) } yield @@ -70,7 +71,7 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with val mappingJsonOpt: Option[JsonObject] = for { x <- json.toOption - x <- x.findAllByKey(indexName).headOption + x <- x.findAllByKey(index).headOption x <- x.findAllByKey("mappings").headOption x <- x.findAllByKey(fieldName).headOption x <- x.findAllByKey("mapping").headOption @@ -86,7 +87,8 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with } test("store and read vectors") { - val fieldName = "vec" + val vecField = "vec" + val storedIdField = "id" val (dims, n) = (100, 10) def ids: Seq[String] = (0 until n).map(_ => UUID.randomUUID().toString) val inputs: Seq[(String, Mapping, Vector[Vec], Seq[String])] = Seq( @@ -104,19 +106,20 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with case (indexName, mapping, _, _) => for { _ <- eknn.execute(createIndex(indexName)) - _ <- eknn.putMapping(indexName, fieldName, mapping) + _ <- eknn.putMapping(indexName, vecField, storedIdField, mapping) } yield () } _ <- Future.sequence(putMappingReqs) indexReqs = inputs.map { - case (indexName, _, vecs, ids) => eknn.index(indexName, fieldName, vecs, Some(ids), refresh = RefreshPolicy.IMMEDIATE) + case (indexName, _, vecs, ids) => eknn.index(indexName, vecField, vecs, storedIdField, ids) } _ <- Future.sequence(indexReqs) + _ <- eknn.execute(refreshIndex(inputs.map(_._1))) getReqs = inputs.map { case (indexName, _, _, ids) => - Future.sequence(ids.map(id => eknn.execute(get(indexName, id).fetchSourceInclude(fieldName)))) + Future.sequence(ids.map(id => eknn.execute(get(indexName, id).fetchSourceInclude(vecField)))) } getResponses: Seq[Seq[Response[GetResponse]]] <- Future.sequence(getReqs) @@ -127,29 +130,30 @@ class VectorMapperSuite extends AsyncFunSuite with Matchers with Inspectors with getResponses should have length vectors.length val parsedVectors = getResponses.map(_.result.sourceAsString).map(parse) forAll(parsedVectors)(_ shouldBe 'right) - val encodedVectors = vectors.map(v => Json.fromJsonObject(JsonObject(fieldName -> ElasticsearchCodec.encode(v)))) + val encodedVectors = vectors.map(v => Json.fromJsonObject(JsonObject(vecField -> ElasticsearchCodec.encode(v)))) forAll(encodedVectors)(v => parsedVectors should contain(Right(v))) } } test("throw an error given vector with bad dimensions") { - val indexName = s"test-intentional-failure-${UUID.randomUUID()}" + val index = s"test-intentional-failure-${UUID.randomUUID()}" + val storedIdField = "id" val dims = 100 val inputs = Seq( ("intentional-failure-sbv", Mapping.SparseBool(dims), Vec.SparseBool.random(dims + 1)), ("intentional-failure-dfv", Mapping.DenseFloat(dims), Vec.DenseFloat.random(dims + 1)) ) for { - _ <- eknn.execute(createIndex(indexName)) + _ <- eknn.execute(createIndex(index)) putMappingReqs = inputs.map { - case (fieldName, mapping, _) => eknn.putMapping(indexName, fieldName, mapping) + case (fieldName, mapping, _) => eknn.putMapping(index, fieldName, storedIdField, mapping) } _ <- Future.sequence(putMappingReqs) indexReqs = inputs.map { case (fieldName, _, vec) => recoverToExceptionIf[RuntimeException] { - eknn.index(indexName, fieldName, Seq(vec), refresh = RefreshPolicy.IMMEDIATE) + eknn.index(index, fieldName, Seq(vec), storedIdField, Seq(UUID.randomUUID().toString)) } } exceptions <- Future.sequence(indexReqs) diff --git a/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQueryRecallSuite.scala b/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQueryRecallSuite.scala index de03eedbd..207119259 100644 --- a/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQueryRecallSuite.scala +++ b/testing/src/test/scala/com/klibisz/elastiknn/query/NearestNeighborsQueryRecallSuite.scala @@ -26,7 +26,8 @@ class NearestNeighborsQueryRecallSuite extends AsyncFunSuite with Matchers with // Each query has an expected recall that will be checked. private case class Test(mapping: Mapping, queriesAndExpectedRecall: Seq[(NearestNeighborsQuery, Double)]) - private val fieldName: String = "vec" + private val vecField: String = "vec" + private val storedIdField: String = "id" private val dims: Int = 1024 private val k: Int = 100 private val shards: Int = 2 @@ -41,91 +42,91 @@ class NearestNeighborsQueryRecallSuite extends AsyncFunSuite with Matchers with Test( Mapping.SparseBool(dims), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.Jaccard) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Hamming) -> 1d + NearestNeighborsQuery.Exact(vecField, Similarity.Jaccard) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Hamming) -> 1d ) ), Test( Mapping.DenseFloat(dims), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.L1) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.L2) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Angular) -> 1d + NearestNeighborsQuery.Exact(vecField, Similarity.L1) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.L2) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Angular) -> 1d ) ), // SparseIndexed Test( Mapping.SparseIndexed(dims), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.Jaccard) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Hamming) -> 1d, - NearestNeighborsQuery.SparseIndexed(fieldName, Similarity.Jaccard) -> 1d, - NearestNeighborsQuery.SparseIndexed(fieldName, Similarity.Hamming) -> 1d + NearestNeighborsQuery.Exact(vecField, Similarity.Jaccard) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Hamming) -> 1d, + NearestNeighborsQuery.SparseIndexed(vecField, Similarity.Jaccard) -> 1d, + NearestNeighborsQuery.SparseIndexed(vecField, Similarity.Hamming) -> 1d ) ), // Jaccard LSH Test( Mapping.JaccardLsh(dims, 200, 1), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.Jaccard) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Hamming) -> 1d, - NearestNeighborsQuery.JaccardLsh(fieldName, 400) -> 0.73, - NearestNeighborsQuery.JaccardLsh(fieldName, 800) -> 0.89 + NearestNeighborsQuery.Exact(vecField, Similarity.Jaccard) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Hamming) -> 1d, + NearestNeighborsQuery.JaccardLsh(vecField, 400) -> 0.73, + NearestNeighborsQuery.JaccardLsh(vecField, 800) -> 0.89 ) ), Test( Mapping.JaccardLsh(dims, 300, 2), Seq( - NearestNeighborsQuery.JaccardLsh(fieldName, 400) -> 0.72, - NearestNeighborsQuery.JaccardLsh(fieldName, 800) -> 0.86 + NearestNeighborsQuery.JaccardLsh(vecField, 400) -> 0.72, + NearestNeighborsQuery.JaccardLsh(vecField, 800) -> 0.86 ) ), // Hamming LSH Test( Mapping.HammingLsh(dims, dims * 5 / 10), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.Jaccard) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Hamming) -> 1d, - NearestNeighborsQuery.HammingLsh(fieldName, 200) -> 0.71, - NearestNeighborsQuery.HammingLsh(fieldName, 400) -> 0.86 + NearestNeighborsQuery.Exact(vecField, Similarity.Jaccard) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Hamming) -> 1d, + NearestNeighborsQuery.HammingLsh(vecField, 200) -> 0.71, + NearestNeighborsQuery.HammingLsh(vecField, 400) -> 0.86 ) ), Test( Mapping.HammingLsh(dims, dims * 7 / 10), Seq( - NearestNeighborsQuery.HammingLsh(fieldName, 200) -> 0.89, - NearestNeighborsQuery.HammingLsh(fieldName, 400) -> 0.97 + NearestNeighborsQuery.HammingLsh(vecField, 200) -> 0.89, + NearestNeighborsQuery.HammingLsh(vecField, 400) -> 0.97 ) ), // Angular Lsh Test( Mapping.AngularLsh(dims, 400, 1), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.L1) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.L2) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Angular) -> 1d, - NearestNeighborsQuery.AngularLsh(fieldName, 400) -> 0.48, - NearestNeighborsQuery.AngularLsh(fieldName, 800) -> 0.69 + NearestNeighborsQuery.Exact(vecField, Similarity.L1) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.L2) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Angular) -> 1d, + NearestNeighborsQuery.AngularLsh(vecField, 400) -> 0.48, + NearestNeighborsQuery.AngularLsh(vecField, 800) -> 0.69 ) ), Test( Mapping.AngularLsh(dims, 400, 2), Seq( - NearestNeighborsQuery.AngularLsh(fieldName, 200) -> 0.36, - NearestNeighborsQuery.AngularLsh(fieldName, 400) -> 0.52, - NearestNeighborsQuery.AngularLsh(fieldName, 800) -> 0.74 + NearestNeighborsQuery.AngularLsh(vecField, 200) -> 0.36, + NearestNeighborsQuery.AngularLsh(vecField, 400) -> 0.52, + NearestNeighborsQuery.AngularLsh(vecField, 800) -> 0.74 ) ), // L2 Lsh Test( Mapping.L2Lsh(dims, 600, 1, 4), Seq( - NearestNeighborsQuery.Exact(fieldName, Similarity.L1) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.L2) -> 1d, - NearestNeighborsQuery.Exact(fieldName, Similarity.Angular) -> 1d, - NearestNeighborsQuery.L2Lsh(fieldName, 200) -> 0.13, - NearestNeighborsQuery.L2Lsh(fieldName, 400) -> 0.24, - NearestNeighborsQuery.L2Lsh(fieldName, 800) -> 0.44 + NearestNeighborsQuery.Exact(vecField, Similarity.L1) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.L2) -> 1d, + NearestNeighborsQuery.Exact(vecField, Similarity.Angular) -> 1d, + NearestNeighborsQuery.L2Lsh(vecField, 200) -> 0.13, + NearestNeighborsQuery.L2Lsh(vecField, 400) -> 0.24, + NearestNeighborsQuery.L2Lsh(vecField, 800) -> 0.44 ) ) ) @@ -138,16 +139,16 @@ class NearestNeighborsQueryRecallSuite extends AsyncFunSuite with Matchers with else for { _ <- eknn.execute(createIndex(corpusIndex).shards(shards).replicas(0)) - _ <- eknn.putMapping(corpusIndex, fieldName, mapping) + _ <- eknn.putMapping(corpusIndex, vecField, storedIdField, mapping) _ <- eknn.execute(createIndex(queriesIndex).shards(shards).replicas(0)) - _ <- eknn.putMapping(queriesIndex, fieldName, mapping) - _ <- Future.traverse(testData.corpus.zipWithIndex.grouped(500)) { batch => + _ <- eknn.putMapping(queriesIndex, vecField, storedIdField, mapping) + _ <- Future.traverse(testData.corpus.zipWithIndex.grouped(400)) { batch => val (vecs, ids) = (batch.map(_._1), batch.map(x => s"v${x._2}")) - eknn.index(corpusIndex, fieldName, vecs, Some(ids)) + eknn.index(corpusIndex, vecField, vecs, storedIdField, ids) } - _ <- Future.traverse(testData.queries.zipWithIndex.grouped(500)) { batch => + _ <- Future.traverse(testData.queries.zipWithIndex.grouped(400)) { batch => val (vecs, ids) = (batch.map(_._1.vector), batch.map(x => s"v${x._2}")) - eknn.index(queriesIndex, fieldName, vecs, Some(ids)) + eknn.index(queriesIndex, vecField, vecs, storedIdField, ids) } _ <- eknn.execute(refreshIndex(corpusIndex, queriesIndex)) _ <- eknn.execute(forceMerge(corpusIndex, queriesIndex).maxSegments(segmentsPerShard)) @@ -183,25 +184,25 @@ class NearestNeighborsQueryRecallSuite extends AsyncFunSuite with Matchers with val uuid = UUID.randomUUID().toString val corpusIndex = f"test-data-$uuid-c" val queriesIndex = f"test-data-$uuid-q" - val testName = f"${uuid}%-20s ${mapping.toString}%-30s ${query.toString}%-50s ~= ${expectedRecall}%-8f" + val testName = f"$uuid%-20s ${mapping.toString}%-30s ${query.toString}%-50s ~= ${expectedRecall}%-8f" // Lookup the correct results based on the similarity function. val resultsIx = testData.queries.head.results.zipWithIndex.filter(_._1.similarity == query.similarity).head._2 test(testName) { for { _ <- index(corpusIndex, queriesIndex, mapping, testData) explicitResponses1 <- Future.sequence(testData.queries.map { q => - eknn.nearestNeighbors(corpusIndex, query.withVec(q.vector), k, preference = Some("")) + eknn.nearestNeighbors(corpusIndex, query.withVec(q.vector), k, storedIdField) }) explicitResponses2 <- Future.sequence(testData.queries.map { q => - eknn.nearestNeighbors(corpusIndex, query.withVec(q.vector), k, preference = Some("")) + eknn.nearestNeighbors(corpusIndex, query.withVec(q.vector), k, storedIdField) }) explicitResponses3 <- Future.sequence(testData.queries.map { q => - eknn.nearestNeighbors(corpusIndex, query.withVec(q.vector), k, preference = Some("")) + eknn.nearestNeighbors(corpusIndex, query.withVec(q.vector), k, storedIdField) }) indexedResponses <- Future.sequence(testData.queries.zipWithIndex.map { - case (q, i) => - val vec = Vec.Indexed(queriesIndex, s"v$i", fieldName) - eknn.nearestNeighbors(corpusIndex, query.withVec(vec), k, preference = Some("")) + case (_, i) => + val vec = Vec.Indexed(queriesIndex, s"v$i", vecField) + eknn.nearestNeighbors(corpusIndex, query.withVec(vec), k, storedIdField) }) } yield { @@ -218,12 +219,12 @@ class NearestNeighborsQueryRecallSuite extends AsyncFunSuite with Matchers with } // Make sure recall is at or above expected. - withClue(s"Explicit query recall should be ${expectedRecall} +/- ${recallTolerance}") { - explicitRecall1 shouldBe expectedRecall +- (recallTolerance) + withClue(s"Explicit query recall should be $expectedRecall +/- $recallTolerance") { + explicitRecall1 shouldBe expectedRecall +- recallTolerance } - withClue(s"Indexed query recall should be ${expectedRecall} +/- ${recallTolerance}") { - indexedRecall shouldBe expectedRecall +- (recallTolerance) + withClue(s"Indexed query recall should be $expectedRecall +/- $recallTolerance") { + indexedRecall shouldBe expectedRecall +- recallTolerance } } }