Skip to content

Commit 18b6d97

Browse files
authored
[release/5.x] Cherry pick: Fix empty committed snapshots: Restore behaviour of writing to temporary uncommitted file and renaming (#7029) (#7034)
1 parent 1fd8065 commit 18b6d97

File tree

4 files changed

+139
-36
lines changed

4 files changed

+139
-36
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8+
## [5.0.18]
9+
10+
[5.0.18]: https://github.com/microsoft/CCF/releases/tag/5.0.18
11+
12+
### Fixed
13+
14+
- CCF will no longer create in-progress snapshot files with a `.committed` suffix. It will only rename files to `.committed` when they are complete and ready for reading (#7029).
15+
816
## [5.0.17]
917

1018
[5.0.17]: https://github.com/microsoft/CCF/releases/tag/5.0.17

python/src/ccf/ledger.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ def digest(algo, data):
9090
return h.finalize()
9191

9292

93+
def is_snapshot_file_committed(file_name):
94+
return file_name.endswith(COMMITTED_FILE_SUFFIX)
95+
96+
9397
def unpack(stream, fmt):
9498
size = struct.calcsize(fmt)
9599
buf = stream.read(size)
@@ -789,14 +793,23 @@ def __init__(self, filename: str):
789793
self._filename = filename
790794
self._file_size = os.path.getsize(filename)
791795

796+
if self._file_size == 0:
797+
raise InvalidSnapshotException(f"{filename} is currently empty")
798+
792799
entry_start_pos = super()._read_header()
793800

794801
# 1.x snapshots do not include evidence
795802
if self.is_committed() and not self.is_snapshot_file_1_x():
796803
receipt_pos = entry_start_pos + self._header.size
797804
receipt_bytes = _peek_all(self._file, pos=receipt_pos)
798805

799-
receipt = json.loads(receipt_bytes.decode("utf-8"))
806+
try:
807+
receipt = json.loads(receipt_bytes.decode("utf-8"))
808+
except json.decoder.JSONDecodeError as e:
809+
raise InvalidSnapshotException(
810+
f"Cannot read receipt from snapshot {os.path.basename(self._filename)}: Receipt starts at {receipt_pos} (file is {self._file_size} bytes), and contains {receipt_bytes}"
811+
) from e
812+
800813
# Receipts included in snapshots always contain leaf components,
801814
# including a claims digest and commit evidence, from 2.0.0-rc0 onwards.
802815
# This verification code deliberately does not support snapshots
@@ -1074,3 +1087,7 @@ class UntrustedNodeException(Exception):
10741087

10751088
class UnknownTransaction(Exception):
10761089
"""The transaction at seqno does not exist in ledger"""
1090+
1091+
1092+
class InvalidSnapshotException(Exception):
1093+
"""The given snapshot file is invalid and cannot be parsed"""

src/host/snapshots.h

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -254,48 +254,77 @@ namespace asynchost
254254
{
255255
if (snapshot_idx == it->first)
256256
{
257-
// e.g. snapshot_100_105.committed
257+
// e.g. snapshot_100_105
258258
auto file_name = fmt::format(
259-
"{}{}{}{}{}{}",
259+
"{}{}{}{}{}",
260260
snapshot_file_prefix,
261261
snapshot_idx_delimiter,
262262
it->first,
263263
snapshot_idx_delimiter,
264-
it->second.evidence_idx,
265-
snapshot_committed_suffix);
264+
it->second.evidence_idx);
266265
auto full_snapshot_path = snapshot_dir / file_name;
267266

268-
if (fs::exists(full_snapshot_path))
267+
int snapshot_fd = open(
268+
full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
269+
if (snapshot_fd == -1)
269270
{
270-
// In the case that a file with this name already exists, keep
271-
// existing file and drop pending snapshot
272-
LOG_FAIL_FMT(
273-
"Cannot write snapshot as file already exists: {}", file_name);
274-
}
275-
else
276-
{
277-
std::ofstream snapshot_file(
278-
full_snapshot_path, std::ios::app | std::ios::binary);
279-
if (!snapshot_file.good())
271+
if (errno == EEXIST)
280272
{
273+
// In the case that a file with this name already exists, keep
274+
// existing file and drop pending snapshot
281275
LOG_FAIL_FMT(
282-
"Cannot write snapshot: error opening file {}", file_name);
276+
"Cannot write snapshot as file already exists: {}",
277+
file_name);
283278
}
284279
else
285280
{
286-
const auto& snapshot = it->second.snapshot;
287-
snapshot_file.write(
288-
reinterpret_cast<const char*>(snapshot->data()),
289-
snapshot->size());
290-
snapshot_file.write(
291-
reinterpret_cast<const char*>(receipt_data), receipt_size);
292-
293-
LOG_INFO_FMT(
294-
"New snapshot file written to {} [{} bytes]",
295-
file_name,
296-
static_cast<size_t>(snapshot_file.tellp()));
281+
LOG_FAIL_FMT(
282+
"Cannot write snapshot: error ({}) opening file {}",
283+
errno,
284+
file_name);
297285
}
298286
}
287+
else
288+
{
289+
const auto& snapshot = it->second.snapshot;
290+
291+
#define THROW_ON_ERROR(x) \
292+
do \
293+
{ \
294+
auto rc = x; \
295+
if (rc == -1) \
296+
{ \
297+
throw std::runtime_error(fmt::format( \
298+
"Error ({}) writing snapshot {} in " #x, errno, file_name)); \
299+
} \
300+
} while (0)
301+
302+
THROW_ON_ERROR(
303+
write(snapshot_fd, snapshot->data(), snapshot->size()));
304+
THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size));
305+
306+
THROW_ON_ERROR(fsync(snapshot_fd));
307+
THROW_ON_ERROR(close(snapshot_fd));
308+
309+
#undef THROW_ON_ERROR
310+
311+
LOG_INFO_FMT(
312+
"New snapshot file written to {} [{} bytes]",
313+
file_name,
314+
snapshot->size() + receipt_size);
315+
316+
// e.g. snapshot_100_105.committed
317+
const auto committed_file_name =
318+
fmt::format("{}{}", file_name, snapshot_committed_suffix);
319+
const auto full_committed_path =
320+
snapshot_dir / committed_file_name;
321+
322+
files::rename(full_snapshot_path, full_committed_path);
323+
LOG_INFO_FMT(
324+
"Renamed temporary snapshot {} to committed {}",
325+
file_name,
326+
committed_file_name);
327+
}
299328

300329
pending_snapshots.erase(it);
301330

tests/e2e_operations.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import infra.snp as snp
2323
from cryptography import x509
2424
from cryptography.hazmat.backends import default_backend
25+
import infra.concurrency
2526

2627
from loguru import logger as LOG
2728

@@ -49,14 +50,62 @@ def test_save_committed_ledger_files(network, args):
4950

5051

5152
def test_parse_snapshot_file(network, args):
52-
primary, _ = network.find_primary()
53-
network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
54-
committed_snapshots_dir = network.get_committed_snapshots(primary)
55-
for snapshot in os.listdir(committed_snapshots_dir):
56-
with ccf.ledger.Snapshot(os.path.join(committed_snapshots_dir, snapshot)) as s:
57-
assert len(
58-
s.get_public_domain().get_tables()
59-
), "No public table in snapshot"
53+
class ReaderThread(infra.concurrency.StoppableThread):
54+
def __init__(self, network):
55+
super().__init__(name="reader")
56+
primary, _ = network.find_primary()
57+
self.snapshots_dir = os.path.join(
58+
primary.remote.remote.root,
59+
primary.remote.snapshots_dir_name,
60+
)
61+
62+
def run(self):
63+
seen = set()
64+
while not self.is_stopped():
65+
for snapshot in os.listdir(self.snapshots_dir):
66+
if (
67+
ccf.ledger.is_snapshot_file_committed(snapshot)
68+
and snapshot not in seen
69+
):
70+
seen.add(snapshot)
71+
with ccf.ledger.Snapshot(
72+
os.path.join(self.snapshots_dir, snapshot)
73+
) as s:
74+
assert len(
75+
s.get_public_domain().get_tables()
76+
), "No public table in snapshot"
77+
LOG.success(f"Successfully parsed snapshot: {snapshot}")
78+
LOG.info(f"Tested {len(seen)} snapshots")
79+
assert len(seen) > 0, "No snapshots seen, so this tested nothing"
80+
81+
class WriterThread(infra.concurrency.StoppableThread):
82+
def __init__(self, network, reader):
83+
super().__init__(name="writer")
84+
self.primary, _ = network.find_primary()
85+
self.member = network.consortium.get_any_active_member()
86+
self.reader = reader
87+
88+
def run(self):
89+
while not self.is_stopped() and self.reader.is_alive():
90+
self.member.update_ack_state_digest(self.primary)
91+
92+
reader_thread = ReaderThread(network)
93+
reader_thread.start()
94+
95+
writer_thread = WriterThread(network, reader_thread)
96+
writer_thread.start()
97+
98+
# When this test was added, the original failure was occurring 100% of the time within 0.5s.
99+
# This fix has been manually verified across multi-minute runs.
100+
# 5s is a plausible run-time in the CI, that should still provide convincing coverage.
101+
time.sleep(5)
102+
103+
writer_thread.stop()
104+
writer_thread.join()
105+
106+
reader_thread.stop()
107+
reader_thread.join()
108+
60109
return network
61110

62111

0 commit comments

Comments
 (0)