Skip to content

Commit a9cb0a9

Browse files
committed
session and submit
1 parent 7b07879 commit a9cb0a9

File tree

2 files changed

+64
-9
lines changed

2 files changed

+64
-9
lines changed

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnApplicationDeploymentExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public PipelineExecution.ExecutionInfo deploy(
101101
if (client != null) {
102102
client.shutDownCluster();
103103
}
104-
throw new RuntimeException("Failed to deploy Flink CDC job", e);
104+
throw new RuntimeException("Failed to yarn application deploy Flink CDC job", e);
105105
} finally {
106106
descriptor.close();
107107
if (client != null) {

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/YarnSessionDeploymentExecutor.java

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@
2222
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
2323
import org.apache.flink.client.program.ClusterClient;
2424
import org.apache.flink.client.program.ClusterClientProvider;
25+
import org.apache.flink.client.program.PackagedProgram;
26+
import org.apache.flink.client.program.PackagedProgramUtils;
2527
import org.apache.flink.configuration.Configuration;
2628
import org.apache.flink.configuration.PipelineOptions;
2729
import org.apache.flink.core.fs.FSDataInputStream;
2830
import org.apache.flink.core.fs.FileSystem;
2931
import org.apache.flink.core.fs.Path;
32+
import org.apache.flink.runtime.jobgraph.JobGraph;
33+
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
3034
import org.apache.flink.yarn.YarnClusterClientFactory;
3135
import org.apache.flink.yarn.YarnClusterDescriptor;
3236
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -38,10 +42,12 @@
3842

3943
import org.apache.commons.cli.CommandLine;
4044
import org.apache.hadoop.yarn.api.records.ApplicationId;
45+
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
4146
import org.slf4j.Logger;
4247
import org.slf4j.LoggerFactory;
4348

4449
import java.io.File;
50+
import java.util.ArrayList;
4551
import java.util.Collections;
4652
import java.util.List;
4753
import java.util.stream.Collectors;
@@ -87,23 +93,72 @@ public PipelineExecution.ExecutionInfo deploy(
8793

8894
ClusterClient<ApplicationId> client = null;
8995
try {
90-
ClusterClientProvider<ApplicationId> clusterClientProvider =
91-
descriptor.deploySessionCluster(specification);
92-
client = clusterClientProvider.getClusterClient();
93-
ApplicationId clusterId = client.getClusterId();
94-
LOG.info("Deployment Flink CDC From Cluster ID {}", clusterId);
95-
return new PipelineExecution.ExecutionInfo(
96-
clusterId.toString(), "submit job successful");
96+
// If applicationId is passed, we get the state of yarn; if not, we create a session
97+
// cluster.
98+
String applicationId = flinkConfig.get(YarnConfigOptions.APPLICATION_ID);
99+
if (applicationId != null) {
100+
FinalApplicationStatus applicationStatus =
101+
descriptor
102+
.getYarnClient()
103+
.getApplicationReport(ApplicationId.fromString(applicationId))
104+
.getFinalApplicationStatus();
105+
if (FinalApplicationStatus.UNDEFINED.equals(applicationStatus)) {
106+
// applicationId is running.
107+
client =
108+
descriptor
109+
.retrieve(ApplicationId.fromString(applicationId))
110+
.getClusterClient();
111+
}
112+
} else {
113+
ClusterClientProvider<ApplicationId> clusterClientProvider =
114+
descriptor.deploySessionCluster(specification);
115+
client = clusterClientProvider.getClusterClient();
116+
applicationId = String.valueOf(client.getClusterId());
117+
}
118+
LOG.info("Deployment Flink CDC From application ID {}", applicationId);
119+
// how to get jobGraph
120+
assert client != null;
121+
client.submitJob(getJobGraph(flinkConfig, 1));
122+
123+
return new PipelineExecution.ExecutionInfo(applicationId, "submit job successful");
97124
} catch (Exception e) {
98125
if (client != null) {
99126
client.shutDownCluster();
100127
}
101-
throw new RuntimeException("Failed to deploy Flink CDC job", e);
128+
throw new RuntimeException("Failed to yarn session deploy Flink CDC job", e);
102129
} finally {
103130
descriptor.close();
104131
if (client != null) {
105132
client.close();
106133
}
107134
}
108135
}
136+
137+
/** Get jobGraph from configuration. */
138+
private JobGraph getJobGraph(Configuration configuration, int parallelism) throws Exception {
139+
SavepointRestoreSettings savepointRestoreSettings =
140+
SavepointRestoreSettings.fromConfiguration(configuration);
141+
PackagedProgram.Builder builder =
142+
PackagedProgram.newBuilder()
143+
.setSavepointRestoreSettings(savepointRestoreSettings)
144+
.setEntryPointClassName(
145+
configuration
146+
.getOptional(
147+
ApplicationConfiguration.APPLICATION_MAIN_CLASS)
148+
.get())
149+
.setArguments(
150+
configuration
151+
.getOptional(ApplicationConfiguration.APPLICATION_ARGS)
152+
.orElse(new ArrayList<>())
153+
.toArray(new String[] {}))
154+
.setJarFile(
155+
new File(
156+
configuration
157+
.getOptional(PipelineOptions.JARS)
158+
.orElse(new ArrayList<>())
159+
.get(0)));
160+
PackagedProgram program = builder.build();
161+
return PackagedProgramUtils.createJobGraph(
162+
program, configuration, parallelism, null, false);
163+
}
109164
}

0 commit comments

Comments
 (0)