Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added new cloud statistics facility. #25

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions cloud/aws/aws_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ namespace rocksdb {

class AwsS3ClientWrapper::Timer {
public:
Timer(CloudRequestCallback* callback, CloudRequestOpType type,
Timer(CloudRequestCallback& callback, CloudRequestOpType type,
uint64_t size = 0)
: callback_(callback), type_(type), size_(size), start_(now()) {}
~Timer() {
if (callback_) {
(*callback_)(type_, size_, now() - start_, success_);
}
callback_(type_, size_, now() - start_, success_);
}
void SetSize(uint64_t size) { size_ = size; }
void SetSuccess(bool success) { success_ = success; }
Expand All @@ -37,7 +35,7 @@ class AwsS3ClientWrapper::Timer {
std::chrono::system_clock::from_time_t(0))
.count();
}
CloudRequestCallback* callback_;
CloudRequestCallback& callback_;
CloudRequestOpType type_;
uint64_t size_;
bool success_{false};
Expand All @@ -47,43 +45,43 @@ class AwsS3ClientWrapper::Timer {

AwsS3ClientWrapper::AwsS3ClientWrapper(
std::unique_ptr<Aws::S3::S3Client> client,
std::shared_ptr<CloudRequestCallback> cloud_request_callback)
CloudRequestCallback callback)
: client_(std::move(client)),
cloud_request_callback_(std::move(cloud_request_callback)) {}
callback_(std::move(callback)) {}

Aws::S3::Model::ListObjectsOutcome AwsS3ClientWrapper::ListObjects(
const Aws::S3::Model::ListObjectsRequest& request) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kListOp);
Timer t(callback_, CloudRequestOpType::kListOp);
auto outcome = client_->ListObjects(request);
t.SetSuccess(outcome.IsSuccess());
return outcome;
}

Aws::S3::Model::CreateBucketOutcome AwsS3ClientWrapper::CreateBucket(
const Aws::S3::Model::CreateBucketRequest& request) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kCreateOp);
Timer t(callback_, CloudRequestOpType::kCreateOp);
return client_->CreateBucket(request);
}

Aws::S3::Model::DeleteObjectOutcome AwsS3ClientWrapper::DeleteObject(
const Aws::S3::Model::DeleteObjectRequest& request) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kDeleteOp);
Timer t(callback_, CloudRequestOpType::kDeleteOp);
auto outcome = client_->DeleteObject(request);
t.SetSuccess(outcome.IsSuccess());
return outcome;
}

Aws::S3::Model::CopyObjectOutcome AwsS3ClientWrapper::CopyObject(
const Aws::S3::Model::CopyObjectRequest& request) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kCopyOp);
Timer t(callback_, CloudRequestOpType::kCopyOp);
auto outcome = client_->CopyObject(request);
t.SetSuccess(outcome.IsSuccess());
return outcome;
}

Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject(
const Aws::S3::Model::GetObjectRequest& request) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kReadOp);
Timer t(callback_, CloudRequestOpType::kReadOp);
auto outcome = client_->GetObject(request);
if (outcome.IsSuccess()) {
t.SetSize(outcome.GetResult().GetContentLength());
Expand All @@ -94,7 +92,7 @@ Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject(

Aws::S3::Model::PutObjectOutcome AwsS3ClientWrapper::PutObject(
const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kWriteOp,
Timer t(callback_, CloudRequestOpType::kWriteOp,
size_hint);
auto outcome = client_->PutObject(request);
t.SetSuccess(outcome.IsSuccess());
Expand All @@ -103,7 +101,7 @@ Aws::S3::Model::PutObjectOutcome AwsS3ClientWrapper::PutObject(

Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject(
const Aws::S3::Model::HeadObjectRequest& request) {
Timer t(cloud_request_callback_.get(), CloudRequestOpType::kInfoOp);
Timer t(callback_, CloudRequestOpType::kInfoOp);
auto outcome = client_->HeadObject(request);
t.SetSuccess(outcome.IsSuccess());
return outcome;
Expand All @@ -113,6 +111,8 @@ Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject(
// The AWS credentials are specified to the constructor via
// access_key_id and secret_key.
//
thread_local AwsS3ClientResult AwsEnv::s3client_result_;

AwsEnv::AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix,
const std::string& src_object_prefix,
const std::string& src_bucket_region,
Expand Down Expand Up @@ -216,8 +216,13 @@ AwsEnv::AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix,
{
std::unique_ptr<Aws::S3::S3Client> s3client(
new Aws::S3::S3Client(creds, config));
std::function<void(CloudRequestOpType, uint64_t, uint64_t, bool)> s3callback =
std::bind(&AwsEnv::S3ClientCallback, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4);

s3client_ = std::make_shared<AwsS3ClientWrapper>(
std::move(s3client), cloud_env_options.cloud_request_callback);
std::move(s3client), std::move(s3callback));
}

// create dest bucket if specified
Expand Down Expand Up @@ -312,6 +317,16 @@ AwsEnv::~AwsEnv() {
}
}

// Receives the outcome of one S3 client operation.
//   type   - which kind of cloud request was performed
//   size   - size value reported for the request
//   micros - elapsed time reported for the request, in microseconds
//   ok     - whether the request succeeded
// The outcome is first passed to the user callback configured in
// cloud_env_options (skipped when none is set) and then stored in
// s3client_result_ so that it can be read back after the call.
void AwsEnv::S3ClientCallback(CloudRequestOpType type, uint64_t size, uint64_t micros, bool ok) {
  // Bind by reference so the callback holder is not copied on every request.
  const auto& user_callback = cloud_env_options.cloud_request_callback;
  if (user_callback) {
    (*user_callback)(type, size, micros, ok);
  }

  // Remember the most recent result for later stats collection.
  auto& last = s3client_result_;
  last.ok = ok;
  last.micros = micros;
  last.size = size;
  last.type = type;
}

Status AwsEnv::CreateTailer() {
if (tailer_) {
return Status::Busy("Tailer already started");
Expand Down
26 changes: 20 additions & 6 deletions cloud/aws/aws_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class AwsS3ClientWrapper {
public:
AwsS3ClientWrapper(
std::unique_ptr<Aws::S3::S3Client> client,
std::shared_ptr<CloudRequestCallback> cloud_request_callback);
CloudRequestCallback callback);

Aws::S3::Model::ListObjectsOutcome ListObjects(
const Aws::S3::Model::ListObjectsRequest& request);
Expand All @@ -54,11 +54,18 @@ class AwsS3ClientWrapper {

private:
std::unique_ptr<Aws::S3::S3Client> client_;
std::shared_ptr<CloudRequestCallback> cloud_request_callback_;
CloudRequestCallback callback_;

class Timer;
};

// Plain record describing the outcome of a single S3 client call.
// AwsEnv::S3ClientCallback fills in all four fields each time the S3 client
// wrapper reports a completed operation.
struct AwsS3ClientResult {
// Kind of cloud request that was performed.
CloudRequestOpType type;
// Size value reported for the request (presumably bytes -- confirm against
// the callers that supply it).
uint64_t size;
// Elapsed time reported for the request, in microseconds.
uint64_t micros;
// True when the request was reported as successful.
bool ok;
};

//
// The S3 environment for rocksdb. This class overrides all the
// file/dir access methods and delegates all other methods to the
Expand Down Expand Up @@ -244,6 +251,9 @@ class AwsEnv : public CloudEnvImpl {
// The S3 client
std::shared_ptr<AwsS3ClientWrapper> s3client_;

// Results of last S3 client call; used in stats collection.
static thread_local AwsS3ClientResult s3client_result_;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we cannot use thread_local, since RocksDB can run on platforms that don't support it. We can use __thread, but only in some scenarios; check out how RocksDB does it: https://github.com/facebook/rocksdb/blob/master/monitoring/perf_context.cc#L20


// The Kinesis client
std::shared_ptr<Aws::Kinesis::KinesisClient> kinesis_client_;

Expand Down Expand Up @@ -304,6 +314,11 @@ class AwsEnv : public CloudEnvImpl {
const CloudEnvOptions& cloud_options,
std::shared_ptr<Logger> info_log = nullptr);

// When the s3 client performs an operation, it calls this function back with the result.
// Arguments are forwarded to the callback supplied in cloud_env_options (if one is set)
// and are then recorded in s3client_result_ for use in stats collection.
void S3ClientCallback(CloudRequestOpType type, uint64_t size, uint64_t micros, bool ok);

// The pathname that contains a list of all db's inside a bucket.
static constexpr const char* dbid_registry_ = "/.rockset/dbid/";

Expand Down Expand Up @@ -548,11 +563,10 @@ class AwsEnv : public CloudEnvImpl {
const std::string& dbid) override {
return s3_notsup;
}
virtual Status Savepoint() override {
return s3_notsup;
}

const CloudEnvOptions& GetCloudEnvOptions() override {
return CloudEnvOptions();
static CloudEnvOptions options{};
return options;
}
Status ListObjects(const std::string& bucket_name_prefix,
const std::string& bucket_object_prefix,
Expand Down
9 changes: 9 additions & 0 deletions cloud/aws/aws_s3.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "cloud/aws/aws_env.h"
#include "cloud/aws/aws_file.h"
#include "rocksdb/cloud/cloud_statistics.h"
#include "util/coding.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
Expand Down Expand Up @@ -464,6 +465,13 @@ Status S3WritableFile::CopyManifestToS3(uint64_t size_hint, bool force) {
"[s3] S3WritableFile made manifest %s durable to "
"bucket %s bucketpath %s.",
fname_.c_str(), s3_bucket_.c_str(), s3_object_.c_str());

// If cloud stats are present, record the manifest write and its latency in millis.
auto stats = env_->cloud_env_options.cloud_statistics;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use auto&. Otherwise, you'll copy the shared_ptr here, which requires an atomic update of its reference count.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a related note, check out second paragraph here: http://smalldatum.blogspot.com/2016/10/make-myrocks-2x-less-slow.html

if (stats) {
stats->recordTick(NUMBER_MANIFEST_WRITES, 1);
stats->measureTime(MANIFEST_WRITES_TIME, env_->s3client_result_.micros / 1000);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just measure time here instead of piggy-backing on the time measure in S3 callback? That would make the code much easier to reason about (no need for thread-local, wrapping the callback, etc). Also, you already have the start time of the operation recorded in line 455, so adding an extra time call shouldn't be a big concern.

}
} else {
Log(InfoLogLevel::ERROR_LEVEL, env_->info_log_,
"[s3] S3WritableFile failed to make manifest %s durable to "
Expand All @@ -472,6 +480,7 @@ Status S3WritableFile::CopyManifestToS3(uint64_t size_hint, bool force) {
stat.ToString().c_str());
}
}

return stat;
}

Expand Down
Loading