Skip to content

Commit

Permalink
refactor(cluster): refactor cluster for dyn cluster [part 1] (#17023)
Browse files Browse the repository at this point in the history
* refactor(cluster): refactor cluster for dyn cluster [part 1]

* refactor(cluster): refactor cluster for dyn cluster

* refactor(cluster): refactor cluster for dyn cluster

* refactor(cluster): refactor cluster for dyn cluster

* refactor(cluster): refactor cluster for dyn cluster

* refactor(cluster): refactor cluster for dyn cluster
  • Loading branch information
zhang2014 authored Dec 10, 2024
1 parent 644f17b commit 8646b77
Show file tree
Hide file tree
Showing 13 changed files with 376 additions and 244 deletions.
7 changes: 7 additions & 0 deletions src/meta/types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ pub struct NodeInfo {
pub flight_address: String,
pub discovery_address: String,
pub binary_version: String,

#[serde(skip_serializing_if = "String::is_empty")]
pub cluster_id: String,
#[serde(skip_serializing_if = "String::is_empty")]
pub warehouse_id: String,
}

impl NodeInfo {
Expand All @@ -103,6 +108,8 @@ impl NodeInfo {
flight_address,
discovery_address,
binary_version,
cluster_id: "".to_string(),
warehouse_id: "".to_string(),
}
}

Expand Down
31 changes: 31 additions & 0 deletions src/meta/types/tests/it/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ fn test_node_info_ip_port() -> anyhow::Result<()> {
flight_address: "1.2.3.4:123".to_string(),
discovery_address: "4.5.6.7:456".to_string(),
binary_version: "v0.8-binary-version".to_string(),
cluster_id: "".to_string(),
warehouse_id: "".to_string(),
};

let (ip, port) = n.ip_port()?;
Expand All @@ -34,3 +36,32 @@ fn test_node_info_ip_port() -> anyhow::Result<()> {

Ok(())
}

#[test]
fn test_serde_node_info() {
let mut info = NodeInfo {
id: "test_id".to_string(),
secret: "test_secret".to_string(),
version: 1,
cpu_nums: 1,
http_address: "7.8.9.10:987".to_string(),
flight_address: "1.2.3.4:123".to_string(),
discovery_address: "4.5.6.7:456".to_string(),
binary_version: "v0.8-binary-version".to_string(),
cluster_id: String::new(),
warehouse_id: String::new(),
};

let json_str = serde_json::to_string(&info).unwrap();
assert_eq!(info, serde_json::from_str::<NodeInfo>(&json_str).unwrap());
assert!(!json_str.contains("cluster"));
assert!(!json_str.contains("warehouse"));

info.cluster_id = String::from("test-cluster-id");
info.warehouse_id = String::from("test-warehouse-id");

assert_eq!(
info,
serde_json::from_slice::<NodeInfo>(&serde_json::to_vec(&info).unwrap()).unwrap()
);
}
66 changes: 0 additions & 66 deletions src/query/management/src/cluster/cluster_api.rs

This file was deleted.

129 changes: 0 additions & 129 deletions src/query/management/src/cluster/cluster_mgr.rs

This file was deleted.

6 changes: 3 additions & 3 deletions src/query/management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#![allow(clippy::uninlined_format_args)]

mod cluster;
mod connection;
mod file_format;
mod network_policy;
Expand All @@ -26,14 +25,13 @@ mod setting;
mod stage;
pub mod udf;
mod user;
mod warehouse;

mod client_session;
pub mod errors;
mod procedure;

pub use client_session::ClientSessionMgr;
pub use cluster::ClusterApi;
pub use cluster::ClusterMgr;
pub use connection::ConnectionMgr;
pub use file_format::FileFormatMgr;
pub use network_policy::NetworkPolicyMgr;
Expand All @@ -52,3 +50,5 @@ pub use stage::StageApi;
pub use stage::StageMgr;
pub use user::UserApi;
pub use user::UserMgr;
pub use warehouse::ClusterApi;
pub use warehouse::ClusterMgr;
34 changes: 34 additions & 0 deletions src/query/management/src/warehouse/cluster_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_meta_types::NodeInfo;

/// Databend-query cluster management API
#[async_trait::async_trait]
pub trait ClusterApi: Sync + Send {
/// Add a new node.
async fn add_node(&self, node: NodeInfo) -> Result<()>;

/// Keep the tenant's cluster node alive.
async fn heartbeat(&self, node: &NodeInfo) -> Result<()>;

/// Get the tenant's cluster all nodes.
async fn get_nodes(&self, warehouse: &str, cluster: &str) -> Result<Vec<NodeInfo>>;

/// Drop the tenant's cluster one node by node.id.
async fn drop_node(&self, node_id: String) -> Result<()>;

async fn get_local_addr(&self) -> Result<Option<String>>;
}
Loading

0 comments on commit 8646b77

Please sign in to comment.