Skip to content

Commit 8ac7035

Browse files
Added manual snapshotting to create a new file tree with the current dataset
1 parent 44f92dc commit 8ac7035

File tree

8 files changed

+377
-2
lines changed

8 files changed

+377
-2
lines changed

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreIndex.swift

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,3 +1498,77 @@ extension DiskPersistence.Datastore.Index {
14981498
return (manifest: manifest, removedPages: removedPages)
14991499
}
15001500
}
1501+
1502+
// MARK: - Snapshotting
1503+
1504+
extension DiskPersistence.Datastore.Index {
1505+
@discardableResult
1506+
func copy(
1507+
into newDatastore: DiskPersistence<ReadWrite>.Datastore,
1508+
rootObjectManifest: inout DatastoreRootManifest,
1509+
targetPageSize: Int
1510+
) async throws -> DiskPersistence<ReadWrite>.Datastore.Index {
1511+
let actualPageSize = min(max(targetPageSize, Configuration.minimumPageSize), Configuration.maximumPageSize) - DiskPersistence.Datastore.Page.headerSize
1512+
1513+
var newIndexManifest = DatastoreIndexManifest(
1514+
id: self.id.manifestID,
1515+
orderedPages: []
1516+
)
1517+
1518+
let stream = AsyncThrowingBackpressureStream { continuation in
1519+
try await self.forwardScanEntries(after: self.firstInsertionCursor) { entry in
1520+
try await continuation.yield(entry)
1521+
return true
1522+
}
1523+
}
1524+
1525+
var currentPageBlocks: [DatastorePageEntryBlock] = []
1526+
var remainingSpace = actualPageSize
1527+
1528+
var count = 0
1529+
1530+
for try await entry in stream {
1531+
count += 1
1532+
let blocks = entry.blocks(remainingPageSpace: remainingSpace, maxPageSpace: actualPageSize)
1533+
1534+
for block in blocks {
1535+
let encodedSize = block.encodedSize
1536+
if encodedSize > remainingSpace {
1537+
let newPage = DiskPersistence<ReadWrite>.Datastore.Page(
1538+
datastore: newDatastore,
1539+
id: .init(index: id, page: .init()),
1540+
blocks: currentPageBlocks
1541+
)
1542+
newIndexManifest.orderedPages.append(.added(newPage.id.page))
1543+
try await newPage.persistIfNeeded()
1544+
1545+
currentPageBlocks.removeAll(keepingCapacity: true)
1546+
remainingSpace = actualPageSize
1547+
}
1548+
1549+
remainingSpace -= encodedSize
1550+
currentPageBlocks.append(block)
1551+
}
1552+
}
1553+
1554+
if !currentPageBlocks.isEmpty {
1555+
let newPage = DiskPersistence<ReadWrite>.Datastore.Page(
1556+
datastore: newDatastore,
1557+
id: .init(index: id, page: .init()),
1558+
blocks: currentPageBlocks
1559+
)
1560+
newIndexManifest.orderedPages.append(.added(newPage.id.page))
1561+
try await newPage.persistIfNeeded()
1562+
}
1563+
1564+
rootObjectManifest.addedIndexes.insert(.init(id))
1565+
rootObjectManifest.addedIndexManifests.insert(.init(id))
1566+
let newIndex = DiskPersistence<ReadWrite>.Datastore.Index(
1567+
datastore: newDatastore,
1568+
id: id,
1569+
manifest: newIndexManifest
1570+
)
1571+
try await newIndex.persistIfNeeded()
1572+
return newIndex
1573+
}
1574+
}

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/DatastoreRootManifest.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ extension DatastoreRootManifest {
6666
case primary
6767
case direct(index: DatastoreIndexIdentifier)
6868
case secondary(index: DatastoreIndexIdentifier)
69+
70+
init(_ id: PersistenceDatastoreIndexID) {
71+
switch id {
72+
case .primary: self = .primary
73+
case .direct(let index, _): self = .direct(index: index)
74+
case .secondary(let index, _): self = .secondary(index: index)
75+
}
76+
}
6977
}
7078

7179
enum IndexManifestID: Codable, Hashable {

Sources/CodableDatastore/Persistence/Disk Persistence/Datastore/PersistenceDatastore.swift

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,80 @@ extension DiskPersistence.Datastore {
279279

280280
var hasObservers: Bool { !observers.isEmpty }
281281
}
282+
283+
// MARK: - Snapshotting
284+
285+
extension DiskPersistence.Datastore {
286+
@discardableResult
287+
func copy(
288+
rootIdentifier: DatastoreRootIdentifier?,
289+
datastoreKey: DatastoreKey,
290+
into newSnapshot: Snapshot<ReadWrite>,
291+
iteration: inout SnapshotIteration,
292+
targetPageSize: Int
293+
) async throws -> DiskPersistence<ReadWrite>.Datastore {
294+
let newDatastore = DiskPersistence<ReadWrite>.Datastore(id: id, snapshot: newSnapshot)
295+
296+
/// Copy the datastore over.
297+
iteration.dataStores[datastoreKey] = SnapshotIteration.DatastoreInfo(key: datastoreKey, id: id, root: rootIdentifier)
298+
299+
/// Record the addition of a new datastore since it lives in a new location on disk.
300+
iteration.addedDatastores.insert(id)
301+
302+
/// Stop here if there are no roots for the datastore — there is nothing else to migrate.
303+
guard let datastoreRootIdentifier = rootIdentifier
304+
else { return newDatastore }
305+
306+
/// Record the addition of a new datastore since it lives in a new location on disk.
307+
iteration.addedDatastoreRoots.insert(DatastoreRootReference(
308+
datastoreID: id,
309+
datastoreRootID: datastoreRootIdentifier
310+
))
311+
312+
let rootObject = rootObject(for: datastoreRootIdentifier)
313+
let rootObjectManifest = try await rootObject.manifest
314+
var newRootObjectManifest = DatastoreRootManifest(
315+
id: rootObjectManifest.id,
316+
modificationDate: rootObjectManifest.modificationDate,
317+
descriptor: rootObjectManifest.descriptor,
318+
primaryIndexManifest: rootObjectManifest.primaryIndexManifest,
319+
directIndexManifests: rootObjectManifest.directIndexManifests,
320+
secondaryIndexManifests: rootObjectManifest.secondaryIndexManifests,
321+
addedIndexes: [],
322+
removedIndexes: [],
323+
addedIndexManifests: [],
324+
removedIndexManifests: []
325+
)
326+
327+
try await rootObject.primaryIndex.copy(
328+
into: newDatastore,
329+
rootObjectManifest: &newRootObjectManifest,
330+
targetPageSize: targetPageSize
331+
)
332+
333+
for index in try await rootObject.directIndexes.values {
334+
try await index.copy(
335+
into: newDatastore,
336+
rootObjectManifest: &newRootObjectManifest,
337+
targetPageSize: targetPageSize
338+
)
339+
}
340+
341+
for index in try await rootObject.secondaryIndexes.values {
342+
try await index.copy(
343+
into: newDatastore,
344+
rootObjectManifest: &newRootObjectManifest,
345+
targetPageSize: targetPageSize
346+
)
347+
}
348+
349+
let newRootObject = DiskPersistence<ReadWrite>.Datastore.RootObject(
350+
datastore: newDatastore,
351+
id: datastoreRootIdentifier,
352+
rootObject: newRootObjectManifest
353+
)
354+
try await newRootObject.persistIfNeeded()
355+
356+
return newDatastore
357+
}
358+
}

Sources/CodableDatastore/Persistence/Disk Persistence/DatedIdentifier.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,21 @@ struct DatedIdentifierComponents {
7272
"\(hour)-\(minute)"
7373
}
7474

75+
var date: Date? {
76+
DateComponents(
77+
calendar: Calendar(identifier: .gregorian),
78+
timeZone: TimeZone(secondsFromGMT: 0),
79+
year: Int(year),
80+
month: Int(month),
81+
day: Int(day),
82+
hour: Int(hour),
83+
minute: Int(minute),
84+
second: Int(second),
85+
nanosecond: Int(millisecond).map { $0*1_000_000 }
86+
)
87+
.date
88+
}
89+
7590
static let size = 40
7691
}
7792

Sources/CodableDatastore/Persistence/Disk Persistence/DiskPersistence.swift

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,16 @@ extension DiskPersistence {
284284

285285
extension DiskPersistence {
286286
/// Load the default snapshot from disk, or create an empty one if such a file does not exist.
287-
private func loadSnapshot(from storeInfo: StoreInfo) -> Snapshot<AccessMode> {
288-
let snapshotID = storeInfo.currentSnapshot ?? SnapshotIdentifier()
287+
///
288+
/// - Parameters:
289+
/// - storeInfo: The store infor to load from.
290+
/// - newSnapshotIdentifier: A new snapshot identifier to use if the store doesn't have one yet. If nil, a new one will be created automatically.
291+
/// - Returns: A snapshot to start using.
292+
private func loadSnapshot(
293+
from storeInfo: StoreInfo,
294+
newSnapshotIdentifier: SnapshotIdentifier? = nil
295+
) -> Snapshot<AccessMode> {
296+
let snapshotID = storeInfo.currentSnapshot ?? newSnapshotIdentifier ?? SnapshotIdentifier()
289297

290298
if let snapshot = snapshots[snapshotID] {
291299
return snapshot
@@ -378,6 +386,69 @@ extension DiskPersistence {
378386
try await updateCurrentSnapshot(accessor: escapingClosure).value
379387
}
380388
}
389+
390+
/// Create a new snapshot from the current snapshot the persistence is pointing to.
391+
///
392+
/// This method is temporarily publc — once all the components are public, you should use them directly before the next minor version.
393+
public func _takeSnapshot() async throws where AccessMode == ReadWrite {
394+
try await _takeSnapshot(newSnapshotIdentifier: nil)
395+
}
396+
397+
func _takeSnapshot(
398+
newSnapshotIdentifier: SnapshotIdentifier?
399+
) async throws where AccessMode == ReadWrite { // TODO: return new snapshot iteration
400+
let readSnapshot = try await currentSnapshot
401+
let newSnapshot = try await createSnapshot(from: readSnapshot, newSnapshotIdentifier: newSnapshotIdentifier)
402+
try await setCurrentSnapshot(snapshot: newSnapshot)
403+
}
404+
405+
/// Load the current snapshot the persistence is reading and writing to.
406+
var currentSnapshot: Snapshot<AccessMode> {
407+
// TODO: This should return a readonly snapshot, but we need to be able to make a read-only copy from the persistence first.
408+
get async throws {
409+
try await withStoreInfo { await loadSnapshot(from: $0) }
410+
}
411+
}
412+
413+
func setCurrentSnapshot(
414+
snapshot: Snapshot<ReadWrite>,
415+
dateUpdate: ModificationUpdate = .updateOnWrite
416+
) async throws where AccessMode == ReadWrite {
417+
try await withStoreInfo { storeInfo in
418+
guard snapshot.persistence === self
419+
else { throw DiskPersistenceError.wrongPersistence }
420+
421+
/// Update the store info with snapshot and modification date to use
422+
storeInfo.currentSnapshot = snapshot.id
423+
storeInfo.modificationDate = dateUpdate.modificationDate(for: storeInfo.modificationDate)
424+
}
425+
}
426+
427+
func loadSnapshot(id: SnapshotIdentifier) async throws -> Snapshot<ReadOnly> {
428+
preconditionFailure("Unimplemented")
429+
}
430+
431+
// var allSnapshots: AsyncStream<Snapshot<ReadOnly>> {
432+
// /// Crawl the `Snapshots` directory, and collect all the .snapshot folders. Return the manifest structure.
433+
// preconditionFailure("Unimplemented")
434+
// }
435+
436+
func createSnapshot(
437+
from snapshot: Snapshot<ReadWrite>, // TODO: Shouldn't need to be readwrite
438+
actionName: String? = nil,
439+
newSnapshotIdentifier: SnapshotIdentifier? = nil
440+
) async throws -> Snapshot<ReadWrite> where AccessMode == ReadWrite {
441+
let newSnapshot = try await snapshot.copy(
442+
into: self,
443+
actionName: actionName,
444+
newSnapshotIdentifier: newSnapshotIdentifier,
445+
targetPageSize: Configuration.defaultPageSize
446+
)
447+
448+
/// Save a reference to this new snapshot, and return it.
449+
snapshots[newSnapshot.id] = newSnapshot
450+
return newSnapshot
451+
}
381452
}
382453

383454
// MARK: - Persistence Creation

Sources/CodableDatastore/Persistence/Disk Persistence/DiskPersistenceError.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public enum DiskPersistenceError: LocalizedError, Equatable {
3333
/// The persistence is read only and cannot be written to.
3434
case cannotWrite
3535

36+
/// The persistence of another object is not in common with the persistence being used.
37+
case wrongPersistence
38+
3639
public var errorDescription: String? {
3740
switch self {
3841
case .notFileURL:
@@ -49,6 +52,8 @@ public enum DiskPersistenceError: LocalizedError, Equatable {
4952
"The entry was in a format that could not be understood."
5053
case .cannotWrite:
5154
"The persistence is read only and cannot be written to."
55+
case .wrongPersistence:
56+
"The persistence of another object is not in common with the persistence being used."
5257
}
5358
}
5459
}

Sources/CodableDatastore/Persistence/Disk Persistence/Snapshot/Snapshot.swift

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,3 +340,59 @@ extension Snapshot {
340340
return (datastore, datastoreInfo.root)
341341
}
342342
}
343+
344+
// MARK: - Snapshotting
345+
346+
extension Snapshot {
347+
@discardableResult
348+
func copy(
349+
into persistence: DiskPersistence<ReadWrite>,
350+
actionName: String? = nil,
351+
newSnapshotIdentifier: SnapshotIdentifier? = nil,
352+
targetPageSize: Int
353+
) async throws -> Snapshot<ReadWrite> {
354+
try await readingManifest { manifest, iteration in
355+
/// Create a new snapshot and iteration to load data into
356+
let newSnapshot = Snapshot<ReadWrite>(id: newSnapshotIdentifier ?? SnapshotIdentifier(), persistence: persistence)
357+
358+
let creationDate = (try? newSnapshot.id.components)?.date ?? Date()
359+
var newIteration = SnapshotIteration(
360+
id: SnapshotIterationIdentifier(rawValue: newSnapshot.id.rawValue),
361+
creationDate: creationDate,
362+
precedingIteration: iteration.id,
363+
precedingSnapshot: id,
364+
successiveIterations: [],
365+
actionName: actionName,
366+
dataStores: [:],
367+
addedDatastores: [],
368+
removedDatastores: [],
369+
addedDatastoreRoots: [],
370+
removedDatastoreRoots: []
371+
)
372+
373+
/// Iterate through each datastore and copy the data over
374+
for (_, datastoreInfo) in iteration.dataStores {
375+
let (datastore, _) = await loadDatastore(for: datastoreInfo.key, from: iteration)
376+
try await datastore.copy(
377+
rootIdentifier: datastoreInfo.root,
378+
datastoreKey: datastoreInfo.key,
379+
into: newSnapshot,
380+
iteration: &newIteration,
381+
targetPageSize: targetPageSize
382+
)
383+
}
384+
385+
/// Create a new manifest with our written data.
386+
let newManifest = SnapshotManifest(
387+
id: newSnapshot.id,
388+
modificationDate: creationDate,
389+
currentIteration: newIteration.id
390+
)
391+
392+
/// Write the iteration and manifest records so the persistence is complete.
393+
try await newSnapshot.write(iteration: newIteration)
394+
try await newSnapshot.write(manifest: newManifest)
395+
return newSnapshot
396+
}
397+
}
398+
}

0 commit comments

Comments
 (0)