Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
add stop watch
Browse files Browse the repository at this point in the history
  • Loading branch information
Guihao Liang committed May 8, 2020
1 parent ed85e8d commit 9ce8f2c
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 68 deletions.
240 changes: 194 additions & 46 deletions src/core/storage/fileio/read_caching_device.hpp
Original file line number Diff line number Diff line change
@@ -1,23 +1,127 @@
/* Copyright © 2017 Apple Inc. All rights reserved.
*
* Use of this source code is governed by a BSD-3-clause license that can
* be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
* be found in the LICENSE.txt file or at
* https://opensource.org/licenses/BSD-3-Clause
*/
#ifndef TURI_FILEIO_CACHING_DEVICE_HPP
#define TURI_FILEIO_CACHING_DEVICE_HPP
#include <core/logging/logger.hpp>
#include <core/parallel/mutex.hpp>
#include <core/storage/fileio/block_cache.hpp>
#include <core/storage/fileio/sanitize_url.hpp>
#include <core/parallel/mutex.hpp>
#include <core/util/basic_types.hpp>
#include <mutex>
#include <map>
#include <mutex>
#include <thread>
namespace turi {

// private namespace
namespace {
const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB
} // end private namespace
const size_t READ_CACHING_BLOCK_SIZE = 64 * 1024 * 1024; // 64 MB

using namespace std::chrono;

/*
* For each parallel (streamlined) network IO, we can view them as intervals
* in parallel universe.
*
* What we do is to merge all universe into one: the very start and the
* very end of the entire IO, which is the elapsed time, using the same time
* axis.
* t1 |-----|
* t2 |----|
* t3 |-----|
* s ------------ e
*
* An SFrame can be streamed: it does not download all of its files at once,
* but may fetch data several times. The overall IO therefore consists of many
* small IO activities.
*
* |----|<- less than 100 ms -> |----|
* s ------------------------------- e
*
* If two IO activities are separated by only a short gap (the code uses a
* 150 ms threshold), they should be considered parts of the same IO process
* instead of two separate ones.
*
* |-----| <- 10 mins -> |-------|
* s --- e s ----- e
*
* Since 2 adjacent IO activities are too distant, we should view it as 2
* separate activities. For example, user may play with first 1000 rows, and
* later the user jumps to the tail of the frame to mess around with the data.
*
*/
/**
 * Thread-safe stop watch that folds overlapping, or nearly adjacent, timed
 * activities into a single elapsed interval.
 *
 * Every participant calls start() when it begins working and end() when it
 * is done. The watch counts the active participants; the measured interval
 * runs from the first start() of an activity to the end() that brings the
 * count back to zero. A start() that arrives shortly after the previous
 * activity ended (see idle_threshold()) continues that activity instead of
 * opening a new one.
 *
 * \tparam T std::chrono duration type giving the unit of the reporting
 *           interval passed to the constructor (e.g. std::chrono::seconds).
 */
template <class T>
class StopWatch {
 public:
  using my_time_t = std::chrono::steady_clock::time_point;

  /**
   * \param interval Minimum spacing, in units of T, between two "true"
   *                 answers of is_time_to_record().
   */
  StopWatch(size_t interval)
      : interval_(interval),
        beg_(std::chrono::steady_clock::now()),
        end_(beg_),
        // Clock epoch: the first is_time_to_record() of an activity that
        // continues from construction reports immediately.
        mile_stone_(),
        // Must start at zero; every method branches on this counter.
        thread_num_(0) {}

  StopWatch(const StopWatch&) = delete;
  StopWatch(StopWatch&& rhs) = delete;
  StopWatch& operator=(const StopWatch&) = delete;
  StopWatch& operator=(StopWatch&&) = delete;

  /**
   * Registers one more active participant.
   *
   * If the watch was idle and the previous activity ended longer ago than
   * idle_threshold(), a new activity begins: the start time and the report
   * milestone are both reset to now.
   */
  void start() {
    std::lock_guard<std::mutex> lk(mx_);
    if (thread_num_ == 0) {
      const my_time_t now = std::chrono::steady_clock::now();
      // Measure the idle gap from the END of the previous activity. Measuring
      // from its beginning would restart the watch after any activity longer
      // than the threshold, no matter how short the real gap was.
      if (now - end_ > idle_threshold()) {
        beg_ = now;
        mile_stone_ = now;
      }
    }
    ++thread_num_;
  }

  /**
   * Tells the caller whether a progress report is due.
   *
   * \return true when at least one participant is active and the current
   *         milestone has passed; the milestone is then pushed `interval`
   *         units of T into the future. false otherwise.
   */
  bool is_time_to_record() {
    std::lock_guard<std::mutex> lock(mx_);

    if (thread_num_ == 0) return false;

    const my_time_t cur = std::chrono::steady_clock::now();
    if (cur > mile_stone_) {
      mile_stone_ = cur + T(interval_);
      return true;
    }

    // milestone not reached yet, nothing to record
    return false;
  }

  /**
   * Unregisters one active participant. When the last one leaves, the end
   * time of the activity is stamped. Calling end() on an idle watch is a
   * no-op.
   */
  void end() {
    std::lock_guard<std::mutex> lk(mx_);
    if (thread_num_ > 0 && --thread_num_ == 0) {
      end_ = std::chrono::steady_clock::now();
    }
  }

  /**
   * Elapsed time of the current (or most recent) activity.
   *
   * \tparam Output std::chrono duration type of the result.
   * \return now - start while any participant is active, otherwise
   *         end - start of the last finished activity (zero if the watch
   *         was never started).
   */
  template <typename Output>
  Output duration() const {
    std::lock_guard<std::mutex> lk(mx_);
    // the clock is still running
    if (thread_num_ > 0) {
      return std::chrono::duration_cast<Output>(
          std::chrono::steady_clock::now() - beg_);
    }

    return std::chrono::duration_cast<Output>(end_ - beg_);
  }

  ~StopWatch() { end(); }

 private:
  /// Longest idle gap that still counts as "the same activity".
  static std::chrono::milliseconds idle_threshold() {
    return std::chrono::milliseconds{150};
  }

  uint64_t interval_;
  my_time_t beg_;
  my_time_t end_;
  my_time_t mile_stone_;
  mutable std::mutex mx_;
  size_t thread_num_;
};

using StopWatchSec_t = StopWatch<std::chrono::seconds>;

} // namespace

/**
* \ingroup fileio
Expand All @@ -33,18 +137,18 @@ const size_t READ_CACHING_BLOCK_SIZE = 64*1024*1024; // 64 MB
*
* After:
* \code
* typedef boost::iostreams::stream<read_caching_device<s3_device> > s3_fstream;
* \endcode
* typedef boost::iostreams::stream<read_caching_device<s3_device> >
* s3_fstream; \endcode
*
* It uses the \ref block_cache to pro
*/
template <typename T>
class read_caching_device {
public: // boost iostream concepts
public: // boost iostream concepts
typedef typename T::char_type char_type;
typedef typename T::category category;

read_caching_device() { }
read_caching_device() {}

read_caching_device(const std::string& filename, const bool write = false) {
logstream(LOG_DEBUG) << "read_cachine_device: " << filename << std::endl;
Expand All @@ -60,6 +164,9 @@ class read_caching_device {
m_contents = std::make_shared<T>(filename, write);
m_file_size = m_contents->file_size();
m_filename_to_filesize_map[filename] = m_file_size;
// report downloading every 30s
m_filename_to_stop_watch[filename] =
std::unique_ptr<StopWatchSec_t>(new StopWatchSec_t(30));
}
} else {
m_contents = std::make_shared<T>(filename, write);
Expand All @@ -78,15 +185,16 @@ class read_caching_device {
// evict all blocks for this key
auto& bc = block_cache::get_instance();
size_t block_number = 0;
while(1) {
while (1) {
std::string key = get_key_name(block_number);
if (bc.evict_key(key) == false) break;
++ block_number;
++block_number;
}
// evict the file size cache
{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
m_filename_to_filesize_map.erase(m_filename);
m_filename_to_stop_watch.erase(m_filename);
}

} else if (mode == std::ios_base::in && !m_writing) {
Expand All @@ -111,18 +219,18 @@ class read_caching_device {

std::streamsize ret = 0;

while(n > 0) {
while (n > 0) {
// the block number containing the offset.
auto block_number = m_file_pos / READ_CACHING_BLOCK_SIZE;
// the offset inside the block
auto block_offset = m_file_pos % READ_CACHING_BLOCK_SIZE;
// number of bytes I can read inside this block before I hit the next block
size_t n_bytes = (block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos;
// number of bytes I can read inside this block before I hit the next
// block
size_t n_bytes =
(block_number + 1) * READ_CACHING_BLOCK_SIZE - m_file_pos;
n_bytes = std::min<size_t>(n_bytes, n);
bool success = fetch_block(strm_ptr + ret,
block_number,
block_offset,
n_bytes);
bool success =
fetch_block(strm_ptr + ret, block_number, block_offset, n_bytes);
if (success == false) {
log_and_throw(std::string("Unable to read ") + m_filename);
}
Expand All @@ -135,18 +243,33 @@ class read_caching_device {
}

std::streamsize write(const char* strm_ptr, std::streamsize n) {
return get_contents()->write(strm_ptr, n);
}
{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
m_filename_to_stop_watch[m_filename]->start();
}

auto bytes_written = get_contents()->write(strm_ptr, n);

{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
auto& stop_watch = m_filename_to_stop_watch[m_filename];
if (stop_watch->is_time_to_record()) {
logprogress_stream << "Finish uploading " << sanitize_url(m_filename)
<< ". Elapsed time "
<< stop_watch->duration<seconds>().count()
<< " seconds" << std::endl;
}
}

bool good() const {
return get_contents()->good();
return bytes_written;
}

bool good() const { return get_contents()->good(); }

/**
* Seeks to a different location.
*/
std::streampos seek(std::streamoff off,
std::ios_base::seekdir way,
std::streampos seek(std::streamoff off, std::ios_base::seekdir way,
std::ios_base::openmode openmode) {
if (openmode == std::ios_base::in) {
if (way == std::ios_base::beg) {
Expand All @@ -155,7 +278,8 @@ class read_caching_device {
m_file_pos = std::min<std::streamoff>(m_file_pos + off, m_file_size);
m_file_pos = std::max<std::streamoff>(m_file_pos, 0);
} else if (way == std::ios_base::end) {
m_file_pos = std::min<std::streamoff>(m_file_size + off - 1, m_file_size);
m_file_pos =
std::min<std::streamoff>(m_file_size + off - 1, m_file_size);
m_file_pos = std::max<std::streamoff>(m_file_pos, 0);
}
return m_file_pos;
Expand All @@ -169,14 +293,10 @@ class read_caching_device {
* Returns (size_t)(-1) if there is no file opened, or if there is an
* error obtaining the file size.
*/
size_t file_size() const {
return m_file_size;
}
size_t file_size() const { return m_file_size; }

/// Not supported
std::shared_ptr<std::istream> get_underlying_stream() {
return nullptr;
}
std::shared_ptr<std::istream> get_underlying_stream() { return nullptr; }

private:
std::string m_filename;
Expand All @@ -187,6 +307,8 @@ class read_caching_device {

static mutex m_filesize_cache_mutex;
static std::map<std::string, size_t> m_filename_to_filesize_map;
static std::map<std::string, std::unique_ptr<StopWatchSec_t>>
m_filename_to_stop_watch;

std::shared_ptr<T>& get_contents() {
if (!m_contents) {
Expand All @@ -196,35 +318,56 @@ class read_caching_device {
}
std::string get_key_name(size_t block_number) {
// we generate a key name that will never appear in any filename
return m_filename + "////:" + std::to_string(block_number);
return m_filename + "////:" + std::to_string(block_number);
}
/**
* Fetches the contents of a block.
* Returns true on success and false on failure.
*/
bool fetch_block(char* output,
size_t block_number,
size_t startpos,
bool fetch_block(char* output, size_t block_number, size_t startpos,
size_t length) {
auto& bc = block_cache::get_instance();
std::string key = get_key_name(block_number);
int64_t ret = bc.read(key, output, startpos, startpos + length);
if (static_cast<size_t>(ret) == length) return true;

logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block " << block_number << std::endl;
logstream(LOG_INFO) << "Fetching " << sanitize_url(m_filename) << " Block "
<< block_number << std::endl;
// ok. failure... no such block or block is bad. We read it ourselves.
// read the whole block
auto block_start = block_number * READ_CACHING_BLOCK_SIZE;
auto block_end = std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size);
auto block_end =
std::min(block_start + READ_CACHING_BLOCK_SIZE, m_file_size);
// seek to the block and read the whole block at once
auto& contents = get_contents();
contents->seek(block_start, std::ios_base::beg, std::ios_base::in);
std::string block_contents(block_end - block_start, 0);
auto bytes_read = contents->read(&(block_contents[0]),
block_end - block_start);

{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
m_filename_to_stop_watch[m_filename]->start();
}

auto bytes_read =
contents->read(&(block_contents[0]), block_end - block_start);

{
std::lock_guard<mutex> file_size_guard(m_filesize_cache_mutex);
auto& stop_watch = m_filename_to_stop_watch[m_filename];
if (stop_watch->is_time_to_record()) {
logprogress_stream << "Finished fetching block " << block_number
<< ". Elapsed "
<< stop_watch->duration<seconds>().count()
<< "s for downloading " << sanitize_url(m_filename)
<< std::endl;
}
}

// read failed.
static_assert(std::is_signed<decltype(bytes_read)>::value, "decltype(bytes_read) signed");
static_assert(std::is_integral<decltype(bytes_read)>::value, "decltype(bytes_read) integral");
static_assert(std::is_signed<decltype(bytes_read)>::value,
"decltype(bytes_read) signed");
static_assert(std::is_integral<decltype(bytes_read)>::value,
"decltype(bytes_read) integral");
if (bytes_read < truncate_check<int64_t>(block_end - block_start)) {
return false;
}
Expand All @@ -241,12 +384,17 @@ class read_caching_device {
return true;
}

}; // end of read_caching_device
}; // end of read_caching_device

template<typename T>
template <typename T>
mutex read_caching_device<T>::m_filesize_cache_mutex;

template<typename T>
std::map<std::string, size_t> read_caching_device<T>::m_filename_to_filesize_map;
} // namespace turi
template <typename T>
std::map<std::string, size_t>
read_caching_device<T>::m_filename_to_filesize_map;

template <typename T>
std::map<std::string, std::unique_ptr<StopWatchSec_t>>
read_caching_device<T>::m_filename_to_stop_watch;
} // namespace turi
#endif
Loading

0 comments on commit 9ce8f2c

Please sign in to comment.