diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp
index fe5197813c..6908430313 100644
--- a/src/core/storage/fileio/read_caching_device.hpp
+++ b/src/core/storage/fileio/read_caching_device.hpp
@@ -1,23 +1,127 @@
 /* Copyright © 2017 Apple Inc. All rights reserved.
  *
  * Use of this source code is governed by a BSD-3-clause license that can
- * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
+ * be found in the LICENSE.txt file or at
+ * https://opensource.org/licenses/BSD-3-Clause
  */
 #ifndef TURI_FILEIO_CACHING_DEVICE_HPP
 #define TURI_FILEIO_CACHING_DEVICE_HPP
+#include <chrono>
 #include <core/logging/logger.hpp>
 #include <core/storage/fileio/block_cache.hpp>
 #include <core/storage/fileio/sanitize_url.hpp>
 #include <core/util/basic_types.hpp>
 #include <map>
+#include <memory>
+#include <mutex>
 
 namespace turi {
 // private namespace
 namespace {
-const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB
-} // end private namespace
+const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024;  // 64 MB
+
+using namespace std::chrono;
+
+/*
+ * Each parallel (pipelined) network IO can be viewed as an interval on its
+ * own timeline.
+ *
+ * We merge all of those timelines onto one shared time axis, so that the
+ * very start and the very end of the entire IO give its elapsed time:
+ *
+ * t1   |-----|
+ * t2     |----|
+ * t3       |-----|
+ * s  ------------  e
+ *
+ * Since an SFrame can be streamed, it does not download all of its files at
+ * once; it may fetch data many times, so one logical IO consists of many
+ * small IO activities.
+ *
+ *    |----| <- less than 150 ms -> |----|
+ * s  -----------------------------------  e
+ *
+ * If two IO activities are separated by less than 150 ms, they are counted
+ * as the same IO process, not as two.
+ *
+ *    |-----| <- 10 mins -> |-------|
+ * s  ----- e              s ------- e
+ *
+ * If two adjacent IO activities are that far apart, we treat them as two
+ * separate activities. For example, a user may play with the first 1000
+ * rows, and only much later jump to the tail of the frame to explore the
+ * data there.
+ */
+template <typename T>
+class StopWatch {
+ public:
+  using my_time_t = decltype(steady_clock::now());
+
+  StopWatch(size_t interval)
+      : interval_(interval),
+        beg_(steady_clock::now()),
+        end_(beg_),
+        mile_stone_(beg_),
+        thread_num_(0) {}
+
+  StopWatch(const StopWatch&) = delete;
+  StopWatch(StopWatch&& rhs) = delete;
+
+  void start() {
+    std::lock_guard<std::mutex> lk(mx_);
+    // only the first thread of a burst decides whether this is a new burst;
+    // a gap of more than 150 ms since the last activity starts a new one
+    if (thread_num_ == 0) {
+      if (duration_cast<milliseconds>(steady_clock::now() - end_) >
+          milliseconds{150}) {
+        beg_ = steady_clock::now();
+        mile_stone_ = beg_;
+      }
+    }
+    ++thread_num_;
+  }
+
+  bool is_time_to_record() {
+    std::lock_guard<std::mutex> lock(mx_);
+
+    if (thread_num_ == 0) return false;
+
+    // we reached the milestone; record it and schedule the next one
+    auto cur = steady_clock::now();
+    if (cur > mile_stone_) {
+      mile_stone_ = cur + T(interval_);
+      return true;
+    }
+
+    // no need to record
+    return false;
+  }
+
+  void end() {
+    std::lock_guard<std::mutex> lk(mx_);
+    if (thread_num_ > 0 && --thread_num_ == 0) {
+      end_ = steady_clock::now();
+    }
+  }
+
+  template <typename Output>
+  Output duration() const {
+    std::lock_guard<std::mutex> lk(mx_);
+    // the clock is still running
+    if (thread_num_ > 0) {
+      return duration_cast<Output>(steady_clock::now() - beg_);
+    }
+
+    return duration_cast<Output>(end_ - beg_);
+  }
+
+  ~StopWatch() { end(); }
+
+ private:
+  uint64_t interval_;
+  my_time_t beg_;
+  my_time_t end_;
+  my_time_t mile_stone_;
+  mutable std::mutex mx_;
+  size_t thread_num_;
+};
+
+using StopWatchSec_t = StopWatch<seconds>;
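+
+// Illustrative usage sketch (comment only, not part of the shipped code):
+// every IO thread brackets its transfer with start()/end(), and any thread
+// may poll is_time_to_record() to emit at most one progress line per
+// interval. do_transfer_chunk() below is a hypothetical placeholder.
+//
+//   StopWatchSec_t watch(30);        // report every 30 seconds
+//   watch.start();                   // called by each IO thread
+//   while (do_transfer_chunk()) {
+//     if (watch.is_time_to_record()) {
+//       logprogress_stream << watch.duration<seconds>().count()
+//                          << "s elapsed" << std::endl;
+//     }
+//   }
+//   watch.end();                     // the last end() stamps the stop time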
+
+}  // namespace
 
 /**
  * \ingroup fileio
@@ -33,18 +137,18 @@ const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB
  *
  * After:
  * \code
- * typedef boost::iostreams::stream<read_caching_device<s3_device> > s3_fstream;
- * \endcode
+ * typedef boost::iostreams::stream<read_caching_device<s3_device> >
+ *     s3_fstream;
+ * \endcode
  *
  * It uses the \ref block_cache to provide the caching capability.
  */
 template <typename T>
 class read_caching_device {
- public: // boost iostream concepts
+ public:  // boost iostream concepts
   typedef typename T::char_type char_type;
   typedef typename T::category category;
 
-  read_caching_device() { }
+  read_caching_device() {}
   read_caching_device(const std::string& filename, const bool write = false) {
     logstream(LOG_DEBUG) << "read_caching_device: " << filename << std::endl;
@@ -60,6 +164,9 @@ class read_caching_device {
         m_contents = std::make_shared<T>(filename, write);
         m_file_size = m_contents->file_size();
         m_filename_to_filesize_map[filename] = m_file_size;
+        // report download progress every 30 seconds
+        m_filename_to_stop_watch[filename] =
+            std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
       }
     } else {
       m_contents = std::make_shared<T>(filename, write);
@@ -78,15 +185,16 @@ class read_caching_device {
       // evict all blocks for this key
       auto& bc = block_cache::get_instance();
       size_t block_number = 0;
-      while(1) {
+      while (1) {
         std::string key = get_key_name(block_number);
         if (bc.evict_key(key) == false) break;
-        ++ block_number;
+        ++block_number;
       }
       // evict the file size cache
       {
         std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
         m_filename_to_filesize_map.erase(m_filename);
+        m_filename_to_stop_watch.erase(m_filename);
       }
 
     } else if (mode == std::ios_base::in && !m_writing) {
@@ -111,18 +219,18 @@ class read_caching_device {
 
     std::streamsize ret = 0;
 
-    while(n > 0) {
+    while (n > 0) {
       // the block number containing the offset
       auto block_number = m_file_pos / READ_CACHING_BLOCK_SIZE;
       // the offset inside the block
       auto block_offset = m_file_pos % READ_CACHING_BLOCK_SIZE;
-      // number of bytes I can read inside this block before I hit the next block
-      size_t n_bytes = (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos;
+      // number of bytes I can read inside this block before I hit the next
+      // block
+      size_t n_bytes =
+          (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos;
       n_bytes = std::min<size_t>(n_bytes, n);
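+      // Worked example (illustrative numbers, not from the source): with
+      // 64 MB blocks, a read at m_file_pos = 130 MB lands in block 2
+      // (130 / 64), at block_offset = 2 MB, and at most 62 MB can be served
+      // before the read crosses into block 3.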
-      bool success = fetch_block(strm_ptr + ret,
-                                 block_number,
-                                 block_offset,
-                                 n_bytes);
+      bool success =
+          fetch_block(strm_ptr + ret, block_number, block_offset, n_bytes);
       if (success == false) {
         log_and_throw(std::string("Unable to read ") + m_filename);
       }
@@ -135,18 +243,33 @@ class read_caching_device {
   }
 
   std::streamsize write(const char* strm_ptr, std::streamsize n) {
-    return get_contents()->write(strm_ptr, n);
-  }
+    {
+      std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
+      // the write path may reach here before any stop watch was registered
+      // for this filename, so create one lazily; otherwise operator[] would
+      // leave a null pointer behind and the call below would crash
+      auto& stop_watch = m_filename_to_stop_watch[m_filename];
+      if (!stop_watch) stop_watch.reset(new StopWatchSec_t(30));
+      stop_watch->start();
+    }
+
+    auto bytes_written = get_contents()->write(strm_ptr, n);
+
+    {
+      std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
+      auto& stop_watch = m_filename_to_stop_watch[m_filename];
+      if (stop_watch->is_time_to_record()) {
+        logprogress_stream << "Uploading " << sanitize_url(m_filename)
+                           << ". Elapsed time "
+                           << stop_watch->duration<seconds>().count()
+                           << " seconds" << std::endl;
+      }
+    }
 
-  bool good() const {
-    return get_contents()->good();
+    return bytes_written;
   }
 
+  bool good() const { return get_contents()->good(); }
+
   /**
    * Seeks to a different location.
    */
-  std::streampos seek(std::streamoff off,
-                      std::ios_base::seekdir way,
+  std::streampos seek(std::streamoff off, std::ios_base::seekdir way,
                       std::ios_base::openmode openmode) {
     if (openmode == std::ios_base::in) {
       if (way == std::ios_base::beg) {
@@ -155,7 +278,8 @@ class read_caching_device {
         m_file_pos = std::min<std::streamsize>(m_file_pos + off, m_file_size);
         m_file_pos = std::max<std::streamsize>(m_file_pos, 0);
       } else if (way == std::ios_base::end) {
-        m_file_pos = std::min<std::streamsize>(m_file_size + off - 1, m_file_size);
+        m_file_pos =
+            std::min<std::streamsize>(m_file_size + off - 1, m_file_size);
         m_file_pos = std::max<std::streamsize>(m_file_pos, 0);
       }
       return m_file_pos;
@@ -169,14 +293,10 @@ class read_caching_device {
    * Returns (size_t)(-1) if there is no file opened, or if there is an
    * error obtaining the file size.
    */
-  size_t file_size() const {
-    return m_file_size;
-  }
+  size_t file_size() const { return m_file_size; }
 
   /// Not supported
-  std::shared_ptr<std::istream> get_underlying_stream() {
-    return nullptr;
-  }
+  std::shared_ptr<std::istream> get_underlying_stream() { return nullptr; }
 
  private:
   std::string m_filename;
@@ -187,6 +307,8 @@ class read_caching_device {
 
   static mutex m_filesize_cache_mutex;
   static std::map<std::string, size_t> m_filename_to_filesize_map;
+  static std::map<std::string, std::unique_ptr<StopWatchSec_t>>
+      m_filename_to_stop_watch;
 
   std::shared_ptr<T>& get_contents() {
     if (!m_contents) {
@@ -196,35 +318,56 @@ class read_caching_device {
   }
   std::string get_key_name(size_t block_number) {
     // we generate a key name that will never appear in any filename
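+    // e.g. (illustrative URL): for "s3://bucket/data.csv", block 3 is cached
+    // under the key "s3://bucket/data.csv////:3"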
     return m_filename + "////:" + std::to_string(block_number);
   }
 
   /**
    * Fetches the contents of a block.
    * Returns true on success and false on failure.
    */
-  bool fetch_block(char* output,
-                   size_t block_number,
-                   size_t startpos,
+  bool fetch_block(char* output, size_t block_number, size_t startpos,
                    size_t length) {
     auto& bc = block_cache::get_instance();
     std::string key = get_key_name(block_number);
     int64_t ret = bc.read(key, output, startpos, startpos + length);
     if (static_cast<size_t>(ret) == length) return true;
-    logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " << block_number << std::endl;
+    logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block "
+                        << block_number << std::endl;
     // ok. failure... no such block or block is bad. We read it ourselves.
     // read the whole block
     auto block_start = block_number * READ_CACHING_BLOCK_SIZE;
-    auto block_end = std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size);
+    auto block_end =
+        std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size);
     // seek to the block and read the whole block at once
     auto& contents = get_contents();
     contents->seek(block_start, std::ios_base::beg, std::ios_base::in);
     std::string block_contents(block_end - block_start, 0);
-    auto bytes_read = contents->read(&(block_contents[0]),
-                                     block_end - block_start);
+
+    {
+      std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
+      // create the stop watch lazily, mirroring the guard in write(), in
+      // case no watch was registered for this filename yet
+      auto& stop_watch = m_filename_to_stop_watch[m_filename];
+      if (!stop_watch) stop_watch.reset(new StopWatchSec_t(30));
+      stop_watch->start();
+    }
+
+    auto bytes_read =
+        contents->read(&(block_contents[0]), block_end - block_start);
+
+    {
+      std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
+      auto& stop_watch = m_filename_to_stop_watch[m_filename];
+      if (stop_watch->is_time_to_record()) {
+        logprogress_stream << "Finished fetching block " << block_number
+                           << ". Elapsed "
+                           << stop_watch->duration<seconds>().count()
+                           << "s for downloading " << sanitize_url(m_filename)
+                           << std::endl;
+      }
+    }
     // read failed.
-    static_assert(std::is_signed<decltype(bytes_read)>::value, "decltype(bytes_read) signed");
-    static_assert(std::is_integral<decltype(bytes_read)>::value, "decltype(bytes_read) integral");
+    static_assert(std::is_signed<decltype(bytes_read)>::value,
+                  "decltype(bytes_read) signed");
+    static_assert(std::is_integral<decltype(bytes_read)>::value,
+                  "decltype(bytes_read) integral");
     if (bytes_read < truncate_check<int64_t>(block_end - block_start)) {
       return false;
     }
@@ -241,12 +384,17 @@ class read_caching_device {
     return true;
   }
 
-}; // end of read_caching_device
+};  // end of read_caching_device
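+
+// Illustrative usage (mirrors the typedef shown in the class comment above;
+// the URL is hypothetical):
+//
+//   typedef boost::iostreams::stream<read_caching_device<s3_device> >
+//       s3_fstream;
+//   s3_fstream fin("s3://bucket/data.csv");
+//   char buf[4096];
+//   fin.read(buf, sizeof(buf));  // first read fetches a 64 MB block into
+//                                // the block_cache; later reads hit it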
 
-template<typename T>
+template <typename T>
 mutex read_caching_device<T>::m_filesize_cache_mutex;
 
-template<typename T>
-std::map<std::string, size_t> read_caching_device<T>::m_filename_to_filesize_map;
-} // namespace turi
+template <typename T>
+std::map<std::string, size_t>
+    read_caching_device<T>::m_filename_to_filesize_map;
+
+template <typename T>
+std::map<std::string, std::unique_ptr<StopWatchSec_t>>
+    read_caching_device<T>::m_filename_to_stop_watch;
+}  // namespace turi
 
 #endif
diff --git a/src/core/storage/fileio/s3_filesys.cpp b/src/core/storage/fileio/s3_filesys.cpp
index 96a2d4e253..ff5b93dde0 100644
--- a/src/core/storage/fileio/s3_filesys.cpp
+++ b/src/core/storage/fileio/s3_filesys.cpp
@@ -93,12 +93,6 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) {
   // range is inclusive and zero based
   ss << "bytes=" << curr_bytes_ << '-' << curr_bytes_ + nwant - 1;
 
-  std::string url_string = url_.string_from_s3url(false);
-  logprogress_stream << "Start downloading " << url_string << ", " << ss.str()
-                     << std::endl;
-
-  auto start = std::chrono::steady_clock::now();
-
   Aws::S3::Model::GetObjectRequest object_request;
   object_request.SetRange(ss.str().c_str());
   object_request.SetBucket(url_.bucket.c_str());
@@ -121,11 +115,6 @@ int AWSReadStreamBase::FillBuffer(char *input_ptr, size_t nwant) {
     // std::istreambuf_iterator<char>
     retrieved_file.read(input_ptr, nwant);
 
-    auto end = std::chrono::steady_clock::now();
-    logprogress_stream
-        << "Finished downloading " << url_string << ". Duration: "
-        << std::chrono::duration_cast<std::chrono::seconds>(end - start).count()
-        << " seconds" << std::endl;
   } else {
     auto error = get_object_outcome.GetError();
     ss.str("");
@@ -181,12 +170,6 @@ void WriteStream::Upload(bool force_upload) {
 
   // store the future into completed parts
   completed_parts_.push_back(s3_client_.UploadPartCallable(my_request));
-
-  logprogress_stream << "Uploading part " << my_request.GetPartNumber()
-                     << " of " << url_.string_from_s3url(false)
-                     << ", with size "
-                     << (double)(buffer_.size()) / (1024 * 1024) << " MB"
-                     << std::endl;
 }
 
 void WriteStream::Finish() {
@@ -221,9 +204,6 @@ void WriteStream::Finish() {
     logstream(LOG_ERROR) << ss.str() << std::endl;
     log_and_throw_io_failure(ss.str());
   }
-
-  logprogress_stream << "Finished uploading all parts of "
-                     << url_.string_from_s3url(false) << std::endl;
 }
 
 /*!
diff --git a/src/core/storage/fileio/s3_filesys.hpp b/src/core/storage/fileio/s3_filesys.hpp
index 5640900971..d5b2b36a7d 100644
--- a/src/core/storage/fileio/s3_filesys.hpp
+++ b/src/core/storage/fileio/s3_filesys.hpp
@@ -109,8 +109,11 @@ class AWSReadStreamBase : public SeekStream {
   }
 
   virtual void Close() {
-    logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl;
-    Reset(file_size_);
+    if (!closed_) {
+      closed_ = true;
+      logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl;
+      Reset(file_size_);
+    }
   }
 
   virtual size_t Tell(void) { return curr_bytes_; }
@@ -143,6 +146,7 @@ class AWSReadStreamBase : public SeekStream {
  protected:
   // the total size of the file
   size_t file_size_ = 0;
+  bool closed_ = false;
   s3url url_;
 
  private:
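
The Close() change above makes closing idempotent: the first Close() sets
closed_ and resets the stream, and any repeated Close() becomes a harmless
no-op. A minimal sketch of the same guard pattern in isolation (the class
name is illustrative, not from the diff):

    class Stream {
     public:
      void Close() {
        if (!closed_) {     // only the first close does the real teardown
          closed_ = true;
          // release resources / reset the read position here
        }
      }

     private:
      bool closed_ = false;
    };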