Skip to content

Commit 266c872

Browse files
Tasks - keep tasks after completion (#10044)
* Introduce task retention. Keep completed tasks around for X time (setting based). Maintain previous behaviour for gds.listProgress and Aura Shutdown/Backup * Fix tests * Rename task ttl to retentionPeriod Co-authored-by: Sören Reichardt <[email protected]> * Fix test on rebase * Reuse logic to wait for taskStore to get cleaned up. We can be a bit relaxed on this side-effect check. As long as it happens, it's fine. Avoiding race conditions, for instance on cancel() * Fix resultStore and exportJobId order * Remove accidentally committed test * Format code * Document setting also for prod deployment * Define export and resultStore jobId for v2 endpoints; even graphstore exports go through the resultStore --------- Co-authored-by: Sören Reichardt <[email protected]>
1 parent 34faa99 commit 266c872

File tree

75 files changed

+687
-240
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+687
-240
lines changed

algo-common/src/main/java/org/neo4j/gds/result/CommunityStatistics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public static CommunityStats communityStats(
188188
concurrency
189189
);
190190
}
191-
} catch (Exception e){
191+
} catch (Exception e) {
192192
if (e.getMessage().contains("is out of bounds for histogram, current covered range")) {
193193
return new CommunityStats(0,Optional.empty(), computeMilliseconds.get(), false);
194194
} else {

algo/src/test/java/org/neo4j/gds/traversal/RandomWalkTest.java

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.jupiter.api.Test;
2626
import org.junit.jupiter.params.ParameterizedTest;
2727
import org.junit.jupiter.params.provider.ValueSource;
28+
import org.neo4j.gds.TaskStoreHelper;
2829
import org.neo4j.gds.TestProgressTracker;
2930
import org.neo4j.gds.TestSupport;
3031
import org.neo4j.gds.applications.algorithms.machinery.RequestScopedDependencies;
@@ -48,15 +49,12 @@
4849
import org.neo4j.gds.logging.GdsTestLog;
4950
import org.neo4j.gds.termination.TerminationFlag;
5051

51-
import java.time.Instant;
52-
import java.time.temporal.ChronoUnit;
52+
import java.time.Duration;
5353
import java.util.Arrays;
5454
import java.util.HashMap;
5555
import java.util.List;
5656
import java.util.Optional;
5757
import java.util.TreeSet;
58-
import java.util.concurrent.TimeUnit;
59-
import java.util.concurrent.locks.LockSupport;
6058
import java.util.stream.Collectors;
6159

6260
import static org.assertj.core.api.Assertions.assertThat;
@@ -516,7 +514,7 @@ void progressLogging() throws InterruptedException {
516514
.build();
517515

518516
var log = new GdsTestLog();
519-
var taskStore = new PerDatabaseTaskStore();
517+
var taskStore = new PerDatabaseTaskStore(Duration.ZERO);
520518

521519
var requestScopedDependencies = RequestScopedDependencies.builder()
522520
.taskRegistryFactory(TaskRegistryFactory.local("rw", taskStore))
@@ -538,7 +536,7 @@ void progressLogging() throws InterruptedException {
538536
assertThat(randomWalksStream).hasSize(5000);
539537
});
540538

541-
awaitEmptyTaskStore(taskStore);
539+
TaskStoreHelper.awaitEmptyTaskStore(taskStore);
542540

543541
assertThat(log.getMessages(TestLog.INFO))
544542
.extracting(removingThreadId())
@@ -571,7 +569,7 @@ void shouldLogProgressOnWeightedGraph() {
571569
.build();
572570

573571
var log = new GdsTestLog();
574-
var taskStore = new PerDatabaseTaskStore();
572+
var taskStore = new PerDatabaseTaskStore(Duration.ZERO);
575573

576574
var requestScopedDependencies = RequestScopedDependencies.builder()
577575
.taskRegistryFactory(TaskRegistryFactory.local("rw", taskStore))
@@ -592,7 +590,7 @@ void shouldLogProgressOnWeightedGraph() {
592590
assertThat(randomWalksStream).hasSize(5000);
593591
});
594592

595-
awaitEmptyTaskStore(taskStore);
593+
TaskStoreHelper.awaitEmptyTaskStore(taskStore);
596594

597595
assertThat(log.getMessages(TestLog.INFO))
598596
.extracting(removingThreadId())
@@ -615,9 +613,9 @@ void shouldLogProgressOnWeightedGraph() {
615613
}
616614

617615
@Test
618-
void shouldLeaveNoTasksBehind() {
616+
void shouldLeaveNoOngoingTasksBehind() {
619617
var config = RandomWalkStreamConfigImpl.builder().build();
620-
var taskStore = new PerDatabaseTaskStore();
618+
var taskStore = new PerDatabaseTaskStore(Duration.ZERO);
621619

622620
var requestScopedDependencies = RequestScopedDependencies.builder()
623621
.taskRegistryFactory(TaskRegistryFactory.local("rw", taskStore))
@@ -638,26 +636,11 @@ void shouldLeaveNoTasksBehind() {
638636
//noinspection ResultOfMethodCallIgnored
639637
result.count();
640638

641-
awaitEmptyTaskStore(taskStore);
639+
TaskStoreHelper.awaitEmptyTaskStore(taskStore);
642640

643641
// the task store should now be empty
644-
assertThat(taskStore.isEmpty()).isTrue();
642+
assertThat(taskStore.queryRunning()).isEmpty();
645643
}
646644

647-
private void awaitEmptyTaskStore(PerDatabaseTaskStore taskStore) {
648-
// there is a race condition between the thread consuming the result,
649-
// and the thread scheduled to end the last subtask
650-
long timeoutInSeconds = 5 * (TestSupport.CI ? 5 : 1);
651-
var deadline = Instant.now().plus(timeoutInSeconds, ChronoUnit.SECONDS);
652-
653-
// On my machine (TM) with 1000 iterations this never fails and each run is 10ish or 100ish ms
654-
while (Instant.now().isBefore(deadline)) {
655-
if (taskStore.isEmpty()) {
656-
break;
657-
}
658-
659-
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
660-
}
661-
}
662645
}
663646
}

applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/Neo4jDatabaseNodePropertyWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ static NodePropertiesWritten writeNodeProperties(
168168
try {
169169
exporter.write(nodeProperties);
170170
propertiesWritten.setValue(exporter.propertiesWritten());
171+
} catch (Exception e) {
172+
progressTracker.endSubTaskWithFailure();
173+
throw e;
171174
} finally {
172175
progressTracker.release();
173176
}

applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/NodePropertyWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,10 @@ public NodePropertiesWritten writeNodeProperties(
9696

9797
try {
9898
return writeNodeProperties(nodePropertyExporter, nodeProperties);
99-
} finally {
99+
} catch (Exception e) {
100+
progressTracker.endSubTaskWithFailure();
101+
throw e;
102+
}finally {
100103
progressTracker.release();
101104
}
102105
}

applications/algorithms/node-embeddings/src/test/java/org/neo4j/gds/applications/algorithms/embeddings/NodeEmbeddingAlgorithmsGraphSageTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.gds.logging.GdsTestLog;
3535
import org.neo4j.gds.termination.TerminationFlag;
3636

37+
import java.time.Duration;
3738
import java.util.List;
3839

3940
import static org.assertj.core.api.Assertions.assertThat;
@@ -76,7 +77,7 @@ void testLogProgressForGraphSageTrain() {
7677
GraphSageTrainTask.create(graph, config),
7778
config.username(),
7879
config.jobId(),
79-
new PerDatabaseTaskStore(),
80+
new PerDatabaseTaskStore(Duration.ofMinutes(1)),
8081
new LoggerForProgressTrackingAdapter(log)
8182
);
8283

applications/graph-store-catalog/src/main/java/org/neo4j/gds/applications/graphstorecatalog/WriteNodePropertiesApplication.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,12 @@ private static long writeNodeProperties(
146146
.collect(Collectors.toList());
147147

148148
exporter.write(writeNodeProperties);
149+
progressTracker.endSubTask();
149150

150151
return exporter.propertiesWritten();
151-
152-
} finally {
153-
progressTracker.endSubTask();
152+
} catch (Exception e) {
153+
progressTracker.endSubTaskWithFailure();
154+
throw e;
154155
}
155156
}
156157
}

applications/graph-store-catalog/src/main/java/org/neo4j/gds/applications/graphstorecatalog/WriteRelationshipsApplication.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ WriteRelationshipResult compute(
9494
// result
9595
return builder.build();
9696
} catch (RuntimeException e) {
97+
progressTracker.endSubTaskWithFailure();
9798
loggers.log().warn("Writing relationships failed", e);
9899
throw e;
99100
}

applications/operations/src/main/java/org/neo4j/gds/applications/operations/OperationsApplications.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public Stream<UserTask> listProgress() {
6464

6565
if (user.isAdmin()) return taskStore.query();
6666

67+
// TODO only fetch ongoing tasks here and name listProgressOngoing
68+
// (add listProgress which fetches all tasks)
6769
return taskStore.query(user.getUsername());
6870
}
6971

core-write/src/main/java/org/neo4j/gds/core/write/NativeNodeLabelExporter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,10 @@ public void write(String nodeLabel) {
8686
} else {
8787
writeSequential(nodeLabelToken);
8888
}
89-
} finally {
9089
progressTracker.endSubTask();
90+
} catch (Exception e) {
91+
progressTracker.endSubTaskWithFailure();
92+
throw e;
9193
}
9294
}
9395

core-write/src/main/java/org/neo4j/gds/core/write/NativeNodePropertyExporter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,10 @@ public void write(Collection<NodeProperty> nodeProperties) {
122122
} else {
123123
writeSequential(resolvedNodeProperties);
124124
}
125-
} finally {
126125
progressTracker.endSubTask();
126+
} catch (Exception e) {
127+
progressTracker.endSubTaskWithFailure();
128+
throw e;
127129
}
128130
}
129131

0 commit comments

Comments
 (0)