Skip to content

Commit

Permalink
sparse RangeSearch/AnnIterator to return raw distance (zilliztech#944)
Browse files Browse the repository at this point in the history
* sparse: make the distance returned by RangeSearch and AnnIterator to be the raw instead of the quantized distance

Signed-off-by: Buqian Zheng <[email protected]>

* sparse: remove mutex in the index: we now use CC index if concurrent read/write is needed

Signed-off-by: Buqian Zheng <[email protected]>

---------

Signed-off-by: Buqian Zheng <[email protected]>
  • Loading branch information
zhengbuqian authored and foxspy committed Nov 18, 2024
1 parent ba400d1 commit 98aa623
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 14 deletions.
58 changes: 57 additions & 1 deletion src/index/sparse/sparse_index_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,47 @@ class SparseInvertedIndexNode : public IndexNode {
return GenResultDataSet(nq, k, p_id.release(), p_dist.release());
}

private:
class RefineIterator : public IndexIterator {
public:
RefineIterator(const sparse::BaseInvertedIndex<T>* index, sparse::SparseRow<T>&& query,
std::shared_ptr<PrecomputedDistanceIterator> precomputed_it,
const sparse::DocValueComputer<T>& computer, const float refine_ratio = 0.5f)
: IndexIterator(true, refine_ratio),
index_(index),
query_(std::move(query)),
computer_(computer),
precomputed_it_(precomputed_it) {
}

protected:
// returns n_rows / 10 DistId for the first time to create a large enough window for refinement.
void
next_batch(std::function<void(const std::vector<DistId>&)> batch_handler) override {
std::vector<DistId> dists;
size_t num = first_return_ ? (std::max(index_->n_rows() / 10, static_cast<size_t>(20))) : 1;
first_return_ = false;
for (size_t i = 0; i < num && precomputed_it_->HasNext(); ++i) {
auto [id, dist] = precomputed_it_->Next();
dists.emplace_back(id, dist);
}
batch_handler(dists);
}

float
raw_distance(int64_t id) override {
return index_->GetRawDistance(id, query_, computer_);
}

private:
const sparse::BaseInvertedIndex<T>* index_;
sparse::SparseRow<T> query_;
const sparse::DocValueComputer<T> computer_;
std::shared_ptr<PrecomputedDistanceIterator> precomputed_it_;
bool first_return_ = true;
};

public:
// TODO: for now inverted index and wand use the same impl for AnnIterator.
[[nodiscard]] expected<std::vector<IndexNode::IteratorPtr>>
AnnIterator(const DataSetPtr dataset, std::unique_ptr<Config> config, const BitsetView& bitset) const override {
Expand All @@ -142,13 +183,23 @@ class SparseInvertedIndexNode : public IndexNode {
auto computer = computer_or.value();
auto drop_ratio_search = cfg.drop_ratio_search.value_or(0.0f);

const bool approximated = index_->IsApproximated() || drop_ratio_search > 0;

auto vec = std::vector<std::shared_ptr<IndexNode::iterator>>(nq, nullptr);
std::vector<folly::Future<folly::Unit>> futs;
futs.reserve(nq);
for (int i = 0; i < nq; ++i) {
futs.emplace_back(search_pool_->push([&, i]() {
vec[i] = std::make_shared<PrecomputedDistanceIterator>(
auto it = std::make_shared<PrecomputedDistanceIterator>(
index_->GetAllDistances(queries[i], drop_ratio_search, bitset, computer), true);
if (!approximated || queries[i].size() == 0) {
vec[i] = it;
} else {
sparse::SparseRow<T> query_copy(queries[i]);
auto refine_it = std::make_shared<RefineIterator>(index_, std::move(query_copy), it, computer);
refine_it->initialize();
vec[i] = std::move(refine_it);
}
}));
}
WaitAllSuccess(futs);
Expand Down Expand Up @@ -372,6 +423,11 @@ class SparseInvertedIndexNodeCC : public SparseInvertedIndexNode<T, use_wand> {
expected<std::vector<IndexNode::IteratorPtr>>
AnnIterator(const DataSetPtr dataset, std::unique_ptr<Config> cfg, const BitsetView& bitset) const override {
ReadPermission permission(*this);
// Always uses PrecomputedDistanceIterator for SparseInvertedIndexNodeCC:
// If we want to use RefineIterator, it needs to get another ReadPermission when calling
// index_->GetRawDistance(). If an Add task is added in between, there will be a deadlock.
auto config = static_cast<const knowhere::SparseInvertedIndexConfig&>(*cfg);
config.drop_ratio_search = 0.0f;
return SparseInvertedIndexNode<T, use_wand>::AnnIterator(dataset, std::move(cfg), bitset);
}

Expand Down
28 changes: 18 additions & 10 deletions src/index/sparse/sparse_inverted_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ class BaseInvertedIndex {
GetAllDistances(const SparseRow<T>& query, float drop_ratio_search, const BitsetView& bitset,
const DocValueComputer<T>& computer) const = 0;

virtual float
GetRawDistance(const label_t id, const SparseRow<T>& query, const DocValueComputer<T>& computer) const = 0;

virtual void
GetVectorById(const label_t id, SparseRow<T>& output) const = 0;

virtual expected<DocValueComputer<T>>
GetDocValueComputer(const SparseInvertedIndexConfig& cfg) const = 0;

virtual bool
IsApproximated() const = 0;

[[nodiscard]] virtual size_t
size() const = 0;

Expand Down Expand Up @@ -162,7 +168,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
*
* Data are densely packed in serialized bytes and no padding is added.
*/
std::shared_lock<std::shared_mutex> lock(mu_);
writeBinaryPOD(writer, n_rows_internal());
writeBinaryPOD(writer, n_cols_internal());
writeBinaryPOD(writer, value_threshold_);
Expand All @@ -180,7 +185,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
Status
Load(MemoryIOReader& reader, int map_flags = MAP_SHARED,
const std::string& supplement_target_filename = "") override {
std::unique_lock<std::shared_mutex> lock(mu_);
int64_t rows;
readBinaryPOD(reader, rows);
// previous versions used the signness of rows to indicate whether to
Expand Down Expand Up @@ -364,7 +368,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
}
std::nth_element(vals.begin(), pos, vals.end());

std::unique_lock<std::shared_mutex> lock(mu_);
value_threshold_ = *pos;
drop_during_build_ = true;
return Status::success;
Expand All @@ -376,7 +379,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
if constexpr (mmapped) {
throw std::invalid_argument("mmapped InvertedIndex does not support Add");
} else {
std::unique_lock<std::shared_mutex> lock(mu_);
auto current_rows = n_rows_internal();
if (current_rows > 0 && drop_during_build_) {
LOG_KNOWHERE_ERROR_ << "Not allowed to add data to a built index with drop_ratio_build > 0.";
Expand Down Expand Up @@ -415,7 +417,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
std::nth_element(values.begin(), pos, values.end());
auto q_threshold = *pos;

std::shared_lock<std::shared_mutex> lock(mu_);
// if no data was dropped during both build and search, no refinement is
// needed.
if (!drop_during_build_ && drop_ratio_search == 0) {
Expand All @@ -435,6 +436,7 @@ class InvertedIndex : public BaseInvertedIndex<T> {
}
}

// Returned distances are inaccurate based on the drop_ratio.
std::vector<float>
GetAllDistances(const SparseRow<T>& query, float drop_ratio_search, const BitsetView& bitset,
const DocValueComputer<T>& computer) const override {
Expand All @@ -448,7 +450,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
auto pos = values.begin() + static_cast<size_t>(drop_ratio_search * values.size());
std::nth_element(values.begin(), pos, values.end());
auto q_threshold = *pos;
std::shared_lock<std::shared_mutex> lock(mu_);
auto distances = compute_all_distances(query, q_threshold, computer);
for (size_t i = 0; i < distances.size(); ++i) {
if (bitset.empty() || !bitset.test(i)) {
Expand All @@ -459,14 +460,19 @@ class InvertedIndex : public BaseInvertedIndex<T> {
return distances;
}

float
GetRawDistance(const label_t id, const SparseRow<T>& query, const DocValueComputer<T>& computer) const override {
T doc_sum = bm25 ? bm25_params_->row_sums.at(id) : 0;
return query.dot(raw_data_[id], computer, doc_sum);
}

void
GetVectorById(const label_t id, SparseRow<T>& output) const override {
output = raw_data_[id];
}

[[nodiscard]] size_t
size() const override {
std::shared_lock<std::shared_mutex> lock(mu_);
size_t res = sizeof(*this);
for (size_t i = 0; i < raw_data_.size(); ++i) {
res += raw_data_[i].memory_usage();
Expand All @@ -492,16 +498,19 @@ class InvertedIndex : public BaseInvertedIndex<T> {

[[nodiscard]] size_t
n_rows() const override {
std::shared_lock<std::shared_mutex> lock(mu_);
return n_rows_internal();
}

[[nodiscard]] size_t
n_cols() const override {
std::shared_lock<std::shared_mutex> lock(mu_);
return n_cols_internal();
}

[[nodiscard]] virtual bool
IsApproximated() const override {
return drop_during_build_;
}

private:
size_t
n_rows_internal() const {
Expand Down Expand Up @@ -764,7 +773,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {

// reserve, [], size, emplace_back
Vector<SparseRow<T>> raw_data_;
mutable std::shared_mutex mu_;

Vector<Vector<SparseIdVal<T>>> inverted_lut_;
// If we want to drop small values during build, we must first train the
Expand Down
12 changes: 9 additions & 3 deletions tests/ut/test_sparse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,24 @@ TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") {
REQUIRE(iterators_or.has_value());
auto& iterators = iterators_or.value();
REQUIRE(iterators.size() == (size_t)nq);
// verify the distances are monotonic decreasing, as INDEX_SPARSE_INVERTED_INDEX and INDEX_SPARSE_WAND
// performs exausitive search for iterator.

int count = 0;
int out_of_order = 0;
for (int i = 0; i < nq; ++i) {
auto& iter = iterators[i];
float prev_dist = std::numeric_limits<float>::max();
while (iter->HasNext()) {
auto [id, dist] = iter->Next();
REQUIRE(!bitset.test(id));
REQUIRE(prev_dist >= dist);
count++;
if (prev_dist < dist) {
out_of_order++;
}
prev_dist = dist;
}
}
// less than 5% of the distances are out of order.
REQUIRE(out_of_order * 20 <= count);
}

SECTION("Test Sparse Range Search") {
Expand Down

0 comments on commit 98aa623

Please sign in to comment.