Skip to content

Commit

Permalink
add api endpoint to get some control-plan internal info (#4339)
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Jan 12, 2024
1 parent 9a6685f commit 44360ac
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 2 deletions.
49 changes: 47 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ use quickwit_config::SourceConfig;
use quickwit_ingest::{IngesterPool, LocalShardsUpdate};
use quickwit_metastore::IndexMetadata;
use quickwit_proto::control_plane::{
ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest,
GetOrCreateOpenShardsResponse,
ControlPlaneError, ControlPlaneResult, GetDebugStateRequest, GetDebugStateResponse,
GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, PhysicalIndexingPlanEntry,
ShardTableEntry,
};
use quickwit_proto::indexing::ShardPositionsUpdate;
use quickwit_proto::metastore::{
Expand Down Expand Up @@ -179,6 +180,37 @@ impl ControlPlane {
.schedule_indexing_plan_if_needed(&self.model);
Ok(())
}

fn debug_state(&self) -> GetDebugStateResponse {
let shard_table = self
.model
.all_shards_with_source()
.map(|(source, shards)| ShardTableEntry {
source_id: source.to_string(),
shards: shards
.map(|shard_entry| shard_entry.shard.clone())
.collect(),
})
.collect();
let physical_index_plan = self
.indexing_scheduler
.observable_state()
.last_applied_physical_plan
.map(|plan| {
plan.indexing_tasks_per_indexer()
.iter()
.map(|(node_id, tasks)| PhysicalIndexingPlanEntry {
node_id: node_id.clone(),
tasks: tasks.clone(),
})
.collect()
})
.unwrap_or_default();
GetDebugStateResponse {
shard_table,
physical_index_plan,
}
}
}

#[async_trait]
Expand Down Expand Up @@ -528,6 +560,19 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
}
}

#[async_trait]
impl Handler<GetDebugStateRequest> for ControlPlane {
type Reply = ControlPlaneResult<GetDebugStateResponse>;

async fn handle(
&mut self,
_: GetDebugStateRequest,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
Ok(Ok(self.debug_state()))
}
}

#[derive(Clone)]
pub struct ControlPlaneEventSubscriber(WeakMailbox<ControlPlane>);

Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,12 @@ impl ControlPlaneModel {
self.shard_table.all_shards()
}

pub(crate) fn all_shards_with_source(
&self,
) -> impl Iterator<Item = (&SourceUid, impl Iterator<Item = &ShardEntry>)> + '_ {
self.shard_table.all_shards_with_source()
}

pub fn list_shards_for_node(
&self,
ingester: &NodeId,
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ impl ShardTable {
.flat_map(|table_entry| table_entry.shard_entries.values())
}

pub(crate) fn all_shards_with_source(
&self,
) -> impl Iterator<Item = (&SourceUid, impl Iterator<Item = &ShardEntry>)> + '_ {
self.table_entries
.iter()
.map(|(source, shard_table)| (source, shard_table.shard_entries.values()))
}

pub(crate) fn all_shards_mut(&mut self) -> impl Iterator<Item = &mut ShardEntry> + '_ {
self.table_entries
.values_mut()
Expand Down
22 changes: 22 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/control_plane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ syntax = "proto3";

package quickwit.control_plane;

import "quickwit/indexing.proto";
import "quickwit/ingest.proto";
import "quickwit/metastore.proto";

Expand Down Expand Up @@ -59,6 +60,9 @@ service ControlPlaneService {
// Returns the list of open shards for one or several sources. If the control plane is not able to find any
// for a source, it will pick a pair of leader-follower ingesters and will open a new shard.
rpc GetOrCreateOpenShards(GetOrCreateOpenShardsRequest) returns (GetOrCreateOpenShardsResponse);

// Return some innerstate of the control plane meant to assist debugging.
rpc GetDebugState(GetDebugStateRequest) returns (GetDebugStateResponse);
}

// Shard API
Expand Down Expand Up @@ -103,3 +107,21 @@ message GetOrCreateOpenShardsFailure {
string source_id = 3;
GetOrCreateOpenShardsFailureReason reason = 4;
}

message GetDebugStateRequest {
}

message GetDebugStateResponse {
repeated ShardTableEntry shard_table = 1;
repeated PhysicalIndexingPlanEntry physical_index_plan = 2;
}

message ShardTableEntry {
string source_id = 1;
repeated quickwit.ingest.Shard shards = 2;
}

message PhysicalIndexingPlanEntry {
string node_id = 1;
repeated quickwit.indexing.IndexingTask tasks = 2;
}
Loading

0 comments on commit 44360ac

Please sign in to comment.