
Conversation

capistrant
Contributor

@capistrant capistrant commented Sep 30, 2025

Description

Expands on #18444 to cover more surface area. While putting saveLogs on a strict timer helped prevent some issues with LogWatch interactions hanging, I have still seen issues in the path where KubernetesWorkItem calls shutdown, which starts a LogWatch. Even the initialization of a LogWatch object can hang indefinitely.
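
To illustrate the general idea, here is a minimal sketch (not the PR's exact code: the fabric8 `watchLog()` accessor chain, the `namespace`/`podName` arguments, and the `logWatchInitializationTimeoutMs` and `log` fields are assumed from context). The LogWatch is created on a worker thread and the caller waits with a bounded `Future.get`, so a hung Kubernetes API call can no longer block the peon lifecycle:

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: create the LogWatch on a separate thread and wait with a
// bounded get() so a hung Kubernetes API call cannot stall shutdown or saveLogs.
private LogWatch startLogWatchWithTimeout(KubernetesClient client, String namespace, String podName)
{
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    Future<LogWatch> future = executor.submit(
        () -> client.pods().inNamespace(namespace).withName(podName).watchLog()
    );
    return future.get(logWatchInitializationTimeoutMs, TimeUnit.MILLISECONDS);
  }
  catch (TimeoutException e) {
    log.warn("Timed out initializing LogWatch for pod [%s]", podName);
    return null;
  }
  catch (InterruptedException | ExecutionException e) {
    log.warn(e, "Failed to initialize LogWatch for pod [%s]", podName);
    return null;
  }
  finally {
    // shut the executor down immediately; it only exists for this one call
    executor.shutdownNow();
  }
}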

Release note

New config for users of the kubernetes-overlord-extensions for the K8s TaskRunner:

| `druid.indexer.runner.podLogOperationTimeout` | `Duration` | Timeout for async operations that interact with `k8s` pod logs | `PT300S` | NO |

Key changed/added classes in this PR
  • KubernetesPeonLifecycle

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@capistrant capistrant requested a review from kfaraz September 30, 2025 20:15
this.logSaveTimeoutMs = logSaveTimeoutMs;
this.logWatchInitializationTimeoutMs = logWatchInitializationTimeoutMs;
this.logWatchCopyLogTimeoutMs = logWatchCopyLogTimeoutMs;
this.logWatchExecutor = Executors.newSingleThreadExecutor(
Contributor

I feel like the original approach of creating an ExecutorService on the fly when needed made more sense.
We don't really need this executor until we need to watch/copy over the logs, so there is no reason to have it take up unnecessary memory upfront.

Contributor Author

Ack. I don't like either approach 😨, but it makes sense not to eagerly consume memory since the log watch is only involved in a small chunk of the overall peon lifecycle.

| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No |
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
| `druid.indexer.runner.logSaveTimeout` | `Duration` | How long to wait for task logs to be saved before giving up. | `PT300S` | NO |
| `druid.indexer.runner.logWatchInitializationTimeout` | `Duration` | How long to wait when initializing a log watch for a peon pod before giving up. | `PT30S` | NO |
Contributor

Let's avoid this new config and just use the original one for both purposes.
A default timeout of 5 minutes is good enough for both initializing the logWatch field and downloading the logs.

Contributor Author

ok, I generalized the single config name a bit then

…hem on demand

The idea is that these will only exist for a short time per task, so creating them eagerly can be wasteful and cause memory pressure.
Contributor

@kfaraz kfaraz left a comment

Thanks for addressing the feedback, @capistrant! Left some final suggestions.

| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No |
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
| `druid.indexer.runner.logSaveTimeout` | `Duration` | How long to wait for task logs to be saved before giving up. | `PT300S` | NO |
| `druid.indexer.runner.podLogOperationTimeout` | `Duration` | Timeout for async operations that interact with `k8s` pod logs | `PT300S` | NO |
Contributor

logOperationTimeout seems more ambiguous than logSaveTimeout.
Do you prefer this name since the same timeout is used for multiple operations like copy and log watch initialization?

I think it is fine to continue calling the config logSaveTimeout since both are related to the saving of logs.
Although, please update the description to call out exactly which operations use this timeout and what the result of that timeout is (i.e. whether logs for that task will ever be accessible).

Contributor Author

Maybe I was overthinking it. I was concerned about doing the init under the same config as the save, but I suppose you are right that it is probably OK to just stick with it. I reverted and updated the doc with more info on what it is for.

@NotNull
// how long to wait for log saving operations to complete
private Period logSaveTimeout = new Period("PT300S");
private Period podLogOperationTimeout = new Period("PT300S");
Contributor

If we continue with the same config name, we can just revert the changes made to this and some other files.

logWatchOperationTimeoutMs, taskId.getOriginalTaskId());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Contributor

Do we need to mark the thread as interrupted?
Is this status read by the KubernetesTaskRunner?

Contributor Author

Hmmm. I guess this may just be force of habit to be defensive and do this whenever I catch an InterruptedException. Are there negatives to it even if the status isn't read?

}

private void doSaveLogs()
protected void saveLogs()
Contributor

I feel we might benefit from adding a common method which can be used for both logWatch init and saveLogs:

private <T> T executeWithTimeout(Callable<T> runnable, long timeoutMillis, String operationName)
{
    // Create the executor
    // Start it
    // Handle the exceptions
    // Finally shutdown the executor
}

Contributor Author

good point, I took a shot at this
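
For reference, a sketch of how that shared helper might look (illustrative only, not necessarily the merged code; the exception handling, the null return on failure, and the reuse of the log and taskId fields are assumptions, and it needs java.util.concurrent.Callable plus the executor/future imports from the earlier sketch):

// Sketch of the suggested shared helper: run the operation on a freshly created
// single-thread executor, wait at most timeoutMillis, and always shut the
// executor down so it only lives for the duration of the call.
private <T> T executeWithTimeout(Callable<T> callable, long timeoutMillis, String operationName)
{
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    Future<T> future = executor.submit(callable);
    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
  }
  catch (TimeoutException e) {
    log.warn(
        "Operation [%s] timed out after [%d] ms for task [%s]",
        operationName,
        timeoutMillis,
        taskId.getOriginalTaskId()
    );
    return null;
  }
  catch (InterruptedException | ExecutionException e) {
    log.warn(e, "Operation [%s] failed for task [%s]", operationName, taskId.getOriginalTaskId());
    return null;
  }
  finally {
    executor.shutdownNow();
  }
}

A caller such as saveLogs could then wrap its work in something like executeWithTimeout(() -> { copyPodLogsToTaskLogs(); return true; }, logSaveTimeoutMs, "saveLogs") (the callable body here is hypothetical) and treat a null result as a timed-out or failed operation.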

@capistrant capistrant requested a review from kfaraz October 6, 2025 01:34
Contributor

@kfaraz kfaraz left a comment

Minor suggestions on the log lines etc.
Approach makes sense. 👍🏻

Map<String, String> annotations,
Integer capacity,
Period taskJoinTimeout
Period taskJoinTimeout,
Contributor

Please revert the changes to this file since they are not needed anymore.
We can stick to the original order of the constructor args.

private final TaskStateListener stateListener;
private final SettableFuture<Boolean> taskStartedSuccessfullyFuture;
private final long logSaveTimeoutMs;
private final long logSaveTimeout;
Contributor

Nit: Please continue using logSaveTimeoutMs or logSaveTimeoutMillis as it removes any ambiguity regarding the time unit.

Alternatively, you may pass in a Duration object and use the name logSaveTimeout,
but I don't think that is needed here.

log.warn("Operation [%s] timed out after %d ms for task [%s]. %s", operationName, timeoutMillis, taskId.getOriginalTaskId(), errorMessage);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Contributor

@capistrant, regarding the previous discussion, I think we should avoid marking this thread as interrupted here: since we are not reading or clearing this interrupted status anywhere, leaving it set might have unintended side effects.

We can add it in the future if we need it.

@capistrant capistrant merged commit 2887e52 into apache:master Oct 7, 2025
60 checks passed