From d628fad2035ee188603f00199e4e6b5d088e05f7 Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Mon, 13 Apr 2020 09:56:54 -0700 Subject: [PATCH 01/11] better progress report --- src/core/storage/fileio/s3_api.cpp | 3 +++ src/core/storage/fileio/s3_filesys.cpp | 24 +++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/core/storage/fileio/s3_api.cpp b/src/core/storage/fileio/s3_api.cpp index eed24ffa98..fc495e4d2e 100644 --- a/src/core/storage/fileio/s3_api.cpp +++ b/src/core/storage/fileio/s3_api.cpp @@ -540,6 +540,9 @@ list_objects_response list_objects_impl(const s3url& parsed_url, << "list_objects_impl failed:" << ret.error << std::endl; } else { // continue retry + logprogress_stream << "list_objects retry on" + << parsed_url.string_from_s3url(false) + << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(backoff)); backoff *= 2; } diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp index 570988fe54..41d972769c 100644 --- a/src/core/storage/fileio/s3_filesys.cpp +++ b/src/core/storage/fileio/s3_filesys.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -91,7 +92,12 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { std::stringstream ss; // range is includsive and zero based ss << "bytes=" << curr_bytes_ << '-' << curr_bytes_ + nwant - 1; - logstream(LOG_DEBUG) << "GetObject.Range: " << ss.str() << std::endl; + + std::string url_string = url_.string_from_s3url(false); + logprogress_stream << "start downloading " << url_string << ", " << ss.str() + << std::endl; + + auto start = std::chrono::steady_clock::now(); Aws::S3::Model::GetObjectRequest object_request; object_request.SetRange(ss.str().c_str()); @@ -114,6 +120,13 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { } // std::istreambuf_iterator retrieved_file.read(input_ptr, nwant); + + auto end = std::chrono::steady_clock::now(); + logprogress_stream << "finish downloading" << url_string << ". duration: " + << std::chrono::duration_cast( + end - start) + .count() + << " ms" << std::endl; } else { auto error = get_object_outcome.GetError(); ss.str(""); @@ -169,6 +182,12 @@ void WriteStream::Upload(bool force_upload) { // store the future into completed parts completed_parts_.push_back(s3_client_.UploadPartCallable(my_request)); + + logprogress_stream << "uploading part " << my_request.GetPartNumber() + << " of " << url_.string_from_s3url(false) + << ", with size " + << (double)(buffer_.size()) / (1024 * 1024) << " MB" + << std::endl; } void WriteStream::Finish() { @@ -203,6 +222,9 @@ void WriteStream::Finish() { logstream(LOG_ERROR) << ss.str() << std::endl; log_and_throw_io_failure(ss.str()); } + + logprogress_stream << "finished uploading all parts of " + << " of " << url_.string_from_s3url(false) << std::endl; } /*! From 728d497c29c29b52c4206f3ec8c466db804620b4 Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Mon, 13 Apr 2020 10:00:09 -0700 Subject: [PATCH 02/11] minor change --- src/core/storage/fileio/s3_filesys.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp index 41d972769c..93fb44b444 100644 --- a/src/core/storage/fileio/s3_filesys.cpp +++ b/src/core/storage/fileio/s3_filesys.cpp @@ -122,7 +122,7 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { retrieved_file.read(input_ptr, nwant); auto end = std::chrono::steady_clock::now(); - logprogress_stream << "finish downloading" << url_string << ". duration: " + logprogress_stream << "finished downloading" << url_string << ". duration: " << std::chrono::duration_cast( end - start) .count() @@ -224,7 +224,7 @@ void WriteStream::Finish() { } logprogress_stream << "finished uploading all parts of " - << " of " << url_.string_from_s3url(false) << std::endl; + << url_.string_from_s3url(false) << std::endl; } /*! From 567523add0989ab5156061a94f0280a1e640bc8a Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Mon, 13 Apr 2020 10:33:48 -0700 Subject: [PATCH 03/11] second is better --- src/core/storage/fileio/s3_filesys.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp index 93fb44b444..89a648fd44 100644 --- a/src/core/storage/fileio/s3_filesys.cpp +++ b/src/core/storage/fileio/s3_filesys.cpp @@ -122,11 +122,10 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { retrieved_file.read(input_ptr, nwant); auto end = std::chrono::steady_clock::now(); - logprogress_stream << "finished downloading" << url_string << ". duration: " - << std::chrono::duration_cast( - end - start) - .count() - << " ms" << std::endl; + logprogress_stream + << "finished downloading" << url_string << ". duration: " + << std::chrono::duration_cast(end - start).count() + << " seconds" << std::endl; } else { auto error = get_object_outcome.GetError(); ss.str(""); From e7024717c69151b6b7916a58629612d54687d820 Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Mon, 13 Apr 2020 10:38:48 -0700 Subject: [PATCH 04/11] address pr concern --- src/core/storage/fileio/s3_api.cpp | 2 +- src/core/storage/fileio/s3_filesys.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/core/storage/fileio/s3_api.cpp b/src/core/storage/fileio/s3_api.cpp index fc495e4d2e..12eb255a26 100644 --- a/src/core/storage/fileio/s3_api.cpp +++ b/src/core/storage/fileio/s3_api.cpp @@ -540,7 +540,7 @@ list_objects_response list_objects_impl(const s3url& parsed_url, << "list_objects_impl failed:" << ret.error << std::endl; } else { // continue retry - logprogress_stream << "list_objects retry on" + logprogress_stream << "list_objects retry on " << parsed_url.string_from_s3url(false) << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(backoff)); diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp index 89a648fd44..96a2d4e253 100644 --- a/src/core/storage/fileio/s3_filesys.cpp +++ b/src/core/storage/fileio/s3_filesys.cpp @@ -94,7 +94,7 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { ss << "bytes=" << curr_bytes_ << '-' << curr_bytes_ + nwant - 1; std::string url_string = url_.string_from_s3url(false); - logprogress_stream << "start downloading " << url_string << ", " << ss.str() + logprogress_stream << "Start downloading " << url_string << ", " << ss.str() << std::endl; auto start = std::chrono::steady_clock::now(); @@ -123,7 +123,7 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { auto end = std::chrono::steady_clock::now(); logprogress_stream - << "finished downloading" << url_string << ". duration: " + << "Finished downloading" << url_string << ". Duration: " << std::chrono::duration_cast(end - start).count() << " seconds" << std::endl; } else { @@ -182,7 +182,7 @@ void WriteStream::Upload(bool force_upload) { // store the future into completed parts completed_parts_.push_back(s3_client_.UploadPartCallable(my_request)); - logprogress_stream << "uploading part " << my_request.GetPartNumber() + logprogress_stream << "Uploading part " << my_request.GetPartNumber() << " of " << url_.string_from_s3url(false) << ", with size " << (double)(buffer_.size()) / (1024 * 1024) << " MB" @@ -222,7 +222,7 @@ void WriteStream::Finish() { log_and_throw_io_failure(ss.str()); } - logprogress_stream << "finished uploading all parts of " + logprogress_stream << "Finished uploading all parts of " << url_.string_from_s3url(false) << std::endl; } From bc50c6cf8e82d0db27a0403da260428c33f6330c Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Tue, 14 Apr 2020 09:47:25 -0700 Subject: [PATCH 05/11] add stop watch --- .../storage/fileio/read_caching_device.hpp | 240 ++++++++++++++---- src/core/storage/fileio/s3_filesys.cpp | 20 -- src/core/storage/fileio/s3_filesys.hpp | 8 +- 3 files changed, 200 insertions(+), 68 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index fe5197813c..6908430313 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -1,23 +1,127 @@ /* Copyright © 2017 Apple Inc. All rights reserved. * * Use of this source code is governed by a BSD-3-clause license that can - * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause + * be found in the LICENSE.txt file or at + * https://opensource.org/licenses/BSD-3-Clause */ #ifndef TURI_FILEIO_CACHING_DEVICE_HPP #define TURI_FILEIO_CACHING_DEVICE_HPP #include +#include #include #include -#include #include -#include #include +#include +#include namespace turi { // private namespace namespace { -const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB -} // end private namespace +const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB + +using namespace std::chrono; + +/* + * For each parallel (streamlined) network IO, we can view them as intervals + * in parallel universe. + * + * What we do is to merge all universe into one: the very start and the + * very end of the entire IO, which is the elapsed time, using the same time + * axis. + * t1 |-----| + * t2 |----| + * t3 |-----| + * s ------------ e + * + * Since SFrame can be streamed, meaning it does not download all of the files + * at once. It may have multiple times of fetching data. So it consists many + * small parts of IO activities. + * + * |----|<- less than 100 ms -> |----| + * s ------------------------------- e + * + * if 2 IO acticity are only separated by less than 50 ms time intervals, + * they should be considered as the same part of an IO process, instead of 2. + * + * |-----| <- 10 mins -> |-------| + * s --- e s ----- e + * + * Since 2 adjacent IO activities are too distant, we should view it as 2 + * separate activities. For example, user may play with first 1000 rows, and + * later the user jumps to the tail of the frame to mess around with the data. + * + */ +template +class StopWatch { + public: + using my_time_t = decltype(steady_clock::now()); + StopWatch(size_t interval) + : interval_(interval), beg_(steady_clock::now()), end_(beg_) {} + + StopWatch(const StopWatch&) = delete; + StopWatch(StopWatch&& rhs) = delete; + + void start() { + std::lock_guard lk(mx_); + if (thread_num_ == 0) { + if (duration_cast(steady_clock::now() - beg_) > + milliseconds{150}) { + beg_ = steady_clock::now(); + mile_stone_ = beg_; + } + } + ++thread_num_; + } + + bool is_time_to_record() { + std::lock_guard lock(mx_); + + if (thread_num_ == 0) return false; + + // reach to timepoint, log it. + auto cur = steady_clock::now(); + if (cur > mile_stone_) { + mile_stone_ = cur + T(interval_); + return true; + } + + // no need to record + return false; + } + + void end() { + std::lock_guard lk(mx_); + if (thread_num_ > 0 && --thread_num_ == 0) { + end_ = steady_clock::now(); + } + } + + template + Output duration() const { + std::lock_guard lk(mx_); + // the clock is still on + if (thread_num_ > 0) { + return duration_cast(steady_clock::now() - beg_); + } + + return duration_cast(end_ - beg_); + } + + ~StopWatch() { end(); } + + private: + uint64_t interval_; + my_time_t beg_; + my_time_t end_; + my_time_t mile_stone_; + mutable std::mutex mx_; + size_t thread_num_; +}; + +using StopWatchSec_t = StopWatch; + +} // namespace /** * \ingroup fileio @@ -33,18 +137,18 @@ const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB * * After: * \code - * typedef boost::iostreams::stream > s3_fstream; - * \endcode + * typedef boost::iostreams::stream > + * s3_fstream; \endcode * * It uses the \ref block_cache to pro */ template class read_caching_device { - public: // boost iostream concepts + public: // boost iostream concepts typedef typename T::char_type char_type; typedef typename T::category category; - read_caching_device() { } + read_caching_device() {} read_caching_device(const std::string& filename, const bool write = false) { logstream(LOG_DEBUG) << "read_cachine_device: " << filename << std::endl; @@ -60,6 +164,9 @@ class read_caching_device { m_contents = std::make_shared(filename, write); m_file_size = m_contents->file_size(); m_filename_to_filesize_map[filename] = m_file_size; + // report downloading every 30s + m_filename_to_stop_watch[filename] = + std::unique_ptr(new StopWatchSec_t(30)); } } else { m_contents = std::make_shared(filename, write); @@ -78,15 +185,16 @@ class read_caching_device { // evict all blocks for this key auto& bc = block_cache::get_instance(); size_t block_number = 0; - while(1) { + while (1) { std::string key = get_key_name(block_number); if (bc.evict_key(key) == false) break; - ++ block_number; + ++block_number; } // evict the file size cache { std::lock_guard file_size_guard(m_filesize_cache_mutex); m_filename_to_filesize_map.erase(m_filename); + m_filename_to_stop_watch.erase(m_filename); } } else if (mode == std::ios_base::in && !m_writing) { @@ -111,18 +219,18 @@ class read_caching_device { std::streamsize ret = 0; - while(n > 0) { + while (n > 0) { // the block number containing the offset. auto block_number = m_file_pos / READ_CACHING_BLOCK_SIZE; // the offset inside the block auto block_offset = m_file_pos % READ_CACHING_BLOCK_SIZE; - // number of bytes I can read inside this block before I hit the next block - size_t n_bytes = (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos; + // number of bytes I can read inside this block before I hit the next + // block + size_t n_bytes = + (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos; n_bytes = std::min(n_bytes, n); - bool success = fetch_block(strm_ptr + ret, - block_number, - block_offset, - n_bytes); + bool success = + fetch_block(strm_ptr + ret, block_number, block_offset, n_bytes); if (success == false) { log_and_throw(std::string("Unable to read ") + m_filename); } @@ -135,18 +243,33 @@ class read_caching_device { } std::streamsize write(const char* strm_ptr, std::streamsize n) { - return get_contents()->write(strm_ptr, n); - } + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + m_filename_to_stop_watch[m_filename]->start(); + } + + auto bytes_written = get_contents()->write(strm_ptr, n); + + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + auto& stop_watch = m_filename_to_stop_watch[m_filename]; + if (stop_watch->is_time_to_record()) { + logprogress_stream << "Finish uploading " << sanitize_url(m_filename) + << ". Elapsed time " + << stop_watch->duration().count() + << " seconds" << std::endl; + } + } - bool good() const { - return get_contents()->good(); + return bytes_written; } + bool good() const { return get_contents()->good(); } + /** * Seeks to a different location. */ - std::streampos seek(std::streamoff off, - std::ios_base::seekdir way, + std::streampos seek(std::streamoff off, std::ios_base::seekdir way, std::ios_base::openmode openmode) { if (openmode == std::ios_base::in) { if (way == std::ios_base::beg) { @@ -155,7 +278,8 @@ class read_caching_device { m_file_pos = std::min(m_file_pos + off, m_file_size); m_file_pos = std::max(m_file_pos, 0); } else if (way == std::ios_base::end) { - m_file_pos = std::min(m_file_size + off - 1, m_file_size); + m_file_pos = + std::min(m_file_size + off - 1, m_file_size); m_file_pos = std::max(m_file_pos, 0); } return m_file_pos; @@ -169,14 +293,10 @@ class read_caching_device { * Returns (size_t)(-1) if there is no file opened, or if there is an * error obtaining the file size. */ - size_t file_size() const { - return m_file_size; - } + size_t file_size() const { return m_file_size; } /// Not supported - std::shared_ptr get_underlying_stream() { - return nullptr; - } + std::shared_ptr get_underlying_stream() { return nullptr; } private: std::string m_filename; @@ -187,6 +307,8 @@ class read_caching_device { static mutex m_filesize_cache_mutex; static std::map m_filename_to_filesize_map; + static std::map> + m_filename_to_stop_watch; std::shared_ptr& get_contents() { if (!m_contents) { @@ -196,35 +318,56 @@ class read_caching_device { } std::string get_key_name(size_t block_number) { // we generate a key name that will never appear in any filename - return m_filename + "////:" + std::to_string(block_number); + return m_filename + "////:" + std::to_string(block_number); } /** * Fetches the contents of a block. * Returns true on success and false on failure. */ - bool fetch_block(char* output, - size_t block_number, - size_t startpos, + bool fetch_block(char* output, size_t block_number, size_t startpos, size_t length) { auto& bc = block_cache::get_instance(); std::string key = get_key_name(block_number); int64_t ret = bc.read(key, output, startpos, startpos + length); if (static_cast(ret) == length) return true; - logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " << block_number << std::endl; + logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " + << block_number << std::endl; // ok. failure... no such block or block is bad. We read it ourselves. // read the whole block auto block_start = block_number * READ_CACHING_BLOCK_SIZE; - auto block_end = std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size); + auto block_end = + std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size); // seek to the block and read the whole block at once auto& contents = get_contents(); contents->seek(block_start, std::ios_base::beg, std::ios_base::in); std::string block_contents(block_end - block_start, 0); - auto bytes_read = contents->read(&(block_contents[0]), - block_end - block_start); + + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + m_filename_to_stop_watch[m_filename]->start(); + } + + auto bytes_read = + contents->read(&(block_contents[0]), block_end - block_start); + + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + auto& stop_watch = m_filename_to_stop_watch[m_filename]; + if (stop_watch->is_time_to_record()) { + logprogress_stream << "Finished fetching block " << block_number + << ". Elapsed " + << stop_watch->duration().count() + << "s for downloading " << sanitize_url(m_filename) + << std::endl; + } + } + // read failed. - static_assert(std::is_signed::value, "decltype(bytes_read) signed"); - static_assert(std::is_integral::value, "decltype(bytes_read) integral"); + static_assert(std::is_signed::value, + "decltype(bytes_read) signed"); + static_assert(std::is_integral::value, + "decltype(bytes_read) integral"); if (bytes_read < truncate_check(block_end - block_start)) { return false; } @@ -241,12 +384,17 @@ class read_caching_device { return true; } -}; // end of read_caching_device +}; // end of read_caching_device -template +template mutex read_caching_device::m_filesize_cache_mutex; -template -std::map read_caching_device::m_filename_to_filesize_map; -} // namespace turi +template +std::map + read_caching_device::m_filename_to_filesize_map; + +template +std::map> + read_caching_device::m_filename_to_stop_watch; +} // namespace turi #endif diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp index 96a2d4e253..ff5b93dde0 100644 --- a/src/core/storage/fileio/s3_filesys.cpp +++ b/src/core/storage/fileio/s3_filesys.cpp @@ -93,12 +93,6 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { // range is includsive and zero based ss << "bytes=" << curr_bytes_ << '-' << curr_bytes_ + nwant - 1; - std::string url_string = url_.string_from_s3url(false); - logprogress_stream << "Start downloading " << url_string << ", " << ss.str() - << std::endl; - - auto start = std::chrono::steady_clock::now(); - Aws::S3::Model::GetObjectRequest object_request; object_request.SetRange(ss.str().c_str()); object_request.SetBucket(url_.bucket.c_str()); @@ -121,11 +115,6 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { // std::istreambuf_iterator retrieved_file.read(input_ptr, nwant); - auto end = std::chrono::steady_clock::now(); - logprogress_stream - << "Finished downloading" << url_string << ". Duration: " - << std::chrono::duration_cast(end - start).count() - << " seconds" << std::endl; } else { auto error = get_object_outcome.GetError(); ss.str(""); @@ -181,12 +170,6 @@ void WriteStream::Upload(bool force_upload) { // store the future into completed parts completed_parts_.push_back(s3_client_.UploadPartCallable(my_request)); - - logprogress_stream << "Uploading part " << my_request.GetPartNumber() - << " of " << url_.string_from_s3url(false) - << ", with size " - << (double)(buffer_.size()) / (1024 * 1024) << " MB" - << std::endl; } void WriteStream::Finish() { @@ -221,9 +204,6 @@ void WriteStream::Finish() { logstream(LOG_ERROR) << ss.str() << std::endl; log_and_throw_io_failure(ss.str()); } - - logprogress_stream << "Finished uploading all parts of " - << url_.string_from_s3url(false) << std::endl; } /*! diff --git a/src/core/storage/fileio/s3_filesys.hpp b/src/core/storage/fileio/s3_filesys.hpp index 5640900971..d5b2b36a7d 100644 --- a/src/core/storage/fileio/s3_filesys.hpp +++ b/src/core/storage/fileio/s3_filesys.hpp @@ -109,8 +109,11 @@ class AWSReadStreamBase : public SeekStream { } virtual void Close() { - logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl; - Reset(file_size_); + if (!closed_) { + closed_ = true; + logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl; + Reset(file_size_); + } } virtual size_t Tell(void) { return curr_bytes_; } @@ -143,6 +146,7 @@ class AWSReadStreamBase : public SeekStream { protected: // the total size of the file size_t file_size_ = 0; + bool closed_ = false; s3url url_; private: From 86d604a5516d5cad9b4fe77bcf28b190fcbb710c Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Tue, 14 Apr 2020 18:05:50 -0700 Subject: [PATCH 06/11] partial work --- .../storage/fileio/read_caching_device.hpp | 99 ++++++++++------ test/fileio/stop_watch_test.cxx | 106 ++++++++++++++++++ 2 files changed, 173 insertions(+), 32 deletions(-) create mode 100644 test/fileio/stop_watch_test.cxx diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 6908430313..2beec2950a 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -19,8 +19,7 @@ namespace turi { // private namespace namespace { const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB - -using namespace std::chrono; +} // namespace /* * For each parallel (streamlined) network IO, we can view them as intervals @@ -38,10 +37,10 @@ using namespace std::chrono; * at once. It may have multiple times of fetching data. So it consists many * small parts of IO activities. * - * |----|<- less than 100 ms -> |----| + * |----|<- less than 150 ms -> |----| * s ------------------------------- e * - * if 2 IO acticity are only separated by less than 50 ms time intervals, + * if 2 IO acticity are only separated by less than 150 ms time intervals, * they should be considered as the same part of an IO process, instead of 2. * * |-----| <- 10 mins -> |-------| @@ -55,6 +54,7 @@ using namespace std::chrono; template class StopWatch { public: + using steady_clock = std::chrono::steady_clock; using my_time_t = decltype(steady_clock::now()); StopWatch(size_t interval) : interval_(interval), beg_(steady_clock::now()), end_(beg_) {} @@ -64,20 +64,27 @@ class StopWatch { void start() { std::lock_guard lk(mx_); - if (thread_num_ == 0) { - if (duration_cast(steady_clock::now() - beg_) > - milliseconds{150}) { + + if (thread_set_.size() == 0) { + using milliseconds = std::chrono::milliseconds; + auto lag = std::chrono::duration_cast( + std::chrono::steady_clock::now() - beg_); + if (lag > milliseconds{150}) { beg_ = steady_clock::now(); mile_stone_ = beg_; } } - ++thread_num_; + + if (!thread_set_.insert(std::this_thread::get_id()).second) { + logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id() + << "already starts the clock" << std::endl; + } } bool is_time_to_record() { std::lock_guard lock(mx_); - if (thread_num_ == 0) return false; + if (thread_set_.size() == 0) return false; // reach to timepoint, log it. auto cur = steady_clock::now(); @@ -90,25 +97,39 @@ class StopWatch { return false; } - void end() { + /* + * @return: int. number of threads still holding + */ + int stop() { std::lock_guard lk(mx_); - if (thread_num_ > 0 && --thread_num_ == 0) { - end_ = steady_clock::now(); + if (thread_set_.count(std::this_thread::get_id())) { + thread_set_.erase(std::this_thread::get_id()); + // last man standing + if (thread_set_.size() == 0) { + end_ = steady_clock::now(); + return 0; + } + // still running + return thread_set_.size(); } + + return thread_set_.size(); } - template + template Output duration() const { std::lock_guard lk(mx_); // the clock is still on - if (thread_num_ > 0) { - return duration_cast(steady_clock::now() - beg_); + if (thread_set_.count(std::this_thread::get_id())) { + return std::chrono::duration_cast(steady_clock::now() - beg_); } - return duration_cast(end_ - beg_); + return std::chrono::duration_cast(end_ - beg_); } - ~StopWatch() { end(); } + // singleton destroyed since it's non-copy and movable, + // any dangling pointer or reference to this stop_watah is illegal + ~StopWatch() { stop(); } private: uint64_t interval_; @@ -116,13 +137,12 @@ class StopWatch { my_time_t end_; my_time_t mile_stone_; mutable std::mutex mx_; - size_t thread_num_; + + std::set threads_; }; using StopWatchSec_t = StopWatch; -} // namespace - /** * \ingroup fileio * Can be wrapped around any device implement to provide read caching. This @@ -165,8 +185,11 @@ class read_caching_device { m_file_size = m_contents->file_size(); m_filename_to_filesize_map[filename] = m_file_size; // report downloading every 30s - m_filename_to_stop_watch[filename] = - std::unique_ptr(new StopWatchSec_t(30)); + if (m_filename_to_filesize_map.find(filename) != + m_filename_to_filesize_map.end()) { + m_filename_to_stop_watch[filename] = + std::unique_ptr(new StopWatchSec_t(30)); + } } } else { m_contents = std::make_shared(filename, write); @@ -194,12 +217,24 @@ class read_caching_device { { std::lock_guard file_size_guard(m_filesize_cache_mutex); m_filename_to_filesize_map.erase(m_filename); - m_filename_to_stop_watch.erase(m_filename); + + auto iter = m_filename_to_stop_watch.find(m_filename); + if (iter != m_filename_to_stop_watch.end()) { + if (!iter->stop()) { + m_filename_to_stop_watch.erase(iter); + } + } } } else if (mode == std::ios_base::in && !m_writing) { if (m_contents) m_contents->close(mode); m_contents.reset(); + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_stop_watch.find(m_filename) != + m_filename_to_stop_watch.end()) + m_filename_to_stop_watch[m_filename]->stop(); + } } } @@ -254,10 +289,11 @@ class read_caching_device { std::lock_guard file_size_guard(m_filesize_cache_mutex); auto& stop_watch = m_filename_to_stop_watch[m_filename]; if (stop_watch->is_time_to_record()) { - logprogress_stream << "Finish uploading " << sanitize_url(m_filename) - << ". Elapsed time " - << stop_watch->duration().count() - << " seconds" << std::endl; + logprogress_stream + << "Finish uploading " << sanitize_url(m_filename) + << ". Elapsed time " + << stop_watch->template duration().count() + << " seconds" << std::endl; } } @@ -355,11 +391,10 @@ class read_caching_device { std::lock_guard file_size_guard(m_filesize_cache_mutex); auto& stop_watch = m_filename_to_stop_watch[m_filename]; if (stop_watch->is_time_to_record()) { - logprogress_stream << "Finished fetching block " << block_number - << ". Elapsed " - << stop_watch->duration().count() - << "s for downloading " << sanitize_url(m_filename) - << std::endl; + logprogress_stream + << "Finished fetching block " << block_number << ". Elapsed " + << stop_watch->template duration().count() + << "s for downloading " << sanitize_url(m_filename) << std::endl; } } diff --git a/test/fileio/stop_watch_test.cxx b/test/fileio/stop_watch_test.cxx new file mode 100644 index 0000000000..723d7f1692 --- /dev/null +++ b/test/fileio/stop_watch_test.cxx @@ -0,0 +1,106 @@ +#define BOOST_TEST_MODULE +#include +#include +#include +#include + +using namespace turi; +using namespace std::chrono; +struct stop_watch_test { + using my_watch_t = StopWatch; + + public: + void test_end_without_start(void) { + my_watch_t watch = my_watch_t(1); + watch.end(); + TS_ASSERT_EQUALS(watch.templace duration(), milliseconds{0}); + TS_ASSERT_THROWS_NOTHING(watch.end()); + } + + void test_double_start(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + TS_ASSERT_THROWS_ANYTHING(watch.start()); + } + + void test_single_thread(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + std::this_thread::sleep_for(milliseconds(2)); + TS_ASSERT(watch.templace duration() >= milliseconds(2)); + watch.end(); + TS_ASSERT(watch.templace duration() >= milliseconds(2)); + } + + void test_multi_thread_1(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { + watch.start(); + watch.end(); + }); + + std::thread t2([&watch]() { + std::this_thread::sleep_for(milliseconds(1)); + watch.start(); + std::this_thread::sleep_for(milliseconds(1)); + watch.end(); + }); + std::this_thread::sleep_for(milliseconds(10)); + t1.join(); + t2.join(); + auto end = std::chrono::steady_clock::now(); + watch.end(); + + TS_ASSERT(watch.templace duration() >= + std::chrono::duration_cast(end - start)); + } + + void test_multi_thread_2(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { + watch.start(); + watch.end(); + }); + + std::thread t2([&watch]() { + std::this_thread::sleep_for(milliseconds(1)); + watch.start(); + std::this_thread::sleep_for(milliseconds(15)); + watch.end(); + }); + t1.join(); + std::this_thread::sleep_for(milliseconds(1)); + auto end = std::chrono::steady_clock::now(); + watch.end(); + TS_ASSERT(watch.templace duration() >= + std::chrono::duration_cast(end - start)); + TS_ASSERT(watch.templace duration() <= milliseconds > (15)); + t2.join(); + + TS_ASSERT(watch.templace duration() >= milliseconds > (15)); + watch.end(); + TS_ASSERT(watch.templace duration() >= milliseconds > (15)); + } +}; + +BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test) +BOOST_AUTO_TEST_CASE(test_end_without_start) { + stop_watch_test::test_end_without_start(); +} +BOOST_AUTO_TEST_CASE(test_double_start) { + stop_watch_test::test_double_start(); +} +BOOST_AUTO_TEST_CASE(test_single_thread) { + stop_watch_test::test_single_thread(); +} +BOOST_AUTO_TEST_CASE(test_multi_thread_1) { + stop_watch_test::test_multi_thread_1(); +} +BOOST_AUTO_TEST_CASE(test_multi_thread_2) { + stop_watch_test::test_multi_thread_2(); +} +BOOST_AUTO_TEST_SUITE_END() From 14b65346123bdd8f21d74a294e76048376baf4a1 Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Fri, 8 May 2020 11:57:26 -0700 Subject: [PATCH 07/11] the last dance: implemented thread-safe stop watch --- .../storage/fileio/read_caching_device.hpp | 61 +++++++---- src/core/storage/fileio/s3_api.cpp | 3 - src/core/storage/fileio/s3_filesys.cpp | 3 +- test/fileio/CMakeLists.txt | 1 + test/fileio/stop_watch_test.cxx | 102 ++++++++++++------ 5 files changed, 114 insertions(+), 56 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 2beec2950a..7f41c9e4b2 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -62,10 +62,17 @@ class StopWatch { StopWatch(const StopWatch&) = delete; StopWatch(StopWatch&& rhs) = delete; + /* + * start will register the thread id so that client can use if (stop() == 0) + * to decide whether stop watch should be reallocated. + * + * ideally, use start once and close it once with stop. + * This impl allows double start. + * */ void start() { std::lock_guard lk(mx_); - if (thread_set_.size() == 0) { + if (threads_.size() == 0) { using milliseconds = std::chrono::milliseconds; auto lag = std::chrono::duration_cast( std::chrono::steady_clock::now() - beg_); @@ -75,8 +82,8 @@ class StopWatch { } } - if (!thread_set_.insert(std::this_thread::get_id()).second) { - logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id() + if (!threads_.insert(std::this_thread::get_id()).second) { + logstream(LOG_ERROR) << "this thread " << std::this_thread::get_id() << "already starts the clock" << std::endl; } } @@ -84,7 +91,7 @@ class StopWatch { bool is_time_to_record() { std::lock_guard lock(mx_); - if (thread_set_.size() == 0) return false; + if (threads_.size() == 0) return false; // reach to timepoint, log it. auto cur = steady_clock::now(); @@ -100,27 +107,31 @@ class StopWatch { /* * @return: int. number of threads still holding */ - int stop() { + int stop(bool no_throw = false) { std::lock_guard lk(mx_); - if (thread_set_.count(std::this_thread::get_id())) { - thread_set_.erase(std::this_thread::get_id()); + + if (threads_.count(std::this_thread::get_id())) { + threads_.erase(std::this_thread::get_id()); // last man standing - if (thread_set_.size() == 0) { + if (threads_.size() == 0) { end_ = steady_clock::now(); return 0; } // still running - return thread_set_.size(); + return threads_.size(); } - return thread_set_.size(); + // no-op + if (no_throw) return threads_.size(); + + std_log_and_throw(std::logic_error, "you don't own this stop watch"); } template Output duration() const { std::lock_guard lk(mx_); // the clock is still on - if (thread_set_.count(std::this_thread::get_id())) { + if (!threads_.empty()) { return std::chrono::duration_cast(steady_clock::now() - beg_); } @@ -129,7 +140,7 @@ class StopWatch { // singleton destroyed since it's non-copy and movable, // any dangling pointer or reference to this stop_watah is illegal - ~StopWatch() { stop(); } + ~StopWatch() { stop(true); } private: uint64_t interval_; @@ -180,16 +191,19 @@ class read_caching_device { auto iter = m_filename_to_filesize_map.find(filename); if (iter != m_filename_to_filesize_map.end()) { m_file_size = iter->second; + m_filename_to_stop_watch[filename]->start(); } else { m_contents = std::make_shared(filename, write); m_file_size = m_contents->file_size(); m_filename_to_filesize_map[filename] = m_file_size; // report downloading every 30s - if (m_filename_to_filesize_map.find(filename) != - m_filename_to_filesize_map.end()) { + if (m_filename_to_stop_watch.find(filename) == + m_filename_to_stop_watch.end()) { m_filename_to_stop_watch[filename] = std::unique_ptr(new StopWatchSec_t(30)); } + // start the clock immediately to register the thread id into it + m_filename_to_stop_watch[filename]->start(); } } else { m_contents = std::make_shared(filename, write); @@ -220,7 +234,9 @@ class read_caching_device { auto iter = m_filename_to_stop_watch.find(m_filename); if (iter != m_filename_to_stop_watch.end()) { - if (!iter->stop()) { + // use no throw since this nanny thread may not start clock + if (iter->second->stop(/* no throw */ true) == 0) { + // nobody is holding m_filename_to_stop_watch.erase(iter); } } @@ -231,9 +247,14 @@ class read_caching_device { m_contents.reset(); { std::lock_guard file_size_guard(m_filesize_cache_mutex); - if (m_filename_to_stop_watch.find(m_filename) != - m_filename_to_stop_watch.end()) - m_filename_to_stop_watch[m_filename]->stop(); + m_filename_to_filesize_map.erase(m_filename); + auto iter = m_filename_to_stop_watch.find(m_filename); + if (iter != m_filename_to_stop_watch.end()) + // use no throw since this nanny thread may not start clock + if (iter->second->stop(/* no throw */ true) == 0) { + // nobody is holding + m_filename_to_stop_watch.erase(iter); + } } } } @@ -280,6 +301,8 @@ class read_caching_device { std::streamsize write(const char* strm_ptr, std::streamsize n) { { std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_filesize_map.count(m_filename) == 0) + log_and_throw("write through closed files handle"); m_filename_to_stop_watch[m_filename]->start(); } @@ -381,6 +404,8 @@ class read_caching_device { { std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_filesize_map.count(m_filename) == 0) + log_and_throw("read through closed files handle"); m_filename_to_stop_watch[m_filename]->start(); } diff --git a/src/core/storage/fileio/s3_api.cpp b/src/core/storage/fileio/s3_api.cpp index 12eb255a26..eed24ffa98 100644 --- a/src/core/storage/fileio/s3_api.cpp +++ b/src/core/storage/fileio/s3_api.cpp @@ -540,9 +540,6 @@ list_objects_response list_objects_impl(const s3url& parsed_url, << "list_objects_impl failed:" << ret.error << std::endl; } else { // continue retry - logprogress_stream << "list_objects retry on " - << parsed_url.string_from_s3url(false) - << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(backoff)); backoff *= 2; } diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp index ff5b93dde0..570988fe54 100644 --- a/src/core/storage/fileio/s3_filesys.cpp +++ b/src/core/storage/fileio/s3_filesys.cpp @@ -12,7 +12,6 @@ #include #include -#include #include #include #include @@ -92,6 +91,7 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { std::stringstream ss; // range is includsive and zero based ss << "bytes=" << curr_bytes_ << '-' << curr_bytes_ + nwant - 1; + logstream(LOG_DEBUG) << "GetObject.Range: " << ss.str() << std::endl; Aws::S3::Model::GetObjectRequest object_request; object_request.SetRange(ss.str().c_str()); @@ -114,7 +114,6 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) { } // std::istreambuf_iterator retrieved_file.read(input_ptr, nwant); - } else { auto error = get_object_outcome.GetError(); ss.str(""); diff --git a/test/fileio/CMakeLists.txt b/test/fileio/CMakeLists.txt index 519cd502dc..f0bc201868 100644 --- a/test/fileio/CMakeLists.txt +++ b/test/fileio/CMakeLists.txt @@ -12,6 +12,7 @@ make_boost_test(fixed_size_cache_manager_test.cxx REQUIRES unity_shared_for_testing) make_boost_test(general_fstream_test.cxx REQUIRES unity_shared_for_testing) make_boost_test(parse_hdfs_url_test.cxx REQUIRES unity_shared_for_testing) +make_boost_test(stop_watch_test.cxx REQUIRES unity_shared_for_testing) if(${TC_BUILD_REMOTEFS}) make_executable(s3_filesys_test SOURCES s3_filesys_test.cpp REQUIRES diff --git a/test/fileio/stop_watch_test.cxx b/test/fileio/stop_watch_test.cxx index 723d7f1692..a6f51b773e 100644 --- a/test/fileio/stop_watch_test.cxx +++ b/test/fileio/stop_watch_test.cxx @@ -10,86 +10,116 @@ struct stop_watch_test { using my_watch_t = StopWatch; public: - void test_end_without_start(void) { - my_watch_t watch = my_watch_t(1); - watch.end(); - TS_ASSERT_EQUALS(watch.templace duration(), milliseconds{0}); - TS_ASSERT_THROWS_NOTHING(watch.end()); + void test_stop_without_start(void) { + my_watch_t watch(1); + TS_ASSERT_THROWS_ANYTHING(watch.stop()); } void test_double_start(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); - TS_ASSERT_THROWS_ANYTHING(watch.start()); + // no double start + TS_ASSERT_THROWS_NOTHING(watch.start()); } void test_single_thread(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); std::this_thread::sleep_for(milliseconds(2)); - TS_ASSERT(watch.templace duration() >= milliseconds(2)); - watch.end(); - TS_ASSERT(watch.templace duration() >= milliseconds(2)); + TS_ASSERT(watch.duration() >= milliseconds(2)); + watch.stop(); + TS_ASSERT(watch.duration() >= milliseconds(2)); } + // main thread's stop watch stops as the last void test_multi_thread_1(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); auto start = std::chrono::steady_clock::now(); std::thread t1([&watch]() { watch.start(); - watch.end(); + watch.stop(); }); std::thread t2([&watch]() { std::this_thread::sleep_for(milliseconds(1)); watch.start(); std::this_thread::sleep_for(milliseconds(1)); - watch.end(); + watch.stop(); }); std::this_thread::sleep_for(milliseconds(10)); t1.join(); t2.join(); - auto end = std::chrono::steady_clock::now(); - watch.end(); + auto stop = std::chrono::steady_clock::now(); + watch.stop(); - TS_ASSERT(watch.templace duration() >= - std::chrono::duration_cast(end - start)); + TS_ASSERT(watch.duration() >= + std::chrono::duration_cast(stop - start)); } + // t2 stops as the last one void test_multi_thread_2(void) { - my_watch_t watch = my_watch_t(1); + my_watch_t watch(1); watch.start(); auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { watch.start(); - watch.end(); + TS_ASSERT(watch.stop() > 0); }); std::thread t2([&watch]() { std::this_thread::sleep_for(milliseconds(1)); watch.start(); - std::this_thread::sleep_for(milliseconds(15)); - watch.end(); + std::this_thread::sleep_for(milliseconds(25)); + // t2 is the last one to stop the watch + TS_ASSERT(watch.stop() == 0); }); + t1.join(); - std::this_thread::sleep_for(milliseconds(1)); - auto end = std::chrono::steady_clock::now(); - watch.end(); - TS_ASSERT(watch.templace duration() >= - std::chrono::duration_cast(end - start)); - TS_ASSERT(watch.templace duration() <= milliseconds > (15)); + std::this_thread::sleep_for(milliseconds(5)); + // main also stops + auto stop = std::chrono::steady_clock::now(); + TS_ASSERT(watch.stop() > 0); + + // the clock is still on becuase t2 is still running + TS_ASSERT(watch.duration() >= + std::chrono::duration_cast(stop - start)); + TS_ASSERT(watch.duration() >= milliseconds(5)); + t2.join(); + TS_ASSERT(watch.duration() >= milliseconds(25)); + } + + void test_stop_and_continue(void) { + my_watch_t watch(1); + watch.start(); + watch.stop(); + // interval is less than 150ms + std::this_thread::sleep_for(milliseconds(3)); + watch.start(); + watch.stop(); + std::this_thread::sleep_for(milliseconds(3)); + watch.start(); + watch.stop(); - TS_ASSERT(watch.templace duration() >= milliseconds > (15)); - watch.end(); - TS_ASSERT(watch.templace duration() >= milliseconds > (15)); + TS_ASSERT(watch.duration() >= milliseconds(6)); + } + + void test_time_to_record(void) { + my_watch_t watch(5); + watch.start(); + TS_ASSERT(watch.is_time_to_record() == true); + TS_ASSERT(watch.is_time_to_record() == false); + std::this_thread::sleep_for(milliseconds(5)); + TS_ASSERT(watch.is_time_to_record() == true); + TS_ASSERT(watch.is_time_to_record() == false); } }; BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test) -BOOST_AUTO_TEST_CASE(test_end_without_start) { - stop_watch_test::test_end_without_start(); +BOOST_AUTO_TEST_CASE(test_stop_without_start) { + stop_watch_test::test_stop_without_start(); } BOOST_AUTO_TEST_CASE(test_double_start) { stop_watch_test::test_double_start(); @@ -103,4 +133,10 @@ BOOST_AUTO_TEST_CASE(test_multi_thread_1) { BOOST_AUTO_TEST_CASE(test_multi_thread_2) { stop_watch_test::test_multi_thread_2(); } +BOOST_AUTO_TEST_CASE(test_stop_and_continue) { + stop_watch_test::test_stop_and_continue(); +} +BOOST_AUTO_TEST_CASE(test_time_to_record) { + stop_watch_test::test_time_to_record(); +} BOOST_AUTO_TEST_SUITE_END() From 438824b0c49e63cd54cd9a77402a14e46c436a7e Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Fri, 8 May 2020 13:12:32 -0700 Subject: [PATCH 08/11] minor chagne. done with my due --- src/core/storage/fileio/read_caching_device.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 7f41c9e4b2..611c5e76fd 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -83,7 +83,7 @@ class StopWatch { } if (!threads_.insert(std::this_thread::get_id()).second) { - logstream(LOG_ERROR) << "this thread " << std::this_thread::get_id() + logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id() << "already starts the clock" << std::endl; } } @@ -140,7 +140,7 @@ class StopWatch { // singleton destroyed since it's non-copy and movable, // any dangling pointer or reference to this stop_watah is illegal - ~StopWatch() { stop(true); } + ~StopWatch() { stop(/*no throw */ true); } private: uint64_t interval_; From 1fabc03574b376e2641d57317ff4cfcdc68bace9 Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Fri, 8 May 2020 13:59:42 -0700 Subject: [PATCH 09/11] add more docs for ppl to understand. --- src/core/storage/fileio/read_caching_device.hpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 611c5e76fd..52ea23d605 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -107,7 +107,7 @@ class StopWatch { /* * @return: int. number of threads still holding */ - int stop(bool no_throw = false) { + size_t stop(bool no_throw = false) { std::lock_guard lk(mx_); if (threads_.count(std::this_thread::get_id())) { @@ -190,6 +190,8 @@ class read_caching_device { std::lock_guard file_size_guard(m_filesize_cache_mutex); auto iter = m_filename_to_filesize_map.find(filename); if (iter != m_filename_to_filesize_map.end()) { + // this means some thread is reading this file + // so it's okay to reuse the meta data m_file_size = iter->second; m_filename_to_stop_watch[filename]->start(); } else { @@ -247,13 +249,12 @@ class read_caching_device { m_contents.reset(); { std::lock_guard file_size_guard(m_filesize_cache_mutex); - m_filename_to_filesize_map.erase(m_filename); auto iter = m_filename_to_stop_watch.find(m_filename); if (iter != m_filename_to_stop_watch.end()) - // use no throw since this nanny thread may not start clock + // if there are no other threads reading the same file if (iter->second->stop(/* no throw */ true) == 0) { - // nobody is holding m_filename_to_stop_watch.erase(iter); + m_filename_to_filesize_map.erase(m_filename); } } } @@ -301,7 +302,7 @@ class read_caching_device { std::streamsize write(const char* strm_ptr, std::streamsize n) { { std::lock_guard file_size_guard(m_filesize_cache_mutex); - if (m_filename_to_filesize_map.count(m_filename) == 0) + if (m_filename_to_stop_watch.count(m_filename) == 0) log_and_throw("write through closed files handle"); m_filename_to_stop_watch[m_filename]->start(); } @@ -404,7 +405,7 @@ class read_caching_device { { std::lock_guard file_size_guard(m_filesize_cache_mutex); - if (m_filename_to_filesize_map.count(m_filename) == 0) + if (m_filename_to_stop_watch.count(m_filename) == 0) log_and_throw("read through closed files handle"); m_filename_to_stop_watch[m_filename]->start(); } From 4c18f5563e75deb8bfc6a94d597915185ae4dcaa Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Fri, 8 May 2020 16:09:54 -0700 Subject: [PATCH 10/11] upload is not single threaded. Okay, we need to guard it --- src/core/storage/fileio/read_caching_device.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 52ea23d605..4e32b22351 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -232,14 +232,13 @@ class read_caching_device { // evict the file size cache { std::lock_guard file_size_guard(m_filesize_cache_mutex); - m_filename_to_filesize_map.erase(m_filename); - auto iter = m_filename_to_stop_watch.find(m_filename); if (iter != m_filename_to_stop_watch.end()) { // use no throw since this nanny thread may not start clock if (iter->second->stop(/* no throw */ true) == 0) { // nobody is holding m_filename_to_stop_watch.erase(iter); + m_filename_to_filesize_map.erase(m_filename); } } } From e163555936277594022ed0fad02c30d82e64a645 Mon Sep 17 00:00:00 2001 From: Guihao Liang Date: Fri, 8 May 2020 17:26:54 -0700 Subject: [PATCH 11/11] one last segfault for upload --- .../storage/fileio/read_caching_device.hpp | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 4e32b22351..c792b2b448 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -19,6 +19,7 @@ namespace turi { // private namespace namespace { const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB +const char* const WRITE_STREAM_SUFFIX = ".write"; } // namespace /* @@ -208,7 +209,16 @@ class read_caching_device { m_filename_to_stop_watch[filename]->start(); } } else { - m_contents = std::make_shared(filename, write); + // for write stream, should be treated different with read stream + m_contents = std::make_shared(m_filename, write); + auto watch_key = m_filename + WRITE_STREAM_SUFFIX; + if (m_filename_to_stop_watch.find(watch_key) == + m_filename_to_stop_watch.end()) { + m_filename_to_stop_watch[watch_key] = + std::unique_ptr(new StopWatchSec_t(30)); + } + // start the clock immediately to register the thread id into it + m_filename_to_stop_watch[watch_key]->start(); } m_writing = write; @@ -232,13 +242,24 @@ class read_caching_device { // evict the file size cache { std::lock_guard file_size_guard(m_filesize_cache_mutex); - auto iter = m_filename_to_stop_watch.find(m_filename); + auto iter = + m_filename_to_stop_watch.find(m_filename + WRITE_STREAM_SUFFIX); if (iter != m_filename_to_stop_watch.end()) { // use no throw since this nanny thread may not start clock if (iter->second->stop(/* no throw */ true) == 0) { + /* + * the last part upload mostly happens at `close`, which is by + * design of the async S3 multipart uploading; Unlike read stream, + * each read a sync operation. + * */ + logprogress_stream + << "Finished uploading " << sanitize_url(m_filename) + << ". Elapsed time " + << iter->second->template duration() + .count() + << " seconds" << std::endl; // nobody is holding m_filename_to_stop_watch.erase(iter); - m_filename_to_filesize_map.erase(m_filename); } } } @@ -299,22 +320,32 @@ class read_caching_device { } std::streamsize write(const char* strm_ptr, std::streamsize n) { + auto watch_key = m_filename + WRITE_STREAM_SUFFIX; + { std::lock_guard file_size_guard(m_filesize_cache_mutex); - if (m_filename_to_stop_watch.count(m_filename) == 0) + if (m_filename_to_stop_watch.count(watch_key) == 0) log_and_throw("write through closed files handle"); - m_filename_to_stop_watch[m_filename]->start(); + m_filename_to_stop_watch[watch_key]->start(); } auto bytes_written = get_contents()->write(strm_ptr, n); { + /* + * usually `n` is 4096; 4KB. + * it relies on the underlying buffer of m_contents implementation. + * For s3 impl, once the buffer is full, the upload may happen through + * network. Again, really depend on how AWS S3 multipart upload works. + * + * besides that, this first writes stream to local disk, that is to say, + * elapsed time measured here can be very small. + * */ std::lock_guard file_size_guard(m_filesize_cache_mutex); - auto& stop_watch = m_filename_to_stop_watch[m_filename]; + auto& stop_watch = m_filename_to_stop_watch[watch_key]; if (stop_watch->is_time_to_record()) { logprogress_stream - << "Finish uploading " << sanitize_url(m_filename) - << ". Elapsed time " + << "Uploading " << sanitize_url(m_filename) << ". Elapsed time " << stop_watch->template duration().count() << " seconds" << std::endl; }