@@ -872,8 +872,7 @@ protected boolean preserveSearchableSnapshotsIndicesUponCompletion() {
872
872
}
873
873
874
874
private void wipeCluster () throws Exception {
875
- logger .info ("Waiting for all cluster updates up to this moment to be processed" );
876
- assertOK (adminClient ().performRequest (new Request ("GET" , "_cluster/health?wait_for_events=languid" )));
875
+ waitForClusterUpdates ();
877
876
878
877
// Cleanup rollup before deleting indices. A rollup job might have bulks in-flight,
879
878
// so we need to fully shut them down first otherwise a job might stall waiting
@@ -1039,6 +1038,38 @@ private void wipeCluster() throws Exception {
1039
1038
deleteAllNodeShutdownMetadata ();
1040
1039
}
1041
1040
1041
+ private void waitForClusterUpdates () throws Exception {
1042
+ logger .info ("Waiting for all cluster updates up to this moment to be processed" );
1043
+ try {
1044
+ assertOK (adminClient ().performRequest (new Request ("GET" , "_cluster/health?wait_for_events=languid" )));
1045
+ } catch (ResponseException e ) {
1046
+ if (e .getResponse ().getStatusLine ().getStatusCode () == HttpStatus .SC_REQUEST_TIMEOUT ) {
1047
+ final var pendingTasks = getPendingClusterStateTasks ();
1048
+ if (pendingTasks != null ) {
1049
+ logger .error ("Timed out waiting for cluster updates to be processed, {}" , pendingTasks );
1050
+ }
1051
+ }
1052
+ throw e ;
1053
+ }
1054
+ }
1055
+
1056
+ private static String getPendingClusterStateTasks () {
1057
+ try {
1058
+ Response response = adminClient ().performRequest (new Request ("GET" , "/_cluster/pending_tasks" ));
1059
+ List <?> tasks = (List <?>) entityAsMap (response ).get ("tasks" );
1060
+ if (false == tasks .isEmpty ()) {
1061
+ StringBuilder message = new StringBuilder ("there are still running tasks:" );
1062
+ for (Object task : tasks ) {
1063
+ message .append ('\n' ).append (task .toString ());
1064
+ }
1065
+ return message .toString ();
1066
+ }
1067
+ } catch (IOException e ) {
1068
+ fail (e , "Failed to retrieve pending tasks in the cluster during cleanup" );
1069
+ }
1070
+ return null ;
1071
+ }
1072
+
1042
1073
/**
1043
1074
* This method checks whether ILM policies or templates get recreated after they have been deleted. If so, we are probably deleting
1044
1075
* them unnecessarily, potentially causing test performance problems. This could happen for example if someone adds a new standard ILM
@@ -1488,18 +1519,9 @@ private void logIfThereAreRunningTasks() throws IOException {
1488
1519
*/
1489
1520
private static void waitForClusterStateUpdatesToFinish () throws Exception {
1490
1521
assertBusy (() -> {
1491
- try {
1492
- Response response = adminClient ().performRequest (new Request ("GET" , "/_cluster/pending_tasks" ));
1493
- List <?> tasks = (List <?>) entityAsMap (response ).get ("tasks" );
1494
- if (false == tasks .isEmpty ()) {
1495
- StringBuilder message = new StringBuilder ("there are still running tasks:" );
1496
- for (Object task : tasks ) {
1497
- message .append ('\n' ).append (task .toString ());
1498
- }
1499
- fail (message .toString ());
1500
- }
1501
- } catch (IOException e ) {
1502
- fail ("cannot get cluster's pending tasks: " + e .getMessage ());
1522
+ final var pendingTasks = getPendingClusterStateTasks ();
1523
+ if (pendingTasks != null ) {
1524
+ fail (pendingTasks );
1503
1525
}
1504
1526
}, 30 , TimeUnit .SECONDS );
1505
1527
}
0 commit comments