sparse RangeSearch/AnnIterator to return raw distance #944

Merged: 2 commits, Nov 14, 2024
58 changes: 57 additions & 1 deletion src/index/sparse/sparse_index_node.cc
@@ -122,6 +122,47 @@ class SparseInvertedIndexNode : public IndexNode {
return GenResultDataSet(nq, k, p_id.release(), p_dist.release());
}

private:
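// Pulls approximate candidates from a PrecomputedDistanceIterator and exposes
// raw_distance() so the base IndexIterator can re-score them against the raw
// (un-dropped) data; refine_ratio is forwarded to control that refinement.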
class RefineIterator : public IndexIterator {
public:
RefineIterator(const sparse::BaseInvertedIndex<T>* index, sparse::SparseRow<T>&& query,
std::shared_ptr<PrecomputedDistanceIterator> precomputed_it,
const sparse::DocValueComputer<T>& computer, const float refine_ratio = 0.5f)
: IndexIterator(true, refine_ratio),
index_(index),
query_(std::move(query)),
computer_(computer),
precomputed_it_(precomputed_it) {
}

protected:
// Returns n_rows / 10 (at least 20) DistIds on the first call to create a large enough
// window for refinement; each subsequent call returns a single DistId.
void
next_batch(std::function<void(const std::vector<DistId>&)> batch_handler) override {
std::vector<DistId> dists;
size_t num = first_return_ ? (std::max(index_->n_rows() / 10, static_cast<size_t>(20))) : 1;
first_return_ = false;
for (size_t i = 0; i < num && precomputed_it_->HasNext(); ++i) {
auto [id, dist] = precomputed_it_->Next();
dists.emplace_back(id, dist);
}
batch_handler(dists);
}

float
raw_distance(int64_t id) override {
return index_->GetRawDistance(id, query_, computer_);
}

private:
const sparse::BaseInvertedIndex<T>* index_;
sparse::SparseRow<T> query_;
const sparse::DocValueComputer<T> computer_;
std::shared_ptr<PrecomputedDistanceIterator> precomputed_it_;
bool first_return_ = true;
};

public:
// TODO: for now inverted index and wand use the same impl for AnnIterator.
[[nodiscard]] expected<std::vector<IndexNode::IteratorPtr>>
AnnIterator(const DataSetPtr dataset, std::unique_ptr<Config> config, const BitsetView& bitset) const override {
@@ -142,13 +183,23 @@ class SparseInvertedIndexNode : public IndexNode {
auto computer = computer_or.value();
auto drop_ratio_search = cfg.drop_ratio_search.value_or(0.0f);

const bool approximated = index_->IsApproximated() || drop_ratio_search > 0;

auto vec = std::vector<std::shared_ptr<IndexNode::iterator>>(nq, nullptr);
std::vector<folly::Future<folly::Unit>> futs;
futs.reserve(nq);
for (int i = 0; i < nq; ++i) {
futs.emplace_back(search_pool_->push([&, i]() {
vec[i] = std::make_shared<PrecomputedDistanceIterator>(
auto it = std::make_shared<PrecomputedDistanceIterator>(
index_->GetAllDistances(queries[i], drop_ratio_search, bitset, computer), true);
if (!approximated || queries[i].size() == 0) {
vec[i] = it;
} else {
sparse::SparseRow<T> query_copy(queries[i]);
auto refine_it = std::make_shared<RefineIterator>(index_, std::move(query_copy), it, computer);
refine_it->initialize();
vec[i] = std::move(refine_it);
}
}));
}
WaitAllSuccess(futs);
@@ -372,6 +423,11 @@ class SparseInvertedIndexNodeCC : public SparseInvertedIndexNode<T, use_wand> {
expected<std::vector<IndexNode::IteratorPtr>>
AnnIterator(const DataSetPtr dataset, std::unique_ptr<Config> cfg, const BitsetView& bitset) const override {
ReadPermission permission(*this);
// Always use PrecomputedDistanceIterator for SparseInvertedIndexNodeCC:
// RefineIterator would need to acquire another ReadPermission when calling
// index_->GetRawDistance(), and if an Add task is queued in between, the two would deadlock.
auto& config = static_cast<knowhere::SparseInvertedIndexConfig&>(*cfg);
config.drop_ratio_search = 0.0f;
return SparseInvertedIndexNode<T, use_wand>::AnnIterator(dataset, std::move(cfg), bitset);
}

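The approximate-then-refine pattern behind RefineIterator can be illustrated outside knowhere. The sketch below is not knowhere code and every name in it is invented; it assumes larger-is-closer scores (as with IP on sparse vectors) and exact scores no smaller than the approximate ones. Candidates arrive pre-scored approximately and are re-scored with an exact function before being yielded, so the output stream, like the iterators in this PR, is only mostly sorted.

#include <cstdint>
#include <cstdio>
#include <functional>
#include <queue>
#include <vector>

struct Cand {
    int64_t id;
    float dist;  // larger-is-closer score
    bool operator<(const Cand& o) const { return dist < o.dist; }  // max-heap order
};

class RefiningStream {
 public:
    RefiningStream(const std::vector<Cand>& approx, std::function<float(int64_t)> exact)
        : exact_(std::move(exact)), approx_(approx.begin(), approx.end()) {
    }

    bool HasNext() const { return !approx_.empty() || !refined_.empty(); }

    Cand Next() {
        // Re-score approximate candidates until the best refined score is at
        // least the best remaining approximate score. Since exact >= approx,
        // the output stays close to, but not exactly, sorted.
        while (!approx_.empty() && (refined_.empty() || refined_.top().dist < approx_.top().dist)) {
            Cand c = approx_.top();
            approx_.pop();
            refined_.push({c.id, exact_(c.id)});
        }
        Cand best = refined_.top();
        refined_.pop();
        return best;
    }

 private:
    std::function<float(int64_t)> exact_;
    std::priority_queue<Cand> approx_;  // approximate scores
    std::priority_queue<Cand> refined_; // exact scores
};

int main() {
    // Approximate scores under-estimate; id 1 gains the most on refinement.
    RefiningStream s({{0, 1.0f}, {1, 0.9f}, {2, 0.5f}},
                     [](int64_t id) { return id == 1 ? 1.2f : 1.0f; });
    while (s.HasNext()) {
        Cand c = s.Next();
        // Prints 0 1.00, then 1 1.20, then 2 1.00 -- mostly, not strictly, sorted.
        std::printf("%lld %.2f\n", static_cast<long long>(c.id), c.dist);
    }
    return 0;
}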
28 changes: 18 additions & 10 deletions src/index/sparse/sparse_inverted_index.h
@@ -60,12 +60,18 @@ class BaseInvertedIndex {
GetAllDistances(const SparseRow<T>& query, float drop_ratio_search, const BitsetView& bitset,
const DocValueComputer<T>& computer) const = 0;

virtual float
GetRawDistance(const label_t id, const SparseRow<T>& query, const DocValueComputer<T>& computer) const = 0;

virtual void
GetVectorById(const label_t id, SparseRow<T>& output) const = 0;

virtual expected<DocValueComputer<T>>
GetDocValueComputer(const SparseInvertedIndexConfig& cfg) const = 0;

virtual bool
IsApproximated() const = 0;

[[nodiscard]] virtual size_t
size() const = 0;

Expand Down Expand Up @@ -162,7 +168,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
*
* Data are densely packed in serialized bytes and no padding is added.
*/
std::shared_lock<std::shared_mutex> lock(mu_);
writeBinaryPOD(writer, n_rows_internal());
writeBinaryPOD(writer, n_cols_internal());
writeBinaryPOD(writer, value_threshold_);
@@ -180,7 +185,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
Status
Load(MemoryIOReader& reader, int map_flags = MAP_SHARED,
const std::string& supplement_target_filename = "") override {
std::unique_lock<std::shared_mutex> lock(mu_);
int64_t rows;
readBinaryPOD(reader, rows);
// previous versions used the signness of rows to indicate whether to
@@ -364,7 +368,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
}
std::nth_element(vals.begin(), pos, vals.end());

std::unique_lock<std::shared_mutex> lock(mu_);
value_threshold_ = *pos;
drop_during_build_ = true;
return Status::success;
@@ -376,7 +379,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
if constexpr (mmapped) {
throw std::invalid_argument("mmapped InvertedIndex does not support Add");
} else {
std::unique_lock<std::shared_mutex> lock(mu_);
auto current_rows = n_rows_internal();
if (current_rows > 0 && drop_during_build_) {
LOG_KNOWHERE_ERROR_ << "Not allowed to add data to a built index with drop_ratio_build > 0.";
@@ -415,7 +417,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
std::nth_element(values.begin(), pos, values.end());
auto q_threshold = *pos;

std::shared_lock<std::shared_mutex> lock(mu_);
// if no data was dropped during both build and search, no refinement is
// needed.
if (!drop_during_build_ && drop_ratio_search == 0) {
@@ -435,6 +436,7 @@ class InvertedIndex : public BaseInvertedIndex<T> {
}
}

// Returned distances are approximate; accuracy decreases as drop_ratio_build / drop_ratio_search increase.
std::vector<float>
GetAllDistances(const SparseRow<T>& query, float drop_ratio_search, const BitsetView& bitset,
const DocValueComputer<T>& computer) const override {
@@ -448,7 +450,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {
auto pos = values.begin() + static_cast<size_t>(drop_ratio_search * values.size());
std::nth_element(values.begin(), pos, values.end());
auto q_threshold = *pos;
std::shared_lock<std::shared_mutex> lock(mu_);
auto distances = compute_all_distances(query, q_threshold, computer);
for (size_t i = 0; i < distances.size(); ++i) {
if (bitset.empty() || !bitset.test(i)) {
@@ -459,14 +460,19 @@ class InvertedIndex : public BaseInvertedIndex<T> {
return distances;
}

float
GetRawDistance(const label_t id, const SparseRow<T>& query, const DocValueComputer<T>& computer) const override {
T doc_sum = bm25 ? bm25_params_->row_sums.at(id) : 0;
return query.dot(raw_data_[id], computer, doc_sum);
}

void
GetVectorById(const label_t id, SparseRow<T>& output) const override {
output = raw_data_[id];
}

[[nodiscard]] size_t
size() const override {
std::shared_lock<std::shared_mutex> lock(mu_);
size_t res = sizeof(*this);
for (size_t i = 0; i < raw_data_.size(); ++i) {
res += raw_data_[i].memory_usage();
@@ -492,16 +498,19 @@ class InvertedIndex : public BaseInvertedIndex<T> {

[[nodiscard]] size_t
n_rows() const override {
std::shared_lock<std::shared_mutex> lock(mu_);
return n_rows_internal();
}

[[nodiscard]] size_t
n_cols() const override {
std::shared_lock<std::shared_mutex> lock(mu_);
return n_cols_internal();
}

[[nodiscard]] bool
IsApproximated() const override {
return drop_during_build_;
}

private:
size_t
n_rows_internal() const {
@@ -764,7 +773,6 @@ class InvertedIndex : public BaseInvertedIndex<T> {

// reserve, [], size, emplace_back
Vector<SparseRow<T>> raw_data_;
mutable std::shared_mutex mu_;

Vector<Vector<SparseIdVal<T>>> inverted_lut_;
// If we want to drop small values during build, we must first train the
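For reference, the raw distance that GetRawDistance() returns is a sparse dot product between the query and the stored row, with the DocValueComputer transforming each matched document value (identity for plain IP; a BM25 weight that also uses the row's value sum). A minimal sketch of such a dot product follows; it is illustrative only, assuming index-sorted (index, value) pairs and a simplified computer signature, neither of which is guaranteed to match knowhere's SparseRow internals.

#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// A sparse vector as (index, value) pairs, sorted by index.
using SparseVec = std::vector<std::pair<uint32_t, float>>;

// computer maps a stored value (plus the row's value sum, used by BM25) to the
// weight that enters the score; for plain IP it would just return val.
float
SparseDot(const SparseVec& query, const SparseVec& doc,
          const std::function<float(float val, float doc_sum)>& computer, float doc_sum) {
    float sum = 0.0f;
    size_t i = 0, j = 0;
    // Merge-join over the two sorted index lists.
    while (i < query.size() && j < doc.size()) {
        if (query[i].first == doc[j].first) {
            sum += query[i].second * computer(doc[j].second, doc_sum);
            ++i;
            ++j;
        } else if (query[i].first < doc[j].first) {
            ++i;
        } else {
            ++j;
        }
    }
    return sum;
}

With an identity computer this reduces to a plain inner product; the BM25 path corresponds to the doc_sum argument that GetRawDistance() feeds from bm25_params_->row_sums.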
12 changes: 9 additions & 3 deletions tests/ut/test_sparse.cc
@@ -222,18 +222,24 @@ TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") {
REQUIRE(iterators_or.has_value());
auto& iterators = iterators_or.value();
REQUIRE(iterators.size() == (size_t)nq);
// verify the distances are monotonically decreasing, as INDEX_SPARSE_INVERTED_INDEX and
// INDEX_SPARSE_WAND perform exhaustive search for the iterator.

int count = 0;
int out_of_order = 0;
for (int i = 0; i < nq; ++i) {
auto& iter = iterators[i];
float prev_dist = std::numeric_limits<float>::max();
while (iter->HasNext()) {
auto [id, dist] = iter->Next();
REQUIRE(!bitset.test(id));
REQUIRE(prev_dist >= dist);
count++;
if (prev_dist < dist) {
out_of_order++;
}
prev_dist = dist;
}
}
// At most 5% of the distances may be out of order: out_of_order * 20 <= count is
// equivalent to out_of_order / count <= 1/20.
REQUIRE(out_of_order * 20 <= count);
}

SECTION("Test Sparse Range Search") {