From 533a1c60b077923c91bad6083b04829f5e83bdf5 Mon Sep 17 00:00:00 2001 From: Hieu Pham Date: Sun, 14 Feb 2021 04:33:02 -0800 Subject: [PATCH 1/4] Use a mock S3 client for unit tests --- cloud/aws/aws_file.h | 4 + cloud/aws/aws_s3.cc | 427 +++++++++++++++++++++- cloud/cloud_env.cc | 1 + cloud/db_cloud_test.cc | 4 +- db/db_test_util.cc | 2 +- include/rocksdb/cloud/cloud_env_options.h | 12 +- run_tests.sh | 2 +- 7 files changed, 433 insertions(+), 19 deletions(-) diff --git a/cloud/aws/aws_file.h b/cloud/aws/aws_file.h index 44562834eec..4ddba6f28a8 100644 --- a/cloud/aws/aws_file.h +++ b/cloud/aws/aws_file.h @@ -12,6 +12,10 @@ inline Aws::String ToAwsString(const std::string& s) { return Aws::String(s.data(), s.size()); } +inline std::string ToStdString(const Aws::String& s) { + return std::string(s.data(), s.size()); +} + } // namespace ROCKSDB_NAMESPACE #endif /* USE_AWS */ diff --git a/cloud/aws/aws_s3.cc b/cloud/aws/aws_s3.cc index d739c7c13dd..af2dd28c0a0 100644 --- a/cloud/aws/aws_s3.cc +++ b/cloud/aws/aws_s3.cc @@ -4,6 +4,21 @@ // A directory maps to an an zero-size object in an S3 bucket // A sst file maps to an object in that S3 bucket. 
// +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "env/io_posix.h" +#include "test_util/testharness.h" #ifdef USE_AWS #include #include @@ -105,7 +120,47 @@ void SetEncryptionParameters(const CloudEnvOptions& cloud_env_options, class AwsS3ClientWrapper { public: - AwsS3ClientWrapper( + virtual ~AwsS3ClientWrapper() = default; + + virtual Aws::S3::Model::ListObjectsOutcome ListCloudObjects( + const Aws::S3::Model::ListObjectsRequest& request) = 0; + + virtual Aws::S3::Model::CreateBucketOutcome CreateBucket( + const Aws::S3::Model::CreateBucketRequest& request) = 0; + + virtual Aws::S3::Model::HeadBucketOutcome HeadBucket( + const Aws::S3::Model::HeadBucketRequest& request) = 0; + + virtual Aws::S3::Model::DeleteObjectOutcome DeleteCloudObject( + const Aws::S3::Model::DeleteObjectRequest& request) = 0; + + virtual Aws::S3::Model::CopyObjectOutcome CopyCloudObject( + const Aws::S3::Model::CopyObjectRequest& request) = 0; + + virtual Aws::S3::Model::GetObjectOutcome GetCloudObject( + const Aws::S3::Model::GetObjectRequest& request) = 0; + + virtual std::shared_ptr DownloadFile( + const Aws::String& bucket_name, const Aws::String& object_path, + const Aws::String& destination) = 0; + + virtual Aws::S3::Model::PutObjectOutcome PutCloudObject( + const Aws::S3::Model::PutObjectRequest& request, + uint64_t size_hint = 0) = 0; + + virtual std::shared_ptr UploadFile( + const Aws::String& bucket_name, const Aws::String& object_path, + const Aws::String& destination, uint64_t file_size) = 0; + + virtual Aws::S3::Model::HeadObjectOutcome HeadObject( + const Aws::S3::Model::HeadObjectRequest& request) = 0; + + virtual bool HasTransferManager() const { return false; } +}; + +class AwsS3ClientWrapperImpl : public AwsS3ClientWrapper { + public: + AwsS3ClientWrapperImpl( const std::shared_ptr& creds, const Aws::Client::ClientConfiguration& config, const CloudEnvOptions& cloud_options) @@ -129,7 +184,7 @@ 
class AwsS3ClientWrapper { } Aws::S3::Model::ListObjectsOutcome ListCloudObjects( - const Aws::S3::Model::ListObjectsRequest& request) { + const Aws::S3::Model::ListObjectsRequest& request) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kListOp); auto outcome = client_->ListObjects(request); @@ -138,7 +193,7 @@ class AwsS3ClientWrapper { } Aws::S3::Model::CreateBucketOutcome CreateBucket( - const Aws::S3::Model::CreateBucketRequest& request) { + const Aws::S3::Model::CreateBucketRequest& request) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kCreateOp); auto outcome = client_->CreateBucket(request); @@ -147,15 +202,16 @@ class AwsS3ClientWrapper { } Aws::S3::Model::HeadBucketOutcome HeadBucket( - const Aws::S3::Model::HeadBucketRequest& request) { + const Aws::S3::Model::HeadBucketRequest& request) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kInfoOp); auto outcome = client_->HeadBucket(request); t.SetSuccess(outcome.IsSuccess()); return outcome; } + Aws::S3::Model::DeleteObjectOutcome DeleteCloudObject( - const Aws::S3::Model::DeleteObjectRequest& request) { + const Aws::S3::Model::DeleteObjectRequest& request) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kDeleteOp); auto outcome = client_->DeleteObject(request); @@ -164,7 +220,7 @@ class AwsS3ClientWrapper { } Aws::S3::Model::CopyObjectOutcome CopyCloudObject( - const Aws::S3::Model::CopyObjectRequest& request) { + const Aws::S3::Model::CopyObjectRequest& request) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kCopyOp); auto outcome = client_->CopyObject(request); @@ -173,7 +229,7 @@ class AwsS3ClientWrapper { } Aws::S3::Model::GetObjectOutcome GetCloudObject( - const Aws::S3::Model::GetObjectRequest& request) { + const Aws::S3::Model::GetObjectRequest& request) override { 
CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kReadOp); auto outcome = client_->GetObject(request); @@ -183,9 +239,10 @@ class AwsS3ClientWrapper { } return outcome; } + std::shared_ptr DownloadFile( const Aws::String& bucket_name, const Aws::String& object_path, - const Aws::String& destination) { + const Aws::String& destination) override { CloudRequestCallbackGuard guard(cloud_request_callback_.get(), CloudRequestOpType::kReadOp); auto handle = @@ -202,7 +259,8 @@ class AwsS3ClientWrapper { } Aws::S3::Model::PutObjectOutcome PutCloudObject( - const Aws::S3::Model::PutObjectRequest& request, uint64_t size_hint = 0) { + const Aws::S3::Model::PutObjectRequest& request, + uint64_t size_hint = 0) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kWriteOp, size_hint); auto outcome = client_->PutObject(request); @@ -212,7 +270,7 @@ class AwsS3ClientWrapper { std::shared_ptr UploadFile( const Aws::String& bucket_name, const Aws::String& object_path, - const Aws::String& destination, uint64_t file_size) { + const Aws::String& destination, uint64_t file_size) override { CloudRequestCallbackGuard guard(cloud_request_callback_.get(), CloudRequestOpType::kWriteOp, file_size); @@ -227,7 +285,7 @@ class AwsS3ClientWrapper { } Aws::S3::Model::HeadObjectOutcome HeadObject( - const Aws::S3::Model::HeadObjectRequest& request) { + const Aws::S3::Model::HeadObjectRequest& request) override { CloudRequestCallbackGuard t(cloud_request_callback_.get(), CloudRequestOpType::kInfoOp); auto outcome = client_->HeadObject(request); @@ -237,7 +295,10 @@ class AwsS3ClientWrapper { CloudRequestCallback* GetRequestCallback() { return cloud_request_callback_.get(); } - bool HasTransferManager() const { return transfer_manager_.get() != nullptr; } + + bool HasTransferManager() const override { + return transfer_manager_.get() != nullptr; + } private: static Aws::Utils::Threading::Executor* GetAwsTransferManagerExecutor() { @@ 
-250,6 +311,339 @@ class AwsS3ClientWrapper { std::shared_ptr cloud_request_callback_; }; +namespace test { +// An wrapper that mimicks S3 operations in local storage. Used for test. +class TestS3ClientWrapper : public AwsS3ClientWrapper { + public: + TestS3ClientWrapper(Env* base_env) : base_env_(base_env) { + root_dir_ = test::TmpDir(base_env_); + base_env_->CreateDirIfMissing(root_dir_); + } + + ~TestS3ClientWrapper() { base_env_->DeleteDir(root_dir_); } + + Aws::S3::Model::ListObjectsOutcome ListCloudObjects( + const Aws::S3::Model::ListObjectsRequest& request) override { + using namespace Aws::S3::Model; + auto bucket = ToStdString(request.GetBucket()); + auto prefix = ToStdString(request.GetPrefix()); + auto dir = getLocalPath(request.GetBucket(), request.GetPrefix()); + bool is_dir{false}; + { + Status st = base_env_->IsDirectory(dir, &is_dir); + if (!st.ok() || !is_dir) { + return ListObjectsOutcome(ListObjectsResult()); + } + } + std::vector children; + base_env_->GetChildren(dir, &children); + + ListObjectsResult res; + Aws::Vector contents; + for (const auto& c : children) { + if (c == "." 
|| c == "..") { + continue; + } + Object o; + std::string x = prefix + c; + o.SetKey(ToAwsString(x)); + contents.emplace_back(std::move(o)); + } + res.SetContents(contents); + + ListObjectsOutcome outcome(res); + return outcome; + } + + virtual Aws::S3::Model::CreateBucketOutcome CreateBucket( + const Aws::S3::Model::CreateBucketRequest& request) override { + using namespace Aws::S3::Model; + auto bucket = ToStdString(request.GetBucket()); + base_env_->CreateDirIfMissing(root_dir_ + "/" + bucket); + CreateBucketResult res; + CreateBucketOutcome outcome(res); + return outcome; + } + + virtual Aws::S3::Model::HeadBucketOutcome HeadBucket( + const Aws::S3::Model::HeadBucketRequest& /* request */) override { + using namespace Aws::S3::Model; + HeadBucketOutcome outcome; + return outcome; + } + + virtual Aws::S3::Model::DeleteObjectOutcome DeleteCloudObject( + const Aws::S3::Model::DeleteObjectRequest& request) override { + using namespace Aws::S3::Model; + auto path = getLocalPath(request.GetBucket(), request.GetKey()); + auto status = base_env_->DeleteFile(path); + DeleteObjectResult res; + DeleteObjectOutcome outcome; + if (status.IsNotFound()) { + Aws::Client::AWSError error( + Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false); + outcome = DeleteObjectOutcome(error); + } + + return outcome; + } + + virtual Aws::S3::Model::CopyObjectOutcome CopyCloudObject( + const Aws::S3::Model::CopyObjectRequest& request) override { + using namespace Aws::S3::Model; + auto from = getLocalPath(request.GetCopySource(), ""); + auto to = getLocalPath(request.GetBucket(), request.GetKey()); + + auto parent_path = getParentPath(to); + createDirRecursively(parent_path); + + std::unique_ptr toFile; + base_env_->NewWritableFile(to, &toFile, EnvOptions()); + + std::unique_ptr fromFile; + base_env_->NewSequentialFile(from, &fromFile, EnvOptions()); + + uint64_t file_size{0}; + base_env_->GetFileSize(from, &file_size); + + std::vector scratch; + scratch.reserve(file_size); + Slice buffer; + 
fromFile->Read(5000, &buffer, scratch.data()); + toFile->Append(buffer); + + CopyObjectResult res; + return CopyObjectOutcome(res); + } + + virtual Aws::S3::Model::GetObjectOutcome GetCloudObject( + const Aws::S3::Model::GetObjectRequest& request) override { + using namespace Aws::S3::Model; + auto from = getLocalPath(request.GetBucket(), request.GetKey()); + auto parent_path = getParentPath(from); + createDirRecursively(parent_path); + + GetObjectResult res; + + uint64_t file_size = 0; + Status status = base_env_->GetFileSize(from, &file_size); + if (!status.ok()) { + Aws::Client::AWSError error( + Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false); + return GetObjectOutcome(error); + } + + res.SetContentLength(file_size); + + std::unique_ptr fromFile; + base_env_->NewSequentialFile(from, &fromFile, EnvOptions()); + std::vector scratch; + scratch.reserve(file_size); + Slice buffer; + fromFile->Read(file_size, &buffer, scratch.data()); + + if (request.RangeHasBeenSet()) { + auto range = request.GetRange(); + int start = 0; + int end = 0; + int matches = std::sscanf(range.c_str(), "bytes=%d-%d", &start, &end); + (void)matches; + assert(matches == 2); + + buffer.remove_suffix(buffer.size() - end - 1); + buffer.remove_prefix(start); + } + + const auto& responseStreamFactory = request.GetResponseStreamFactory(); + std::unique_ptr ss(new Aws::StringStream()); + if (responseStreamFactory) { + Aws::Utils::Stream::ResponseStream responseStream(responseStreamFactory); + responseStream.GetUnderlyingStream().write(buffer.data(), buffer.size()); + } + + (*ss) << buffer.ToString(); + res.ReplaceBody(ss.release()); + + return GetObjectOutcome(std::move(res)); + } + + virtual std::shared_ptr DownloadFile( + const Aws::String&, const Aws::String&, const Aws::String&) override { + return nullptr; + } + + virtual Aws::S3::Model::PutObjectOutcome PutCloudObject( + const Aws::S3::Model::PutObjectRequest& request, + uint64_t /* size_hint */) override { + using namespace Aws::S3::Model; + 
Aws::StringStream ss; + auto stream = request.GetBody(); + if (stream) { + ss << stream->rdbuf(); + } + std::string buffer = ToStdString(ss.str()); + { + auto to = getLocalPath(request.GetBucket(), request.GetKey()); + + auto parent_path = getParentPath(to); + createDirRecursively(parent_path); + + std::unique_ptr f; + base_env_->NewWritableFile(to, &f, EnvOptions()); + + if (!buffer.empty()) { + f->Append(buffer); + } + + f->Sync(); + } + + if (request.MetadataHasBeenSet()) { + auto to = getMetadataLocalPath(request.GetBucket(), request.GetKey()); + auto parent_path = getParentPath(to); + createDirRecursively(parent_path); + + std::unique_ptr f; + base_env_->NewWritableFile(to, &f, EnvOptions()); + + std::string metadata = buildMetadata(request.GetMetadata()); + f->Append(metadata); + } + + PutObjectResult res; + return PutObjectOutcome(res); + } + + virtual std::shared_ptr UploadFile( + const Aws::String&, const Aws::String&, const Aws::String&, + uint64_t) override { + return nullptr; + } + + virtual Aws::S3::Model::HeadObjectOutcome HeadObject( + const Aws::S3::Model::HeadObjectRequest& request) override { + using namespace Aws::S3::Model; + auto path = getLocalPath(request.GetBucket(), request.GetKey()); + + if (!base_env_->FileExists(path).ok()) { + Aws::Client::AWSError error( + Aws::S3::S3Errors::RESOURCE_NOT_FOUND, false); + return HeadObjectOutcome(error); + } + + uint64_t file_size; + uint64_t file_mtime; + base_env_->GetFileSize(path, &file_size); + base_env_->GetFileModificationTime(path, &file_mtime); + + HeadObjectResult res; + res.SetContentLength(file_size); + + Aws::Utils::DateTime lastModified((int64_t(file_mtime))); + res.SetLastModified(lastModified); + res.SetETag(ToAwsString(std::to_string(std::hash{}(path)))); + + auto metadataPath = + getMetadataLocalPath(request.GetBucket(), request.GetKey()); + if (base_env_->FileExists(metadataPath).ok()) { + std::ifstream metadataFile(metadataPath); + std::string line; + Aws::String key; + Aws::String 
value; + Aws::Map meta; + int i = 0; + if (metadataFile.is_open()) { + while (getline(metadataFile, line)) { + if (i % 2 == 0) { + key = ToAwsString(line); + } else { + value = ToAwsString(line); + meta[key] = value; + } + i++; + } + + res.SetMetadata(meta); + } + } + + return HeadObjectOutcome(res); + } + + bool HasTransferManager() const override { + // Don't support transfer manager for now. + return false; + } + + private: + std::string getLocalPathStr(const std::string& bucket, + const std::string& prefix) { + if (prefix.empty()) { + return root_dir_ + "/" + bucket; + } + return root_dir_ + "/" + bucket + "/" + prefix; + } + + std::string getMetadataLocalPathStr(const std::string& bucket, + const std::string& prefix) { + return root_dir_ + "/.metadata/" + bucket + "/" + prefix; + } + + std::string getLocalPath(const Aws::String& bucket, + const Aws::String& prefix) { + return getLocalPathStr(ToStdString(bucket), ToStdString(prefix)); + } + + std::string getMetadataLocalPath(const Aws::String& bucket, + const Aws::String& prefix) { + return getMetadataLocalPathStr(ToStdString(bucket), ToStdString(prefix)); + } + + std::string buildMetadata( + const Aws::Map& metadata) { + Aws::StringStream ss; + for (const auto& kv : metadata) { + ss << kv.first << std::endl << kv.second << std::endl; + } + return ToStdString(ss.str()); + } + + void destroyDir(const std::string& dir) { + std::string cmd = "rm -rf " + dir; + int rc = system(cmd.c_str()); + (void)rc; + } + + void createDirRecursively(const std::string& dir) { + std::string cmd = "mkdir -p " + dir; + int rc = system(cmd.c_str()); + (void)rc; + } + + std::string getParentPath(const std::string& path) { + auto parts = StringSplit(path, '/'); + if (parts.empty()) { + return ""; + } + + std::string res = "/"; + size_t num_parts = parts.size(); + for (size_t i = 0; i < num_parts - 1; i++) { + if (parts[i].empty()) { + continue; + } + res += parts[i]; + res += "/"; + } + + return res; + } + + std::string root_dir_; + 
Env* base_env_; +}; +} // namespace test + static bool IsNotFound(const Aws::S3::S3Errors& s3err) { return (s3err == Aws::S3::S3Errors::NO_SUCH_BUCKET || s3err == Aws::S3::S3Errors::NO_SUCH_KEY || @@ -464,8 +858,13 @@ Status S3StorageProvider::Initialize(CloudEnv* env) { } else { Header(env->info_log_, "S3 connection to endpoint in region: %s", config.region.c_str()); - s3client_ = - std::make_shared(creds, config, cloud_opts); + if (!cloud_opts.initialize_test_client) { + s3client_ = + std::make_shared(creds, config, cloud_opts); + } else { + s3client_ = + std::make_shared(env->GetBaseEnv()); + } } } return status; diff --git a/cloud/cloud_env.cc b/cloud/cloud_env.cc index 5d016a13d55..70e60d3d738 100644 --- a/cloud/cloud_env.cc +++ b/cloud/cloud_env.cc @@ -38,6 +38,7 @@ bool CloudEnvOptions::GetNameFromEnvironment(const char* name, const char* alt, void CloudEnvOptions::TEST_Initialize(const std::string& bucket, const std::string& object, const std::string& region) { + initialize_test_client = true; src_bucket.TEST_Initialize(bucket, object, region); dest_bucket = src_bucket; credentials.TEST_Initialize(); diff --git a/cloud/db_cloud_test.cc b/cloud/db_cloud_test.cc index 18cc26102c5..85bff810353 100644 --- a/cloud/db_cloud_test.cc +++ b/cloud/db_cloud_test.cc @@ -33,8 +33,10 @@ namespace ROCKSDB_NAMESPACE { class CloudTest : public testing::Test { public: CloudTest() { + auto test_name = + ::testing::UnitTest::GetInstance()->current_test_info()->name(); Random64 rng(time(nullptr)); - test_id_ = std::to_string(rng.Next()); + test_id_ = std::to_string(rng.Next()) + test_name; fprintf(stderr, "Test ID: %s\n", test_id_.c_str()); base_env_ = Env::Default(); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 4e1799949c9..97b52c88eb4 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -134,7 +134,6 @@ DBTestBase::~DBTestBase() { } else { EXPECT_OK(DestroyDB(dbname_, options)); } - delete env_; #ifdef USE_AWS auto aenv = dynamic_cast(s3_env_); @@ -142,6 
+141,7 @@ DBTestBase::~DBTestBase() { aenv->GetSrcBucketName(), aenv->GetSrcObjectPath()); #endif delete s3_env_; + delete env_; } bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) { diff --git a/include/rocksdb/cloud/cloud_env_options.h b/include/rocksdb/cloud/cloud_env_options.h index 2106e7b4b9c..b3fe598e9f7 100644 --- a/include/rocksdb/cloud/cloud_env_options.h +++ b/include/rocksdb/cloud/cloud_env_options.h @@ -303,6 +303,12 @@ class CloudEnvOptions { // Default: false. bool skip_cloud_files_in_getchildren; + // Instead of creating actual client, create a test client instead. Used for + // test. + // + // Default: false. + bool initialize_test_client; + CloudEnvOptions( CloudType _cloud_type = CloudType::kCloudAws, LogType _log_type = LogType::kLogKafka, @@ -317,7 +323,8 @@ class CloudEnvOptions { bool _use_aws_transfer_manager = false, int _number_objects_listed_in_one_iteration = 5000, int _constant_sst_file_size_in_sst_file_manager = -1, - bool _skip_cloud_files_in_getchildren = false) + bool _skip_cloud_files_in_getchildren = false, + bool _initialize_test_client = false) : cloud_type(_cloud_type), log_type(_log_type), keep_local_sst_files(_keep_local_sst_files), @@ -337,7 +344,8 @@ class CloudEnvOptions { _number_objects_listed_in_one_iteration), constant_sst_file_size_in_sst_file_manager( _constant_sst_file_size_in_sst_file_manager), - skip_cloud_files_in_getchildren(_skip_cloud_files_in_getchildren) {} + skip_cloud_files_in_getchildren(_skip_cloud_files_in_getchildren), + initialize_test_client(_initialize_test_client) {} // print out all options to the log void Dump(Logger* log) const; diff --git a/run_tests.sh b/run_tests.sh index 6c448faf95d..a2563d5a06a 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -70,7 +70,7 @@ docker run -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ -u $UID -e V=1 -e USE_AWS=1 -e USE_KAFKA=1 \ -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ --rm rockset/rocksdb_cloud_runtime:test \ - 
/bin/bash -c "make -j4 db_test db_test2 db_basic_test env_basic_test db_cloud_test cloud_manifest_test" + /bin/bash -c "make -j $PARALLEL_JOBS db_test db_test2 db_basic_test env_basic_test db_cloud_test cloud_manifest_test" echo "Running db_test. This test might take a while. Get some coffee :)" docker run -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ From 9e6aea7220236cbbabd6f400709e5c145cf0401e Mon Sep 17 00:00:00 2001 From: Hieu Pham Date: Sun, 14 Feb 2021 04:37:51 -0800 Subject: [PATCH 2/4] Fix run_tests.sh --- run_tests.sh | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/run_tests.sh b/run_tests.sh index a2563d5a06a..7fb5fe84b3a 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -50,45 +50,37 @@ echo "Running with $PARALLEL_JOBS parallel jobs" echo "Pulling base image..." docker pull rockset/rocksdb_cloud_runtime:test -echo "Checking AWS access keys" -if [[ -z ${AWS_ACCESS_KEY_ID+x} ]]; then - AWS_ACCESS_KEY_ID=$(aws configure get aws_access_key_id) || die_error "AWS access key ID not found" -fi -if [[ -z ${AWS_SECRET_ACCESS_KEY+x} ]]; then - AWS_SECRET_ACCESS_KEY=$(aws configure get aws_secret_access_key) || die_error "AWS secret access key not found" -fi - -export AWS_ACCESS_KEY_ID -export AWS_SECRET_ACCESS_KEY - +# echo "Checking AWS access keys" +# if [[ -z ${AWS_ACCESS_KEY_ID+x} ]]; then +# AWS_ACCESS_KEY_ID=$(aws configure get aws_access_key_id) || die_error "AWS access key ID not found" +# fi +# if [[ -z ${AWS_SECRET_ACCESS_KEY+x} ]]; then +# AWS_SECRET_ACCESS_KEY=$(aws configure get aws_secret_access_key) || die_error "AWS secret access key not found" +# fi +# export AWS_ACCESS_KEY_ID +# export AWS_SECRET_ACCESS_KEY export SRC_ROOT=$(git rev-parse --show-toplevel) -echo $UID - echo "Building tests..." 
docker run -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ -u $UID -e V=1 -e USE_AWS=1 -e USE_KAFKA=1 \ - -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ --rm rockset/rocksdb_cloud_runtime:test \ /bin/bash -c "make -j $PARALLEL_JOBS db_test db_test2 db_basic_test env_basic_test db_cloud_test cloud_manifest_test" -echo "Running db_test. This test might take a while. Get some coffee :)" -docker run -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ - -u $UID -e V=1 -e USE_AWS=1 -e USE_KAFKA=1 \ - -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ - --rm rockset/rocksdb_cloud_runtime:test \ - /bin/bash -c "./db_test" - echo "Running db_test2, db_basic_test and env_basic_test" -docker run -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ +docker run --network none -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ -u $UID -e V=1 -e USE_AWS=1 -e USE_KAFKA=1 \ - -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ --rm rockset/rocksdb_cloud_runtime:test \ /bin/bash -c "./db_test2 && ./db_basic_test && ./env_basic_test" echo "Running cloud tests..." -docker run -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ +docker run --network none -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ -u $UID -e V=1 -e USE_AWS=1 -e USE_KAFKA=1 \ - -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \ --rm rockset/rocksdb_cloud_runtime:test \ /bin/bash -c "./cloud_manifest_test && ./db_cloud_test --gtest_filter=-CloudTest.KeepLocalLogKafka" + +echo "Running db_test. This test might take a while. 
Get some coffee :)" +docker run --network none -v $SRC_ROOT:/opt/rocksdb-cloud/src -w /opt/rocksdb-cloud/src \ + -u $UID -e V=1 -e USE_AWS=1 -e USE_KAFKA=1 \ + --rm rockset/rocksdb_cloud_runtime:test \ + /bin/bash -c "./db_test --gtest_filter=-*MultiThreadedDBTest*" From 7dae9ae7532dbaf7dc841a08f37bf618355652b5 Mon Sep 17 00:00:00 2001 From: Hieu Pham Date: Sun, 14 Feb 2021 05:07:59 -0800 Subject: [PATCH 3/4] Remove AWS from run_tests.sh --- cloud/aws/aws_s3.cc | 28 +++++++++++++--------------- run_tests.sh | 9 --------- 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/cloud/aws/aws_s3.cc b/cloud/aws/aws_s3.cc index af2dd28c0a0..21ca533a7e7 100644 --- a/cloud/aws/aws_s3.cc +++ b/cloud/aws/aws_s3.cc @@ -4,26 +4,16 @@ // A directory maps to an an zero-size object in an S3 bucket // A sst file maps to an object in that S3 bucket. // -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include "env/io_posix.h" -#include "test_util/testharness.h" #ifdef USE_AWS #include +#include #include #include #include +#include +#include +#include +#include #include #include #include @@ -52,17 +42,25 @@ #include #include +#include #include +#include #include +#include +#include +#include +#include #include "cloud/aws/aws_env.h" #include "cloud/aws/aws_file.h" #include "cloud/cloud_storage_provider_impl.h" #include "cloud/filename.h" +#include "env/io_posix.h" #include "port/port.h" #include "rocksdb/cloud/cloud_env_options.h" #include "rocksdb/cloud/cloud_storage_provider.h" #include "rocksdb/options.h" +#include "test_util/testharness.h" #include "util/stderr_logger.h" #include "util/string_util.h" diff --git a/run_tests.sh b/run_tests.sh index 7fb5fe84b3a..a6a67d84590 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -50,15 +50,6 @@ echo "Running with $PARALLEL_JOBS parallel jobs" echo "Pulling base image..." 
docker pull rockset/rocksdb_cloud_runtime:test -# echo "Checking AWS access keys" -# if [[ -z ${AWS_ACCESS_KEY_ID+x} ]]; then -# AWS_ACCESS_KEY_ID=$(aws configure get aws_access_key_id) || die_error "AWS access key ID not found" -# fi -# if [[ -z ${AWS_SECRET_ACCESS_KEY+x} ]]; then -# AWS_SECRET_ACCESS_KEY=$(aws configure get aws_secret_access_key) || die_error "AWS secret access key not found" -# fi -# export AWS_ACCESS_KEY_ID -# export AWS_SECRET_ACCESS_KEY export SRC_ROOT=$(git rev-parse --show-toplevel) echo "Building tests..." From f36fd1a51fe9e27124202de522f3862253cb5290 Mon Sep 17 00:00:00 2001 From: Hieu Pham Date: Sun, 14 Feb 2021 05:23:25 -0800 Subject: [PATCH 4/4] Add more comments --- cloud/aws/aws_s3.cc | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/cloud/aws/aws_s3.cc b/cloud/aws/aws_s3.cc index 21ca533a7e7..50ef1e3e99d 100644 --- a/cloud/aws/aws_s3.cc +++ b/cloud/aws/aws_s3.cc @@ -311,6 +311,8 @@ class AwsS3ClientWrapperImpl : public AwsS3ClientWrapper { namespace test { // An wrapper that mimicks S3 operations in local storage. Used for test. +// There are a lot of string copies between std::string and Aws::String +// unfortunately. However, since it's used for tests only, it's probably fine. class TestS3ClientWrapper : public AwsS3ClientWrapper { public: TestS3ClientWrapper(Env* base_env) : base_env_(base_env) { @@ -326,6 +328,8 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { auto bucket = ToStdString(request.GetBucket()); auto prefix = ToStdString(request.GetPrefix()); auto dir = getLocalPath(request.GetBucket(), request.GetPrefix()); + + // If this directory is not found, just ignore and return nothing. 
bool is_dir{false}; { Status st = base_env_->IsDirectory(dir, &is_dir); @@ -333,21 +337,23 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { return ListObjectsOutcome(ListObjectsResult()); } } + std::vector children; base_env_->GetChildren(dir, &children); ListObjectsResult res; Aws::Vector contents; for (const auto& c : children) { + // Ignore these 2 special directories. if (c == "." || c == "..") { continue; } - Object o; std::string x = prefix + c; + Object o; o.SetKey(ToAwsString(x)); contents.emplace_back(std::move(o)); } - res.SetContents(contents); + res.SetContents(std::move(contents)); ListObjectsOutcome outcome(res); return outcome; @@ -407,7 +413,7 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { std::vector scratch; scratch.reserve(file_size); Slice buffer; - fromFile->Read(5000, &buffer, scratch.data()); + fromFile->Read(file_size, &buffer, scratch.data()); toFile->Append(buffer); CopyObjectResult res; @@ -441,6 +447,8 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { fromFile->Read(file_size, &buffer, scratch.data()); if (request.RangeHasBeenSet()) { + // Range is a string in this format: "bytes=start-end". + // start and end are inclusive. auto range = request.GetRange(); int start = 0; int end = 0; @@ -467,6 +475,7 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { virtual std::shared_ptr DownloadFile( const Aws::String&, const Aws::String&, const Aws::String&) override { + // Don't support AWS Transfer Manager for now. return nullptr; } @@ -479,10 +488,11 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { if (stream) { ss << stream->rdbuf(); } - std::string buffer = ToStdString(ss.str()); + + // First, create the actual file. 
{ + std::string buffer = ToStdString(ss.str()); auto to = getLocalPath(request.GetBucket(), request.GetKey()); - auto parent_path = getParentPath(to); createDirRecursively(parent_path); @@ -493,9 +503,12 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { f->Append(buffer); } + // Even if this is an empty file, we still want to create it. f->Sync(); } + // If metadata is set on this file, create the metadata file. + // The metadata is stored in a separate directory. if (request.MetadataHasBeenSet()) { auto to = getMetadataLocalPath(request.GetBucket(), request.GetKey()); auto parent_path = getParentPath(to); @@ -504,7 +517,7 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { std::unique_ptr f; base_env_->NewWritableFile(to, &f, EnvOptions()); - std::string metadata = buildMetadata(request.GetMetadata()); + std::string metadata = serializeMetadata(request.GetMetadata()); f->Append(metadata); } @@ -515,6 +528,7 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { virtual std::shared_ptr UploadFile( const Aws::String&, const Aws::String&, const Aws::String&, uint64_t) override { + // Don't support AWS Transfer Manager for now. return nullptr; } @@ -544,6 +558,13 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { auto metadataPath = getMetadataLocalPath(request.GetBucket(), request.GetKey()); if (base_env_->FileExists(metadataPath).ok()) { + // Metadata is stored in the following format + // ``` + // key1 + // value1 + // key2 + // value2 + // ``` std::ifstream metadataFile(metadataPath); std::string line; Aws::String key; @@ -587,17 +608,20 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { return root_dir_ + "/.metadata/" + bucket + "/" + prefix; } + // Given an S3 location, return actual directory on local disk. std::string getLocalPath(const Aws::String& bucket, const Aws::String& prefix) { return getLocalPathStr(ToStdString(bucket), ToStdString(prefix)); } + // Given an S3 location, return the metadata directory on local disk. 
std::string getMetadataLocalPath(const Aws::String& bucket, const Aws::String& prefix) { return getMetadataLocalPathStr(ToStdString(bucket), ToStdString(prefix)); } - std::string buildMetadata( + // Serialize metadata information in order to store on disk. + std::string serializeMetadata( const Aws::Map& metadata) { Aws::StringStream ss; for (const auto& kv : metadata) { @@ -606,18 +630,21 @@ class TestS3ClientWrapper : public AwsS3ClientWrapper { return ToStdString(ss.str()); } + // Recursively delete the directory. void destroyDir(const std::string& dir) { std::string cmd = "rm -rf " + dir; int rc = system(cmd.c_str()); (void)rc; } + // Recursively create the directory and any missing component in the path. void createDirRecursively(const std::string& dir) { std::string cmd = "mkdir -p " + dir; int rc = system(cmd.c_str()); (void)rc; } + // Given /a/b/c, return /a/b/ std::string getParentPath(const std::string& path) { auto parts = StringSplit(path, '/'); if (parts.empty()) {