Skip to content

Commit

Permalink
Add method to extract live file list from local manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
aanq committed Sep 25, 2023
1 parent 587e968 commit 504e55d
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 40 deletions.
43 changes: 34 additions & 9 deletions cloud/cloud_file_system_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2002,7 +2002,6 @@ IOStatus CloudFileSystemImpl::UploadCloudManifest(
return st;
}


IOStatus CloudFileSystemImpl::ApplyCloudManifestDelta(
const CloudManifestDelta& delta, bool* delta_applied) {
*delta_applied = cloud_manifest_->AddEpoch(delta.file_num, delta.epoch);
Expand Down Expand Up @@ -2271,6 +2270,19 @@ Status CloudFileSystemImpl::CheckValidity() const {
}
}

void CloudFileSystemImpl::RemapFileNumbers(
const std::set<uint64_t>& file_numbers,
std::vector<std::string>* sst_file_names) {
sst_file_names->resize(file_numbers.size());

size_t idx = 0;
for (auto num : file_numbers) {
std::string logical_path = MakeTableFileName("" /* path */, num);
(*sst_file_names)[idx] = RemapFilename(logical_path);
idx++;
}
}

IOStatus CloudFileSystemImpl::FindAllLiveFiles(
const std::string& local_dbname, std::vector<std::string>* live_sst_files,
std::string* manifest_file) {
Expand All @@ -2282,18 +2294,29 @@ IOStatus CloudFileSystemImpl::FindAllLiveFiles(
return st;
}

live_sst_files->resize(file_nums.size());

// filename will be remapped correctly based on current_epoch of
// cloud_manifest
*manifest_file =
RemapFilename(ManifestFileWithEpoch("" /* dbname */, "" /* epoch */));
size_t idx = 0;
for (auto num : file_nums) {
std::string logical_path = MakeTableFileName("" /* path */, num);
(*live_sst_files)[idx] = RemapFilename(logical_path);
idx++;

RemapFileNumbers(file_nums, live_sst_files);

return IOStatus::OK();
}

IOStatus CloudFileSystemImpl::ReadManifestLiveFiles(
const std::string& manifest_file,
std::vector<std::string>* live_sst_files) {
std::unique_ptr<LocalManifestReader> extractor(
new LocalManifestReader(info_log_, this));
std::set<uint64_t> file_nums;
auto st = extractor->GetManifestLiveFiles(manifest_file, &file_nums);
if (!st.ok()) {
return st;
}

RemapFileNumbers(file_nums, live_sst_files);

return IOStatus::OK();
}

Expand All @@ -2307,7 +2330,9 @@ void CloudFileSystemImpl::TEST_InitEmptyCloudManifest() {
}

size_t CloudFileSystemImpl::TEST_NumScheduledJobs() const {
return cloud_file_deletion_scheduler_ ? cloud_file_deletion_scheduler_->TEST_NumScheduledJobs() : 0;
return cloud_file_deletion_scheduler_
? cloud_file_deletion_scheduler_->TEST_NumScheduledJobs()
: 0;
}

#endif
Expand Down
18 changes: 14 additions & 4 deletions cloud/cloud_file_system_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <condition_variable>
#include <mutex>
#include <thread>
#include <set>

#include "cloud/cloud_manifest.h"
#include "port/port_posix.h"
Expand Down Expand Up @@ -151,13 +152,17 @@ class CloudFileSystemImpl : public CloudFileSystem {

// Find all live files based on cloud_manifest_ and local MANIFEST FILE
// If local MANIFEST file doesn't exist, it will pull from cloud
//
//
// REQUIRES: cloud_manifest_ is loaded
// REQUIRES: cloud_manifest_ is not updated when calling this function
IOStatus FindAllLiveFiles(const std::string& local_dbname,
std::vector<std::string>* live_sst_files,
std::string* manifest_file) override;

IOStatus ReadManifestLiveFiles(
const std::string& manifest_file,
std::vector<std::string>* live_sst_files) override;

IOStatus extractParents(const std::string& bucket_name_prefix,
const DbidList& dbid_list, DbidParents* parents);
IOStatus PreloadCloudManifest(const std::string& local_dbname) override;
Expand Down Expand Up @@ -256,8 +261,7 @@ class CloudFileSystemImpl : public CloudFileSystem {
const CloudManifestDelta& delta) const override;

IOStatus GetMaxFileNumberFromCurrentManifest(
const std::string& local_dbname,
uint64_t* max_file_number) override;
const std::string& local_dbname, uint64_t* max_file_number) override;

// Upload MANIFEST-epoch to the cloud
IOStatus UploadManifest(const std::string& local_dbname,
Expand Down Expand Up @@ -374,10 +378,15 @@ class CloudFileSystemImpl : public CloudFileSystem {
// 00010.sst-[epochX], but the real mapping for 00010.sst is [epochY], the
// file will be treated as invisible
bool IsFileInvisible(const std::vector<std::string>& active_cookies,
const std::string& fname) const;
const std::string& fname) const;

void log(InfoLogLevel level, const std::string& fname,
const std::string& msg);

// Remap SST file numbers to file names
void RemapFileNumbers(const std::set<uint64_t>& file_numbers,
std::vector<std::string>* sst_file_names);

// Fetch the cloud manifest based on the cookie
IOStatus FetchCloudManifest(const std::string& local_dbname,
const std::string& cookie);
Expand All @@ -386,6 +395,7 @@ class CloudFileSystemImpl : public CloudFileSystem {
IOStatus FetchManifest(const std::string& local_dbname,
const std::string& epoch);
std::string GenerateNewEpochId();

std::unique_ptr<CloudManifest> cloud_manifest_;
// This runs only in tests when we want to disable cloud manifest
// functionality
Expand Down
4 changes: 4 additions & 0 deletions cloud/cloud_file_system_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ class MockCloudFileSystem : public CloudFileSystem {
return notsup_;
}

IOStatus ReadManifestLiveFiles(const std::string& /* manifest_file */, std::vector<std::string>* /* live_sst_files */) override {
return notsup_;
}

private:
IOStatus notsup_;
std::string empty_;
Expand Down
33 changes: 27 additions & 6 deletions cloud/manifest_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ IOStatus LocalManifestReader::GetLiveFilesLocally(
std::unique_ptr<SequentialFileReader> manifest_file_reader;
IOStatus s;
{
// file name here doesn't matter, it will always be mapped to the correct Manifest file.
// use empty epoch here so that it will be recognized as manifest file type
// file name here doesn't matter, it will always be mapped to the correct
// Manifest file. use empty epoch here so that it will be recognized as
// manifest file type
auto local_manifest_file = cfs_impl->RemapFilename(
ManifestFileWithEpoch(local_dbname, "" /* epoch */));

Expand All @@ -53,6 +54,27 @@ IOStatus LocalManifestReader::GetLiveFilesLocally(
return GetLiveFilesFromFileReader(std::move(manifest_file_reader), list);
}

IOStatus LocalManifestReader::GetManifestLiveFiles(
const std::string& manifest_file, std::set<uint64_t>* list) const {
auto* cfs_impl = dynamic_cast<CloudFileSystemImpl*>(cfs_);
assert(cfs_impl);

std::unique_ptr<SequentialFileReader> manifest_file_reader;
IOStatus s;
{
std::unique_ptr<FSSequentialFile> file;
s = cfs_impl->NewSequentialFile(manifest_file, FileOptions(), &file,
nullptr /*dbg*/);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(file), manifest_file));
}

return GetLiveFilesFromFileReader(std::move(manifest_file_reader), list);
}

IOStatus LocalManifestReader::GetLiveFilesFromFileReader(
std::unique_ptr<SequentialFileReader> file_reader,
std::set<uint64_t>* list) const {
Expand Down Expand Up @@ -92,8 +114,7 @@ IOStatus LocalManifestReader::GetLiveFilesFromFileReader(
uint64_t num = one.second;
// Deleted files should belong to some CF
auto it = cf_live_files.find(edit.GetColumnFamily());
if ((it == cf_live_files.end()) ||
(it->second.count(level) == 0) ||
if ((it == cf_live_files.end()) || (it->second.count(level) == 0) ||
(it->second[level].count(num) == 0)) {
return IOStatus::Corruption(
"Corrupted Manifest file with unrecognized deleted file: " +
Expand Down Expand Up @@ -158,8 +179,8 @@ IOStatus ManifestReader::GetLiveFiles(const std::string& bucket_path,
}
std::unique_ptr<SequentialFileReader> file_reader;
{
auto manifestFile = ManifestFileWithEpoch(
bucket_path, cloud_manifest->GetCurrentEpoch());
auto manifestFile =
ManifestFileWithEpoch(bucket_path, cloud_manifest->GetCurrentEpoch());
std::unique_ptr<FSSequentialFile> file;
s = cfs_->NewSequentialFileCloud(bucket_prefix_, manifestFile, file_opts,
&file, dbg);
Expand Down
14 changes: 12 additions & 2 deletions cloud/manifest_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ class LocalManifestReader {
IOStatus GetLiveFilesLocally(const std::string& local_dbname,
std::set<uint64_t>* list) const;

// Read given local manifest file and return all live files that it
// references. This doesn't rely on CLOUDMANIFEST and just accepts (any valid)
// manifest file.
//
// Provided manifest file is not updated or pulled from cloud when calling the
// function.
IOStatus GetManifestLiveFiles(const std::string& manifest_file,
std::set<uint64_t>* list) const;

protected:
// Get all the live sst file number by reading version_edit records from file_reader
// Get all the live SST file numbers by reading version_edit records from
// file_reader
IOStatus GetLiveFilesFromFileReader(
std::unique_ptr<SequentialFileReader> file_reader,
std::set<uint64_t>* list) const;
Expand All @@ -42,7 +52,7 @@ class LocalManifestReader {
//
// Operates on MANIFEST files stored in the cloud bucket directly
//
class ManifestReader: public LocalManifestReader {
class ManifestReader : public LocalManifestReader {
public:
ManifestReader(std::shared_ptr<Logger> info_log, CloudFileSystem* cfs,
const std::string& bucket_prefix);
Expand Down
49 changes: 30 additions & 19 deletions include/rocksdb/cloud/cloud_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,12 @@ class CloudFileSystemOptions {
bool resync_on_open;

// Experimental option!
// This option only affects how resync_on_open works. If resync_on_open is true,
// and resync_manifest_on_open is true, besides fetching CLOUDMANFIEST from s3,
// we will fetch latest MANIFEST file as well.
//
// This is a temporary option to help quickly rollback the change if something unexpected is wrong.
// This option only affects how resync_on_open works. If resync_on_open is
// true, and resync_manifest_on_open is true, besides fetching CLOUDMANFIEST
// from s3, we will fetch latest MANIFEST file as well.
//
// This is a temporary option to help quickly rollback the change if something
// unexpected is wrong.
// TODO(wei): remove this option once we are confident about the change.
// Default: true
bool resync_manifest_on_open;
Expand Down Expand Up @@ -389,9 +390,9 @@ class CloudFileSystemOptions {
std::string new_cookie_on_open;

// Experimental option!
// - If true, both cloud and local invisible files (i.e, CLOUDMANIFEST, MANIFEST
// and SST files which don't belong to current epoch) will be deleted when db
// is opened.
// - If true, both cloud and local invisible files (i.e, CLOUDMANIFEST,
// MANIFEST and SST files which don't belong to current epoch) will be deleted
// when db is opened.
// - Otherwise, only local invisible files will be deleted
//
// Default: true
Expand Down Expand Up @@ -457,7 +458,7 @@ class CloudFileSystemOptions {
delete_cloud_invisible_files_on_open(
_delete_cloud_invisible_files_on_open),
cloud_file_deletion_delay(_cloud_file_deletion_delay) {
(void) _cloud_type;
(void)_cloud_type;
}

// print out all options to the log
Expand All @@ -471,8 +472,10 @@ class CloudFileSystemOptions {
const std::string& object_path,
const std::string& region = "");

Status Configure(const ConfigOptions& config_options, const std::string& opts_str);
Status Serialize(const ConfigOptions& config_options, std::string* result) const;
Status Configure(const ConfigOptions& config_options,
const std::string& opts_str);
Status Serialize(const ConfigOptions& config_options,
std::string* result) const;

// Is the sst file cache configured?
bool hasSstFileCache() const {
Expand All @@ -494,8 +497,8 @@ typedef std::map<std::string, std::string> DbidList;
// files with file number >= file_num are only visible in the new
// MANIFEST-epoch file
struct CloudManifestDelta {
uint64_t file_num; // next file number for new epoch
std::string epoch; // epoch for the new manifest file
uint64_t file_num; // next file number for new epoch
std::string epoch; // epoch for the new manifest file
};

//
Expand Down Expand Up @@ -561,14 +564,14 @@ class CloudFileSystem : public FileSystem {
const std::string& dbid) = 0;

Logger* GetLogger() const { return info_log_.get(); }
const std::shared_ptr<CloudStorageProvider>& GetStorageProvider() const {
const std::shared_ptr<CloudStorageProvider>& GetStorageProvider() const {
return cloud_fs_options.storage_provider;
}

const std::shared_ptr<CloudLogController>& GetLogController() const {
return cloud_fs_options.cloud_log_controller;
}

// The SrcBucketName identifies the cloud storage bucket and
// GetSrcObjectPath specifies the path inside that bucket
// where data files reside. The specified bucket is used in
Expand Down Expand Up @@ -628,6 +631,16 @@ class CloudFileSystem : public FileSystem {
std::vector<std::string>* live_sst_files,
std::string* manifest_file) = 0;

// Read the given manifest file (either the one used in the database or a
// copy) and populate live_sst_files with the list of all SST files that it
//
// references. Unlike FindAllLiveFiles, this method doesn't resolve the
// manifest name from the cloud manifest and also doesn't pull the manifest
// (i.e., it needs to be present locally).
virtual IOStatus ReadManifestLiveFiles(
const std::string& manifest_file,
std::vector<std::string>* live_sst_files) = 0;

// Apply cloud manifest delta to in-memory cloud manifest. Does not change the
// on-disk state.
//
Expand Down Expand Up @@ -681,12 +694,10 @@ class CloudFileSystem : public FileSystem {
const std::shared_ptr<Logger>& logger,
CloudFileSystem** cfs);


// Creates a new Env that delegates all thread/time related
// calls to env, and all file operations to fs
static std::unique_ptr<Env> NewCompositeEnv(
Env* env,
const std::shared_ptr<FileSystem>& fs);
Env* env, const std::shared_ptr<FileSystem>& fs);
};

} // namespace ROCKSDB_NAMESPACE

0 comments on commit 504e55d

Please sign in to comment.