Skip to content

Commit

Permalink
feat: add deleteop command (#3702)
Browse files Browse the repository at this point in the history
* feat: add deleteop command

* docs: add docs

* fix: fix bug

* docs: update docs

---------

Co-authored-by: 4paradigm <[email protected]>
  • Loading branch information
dl239 and 4paradigm authored Feb 22, 2024
1 parent 67031d2 commit 47a61cb
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 95 deletions.
16 changes: 16 additions & 0 deletions docs/en/maintain/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,22 @@ Command format: `cancelop op\_id`
Cancel op ok!
```

### deleteop

Delete one or more op from nameserver

Command format: deleteop op\_id / op\_status

* op\_id: the operation ID to delete
* op\_status: specify the status of the op that needs to be deleted. The statuses that can be specified are done, failed and canceled

```
> deleteop 5
Delete op ok!
> deleteop done
Delete op ok!
```

### showopstatus

Display operation execution information
Expand Down
16 changes: 16 additions & 0 deletions docs/zh/maintain/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,22 @@ recover table ok
Cancel op ok!
```

### deleteop

删除op。可以指定op\_id删除一个op, 也可以指定op类型删除对应类型的所有op

命令格式: deleteop op\_id / op\_status

* op\_id 需要删除的op id
* op\_status 需要删除的op的状态。 可以指定的状态有done, failed和canceled

```
> deleteop 5
Delete op ok!
> deleteop done
Delete op ok!
```

### showopstatus

显示操作执行信息
Expand Down
19 changes: 19 additions & 0 deletions src/client/ns_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,25 @@ base::Status NsClient::CancelOP(uint64_t op_id) {
return st;
}

base::Status NsClient::DeleteOP(std::optional<uint64_t> op_id, ::openmldb::api::TaskStatus status) {
::openmldb::nameserver::DeleteOPRequest request;
::openmldb::nameserver::GeneralResponse response;
if (!db_.empty()) {
request.set_db(db_);
}
if (op_id.has_value()) {
request.set_op_id(op_id.value());
} else {
request.set_status(status);
}
auto st = client_.SendRequestSt(&::openmldb::nameserver::NameServer_Stub::DeleteOP, &request, &response,
FLAGS_request_timeout_ms, 1);
if (st.OK()) {
return {response.code(), response.msg()};
}
return st;
}

bool NsClient::AddTableField(const std::string& table_name, const ::openmldb::common::ColumnDesc& column_desc,
std::string& msg) {
::openmldb::nameserver::AddTableFieldRequest request;
Expand Down
1 change: 1 addition & 0 deletions src/client/ns_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class NsClient : public Client {
base::Status ShowOPStatus(uint64_t op_id, ::openmldb::nameserver::ShowOPStatusResponse* response);

base::Status CancelOP(uint64_t op_id);
base::Status DeleteOP(std::optional<uint64_t> op_id, openmldb::api::TaskStatus status);

bool AddTableField(const std::string& table_name, const ::openmldb::common::ColumnDesc& column_desc,
std::string& msg); // NOLINT
Expand Down
227 changes: 133 additions & 94 deletions src/cmd/openmldb.cc

Large diffs are not rendered by default.

86 changes: 86 additions & 0 deletions src/nameserver/name_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2651,6 +2651,92 @@ void NameServerImpl::RecoverTable(RpcController* controller, const RecoverTableR
response->set_msg("ok");
}

void NameServerImpl::DeleteOP(RpcController* controller, const DeleteOPRequest* request, GeneralResponse* response,
Closure* done) {
brpc::ClosureGuard done_guard(done);
if (!running_.load(std::memory_order_acquire)) {
response->set_code(::openmldb::base::ReturnCode::kNameserverIsNotLeader);
response->set_msg("nameserver is not leader");
PDLOG(WARNING, "cur nameserver is not leader");
return;
}
if (!request->has_op_id() && (request->status() == ::openmldb::api::TaskStatus::kInited ||
request->status() == ::openmldb::api::TaskStatus::kDoing)) {
response->set_code(::openmldb::base::ReturnCode::kInvalidParameter);
response->set_msg("cannot delete the Inited OP");
PDLOG(WARNING, "cannot delete the Inited OP");
return;
}
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");
auto need_delete = [] (const DeleteOPRequest* request, const ::openmldb::api::OPInfo& op_info) -> bool {
if (request->has_op_id()) {
if (op_info.op_id() != request->op_id()) {
return false;
}
} else if (op_info.task_status() != request->status() ||
(request->has_db() && request->db() != op_info.db())) {
return false;
}
return true;
};
auto delete_zk_op = [](ZkClient* zk_client, const std::string& path, uint64_t op_id) -> bool {
std::string node = absl::StrCat(path, "/", op_id);
if (zk_client->DeleteNode(node)) {
PDLOG(INFO, "delete zk op node[%s] success.", node.c_str());
} else {
PDLOG(WARNING, "delete zk op_node failed. node[%s]", node.c_str());
return false;
}
return true;
};
std::lock_guard<std::mutex> lock(mu_);
for (auto iter = done_op_list_.begin(); iter != done_op_list_.end();) {
const auto& op_info = (*iter)->op_info_;
if (need_delete(request, op_info)) {
if (op_info.task_status() != api::TaskStatus::kDone &&
!delete_zk_op(zk_client_, zk_path_.op_data_path_, op_info.op_id())) {
response->set_code(base::ReturnCode::kDelZkFailed);
response->set_msg("delete zk op_node failed");
return;
}
iter = done_op_list_.erase(iter);
if (request->has_op_id()) {
return;
}
continue;
}
iter++;
}
for (auto& op_list : task_vec_) {
if (op_list.empty()) {
continue;
}
for (auto iter = op_list.begin(); iter != op_list.end();) {
const auto& op_info = (*iter)->op_info_;
if (need_delete(request, op_info)) {
if (op_info.task_status() != api::TaskStatus::kDone &&
!delete_zk_op(zk_client_, zk_path_.op_data_path_, op_info.op_id())) {
response->set_code(base::ReturnCode::kDelZkFailed);
response->set_msg("delete zk op_node failed");
return;
}
iter = op_list.erase(iter);
if (request->has_op_id()) {
return;
}
continue;
}
iter++;
}
}
if (request->has_op_id()) {
response->set_code(base::ReturnCode::kDeleteFailed);
response->set_msg("op id does not exist");
PDLOG(WARNING, "op id %lu does not exist", request->op_id());
}
}

void NameServerImpl::CancelOP(RpcController* controller, const CancelOPRequest* request, GeneralResponse* response,
Closure* done) {
brpc::ClosureGuard done_guard(done);
Expand Down
2 changes: 2 additions & 0 deletions src/nameserver/name_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ class NameServerImpl : public NameServer {

void CancelOP(RpcController* controller, const CancelOPRequest* request, GeneralResponse* response, Closure* done);

void DeleteOP(RpcController* controller, const DeleteOPRequest* request, GeneralResponse* response, Closure* done);

void AddReplicaCluster(RpcController* controller, const ClusterAddress* request, GeneralResponse* response,
Closure* done);

Expand Down
9 changes: 8 additions & 1 deletion src/proto/name_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,19 @@ message UpdateTableAliveRequest {
optional string db = 5 [default = ""];
}

message ShowOPStatusRequest{
message ShowOPStatusRequest {
optional string name = 1;
optional uint32 pid = 2;
optional string db = 3;
optional uint64 op_id = 4;
}

message DeleteOPRequest {
optional string db = 1;
optional uint64 op_id = 2;
optional openmldb.api.TaskStatus status = 3;
}

message ConnectZKRequest {}
message DisConnectZKRequest {}

Expand Down Expand Up @@ -549,6 +555,7 @@ service NameServer {
rpc DelReplicaNS(DelReplicaNSRequest) returns (GeneralResponse);
rpc ShowOPStatus(ShowOPStatusRequest) returns (ShowOPStatusResponse);
rpc CancelOP(CancelOPRequest) returns (GeneralResponse);
rpc DeleteOP(DeleteOPRequest) returns (GeneralResponse);
rpc ConfSet(ConfSetRequest) returns (GeneralResponse);
rpc ConfGet(ConfGetRequest) returns (ConfGetResponse);
rpc ChangeLeader(ChangeLeaderRequest) returns (GeneralResponse);
Expand Down

0 comments on commit 47a61cb

Please sign in to comment.