diff --git a/src/core/storage/fileio/read_caching_device.hpp b/src/core/storage/fileio/read_caching_device.hpp index 6908430313..2beec2950a 100644 --- a/src/core/storage/fileio/read_caching_device.hpp +++ b/src/core/storage/fileio/read_caching_device.hpp @@ -19,8 +19,7 @@ namespace turi { // private namespace namespace { const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB - -using namespace std::chrono; +} // namespace /* * For each parallel (streamlined) network IO, we can view them as intervals @@ -38,10 +37,10 @@ using namespace std::chrono; * at once. It may have multiple times of fetching data. So it consists many * small parts of IO activities. * - * |----|<- less than 100 ms -> |----| + * |----|<- less than 150 ms -> |----| * s ------------------------------- e * - * if 2 IO acticity are only separated by less than 50 ms time intervals, + * if 2 IO acticity are only separated by less than 150 ms time intervals, * they should be considered as the same part of an IO process, instead of 2. * * |-----| <- 10 mins -> |-------| @@ -55,6 +54,7 @@ using namespace std::chrono; template class StopWatch { public: + using steady_clock = std::chrono::steady_clock; using my_time_t = decltype(steady_clock::now()); StopWatch(size_t interval) : interval_(interval), beg_(steady_clock::now()), end_(beg_) {} @@ -64,20 +64,27 @@ class StopWatch { void start() { std::lock_guard lk(mx_); - if (thread_num_ == 0) { - if (duration_cast(steady_clock::now() - beg_) > - milliseconds{150}) { + + if (thread_set_.size() == 0) { + using milliseconds = std::chrono::milliseconds; + auto lag = std::chrono::duration_cast( + std::chrono::steady_clock::now() - beg_); + if (lag > milliseconds{150}) { beg_ = steady_clock::now(); mile_stone_ = beg_; } } - ++thread_num_; + + if (!thread_set_.insert(std::this_thread::get_id()).second) { + logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id() + << "already starts the clock" << std::endl; + } } bool is_time_to_record() { std::lock_guard lock(mx_); - if (thread_num_ == 0) return false; + if (thread_set_.size() == 0) return false; // reach to timepoint, log it. auto cur = steady_clock::now(); @@ -90,25 +97,39 @@ class StopWatch { return false; } - void end() { + /* + * @return: int. number of threads still holding + */ + int stop() { std::lock_guard lk(mx_); - if (thread_num_ > 0 && --thread_num_ == 0) { - end_ = steady_clock::now(); + if (thread_set_.count(std::this_thread::get_id())) { + thread_set_.erase(std::this_thread::get_id()); + // last man standing + if (thread_set_.size() == 0) { + end_ = steady_clock::now(); + return 0; + } + // still running + return thread_set_.size(); } + + return thread_set_.size(); } - template + template Output duration() const { std::lock_guard lk(mx_); // the clock is still on - if (thread_num_ > 0) { - return duration_cast(steady_clock::now() - beg_); + if (thread_set_.count(std::this_thread::get_id())) { + return std::chrono::duration_cast(steady_clock::now() - beg_); } - return duration_cast(end_ - beg_); + return std::chrono::duration_cast(end_ - beg_); } - ~StopWatch() { end(); } + // singleton destroyed since it's non-copy and movable, + // any dangling pointer or reference to this stop_watah is illegal + ~StopWatch() { stop(); } private: uint64_t interval_; @@ -116,13 +137,12 @@ class StopWatch { my_time_t end_; my_time_t mile_stone_; mutable std::mutex mx_; - size_t thread_num_; + + std::set threads_; }; using StopWatchSec_t = StopWatch; -} // namespace - /** * \ingroup fileio * Can be wrapped around any device implement to provide read caching. This @@ -165,8 +185,11 @@ class read_caching_device { m_file_size = m_contents->file_size(); m_filename_to_filesize_map[filename] = m_file_size; // report downloading every 30s - m_filename_to_stop_watch[filename] = - std::unique_ptr(new StopWatchSec_t(30)); + if (m_filename_to_filesize_map.find(filename) != + m_filename_to_filesize_map.end()) { + m_filename_to_stop_watch[filename] = + std::unique_ptr(new StopWatchSec_t(30)); + } } } else { m_contents = std::make_shared(filename, write); @@ -194,12 +217,24 @@ class read_caching_device { { std::lock_guard file_size_guard(m_filesize_cache_mutex); m_filename_to_filesize_map.erase(m_filename); - m_filename_to_stop_watch.erase(m_filename); + + auto iter = m_filename_to_stop_watch.find(m_filename); + if (iter != m_filename_to_stop_watch.end()) { + if (!iter->stop()) { + m_filename_to_stop_watch.erase(iter); + } + } } } else if (mode == std::ios_base::in && !m_writing) { if (m_contents) m_contents->close(mode); m_contents.reset(); + { + std::lock_guard file_size_guard(m_filesize_cache_mutex); + if (m_filename_to_stop_watch.find(m_filename) != + m_filename_to_stop_watch.end()) + m_filename_to_stop_watch[m_filename]->stop(); + } } } @@ -254,10 +289,11 @@ class read_caching_device { std::lock_guard file_size_guard(m_filesize_cache_mutex); auto& stop_watch = m_filename_to_stop_watch[m_filename]; if (stop_watch->is_time_to_record()) { - logprogress_stream << "Finish uploading " << sanitize_url(m_filename) - << ". Elapsed time " - << stop_watch->duration().count() - << " seconds" << std::endl; + logprogress_stream + << "Finish uploading " << sanitize_url(m_filename) + << ". Elapsed time " + << stop_watch->template duration().count() + << " seconds" << std::endl; } } @@ -355,11 +391,10 @@ class read_caching_device { std::lock_guard file_size_guard(m_filesize_cache_mutex); auto& stop_watch = m_filename_to_stop_watch[m_filename]; if (stop_watch->is_time_to_record()) { - logprogress_stream << "Finished fetching block " << block_number - << ". Elapsed " - << stop_watch->duration().count() - << "s for downloading " << sanitize_url(m_filename) - << std::endl; + logprogress_stream + << "Finished fetching block " << block_number << ". Elapsed " + << stop_watch->template duration().count() + << "s for downloading " << sanitize_url(m_filename) << std::endl; } } diff --git a/test/fileio/stop_watch_test.cxx b/test/fileio/stop_watch_test.cxx new file mode 100644 index 0000000000..723d7f1692 --- /dev/null +++ b/test/fileio/stop_watch_test.cxx @@ -0,0 +1,106 @@ +#define BOOST_TEST_MODULE +#include +#include +#include +#include + +using namespace turi; +using namespace std::chrono; +struct stop_watch_test { + using my_watch_t = StopWatch; + + public: + void test_end_without_start(void) { + my_watch_t watch = my_watch_t(1); + watch.end(); + TS_ASSERT_EQUALS(watch.templace duration(), milliseconds{0}); + TS_ASSERT_THROWS_NOTHING(watch.end()); + } + + void test_double_start(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + TS_ASSERT_THROWS_ANYTHING(watch.start()); + } + + void test_single_thread(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + std::this_thread::sleep_for(milliseconds(2)); + TS_ASSERT(watch.templace duration() >= milliseconds(2)); + watch.end(); + TS_ASSERT(watch.templace duration() >= milliseconds(2)); + } + + void test_multi_thread_1(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { + watch.start(); + watch.end(); + }); + + std::thread t2([&watch]() { + std::this_thread::sleep_for(milliseconds(1)); + watch.start(); + std::this_thread::sleep_for(milliseconds(1)); + watch.end(); + }); + std::this_thread::sleep_for(milliseconds(10)); + t1.join(); + t2.join(); + auto end = std::chrono::steady_clock::now(); + watch.end(); + + TS_ASSERT(watch.templace duration() >= + std::chrono::duration_cast(end - start)); + } + + void test_multi_thread_2(void) { + my_watch_t watch = my_watch_t(1); + watch.start(); + auto start = std::chrono::steady_clock::now(); + std::thread t1([&watch]() { + watch.start(); + watch.end(); + }); + + std::thread t2([&watch]() { + std::this_thread::sleep_for(milliseconds(1)); + watch.start(); + std::this_thread::sleep_for(milliseconds(15)); + watch.end(); + }); + t1.join(); + std::this_thread::sleep_for(milliseconds(1)); + auto end = std::chrono::steady_clock::now(); + watch.end(); + TS_ASSERT(watch.templace duration() >= + std::chrono::duration_cast(end - start)); + TS_ASSERT(watch.templace duration() <= milliseconds > (15)); + t2.join(); + + TS_ASSERT(watch.templace duration() >= milliseconds > (15)); + watch.end(); + TS_ASSERT(watch.templace duration() >= milliseconds > (15)); + } +}; + +BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test) +BOOST_AUTO_TEST_CASE(test_end_without_start) { + stop_watch_test::test_end_without_start(); +} +BOOST_AUTO_TEST_CASE(test_double_start) { + stop_watch_test::test_double_start(); +} +BOOST_AUTO_TEST_CASE(test_single_thread) { + stop_watch_test::test_single_thread(); +} +BOOST_AUTO_TEST_CASE(test_multi_thread_1) { + stop_watch_test::test_multi_thread_1(); +} +BOOST_AUTO_TEST_CASE(test_multi_thread_2) { + stop_watch_test::test_multi_thread_2(); +} +BOOST_AUTO_TEST_SUITE_END()