Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
partial work
Browse files Browse the repository at this point in the history
  • Loading branch information
Guihao Liang committed May 8, 2020
1 parent 9ce8f2c commit fb7394b
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 32 deletions.
99 changes: 67 additions & 32 deletions src/core/storage/fileio/read_caching_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ namespace turi {
// private namespace
namespace {
const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB

using namespace std::chrono;
} // namespace

/*
* For each parallel (streamlined) network IO, we can view them as intervals
Expand All @@ -38,10 +37,10 @@ using namespace std::chrono;
* at once. It may fetch data several times, so it consists of many
* small parts of IO activity.
*
* |----|<- less than 100 ms -> |----|
* |----|<- less than 150 ms -> |----|
* s ------------------------------- e
*
* if 2 IO acticity are only separated by less than 50 ms time intervals,
* if 2 IO activities are separated by a time interval of less than 150 ms,
* they should be considered as the same part of an IO process, instead of 2.
*
* |-----| <- 10 mins -> |-------|
Expand All @@ -55,6 +54,7 @@ using namespace std::chrono;
template <class T>
class StopWatch {
public:
using steady_clock = std::chrono::steady_clock;
using my_time_t = decltype(steady_clock::now());
StopWatch(size_t interval)
: interval_(interval), beg_(steady_clock::now()), end_(beg_) {}
Expand All @@ -64,20 +64,27 @@ class StopWatch {

void start() {
std::lock_guard<std::mutex> lk(mx_);
if (thread_num_ == 0) {
if (duration_cast<milliseconds>(steady_clock::now() - beg_) >
milliseconds{150}) {

if (thread_set_.size() == 0) {
using milliseconds = std::chrono::milliseconds;
auto lag = std::chrono::duration_cast<milliseconds>(
std::chrono::steady_clock::now() - beg_);
if (lag > milliseconds{150}) {
beg_ = steady_clock::now();
mile_stone_ = beg_;
}
}
++thread_num_;

if (!thread_set_.insert(std::this_thread::get_id()).second) {
logstream(LOG_DEBUG) << "this thread " << std::this_thread::get_id()
<< "already starts the clock" << std::endl;
}
}

bool is_time_to_record() {
std::lock_guard<std::mutex> lock(mx_);

if (thread_num_ == 0) return false;
if (thread_set_.size() == 0) return false;

// the reporting time point has been reached; log it.
auto cur = steady_clock::now();
Expand All @@ -90,39 +97,52 @@ class StopWatch {
return false;
}

void end() {
/*
* @return the number of threads that still hold this stop watch once the
* calling thread has released it (0 means the clock has been stopped).
*/
int stop() {
std::lock_guard<std::mutex> lk(mx_);
if (thread_num_ > 0 && --thread_num_ == 0) {
end_ = steady_clock::now();
if (thread_set_.count(std::this_thread::get_id())) {
thread_set_.erase(std::this_thread::get_id());
// last man standing
if (thread_set_.size() == 0) {
end_ = steady_clock::now();
return 0;
}
// still running
return thread_set_.size();
}

return thread_set_.size();
}

template <typename Output>
template <class Output>
Output duration() const {
std::lock_guard<std::mutex> lk(mx_);
// the clock is still on
if (thread_num_ > 0) {
return duration_cast<Output>(steady_clock::now() - beg_);
if (thread_set_.count(std::this_thread::get_id())) {
return std::chrono::duration_cast<Output>(steady_clock::now() - beg_);
}

return duration_cast<Output>(end_ - beg_);
return std::chrono::duration_cast<Output>(end_ - beg_);
}

~StopWatch() { end(); }
// The stop watch is neither copyable nor movable, so once it is destroyed
// any remaining pointer or reference to this stop_watch is dangling and illegal.
~StopWatch() { stop(); }

private:
uint64_t interval_;
my_time_t beg_;
my_time_t end_;
my_time_t mile_stone_;
mutable std::mutex mx_;
size_t thread_num_;

std::set<std::thread::id> threads_;
};

using StopWatchSec_t = StopWatch<std::chrono::seconds>;

} // namespace

/**
* \ingroup fileio
* Can be wrapped around any device implement to provide read caching. This
Expand Down Expand Up @@ -165,8 +185,11 @@ class read_caching_device {
m_file_size = m_contents->file_size();
m_filename_to_filesize_map[filename] = m_file_size;
// report downloading every 30s
m_filename_to_stop_watch[filename] =
std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
if (m_filename_to_filesize_map.find(filename) !=
m_filename_to_filesize_map.end()) {
m_filename_to_stop_watch[filename] =
std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
}
}
} else {
m_contents = std::make_shared<T>(filename, write);
Expand Down Expand Up @@ -194,12 +217,24 @@ class read_caching_device {
{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
m_filename_to_filesize_map.erase(m_filename);
m_filename_to_stop_watch.erase(m_filename);

auto iter = m_filename_to_stop_watch.find(m_filename);
if (iter != m_filename_to_stop_watch.end()) {
if (!iter->stop()) {
m_filename_to_stop_watch.erase(iter);
}
}
}

} else if (mode == std::ios_base::in && !m_writing) {
if (m_contents) m_contents->close(mode);
m_contents.reset();
{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
if (m_filename_to_stop_watch.find(m_filename) !=
m_filename_to_stop_watch.end())
m_filename_to_stop_watch[m_filename]->stop();
}
}
}

Expand Down Expand Up @@ -254,10 +289,11 @@ class read_caching_device {
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
auto& stop_watch = m_filename_to_stop_watch[m_filename];
if (stop_watch->is_time_to_record()) {
logprogress_stream << "Finish uploading " << sanitize_url(m_filename)
<< ". Elapsed time "
<< stop_watch->duration<seconds>().count()
<< " seconds" << std::endl;
logprogress_stream
<< "Finish uploading " << sanitize_url(m_filename)
<< ". Elapsed time "
<< stop_watch->template duration<std::chrono::seconds>().count()
<< " seconds" << std::endl;
}
}

Expand Down Expand Up @@ -355,11 +391,10 @@ class read_caching_device {
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
auto& stop_watch = m_filename_to_stop_watch[m_filename];
if (stop_watch->is_time_to_record()) {
logprogress_stream << "Finished fetching block " << block_number
<< ". Elapsed "
<< stop_watch->duration<seconds>().count()
<< "s for downloading " << sanitize_url(m_filename)
<< std::endl;
logprogress_stream
<< "Finished fetching block " << block_number << ". Elapsed "
<< stop_watch->template duration<std::chrono::seconds>().count()
<< "s for downloading " << sanitize_url(m_filename) << std::endl;
}
}

Expand Down
106 changes: 106 additions & 0 deletions test/fileio/stop_watch_test.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#define BOOST_TEST_MODULE
#include <boost/test/unit_test.hpp>
#include <chrono>
#include <core/storage/fileio/read_caching_device.hpp>
#include <core/util/test_macros.hpp>

using namespace turi;
using namespace std::chrono;
struct stop_watch_test {
using my_watch_t = StopWatch<milliseconds>;

public:
void test_end_without_start(void) {
my_watch_t watch = my_watch_t(1);
watch.end();
TS_ASSERT_EQUALS(watch.templace duration<milliseconds>(), milliseconds{0});
TS_ASSERT_THROWS_NOTHING(watch.end());
}

void test_double_start(void) {
my_watch_t watch = my_watch_t(1);
watch.start();
TS_ASSERT_THROWS_ANYTHING(watch.start());
}

void test_single_thread(void) {
my_watch_t watch = my_watch_t(1);
watch.start();
std::this_thread::sleep_for(milliseconds(2));
TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds(2));
watch.end();
TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds(2));
}

void test_multi_thread_1(void) {
my_watch_t watch = my_watch_t(1);
watch.start();
auto start = std::chrono::steady_clock::now();
std::thread t1([&watch]() {
watch.start();
watch.end();
});

std::thread t2([&watch]() {
std::this_thread::sleep_for(milliseconds(1));
watch.start();
std::this_thread::sleep_for(milliseconds(1));
watch.end();
});
std::this_thread::sleep_for(milliseconds(10));
t1.join();
t2.join();
auto end = std::chrono::steady_clock::now();
watch.end();

TS_ASSERT(watch.templace duration<milliseconds>() >=
std::chrono::duration_cast<milliseconds>(end - start));
}

void test_multi_thread_2(void) {
my_watch_t watch = my_watch_t(1);
watch.start();
auto start = std::chrono::steady_clock::now();
std::thread t1([&watch]() {
watch.start();
watch.end();
});

std::thread t2([&watch]() {
std::this_thread::sleep_for(milliseconds(1));
watch.start();
std::this_thread::sleep_for(milliseconds(15));
watch.end();
});
t1.join();
std::this_thread::sleep_for(milliseconds(1));
auto end = std::chrono::steady_clock::now();
watch.end();
TS_ASSERT(watch.templace duration<milliseconds>() >=
std::chrono::duration_cast<milliseconds>(end - start));
TS_ASSERT(watch.templace duration<milliseconds>() <= milliseconds > (15));
t2.join();

TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds > (15));
watch.end();
TS_ASSERT(watch.templace duration<milliseconds>() >= milliseconds > (15));
}
};

// Boost.Test registration: opens the suite `_stop_watch_test` with
// `stop_watch_test` as its fixture and adds one test case per fixture method.
// Each case simply forwards to the method of the same name.
BOOST_FIXTURE_TEST_SUITE(_stop_watch_test, stop_watch_test)
BOOST_AUTO_TEST_CASE(test_end_without_start) {
stop_watch_test::test_end_without_start();
}
BOOST_AUTO_TEST_CASE(test_double_start) {
stop_watch_test::test_double_start();
}
BOOST_AUTO_TEST_CASE(test_single_thread) {
stop_watch_test::test_single_thread();
}
BOOST_AUTO_TEST_CASE(test_multi_thread_1) {
stop_watch_test::test_multi_thread_1();
}
BOOST_AUTO_TEST_CASE(test_multi_thread_2) {
stop_watch_test::test_multi_thread_2();
}
// Closes the suite opened by BOOST_FIXTURE_TEST_SUITE above.
BOOST_AUTO_TEST_SUITE_END()

0 comments on commit fb7394b

Please sign in to comment.