[FLINK-37630] Support flink cdc pipeline Yarn session mode #3989

Open
wants to merge 5 commits into
base: master

Conversation

Mrart
Contributor

@Mrart Mrart commented Apr 15, 2025

Support flink cdc pipeline Yarn session mode

@joyCurry30
Contributor

Thank you for your contribution. Could you please add the doc for this mode?

@Mrart Mrart marked this pull request as draft April 17, 2025 06:09
@Mrart
Contributor Author

Mrart commented Apr 17, 2025

As discussed with @lvyanquan:

  1. If applicationId is not specified, a session cluster is created and the job is submitted to it.
  2. If applicationId is specified, multiple Flink jobs can be submitted to that session.
  3. Make sure the pipeline jar that the submitted job depends on can be uploaded.

@Mrart Mrart marked this pull request as ready for review May 19, 2025 08:13
@Mrart Mrart requested a review from joyCurry30 May 19, 2025 09:04
Comment on lines +96 to +99
// If applicationId is passed, we get the state of yarn; if not, we create a session
// cluster.
String applicationId = flinkConfig.get(YarnConfigOptions.APPLICATION_ID);
if (applicationId != null) {
Contributor

It seems we cannot tell whether the application_id does not exist or was simply misspelled. If it is misspelled, a new session cluster will be created each time it is provided.

Contributor Author

Maybe we can add a simple validation method.
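
A minimal sketch of what such a check could look like, assuming YARN application IDs follow the conventional `application_<clusterTimestamp>_<sequence>` format. The class, enum, and method names here are hypothetical and not taken from the PR. The check only rejects malformed IDs; a well-formed ID that does not exist on the cluster would still need a lookup against YARN, which is not shown.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: decides what to do with the
// configured application ID before any session cluster is created.
class YarnApplicationIdCheck {

    // Assumption: IDs look like application_<clusterTimestamp>_<sequence>.
    private static final Pattern APPLICATION_ID = Pattern.compile("application_\\d+_\\d+");

    enum Action {
        CREATE_SESSION,
        ATTACH_TO_SESSION
    }

    static Action resolve(String applicationId) {
        if (applicationId == null) {
            // Nothing configured: create a new session cluster.
            return Action.CREATE_SESSION;
        }
        if (!APPLICATION_ID.matcher(applicationId.trim()).matches()) {
            // Fail fast instead of silently creating a new session cluster.
            throw new IllegalArgumentException(
                    "Malformed YARN application ID: " + applicationId);
        }
        return Action.ATTACH_TO_SESSION;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("application_1700000000000_0001"));
        try {
            resolve("aplication_1700000000000_0001"); // misspelled prefix
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Throwing on a malformed ID keeps a typo from turning into an extra session cluster on every submission.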

Contributor

@joyCurry30 joyCurry30 left a comment

Hi, I left some comments. @Mrart

Comment on lines +67 to +70
if (flinkConfig.get(PipelineOptions.JARS) == null) {
flinkConfig.set(
PipelineOptions.JARS, Collections.singletonList(getFlinkCDCDistJarFromEnv()));
}
Contributor

If "JARS" is not null, do we still need to add the Flink CDC dist jar to JARS?

Contributor Author

This seems to be the same as in application mode.
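
For comparison, if the dist jar should always be shipped, one alternative to the null check would be to append it to whatever the user configured. The sketch below uses plain lists rather than Flink's `Configuration`; the class name, method name, and jar paths are hypothetical. Whether this is desirable depends on how application mode handles the same case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR: merges the dist jar into the
// user-configured jar list instead of only setting it when the list is null.
class PipelineJarsMerge {

    static List<String> withDistJar(List<String> configuredJars, String distJar) {
        // LinkedHashSet keeps the configured order and drops duplicates.
        Set<String> merged = new LinkedHashSet<>();
        if (configuredJars != null) {
            merged.addAll(configuredJars);
        }
        merged.add(distJar);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        // Hypothetical paths, for illustration only.
        String distJar = "/opt/flink-cdc/lib/flink-cdc-dist.jar";
        System.out.println(withDistJar(null, distJar));
        System.out.println(withDistJar(Arrays.asList("/opt/jars/udf.jar"), distJar));
    }
}
```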

3 participants