diff --git a/cloud/aws/aws_env.cc b/cloud/aws/aws_env.cc index c4f3e72f426..a0ccdf232d1 100644 --- a/cloud/aws/aws_env.cc +++ b/cloud/aws/aws_env.cc @@ -19,13 +19,11 @@ namespace rocksdb { class AwsS3ClientWrapper::Timer { public: - Timer(CloudRequestCallback* callback, CloudRequestOpType type, + Timer(CloudRequestCallback& callback, CloudRequestOpType type, uint64_t size = 0) : callback_(callback), type_(type), size_(size), start_(now()) {} ~Timer() { - if (callback_) { - (*callback_)(type_, size_, now() - start_, success_); - } + callback_(type_, size_, now() - start_, success_); } void SetSize(uint64_t size) { size_ = size; } void SetSuccess(bool success) { success_ = success; } @@ -37,7 +35,7 @@ class AwsS3ClientWrapper::Timer { std::chrono::system_clock::from_time_t(0)) .count(); } - CloudRequestCallback* callback_; + CloudRequestCallback& callback_; CloudRequestOpType type_; uint64_t size_; bool success_{false}; @@ -47,13 +45,13 @@ class AwsS3ClientWrapper::Timer { AwsS3ClientWrapper::AwsS3ClientWrapper( std::unique_ptr client, - std::shared_ptr cloud_request_callback) + CloudRequestCallback callback) : client_(std::move(client)), - cloud_request_callback_(std::move(cloud_request_callback)) {} + callback_(std::move(callback)) {} Aws::S3::Model::ListObjectsOutcome AwsS3ClientWrapper::ListObjects( const Aws::S3::Model::ListObjectsRequest& request) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kListOp); + Timer t(callback_, CloudRequestOpType::kListOp); auto outcome = client_->ListObjects(request); t.SetSuccess(outcome.IsSuccess()); return outcome; @@ -61,13 +59,13 @@ Aws::S3::Model::ListObjectsOutcome AwsS3ClientWrapper::ListObjects( Aws::S3::Model::CreateBucketOutcome AwsS3ClientWrapper::CreateBucket( const Aws::S3::Model::CreateBucketRequest& request) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kCreateOp); + Timer t(callback_, CloudRequestOpType::kCreateOp); return client_->CreateBucket(request); } 
Aws::S3::Model::DeleteObjectOutcome AwsS3ClientWrapper::DeleteObject( const Aws::S3::Model::DeleteObjectRequest& request) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kDeleteOp); + Timer t(callback_, CloudRequestOpType::kDeleteOp); auto outcome = client_->DeleteObject(request); t.SetSuccess(outcome.IsSuccess()); return outcome; @@ -75,7 +73,7 @@ Aws::S3::Model::DeleteObjectOutcome AwsS3ClientWrapper::DeleteObject( Aws::S3::Model::CopyObjectOutcome AwsS3ClientWrapper::CopyObject( const Aws::S3::Model::CopyObjectRequest& request) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kCopyOp); + Timer t(callback_, CloudRequestOpType::kCopyOp); auto outcome = client_->CopyObject(request); t.SetSuccess(outcome.IsSuccess()); return outcome; @@ -83,7 +81,7 @@ Aws::S3::Model::CopyObjectOutcome AwsS3ClientWrapper::CopyObject( Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject( const Aws::S3::Model::GetObjectRequest& request) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kReadOp); + Timer t(callback_, CloudRequestOpType::kReadOp); auto outcome = client_->GetObject(request); if (outcome.IsSuccess()) { t.SetSize(outcome.GetResult().GetContentLength()); @@ -94,7 +92,7 @@ Aws::S3::Model::GetObjectOutcome AwsS3ClientWrapper::GetObject( Aws::S3::Model::PutObjectOutcome AwsS3ClientWrapper::PutObject( const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kWriteOp, + Timer t(callback_, CloudRequestOpType::kWriteOp, size_hint); auto outcome = client_->PutObject(request); t.SetSuccess(outcome.IsSuccess()); @@ -103,7 +101,7 @@ Aws::S3::Model::PutObjectOutcome AwsS3ClientWrapper::PutObject( Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject( const Aws::S3::Model::HeadObjectRequest& request) { - Timer t(cloud_request_callback_.get(), CloudRequestOpType::kInfoOp); + Timer t(callback_, CloudRequestOpType::kInfoOp); auto outcome = 
client_->HeadObject(request); t.SetSuccess(outcome.IsSuccess()); return outcome; @@ -113,6 +111,8 @@ Aws::S3::Model::HeadObjectOutcome AwsS3ClientWrapper::HeadObject( // The AWS credentials are specified to the constructor via // access_key_id and secret_key. // +thread_local AwsS3ClientResult AwsEnv::s3client_result_; + AwsEnv::AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix, const std::string& src_object_prefix, const std::string& src_bucket_region, @@ -216,8 +216,13 @@ AwsEnv::AwsEnv(Env* underlying_env, const std::string& src_bucket_prefix, { std::unique_ptr s3client( new Aws::S3::S3Client(creds, config)); + std::function s3callback = + std::bind(&AwsEnv::S3ClientCallback, this, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, std::placeholders::_4); + s3client_ = std::make_shared( - std::move(s3client), cloud_env_options.cloud_request_callback); + std::move(s3client), std::move(s3callback)); } // create dest bucket if specified @@ -312,6 +317,16 @@ AwsEnv::~AwsEnv() { } } +void AwsEnv::S3ClientCallback(CloudRequestOpType type, uint64_t size, uint64_t micros, bool ok) { + if (cloud_env_options.cloud_request_callback) { + (*cloud_env_options.cloud_request_callback)(type, size, micros, ok); + } + s3client_result_.type = type; + s3client_result_.size = size; + s3client_result_.micros = micros; + s3client_result_.ok = ok; +} + Status AwsEnv::CreateTailer() { if (tailer_) { return Status::Busy("Tailer already started"); diff --git a/cloud/aws/aws_env.h b/cloud/aws/aws_env.h index d37a1e04803..cf6dce7a382 100644 --- a/cloud/aws/aws_env.h +++ b/cloud/aws/aws_env.h @@ -29,7 +29,7 @@ class AwsS3ClientWrapper { public: AwsS3ClientWrapper( std::unique_ptr client, - std::shared_ptr cloud_request_callback); + CloudRequestCallback callback); Aws::S3::Model::ListObjectsOutcome ListObjects( const Aws::S3::Model::ListObjectsRequest& request); @@ -54,11 +54,18 @@ class AwsS3ClientWrapper { private: std::unique_ptr client_; - 
std::shared_ptr cloud_request_callback_; + CloudRequestCallback callback_; class Timer; }; +struct AwsS3ClientResult { + CloudRequestOpType type; + uint64_t size; + uint64_t micros; + bool ok; +}; + // // The S3 environment for rocksdb. This class overrides all the // file/dir access methods and delegates all other methods to the @@ -244,6 +251,9 @@ class AwsEnv : public CloudEnvImpl { // The S3 client std::shared_ptr s3client_; + // Results of last S3 client call; used in stats collection. + static thread_local AwsS3ClientResult s3client_result_; + // The Kinesis client std::shared_ptr kinesis_client_; @@ -304,6 +314,11 @@ class AwsEnv : public CloudEnvImpl { const CloudEnvOptions& cloud_options, std::shared_ptr info_log = nullptr); + // When the s3 client performs an operation, it calls this function back with the result. + // The result is forwarded to the callback supplied in cloud_env_options (if any) and is also + // recorded in the thread-local s3client_result_. + void S3ClientCallback(CloudRequestOpType type, uint64_t size, uint64_t micros, bool ok); + // The pathname that contains a list of all db's inside a bucket.
static constexpr const char* dbid_registry_ = "/.rockset/dbid/"; @@ -548,11 +563,10 @@ class AwsEnv : public CloudEnvImpl { const std::string& dbid) override { return s3_notsup; } - virtual Status Savepoint() override { - return s3_notsup; - } + const CloudEnvOptions& GetCloudEnvOptions() override { - return CloudEnvOptions(); + static CloudEnvOptions options{}; + return options; } Status ListObjects(const std::string& bucket_name_prefix, const std::string& bucket_object_prefix, diff --git a/cloud/aws/aws_s3.cc b/cloud/aws/aws_s3.cc index e3ad201c28e..e120f7ef26d 100644 --- a/cloud/aws/aws_s3.cc +++ b/cloud/aws/aws_s3.cc @@ -12,6 +12,7 @@ #include "cloud/aws/aws_env.h" #include "cloud/aws/aws_file.h" +#include "rocksdb/cloud/cloud_statistics.h" #include "util/coding.h" #include "util/stderr_logger.h" #include "util/string_util.h" @@ -464,6 +465,13 @@ Status S3WritableFile::CopyManifestToS3(uint64_t size_hint, bool force) { "[s3] S3WritableFile made manifest %s durable to " "bucket %s bucketpath %s.", fname_.c_str(), s3_bucket_.c_str(), s3_object_.c_str()); + + // If cloud stats are present, record the manifest write and its latency in millis. + auto stats = env_->cloud_env_options.cloud_statistics; + if (stats) { + stats->recordTick(NUMBER_MANIFEST_WRITES, 1); + stats->measureTime(MANIFEST_WRITES_TIME, env_->s3client_result_.micros / 1000); + } } else { Log(InfoLogLevel::ERROR_LEVEL, env_->info_log_, "[s3] S3WritableFile failed to make manifest %s durable to " @@ -472,6 +480,7 @@ Status S3WritableFile::CopyManifestToS3(uint64_t size_hint, bool force) { stat.ToString().c_str()); } } + return stat; } diff --git a/cloud/cloud_statistics_impl.cc b/cloud/cloud_statistics_impl.cc new file mode 100644 index 00000000000..3c9a4f8394f --- /dev/null +++ b/cloud/cloud_statistics_impl.cc @@ -0,0 +1,257 @@ +// Copyright (c) 2017-present, Rockset, Inc. All rights reserved. 
+#include "cloud/cloud_statistics_impl.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include + +namespace rocksdb { + +std::shared_ptr CreateCloudStatistics() { + return std::make_shared(nullptr, false); +} + +CloudStatisticsImpl::CloudStatisticsImpl( + std::shared_ptr stats, + bool enable_internal_stats) + : stats_impl_(stats, enable_internal_stats) { +} + +CloudStatisticsImpl::~CloudStatisticsImpl() {} + +uint64_t CloudStatisticsImpl::getTickerCount(uint32_t tickerType) const { + uint64_t sum = 0; + if (tickerType & CLOUD_TICKER_ENUM_START) { + { + MutexLock lock(&lock_); + assert(tickerType < CLOUD_TICKER_ENUM_MAX); + int tickerIndex = tickerType ^ CLOUD_TICKER_ENUM_START; + tickers_[tickerIndex].thread_value->Fold( + [](void* curr_ptr, void* res) { + auto* sum_ptr = static_cast(res); + *sum_ptr += static_cast(curr_ptr)->load( + std::memory_order_relaxed); + }, + &sum); + sum += tickers_[tickerIndex].merged_sum.load(std::memory_order_relaxed); + } + return sum; + } else { + sum += stats_impl_.getTickerCount(tickerType); + } + return sum; +} + +uint64_t CloudStatisticsImpl::getAndResetTickerCount(uint32_t tickerType) { + uint64_t sum = 0; + if (tickerType & CLOUD_TICKER_ENUM_START) { + { + MutexLock lock(&lock_); + assert(tickerType < CLOUD_TICKER_ENUM_MAX); + int tickerIndex = tickerType ^ CLOUD_TICKER_ENUM_START; + tickers_[tickerIndex].thread_value->Fold( + [](void* curr_ptr, void* res) { + auto* sum_ptr = static_cast(res); + *sum_ptr += static_cast*>(curr_ptr)->exchange( + 0, std::memory_order_relaxed); + }, + &sum); + sum += tickers_[tickerIndex].merged_sum.exchange( + 0, std::memory_order_relaxed); + } + } else { + sum += stats_impl_.getAndResetTickerCount(tickerType); + } + return sum; +} + +void CloudStatisticsImpl::recordTick(uint32_t tickerType, uint64_t count) { + if (tickerType & CLOUD_TICKER_ENUM_START) { + assert(tickerType < CLOUD_TICKER_ENUM_MAX); + int tickerIndex = tickerType ^ CLOUD_TICKER_ENUM_START; + auto info 
= getThreadTickerInfo(tickerIndex); + info->value.fetch_add(count, std::memory_order_relaxed); + } else { + stats_impl_.recordTick(tickerType, count); + } +} + +void CloudStatisticsImpl::setTickerCount(uint32_t tickerType, uint64_t count) { + if (tickerType & CLOUD_TICKER_ENUM_START) { + { + MutexLock lock(&lock_); + assert(tickerType < CLOUD_TICKER_ENUM_MAX); + int tickerIndex = tickerType ^ CLOUD_TICKER_ENUM_START; + tickers_[tickerIndex].thread_value->Fold( + [](void* curr_ptr, void* res) { + static_cast*>(curr_ptr)->store( + 0, std::memory_order_relaxed); + }, + nullptr); + tickers_[tickerIndex].merged_sum.store(count, std::memory_order_relaxed); + } + } else { + stats_impl_.setTickerCount(tickerType, count); + } +} + +bool CloudStatisticsImpl::HistEnabledForType(uint32_t histogramType) const { + if (histogramType & CLOUD_HISTOGRAM_ENUM_START) { + return histogramType < CLOUD_HISTOGRAM_ENUM_MAX; + } else { + return stats_impl_.HistEnabledForType(histogramType); + } +} + +std::string CloudStatisticsImpl::getHistogramString(uint32_t histogramType) const { + std::string result; + if (histogramType & CLOUD_HISTOGRAM_ENUM_START) { + { + MutexLock lock(&lock_); + assert(histogramType < CLOUD_HISTOGRAM_ENUM_MAX); + int histogramIndex = histogramType ^ CLOUD_TICKER_ENUM_START; + result = std::move(histograms_[histogramIndex].getMergedHistogram()->ToString()); + } + } else { + result = std::move(stats_impl_.getHistogramString(histogramType)); + } + return result; +} + +void CloudStatisticsImpl::histogramData(uint32_t histogramType, + HistogramData* const data) const { + if (histogramType & CLOUD_HISTOGRAM_ENUM_START) { + { + MutexLock lock(&lock_); + assert(histogramType < CLOUD_HISTOGRAM_ENUM_MAX); + int histogramIndex = histogramType ^ CLOUD_TICKER_ENUM_START; + histograms_[histogramIndex].getMergedHistogram()->Data(data); + } + } else { + stats_impl_.histogramData(histogramType, data); + } +} + +void CloudStatisticsImpl::measureTime(uint32_t histogramType, uint64_t 
value) { + if (histogramType & CLOUD_HISTOGRAM_ENUM_START) { + assert(histogramType < CLOUD_HISTOGRAM_ENUM_MAX); + int histogramIndex = histogramType ^ CLOUD_TICKER_ENUM_START; + getThreadHistogramInfo(histogramIndex)->value.Add(value); + } else { + stats_impl_.measureTime(histogramType, value); + } +} + +Status CloudStatisticsImpl::Reset() { + { + MutexLock lock(&lock_); + for (uint32_t i = 0; + i < CLOUD_TICKER_ENUM_MAX - CLOUD_TICKER_ENUM_START - 1; ++i) { + tickers_[i].thread_value->Fold( + [](void* curr_ptr, void* res) { + static_cast*>(curr_ptr)->store( + 0, std::memory_order_relaxed); + }, + nullptr); + tickers_[i].merged_sum.store(0, std::memory_order_relaxed); + } + for (uint32_t i = 0; + i < CLOUD_HISTOGRAM_ENUM_MAX - CLOUD_HISTOGRAM_ENUM_START - 1; ++i) { + histograms_[i].thread_value->Fold( + [](void* curr_ptr, void* res) { + static_cast(curr_ptr)->Clear(); + }, + nullptr /* res */); + } + } + return stats_impl_.Reset(); +} + +std::string CloudStatisticsImpl::ToString() const { + static constexpr int kTmpStrBufferSize = 200; + std::string str; + str.reserve(20000); + { + MutexLock lock(&lock_); + char buffer[kTmpStrBufferSize]; + for (const auto& t : CloudTickersNameMap) { + if ((t.first & CLOUD_TICKER_ENUM_START) && (t.first < CLOUD_TICKER_ENUM_MAX)) { + uint64_t tickerIndex = t.first ^ CLOUD_TICKER_ENUM_START; + uint64_t sum = 0; + tickers_[tickerIndex].thread_value->Fold( + [](void* curr_ptr, void* res) { + auto* sum_ptr = static_cast(res); + *sum_ptr += static_cast(curr_ptr)->load( + std::memory_order_relaxed); + }, + &sum); + sum += tickers_[tickerIndex].merged_sum.load(std::memory_order_relaxed); + + snprintf(buffer, kTmpStrBufferSize, "%s COUNT : %" PRIu64 "\n", t.second.c_str(), sum); + str.append(buffer); + } + } + for (const auto& h : CloudHistogramsNameMap) { + if ((h.first & CLOUD_HISTOGRAM_ENUM_START) && (h.first < CLOUD_HISTOGRAM_ENUM_MAX)) { + uint64_t histIndex = h.first ^ CLOUD_HISTOGRAM_ENUM_START; + HistogramData histData; + 
histograms_[histIndex].getMergedHistogram()->Data(&histData); + + snprintf( + buffer, kTmpStrBufferSize, + "%s statistics Percentiles :=> 50 : %f 95 : %f 99 : %f 100 : %f\n", + h.second.c_str(), histData.median, histData.percentile95, + histData.percentile99, histData.max); + str.append(buffer); + } + } + } + str.append(stats_impl_.ToString()); + str.shrink_to_fit(); + return str; +} + +CloudStatisticsImpl::ThreadTickerInfo* CloudStatisticsImpl::getThreadTickerInfo( + uint32_t tickerIndex) { + auto info_ptr = + static_cast(tickers_[tickerIndex].thread_value->Get()); + if (info_ptr == nullptr) { + info_ptr = + new ThreadTickerInfo(0 /* value */, &tickers_[tickerIndex].merged_sum); + tickers_[tickerIndex].thread_value->Reset(info_ptr); + } + return info_ptr; +} + +CloudStatisticsImpl::ThreadHistogramInfo* CloudStatisticsImpl::getThreadHistogramInfo( + uint32_t histogramIndex) { + auto info_ptr = static_cast( + histograms_[histogramIndex].thread_value->Get()); + if (info_ptr == nullptr) { + info_ptr = new ThreadHistogramInfo(&histograms_[histogramIndex].merged_hist, + &histograms_[histogramIndex].merge_lock); + histograms_[histogramIndex].thread_value->Reset(info_ptr); + } + return info_ptr; +} + +std::unique_ptr +CloudStatisticsImpl::HistogramInfo::getMergedHistogram() const { + std::unique_ptr res_hist(new HistogramImpl()); + { + MutexLock lock(&merge_lock); + res_hist->Merge(merged_hist); + } + thread_value->Fold( + [](void* curr_ptr, void* res) { + auto tmp_res_hist = static_cast(res); + auto curr_hist = static_cast(curr_ptr); + tmp_res_hist->Merge(*curr_hist); + }, + res_hist.get()); + return res_hist; +} + +} diff --git a/cloud/cloud_statistics_impl.h b/cloud/cloud_statistics_impl.h new file mode 100644 index 00000000000..e87ac2bb83b --- /dev/null +++ b/cloud/cloud_statistics_impl.h @@ -0,0 +1,126 @@ +// Copyright (c) 2017-present, Rockset, Inc. All rights reserved.
+#pragma once + +#include "rocksdb/cloud/cloud_statistics.h" +#include "monitoring/statistics.h" + +namespace rocksdb { + +class CloudStatisticsImpl : public CloudStatistics { + public: + CloudStatisticsImpl(std::shared_ptr stats, + bool enable_internal_stats); + virtual ~CloudStatisticsImpl(); + + // Ticker access and manipulation + virtual uint64_t getTickerCount(uint32_t ticker_type) const override; + virtual uint64_t getAndResetTickerCount(uint32_t ticker_type) override; + virtual void recordTick(uint32_t ticker_type, uint64_t count) override; + virtual void setTickerCount(uint32_t ticker_type, uint64_t count) override; + + // Histogram access and manipulation + virtual bool HistEnabledForType(uint32_t type) const override; + std::string getHistogramString(uint32_t histogram_type) const override; + virtual void histogramData(uint32_t histogram_type, + HistogramData* const data) const override; + virtual void measureTime(uint32_t histogram_type, uint64_t value) override; + + virtual Status Reset() override; + virtual std::string ToString() const override; + +private: + StatisticsImpl stats_impl_; + + // Synchronizes anything that operates on other threads' thread-specific data + // such that operations like Reset() can be performed atomically. + mutable port::Mutex lock_; + + // Holds data maintained by each thread for implementing tickers. + struct ThreadTickerInfo { + std::atomic_uint_fast64_t value; + + // During teardown, value will be summed into *merged_sum. + std::atomic_uint_fast64_t* merged_sum; + + ThreadTickerInfo(uint_fast64_t _value, + std::atomic_uint_fast64_t* _merged_sum) + : value(_value), merged_sum(_merged_sum) {} + }; + + // Returns the info for this tickerType/thread. It sets a new info with zeroed + // counter if none exists. + ThreadTickerInfo* getThreadTickerInfo(uint32_t tickerIndex); + + // Holds data maintained by each thread for implementing histograms. 
+ struct ThreadHistogramInfo { + HistogramImpl value; + + // During teardown, value will be merged into *merged_hist while holding + // *merge_lock, which also syncs with the merges necessary for reads. + HistogramImpl* merged_hist; + port::Mutex* merge_lock; + + ThreadHistogramInfo(HistogramImpl* _merged_hist, port::Mutex* _merge_lock) + : value(), merged_hist(_merged_hist), merge_lock(_merge_lock) {} + }; + + // Returns the info for this histogramType/thread. It sets a new histogram + // with zeroed data if none exists. + ThreadHistogramInfo* getThreadHistogramInfo(uint32_t tickerIndex); + + // Holds global data for implementing tickers. + struct TickerInfo { + TickerInfo() : thread_value(new ThreadLocalPtr(&mergeThreadValue)) {} + + // Sum of thread-specific values for tickers that have been reset due to + // thread termination or ThreadLocalPtr destruction. Also, this is used by + // setTickerCount() to conveniently change the global value by setting this + // while simultaneously zeroing all thread-local values. + std::atomic_uint_fast64_t merged_sum{0}; + + // Holds thread-specific pointer to ThreadTickerInfo + std::unique_ptr thread_value; + + static void mergeThreadValue(void* ptr) { + auto info_ptr = static_cast(ptr); + *info_ptr->merged_sum += info_ptr->value; + delete info_ptr; + } + }; + + // Holds global data for implementing histograms. + struct HistogramInfo { + HistogramInfo() : thread_value(new ThreadLocalPtr(&mergeThreadValue)) {} + + // Merged thread-specific values for histograms that have been reset due to + // thread termination or ThreadLocalPtr destruction. Note these must be + // destroyed after thread_value since its destructor accesses them. 
+ mutable port::Mutex merge_lock{}; + HistogramImpl merged_hist{}; + + // Holds thread-specific pointer to ThreadHistogramInfo + std::unique_ptr thread_value; + + static void mergeThreadValue(void* ptr) { + auto info_ptr = static_cast(ptr); + { + MutexLock lock(info_ptr->merge_lock); + info_ptr->merged_hist->Merge(info_ptr->value); + } + delete info_ptr; + } + + // Returns a histogram that merges all histograms (thread-specific and + // previously merged ones). + std::unique_ptr getMergedHistogram() const; + }; + + TickerInfo tickers_[CLOUD_TICKER_ENUM_MAX - CLOUD_TICKER_ENUM_START - 1]; + HistogramInfo histograms_[CLOUD_HISTOGRAM_ENUM_MAX - CLOUD_HISTOGRAM_ENUM_START - 1]; +}; + +} + + + + diff --git a/cloud/examples/cloud_statistics_example.cc b/cloud/examples/cloud_statistics_example.cc new file mode 100644 index 00000000000..1e53e99ab53 --- /dev/null +++ b/cloud/examples/cloud_statistics_example.cc @@ -0,0 +1,125 @@ +// Copyright (c) 2017-present, Rockset, Inc. All rights reserved. +#include +#include +#include + +#include "rocksdb/cloud/db_cloud.h" +#include "rocksdb/options.h" + +using namespace rocksdb; + +// This is the local directory where the db is stored. +std::string kDBPath = "/tmp/rocksdb_cloud_durable"; + +// This is the name of the cloud storage bucket where the db +// is made durable. if you are using AWS, you have to manually +// ensure that this bucket name is unique to you and does not +// conflict with any other S3 users who might have already created +// this bucket name. +std::string kBucketSuffix = "cloud.durable.example."; + +int main() { + // cloud environment config options here + CloudEnvOptions cloud_env_options; + + // Store a reference to a cloud env. A new cloud env object should be + // associated + // with every new cloud-db. 
+ std::unique_ptr cloud_env; + + // Retrieve aws access keys from env + char* keyid = getenv("AWS_ACCESS_KEY_ID"); + char* secret = getenv("AWS_SECRET_ACCESS_KEY"); + if (keyid == nullptr || secret == nullptr) { + fprintf( + stderr, + "Please set env variables " + "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with cloud credentials"); + return -1; + } + cloud_env_options.credentials.access_key_id.assign(keyid); + cloud_env_options.credentials.secret_key.assign(secret); + cloud_env_options.region = "us-west-2"; + + // Connect the cloud environment instrumentation with a stats receiver. + cloud_env_options.cloud_statistics = CreateCloudStatistics(); + + // Append the user name to the bucket name in an attempt to make it + // globally unique. S3 bucket-names need to be globally unique. + // If you want to rerun this example, then use a unique user-name suffix here. + char* user = getenv("USER"); + kBucketSuffix.append(user); + + // Create a new AWS cloud env Status + CloudEnv* cenv; + Status s = + CloudEnv::NewAwsEnv(Env::Default(), kBucketSuffix, kDBPath, kBucketSuffix, + kDBPath, cloud_env_options, nullptr, &cenv); + if (!s.ok()) { + fprintf(stderr, "Unable to create cloud env bucket suffix %s. %s\n", + kBucketSuffix.c_str(), s.ToString().c_str()); + return -1; + } + cloud_env.reset(cenv); + + // Create options and use the AWS env that we created earlier + Options options; + options.env = cloud_env.get(); + options.create_if_missing = true; + + // Connect the DB instrumentation with the cloud stats receiver. + // This allows the statistics collected in the cloud environment to be shown + // along with the database statistics in the logs during compaction operations. + options.statistics = cloud_env_options.cloud_statistics; + + // No persistent read-cache + std::string persistent_cache = ""; + + // open DB + DBCloud* db; + s = DBCloud::Open(options, kDBPath, persistent_cache, 0, &db); + if (!s.ok()) { + fprintf(stderr, "Unable to open db at path %s with bucket %s. 
%s\n", + kDBPath.c_str(), kBucketSuffix.c_str(), s.ToString().c_str()); + return -1; + } + + // Put key-value + s = db->Put(WriteOptions(), "key1", "value"); + assert(s.ok()); + std::string value; + // get value + s = db->Get(ReadOptions(), "key1", &value); + assert(s.ok()); + assert(value == "value"); + + // atomically apply a set of updates + { + WriteBatch batch; + batch.Delete("key1"); + batch.Put("key2", value); + s = db->Write(WriteOptions(), &batch); + } + + s = db->Get(ReadOptions(), "key1", &value); + assert(s.IsNotFound()); + + db->Get(ReadOptions(), "key2", &value); + assert(value == "value"); + + // print all values in the database + rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::cout << it->key().ToString() << ": " << it->value().ToString() + << std::endl; + } + delete it; + + // Flush all data from main db to sst files. Release db. + db->Flush(FlushOptions()); + delete db; + + fprintf(stdout, "Successfully used db at path %s bucket %s.\n", + kDBPath.c_str(), kBucketSuffix.c_str()); + return 0; +} diff --git a/include/rocksdb/cloud/cloud_env_options.h b/include/rocksdb/cloud/cloud_env_options.h index e3853be53f5..74c2d9b5020 100644 --- a/include/rocksdb/cloud/cloud_env_options.h +++ b/include/rocksdb/cloud/cloud_env_options.h @@ -36,9 +36,12 @@ enum class CloudRequestOpType { kCopyOp, kInfoOp }; + using CloudRequestCallback = std::function; +class CloudStatistics; + // // The cloud environment for rocksdb. It allows configuring the rocksdb // Environent used for the cloud. 
@@ -86,19 +89,24 @@ class CloudEnvOptions { // parameters: (op, size, latency in microseconds, is_success) std::shared_ptr cloud_request_callback; + // If non-null, then we should collect metrics about cloud environment operations + std::shared_ptr cloud_statistics; + CloudEnvOptions( CloudType _cloud_type = CloudType::kAws, bool _keep_local_sst_files = false, bool _keep_local_log_files = true, uint64_t _manifest_durable_periodicity_millis = 60 * 1000, uint64_t _purger_periodicity_millis = 10 * 60 * 1000, - std::shared_ptr _cloud_request_callback = nullptr) + std::shared_ptr _cloud_request_callback = nullptr, + std::shared_ptr _cloud_statistics = nullptr) : cloud_type(_cloud_type), keep_local_sst_files(_keep_local_sst_files), keep_local_log_files(_keep_local_log_files), manifest_durable_periodicity_millis( _manifest_durable_periodicity_millis), purger_periodicity_millis(_purger_periodicity_millis), - cloud_request_callback(_cloud_request_callback) { + cloud_request_callback(_cloud_request_callback), + cloud_statistics(_cloud_statistics) { assert(manifest_durable_periodicity_millis == 0 || keep_local_log_files == true); } diff --git a/include/rocksdb/cloud/cloud_statistics.h b/include/rocksdb/cloud/cloud_statistics.h new file mode 100644 index 00000000000..0e5bb1b47b7 --- /dev/null +++ b/include/rocksdb/cloud/cloud_statistics.h @@ -0,0 +1,47 @@ +// Copyright (c) 2017-present, Rockset, Inc. All rights reserved. +#pragma once + +#include "rocksdb/statistics.h" + +namespace rocksdb { + +/** + * Keep adding cloud tickers here. + * 1. Any ticker should be added between CLOUD_TICKER_ENUM_START and CLOUD_TICKER_ENUM_MAX. + * 2. Add a readable string in CloudTickersNameMap below for the newly added ticker. 
+ */ +enum CloudTickers : uint32_t { + // # of times the manifest is written to the cloud + CLOUD_TICKER_ENUM_START = 1u << 31, + NUMBER_MANIFEST_WRITES, + CLOUD_TICKER_ENUM_MAX +}; + +const std::vector> CloudTickersNameMap = { + {NUMBER_MANIFEST_WRITES, "rocksdb.cloud.number.manifest.writes"} +}; + + +/** + * Keep adding cloud histograms here. + * 1. Any histogram should be added between CLOUD_HISTOGRAM_ENUM_START and CLOUD_HISTOGRAM_ENUM_MAX. + * 2. Add a readable string in CloudHistogramsNameMap below for the newly added histogram. + */ +enum CloudHistograms : uint32_t { + // histogram of # of milliseconds elapsed during manifest writes + CLOUD_HISTOGRAM_ENUM_START = 1u << 31, + MANIFEST_WRITES_TIME, + CLOUD_HISTOGRAM_ENUM_MAX +}; + +const std::vector> CloudHistogramsNameMap = { + {MANIFEST_WRITES_TIME, "rocksdb.cloud.manifest.writes.millis"} +}; + +class CloudStatistics : public Statistics { +}; + +// Create a concrete CloudStatistics object +std::shared_ptr CreateCloudStatistics(); + +} diff --git a/src.mk b/src.mk index d8097ffe779..a9d6346437d 100644 --- a/src.mk +++ b/src.mk @@ -197,6 +197,7 @@ LIB_SOURCES = \ cloud/db_cloud_impl.cc \ cloud/cloud_env.cc \ cloud/cloud_env_options.cc \ + cloud/cloud_statistics_impl.cc \ cloud/manifest.cc \ cloud/purge.cc \