diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreObserveQueryTests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreObserveQueryTests.swift index 518c69d24c..301ec951d4 100644 --- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreObserveQueryTests.swift +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreObserveQueryTests.swift @@ -16,8 +16,6 @@ import Combine // swiftlint:disable type_body_length // swiftlint:disable file_length class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { - - var cancellables = Set() struct TestModelRegistration: AmplifyModelRegistration { func registerModels(registry: ModelRegistry.Type) { @@ -41,8 +39,9 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { await setUp(withModels: TestModelRegistration()) try startAmplify() try await clearDataStore() - let snapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") - let querySnapshotsCancelled = asyncExpectation(description: "query snapshots cancelled") + let snapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") + snapshotWithIsSynced.assertForOverFulfill = false + let querySnapshotsCancelled = expectation(description: "query snapshots cancelled") let querySnapshots = Amplify.DataStore.observeQuery(for: Post.self) let task = Task { var snapshots = [DataStoreQuerySnapshot]() @@ -50,7 +49,7 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { for try await querySnapshot in querySnapshots { snapshots.append(querySnapshot) if querySnapshot.isSynced { - await snapshotWithIsSynced.fulfill() + snapshotWithIsSynced.fulfill() } } } catch { @@ -58,14 +57,12 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { } XCTAssertTrue(snapshots.count >= 2) XCTAssertFalse(snapshots[0].isSynced) - await querySnapshotsCancelled.fulfill() + querySnapshotsCancelled.fulfill() } - let receivedPost = asyncExpectation(description: "received Post") - try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now()), - postSyncedExpctation: receivedPost) - await waitForExpectations([snapshotWithIsSynced, receivedPost], timeout: 100) + try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now())) + await fulfillment(of: [snapshotWithIsSynced], timeout: 100) task.cancel() - await waitForExpectations([querySnapshotsCancelled], timeout: 10) + await fulfillment(of: [querySnapshotsCancelled], timeout: 10) } /// ObserveQuery API will eventually return query snapshot with `isSynced` true @@ -78,11 +75,13 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - Eventually one of the query snapshots will be returned with `isSynced` true /// func testObserveQueryInitialSync() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try startAmplify() try await clearDataStore() var snapshots = [DataStoreQuerySnapshot]() - let snapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") + let snapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") + snapshotWithIsSynced.assertForOverFulfill = false Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)).sink { completed in switch completed { case .finished: @@ -93,13 +92,12 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { } receiveValue: { querySnapshot in snapshots.append(querySnapshot) if querySnapshot.isSynced { - Task { await snapshotWithIsSynced.fulfill() } + snapshotWithIsSynced.fulfill() } }.store(in: &cancellables) - let receivedPost = asyncExpectation(description: "received Post") - try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now()), - postSyncedExpctation: receivedPost) - await waitForExpectations([snapshotWithIsSynced, receivedPost], timeout: 100) + + try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now())) + await fulfillment(of: [snapshotWithIsSynced], timeout: 100) XCTAssertTrue(snapshots.count >= 2) XCTAssertFalse(snapshots[0].isSynced) @@ -116,13 +114,14 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - A query snapshot is received on `.modelSynced` /// func testObserveQueryWhenModelSyncedEvent() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try startAmplify() try await clearDataStore() var snapshots = [DataStoreQuerySnapshot]() var isObserveQueryReadyForTest = false - let observeQueryReadyForTest = asyncExpectation(description: "received query snapshot with .isSynced true") - let snapshotWithPost = asyncExpectation(description: "received first snapshot") + let observeQueryReadyForTest = expectation(description: "received query snapshot with .isSynced true") + let snapshotWithPost = expectation(description: "received first snapshot") let post = Post(title: "title", content: "content", createdAt: .now()) Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)).sink { completed in switch completed { @@ -135,16 +134,16 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { snapshots.append(querySnapshot) if !isObserveQueryReadyForTest && querySnapshot.isSynced { isObserveQueryReadyForTest = true - Task { await observeQueryReadyForTest.fulfill() } + observeQueryReadyForTest.fulfill() } if querySnapshot.items.contains(where: { $0.id == post.id }) { - Task { await snapshotWithPost.fulfill() } + snapshotWithPost.fulfill() } }.store(in: &cancellables) - await waitForExpectations([observeQueryReadyForTest], timeout: 100) + await fulfillment(of: [observeQueryReadyForTest], timeout: 100) _ = try await Amplify.DataStore.save(post) - await waitForExpectations([snapshotWithPost], timeout: 100) + await fulfillment(of: [snapshotWithPost], timeout: 100) } /// Apply a query predicate "title begins with 'xyz'" @@ -156,10 +155,10 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - The models only contain models based on the predicate /// func testInitialSyncWithPredicate() async throws { + var cancellables = Set() let startTime = Temporal.DateTime.now() await setUp( withModels: TestModelRegistration(), - logLevel: .verbose, dataStoreConfiguration: .custom( syncMaxRecords: 100, syncExpressions: [ @@ -182,9 +181,10 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { try await clearDataStore() var snapshots = [DataStoreQuerySnapshot]() - let snapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") + let snapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") var snapshotWithIsSyncedFulfilled = false - let receivedPostFromObserveQuery = asyncExpectation(description: "received Post") + let receivedPostFromObserveQuery = expectation(description: "received Post") + receivedPostFromObserveQuery.assertForOverFulfill = false let post4 = Post(title: "\(randomTitle) 4", content: "content", createdAt: .now()) let predicate = Post.keys.title.beginsWith(randomTitle) Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self, where: predicate)).sink { completed in @@ -198,21 +198,21 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { snapshots.append(querySnapshot) if !snapshotWithIsSyncedFulfilled && querySnapshot.isSynced { snapshotWithIsSyncedFulfilled = true - Task { await snapshotWithIsSynced.fulfill() } + snapshotWithIsSynced.fulfill() } else if snapshotWithIsSyncedFulfilled { if querySnapshot.items.count >= 4 && querySnapshot.items.allSatisfy({ $0.title.contains(randomTitle)}) { - Task { await receivedPostFromObserveQuery.fulfill() } + receivedPostFromObserveQuery.fulfill() } } }.store(in: &cancellables) - await waitForExpectations([snapshotWithIsSynced], timeout: 100) + await fulfillment(of: [snapshotWithIsSynced], timeout: 100) try await savePostAndWaitForSync(post4) - await waitForExpectations([receivedPostFromObserveQuery], timeout: 100) + await fulfillment(of: [receivedPostFromObserveQuery], timeout: 100) XCTAssertTrue(snapshots.count >= 2) } @@ -227,12 +227,14 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - Each snapshot should have items sorted according to the sort order /// func testObserveQueryWithSort() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try startAmplify() try await clearDataStore() let post1 = Post(title: "title", content: "content", createdAt: .now()) let post2 = Post(title: "title", content: "content", createdAt: .now().add(value: 1, to: .second)) - let snapshotWithSavedPost = asyncExpectation(description: "query snapshot with saved post") + let snapshotWithSavedPost = expectation(description: "query snapshot with saved post") + snapshotWithSavedPost.assertForOverFulfill = false Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self, sort: .ascending(Post.keys.createdAt))) .sink { completed in switch completed { @@ -250,13 +252,13 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { let actualPost1 = items.removeLast() XCTAssertEqual(actualPost2.id, post2.id) XCTAssertEqual(actualPost1.id, post1.id) - Task { await snapshotWithSavedPost.fulfill() } + snapshotWithSavedPost.fulfill() } }.store(in: &cancellables) try await savePostAndWaitForSync(post1) try await savePostAndWaitForSync(post2) - await waitForExpectations([snapshotWithSavedPost], timeout: 100) + await fulfillment(of: [snapshotWithSavedPost], timeout: 100) } /// ObserveQuery with DataStore delta sync. Ensure datastore has synced the models and stopped. @@ -270,13 +272,14 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - The final snapshot should have all the models with `isSynced` true /// func testObserveQueryWithDataStoreDeltaSync() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now())) let numberOfPosts = try await queryNumberOfPosts() XCTAssertTrue(numberOfPosts > 0) try await stopDataStore() - let snapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") + let snapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") var snapshots = [DataStoreQuerySnapshot]() Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)).sink { completed in switch completed { @@ -288,13 +291,11 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { } receiveValue: { querySnapshot in snapshots.append(querySnapshot) if querySnapshot.isSynced { - Task { await snapshotWithIsSynced.fulfill() } + snapshotWithIsSynced.fulfill() } }.store(in: &cancellables) - let receivedPost = asyncExpectation(description: "received Post") - try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now()), - postSyncedExpctation: receivedPost) - await waitForExpectations([snapshotWithIsSynced, receivedPost], timeout: 30) + try await savePostAndWaitForSync(Post(title: "title", content: "content", createdAt: .now())) + await fulfillment(of: [snapshotWithIsSynced], timeout: 30) XCTAssertTrue(snapshots.count >= 2) XCTAssertFalse(snapshots[0].isSynced) XCTAssertTrue(snapshots.last!.isSynced) @@ -313,11 +314,13 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - The final snapshot should have all the latest models with `isSynced` true /// func testObserveQuery_withClearedDataStore_fullySyncedWithMaxRecords() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() try await clearDataStore() - let snapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") + let snapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") + snapshotWithIsSynced.assertForOverFulfill = false var snapshots = [DataStoreQuerySnapshot]() Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)).sink { completed in @@ -330,19 +333,17 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { } receiveValue: { querySnapshot in snapshots.append(querySnapshot) if querySnapshot.isSynced { - Task { await snapshotWithIsSynced.fulfill() } + snapshotWithIsSynced.fulfill() } }.store(in: &cancellables) let newPost = Post(title: "title", content: "content", createdAt: .now()) - let receivedPost = asyncExpectation(description: "received Post") - try await savePostAndWaitForSync(newPost, - postSyncedExpctation: receivedPost) + try await savePostAndWaitForSync(newPost) - await waitForExpectations([snapshotWithIsSynced, receivedPost], timeout: 30) + await fulfillment(of: [snapshotWithIsSynced], timeout: 30) XCTAssertTrue(snapshots.count >= 2) XCTAssertFalse(snapshots[0].isSynced) - XCTAssertEqual(1, snapshots.filter({ $0.isSynced }).count) + XCTAssertTrue(snapshots.last!.isSynced) let theSyncedSnapshot = snapshots.first(where: { $0.isSynced }) XCTAssertNotNil(theSyncedSnapshot) @@ -363,100 +364,104 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - Delete a model that matches the predicate. Model is removed from the snapshot /// - Delete a model that does NOT match the predicate. No snapshot is emitted func testPredicateWithCreateUpdateDelete() async throws { - await setUp(withModels: TestModelRegistration(), logLevel: .verbose) + await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() let testId = UUID().uuidString + let randomTitile = UUID().uuidString + let predicate = Post.keys.title.beginsWith(randomTitile) && Post.keys.content == testId + let snapshotExpectation1 = expectation(description: "received snapshot 1") + let snapshotExpectation23 = expectation(description: "received snapshot 2 / 3") + snapshotExpectation23.expectedFulfillmentCount = 2 + let snapshotExpectation4 = expectation(description: "received snapshot 4") + let snapshotExpectation56 = expectation(description: "received snapshot 5 / 6") + snapshotExpectation56.expectedFulfillmentCount = 2 + let snapshotExpectation7 = expectation(description: "received snapshot 7") + let snapshotExpectation8 = expectation(description: "received snapshot 8") + + var cancellables = Set() var snapshotCount = 0 - let predicate = Post.keys.title.beginsWith("xyz") && Post.keys.content == testId - let snapshotExpectation1 = asyncExpectation(description: "received snapshot 1") - let snapshotExpectation23 = asyncExpectation(description: "received snapshot 2 / 3", - expectedFulfillmentCount: 2) - let snapshotExpectation4 = asyncExpectation(description: "received snapshot 4") - let snapshotExpectation56 = asyncExpectation(description: "received snapshot 5 / 6", - expectedFulfillmentCount: 2) - let snapshotExpectation7 = asyncExpectation(description: "received snapshot 7") - let snapshotExpectation8 = asyncExpectation(description: "received snapshot 8") - Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self, where: predicate)).sink { completed in - switch completed { - case .finished: - break - case .failure(let error): - XCTFail("\(error)") - } - } receiveValue: { querySnapshot in - snapshotCount += 1 - let items = querySnapshot.items - if snapshotCount == 1 { - self.log.info("\(#function) 1. \(querySnapshot)") - XCTAssertEqual(items.count, 0) - Task { await snapshotExpectation1.fulfill() } - } else if snapshotCount == 2 || snapshotCount == 3 { - // See (1), subsequent snapshot should have item with "xyz 1". - self.log.info("\(#function) 2/3. \(querySnapshot)") - XCTAssertEqual(items.count, 1) - XCTAssertEqual(items[0].title, "xyz 1") - Task { await snapshotExpectation23.fulfill() } - } else if snapshotCount == 4 { - // See (2), should not be added to the snapshot. - // See (3), should be removed from the snapshot. So the resulting snapshot is empty. - self.log.info("\(#function) 4. \(querySnapshot)") - XCTAssertEqual(items.count, 0) - Task { await snapshotExpectation4.fulfill() } - } else if snapshotCount == 5 || snapshotCount == 6 { - // See (4). the post that now matches the snapshot should be added - self.log.info("\(#function) 5/6. \(querySnapshot)") - XCTAssertEqual(items.count, 1) - XCTAssertEqual(items[0].title, "xyz 2") - Task { await snapshotExpectation56.fulfill() } - } else if snapshotCount == 7 { - // See (5). the post that matched the predicate was deleted - self.log.info("\(#function) 7. \(querySnapshot)") - XCTAssertEqual(items.count, 0) - Task { await snapshotExpectation7.fulfill() } - } else if snapshotCount == 8 { - // See (6). Snapshot that is emitted due to "xyz 3" should not contain the deleted model - self.log.info("\(#function) 8. \(querySnapshot)") - XCTAssertEqual(items.count, 1) - XCTAssertEqual(items[0].title, "xyz 3") - Task { await snapshotExpectation8.fulfill() } - } - }.store(in: &cancellables) - await waitForExpectations([snapshotExpectation1], timeout: 10) + Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self, where: predicate)) + .sink(receiveCompletion: { completed in + switch completed { + case .finished: + break + case .failure(let error): + XCTFail("\(error)") + } + }, receiveValue: { querySnapshot in + snapshotCount += 1 + self.log.info("\(#function) XX.\(snapshotCount) \(querySnapshot)") + let items = querySnapshot.items + if snapshotCount == 1 { + self.log.info("\(#function) 1. \(querySnapshot)") + XCTAssertEqual(items.count, 0) + snapshotExpectation1.fulfill() + } else if snapshotCount == 2 || snapshotCount == 3 { + // See (1), subsequent snapshot should have item with "xyz 1". + self.log.info("\(#function) 2/3. \(querySnapshot)") + XCTAssertEqual(items.count, 1) + XCTAssertEqual(items[0].title, "\(randomTitile) 1") + snapshotExpectation23.fulfill() + } else if snapshotCount == 4 { + // See (2), should not be added to the snapshot. + // See (3), should be removed from the snapshot. So the resulting snapshot is empty. + self.log.info("\(#function) 4. \(querySnapshot)") + XCTAssertEqual(items.count, 0) + snapshotExpectation4.fulfill() + } else if snapshotCount == 5 || snapshotCount == 6 { + // See (4). the post that now matches the snapshot should be added + self.log.info("\(#function) 5/6. \(querySnapshot)") + XCTAssertEqual(items.count, 1) + XCTAssertEqual(items[0].title, "\(randomTitile) 2") + snapshotExpectation56.fulfill() + } else if snapshotCount == 7 { + // See (5). the post that matched the predicate was deleted + self.log.info("\(#function) 7. \(querySnapshot)") + XCTAssertEqual(items.count, 0) + snapshotExpectation7.fulfill() + } else if snapshotCount == 8 { + // See (6). Snapshot that is emitted due to "xyz 3" should not contain the deleted model + self.log.info("\(#function) 8. \(querySnapshot)") + XCTAssertEqual(items.count, 1) + XCTAssertEqual(items[0].title, "\(randomTitile) 3") + snapshotExpectation8.fulfill() + } + }) + .store(in: &cancellables) + + await fulfillment(of: [snapshotExpectation1], timeout: 10) // (1) Add model that matches predicate - should be received on the snapshot - let postMatchPredicate = Post(title: "xyz 1", content: testId, createdAt: .now()) + let postMatchPredicate = Post(title: "\(randomTitile) 1", content: testId, createdAt: .now()) try await savePostAndWaitForSync(postMatchPredicate) - await waitForExpectations([snapshotExpectation23], timeout: 10) + await fulfillment(of: [snapshotExpectation23], timeout: 10) // (2) Add model that does not match predicate - should not be received on the snapshot // (3) Update model that used to match the predicate to no longer match - should be removed from snapshot let postDoesNotMatch = Post(title: "doesNotMatch", content: testId, createdAt: .now()) - let postDoesNotMatchExpectation = asyncExpectation(description: "received postDoesNotMatchExpectation") - try await savePostAndWaitForSync(postDoesNotMatch, postSyncedExpctation: postDoesNotMatchExpectation) + try await savePostAndWaitForSync(postDoesNotMatch) var postMatchPredicateNoLongerMatches = postMatchPredicate postMatchPredicateNoLongerMatches.title = "doesNotMatch" try await savePostAndWaitForSync(postMatchPredicateNoLongerMatches) - await waitForExpectations([snapshotExpectation4], timeout: 10) + await fulfillment(of: [snapshotExpectation4], timeout: 10) // (4) Update model that does not match predicate to match - should be added to snapshot var postDoesNotMatchNowMatches = postDoesNotMatch - postDoesNotMatchNowMatches.title = "xyz 2" + postDoesNotMatchNowMatches.title = "\(randomTitile) 2" try await savePostAndWaitForSync(postDoesNotMatchNowMatches) - await waitForExpectations([snapshotExpectation56], timeout: 10) + await fulfillment(of: [snapshotExpectation56], timeout: 10) // (5) Delete the model that matches the predicate - should be removed try await deletePostAndWaitForSync(postDoesNotMatchNowMatches) - await waitForExpectations([snapshotExpectation7], timeout: 10) + await fulfillment(of: [snapshotExpectation7], timeout: 10) // (6) Delete the model that does not match predicate - should have no snapshot emitted - let postMatchPredicateNoLongerMatchesExpectation = asyncExpectation(description: " received") - try await deletePostAndWaitForSync(postMatchPredicateNoLongerMatches, - postSyncedExpctation: postMatchPredicateNoLongerMatchesExpectation) + try await deletePostAndWaitForSync(postMatchPredicateNoLongerMatches) // Save "xyz 3" to force a snapshot to be emitted - try await savePostAndWaitForSync(Post(title: "xyz 3", content: testId, createdAt: .now())) - await waitForExpectations([snapshotExpectation8], timeout: 10) + try await savePostAndWaitForSync(Post(title: "\(randomTitile) 3", content: testId, createdAt: .now())) + await fulfillment(of: [snapshotExpectation8], timeout: 10) } /// ObserveQuery is set up with a sort order. @@ -471,7 +476,7 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - Delete models. The snapshot should have the models removed /// func testSortWithCreateUpdateDelete() async throws { - await setUp(withModels: TestModelRegistration(), logLevel: .info) + await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() let testId = UUID().uuidString @@ -573,30 +578,32 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - ObserveQuery is not completed. /// func testObserveQueryShouldResetOnDataStoreStop() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() - let firstSnapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") + let firstSnapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") var onComplete: ((Subscribers.Completion) -> Void) = { _ in } Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)) .sink { onComplete($0) } receiveValue: { querySnapshot in if querySnapshot.isSynced { - Task { await firstSnapshotWithIsSynced.fulfill() } + firstSnapshotWithIsSynced.fulfill() } }.store(in: &cancellables) - await waitForExpectations([firstSnapshotWithIsSynced], timeout: 10) + await fulfillment(of: [firstSnapshotWithIsSynced], timeout: 10) - let observeQueryReceivedCompleted = asyncExpectation(description: "observeQuery received completed", - isInverted: true) + let observeQueryReceivedCompleted = expectation(description: "observeQuery received completed") + observeQueryReceivedCompleted.isInverted = true + onComplete = { completed in switch completed { case .finished: - Task { await observeQueryReceivedCompleted.fulfill() } + observeQueryReceivedCompleted.fulfill() case .failure(let error): XCTFail("\(error)") } } try await Amplify.DataStore.stop() - await waitForExpectations([observeQueryReceivedCompleted], timeout: 10) + await fulfillment(of: [observeQueryReceivedCompleted], timeout: 10) } /// Ensure clearing datastore will not complete the observeQuery subscribers. @@ -608,109 +615,97 @@ class DataStoreObserveQueryTests: SyncEngineIntegrationTestBase { /// - ObserveQuery is not completed. /// func testObserveQueryShouldResetOnDataStoreClear() async throws { + var cancellables = Set() await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() - let firstSnapshotWithIsSynced = asyncExpectation(description: "query snapshot with isSynced true") + let firstSnapshotWithIsSynced = expectation(description: "query snapshot with isSynced true") var onComplete: ((Subscribers.Completion) -> Void) = { _ in } Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)) .sink { onComplete($0) } receiveValue: { querySnapshot in if querySnapshot.isSynced { - Task { await firstSnapshotWithIsSynced.fulfill() } + firstSnapshotWithIsSynced.fulfill() } }.store(in: &cancellables) - await waitForExpectations([firstSnapshotWithIsSynced], timeout: 100) + await fulfillment(of: [firstSnapshotWithIsSynced], timeout: 100) - let observeQueryReceivedCompleted = asyncExpectation(description: "observeQuery received completed", - isInverted: true) + let observeQueryReceivedCompleted = expectation(description: "observeQuery received completed") + observeQueryReceivedCompleted.isInverted = true onComplete = { completed in switch completed { case .finished: - Task { await observeQueryReceivedCompleted.fulfill() } + observeQueryReceivedCompleted.fulfill() case .failure(let error): XCTFail("\(error)") } } try await Amplify.DataStore.clear() - await waitForExpectations([observeQueryReceivedCompleted], timeout: 10) + await fulfillment(of: [observeQueryReceivedCompleted], timeout: 10) } func testObserveQueryShouldStartOnDataStoreStart() async throws { - try await setUp(withModels: TestModelRegistration()) + await setUp(withModels: TestModelRegistration()) try await startAmplifyAndWaitForReady() - let firstSnapshot = asyncExpectation(description: "first query snapshot") - let secondSnapshot = asyncExpectation(description: "second query snapshot") - let observeQueryReceivedCompleted = asyncExpectation(description: "observeQuery received completed", isInverted: true) + let firstSnapshot = expectation(description: "first query snapshot") + let secondSnapshot = expectation(description: "second query snapshot") + let observeQueryReceivedCompleted = expectation(description: "observeQuery received completed") + observeQueryReceivedCompleted.isInverted = true var querySnapshots = [DataStoreQuerySnapshot]() let sink = Amplify.Publisher.create(Amplify.DataStore.observeQuery(for: Post.self)).sink { completed in switch completed { case .finished: - Task { await observeQueryReceivedCompleted.fulfill() } + observeQueryReceivedCompleted.fulfill() case .failure(let error): XCTFail("\(error)") } } receiveValue: { querySnapshot in querySnapshots.append(querySnapshot) if querySnapshots.count == 1 { - Task { await firstSnapshot.fulfill() } + firstSnapshot.fulfill() } else if querySnapshots.count == 2 { - Task { await secondSnapshot.fulfill() } + secondSnapshot.fulfill() } } - await waitForExpectations([firstSnapshot], timeout: 100) + await fulfillment(of: [firstSnapshot], timeout: 100) try await Amplify.DataStore.stop() - await waitForExpectations([observeQueryReceivedCompleted], timeout: 10) + await fulfillment(of: [observeQueryReceivedCompleted], timeout: 10) try await Amplify.DataStore.start() - await waitForExpectations([secondSnapshot], timeout: 10) + await fulfillment(of: [secondSnapshot], timeout: 10) sink.cancel() } // MARK: - Helpers - func savePostAndWaitForSync(_ post: Post, postSyncedExpctation: AsyncExpectation? = nil) async throws { - // Wait for a fulfillment count of 2 (first event due to the locally source mutation saved to the local store - // and the second event due to the subscription event received from the remote store) - let receivedPost = postSyncedExpctation ?? asyncExpectation(description: "received Post", - expectedFulfillmentCount: 2) - Task { - let mutationEvents = Amplify.DataStore.observe(Post.self) - do { - for try await mutationEvent in mutationEvents { - if mutationEvent.modelId == post.id { - await receivedPost.fulfill() - } - } - } catch { - XCTFail("Failed \(error)") + func savePostAndWaitForSync(_ post: Post) async throws { + var cancellable = Set() + let receivedPost = expectation(description: "received Post") + Amplify.Hub.publisher(for: .dataStore) + .filter { $0.eventName == HubPayload.EventName.DataStore.syncReceived } + .compactMap { $0.data as? MutationEvent } + .filter { [GraphQLMutationType.create.rawValue, GraphQLMutationType.update.rawValue].contains($0.mutationType) } + .filter { $0.modelId == post.id } + .sink { _ in + receivedPost.fulfill() } - } - + .store(in: &cancellable) + _ = try await Amplify.DataStore.save(post) - if postSyncedExpctation == nil { - await waitForExpectations([receivedPost], timeout: 100) - } + await fulfillment(of: [receivedPost], timeout: 60) } - func deletePostAndWaitForSync(_ post: Post, postSyncedExpctation: AsyncExpectation? = nil) async throws { - // Wait for a fulfillment count of 2 (first event due to the locally source mutation deleted from the local - // store and the second event due to the subscription event received from the remote store) - let deletedPost = postSyncedExpctation ?? asyncExpectation(description: "deleted Post", - expectedFulfillmentCount: 2) - Task { - let mutationEvents = Amplify.DataStore.observe(Post.self) - do { - for try await mutationEvent in mutationEvents { - if mutationEvent.modelId == post.id { - await deletedPost.fulfill() - } - } - } catch { - XCTFail("Failed \(error)") + func deletePostAndWaitForSync(_ post: Post) async throws { + var cancellable = Set() + let deletedPost = expectation(description: "deleted Post") + Amplify.Hub.publisher(for: .dataStore) + .filter { $0.eventName == HubPayload.EventName.DataStore.syncReceived } + .compactMap { $0.data as? MutationEvent } + .filter { $0.mutationType == GraphQLMutationType.delete.rawValue } + .filter { $0.modelId == post.id } + .sink { _ in + deletedPost.fulfill() } - } - + .store(in: &cancellable) + _ = try await Amplify.DataStore.delete(post) - if postSyncedExpctation == nil { - await waitForExpectations([deletedPost], timeout: 100) - } + await fulfillment(of: [deletedPost], timeout: 100) } func queryNumberOfPosts() async throws -> Int {