Skip to content

Commit b276c88

Browse files
More consistent behavior for reopening Iterations (#1810)
* Return last unclosed iteration upon currentIteration * .at(), operator[](): reopen Iterations implicitly * Write access: Better support for reopening Iterations Implicit reopening now supported (except when the Iteration would be from an old step) * Simplify control flow, defer more to ::at() * try this for groupbased too * Some comments
1 parent 49ccc53 commit b276c88

File tree

3 files changed

+153
-84
lines changed

3 files changed

+153
-84
lines changed

src/Iteration.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "openPMD/Error.hpp"
2525
#include "openPMD/IO/AbstractIOHandler.hpp"
2626
#include "openPMD/IO/IOTask.hpp"
27+
#include "openPMD/IterationEncoding.hpp"
2728
#include "openPMD/Series.hpp"
2829
#include "openPMD/Streaming.hpp"
2930
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
@@ -881,7 +882,9 @@ auto Iteration::beginStep(
881882
}
882883
}
883884

884-
res.stepStatus = status;
885+
res.stepStatus = series.iterationEncoding() == IterationEncoding::fileBased
886+
? AdvanceStatus::RANDOMACCESS
887+
: status;
885888
return res;
886889
}
887890

src/snapshots/ContainerImpls.cpp

Lines changed: 73 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "openPMD/snapshots/IteratorHelpers.hpp"
77
#include "openPMD/snapshots/RandomAccessIterator.hpp"
88
#include "openPMD/snapshots/StatefulIterator.hpp"
9+
#include <cassert>
910
#include <memory>
1011
#include <optional>
1112
#include <stdexcept>
@@ -209,79 +210,82 @@ auto StatefulSnapshotsContainer::operator[](key_type const &key)
209210
{
210211
throw std::runtime_error("Stateful iteration on a read-write Series.");
211212
}
212-
if (access::write(access))
213+
else if (
214+
access::read(access) ||
215+
s.series.iterations.find(key) != s.series.iterations.end())
213216
{
214-
auto lastIteration = base_iterator->peekCurrentlyOpenIteration();
215-
if (lastIteration.has_value())
217+
return at(key);
218+
}
219+
220+
assert(access::write(access));
221+
222+
auto lastIteration = base_iterator->peekCurrentlyOpenIteration();
223+
if (lastIteration.has_value())
224+
{
225+
auto lastIteration_v = lastIteration.value();
226+
if (lastIteration_v->first == key)
216227
{
217-
auto lastIteration_v = lastIteration.value();
218-
if (lastIteration_v->first == key)
219-
{
220-
return s.series.iterations.at(key);
221-
}
222-
else
223-
{
224-
lastIteration_v->second.close(); // continue below
225-
}
228+
return s.series.iterations.at(key);
226229
}
227-
if (auto it = s.series.iterations.find(key);
228-
it == s.series.iterations.end())
230+
else
229231
{
230-
s.currentStep.map_during_t(
231-
[&](detail::CurrentStep::During_t &during) {
232-
++during.step_count;
233-
base_iterator->get().seen_iterations[key] =
234-
during.step_count;
235-
during.iteration_idx = key;
236-
during.available_iterations_in_step = {key};
237-
},
238-
[&](detail::CurrentStep::AtTheEdge where_am_i)
239-
-> detail::CurrentStep::During_t {
240-
base_iterator->get().seen_iterations[key] = 0;
241-
switch (where_am_i)
242-
{
243-
case detail::CurrentStep::AtTheEdge::Begin:
244-
return detail::CurrentStep::During_t{0, key, {key}};
245-
case detail::CurrentStep::AtTheEdge::End:
246-
throw error::Internal(
247-
"Trying to create a new output step, but the "
248-
"stream is "
249-
"closed?");
250-
}
251-
throw std::runtime_error("Unreachable!");
252-
});
232+
lastIteration_v->second.close(); // continue below
253233
}
254-
auto &res = s.series.iterations[key];
255-
if (res.getStepStatus() != StepStatus::DuringStep)
234+
}
235+
236+
// create new
237+
auto &res = s.series.iterations[key];
238+
Iteration::BeginStepStatus status = [&]() {
239+
try
256240
{
257-
try
241+
return res.beginStep(/* reread = */ false);
242+
}
243+
catch (error::OperationUnsupportedInBackend const &)
244+
{
245+
s.series.iterations.retrieveSeries()
246+
.get()
247+
.m_currentlyActiveIterations.clear();
248+
throw;
249+
}
250+
}();
251+
res.setStepStatus(StepStatus::DuringStep);
252+
253+
s.currentStep.map_during_t(
254+
[&](detail::CurrentStep::During_t &during) {
255+
switch (status.stepStatus)
258256
{
259-
res.beginStep(/* reread = */ false);
257+
case AdvanceStatus::OK:
258+
++during.step_count;
259+
during.available_iterations_in_step = {key};
260+
break;
261+
case AdvanceStatus::RANDOMACCESS:
262+
during.available_iterations_in_step.emplace_back(key);
263+
break;
264+
case AdvanceStatus::OVER:
265+
throw error::Internal(
266+
"Backend reported OVER status while trying to create "
267+
"new Iteration.");
260268
}
261-
catch (error::OperationUnsupportedInBackend const &)
269+
base_iterator->get().seen_iterations[key] = during.step_count;
270+
during.iteration_idx = key;
271+
},
272+
[&](detail::CurrentStep::AtTheEdge where_am_i)
273+
-> detail::CurrentStep::During_t {
274+
base_iterator->get().seen_iterations[key] = 0;
275+
switch (where_am_i)
262276
{
263-
s.series.iterations.retrieveSeries()
264-
.get()
265-
.m_currentlyActiveIterations.clear();
266-
throw;
277+
case detail::CurrentStep::AtTheEdge::Begin:
278+
return detail::CurrentStep::During_t{0, key, {key}};
279+
case detail::CurrentStep::AtTheEdge::End:
280+
throw error::Internal(
281+
"Trying to create a new output step, but the "
282+
"stream is "
283+
"closed?");
267284
}
268-
res.setStepStatus(StepStatus::DuringStep);
269-
}
270-
return res;
271-
}
272-
else if (access::read(access))
273-
{
274-
auto &result = base_iterator->seek(
275-
{StatefulIterator::Seek::Seek_Iteration_t{key}});
276-
if (result.is_end())
277-
{
278-
throw std::out_of_range(
279-
"[StatefulSnapshotsContainer::operator[]()] Cannot (yet) skip "
280-
"to a Snapshot from an I/O step that is not active.");
281-
}
282-
return result->second;
283-
}
284-
throw error::Internal("Control flow error: This should be unreachable.");
285+
throw std::runtime_error("Unreachable!");
286+
});
287+
288+
return res;
285289
}
286290

287291
auto StatefulSnapshotsContainer::clear() -> void
@@ -352,14 +356,14 @@ RandomAccessIteratorContainer &RandomAccessIteratorContainer::operator=(
352356
auto RandomAccessIteratorContainer::currentIteration() const
353357
-> std::optional<value_type const *>
354358
{
355-
if (auto begin = m_cont.begin(); begin != m_cont.end())
359+
for (auto begin = m_cont.rbegin(); begin != m_cont.rend(); ++begin)
356360
{
357-
return std::make_optional<value_type const *>(&*begin);
358-
}
359-
else
360-
{
361-
return std::nullopt;
361+
if (!begin->second.closed())
362+
{
363+
return std::make_optional<value_type const *>(&*begin);
364+
}
362365
}
366+
return std::nullopt;
363367
}
364368

365369
auto RandomAccessIteratorContainer::begin() -> iterator

test/Files_SerialIO/close_and_reopen_test.cpp

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ template <typename WriteIterations>
2020
auto run_test_filebased(
2121
Access writeAccess,
2222
WriteIterations &&writeIterations,
23-
std::string const &ext)
23+
std::string const &ext,
24+
bool synchronous)
2425
{
2526
std::string filename =
2627
"../samples/close_iteration_reopen/filebased_%T." + ext;
@@ -39,6 +40,11 @@ auto run_test_filebased(
3940
B_y.resetDataset({Datatype::INT, {5}});
4041
B_y.storeChunk(data, {0}, {5});
4142
it.close();
43+
// This also verifies that operator[] and at() can be used to access the
44+
// Iteration after closing
45+
REQUIRE(series.iterations.at(0).closed());
46+
REQUIRE(writeIterations(series)[0].closed() == !synchronous);
47+
REQUIRE(writeIterations(series).at(0).closed() == !synchronous);
4248
}
4349

4450
{
@@ -54,6 +60,14 @@ auto run_test_filebased(
5460
e_position_x.resetDataset({Datatype::INT, {5}});
5561
e_position_x.storeChunk(data, {0}, {5});
5662
it.close();
63+
REQUIRE(series.iterations.at(1).closed());
64+
REQUIRE(writeIterations(series).at(1).closed() == !synchronous);
65+
REQUIRE(writeIterations(series)[1].closed() == !synchronous);
66+
// We are in file-based iteration encoding, so the old iteration should
67+
// remain accessible
68+
// Note: this will create a particlespath at iteration 0, which will
69+
// lead to parsing warnings in HDF5.
70+
writeIterations(series).at(0);
5771
}
5872
{
5973
auto it = writeIterations(series)[2];
@@ -172,7 +186,8 @@ auto run_test_groupbased(
172186
Access writeAccess,
173187
WriteIterations &&writeIterations,
174188
std::string const &ext,
175-
std::vector<Access> const &readModes)
189+
std::vector<Access> const &readModes,
190+
bool synchronous)
176191
{
177192
std::string filename =
178193
"../samples/close_iteration_reopen/groupbased." + ext;
@@ -202,6 +217,18 @@ auto run_test_groupbased(
202217
B_y.resetDataset({Datatype::INT, {5}});
203218
B_y.storeChunk(data, {0}, {5});
204219
it.close();
220+
// This also verifies that operator[] and at() can be used to access the
221+
// Iteration after closing
222+
REQUIRE(series.iterations.at(0).closed());
223+
REQUIRE(writeIterations(series)[0].closed() == !synchronous);
224+
REQUIRE(writeIterations(series).at(0).closed() == !synchronous);
225+
if (synchronous)
226+
{
227+
// we opened a new step, need to do something in it now,
228+
// otherwise we get a corrupted file
229+
B_y.storeChunk(data, {0}, {5});
230+
it.close();
231+
}
205232
}
206233

207234
{
@@ -217,6 +244,19 @@ auto run_test_groupbased(
217244
E_y.resetDataset({Datatype::INT, {5}});
218245
E_y.storeChunk(data, {0}, {5});
219246
it.close();
247+
248+
if (!synchronous || series.backend() != "ADIOS2")
249+
{
250+
writeIterations(series).at(0);
251+
}
252+
else
253+
{
254+
// Cannot go back to an old IO step
255+
// Since the other backends do not use IO steps,
256+
// going back to an old Iteration should remain possible even
257+
// in synchronous modes
258+
REQUIRE_THROWS(writeIterations(series).at(0));
259+
}
220260
}
221261
{
222262
auto it = writeIterations(series)[2];
@@ -281,19 +321,35 @@ auto close_and_reopen_test() -> void
281321
for (auto writeAccess :
282322
{Access::CREATE_RANDOM_ACCESS, Access::CREATE_LINEAR})
283323
{
324+
bool synchronous = writeAccess == Access::CREATE_LINEAR;
284325
run_test_filebased(
285-
writeAccess, [](Series &s) { return s.iterations; }, "bp");
326+
writeAccess, [](Series &s) { return s.iterations; }, "bp", false);
286327
run_test_filebased(
287-
writeAccess, [](Series &s) { return s.writeIterations(); }, "bp");
328+
writeAccess,
329+
[](Series &s) { return s.writeIterations(); },
330+
"bp",
331+
true);
288332
run_test_filebased(
289-
writeAccess, [](Series &s) { return s.snapshots(); }, "bp");
333+
writeAccess,
334+
[](Series &s) { return s.snapshots(); },
335+
"bp",
336+
synchronous);
290337
run_test_filebased(
291-
writeAccess, [](Series &s) { return s.snapshots(); }, "bp");
338+
writeAccess,
339+
[](Series &s) { return s.snapshots(); },
340+
"bp",
341+
synchronous);
292342
run_test_filebased(
293-
writeAccess, [](Series &s) { return s.snapshots(); }, "json");
343+
writeAccess,
344+
[](Series &s) { return s.snapshots(); },
345+
"json",
346+
synchronous);
294347
#if openPMD_HAVE_HDF5
295348
run_test_filebased(
296-
writeAccess, [](Series &s) { return s.snapshots(); }, "h5");
349+
writeAccess,
350+
[](Series &s) { return s.snapshots(); },
351+
"h5",
352+
synchronous);
297353
#endif
298354

299355
/*
@@ -304,37 +360,43 @@ auto close_and_reopen_test() -> void
304360
writeAccess,
305361
[](Series &s) { return s.iterations; },
306362
"bp4",
307-
{Access::READ_ONLY, Access::READ_LINEAR});
363+
{Access::READ_ONLY, Access::READ_LINEAR},
364+
false);
308365
// since these write data in a way that distributes one iteration's data
309366
// over multiple steps, only random access read mode makes sense
310367
run_test_groupbased(
311368
writeAccess,
312369
[](Series &s) { return s.writeIterations(); },
313370
"bp4",
314-
{Access::READ_RANDOM_ACCESS});
371+
{Access::READ_RANDOM_ACCESS},
372+
true);
315373
run_test_groupbased(
316374
writeAccess,
317375
[](Series &s) { return s.snapshots(); },
318376
"bp4",
319-
{Access::READ_RANDOM_ACCESS});
377+
{Access::READ_RANDOM_ACCESS},
378+
synchronous);
320379
// that doesnt matter for json tho
321380
run_test_groupbased(
322381
writeAccess,
323382
[](Series &s) { return s.snapshots(); },
324383
"json",
325-
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR});
384+
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR},
385+
synchronous);
326386
#if openPMD_HAVE_HDF5
327387
run_test_groupbased(
328388
writeAccess,
329389
[](Series &s) { return s.snapshots(); },
330390
"h5",
331-
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR});
391+
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR},
392+
synchronous);
332393
#endif
333394
run_test_groupbased(
334395
writeAccess,
335396
[](Series &s) { return s.snapshots(); },
336397
"json",
337-
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR});
398+
{Access::READ_RANDOM_ACCESS, Access::READ_LINEAR},
399+
synchronous);
338400
}
339401
}
340402
#else

0 commit comments

Comments
 (0)