Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output single Parquet file per locale #39

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 41 additions & 24 deletions src/centrality/SafegraphCBGTiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <blaze/math/DynamicVector.h>
#include <hpx/parallel/algorithms/transform_reduce.hpp>
#include <hpx/parallel/algorithms/for_each.hpp>
#include <components/TileWriter.hpp>
#include <components/VisitMatrixWriter.hpp>
#include <components/TileWriter2.hpp>
#include <components/VisitMatrixWriter2.hpp>
#include <shared/DateUtils.hpp>
#include <shared/ConversionUtils.hpp>
#include <spdlog/fmt/fmt.h>
Expand Down Expand Up @@ -131,28 +131,35 @@ void SafegraphCBGTiler::write_parquet(const mt::ctx::ReduceContext<v2, mt::coord

// Figure out my temporal start date
const date::sys_days start_date = date::sys_days{} + date::days(_tc._time_offset);
const date::sys_days end_date = start_date + date::days(_tc._time_count);
const auto date_range = boost::irange(0, static_cast<int>(_tc._time_count));

par::for_each(par::execution::par_unseq, date_range.begin(), date_range.end(), [&](const std::size_t i) {
// Some nice pretty-printing of the dates
const date::sys_days matrix_date = start_date + date::days{i};
const auto parquet_filename = fmt::format("{}-{}-{}-{}.parquet",
*output_name,
*(_oc->from_global_offset(_tc._tile_min)),
*(_oc->from_global_offset(_tc._tile_max)),
date::format("%F", matrix_date));
const auto p_file = fs::path(*output_dir) /= fs::path(parquet_filename);

const auto visit_filename = fmt::format("{}-visits-{}-{}-{}.parquet",
*output_name,
*(_oc->from_global_offset(_tc._tile_min)),
*(_oc->from_global_offset(_tc._tile_max)),
date::format("%F", matrix_date));
const auto parquet_filename = fmt::format("{}-{}-{}-{}-{}.parquet",
*output_name,
*(_oc->from_global_offset(_tc._tile_min)),
*(_oc->from_global_offset(_tc._tile_max)),
date::format("%F", start_date),
date::format("%F", end_date));
const auto p_file = fs::path(*output_dir) /= fs::path(parquet_filename);

components::TileWriter2 tw(std::string(p_file.string()), *_oc);

const auto visit_filename = fmt::format("{}-visits-{}-{}-{}-{}.parquet",
*output_name,
*(_oc->from_global_offset(_tc._tile_min)),
*(_oc->from_global_offset(_tc._tile_max)),
date::format("%F", start_date),
date::format("%F", end_date));

const auto v_file = fs::path(*output_dir) /= fs::path(visit_filename);


const auto v_file = fs::path(*output_dir) /= fs::path(visit_filename);
components::VisitMatrixWriter2 vw(std::string(v_file.string()), *_oc);

components::TileWriter tw(std::string(p_file.string()), *_oc);
components::VisitMatrixWriter vw(std::string(v_file.string()), *_oc);
par::for_each(par::execution::par_unseq, date_range.begin(), date_range.end(), [&](const std::size_t i) {
// Some nice pretty-printing of the dates
const date::sys_days matrix_date = start_date + date::days{i};

const auto multiply_start = hpx::util::high_resolution_clock::now();
const auto matrix_pair = _tm->get_matrix_pair(i);
Expand All @@ -169,9 +176,6 @@ void SafegraphCBGTiler::write_parquet(const mt::ctx::ReduceContext<v2, mt::coord
spdlog::info("Performing multiplication for {}", date::format("%F", matrix_date));
const auto multiply_elapsed = hpx::util::high_resolution_clock::now() - multiply_start;
print_timing("Multiply", multiply_elapsed);

spdlog::info("Beginning tile write");
const auto write_start = hpx::util::high_resolution_clock::now();
arrow::Status status = tw.writeResults(matrix_date, cbg_risk_score, {}, visit_sum);
if (!status.ok()) {
spdlog::critical("Could not write parquet file: {}", status.CodeAsString());
Expand All @@ -180,9 +184,22 @@ void SafegraphCBGTiler::write_parquet(const mt::ctx::ReduceContext<v2, mt::coord
if (!status.ok()) {
spdlog::critical("Could not write parquet file: {}", status.CodeAsString());
}
const auto write_elapsed = hpx::util::high_resolution_clock::now() - write_start;
print_timing("File Write", write_elapsed);

});
const auto write_start = hpx::util::high_resolution_clock::now();
spdlog::info("Beginning tile write");
const auto write_elapsed = hpx::util::high_resolution_clock::now() - write_start;
print_timing("File Write", write_elapsed);
// Write the files
arrow::Status status;
status = tw.writeToDisk();
if (!status.ok()) {
spdlog::error("Error writing tiles");
}
status = vw.writeToDisk();
if (!status.ok()) {
spdlog::error("Error visit patterns");
}

}

Expand Down
2 changes: 1 addition & 1 deletion src/components/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ add_library(mcpp-components
src/CBGShapefileWrapper.cpp include/components/CBGShapefileWrapper.hpp
src/TemporalMatricies.cpp include/components/TemporalMatricies.hpp
src/TileWriter.cpp include/components/TileWriter.hpp src/RowProcessor.cpp include/components/RowProcessor.hpp src/detail/CBGOffsetCalculator.cpp include/components/detail/CBGOffsetCalculator.hpp
src/VisitMatrixWriter.cpp include/components/VisitMatrixWriter.hpp TemporalGraphs.cpp include/components/TemporalGraphs.hpp include/components/BaseSafegraphMapper.hpp src/BaseSafegraphMapper.cpp include/components/OffsetCalculator.hpp src/detail/CountyOffsetCalculator.cpp include/components/detail/CountyOffsetCalculator.hpp include/components/detail/offset_shared.hpp src/server/CountyShapefileServer.cpp include/components/server/CountyShapefileServer.hpp src/CountyShapefileWrapper.cpp include/components/CountyShapefileWrapper.hpp src/detail/helpers.cpp include/components/detail/helpers.hpp)
src/VisitMatrixWriter.cpp include/components/VisitMatrixWriter.hpp TemporalGraphs.cpp include/components/TemporalGraphs.hpp include/components/BaseSafegraphMapper.hpp src/BaseSafegraphMapper.cpp include/components/OffsetCalculator.hpp src/detail/CountyOffsetCalculator.cpp include/components/detail/CountyOffsetCalculator.hpp include/components/detail/offset_shared.hpp src/server/CountyShapefileServer.cpp include/components/server/CountyShapefileServer.hpp src/CountyShapefileWrapper.cpp include/components/CountyShapefileWrapper.hpp src/detail/helpers.cpp include/components/detail/helpers.hpp src/TileWriter2.cpp include/components/TileWriter2.hpp src/VisitMatrixWriter2.cpp include/components/VisitMatrixWriter2.hpp)

add_library(MCPP::components ALIAS mcpp-components)

Expand Down
38 changes: 38 additions & 0 deletions src/components/include/components/TileWriter2.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// Created by Nick Robison on 11/19/20.
//

#pragma once

#include "components/detail/CBGOffsetCalculator.hpp"
#include <absl/synchronization/mutex.h>
#include <io/parquet.hpp>
#include <blaze/math/CompressedVector.h>
#include <date/date.h>
#include <string>
#include <mutex>

namespace components {

/**
 * Buffers tile results for many dates in memory and emits them as one Parquet table.
 *
 * Rows are accumulated in Arrow column builders by writeResults() and only turned
 * into a table (columns: cbg, date, risk, norm_risk, visits) when writeToDisk() is called.
 */
class TileWriter2 {
public:
/**
 * @param filename Used to construct the underlying io::Parquet writer.
 * @param oc Offset calculator used to translate local offsets back into CBG identifiers.
 */
TileWriter2(const std::string &filename, detail::CBGOffsetCalculator oc);

/**
 * Appends the values for a single date to the in-memory builders.
 * Holds the internal mutex for the duration of the call.
 *
 * @param result_date Date stored alongside every appended row.
 * @param results Risk values, indexed by local offset.
 * @param norm_results Normalized risk values, indexed by local offset.
 * @param visits Visit counts, indexed by local offset.
 * @return An arrow::Status for the append operations.
 */
arrow::Status writeResults(const date::sys_days &result_date,
const blaze::CompressedVector<double, blaze::rowVector> &results,
const blaze::CompressedVector<double, blaze::rowVector> &norm_results,
const blaze::CompressedVector<std::uint32_t, blaze::rowVector> &visits);

/**
 * Finishes the builders, assembles an Arrow table and passes it to the Parquet writer.
 * Does not take the internal mutex.
 *
 * @return The arrow::Status of the operation.
 */
arrow::Status writeToDisk();

private:
// Parquet helper that receives the finished table.
const io::Parquet _p;
// Maps local offsets to CBG identifiers.
const detail::CBGOffsetCalculator _offset_calculator;
// One builder per output column.
arrow::StringBuilder _cbg_builder;
arrow::Date32Builder _date_builder;
arrow::DoubleBuilder _risk_builder;
arrow::DoubleBuilder _normalize_risk_builder;
arrow::UInt32Builder _visit_builder;
// Serializes writeResults() calls so the builders stay row-aligned.
std::mutex _mtx;
};
}
2 changes: 1 addition & 1 deletion src/components/include/components/VisitMatrixWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#ifndef MOBILITY_CPP_VISITMATRIXWRITER_HPP
#define MOBILITY_CPP_VISITMATRIXWRITER_HPP

#include "components/detail/CBGOffsetCalculator.hpp"
#include "components/TemporalMatricies.hpp"
#include <io/parquet.hpp>
#include <date/date.h>
#include "components/detail/CBGOffsetCalculator.hpp"

namespace components {
class VisitMatrixWriter {
Expand Down
30 changes: 30 additions & 0 deletions src/components/include/components/VisitMatrixWriter2.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Created by Nick Robison on 11/19/20.
//

#pragma once

#include "components/detail/CBGOffsetCalculator.hpp"
#include "components/TemporalMatricies.hpp"
#include <io/parquet.hpp>
#include <date/date.h>
#include <mutex>

namespace components {
/**
 * Buffers visit-matrix entries for many dates in memory and emits them as one Parquet table.
 *
 * Rows are accumulated in Arrow column builders by writeResults() and only turned
 * into a table (columns: poi_cbg, visitor_cbg, date, visits) when writeToDisk() is called.
 */
class VisitMatrixWriter2 {
public:
/**
 * @param filename Used to construct the underlying io::Parquet writer.
 * @param oc Offset calculator used to translate offsets back into CBG identifiers.
 */
VisitMatrixWriter2(const std::string &filename, detail::CBGOffsetCalculator oc);

/**
 * Appends the stored entries of `matrix` for a single date to the in-memory builders.
 * Holds the internal mutex for the duration of the call.
 *
 * @param result_date Date stored alongside every appended row.
 * @param matrix Visit matrix whose entries are appended.
 * @return An arrow::Status for the append operations.
 */
arrow::Status writeResults(const date::sys_days &result_date, const visit_matrix &matrix);
/**
 * Finishes the builders, assembles an Arrow table and passes it to the Parquet writer.
 * Does not take the internal mutex.
 *
 * @return The arrow::Status of the operation.
 */
arrow::Status writeToDisk();

private:
// Parquet helper that receives the finished table.
const io::Parquet _p;
// Maps local/global offsets to CBG identifiers.
const detail::CBGOffsetCalculator _offset_calculator;
// One builder per output column.
arrow::StringBuilder _poi_cbg_builder;
arrow::StringBuilder _visitor_cbg_builder;
arrow::Date32Builder _date_builder;
arrow::UInt32Builder _visit_builder;
// Serializes writeResults() calls so the builders stay row-aligned.
std::mutex _mtx;
};
}
66 changes: 66 additions & 0 deletions src/components/src/TileWriter2.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Created by Nick Robison on 11/19/20.
//

#include "components/TileWriter2.hpp"
#include "spdlog/spdlog.h"

namespace components {

/// Builds the Parquet helper from `filename` and takes ownership of the offset calculator.
/// The column builders and the mutex are default-constructed.
TileWriter2::TileWriter2(const std::string &filename, detail::CBGOffsetCalculator oc)
        : _p(filename),
          _offset_calculator(std::move(oc)) {}

/**
 * Appends one row per resolvable local offset of `results` to the in-memory column builders.
 *
 * Nothing is written to disk here; call writeToDisk() once all dates have been appended.
 * The internal mutex is held for the whole call so concurrent callers cannot interleave rows.
 *
 * @param result_date Date recorded for every row (stored as its day count since the epoch).
 * @param results Risk values; its size drives the iteration.
 * @param norm_results Normalized risk values. May be shorter than `results` (callers may pass
 *                     an empty vector); missing entries are recorded as 0.
 * @param visits Visit counts. Missing entries are recorded as 0.
 * @return The first failing builder status, or OK when every append succeeded.
 *         After a failure the builders may hold a partially appended row, so the
 *         writer should not be flushed.
 */
arrow::Status TileWriter2::writeResults(const date::sys_days &result_date,
                                        const blaze::CompressedVector<double, blaze::rowVector> &results,
                                        const blaze::CompressedVector<double, blaze::rowVector> &norm_results,
                                        const blaze::CompressedVector<std::uint32_t, blaze::rowVector> &visits) {
    const std::lock_guard<std::mutex> lock(_mtx);

    // Identical for every row of this call, so compute it once.
    const auto day_number = result_date.time_since_epoch().count();

    for (size_t i = 0; i < results.size(); i++) {
        // Reverse lookup the index with the matching CBG
        const auto cbg = _offset_calculator.from_local_offset(i);
        if (!cbg.has_value()) {
            spdlog::error("Cannot process index: `{}`", i);
            continue;
        }

        // Never index past the end of the optional vectors; element access beyond
        // size() is outside the vector's contract.
        const double norm_risk = i < norm_results.size() ? norm_results[i] : 0.0;
        const std::uint32_t visit_count = i < visits.size() ? visits[i] : 0U;

        // Stop at the first failure instead of letting a later append overwrite it.
        arrow::Status status = _cbg_builder.Append(*cbg);
        if (!status.ok()) {
            return status;
        }
        status = _date_builder.Append(day_number);
        if (!status.ok()) {
            return status;
        }
        status = _risk_builder.Append(results[i]);
        if (!status.ok()) {
            return status;
        }
        status = _normalize_risk_builder.Append(norm_risk);
        if (!status.ok()) {
            return status;
        }
        status = _visit_builder.Append(visit_count);
        if (!status.ok()) {
            return status;
        }
    }

    return arrow::Status::OK();
}

/**
 * Finishes every column builder, assembles the resulting arrays into an Arrow table
 * and passes that table to the Parquet helper.
 *
 * @return The first failing `Finish` status, otherwise the status returned by the
 *         Parquet write. A failed `Finish` aborts before the table is built so that
 *         no null array is handed to `arrow::Table::Make`.
 */
arrow::Status TileWriter2::writeToDisk() {
    // Convert to table pass to parquet writer
    std::shared_ptr<arrow::Array> cbg_array;
    arrow::Status status = _cbg_builder.Finish(&cbg_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> date_array;
    status = _date_builder.Finish(&date_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> risk_array;
    status = _risk_builder.Finish(&risk_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> norm_risk_array;
    status = _normalize_risk_builder.Finish(&norm_risk_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> visit_array;
    status = _visit_builder.Finish(&visit_array);
    if (!status.ok()) {
        return status;
    }

    // Field order must match the array order passed to Table::Make below.
    auto schema = arrow::schema(
            {arrow::field("cbg", arrow::utf8()),
             arrow::field("date", arrow::date32()),
             arrow::field("risk", arrow::float64()),
             arrow::field("norm_risk", arrow::float64()),
             arrow::field("visits", arrow::uint32()),
            });

    auto table = arrow::Table::Make(schema, {cbg_array, date_array, risk_array, norm_risk_array, visit_array});
    return _p.write(*table);
}

}
66 changes: 66 additions & 0 deletions src/components/src/VisitMatrixWriter2.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Created by Nick Robison on 11/19/20.
//

#include "components/VisitMatrixWriter2.hpp"
#include <spdlog/spdlog.h>

namespace components {


/// Builds the Parquet helper from `filename` and takes ownership of the offset calculator.
/// The column builders and the mutex are default-constructed; the body is intentionally empty.
VisitMatrixWriter2::VisitMatrixWriter2(const std::string &filename, detail::CBGOffsetCalculator oc)
        : _p(filename),
          _offset_calculator(std::move(oc)) {
    // Not used
}

/**
 * Appends one row per stored entry of `matrix` to the in-memory column builders.
 *
 * The outer index is resolved through `from_local_offset` (POI CBG) and each entry's
 * index through `from_global_offset` (visitor CBG); entries whose offsets cannot be
 * resolved are logged and skipped. Nothing is written to disk here.
 * The internal mutex is held for the whole call so concurrent callers cannot interleave rows.
 *
 * @param result_date Date recorded for every row (stored as its day count since the epoch).
 * @param matrix Visit matrix to append.
 * @return The first failing builder status, or OK when every append succeeded.
 *         After a failure the builders may hold a partially appended row, so the
 *         writer should not be flushed.
 */
arrow::Status VisitMatrixWriter2::writeResults(const date::sys_days &result_date, const visit_matrix &matrix) {
    const std::lock_guard<std::mutex> lock(_mtx);

    // Identical for every row of this call, so compute it once.
    const auto day_number = result_date.time_since_epoch().count();

    // NOTE(review): the loop bound is columns() while cbegin(i)/cend(i) are indexed by i;
    // this presumes visit_matrix is column-major (or square) - confirm against its definition.
    for (size_t i = 0; i < matrix.columns(); i++) {
        const auto poi_cbg = _offset_calculator.from_local_offset(i);
        if (!poi_cbg.has_value()) {
            spdlog::error("Cannot process source cbg: `{}`", i);
            continue;
        }
        for (visit_matrix::ConstIterator it = matrix.cbegin(i); it != matrix.cend(i); ++it) {
            const auto id = it->index();
            const auto visitor_cbg = _offset_calculator.from_global_offset(id);
            if (!visitor_cbg.has_value()) {
                spdlog::error("Cannot process dest cbg: `{}`", id);
                continue;
            }

            // Stop at the first failure instead of letting a later append overwrite it.
            arrow::Status status = _poi_cbg_builder.Append(*poi_cbg);
            if (!status.ok()) {
                return status;
            }
            status = _visitor_cbg_builder.Append(*visitor_cbg);
            if (!status.ok()) {
                return status;
            }
            status = _date_builder.Append(day_number);
            if (!status.ok()) {
                return status;
            }
            status = _visit_builder.Append(it->value());
            if (!status.ok()) {
                return status;
            }
        }
    }

    return arrow::Status::OK();
}

/**
 * Finishes every column builder, assembles the resulting arrays into an Arrow table
 * and passes that table to the Parquet helper.
 *
 * @return The first failing `Finish` status, otherwise the status returned by the
 *         Parquet write. A failed `Finish` aborts before the table is built so that
 *         no null array is handed to `arrow::Table::Make`.
 */
arrow::Status VisitMatrixWriter2::writeToDisk() {
    // Convert to table pass to parquet writer
    std::shared_ptr<arrow::Array> poi_cbg_array;
    arrow::Status status = _poi_cbg_builder.Finish(&poi_cbg_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> visitor_cbg_array;
    status = _visitor_cbg_builder.Finish(&visitor_cbg_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> date_array;
    status = _date_builder.Finish(&date_array);
    if (!status.ok()) {
        return status;
    }

    std::shared_ptr<arrow::Array> visit_array;
    status = _visit_builder.Finish(&visit_array);
    if (!status.ok()) {
        return status;
    }

    // Field order must match the array order passed to Table::Make below.
    auto schema = arrow::schema(
            {arrow::field("poi_cbg", arrow::utf8()),
             arrow::field("visitor_cbg", arrow::utf8()),
             arrow::field("date", arrow::date32()),
             arrow::field("visits", arrow::uint32()),
            });

    auto table = arrow::Table::Make(schema, {poi_cbg_array, visitor_cbg_array, date_array, visit_array});
    return _p.write(*table);
}
}