Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add buffered reader #4018

Merged
merged 11 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Examples/Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ add_library(
src/Framework/RandomNumbers.cpp
src/Framework/Sequencer.cpp
src/Framework/DataHandle.cpp
src/Framework/BufferedReader.cpp
src/Utilities/EventDataTransforms.cpp
src/Utilities/Paths.cpp
src/Utilities/Options.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// This file is part of the ACTS project.
//
// Copyright (C) 2016 CERN for the benefit of the ACTS project
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

#pragma once

#include "Acts/Utilities/Logger.hpp"
#include "ActsExamples/Framework/AlgorithmContext.hpp"
#include "ActsExamples/Framework/IReader.hpp"
#include "ActsExamples/Framework/ProcessCode.hpp"

#include <utility>

namespace ActsExamples {

class WhiteBoard;

/// Event data reader that takes a concrete reader instance, reads a number of
/// events in a buffer, and selects events from that buffer instead of directly
/// reading them from disk.
/// The purpose is to avoid IO bottlenecks in timing measurements
class BufferedReader final : public IReader {
public:
struct Config {
/// The upstream reader that should be used
std::shared_ptr<IReader> upstreamReader;

/// The seed for sampling events from the buffer
std::size_t selectionSeed = 123456;

/// Buffer size. The reader will throw and exception if the downstream
/// reader does not provide enough events
std::size_t bufferSize = 1;
};

/// Constructed the reader
BufferedReader(const Config& config, Acts::Logging::Level level);

/// Return the config
const Config& config() const { return m_cfg; }

/// Give the reader a understandable name
std::string name() const override {
return "Buffered" + m_cfg.upstreamReader->name();
}

/// The buffered reader provides the maximum available event range
std::pair<std::size_t, std::size_t> availableEvents() const override {
return {0, std::numeric_limits<std::size_t>::max()};
}
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved

/// Return a event from the buffer
ProcessCode read(const AlgorithmContext& ctx) override;

/// Fulfill the algorithm interface
ProcessCode initialize() override { return ProcessCode::SUCCESS; }

/// Fulfill the algorithm interface
ProcessCode finalize() override { return ProcessCode::SUCCESS; }

private:
Config m_cfg;
std::unique_ptr<const Acts::Logger> m_logger;
std::vector<std::unique_ptr<ActsExamples::WhiteBoard>> m_buffer;

const Acts::Logger& logger() const { return *m_logger; }
};

} // namespace ActsExamples
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class SequenceElement {
template <typename T>
friend class ReadDataHandle;

friend class BufferedReader;
paulgessinger marked this conversation as resolved.
Show resolved Hide resolved

std::vector<const DataHandleBase*> m_writeHandles;
std::vector<const DataHandleBase*> m_readHandles;
};
Expand Down
69 changes: 34 additions & 35 deletions Examples/Framework/include/ActsExamples/Framework/WhiteBoard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <stdexcept>
#include <string>
#include <string_view>
#include <type_traits>
#include <typeinfo>
#include <unordered_map>
#include <utility>
Expand All @@ -38,29 +37,22 @@ class WhiteBoard {
Acts::getDefaultLogger("WhiteBoard", Acts::Logging::INFO),
std::unordered_map<std::string, std::string> objectAliases = {});

// A WhiteBoard holds unique elements and can not be copied
WhiteBoard(const WhiteBoard& other) = delete;
WhiteBoard& operator=(const WhiteBoard&) = delete;

WhiteBoard(WhiteBoard&& other) = default;
WhiteBoard& operator=(WhiteBoard&& other) = default;
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved

bool exists(const std::string& name) const;

private:
/// Store an object on the white board and transfer ownership.
///
/// @param name Non-empty identifier to store it under
/// @param object Movable reference to the transferable object
/// @throws std::invalid_argument on empty or duplicate name
template <typename T>
void add(const std::string& name, T&& object);

/// Get access to a stored object.
///
/// @param[in] name Identifier for the object
/// @return reference to the stored object
/// @throws std::out_of_range if no object is stored under the requested name
template <typename T>
const T& get(const std::string& name) const;
/// Copies key from another whiteboard to this whiteboard.
/// This is a low overhead operation, since the data holders are
/// shared pointers.
/// Throws an exception if this whiteboard already contains one of
/// the keys in the other whiteboard.
void copyFrom(const WhiteBoard& other);

private:
private:
/// Find similar names for suggestions with levenshtein-distance
std::vector<std::string_view> similarNames(const std::string_view& name,
Expand All @@ -80,6 +72,30 @@ class WhiteBoard {
const std::type_info& type() const override { return typeid(T); }
};

/// Store an holder on the white board and transfer ownership.
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved
///
/// @param name Non-empty identifier to store it under
/// @param holder The holder to store
/// @throws std::invalid_argument on empty or duplicate name
void addHolder(const std::string& name, std::shared_ptr<IHolder> holder);

/// Store an object on the white board and transfer ownership.
///
/// @param name Non-empty identifier to store it under
/// @param object Movable reference to the transferable object
template <typename T>
void add(const std::string& name, T&& object) {
addHolder(name, std::make_shared<HolderT<T>>(std::forward<T>(object)));
}

/// Get access to a stored object.
///
/// @param[in] name Identifier for the object
/// @return reference to the stored object
/// @throws std::out_of_range if no object is stored under the requested name
template <typename T>
const T& get(const std::string& name) const;

std::unique_ptr<const Acts::Logger> m_logger;
std::unordered_map<std::string, std::shared_ptr<IHolder>> m_store;
std::unordered_map<std::string, std::string> m_objectAliases;
Expand All @@ -103,23 +119,6 @@ inline ActsExamples::WhiteBoard::WhiteBoard(
std::unordered_map<std::string, std::string> objectAliases)
: m_logger(std::move(logger)), m_objectAliases(std::move(objectAliases)) {}

template <typename T>
inline void ActsExamples::WhiteBoard::add(const std::string& name, T&& object) {
if (name.empty()) {
throw std::invalid_argument("Object can not have an empty name");
}
if (m_store.contains(name)) {
throw std::invalid_argument("Object '" + name + "' already exists");
}
auto holder = std::make_shared<HolderT<T>>(std::forward<T>(object));
m_store.emplace(name, holder);
ACTS_VERBOSE("Added object '" << name << "' of type " << typeid(T).name());
if (auto it = m_objectAliases.find(name); it != m_objectAliases.end()) {
m_store[it->second] = holder;
ACTS_VERBOSE("Added alias object '" << it->second << "'");
}
}

template <typename T>
inline const T& ActsExamples::WhiteBoard::get(const std::string& name) const {
ACTS_VERBOSE("Attempt to get object '" << name << "' of type "
Expand Down
75 changes: 75 additions & 0 deletions Examples/Framework/src/Framework/BufferedReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// This file is part of the ACTS project.
//
// Copyright (C) 2016 CERN for the benefit of the ACTS project
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

#include "ActsExamples/Framework/BufferedReader.hpp"

#include "Acts/Utilities/Logger.hpp"
#include "ActsExamples/Framework/AlgorithmContext.hpp"
#include "ActsExamples/Framework/WhiteBoard.hpp"

#include <random>
#include <utility>

namespace ActsExamples {

BufferedReader::BufferedReader(const Config &config, Acts::Logging::Level level)
: m_cfg(config), m_logger(Acts::getDefaultLogger(name(), level)) {
if (!m_cfg.upstreamReader) {
throw std::invalid_argument("No upstream reader provided!");
}

// Register write and read handles of the downstream reader
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved
for (auto rh : m_cfg.upstreamReader->readHandles()) {
registerReadHandle(*rh);
}

for (auto wh : m_cfg.upstreamReader->writeHandles()) {
registerWriteHandle(*wh);
}

// Read the events
auto [ebegin, eend] = m_cfg.upstreamReader->availableEvents();
if (eend - ebegin < m_cfg.bufferSize) {
throw std::runtime_error("Reader does not provide enough events");
}

ACTS_INFO("Start reading events into buffer...");

m_buffer.reserve(eend - ebegin);
for (auto i = ebegin; i < ebegin + m_cfg.bufferSize; ++i) {
auto board = std::make_unique<ActsExamples::WhiteBoard>(m_logger->clone());
ActsExamples::AlgorithmContext ctx(0, i, *board);

ACTS_DEBUG("Read event " << i << " into buffer");
m_cfg.upstreamReader->read(ctx);
m_buffer.emplace_back(std::move(board));
}

ACTS_INFO("Filled " << m_buffer.size() << " events into the buffer");
}

ProcessCode BufferedReader::read(const AlgorithmContext &ctx) {
// Set up a random event selection that is consistent if multiple
// BufferedReader are used within a workflow The linear congruential engine is
// chosen since it is cheap to instantiate. For each eventNumber, it is put in
// a reproducible state.
std::minstd_rand rng(m_cfg.selectionSeed);
rng.discard(ctx.eventNumber);
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved

/// Sample from the buffer and transfer the content
std::uniform_int_distribution<std::size_t> dist(0, m_cfg.bufferSize - 1);

const auto entry = dist(rng);
ctx.eventStore.copyFrom(*m_buffer.at(entry));

ACTS_DEBUG("Use buffer entry " << entry << " for event " << ctx.eventNumber);

return ProcessCode::SUCCESS;
}

} // namespace ActsExamples
38 changes: 38 additions & 0 deletions Examples/Framework/src/Framework/WhiteBoard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ std::vector<std::string_view> ActsExamples::WhiteBoard::similarNames(
names.push_back({d, n});
}
}
for (const auto &[from, to] : m_objectAliases) {
if (const auto d = levenshteinDistance(from, name); d < distThreshold) {
names.push_back({d, from});
}
}

std::ranges::sort(names, {}, [](const auto &n) { return n.first; });

Expand All @@ -84,3 +89,36 @@ std::string ActsExamples::WhiteBoard::typeMismatchMessage(
boost::core::demangle(req) + " but actually " +
boost::core::demangle(act)};
}

void ActsExamples::WhiteBoard::copyFrom(const WhiteBoard &other) {
for (auto &[key, val] : other.m_store) {
addHolder(key, val);
ACTS_VERBOSE("Copied key '" << key << "' to whiteboard");
}
}
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved

void ActsExamples::WhiteBoard::addHolder(const std::string &name,
std::shared_ptr<IHolder> holder) {
if (name.empty()) {
throw std::invalid_argument("Object can not have an empty name");
}

if (holder == nullptr) {
throw std::invalid_argument("Object '" + name + "' is nullptr");
}

auto [storeIt, success] = m_store.insert({name, std::move(holder)});

if (!success) {
throw std::invalid_argument("Object '" + name + "' already exists");
}
ACTS_VERBOSE("Added object '" << name << "' of type "
<< storeIt->second->type().name());

if (success) {
if (auto it = m_objectAliases.find(name); it != m_objectAliases.end()) {
m_store[it->second] = storeIt->second;
ACTS_VERBOSE("Added alias object '" << it->second << "'");
}
}
}
6 changes: 6 additions & 0 deletions Examples/Python/src/Input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "Acts/Plugins/Python/Utilities.hpp"
#include "ActsExamples/EventData/Cluster.hpp"
#include "ActsExamples/Framework/BufferedReader.hpp"
#include "ActsExamples/Io/Csv/CsvDriftCircleReader.hpp"
#include "ActsExamples/Io/Csv/CsvExaTrkXGraphReader.hpp"
#include "ActsExamples/Io/Csv/CsvMeasurementReader.hpp"
Expand Down Expand Up @@ -39,6 +40,11 @@ namespace Acts::Python {
void addInput(Context& ctx) {
auto mex = ctx.get("examples");

// Buffered reader
ACTS_PYTHON_DECLARE_READER(ActsExamples::BufferedReader, mex,
"BufferedReader", upstreamReader, selectionSeed,
bufferSize);

// ROOT READERS
ACTS_PYTHON_DECLARE_READER(ActsExamples::RootParticleReader, mex,
"RootParticleReader", outputParticles, treeName,
Expand Down
49 changes: 49 additions & 0 deletions Examples/Python/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,52 @@ def test_edm4hep_tracks_reader(tmp_path):
)

s.run()


@pytest.mark.root
def test_buffered_reader(tmp_path, conf_const, ptcl_gun):
# Test the buffered reader with the ROOT particle reader
# need to write out some particles first
eventsInBuffer = 5
eventsToProcess = 10

s = Sequencer(numThreads=1, events=eventsInBuffer, logLevel=acts.logging.WARNING)
evGen = ptcl_gun(s)

file = tmp_path / "particles.root"
s.addWriter(
conf_const(
RootParticleWriter,
acts.logging.WARNING,
inputParticles=evGen.config.outputParticles,
filePath=str(file),
)
)

s.run()

# reset sequencer for reading
s2 = Sequencer(events=eventsToProcess, numThreads=1, logLevel=acts.logging.WARNING)

reader = acts.examples.RootParticleReader(
level=acts.logging.WARNING,
outputParticles="particles_input",
filePath=str(file),
)

s2.addReader(
acts.examples.BufferedReader(
level=acts.logging.WARNING,
upstreamReader=reader,
bufferSize=eventsInBuffer,
)
)

alg = AssertCollectionExistsAlg(
"particles_input", "check_alg", acts.logging.WARNING
)
s2.addAlgorithm(alg)

s2.run()

assert alg.events_seen == eventsToProcess
Loading