Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions examples/8b_benchmark_read_parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,14 @@ class TestInput
return;
Timer blockTime(s.str(), m_MPIRank);

Offset off(meshExtent.size(), 0);
Extent ext(meshExtent.size(), 1);
// Avoid the below forms due to a compiler bug on gcc14
// warning: 'void operator delete(void*, std::size_t)' called on pointer
// '<unknown>' with nonzero offset [1, 9223372036854775800] Offset
// off(meshExtent.size(), 0); Extent ext(meshExtent.size(), 1);
Offset off(meshExtent.size());
Extent ext(meshExtent.size());
std::fill(off.begin(), off.end(), 0);
std::fill(ext.begin(), ext.end(), 1);

for (unsigned int i = 0; i < meshExtent.size(); i++)
{
Expand Down Expand Up @@ -486,7 +492,7 @@ class TestInput
}
}

auto prettyLambda = [&](Offset oo, Extent cc) {
auto prettyLambda = [&](Offset const &oo, Extent const &cc) {
std::ostringstream o;
o << "[ ";
std::ostringstream c;
Expand Down
14 changes: 14 additions & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,18 @@ class Series : public Attributable
*/
Series &setMeshesPath(std::string const &meshesPath);

/**
* @return True if there is a rankTable dataset defined for this Series.
* False in write-only access modes.
* Will indiscriminately return false in file encoding:
* rankTable is not explicitly supported in file encoding,
* finding out if one is defined would require opening all available
* files. You may still call Series::rankTable() to retrieve the
* rank table if you know that there is one, but note that it will
* return the rank table from the file that was last opened.
*/
bool hasRankTableRead();

/**
* @throw no_such_attribute_error If optional attribute is not present.
* @param collective Run this read operation collectively.
Expand Down Expand Up @@ -1008,6 +1020,8 @@ OPENPMD_private
* steps?
*/
[[nodiscard]] bool randomAccessSteps() const;

std::vector<std::string> availableDatasets();
}; // Series

namespace debug
Expand Down
1 change: 0 additions & 1 deletion include/openPMD/auxiliary/Variant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <any>
#include <cstddef>
#include <type_traits>
#include <variant> // IWYU pragma: export

namespace openPMD
{
Expand Down
11 changes: 9 additions & 2 deletions include/openPMD/snapshots/ContainerImpls.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ class StatefulSnapshotsContainer : public AbstractSnapshotsContainer
* make_reading_stateful_iterator.
* The iterator is resolved upon calling get() below.
*/
std::variant<std::function<StatefulIterator *()>, StatefulIterator *>
m_bufferedIterator;

// Need to put the deferred function behind a shared_ptr to avoid a
// gcc14 compiler bug
// warning: '*(std::_Function_base*)((char*)this +
// 8).std::_Function_base::_M_manager' may be used uninitialized
using Deferred_t = std::shared_ptr<std::function<StatefulIterator *()>>;
using Evaluated_t = StatefulIterator *;
using BufferedIterator_t = std::variant<Deferred_t, Evaluated_t>;
BufferedIterator_t m_bufferedIterator = nullptr;
};
Members members;

Expand Down
11 changes: 7 additions & 4 deletions src/IO/HDF5/HDF5IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3373,20 +3373,23 @@ std::future<void> HDF5IOHandlerImpl::flush(internal::ParsedFlushParams &params)
if (params.backendConfig.json().contains("hdf5"))
{
auto hdf5_config = params.backendConfig["hdf5"];
auto init_json_shadow = nlohmann::json::parse(flush_cfg_mask);
json::merge_internal(
hdf5_config.getShadow(), init_json_shadow, /* do_prune = */ false);

if (auto shadow = hdf5_config.invertShadow(); shadow.size() > 0)
{
switch (hdf5_config.originallySpecifiedAs)
{
case json::SupportedLanguages::JSON:
std::cerr << "Warning: parts of the backend configuration for "
"HDF5 remain unused:\n"
std::cerr << "Warning: parts of the backend flush "
"configuration for HDF5 remain unused:\n"
<< shadow << std::endl;
break;
case json::SupportedLanguages::TOML: {
auto asToml = json::jsonToToml(shadow);
std::cerr << "Warning: parts of the backend configuration for "
"HDF5 remain unused:\n"
std::cerr << "Warning: parts of the backend flush "
"configuration for HDF5 remain unused:\n"
<< json::format_toml(asToml) << std::endl;
break;
}
Expand Down
77 changes: 52 additions & 25 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,38 @@ Series &Series::setMeshesPath(std::string const &mp)
return *this;
}

std::vector<std::string> Series::availableDatasets()
{
if (iterationEncoding() == IterationEncoding::variableBased &&
IOHandler()->m_backendAccess == Access::READ_RANDOM_ACCESS)
{
Parameter<Operation::ADVANCE> advance;
advance.mode =
Parameter<Operation::ADVANCE>::StepSelection{std::nullopt};
IOHandler()->enqueue(IOTask(this, std::move(advance)));
}
Parameter<Operation::LIST_DATASETS> listDatasets;
IOHandler()->enqueue(IOTask(this, listDatasets));
IOHandler()->flush(internal::defaultFlushParams);
return std::move(*listDatasets.datasets);
}

bool Series::hasRankTableRead()
{
if (access::writeOnly(IOHandler()->m_frontendAccess))
{
return false;
}
auto &series = get();
if (series.m_rankTable.m_bufferedRead.has_value())
{
return true;
}
auto datasets = availableDatasets();
return std::find(datasets.begin(), datasets.end(), "rankTable") !=
datasets.end();
}

#if openPMD_HAVE_MPI
chunk_assignment::RankMeta Series::rankTable(bool collective)
#else
Expand Down Expand Up @@ -245,21 +277,9 @@ chunk_assignment::RankMeta Series::rankTable([[maybe_unused]] bool collective)
IOHandler()->enqueue(IOTask(this, openFile));
#endif
}
if (iterationEncoding() == IterationEncoding::variableBased &&
IOHandler()->m_backendAccess == Access::READ_RANDOM_ACCESS)
{
Parameter<Operation::ADVANCE> advance;
advance.mode =
Parameter<Operation::ADVANCE>::StepSelection{std::nullopt};
IOHandler()->enqueue(IOTask(this, std::move(advance)));
}
Parameter<Operation::LIST_DATASETS> listDatasets;
IOHandler()->enqueue(IOTask(this, listDatasets));
IOHandler()->flush(internal::defaultFlushParams);
if (std::none_of(
listDatasets.datasets->begin(),
listDatasets.datasets->end(),
[](std::string const &str) { return str == "rankTable"; }))
auto datasets = availableDatasets();
if (std::find(datasets.begin(), datasets.end(), "rankTable") ==
datasets.end())
{
rankTable.m_bufferedRead = chunk_assignment::RankMeta{};
return {};
Expand Down Expand Up @@ -3119,18 +3139,25 @@ namespace internal
{
this->m_sharedStatefulIterator->close();
}
/*
* Scenario: A user calls `Series::flush()` but does not check for
* thrown exceptions. The exception will propagate further up,
* usually thereby popping the stack frame that holds the `Series`
* object. `Series::~Series()` will run. This check avoids that the
* `Series` is needlessly flushed a second time. Otherwise, error
* messages can get very confusing.
*/
Series impl;
impl.setData({this, [](auto const *) {}});
if (auto IOHandler = impl.IOHandler();
IOHandler && IOHandler->m_lastFlushSuccessful)
if (auto IOHandler = impl.IOHandler(); IOHandler &&
/*
* Scenario: A user calls `Series::flush()` but does not check for
* thrown exceptions. The exception will propagate further up,
* usually thereby popping the stack frame that holds the `Series`
* object. `Series::~Series()` will run. This check avoids that the
* `Series` is needlessly flushed a second time. Otherwise, error
* messages can get very confusing.
*/

IOHandler->m_lastFlushSuccessful &&
/*
* If a read-only Series is opened without any backend access, then
* don't go there now. Just peacefully close.
*/
!(access::readOnly(IOHandler->m_frontendAccess) &&
!(*this)->m_writable.written))
{
impl.flush();
/*
Expand Down
1 change: 1 addition & 0 deletions src/binding/python/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ this method.
.def_property("base_path", &Series::basePath, &Series::setBasePath)
.def_property(
"meshes_path", &Series::meshesPath, &Series::setMeshesPath)
.def_property_readonly("has_rank_table_read", &Series::hasRankTableRead)
.def("get_rank_table", &Series::rankTable, py::arg("collective"))
.def("set_rank_table", &Series::setRankTable, py::arg("my_rank_info"))
.def_property(
Expand Down
5 changes: 4 additions & 1 deletion src/binding/python/openpmd_api/pipe/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ def __copy(self, src, dest, current_path="/data/"):
print("\t {0}".format(r))
# With linear read mode, we can only load the source rank table
# inside `read_iterations()` since it's a dataset.
self.inranks = src.get_rank_table(collective=True)
if src.has_rank_table_read:
self.inranks = src.get_rank_table(collective=True)
else:
self.inranks = {}
out_iteration = write_iterations[in_iteration.iteration_index]
sys.stdout.flush()
self.__copy(
Expand Down
27 changes: 20 additions & 7 deletions src/snapshots/ContainerImpls.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,22 @@ namespace openPMD
{
StatefulSnapshotsContainer::StatefulSnapshotsContainer(
std::variant<std::function<StatefulIterator *()>, StatefulIterator *> begin)
: members{std::move(begin)}
: members{
// Need to put the deferred function behind a shared_ptr to avoid a
// gcc14 compiler bug
// warning: '*(std::_Function_base*)((char*)this +
// 8).std::_Function_base::_M_manager' may be used uninitialized
std::visit(
auxiliary::overloaded{
[](std::function<StatefulIterator *()> &&f)
-> Members::BufferedIterator_t {
return std::make_shared<
Members::Deferred_t::element_type>(std::move(f));
},
[](StatefulIterator *it) -> Members::BufferedIterator_t {
return it;
}},
std::move(begin))}
{}

StatefulSnapshotsContainer::StatefulSnapshotsContainer(
Expand All @@ -33,21 +48,19 @@ auto StatefulSnapshotsContainer::get() -> StatefulIterator *
{
return std::visit(
auxiliary::overloaded{
[this](
std::function<StatefulIterator *()> &deferred_initialization) {
auto it = deferred_initialization();
[this](Members::Deferred_t &deferred_initialization) {
auto it = (*deferred_initialization)();
this->members.m_bufferedIterator = it;
return it;
},
[](StatefulIterator *it) { return it; }},
[](Members::Evaluated_t it) { return it; }},
members.m_bufferedIterator);
}
auto StatefulSnapshotsContainer::get() const -> StatefulIterator const *
{
return std::visit(
auxiliary::overloaded{
[](std::function<StatefulIterator *()> const &)
-> StatefulIterator const * {
[](Members::Deferred_t const &) -> StatefulIterator const * {
throw std::runtime_error(
"[StatefulSnapshotscontainer] Initialization has been "
"deferred, but container is accessed as const, so cannot "
Expand Down
Loading