Skip to content

Commit

Permalink
Makes it possible to register batching listeners
Browse files Browse the repository at this point in the history
Upon reset_node and apply_update, we want to be
able to react to a batch of changes rather than individual changes.

In quickwit for instance, we want to be able to react to the reception of a batch of deleted shard and group this reaction into a single metastore call.
  • Loading branch information
fulmicoton committed May 2, 2024
1 parent f783620 commit 0046e5a
Show file tree
Hide file tree
Showing 6 changed files with 1,028 additions and 498 deletions.
6 changes: 2 additions & 4 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ impl Api {
#[oai(path = "/set_kv/", method = "get")]
async fn set_kv(&self, key: Query<String>, value: Query<String>) -> Json<serde_json::Value> {
let mut chitchat_guard = self.chitchat.lock().await;

let cc_state = chitchat_guard.self_node_state();
let mut cc_state = chitchat_guard.self_node_state();
cc_state.set(key.as_str(), value.as_str());

Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
Expand All @@ -48,8 +47,7 @@ impl Api {
#[oai(path = "/mark_for_deletion/", method = "get")]
async fn mark_for_deletion(&self, key: Query<String>) -> Json<serde_json::Value> {
let mut chitchat_guard = self.chitchat.lock().await;

let cc_state = chitchat_guard.self_node_state();
let mut cc_state = chitchat_guard.self_node_state();
cc_state.delete(key.as_str());
Json(serde_json::to_value(&SetKeyValueResponse { status: true }).unwrap())
}
Expand Down
12 changes: 7 additions & 5 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl Delta {
pub(crate) fn num_tuples(&self) -> usize {
self.node_deltas
.iter()
.map(|node_delta| node_delta.num_tuples())
.map(|node_delta| node_delta.key_values.len())
.sum()
}

Expand Down Expand Up @@ -323,10 +323,12 @@ pub(crate) struct NodeDelta {
pub max_version: Option<Version>,
}

#[cfg(test)]
impl NodeDelta {
pub fn num_tuples(&self) -> usize {
self.key_values.len()
impl IntoIterator for NodeDelta {
type Item = KeyValueMutation;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.key_values.into_iter()
}
}

Expand Down
Loading

0 comments on commit 0046e5a

Please sign in to comment.