Skip to content
Merged
28 changes: 28 additions & 0 deletions docs/source/details/backendconfig.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,34 @@ Using the Streaming API (i.e. ``SeriesInterface::readIteration()``) will do this
Parsing eagerly might be very expensive for a Series with many iterations, but will avoid bugs by forgotten calls to ``Iteration::open()``.
In complex environments, calling ``Iteration::open()`` on an already open environment does no harm (and does not incur additional runtime cost for additional ``open()`` calls).

By default, the library will print a warning to suggest using deferred Iteration parsing when opening a Series takes long.
The timeout can be tuned by the JSON/TOML key ``hint_lazy_parsing_timeout`` (integer, seconds):
if set to a positive value, the library will print periodic warnings to stderr when eager parsing of Iterations takes longer than the specified number of seconds (default: ``20``). Setting this option to ``0`` disables the warnings.

Environment variables may alternatively be used for options concerning deferred iteration parsing:

* Environment variable ``OPENPMD_DEFER_ITERATION_PARSING``: if set to a truthy value (e.g. ``1``), the Series will be opened with deferred iteration parsing as if ``{"defer_iteration_parsing": true}`` had been supplied.
* Environment variable ``OPENPMD_HINT_LAZY_PARSING_TIMEOUT``: accepts integral values equivalent to the ``hint_lazy_parsing_timeout`` key.

Examples:

.. code-block:: bash

# enable lazy parsing via env var
export OPENPMD_DEFER_ITERATION_PARSING=1

# disable the parsing hint/warning
export OPENPMD_HINT_LAZY_PARSING_TIMEOUT=0

Or in a Series constructor JSON/TOML configuration:

.. code-block:: json

{
"defer_iteration_parsing": true,
"hint_lazy_parsing_timeout": 20
}

The key ``resizable`` can be passed to ``Dataset`` options.
It if set to ``{"resizable": true}``, this declares that it shall be allowed to increased the ``Extent`` of a ``Dataset`` via ``resetDataset()`` at a later time, i.e., after it has been first declared (and potentially written).
For HDF5, resizable Datasets come with a performance penalty.
Expand Down
9 changes: 6 additions & 3 deletions examples/2_read_serial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ using namespace openPMD;

int main()
{
Series series =
Series("../samples/git-sample/data%T.h5", Access::READ_ONLY);
Series series = Series(
"../samples/git-sample/data%T.h5",
Access::READ_ONLY,
R"({"defer_iteration_parsing": true})");
cout << "Read a Series with openPMD standard version " << series.openPMD()
<< '\n';

Expand All @@ -40,7 +42,8 @@ int main()
cout << "\n\t" << i.first;
cout << '\n';

Iteration i = series.snapshots()[100];
// with defer_iteration_parsing, open() must be called explicitly
Iteration i = series.snapshots()[100].open();
cout << "Iteration 100 contains " << i.meshes.size() << " meshes:";
for (auto const &m : i.meshes)
cout << "\n\t" << m.first;
Expand Down
7 changes: 5 additions & 2 deletions examples/2_read_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

if __name__ == "__main__":
series = io.Series("../samples/git-sample/data%T.h5",
io.Access.read_only)
io.Access.read_only, {
"defer_iteration_parsing": True
})
print("Read a Series with openPMD standard version %s" %
series.openPMD)

Expand All @@ -20,7 +22,8 @@
print("\t {0}".format(i))
print("")

i = series.snapshots()[100]
# with defer_iteration_parsing, open() must be called explicitly
i = series.snapshots()[100].open()
print("Iteration 100 contains {0} meshes:".format(len(i.meshes)))
for m in i.meshes:
print("\t {0}".format(m))
Expand Down
9 changes: 6 additions & 3 deletions examples/2a_read_thetaMode_serial.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ int main()
{
/* The pattern %E instructs the openPMD-api to determine the file ending
* automatically. It can also be given explicitly, e.g. `data%T.h5`. */
Series series =
Series("../samples/git-sample/thetaMode/data%T.h5", Access::READ_ONLY);
Series series = Series(
"../samples/git-sample/thetaMode/data%T.h5",
Access::READ_ONLY,
R"({"defer_iteration_parsing": true})");

Iteration i = series.snapshots()[500];
// defer_iteration_parsing implies that open() must be called explicitly
Iteration i = series.snapshots()[500].open();
MeshRecordComponent E_z_modes = i.meshes["E"]["z"];
Extent extent = E_z_modes.getExtent(); // (modal components, r, z)

Expand Down
7 changes: 5 additions & 2 deletions examples/2a_read_thetaMode_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# The pattern %E instructs the openPMD-api to determine the file ending
# automatically. It can also be given explicitly, e.g. `data%T.h5`.
series = io.Series("../samples/git-sample/thetaMode/data%T.h5",
io.Access.read_only)
io.Access.read_only, {
"defer_iteration_parsing": True
})

i = series.snapshots()[500]
# with defer_iteration_parsing, open() must be called explicitly
i = series.snapshots()[500].open()
E_z_modes = i.meshes["E"]["z"]
shape = E_z_modes.shape # (modal components, r, z)

Expand Down
10 changes: 8 additions & 2 deletions examples/4_read_parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ int main(int argc, char *argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

Series series = Series(
"../samples/git-sample/data%T.h5", Access::READ_ONLY, MPI_COMM_WORLD);
"../samples/git-sample/data%T.h5",
Access::READ_ONLY,
MPI_COMM_WORLD,
R"({"defer_iteration_parsing": true})");
if (0 == mpi_rank)
cout << "Read a series in parallel with " << mpi_size << " MPI ranks\n";

MeshRecordComponent E_x = series.snapshots()[100].meshes["E"]["x"];
// with defer_iteration_parsing, open() must be called explicitly
// explicit Iteration opening is recommended in general for parallel
// applications
MeshRecordComponent E_x = series.snapshots()[100].open().meshes["E"]["x"];

Offset chunk_offset = {static_cast<long unsigned int>(mpi_rank) + 1, 1, 1};
Extent chunk_extent = {2, 2, 1};
Expand Down
8 changes: 6 additions & 2 deletions examples/4_read_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
series = io.Series(
"../samples/git-sample/data%T.h5",
io.Access.read_only,
comm
comm, {
"defer_iteration_parsing": True
}
)
if 0 == comm.rank:
print("Read a series in parallel with {} MPI ranks".format(
comm.size))

E_x = series.snapshots()[100].meshes["E"]["x"]
# with defer_iteration_parsing, open() must be called explicitly
# explicit use of open() is recommended for parallel applications
E_x = series.snapshots()[100].open().meshes["E"]["x"]

chunk_offset = [comm.rank + 1, 1, 1]
chunk_extent = [2, 2, 1]
Expand Down
8 changes: 6 additions & 2 deletions examples/6_dump_filebased_series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ using namespace openPMD;

int main()
{
Series o =
Series("../samples/git-sample/data%T.h5", Access::READ_RANDOM_ACCESS);
Series o = Series(
"../samples/git-sample/data%T.h5",
Access::READ_RANDOM_ACCESS,
R"({"defer_iteration_parsing": true})");

std::cout << "Read iterations ";
for (auto const &val : o.snapshots())
Expand Down Expand Up @@ -39,6 +41,8 @@ int main()

for (auto &[index, i] : o.snapshots())
{
// with defer_iteration_parsing, open() must be called explicitly
i.open();
std::cout << "Read attributes in iteration " << index << ":\n";
for (auto const &val : i.attributes())
std::cout << '\t' << val << '\n';
Expand Down
1 change: 1 addition & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ namespace internal
* True if a user opts into lazy parsing.
*/
bool m_parseLazily = false;
uint64_t m_hintLazyParsingAfterTimeout = 20; // seconds

/**
* In variable-based encoding, all backends except ADIOS2 can only write
Expand Down
121 changes: 118 additions & 3 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/ThrowError.hpp"
#include "openPMD/auxiliary/Date.hpp"
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/Filesystem.hpp"
#include "openPMD/auxiliary/JSON_internal.hpp"
#include "openPMD/auxiliary/Mpi.hpp"
Expand All @@ -48,6 +49,7 @@

#include <algorithm>
#include <cctype>
#include <chrono>
#include <exception>
#include <iomanip>
#include <iostream>
Expand Down Expand Up @@ -126,6 +128,79 @@ namespace
int padding,
std::string const &postfix,
std::optional<std::string> const &extension);

struct TimeoutLazyParsing
{
using Clock = std::chrono::system_clock;
Clock::time_point start_parsing, time_of_last_warning;
uint64_t timeout;
bool printed_warning_already = false;

TimeoutLazyParsing(uint64_t timeout_in) : timeout(timeout_in)
{
if (timeout > 0)
{
start_parsing = Clock::now();
time_of_last_warning = start_parsing;
}
}

void now(size_t current_iteration_count, size_t total_iteration_count)
{
if (timeout == 0)
{
return;
}
auto current = Clock::now();
auto diff = std::chrono::duration_cast<std::chrono::seconds>(
current - time_of_last_warning);
if (uint64_t(diff.count()) >= timeout)
{
auto total_diff =
std::chrono::duration_cast<std::chrono::seconds>(
current - start_parsing);
if (!printed_warning_already)
{
std::cerr << &R"END(
[openPMD] WARNING: Parsing Iterations is taking a long time.
Consider using deferred Iteration parsing in order to open the Series lazily.
This can be achieved by either setting an environment variable:

> export OPENPMD_DEFER_ITERATION_PARSING=1

Or by specifying it as part of a JSON/TOML configuration:

> // C++:
> Series simData("my_data_%T.%E", R"({"defer_iteration_parsing": true})");
> // Python:
> simData = opmd.Series("my_data_%T.%E", {"defer_iteration_parsing": True})

Iterations will then be parsed only upon explicit user request:

> series.snapshots()[100].open() // new API
> series.iterations[100].open() // old API

Alternatively, Iterations will be opened implicitly when iterating in
READ_LINEAR access mode.
Refer also to the documentation at https://openpmd-api.readthedocs.io

This warning can be suppressed also by either specifying
an environment variable:

> export OPENPMD_HINT_LAZY_PARSING_TIMEOUT=0

Or by the JSON/TOML option {"hint_lazy_parsing_timeout": 0}.
)END"[1] << '\n';
printed_warning_already = true;
}
std::cerr << "Elapsed time: " << total_diff.count()
<< "s, parsed " << current_iteration_count << " of "
<< total_iteration_count << " Iterations."
<< std::endl;
time_of_last_warning = current;
}
}
};
} // namespace

struct Series::ParsedInput
Expand Down Expand Up @@ -1757,13 +1832,21 @@ void Series::readFileBased(
{
bool atLeastOneIterationSuccessful = false;
std::optional<error::ReadError> forwardFirstError;

TimeoutLazyParsing timeout(series.m_hintLazyParsingAfterTimeout);

size_t read_iterations = 0;
for (auto &iteration : series.iterations)
{
if (read_only_this_single_iteration.has_value() &&
*read_only_this_single_iteration != iteration.first)
{
continue;
}
if (!read_only_this_single_iteration.has_value())
{
timeout.now(read_iterations, iterations.size());
}
if (auto error = readIterationEagerly(iteration.second); error)
{
std::cerr << "Cannot read iteration '" << iteration.first
Expand All @@ -1779,6 +1862,7 @@ void Series::readFileBased(
{
atLeastOneIterationSuccessful = true;
}
++read_iterations;
}
if (!atLeastOneIterationSuccessful)
{
Expand Down Expand Up @@ -2157,6 +2241,10 @@ creating new iterations.

auto currentSteps = currentSnapshot();

TimeoutLazyParsing timeout{
series.m_parseLazily ? 0 : series.m_hintLazyParsingAfterTimeout};
size_t parsed_iterations = 0;

switch (iterationEncoding())
{
case IterationEncoding::groupBased:
Expand All @@ -2175,6 +2263,10 @@ creating new iterations.
{
continue;
}
if (!read_only_this_single_iteration.has_value())
{
timeout.now(parsed_iterations, pList.paths->size());
}
if (auto err = internal::withRWAccess(
IOHandler()->m_seriesStatus,
[&]() {
Expand All @@ -2198,6 +2290,7 @@ creating new iterations.
{
readableIterations.push_back(index);
}
++parsed_iterations;
}
if (currentSteps.has_value())
{
Expand Down Expand Up @@ -2254,6 +2347,10 @@ creating new iterations.

for (auto it : *currentSteps)
{
if (!read_only_this_single_iteration.has_value())
{
timeout.now(parsed_iterations, pList.paths->size());
}
/*
* Variable-based iteration encoding relies on steps, so parsing
* must happen after opening the first step.
Expand Down Expand Up @@ -2281,6 +2378,7 @@ creating new iterations.
*/
throw *err;
}
++parsed_iterations;
}
return *currentSteps;
}
Expand Down Expand Up @@ -2982,9 +3080,18 @@ namespace
* If yes, read it into the specified location.
*/
template <typename From, typename Dest = From>
void
getJsonOption(json::TracingJSON &config, std::string const &key, Dest &dest)
void getJsonOption(
json::TracingJSON &config,
std::string const &key,
Dest &dest,
std::optional<std::string> envVar = std::nullopt)
{
if (envVar.has_value())
{
dest = auxiliary::getEnvNum(*envVar, dest);
std::cout << "Read from env var " << *envVar << " as: " << dest
<< std::endl;
}
if (config.json().contains(key))
{
dest = config[key].json().get<From>();
Expand Down Expand Up @@ -3027,7 +3134,15 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input)
{
auto &series = get();
getJsonOption<bool>(
options, "defer_iteration_parsing", series.m_parseLazily);
options,
"defer_iteration_parsing",
series.m_parseLazily,
"OPENPMD_DEFER_ITERATION_PARSING");
getJsonOption<uint64_t>(
options,
"hint_lazy_parsing_timeout",
series.m_hintLazyParsingAfterTimeout,
"OPENPMD_HINT_LAZY_PARSING_TIMEOUT");
internal::SeriesData::SourceSpecifiedViaJSON rankTableSource;
if (getJsonOptionLowerCase(options, "rank_table", rankTableSource.value))
{
Expand Down
Loading
Loading