diff --git a/docs/source/details/backendconfig.rst b/docs/source/details/backendconfig.rst index 9d5e1dcf2c..185c111526 100644 --- a/docs/source/details/backendconfig.rst +++ b/docs/source/details/backendconfig.rst @@ -94,6 +94,34 @@ Using the Streaming API (i.e. ``SeriesInterface::readIteration()``) will do this Parsing eagerly might be very expensive for a Series with many iterations, but will avoid bugs by forgotten calls to ``Iteration::open()``. In complex environments, calling ``Iteration::open()`` on an already open environment does no harm (and does not incur additional runtime cost for additional ``open()`` calls). +By default, the library will print a warning to suggest using deferred Iteration parsing when opening a Series takes long. +The timeout can be tuned by the JSON/TOML key ``hint_lazy_parsing_timeout`` (integer, seconds): +if set to a positive value, the library will print periodic warnings to stderr when eager parsing of Iterations takes longer than the specified number of seconds (default: ``20``). Setting this option to ``0`` disables the warnings. + +Environment variables may alternatively be used for options concerning deferred iteration parsing: + +* Environment variable ``OPENPMD_DEFER_ITERATION_PARSING``: if set to a truthy value (e.g. ``1``), the Series will be opened with deferred iteration parsing as if ``{"defer_iteration_parsing": true}`` had been supplied. +* Environment variable ``OPENPMD_HINT_LAZY_PARSING_TIMEOUT``: accepts integral values equivalent to the ``hint_lazy_parsing_timeout`` key. + +Examples: + +.. code-block:: bash + + # enable lazy parsing via env var + export OPENPMD_DEFER_ITERATION_PARSING=1 + + # disable the parsing hint/warning + export OPENPMD_HINT_LAZY_PARSING_TIMEOUT=0 + +Or in a Series constructor JSON/TOML configuration: + +.. code-block:: json + + { + "defer_iteration_parsing": true, + "hint_lazy_parsing_timeout": 20 + } + The key ``resizable`` can be passed to ``Dataset`` options. 
It if set to ``{"resizable": true}``, this declares that it shall be allowed to increased the ``Extent`` of a ``Dataset`` via ``resetDataset()`` at a later time, i.e., after it has been first declared (and potentially written). For HDF5, resizable Datasets come with a performance penalty. diff --git a/examples/2_read_serial.cpp b/examples/2_read_serial.cpp index 95bfbaeadf..08be3f4ac9 100644 --- a/examples/2_read_serial.cpp +++ b/examples/2_read_serial.cpp @@ -29,8 +29,10 @@ using namespace openPMD; int main() { - Series series = - Series("../samples/git-sample/data%T.h5", Access::READ_ONLY); + Series series = Series( + "../samples/git-sample/data%T.h5", + Access::READ_ONLY, + R"({"defer_iteration_parsing": true})"); cout << "Read a Series with openPMD standard version " << series.openPMD() << '\n'; @@ -40,7 +42,8 @@ int main() cout << "\n\t" << i.first; cout << '\n'; - Iteration i = series.snapshots()[100]; + // with defer_iteration_parsing, open() must be called explicitly + Iteration i = series.snapshots()[100].open(); cout << "Iteration 100 contains " << i.meshes.size() << " meshes:"; for (auto const &m : i.meshes) cout << "\n\t" << m.first; diff --git a/examples/2_read_serial.py b/examples/2_read_serial.py index f750929a30..3632471757 100644 --- a/examples/2_read_serial.py +++ b/examples/2_read_serial.py @@ -10,7 +10,9 @@ if __name__ == "__main__": series = io.Series("../samples/git-sample/data%T.h5", - io.Access.read_only) + io.Access.read_only, { + "defer_iteration_parsing": True + }) print("Read a Series with openPMD standard version %s" % series.openPMD) @@ -20,7 +22,8 @@ print("\t {0}".format(i)) print("") - i = series.snapshots()[100] + # with defer_iteration_parsing, open() must be called explicitly + i = series.snapshots()[100].open() print("Iteration 100 contains {0} meshes:".format(len(i.meshes))) for m in i.meshes: print("\t {0}".format(m)) diff --git a/examples/2a_read_thetaMode_serial.cpp b/examples/2a_read_thetaMode_serial.cpp index 
84c3de0ee5..00fed37636 100644 --- a/examples/2a_read_thetaMode_serial.cpp +++ b/examples/2a_read_thetaMode_serial.cpp @@ -31,10 +31,13 @@ int main() { /* The pattern %E instructs the openPMD-api to determine the file ending * automatically. It can also be given explicitly, e.g. `data%T.h5`. */ - Series series = - Series("../samples/git-sample/thetaMode/data%T.h5", Access::READ_ONLY); + Series series = Series( + "../samples/git-sample/thetaMode/data%T.h5", + Access::READ_ONLY, + R"({"defer_iteration_parsing": true})"); - Iteration i = series.snapshots()[500]; + // defer_iteration_parsing implies that open() must be called explicitly + Iteration i = series.snapshots()[500].open(); MeshRecordComponent E_z_modes = i.meshes["E"]["z"]; Extent extent = E_z_modes.getExtent(); // (modal components, r, z) diff --git a/examples/2a_read_thetaMode_serial.py b/examples/2a_read_thetaMode_serial.py index 9e55857a6f..3bef932c18 100644 --- a/examples/2a_read_thetaMode_serial.py +++ b/examples/2a_read_thetaMode_serial.py @@ -12,9 +12,12 @@ # The pattern %E instructs the openPMD-api to determine the file ending # automatically. It can also be given explicitly, e.g. `data%T.h5`. 
series = io.Series("../samples/git-sample/thetaMode/data%T.h5", - io.Access.read_only) + io.Access.read_only, { + "defer_iteration_parsing": True + }) - i = series.snapshots()[500] + # with defer_iteration_parsing, open() must be called explicitly + i = series.snapshots()[500].open() E_z_modes = i.meshes["E"]["z"] shape = E_z_modes.shape # (modal components, r, z) diff --git a/examples/4_read_parallel.cpp b/examples/4_read_parallel.cpp index 6c9ef57b6a..5f07a755d7 100644 --- a/examples/4_read_parallel.cpp +++ b/examples/4_read_parallel.cpp @@ -40,11 +40,17 @@ int main(int argc, char *argv[]) MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); Series series = Series( - "../samples/git-sample/data%T.h5", Access::READ_ONLY, MPI_COMM_WORLD); + "../samples/git-sample/data%T.h5", + Access::READ_ONLY, + MPI_COMM_WORLD, + R"({"defer_iteration_parsing": true})"); if (0 == mpi_rank) cout << "Read a series in parallel with " << mpi_size << " MPI ranks\n"; - MeshRecordComponent E_x = series.snapshots()[100].meshes["E"]["x"]; + // with defer_iteration_parsing, open() must be called explicitly + // explicit Iteration opening is recommended in general for parallel + // applications + MeshRecordComponent E_x = series.snapshots()[100].open().meshes["E"]["x"]; Offset chunk_offset = {static_cast(mpi_rank) + 1, 1, 1}; Extent chunk_extent = {2, 2, 1}; diff --git a/examples/4_read_parallel.py b/examples/4_read_parallel.py index 018f33f498..91075e6d90 100644 --- a/examples/4_read_parallel.py +++ b/examples/4_read_parallel.py @@ -20,13 +20,17 @@ series = io.Series( "../samples/git-sample/data%T.h5", io.Access.read_only, - comm + comm, { + "defer_iteration_parsing": True + } ) if 0 == comm.rank: print("Read a series in parallel with {} MPI ranks".format( comm.size)) - E_x = series.snapshots()[100].meshes["E"]["x"] + # with defer_iteration_parsing, open() must be called explicitly + # explicit use of open() is recommended for parallel applications + E_x = 
series.snapshots()[100].open().meshes["E"]["x"] chunk_offset = [comm.rank + 1, 1, 1] chunk_extent = [2, 2, 1] diff --git a/examples/6_dump_filebased_series.cpp b/examples/6_dump_filebased_series.cpp index d29d0aac61..1ba259481d 100644 --- a/examples/6_dump_filebased_series.cpp +++ b/examples/6_dump_filebased_series.cpp @@ -7,8 +7,10 @@ using namespace openPMD; int main() { - Series o = - Series("../samples/git-sample/data%T.h5", Access::READ_RANDOM_ACCESS); + Series o = Series( + "../samples/git-sample/data%T.h5", + Access::READ_RANDOM_ACCESS, + R"({"defer_iteration_parsing": true})"); std::cout << "Read iterations "; for (auto const &val : o.snapshots()) @@ -39,6 +41,8 @@ int main() for (auto &[index, i] : o.snapshots()) { + // with defer_iteration_parsing, open() must be called explicitly + i.open(); std::cout << "Read attributes in iteration " << index << ":\n"; for (auto const &val : i.attributes()) std::cout << '\t' << val << '\n'; diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 85387e6473..75c91675d0 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -217,6 +217,7 @@ namespace internal * True if a user opts into lazy parsing. 
/**
 * Helper that nags the user when eager parsing of Iterations takes long.
 *
 * Construct it right before a parsing loop and call now() once per
 * Iteration. As soon as at least `timeout` seconds have passed since
 * construction (or since the previous message), a progress line is written
 * to stderr. The very first message is preceded by a one-time hint that
 * explains how to enable deferred Iteration parsing and how to silence the
 * hint. A timeout of 0 turns the helper into a no-op.
 */
struct TimeoutLazyParsing
{
    /*
     * Elapsed-time measurement needs a monotonic clock. A wall clock may be
     * adjusted (NTP, manual changes) while parsing is running; a backwards
     * jump would yield a negative difference which, after the conversion to
     * uint64_t below, compares as a huge value and triggers a bogus warning.
     */
    using Clock = std::chrono::steady_clock;
    Clock::time_point start_parsing, time_of_last_warning;
    /** Seconds between two messages; 0 disables all output. */
    uint64_t timeout;
    /** Set once the long explanatory hint has been printed. */
    bool printed_warning_already = false;

    /**
     * @param timeout_in Interval in seconds after which (and in which)
     *                   messages are printed; 0 disables the helper.
     */
    explicit TimeoutLazyParsing(uint64_t timeout_in) : timeout(timeout_in)
    {
        // Only query the clock if the helper is actually active.
        if (timeout > 0)
        {
            start_parsing = Clock::now();
            time_of_last_warning = start_parsing;
        }
    }

    /**
     * Check the elapsed time and print a message if the timeout expired.
     *
     * @param current_iteration_count Number of Iterations parsed so far.
     * @param total_iteration_count   Number of Iterations to be parsed.
     */
    void now(size_t current_iteration_count, size_t total_iteration_count)
    {
        if (timeout == 0)
        {
            return;
        }
        auto const current = Clock::now();
        auto const since_last_warning =
            std::chrono::duration_cast<std::chrono::seconds>(
                current - time_of_last_warning);
        if (uint64_t(since_last_warning.count()) < timeout)
        {
            return;
        }

        auto const total_diff =
            std::chrono::duration_cast<std::chrono::seconds>(
                current - start_parsing);
        if (!printed_warning_already)
        {
            // `&literal[1]` skips the newline that directly follows the
            // opening delimiter of the raw string literal.
            std::cerr << &R"END(
[openPMD] WARNING: Parsing Iterations is taking a long time.
Consider using deferred Iteration parsing in order to open the Series lazily.
This can be achieved by either setting an environment variable:

> export OPENPMD_DEFER_ITERATION_PARSING=1

Or by specifying it as part of a JSON/TOML configuration:

> // C++:
> Series simData(
>     "my_data_%T.%E",
>     Access::READ_ONLY,
>     R"({"defer_iteration_parsing": true})");
> // Python:
> simData = opmd.Series(
>     "my_data_%T.%E",
>     opmd.Access.read_only,
>     {"defer_iteration_parsing": True})

Iterations will then be parsed only upon explicit user request:

> series.snapshots()[100].open() // new API
> series.iterations[100].open() // old API

Alternatively, Iterations will be opened implicitly when iterating in
READ_LINEAR access mode.
Refer also to the documentation at https://openpmd-api.readthedocs.io

This warning can be suppressed also by either specifying
an environment variable:

> export OPENPMD_HINT_LAZY_PARSING_TIMEOUT=0

Or by the JSON/TOML option {"hint_lazy_parsing_timeout": 0}.
)END"[1] << '\n';
            printed_warning_already = true;
        }
        // std::cerr is unit-buffered, no explicit flush required.
        std::cerr << "Elapsed time: " << total_diff.count() << "s, parsed "
                  << current_iteration_count << " of "
                  << total_iteration_count << " Iterations." << '\n';
        time_of_last_warning = current;
    }
};
*/ throw *err; } + ++parsed_iterations; } return *currentSteps; } @@ -2982,9 +3080,18 @@ namespace * If yes, read it into the specified location. */ template - void - getJsonOption(json::TracingJSON &config, std::string const &key, Dest &dest) + void getJsonOption( + json::TracingJSON &config, + std::string const &key, + Dest &dest, + std::optional envVar = std::nullopt) { + if (envVar.has_value()) + { + dest = auxiliary::getEnvNum(*envVar, dest); + std::cout << "Read from env var " << *envVar << " as: " << dest + << std::endl; + } if (config.json().contains(key)) { dest = config[key].json().get(); @@ -3027,7 +3134,15 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input) { auto &series = get(); getJsonOption( - options, "defer_iteration_parsing", series.m_parseLazily); + options, + "defer_iteration_parsing", + series.m_parseLazily, + "OPENPMD_DEFER_ITERATION_PARSING"); + getJsonOption( + options, + "hint_lazy_parsing_timeout", + series.m_hintLazyParsingAfterTimeout, + "OPENPMD_HINT_LAZY_PARSING_TIMEOUT"); internal::SeriesData::SourceSpecifiedViaJSON rankTableSource; if (getJsonOptionLowerCase(options, "rank_table", rankTableSource.value)) { diff --git a/src/binding/python/openpmd_api/DataFrame.py b/src/binding/python/openpmd_api/DataFrame.py index 55db5d1769..1ee68f79d8 100644 --- a/src/binding/python/openpmd_api/DataFrame.py +++ b/src/binding/python/openpmd_api/DataFrame.py @@ -113,11 +113,11 @@ def iterations_to_dataframe(series, species_name): df = pd.concat( ( - series.iterations[i] + iteration .particles[species_name] .to_df() .assign(iteration=i) - for i in series.iterations + for i, iteration in series.snapshots().items() ), axis=0, ignore_index=True, @@ -170,12 +170,12 @@ def iterations_to_cudf(series, species_name): cdf = cudf.concat( ( cudf.from_pandas( - series.iterations[i] - .particles[species_name] - .to_df() - .assign(iteration=i) + iteration + .particles[species_name] + .to_df() + .assign(iteration=i) ) - for i in 
series.iterations + for i, iteration in series.snapshots().items() ), axis=0, ignore_index=True,