diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index fe5197813c..c792b2b448 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -1,23 +1,159 @@ /* Copyright © 2017 Apple Inc. All rights reserved. * * Use of this source code is governed by a BSD-3-clause license that can - * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause + * be found in the LICENSE.txt file or at + * https://opensource.org/licenses/BSD-3-Clause */ #ifndef TURI_FILEIO_CACHING_DEVICE_HPP #define TURI_FILEIO_CACHING_DEVICE_HPP #include +#include #include #include -#include #include -#include #include +#include +#include namespace turi { // private namespace namespace { -const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB -} // end private namespace +const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB +const char* const WRITE_STREAM_SUFFIX = ".write"; +} // namespace + +/* + * For each parallel (streamlined) network IO, we can view them as intervals + * in parallel universe. + * + * What we do is to merge all universe into one: the very start and the + * very end of the entire IO, which is the elapsed time, using the same time + * axis. + * t1 |-----| + * t2 |----| + * t3 |-----| + * s ------------ e + * + * Since SFrame can be streamed, meaning it does not download all of the files + * at once. It may have multiple times of fetching data. So it consists many + * small parts of IO activities. + * + * |----|<- less than 150 ms -> |----| + * s ------------------------------- e + * + * if 2 IO acticity are only separated by less than 150 ms time intervals, + * they should be considered as the same part of an IO process, instead of 2. + * + * |-----| <- 10 mins -> |-------| + * s --- e s ----- e + * + * Since 2 adjacent IO activities are too distant, we should view it as 2 + * separate activities. For example, user may play with first 1000 rows, and + * later the user jumps to the tail of the frame to mess around with the data. + * + */ +template +class StopWatch { + public: + using steady_clock = std::chrono::steady_clock; + using my_time_t = decltype(steady_clock::now()); + StopWatch(size_t interval) + : interval_(interval), beg_(steady_clock::now()), end_(beg_) {} + + StopWatch(const StopWatch&) = delete; + StopWatch(StopWatch&& rhs) = delete; + + /* + * start will register the thread id so that client can use if (stop() == 0) + * to decide whether stop watch should be reallocated. + * + * ideally, use start once and close it once with stop. + * This impl allows double start. + * */ + void start() { + std::lock_guard lk(mx_); + + if (threads_.size() == 0) { + using milliseconds = std::chrono::milliseconds; + auto lag = std::chrono::duration_cast( + std::chrono::steady_clock::now() - beg_); + if (lag > milliseconds{150}) { + beg_ = steady_clock::now(); + mile_stone_ = beg_; + } + } + + if (!threads_.insert(std::this_thread::get_id()).second) { + logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id() + << "already starts the clock" << std::endl; + } + } + + bool is_time_to_record() { + std::lock_guard lock(mx_); + + if (threads_.size() == 0) return false; + + // reach to timepoint, log it. + auto cur = steady_clock::now(); + if (cur > mile_stone_) { + mile_stone_ = cur + T(interval_); + return true; + } + + // no need to record + return false; + } + + /* + * @return: int. number of threads still holding + */ + size_t stop(bool no_throw = false) { + std::lock_guard lk(mx_); + + if (threads_.count(std::this_thread::get_id())) { + threads_.erase(std::this_thread::get_id()); + // last man standing + if (threads_.size() == 0) { + end_ = steady_clock::now(); + return 0; + } + // still running + return threads_.size(); + } + + // no-op + if (no_throw) return threads_.size(); + + std_log_and_throw(std::logic_error, "you don't own this stop watch"); + } + + template + Output duration() const { + std::lock_guard lk(mx_); + // the clock is still on + if (!threads_.empty()) { + return std::chrono::duration_cast(steady_clock::now() - beg_); + } + + return std::chrono::duration_cast(end_ - beg_); + } + + // singleton destroyed since it's non-copy and movable, + // any dangling pointer or reference to this stop_watah is illegal + ~StopWatch() { stop(/*no throw */ true); } + + private: + uint64_t interval_; + my_time_t beg_; + my_time_t end_; + my_time_t mile_stone_; + mutable std::mutex mx_; + + std::set threads_; +}; + +using StopWatchSec_t = StopWatch; /** * \ingroup fileio @@ -33,18 +169,18 @@ const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB * * After: * \code - * typedef boost::iostreams::stream > s3_fstream; - * \endcode + * typedef boost::iostreams::stream > + * s3_fstream; \endcode * * It uses the \ref block_cache to pro */ template class read_caching_device { - public: // boost iostream concepts + public: // boost iostream concepts typedef typename T::char_type char_type; typedef typename T::category category; - read_caching_device() { } + read_caching_device() {} read_caching_device(const std::string& filename, const bool write = false) { logstream(LOG_DEBUG) << "read_cachine_device: " << filename << std::endl; @@ -55,14 +191,34 @@ class read_caching_device { std::lock_guard file_size_guard(m_filesize_cache_mutex); auto iter = m_filename_to_filesize_map.find(filename); if (iter != m_filename_to_filesize_map.end()) { + // this means some thread is reading this file + // so it's okay to reuse the meta data m_file_size = iter->second; + m_filename_to_stop_watch[filename]->start(); } else { m_contents = std::make_shared(filename, write); m_file_size = m_contents->file_size(); m_filename_to_filesize_map[filename] = m_file_size; + // report downloading every 30s + if (m_filename_to_stop_watch.find(filename) == + m_filename_to_stop_watch.end()) { + m_filename_to_stop_watch[filename] = + std::unique_ptr(new StopWatchSec_t(30)); + } + // start the clock immediately to register the thread id into it + m_filename_to_stop_watch[filename]->start(); } } else { - m_contents = std::make_shared(filename, write); + // for write stream, should be treated different with read stream + m_contents = std::make_shared(m_filename, write); + auto watch_key = m_filename + WRITE_STREAM_SUFFIX; + if (m_filename_to_stop_watch.find(watch_key) == + m_filename_to_stop_watch.end()) { + m_filename_to_stop_watch[watch_key] = + std::unique_ptr(new StopWatchSec_t(30)); + } + // start the clock immediately to register the thread id into it + m_filename_to_stop_watch[watch_key]->start(); } m_writing = write; @@ -78,20 +234,49 @@ class read_caching_device { // evict all blocks for this key auto& bc = block_cache::get_instance(); size_t block_number = 0; - while(1) { + while (1) { std::string key = get_key_name(block_number); if (bc.evict_key(key) == false) break; - ++ block_number; + ++block_number; } // evict the file size cache { std::lock_guard file_size_guard(m_filesize_cache_mutex); - m_filename_to_filesize_map.erase(m_filename); + auto iter = + m_filename_to_stop_watch.find(m_filename + WRITE_STREAM_SUFFIX); + if (iter != m_filename_to_stop_watch.end()) { + // use no throw since this nanny thread may not start clock + if (iter->second->stop(/* no throw */ true) == 0) { + /* + * the last part upload mostly happens at `close`, which is by + * design of the async S3 multipart uploading; Unlike read stream, + * each read a sync operation. + * */ + logprogress_stream + << "Finished uploading " << sanitize_url(m_filename) + << ". Elapsed time " + << iter->second->template duration() + .count() + << " seconds" << std::endl; + // nobody is holding + m_filename_to_stop_watch.erase(iter); + } + } } } else if (mode == std::ios_base::in && !m_writing) { if (m_contents) m_contents->close(mode); m_contents.reset(); + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + auto iter = m_filename_to_stop_watch.find(m_filename); + if (iter != m_filename_to_stop_watch.end()) + // if there are no other threads reading the same file + if (iter->second->stop(/* no throw */ true) == 0) { + m_filename_to_stop_watch.erase(iter); + m_filename_to_filesize_map.erase(m_filename); + } + } } } @@ -111,18 +296,18 @@ class read_caching_device { std::streamsize ret = 0; - while(n > 0) { + while (n > 0) { // the block number containing the offset. auto block_number = m_file_pos / READ_CACHING_BLOCK_SIZE; // the offset inside the block auto block_offset = m_file_pos % READ_CACHING_BLOCK_SIZE; - // number of bytes I can read inside this block before I hit the next block - size_t n_bytes = (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos; + // number of bytes I can read inside this block before I hit the next + // block + size_t n_bytes = + (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos; n_bytes = std::min(n_bytes, n); - bool success = fetch_block(strm_ptr + ret, - block_number, - block_offset, - n_bytes); + bool success = + fetch_block(strm_ptr + ret, block_number, block_offset, n_bytes); if (success == false) { log_and_throw(std::string("Unable to read ") + m_filename); } @@ -135,18 +320,46 @@ class read_caching_device { } std::streamsize write(const char* strm_ptr, std::streamsize n) { - return get_contents()->write(strm_ptr, n); - } + auto watch_key = m_filename + WRITE_STREAM_SUFFIX; - bool good() const { - return get_contents()->good(); + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_stop_watch.count(watch_key) == 0) + log_and_throw("write through closed files handle"); + m_filename_to_stop_watch[watch_key]->start(); + } + + auto bytes_written = get_contents()->write(strm_ptr, n); + + { + /* + * usually `n` is 4096; 4KB. + * it relies on the underlying buffer of m_contents implementation. + * For s3 impl, once the buffer is full, the upload may happen through + * network. Again, really depend on how AWS S3 multipart upload works. + * + * besides that, this first writes stream to local disk, that is to say, + * elapsed time measured here can be very small. + * */ + std::lock_guard file_size_guard(m_filesize_cache_mutex); + auto& stop_watch = m_filename_to_stop_watch[watch_key]; + if (stop_watch->is_time_to_record()) { + logprogress_stream + << "Uploading " << sanitize_url(m_filename) << ". Elapsed time " + << stop_watch->template duration().count() + << " seconds" << std::endl; + } + } + + return bytes_written; } + bool good() const { return get_contents()->good(); } + /** * Seeks to a different location. */ - std::streampos seek(std::streamoff off, - std::ios_base::seekdir way, + std::streampos seek(std::streamoff off, std::ios_base::seekdir way, std::ios_base::openmode openmode) { if (openmode == std::ios_base::in) { if (way == std::ios_base::beg) { @@ -155,7 +368,8 @@ class read_caching_device { m_file_pos = std::min(m_file_pos + off, m_file_size); m_file_pos = std::max(m_file_pos, 0); } else if (way == std::ios_base::end) { - m_file_pos = std::min(m_file_size + off - 1, m_file_size); + m_file_pos = + std::min(m_file_size + off - 1, m_file_size); m_file_pos = std::max(m_file_pos, 0); } return m_file_pos; @@ -169,14 +383,10 @@ class read_caching_device { * Returns (size_t)(-1) if there is no file opened, or if there is an * error obtaining the file size. */ - size_t file_size() const { - return m_file_size; - } + size_t file_size() const { return m_file_size; } /// Not supported - std::shared_ptr get_underlying_stream() { - return nullptr; - } + std::shared_ptr get_underlying_stream() { return nullptr; } private: std::string m_filename; @@ -187,6 +397,8 @@ class read_caching_device { static mutex m_filesize_cache_mutex; static std::map m_filename_to_filesize_map; + static std::map> + m_filename_to_stop_watch; std::shared_ptr& get_contents() { if (!m_contents) { @@ -196,35 +408,57 @@ class read_caching_device { } std::string get_key_name(size_t block_number) { // we generate a key name that will never appear in any filename - return m_filename + "////:" + std::to_string(block_number); + return m_filename + "////:" + std::to_string(block_number); } /** * Fetches the contents of a block. * Returns true on success and false on failure. */ - bool fetch_block(char* output, - size_t block_number, - size_t startpos, + bool fetch_block(char* output, size_t block_number, size_t startpos, size_t length) { auto& bc = block_cache::get_instance(); std::string key = get_key_name(block_number); int64_t ret = bc.read(key, output, startpos, startpos + length); if (static_cast(ret) == length) return true; - logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " << block_number << std::endl; + logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " + << block_number << std::endl; // ok. failure... no such block or block is bad. We read it ourselves. // read the whole block auto block_start = block_number * READ_CACHING_BLOCK_SIZE; - auto block_end = std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size); + auto block_end = + std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size); // seek to the block and read the whole block at once auto& contents = get_contents(); contents->seek(block_start, std::ios_base::beg, std::ios_base::in); std::string block_contents(block_end - block_start, 0); - auto bytes_read = contents->read(&(block_contents[0]), - block_end - block_start); + + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_stop_watch.count(m_filename) == 0) + log_and_throw("read through closed files handle"); + m_filename_to_stop_watch[m_filename]->start(); + } + + auto bytes_read = + contents->read(&(block_contents[0]), block_end - block_start); + + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + auto& stop_watch = m_filename_to_stop_watch[m_filename]; + if (stop_watch->is_time_to_record()) { + logprogress_stream + << "Finished fetching block " << block_number << ". Elapsed " + << stop_watch->template duration().count() + << "s for downloading " << sanitize_url(m_filename) << std::endl; + } + } + // read failed. - static_assert(std::is_signed::value, "decltype(bytes_read) signed"); - static_assert(std::is_integral::value, "decltype(bytes_read) integral"); + static_assert(std::is_signed::value, + "decltype(bytes_read) signed"); + static_assert(std::is_integral::value, + "decltype(bytes_read) integral"); if (bytes_read < truncate_check(block_end - block_start)) { return false; } @@ -241,12 +475,17 @@ class read_caching_device { return true; } -}; // end of read_caching_device +}; // end of read_caching_device -template +template mutex read_caching_device::m_filesize_cache_mutex; -template -std::map read_caching_device::m_filename_to_filesize_map; -} // namespace turi +template +std::map + read_caching_device::m_filename_to_filesize_map; + +template +std::map> + read_caching_device::m_filename_to_stop_watch; +} // namespace turi #endif diff --git a/src/core/storage/fileio/s3_filesys.hpp b/src/core/storage/fileio/s3_filesys.hpp index 5640900971..d5b2b36a7d 100644 --- a/src/core/storage/fileio/s3_filesys.hpp +++ b/src/core/storage/fileio/s3_filesys.hpp @@ -109,8 +109,11 @@ class AWSReadStreamBase : public SeekStream { } virtual void Close() { - logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl; - Reset(file_size_); + if (!closed_) { + closed_ = true; + logstream(LOG_DEBUG) << "AWSReadStream::Close()" << std::endl; + Reset(file_size_); + } } virtual size_t Tell(void) { return curr_bytes_; } @@ -143,6 +146,7 @@ class AWSReadStreamBase : public SeekStream { protected: // the total size of the file size_t file_size_ = 0; + bool closed_ = false; s3url url_; private: diff --git a/test/fileio/CMakeLists.txt b/test/fileio/CMakeLists.txt index 519cd502dc..f0bc201868 100644 --- a/test/fileio/CMakeLists.txt +++ b/test/fileio/CMakeLists.txt @@ -12,6 +12,7 @@ make_boost_test(fixed_size_cache_manager_test.cxx REQUIRES unity_shared_for_testing) make_boost_test(general_fstream_test.cxx REQUIRES unity_shared_for_testing) make_boost_test(parse_hdfs_url_test.cxx REQUIRES unity_shared_for_testing) +make_boost_test(stop_watch_test.cxx REQUIRES unity_shared_for_testing) if(${TC_BUILD_REMOTEFS}) make_executable(s3_filesys_test SOURCES s3_filesys_test.cpp REQUIRES diff --git a/test/fileio/stop_watch_test.cxx b/test/fileio/stop_watch_test.cxx new file mode 100644 index 0000000000..a6f51b773e --- /dev/null +++ b/test/fileio/stop_watch_test.cxx @@ -0,0 +1,142 @@ +#define BOOST_TEST_MODULE +#include +#include +#include +#include + +using namespace turi; +using namespace std::chrono; +struct stop_watch_test { + using my_watch_t = StopWatch; + + public: + void test_stop_without_start(void) { + my_watch_t watch(1); + TS_ASSERT_THROWS_ANYTHING(watch.stop()); + } + + void test_double_start(void) { + my_watch_t watch(1); + watch.start(); + // no double start + TS_ASSERT_THROWS_NOTHING(watch.start()); + } + + void test_single_thread(void) { + my_watch_t watch(1); + watch.start(); + std::this_thread::sleep_for(milliseconds(2)); + TS_ASSERT(watch.duration() >= milliseconds(2)); + watch.stop(); + TS_ASSERT(watch.duration() >= milliseconds(2)); + } + + // main thread's stop watch stops as the last + void test_multi_thread_1(void) { + my_watch_t watch(1); + watch.start(); + auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { + watch.start(); + watch.stop(); + }); + + std::thread t2([&watch]() { + std::this_thread::sleep_for(milliseconds(1)); + watch.start(); + std::this_thread::sleep_for(milliseconds(1)); + watch.stop(); + }); + std::this_thread::sleep_for(milliseconds(10)); + t1.join(); + t2.join(); + auto stop = std::chrono::steady_clock::now(); + watch.stop(); + + TS_ASSERT(watch.duration() >= + std::chrono::duration_cast(stop - start)); + } + + // t2 stops as the last one + void test_multi_thread_2(void) { + my_watch_t watch(1); + watch.start(); + auto start = std::chrono::steady_clock::now(); + + std::thread t1([&watch]() { + watch.start(); + TS_ASSERT(watch.stop() > 0); + }); + + std::thread t2([&watch]() { + std::this_thread::sleep_for(milliseconds(1)); + watch.start(); + std::this_thread::sleep_for(milliseconds(25)); + // t2 is the last one to stop the watch + TS_ASSERT(watch.stop() == 0); + }); + + t1.join(); + std::this_thread::sleep_for(milliseconds(5)); + // main also stops + auto stop = std::chrono::steady_clock::now(); + TS_ASSERT(watch.stop() > 0); + + // the clock is still on becuase t2 is still running + TS_ASSERT(watch.duration() >= + std::chrono::duration_cast(stop - start)); + TS_ASSERT(watch.duration() >= milliseconds(5)); + + t2.join(); + TS_ASSERT(watch.duration() >= milliseconds(25)); + } + + void test_stop_and_continue(void) { + my_watch_t watch(1); + watch.start(); + watch.stop(); + // interval is less than 150ms + std::this_thread::sleep_for(milliseconds(3)); + watch.start(); + watch.stop(); + std::this_thread::sleep_for(milliseconds(3)); + watch.start(); + watch.stop(); + + TS_ASSERT(watch.duration() >= milliseconds(6)); + } + + void test_time_to_record(void) { + my_watch_t watch(5); + watch.start(); + TS_ASSERT(watch.is_time_to_record() == true); + TS_ASSERT(watch.is_time_to_record() == false); + std::this_thread::sleep_for(milliseconds(5)); + TS_ASSERT(watch.is_time_to_record() == true); + TS_ASSERT(watch.is_time_to_record() == false); + } +}; + +BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test) +BOOST_AUTO_TEST_CASE(test_stop_without_start) { + stop_watch_test::test_stop_without_start(); +} +BOOST_AUTO_TEST_CASE(test_double_start) { + stop_watch_test::test_double_start(); +} +BOOST_AUTO_TEST_CASE(test_single_thread) { + stop_watch_test::test_single_thread(); +} +BOOST_AUTO_TEST_CASE(test_multi_thread_1) { + stop_watch_test::test_multi_thread_1(); +} +BOOST_AUTO_TEST_CASE(test_multi_thread_2) { + stop_watch_test::test_multi_thread_2(); +} +BOOST_AUTO_TEST_CASE(test_stop_and_continue) { + stop_watch_test::test_stop_and_continue(); +} +BOOST_AUTO_TEST_CASE(test_time_to_record) { + stop_watch_test::test_time_to_record(); +} +BOOST_AUTO_TEST_SUITE_END()