Skip to content

Commit

Permalink
Optional debugging output for AbstractIOHandlerImpl::flush() (#1495)
Browse files Browse the repository at this point in the history
* Optional debugging output for AbstractIOHandlerImpl::flush()

* Add an environment variable for this
  • Loading branch information
franzpoeschel authored Aug 10, 2023
1 parent 733ef8d commit 0387b6f
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 190 deletions.
6 changes: 6 additions & 0 deletions docs/source/usage/workflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,9 @@ Attributes are (currently) unaffected by this:
Some backends (e.g. the BP5 engine of ADIOS2) have multiple implementations for the openPMD-api-level guarantees of flush points.
For user-guided selection of such implementations, ``Series::flush`` and ``Attributable::seriesFlush()`` take an optional JSON/TOML string as a parameter.
See the section on :ref:`backend-specific configuration <backendconfig>` for details.

Deferred Data API Contract
--------------------------

A verbose debug log can optionally be printed to the standard error output by specifying the environment variable ``OPENPMD_VERBOSE=1``.
Note that this functionality is at the current time still relatively basic.
197 changes: 7 additions & 190 deletions include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "openPMD/auxiliary/DerefDynamicCast.hpp"

#include <future>
#include <iostream>

namespace openPMD
{
Expand All @@ -36,198 +35,11 @@ class Writable;
class AbstractIOHandlerImpl
{
public:
AbstractIOHandlerImpl(AbstractIOHandler *handler) : m_handler{handler}
{}
AbstractIOHandlerImpl(AbstractIOHandler *handler);

virtual ~AbstractIOHandlerImpl() = default;

std::future<void> flush()
{
using namespace auxiliary;

while (!(*m_handler).m_work.empty())
{
IOTask &i = (*m_handler).m_work.front();
try
{
switch (i.operation)
{
using O = Operation;
case O::CREATE_FILE:
createFile(
i.writable,
deref_dynamic_cast<Parameter<Operation::CREATE_FILE> >(
i.parameter.get()));
break;
case O::CHECK_FILE:
checkFile(
i.writable,
deref_dynamic_cast<Parameter<Operation::CHECK_FILE> >(
i.parameter.get()));
break;
case O::CREATE_PATH:
createPath(
i.writable,
deref_dynamic_cast<Parameter<O::CREATE_PATH> >(
i.parameter.get()));
break;
case O::CREATE_DATASET:
createDataset(
i.writable,
deref_dynamic_cast<Parameter<O::CREATE_DATASET> >(
i.parameter.get()));
break;
case O::EXTEND_DATASET:
extendDataset(
i.writable,
deref_dynamic_cast<Parameter<O::EXTEND_DATASET> >(
i.parameter.get()));
break;
case O::OPEN_FILE:
openFile(
i.writable,
deref_dynamic_cast<Parameter<O::OPEN_FILE> >(
i.parameter.get()));
break;
case O::CLOSE_FILE:
closeFile(
i.writable,
deref_dynamic_cast<Parameter<O::CLOSE_FILE> >(
i.parameter.get()));
break;
case O::OPEN_PATH:
openPath(
i.writable,
deref_dynamic_cast<Parameter<O::OPEN_PATH> >(
i.parameter.get()));
break;
case O::CLOSE_PATH:
closePath(
i.writable,
deref_dynamic_cast<Parameter<O::CLOSE_PATH> >(
i.parameter.get()));
break;
case O::OPEN_DATASET:
openDataset(
i.writable,
deref_dynamic_cast<Parameter<O::OPEN_DATASET> >(
i.parameter.get()));
break;
case O::DELETE_FILE:
deleteFile(
i.writable,
deref_dynamic_cast<Parameter<O::DELETE_FILE> >(
i.parameter.get()));
break;
case O::DELETE_PATH:
deletePath(
i.writable,
deref_dynamic_cast<Parameter<O::DELETE_PATH> >(
i.parameter.get()));
break;
case O::DELETE_DATASET:
deleteDataset(
i.writable,
deref_dynamic_cast<Parameter<O::DELETE_DATASET> >(
i.parameter.get()));
break;
case O::DELETE_ATT:
deleteAttribute(
i.writable,
deref_dynamic_cast<Parameter<O::DELETE_ATT> >(
i.parameter.get()));
break;
case O::WRITE_DATASET:
writeDataset(
i.writable,
deref_dynamic_cast<Parameter<O::WRITE_DATASET> >(
i.parameter.get()));
break;
case O::WRITE_ATT:
writeAttribute(
i.writable,
deref_dynamic_cast<Parameter<O::WRITE_ATT> >(
i.parameter.get()));
break;
case O::READ_DATASET:
readDataset(
i.writable,
deref_dynamic_cast<Parameter<O::READ_DATASET> >(
i.parameter.get()));
break;
case O::GET_BUFFER_VIEW:
getBufferView(
i.writable,
deref_dynamic_cast<Parameter<O::GET_BUFFER_VIEW> >(
i.parameter.get()));
break;
case O::READ_ATT:
readAttribute(
i.writable,
deref_dynamic_cast<Parameter<O::READ_ATT> >(
i.parameter.get()));
break;
case O::LIST_PATHS:
listPaths(
i.writable,
deref_dynamic_cast<Parameter<O::LIST_PATHS> >(
i.parameter.get()));
break;
case O::LIST_DATASETS:
listDatasets(
i.writable,
deref_dynamic_cast<Parameter<O::LIST_DATASETS> >(
i.parameter.get()));
break;
case O::LIST_ATTS:
listAttributes(
i.writable,
deref_dynamic_cast<Parameter<O::LIST_ATTS> >(
i.parameter.get()));
break;
case O::ADVANCE:
advance(
i.writable,
deref_dynamic_cast<Parameter<O::ADVANCE> >(
i.parameter.get()));
break;
case O::AVAILABLE_CHUNKS:
availableChunks(
i.writable,
deref_dynamic_cast<Parameter<O::AVAILABLE_CHUNKS> >(
i.parameter.get()));
break;
case O::KEEP_SYNCHRONOUS:
keepSynchronous(
i.writable,
deref_dynamic_cast<Parameter<O::KEEP_SYNCHRONOUS> >(
i.parameter.get()));
break;
case O::DEREGISTER:
deregister(
i.writable,
deref_dynamic_cast<Parameter<O::DEREGISTER> >(
i.parameter.get()));
break;
}
}
catch (...)
{
std::cerr << "[AbstractIOHandlerImpl] IO Task "
<< internal::operationAsString(i.operation)
<< " failed with exception. Clearing IO queue and "
"passing on the exception."
<< std::endl;
while (!m_handler->m_work.empty())
{
m_handler->m_work.pop();
}
throw;
}
(*m_handler).m_work.pop();
}
return std::future<void>();
}
std::future<void> flush();

/**
* Close the file corresponding with the writable and release file handles.
Expand Down Expand Up @@ -592,5 +404,10 @@ class AbstractIOHandlerImpl
deregister(Writable *, Parameter<Operation::DEREGISTER> const &param) = 0;

AbstractIOHandler *m_handler;
bool m_verboseIOTasks = false;

// Args will be forwarded to std::cerr if m_verboseIOTasks is true
template <typename... Args>
void writeToStderr(Args &&...) const;
}; // AbstractIOHandlerImpl
} // namespace openPMD
Loading

0 comments on commit 0387b6f

Please sign in to comment.