Add ingester close shards gRPC (#4088)
* Add ingester close shards gRPC

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

* Clean.

---------

Co-authored-by: Adrien Guillo <[email protected]>
fmassot and guilload authored Nov 9, 2023
1 parent 83c2cf5 commit beabe88
Showing 14 changed files with 430 additions and 62 deletions.
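In short, this commit adds a CloseShards RPC to the ingester service: for each (index_uid, source_id, shard_ids) entry in the request, the ingester appends an EOF record to the matching mrecordlog queues (when they exist), marks the shards as closed, and notifies the fetch tasks so that open fetch streams can terminate. Below is a minimal caller-side sketch in Rust; only ClosedShards, CloseShardsRequest, and the close_shards RPC come from this commit, while the helper function and its wiring are assumptions for illustration.

use quickwit_proto::ingest::ingester::{CloseShardsRequest, IngesterService};
use quickwit_proto::ingest::{ClosedShards, IngestV2Result};

// Hypothetical caller (e.g. the control plane) asking an ingester to close
// shards 1 and 2 of a source. The index UID and source ID below are made up.
async fn request_shard_closure(ingester: &mut impl IngesterService) -> IngestV2Result<()> {
    let close_shards_request = CloseShardsRequest {
        closed_shards: vec![ClosedShards {
            index_uid: "my-index:0".to_string(),
            source_id: "my-source".to_string(),
            shard_ids: vec![1, 2],
        }],
    };
    // On the ingester side, this appends an EOF record to each targeted queue,
    // marks the shard as closed, and wakes up any pending fetch task.
    ingester.close_shards(close_shards_request).await?;
    Ok(())
}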
@@ -25,12 +25,12 @@ use itertools::Itertools;
use quickwit_common::{PrettySample, Progress};
use quickwit_ingest::IngesterPool;
use quickwit_proto::control_plane::{
ClosedShards, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsFailure,
ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsFailure,
GetOrCreateOpenShardsFailureReason, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSuccess,
};
use quickwit_proto::ingest::ingester::{IngesterService, PingRequest};
use quickwit_proto::ingest::{IngestV2Error, ShardState};
use quickwit_proto::ingest::{ClosedShards, IngestV2Error, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient};
use quickwit_proto::types::{IndexUid, NodeId};
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -513,13 +513,13 @@ async fn fault_tolerant_fetch_task(
}

#[derive(Debug, Clone, Copy)]
struct FetchRange {
pub(super) struct FetchRange {
from_position_exclusive_opt: Option<u64>,
to_position_inclusive_opt: Option<u64>,
}

impl FetchRange {
fn new(from_position_exclusive: Position, to_position_inclusive: Position) -> Self {
pub(super) fn new(from_position_exclusive: Position, to_position_inclusive: Position) -> Self {
Self {
from_position_exclusive_opt: from_position_exclusive.as_u64(),
to_position_inclusive_opt: to_position_inclusive.as_u64(),
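FetchRange and its constructor become pub(super) so the new ingester test can read a whole queue back through mrecordlog. A tiny illustrative sketch, under the assumption that Position::Beginning.as_u64() is None and therefore leaves both bounds open:

use quickwit_proto::types::Position;

use crate::ingest_v2::fetch::FetchRange;

// Illustration only: with Position::Beginning on both sides, neither bound is
// set, so the range covers the entire queue. The close-shards test below uses
// exactly this to read every record back and assert the trailing EOF record.
fn full_queue_range() -> FetchRange {
    FetchRange::new(Position::Beginning, Position::Beginning)
}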
158 changes: 147 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -33,11 +33,12 @@ use mrecordlog::MultiRecordLog;
use quickwit_common::tower::Pool;
use quickwit_common::ServiceStream;
use quickwit_proto::ingest::ingester::{
AckReplicationMessage, FetchResponseV2, IngesterService, IngesterServiceClient,
IngesterServiceStream, OpenFetchStreamRequest, OpenReplicationStreamRequest,
OpenReplicationStreamResponse, PersistFailure, PersistFailureReason, PersistRequest,
PersistResponse, PersistSuccess, PingRequest, PingResponse, ReplicateRequest,
ReplicateSubrequest, SynReplicationMessage, TruncateRequest, TruncateResponse,
AckReplicationMessage, CloseShardsRequest, CloseShardsResponse, FetchResponseV2,
IngesterService, IngesterServiceClient, IngesterServiceStream, OpenFetchStreamRequest,
OpenReplicationStreamRequest, OpenReplicationStreamResponse, PersistFailure,
PersistFailureReason, PersistRequest, PersistResponse, PersistSuccess, PingRequest,
PingResponse, ReplicateRequest, ReplicateSubrequest, SynReplicationMessage, TruncateRequest,
TruncateResponse,
};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, ShardState};
use quickwit_proto::types::{NodeId, Position, QueueId};
@@ -396,11 +397,6 @@ impl IngesterService for Ingester {
persist_successes.push(persist_success);
}
}
let _state_guard = self.state.write().await;

for persist_success in &persist_successes {
let _queue_id = persist_success.queue_id();
}
let leader_id = self.self_node_id.to_string();
let persist_response = PersistResponse {
leader_id,
@@ -537,6 +533,29 @@ impl IngesterService for Ingester {
let truncate_response = TruncateResponse {};
Ok(truncate_response)
}

async fn close_shards(
&mut self,
close_shards_request: CloseShardsRequest,
) -> IngestV2Result<CloseShardsResponse> {
let mut state_guard = self.state.write().await;
for close_shard in close_shards_request.closed_shards {
for queue_id in close_shard.queue_ids() {
if !state_guard.mrecordlog.queue_exists(&queue_id) {
continue;
}
append_eof_record_if_necessary(&mut state_guard.mrecordlog, &queue_id).await;
let shard = state_guard
.shards
.get_mut(&queue_id)
.expect("shard must exist");
// Notify fetch task.
shard.notify_new_records();
shard.close();
}
}
Ok(CloseShardsResponse {})
}
}

/// Appends an EOF record to the queue if it is empty or the last record is not an EOF
@@ -574,11 +593,12 @@ mod tests {
IngesterServiceGrpcServer, IngesterServiceGrpcServerAdapter, PersistSubrequest,
TruncateSubrequest,
};
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::ingest::{ClosedShards, DocBatchV2};
use quickwit_proto::types::queue_id;
use tonic::transport::{Endpoint, Server};

use super::*;
use crate::ingest_v2::fetch::FetchRange;
use crate::ingest_v2::test_utils::{IngesterShardTestExt, MultiRecordLogTestExt};

#[tokio::test]
@@ -1248,4 +1268,120 @@ mod tests {
.mrecordlog
.assert_records_eq(&queue_id_02, .., &[]);
}

#[tokio::test]
async fn test_ingester_close_shards() {
let tempdir = tempfile::tempdir().unwrap();
let self_node_id: NodeId = "test-ingester-0".into();
let ingester_pool = IngesterPool::default();
let wal_dir_path = tempdir.path();
let replication_factor = 1;
let mut ingester = Ingester::try_new(
self_node_id.clone(),
ingester_pool,
wal_dir_path,
replication_factor,
)
.await
.unwrap();

let queue_id_01 = queue_id("test-index:0", "test-source:0", 1);
let queue_id_02 = queue_id("test-index:0", "test-source:0", 2);
let queue_id_03 = queue_id("test-index:1", "test-source:1", 3);

let mut state_guard = ingester.state.write().await;
for queue_id in &[&queue_id_01, &queue_id_02, &queue_id_03] {
ingester
.create_shard(&mut state_guard, queue_id, &self_node_id, None)
.await
.unwrap();
let records = [
MRecord::new_doc("test-doc-010").encode(),
MRecord::new_doc("test-doc-011").encode(),
]
.into_iter();
state_guard
.mrecordlog
.append_records(&queue_id_01, None, records)
.await
.unwrap();
}

drop(state_guard);

let client_id = "test-client".to_string();
let open_fetch_stream_request = OpenFetchStreamRequest {
client_id: client_id.clone(),
index_uid: "test-index:0".to_string(),
source_id: "test-source:0".to_string(),
shard_id: 1,
from_position_exclusive: None,
to_position_inclusive: None,
};

let mut fetch_stream = ingester
.open_fetch_stream(open_fetch_stream_request)
.await
.unwrap();
let fetch_response = fetch_stream.next().await.unwrap().unwrap();
assert_eq!(
fetch_response.from_position_exclusive(),
Position::Beginning
);

let close_shard_1 = ClosedShards {
index_uid: "test-index:0".to_string(),
source_id: "test-source:0".to_string(),
shard_ids: vec![1, 2],
};
let close_shard_2 = ClosedShards {
index_uid: "test-index:1".to_string(),
source_id: "test-source:1".to_string(),
shard_ids: vec![3],
};
let close_shard_with_no_queue = ClosedShards {
index_uid: "test-index:2".to_string(),
source_id: "test-source:2".to_string(),
shard_ids: vec![4],
};
let closed_shards = vec![
close_shard_1.clone(),
close_shard_2.clone(),
close_shard_with_no_queue,
];
let close_shards_request = CloseShardsRequest {
closed_shards: closed_shards.clone(),
};
ingester.close_shards(close_shards_request).await.unwrap();

// Check that shards are closed and EOF records are appended.
let state_guard = ingester.state.read().await;
for shard in state_guard.shards.values() {
shard.assert_is_closed();
}
for closed_shards in [&close_shard_1, &close_shard_2] {
for queue_id in closed_shards.queue_ids() {
let last_position = state_guard
.mrecordlog
.range(
&queue_id,
FetchRange::new(Position::Beginning, Position::Beginning),
)
.unwrap()
.last()
.unwrap();
assert!(is_eof_mrecord(&last_position.1));
}
}

// Check that fetch task is notified.
// Note: fetch stream should not block if the close shard call notified the fetch task.
let fetch_response =
tokio::time::timeout(std::time::Duration::from_millis(50), fetch_stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(fetch_response.to_position_inclusive(), Position::Eof);
}
}
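On the consumer side, the closure becomes visible through the fetch stream: after close_shards runs, the stream for a closed shard eventually yields a response whose inclusive upper position is EOF, as the test above asserts. A hedged sketch of a consumer loop built only on types this file already imports (the function itself is illustrative, not part of the commit):

use futures::StreamExt;
use quickwit_proto::ingest::ingester::{FetchResponseV2, IngesterServiceStream};
use quickwit_proto::types::Position;

// Illustrative consumer: drain a fetch stream until the ingester signals EOF
// for the shard, then stop polling.
async fn drain_until_eof(mut fetch_stream: IngesterServiceStream<FetchResponseV2>) {
    while let Some(Ok(fetch_response)) = fetch_stream.next().await {
        // ...process the fetched records here...
        if fetch_response.to_position_inclusive() == Position::Eof {
            break; // the shard is closed and fully consumed
        }
    }
}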
9 changes: 9 additions & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/models.rs
@@ -137,6 +137,15 @@ impl IngesterShard {
.is_closed()
}

pub fn close(&mut self) {
let shard_state = match self {
IngesterShard::Primary(primary_shard) => &mut primary_shard.shard_state,
IngesterShard::Replica(replica_shard) => &mut replica_shard.shard_state,
IngesterShard::Solo(solo_shard) => &mut solo_shard.shard_state,
};
*shard_state = ShardState::Closed;
}

pub fn replication_position_inclusive(&self) -> Position {
match self {
IngesterShard::Primary(primary_shard) => &primary_shard.replication_position_inclusive,
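The new close() setter complements the existing is_closed() accessor on IngesterShard. A trivial, hedged usage sketch (the helper is illustrative, not part of the commit):

use crate::ingest_v2::models::IngesterShard;

// Illustration only: flip a shard to Closed at most once.
fn close_if_open(shard: &mut IngesterShard) {
    if !shard.is_closed() {
        shard.close();
    }
}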
44 changes: 21 additions & 23 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -323,24 +323,23 @@ impl ReplicationTask {
let from_position_exclusive = subrequest.from_position_exclusive();
let to_position_inclusive = subrequest.to_position_inclusive();

let replica_shard: &mut IngesterShard =
if from_position_exclusive == Position::Beginning {
// Initialize the replica shard and corresponding mrecordlog queue.
state_guard
.mrecordlog
.create_queue(&queue_id)
.await
.expect("TODO");
let leader_id: NodeId = replicate_request.leader_id.clone().into();
let replica_shard = ReplicaShard::new(leader_id);
let shard = IngesterShard::Replica(replica_shard);
state_guard.shards.entry(queue_id.clone()).or_insert(shard)
} else {
state_guard
.shards
.get_mut(&queue_id)
.expect("replica shard should be initialized")
};
let replica_shard: &IngesterShard = if from_position_exclusive == Position::Beginning {
// Initialize the replica shard and corresponding mrecordlog queue.
state_guard
.mrecordlog
.create_queue(&queue_id)
.await
.expect("TODO");
let leader_id: NodeId = replicate_request.leader_id.clone().into();
let replica_shard = ReplicaShard::new(leader_id);
let shard = IngesterShard::Replica(replica_shard);
state_guard.shards.entry(queue_id.clone()).or_insert(shard)
} else {
state_guard
.shards
.get(&queue_id)
.expect("replica shard should be initialized")
};
if replica_shard.is_closed() {
// TODO
}
@@ -380,17 +379,16 @@
.replicated_num_docs_total
.inc_by(batch_num_docs);

let replica_shard = state_guard
.shards
.get_mut(&queue_id)
.expect("replica shard should exist");

if current_position_inclusive != to_position_inclusive {
return Err(IngestV2Error::Internal(format!(
"bad replica position: expected {to_position_inclusive:?}, got \
{current_position_inclusive:?}"
)));
}
let replica_shard = state_guard
.shards
.get_mut(&queue_id)
.expect("replica shard should be initialized");
replica_shard.set_replication_position_inclusive(current_position_inclusive.clone());

let replicate_success = ReplicateSuccess {
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -26,7 +26,7 @@ use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use quickwit_proto::control_plane::{
ClosedShards, ControlPlaneService, ControlPlaneServiceClient, GetOrCreateOpenShardsRequest,
ControlPlaneService, ControlPlaneServiceClient, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsSubrequest,
};
use quickwit_proto::ingest::ingester::{
@@ -35,7 +35,7 @@ use quickwit_proto::ingest::ingester::{
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestResponseV2, IngestRouterService, IngestSubrequest,
};
use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result};
use quickwit_proto::ingest::{ClosedShards, CommitTypeV2, IngestV2Error, IngestV2Result};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceId, SubrequestId};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
3 changes: 1 addition & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/shard_table.rs
@@ -20,8 +20,7 @@
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};

use quickwit_proto::control_plane::ClosedShards;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::ingest::{ClosedShards, Shard, ShardState};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId};
use tracing::warn;

8 changes: 1 addition & 7 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
@@ -65,7 +65,7 @@ service ControlPlaneService {

message GetOrCreateOpenShardsRequest {
repeated GetOrCreateOpenShardsSubrequest subrequests = 1;
repeated ClosedShards closed_shards = 2;
repeated quickwit.ingest.ClosedShards closed_shards = 2;
repeated string unavailable_leaders = 3;
}

@@ -75,12 +75,6 @@ message GetOrCreateOpenShardsSubrequest {
string source_id = 3;
}

message ClosedShards {
string index_uid = 1;
string source_id = 2;
repeated uint64 shard_ids = 3;
}

message GetOrCreateOpenShardsResponse {
repeated GetOrCreateOpenShardsSuccess successes = 1;
repeated GetOrCreateOpenShardsFailure failures = 2;
6 changes: 6 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/ingest.proto
@@ -78,3 +78,9 @@ message Shard {
// For instance, if an indexer goes rogue, eventually the control plane will detect it and assign the shard to another indexer, which will override the publish token.
optional string publish_token = 10;
}

message ClosedShards {
string index_uid = 1;
string source_id = 2;
repeated uint64 shard_ids = 3;
}
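The close_shards handler above iterates over close_shard.queue_ids(); that helper on ClosedShards is not shown in this diff. A plausible stand-in, assuming each queue ID is derived from the index UID, source ID, and shard ID via the existing queue_id helper (the real implementation lives in quickwit-proto and may differ):

use quickwit_proto::ingest::ClosedShards;
use quickwit_proto::types::{queue_id, QueueId};

// Hypothetical equivalent of ClosedShards::queue_ids(): one queue ID per shard ID.
fn closed_shard_queue_ids(closed_shards: &ClosedShards) -> impl Iterator<Item = QueueId> + '_ {
    closed_shards
        .shard_ids
        .iter()
        .map(|shard_id| queue_id(&closed_shards.index_uid, &closed_shards.source_id, *shard_id))
}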
9 changes: 9 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -35,6 +35,8 @@ service IngesterService {

// rpc OpenWatchStream(OpenWatchStreamRequest) returns (stream WatchMessage);

rpc CloseShards(CloseShardsRequest) returns (CloseShardsResponse);

// Pings an ingester to check if it is ready to host shards and serve requests.
rpc Ping(PingRequest) returns (PingResponse);

@@ -186,6 +188,13 @@ message FetchResponseV2 {
quickwit.ingest.Position to_position_inclusive = 6;
}

message CloseShardsRequest {
repeated quickwit.ingest.ClosedShards closed_shards = 1;
}

message CloseShardsResponse {
}

message PingRequest {
string leader_id = 1;
optional string follower_id = 2;