Skip to content

Commit 1e37390

Browse files
aazam-ghfdelbrayelleMilosPaunovic
authored
feat(aws.emrserverless): add EMR Serverless plugin (create, submit, delete) (#646)
* Initial commit - add serverless application creation * Added delete serverless application and start serverless job runs * updated package.info * test: add unit tests for new tasks * docs: add missing doc for properties --------- Co-authored-by: François Delbrayelle <[email protected]> Co-authored-by: Miloš Paunović <[email protected]>
1 parent 9aaf8f0 commit 1e37390

11 files changed

+761
-5
lines changed

Dockerfile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
FROM kestra/kestra:latest-no-plugins
2+
3+
RUN mkdir -p /app/plugins
4+
5+
COPY build/libs/* /app/plugins/

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ dependencies {
7474
api 'software.amazon.awssdk:ecr'
7575
api 'software.amazon.awssdk:netty-nio-client'
7676
api 'software.amazon.awssdk:emr'
77+
api 'software.amazon.awssdk:emrserverless'
7778
api 'software.amazon.awssdk:glue'
7879
api 'software.amazon.awssdk:cloudwatch'
7980
api 'software.amazon.awssdk:cloudformation'
8081
}
8182

82-
8383
/**********************************************************************************************************************\
8484
* Test
8585
**********************************************************************************************************************/
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.runners.RunContext;
5+
import io.kestra.plugin.aws.AbstractConnection;
6+
import io.kestra.plugin.aws.ConnectionUtils;
7+
import lombok.*;
8+
import lombok.experimental.SuperBuilder;
9+
import software.amazon.awssdk.services.emrserverless.EmrServerlessClient;
10+
11+
12+
@SuperBuilder
13+
@ToString
14+
@EqualsAndHashCode(callSuper = true)
15+
@Getter
16+
@NoArgsConstructor
17+
public abstract class AbstractEmrServerlessTask extends AbstractConnection {
18+
protected EmrServerlessClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
19+
final AwsClientConfig clientConfig = awsClientConfig(runContext);
20+
return ConnectionUtils.configureSyncClient(clientConfig, EmrServerlessClient.builder()).build();
21+
}
22+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.annotations.Example;
5+
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
7+
import io.kestra.core.models.tasks.RunnableTask;
8+
import io.kestra.core.runners.RunContext;
9+
import io.swagger.v3.oas.annotations.media.Schema;
10+
import jakarta.validation.constraints.NotNull;
11+
import lombok.*;
12+
import lombok.experimental.SuperBuilder;
13+
import software.amazon.awssdk.services.emrserverless.EmrServerlessClient;
14+
import software.amazon.awssdk.services.emrserverless.model.*;
15+
16+
@SuperBuilder
17+
@ToString
18+
@EqualsAndHashCode
19+
@Getter
20+
@NoArgsConstructor
21+
@Schema(title = "Create an EMR Serverless application and immediately start a job run.")
22+
@Plugin(
23+
examples = {
24+
@Example(
25+
title = "Create an EMR Serverless app and start a Spark job",
26+
full = true,
27+
code = """
28+
id: create_and_run_emr_serverless
29+
namespace: company.team
30+
31+
tasks:
32+
- id: create_and_run
33+
type: io.kestra.plugin.aws.emr.CreateServerlessApplicationAndStartJob
34+
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
35+
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
36+
region: "eu-central-1"
37+
releaseLabel: "emr-7.0.0"
38+
applicationType: "SPARK"
39+
executionRoleArn: "arn:aws:iam::123456789012:role/EMRServerlessRole"
40+
jobName: "example-job"
41+
entryPoint: "s3://my-bucket/jobs/script.py"
42+
"""
43+
)
44+
}
45+
)
46+
public class CreateServerlessApplicationAndStartJob extends AbstractEmrServerlessTask implements RunnableTask<CreateServerlessApplicationAndStartJob.Output> {
47+
48+
@Schema(
49+
title = "The EMR release label to use for the application.",
50+
description = "For example, `emr-6.3.0` or `emr-7.0.0`."
51+
)
52+
@NotNull
53+
private Property<String> releaseLabel;
54+
55+
@Schema(
56+
title = "The type of application to create.",
57+
description = "Valid values are for instance `SPARK` and `HIVE`."
58+
)
59+
@NotNull
60+
private Property<String> applicationType;
61+
62+
@Schema(
63+
title = "The execution role ARN for the application.",
64+
description = "This role will be assumed by EMR Serverless to access AWS resources on your behalf."
65+
)
66+
@NotNull
67+
private Property<String> executionRoleArn;
68+
69+
@Schema(
70+
title = "The name of the job to start."
71+
)
72+
@NotNull
73+
private Property<String> jobName;
74+
75+
@Schema(
76+
title = "The entry point for the job.",
77+
description = "For `SPARK` applications, this is typically the S3 path to your main application file (e.g., a Python or JAR file). For `HIVE` applications, this is the Hive query to execute."
78+
)
79+
@NotNull
80+
private Property<String> entryPoint;
81+
82+
@Override
83+
public Output run(RunContext runContext) throws IllegalVariableEvaluationException {
84+
try (EmrServerlessClient client = this.client(runContext)) {
85+
String rReleaseLabel = runContext.render(releaseLabel).as(String.class).orElseThrow();
86+
String rApplicationType = runContext.render(applicationType).as(String.class).orElseThrow();
87+
String rExecutionRoleArn = runContext.render(executionRoleArn).as(String.class).orElseThrow();
88+
String rJobName = runContext.render(jobName).as(String.class).orElseThrow();
89+
String rEntryPoint = runContext.render(entryPoint).as(String.class).orElseThrow();
90+
91+
// 1. Create Application
92+
CreateApplicationResponse app = client.createApplication(CreateApplicationRequest.builder()
93+
.releaseLabel(rReleaseLabel)
94+
.type(rApplicationType)
95+
.build());
96+
97+
// 2. Start Job
98+
StartJobRunRequest.Builder jobBuilder = StartJobRunRequest.builder()
99+
.applicationId(app.applicationId())
100+
.executionRoleArn(rExecutionRoleArn)
101+
.name(rJobName);
102+
103+
if ("SPARK".equalsIgnoreCase(rApplicationType)) {
104+
jobBuilder.jobDriver(jd -> jd.sparkSubmit(ss -> ss.entryPoint(rEntryPoint)));
105+
} else if ("HIVE".equalsIgnoreCase(rApplicationType)) {
106+
jobBuilder.jobDriver(jd -> jd.hive(hd -> hd.query(rEntryPoint)));
107+
} else {
108+
throw new IllegalArgumentException("Unsupported application rApplicationType: " + rApplicationType);
109+
}
110+
111+
StartJobRunResponse job = client.startJobRun(jobBuilder.build());
112+
113+
runContext.logger().info("Created app {} and started job {}", app.applicationId(), job.jobRunId());
114+
115+
return Output.builder()
116+
.applicationId(app.applicationId())
117+
.jobRunId(job.jobRunId())
118+
.build();
119+
} catch (Exception e) {
120+
throw new RuntimeException("Failed to create EMR Serverless app and start job", e);
121+
}
122+
}
123+
124+
@Builder
125+
@Getter
126+
public static class Output implements io.kestra.core.models.tasks.Output {
127+
private final String applicationId;
128+
private final String jobRunId;
129+
}
130+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.annotations.Example;
5+
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
7+
import io.kestra.core.models.tasks.RunnableTask;
8+
import io.kestra.core.models.tasks.VoidOutput;
9+
import io.kestra.core.runners.RunContext;
10+
import io.swagger.v3.oas.annotations.media.Schema;
11+
import jakarta.validation.constraints.NotNull;
12+
import lombok.EqualsAndHashCode;
13+
import lombok.Getter;
14+
import lombok.NoArgsConstructor;
15+
import lombok.ToString;
16+
import lombok.experimental.SuperBuilder;
17+
import software.amazon.awssdk.services.emrserverless.EmrServerlessClient;
18+
19+
import java.util.List;
20+
21+
import static io.kestra.core.utils.Rethrow.throwConsumer;
22+
23+
@SuperBuilder
24+
@ToString
25+
@EqualsAndHashCode
26+
@Getter
27+
@NoArgsConstructor
28+
@Schema(
29+
title = "Delete one or multiple AWS EMR Serverless applications."
30+
)
31+
@Plugin(
32+
examples = {
33+
@Example(
34+
title = "Delete a couple of EMR Serverless applications providing their IDs.",
35+
full = true,
36+
code = """
37+
id: aws_emrserverless_delete_app
38+
namespace: company.team
39+
40+
tasks:
41+
- id: delete_app
42+
type: io.kestra.plugin.aws.emr.DeleteServerlessApplication
43+
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
44+
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
45+
region: "eu-central-1"
46+
applicationIds:
47+
- 00f123abc456xyz
48+
- 11g789def012uvw
49+
"""
50+
)
51+
}
52+
)
53+
public class DeleteServerlessApplication extends AbstractEmrServerlessTask implements RunnableTask<VoidOutput> {
54+
55+
@Schema(
56+
title = "Application IDs.",
57+
description = "List of EMR Serverless application IDs to delete."
58+
)
59+
@NotNull
60+
private Property<List<String>> applicationIds;
61+
62+
@Override
63+
public VoidOutput run(RunContext runContext) throws IllegalVariableEvaluationException {
64+
try (EmrServerlessClient client = this.client(runContext)) {
65+
List<String> rApplicationIds = runContext.render(this.applicationIds).asList(String.class);
66+
rApplicationIds.forEach(throwConsumer(appId ->
67+
client.deleteApplication(r -> r.applicationId(appId))
68+
));
69+
runContext.logger().info("Deleted {} EMR Serverless applications: {}", rApplicationIds.size(), rApplicationIds);
70+
return null;
71+
}
72+
}
73+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.annotations.Example;
5+
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
7+
import io.kestra.core.models.tasks.RunnableTask;
8+
import io.kestra.core.runners.RunContext;
9+
import io.swagger.v3.oas.annotations.media.Schema;
10+
import jakarta.validation.constraints.NotNull;
11+
import lombok.*;
12+
import lombok.experimental.SuperBuilder;
13+
import software.amazon.awssdk.services.emrserverless.EmrServerlessClient;
14+
import software.amazon.awssdk.services.emrserverless.model.StartJobRunRequest;
15+
import software.amazon.awssdk.services.emrserverless.model.StartJobRunResponse;
16+
17+
@SuperBuilder
18+
@ToString
19+
@EqualsAndHashCode
20+
@Getter
21+
@NoArgsConstructor
22+
@Schema(title = "Start a job run on an existing EMR Serverless application.")
23+
@Plugin(
24+
examples = {
25+
@Example(
26+
title = "Start EMR Serverless job",
27+
full = true,
28+
code = """
29+
id: start_emr_job
30+
namespace: company.team
31+
32+
tasks:
33+
- id: start_job
34+
type: io.kestra.plugin.aws.emr.StartServerlessJobRun
35+
accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
36+
secretKeyId: "{{ secret('AWS_SECRET_KEY_ID') }}"
37+
region: "eu-central-1"
38+
applicationId: "00f123abc456xyz"
39+
executionRoleArn: "arn:aws:iam::123456789012:role/EMRServerlessRole"
40+
jobName: "sample-spark-job"
41+
entryPoint: "s3://my-bucket/scripts/spark-app.py"
42+
jobDriver:
43+
sparkSubmit:
44+
entryPointArguments:
45+
- "--arg1"
46+
- "value1"
47+
"""
48+
)
49+
}
50+
)
51+
public class StartServerlessJobRun extends AbstractEmrServerlessTask implements RunnableTask<StartServerlessJobRun.Output> {
52+
53+
@Schema(
54+
title = "The EMR Serverless application ID to run the job on."
55+
)
56+
@NotNull
57+
private Property<String> applicationId;
58+
59+
@Schema(
60+
title = "The execution role ARN for the job."
61+
)
62+
@NotNull
63+
private Property<String> executionRoleArn;
64+
65+
@Schema(
66+
title = "The name of the job."
67+
)
68+
@NotNull
69+
private Property<String> jobName;
70+
71+
@Schema(
72+
title = "The entry point for the job."
73+
)
74+
@NotNull
75+
private Property<String> entryPoint;
76+
77+
@Override
78+
public Output run(RunContext runContext) throws IllegalVariableEvaluationException {
79+
try (EmrServerlessClient client = this.client(runContext)) {
80+
String rApplicationId = runContext.render(applicationId).as(String.class).orElseThrow();
81+
String rExecutionRoleArn = runContext.render(executionRoleArn).as(String.class).orElseThrow();
82+
String rJobName = runContext.render(jobName).as(String.class).orElseThrow();
83+
String rEntryPoint = runContext.render(entryPoint).as(String.class).orElseThrow();
84+
85+
StartJobRunRequest request = StartJobRunRequest.builder()
86+
.applicationId(rApplicationId)
87+
.executionRoleArn(rExecutionRoleArn)
88+
.name(rJobName)
89+
.jobDriver(builder -> builder.sparkSubmit(builder2 -> builder2.entryPoint(rEntryPoint)))
90+
.build();
91+
92+
StartJobRunResponse response = client.startJobRun(request);
93+
94+
runContext.logger().info("Started EMR Serverless job: {}", response.jobRunId());
95+
96+
return Output.builder()
97+
.jobRunId(response.jobRunId())
98+
.build();
99+
} catch (Exception e) {
100+
throw new RuntimeException("Failed to start EMR Serverless job", e);
101+
}
102+
}
103+
104+
@Builder
105+
@Getter
106+
public static class Output implements io.kestra.core.models.tasks.Output {
107+
@Schema(title = "Job Run ID")
108+
private final String jobRunId;
109+
}
110+
}
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
@PluginSubGroup(
22
title = "EMR",
3-
description = "This sub-group of plugins contains tasks for using Amazon EMR.\n" +
4-
"Amazon EMR (previously called Amazon Elastic MapReduce) is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data.",
3+
description = "This sub-group of plugins contains tasks for using Amazon EMR and EMR Serverless.\n" +
4+
"Amazon EMR (Elastic MapReduce) is a managed big data platform for running frameworks such as Apache Hadoop and Apache Spark on AWS.\n" +
5+
"EMR Serverless provides a serverless option that lets you run Spark or Hive jobs without managing clusters.",
56
categories = {PluginSubGroup.PluginCategory.TOOL, PluginSubGroup.PluginCategory.CLOUD}
67
)
78
package io.kestra.plugin.aws.emr;
89

9-
import io.kestra.core.models.annotations.PluginSubGroup;
10+
import io.kestra.core.models.annotations.PluginSubGroup;

0 commit comments

Comments
 (0)