Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
xyfsjq authored Mar 22, 2024
2 parents b12373e + a118eb9 commit e18e5fa
Show file tree
Hide file tree
Showing 1,926 changed files with 65,036 additions and 162,623 deletions.
2 changes: 0 additions & 2 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ github:
- FE UT (Doris FE UT)
- BE UT (Doris BE UT)
- Build Broker
- Build Documents
- ShellCheck
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
Expand Down Expand Up @@ -93,7 +92,6 @@ github:
- BE UT (Doris BE UT)
- Build Broker
- ShellCheck
- Build Documents
- Build Third Party Libraries (Linux)
- Build Third Party Libraries (macOS)
- COMPILE (DORIS_COMPILE)
Expand Down
10 changes: 10 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds,
[&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));
}

// TODO(lingbin): each task in the batch may have it own status or FE must check and
Expand Down Expand Up @@ -259,4 +263,10 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}

void AgentServer::stop_report_workers() {
for (auto& work : _report_workers) {
work->stop();
}
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class AgentServer {

TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }

void stop_report_workers();

private:
// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
Expand Down
3 changes: 2 additions & 1 deletion be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ class BeExecVersionManager {
* e. add repeat_max_num in repeat function
* 3: start from doris 2.0 (by some mistakes)
* a. aggregation function do not serialize bitmap to string.
* b. support window funnel mode.
* 4: start from doris 2.1
* a. support window funnel mode from 2.0
* a. ignore this line, window funnel mode should be enabled from 2.0.
* b. array contains/position/countequal function return nullable in less situations.
* c. cleared old version of Version 2.
* d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode.
Expand Down
204 changes: 147 additions & 57 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/data_dir.h"
Expand Down Expand Up @@ -748,6 +749,23 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest&
continue;
}
bool need_to_save = false;
if (tablet_meta_info.__isset.partition_id) {
// for fix partition_id = 0
LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id()
<< "partition id from : " << tablet->tablet_meta()->partition_id()
<< " to : " << tablet_meta_info.partition_id;
auto succ = engine.tablet_manager()->update_tablet_partition_id(
tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id());
if (!succ) {
std::string err_msg = fmt::format(
"change be tablet id : {} partition_id : {} failed",
tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id);
LOG(WARNING) << err_msg;
status = Status::InvalidArgument(err_msg);
continue;
}
need_to_save = true;
}
if (tablet_meta_info.__isset.storage_policy_id) {
tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
need_to_save = true;
Expand Down Expand Up @@ -999,6 +1017,31 @@ void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info)
}
}

void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests at the same time
// and can not be processed.
if (config::report_random_wait) {
random_sleep(5);
}
(void)engine; // To be used in the future

TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;

// TODO(deardeng): report disk info in cloud mode. And make it more clear
// that report CPU by using a separte report procedure
// or abstracting disk report as "host info report"
request.__set_num_cores(CpuInfo::num_cores());
request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
? config::pipeline_executor_size
: CpuInfo::num_cores());
bool succ = handle_report(request, master_info, "disk");
report_disk_total << 1;
report_disk_failed << !succ;
}

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info) {
if (config::report_random_wait) {
random_sleep(5);
Expand Down Expand Up @@ -1034,10 +1077,16 @@ void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_inf
}
request.__isset.storage_policy = true;
auto& resource_list = request.resource;
for (auto [id, version] : get_storage_resource_ids()) {
for (auto [id_str, version] : get_storage_resource_ids()) {
auto& resource = resource_list.emplace_back();
resource.__set_id(id);
resource.__set_version(version);
int64_t id = -1;
if (auto [_, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id);
ec == std::errc {}) [[unlikely]] {
LOG(ERROR) << "invalid resource id format: " << id_str;
} else {
resource.__set_id(id);
resource.__set_version(version);
}
}
request.__isset.resource = true;

Expand Down Expand Up @@ -1285,77 +1334,118 @@ void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskReq
}
}

namespace {

void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
Status st;
io::RemoteFileSystemSPtr fs;

if (!existed_fs) {
// No such FS instance on BE
S3Conf s3_conf {
.bucket = param.s3_storage_param.bucket,
.prefix = param.s3_storage_param.root_path,
.client_conf = {
.endpoint = param.s3_storage_param.endpoint,
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
.use_virtual_addressing = !param.s3_storage_param.use_path_style,
}};
auto res = io::S3FileSystem::create(std::move(s3_conf), std::to_string(param.id));
if (!res.has_value()) {
st = std::move(res).error();
} else {
fs = std::move(res).value();
}
} else {
DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
};
st = client->reset(conf);
fs = std::move(existed_fs);
}

if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
put_storage_resource(param.id, {std::move(fs), param.version});
}
}

void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
Status st;
io::RemoteFileSystemSPtr fs;

if (!existed_fs) {
// No such FS instance on BE
auto res = io::HdfsFileSystem::create(param.hdfs_storage_param,
param.hdfs_storage_param.fs_name,
std::to_string(param.id), nullptr);
if (!res.has_value()) {
st = std::move(res).error();
} else {
fs = std::move(res).value();
}

} else {
DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
// TODO(plat1ko): update hdfs conf
fs = std::move(existed_fs);
}

if (!st.ok()) {
LOG(WARNING) << "update hdfs resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", param.id)
.tag("resource_name", param.name);
put_storage_resource(param.id, {std::move(fs), param.version});
}
}

} // namespace

void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
const auto& push_storage_policy_req = req.push_storage_policy_req;
// refresh resource
for (auto& resource : push_storage_policy_req.resource) {
auto existed_resource = get_storage_resource(resource.id);
if (existed_resource.version >= resource.version) {
for (auto&& param : push_storage_policy_req.resource) {
auto existed_resource = get_storage_resource(param.id);
if (existed_resource.version >= param.version) {
// Stale request, ignore
continue;
}
if (resource.__isset.s3_storage_param) {
Status st;
S3Conf s3_conf;
s3_conf.ak = std::move(resource.s3_storage_param.ak);
s3_conf.sk = std::move(resource.s3_storage_param.sk);
s3_conf.endpoint = std::move(resource.s3_storage_param.endpoint);
s3_conf.region = std::move(resource.s3_storage_param.region);
s3_conf.prefix = std::move(resource.s3_storage_param.root_path);
s3_conf.bucket = std::move(resource.s3_storage_param.bucket);
s3_conf.connect_timeout_ms = resource.s3_storage_param.conn_timeout_ms;
s3_conf.max_connections = resource.s3_storage_param.max_conn;
s3_conf.request_timeout_ms = resource.s3_storage_param.request_timeout_ms;
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
s3_conf.use_virtual_addressing = !resource.s3_storage_param.use_path_style;
std::shared_ptr<io::S3FileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::S3FileSystem::create(s3_conf, std::to_string(resource.id), &fs);
} else {
fs = std::static_pointer_cast<io::S3FileSystem>(existed_resource.fs);
fs->set_conf(s3_conf);
}
if (!st.ok()) {
LOG(WARNING) << "update s3 resource failed: " << st;
} else {
LOG_INFO("successfully update s3 resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name)
.tag("s3_conf", s3_conf.to_string());
put_storage_resource(resource.id, {std::move(fs), resource.version});
}
} else if (resource.__isset.hdfs_storage_param) {
Status st;
std::shared_ptr<io::HdfsFileSystem> fs;
if (existed_resource.fs == nullptr) {
st = io::HdfsFileSystem::create(resource.hdfs_storage_param,
std::to_string(resource.id), "", nullptr, &fs);
} else {
fs = std::static_pointer_cast<io::HdfsFileSystem>(existed_resource.fs);
}
if (!st.ok()) {
LOG(WARNING) << "update hdfs resource failed: " << st;
} else {
LOG_INFO("successfully update hdfs resource")
.tag("resource_id", resource.id)
.tag("resource_name", resource.name);
put_storage_resource(resource.id, {std::move(fs), resource.version});
}

if (param.__isset.s3_storage_param) {
update_s3_resource(param, std::move(existed_resource.fs));
} else if (param.__isset.hdfs_storage_param) {
update_hdfs_resource(param, std::move(existed_resource.fs));
} else {
LOG(WARNING) << "unknown resource=" << resource;
LOG(WARNING) << "unknown resource=" << param;
}
}
// drop storage policy
for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
delete_storage_policy(policy_id);
}
// refresh storage policy
for (auto& storage_policy : push_storage_policy_req.storage_policy) {
for (auto&& storage_policy : push_storage_policy_req.storage_policy) {
auto existed_storage_policy = get_storage_policy(storage_policy.id);
if (existed_storage_policy == nullptr ||
existed_storage_policy->version < storage_policy.version) {
auto storage_policy1 = std::make_shared<StoragePolicy>();
storage_policy1->name = std::move(storage_policy.name);
storage_policy1->name = storage_policy.name;
storage_policy1->version = storage_policy.version;
storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime;
storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ void report_task_callback(const TMasterInfo& master_info);

void report_disk_callback(StorageEngine& engine, const TMasterInfo& master_info);

void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo& master_info);

void report_tablet_callback(StorageEngine& engine, const TMasterInfo& master_info);

void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req);
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* ex
CloudBackendService::~CloudBackendService() = default;

Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server) {
auto service = std::make_shared<CloudBackendService>(engine, exec_env);
std::unique_ptr<ThriftServer>* server,
std::shared_ptr<doris::CloudBackendService> service) {
service->_agent_server->cloud_start_workers(engine, exec_env);
// TODO: do we want a BoostThreadFactory?
// TODO: we want separate thread factories here, so that fe requests can't starve
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class CloudStorageEngine;
class CloudBackendService final : public BaseBackendService {
public:
static Status create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server);
std::unique_ptr<ThriftServer>* server,
std::shared_ptr<doris::CloudBackendService> service);

CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env);

Expand Down
Loading

0 comments on commit e18e5fa

Please sign in to comment.