Skip to content

Commit a74d17a

Browse files
authored
[GOBBLIN-2051] Rework FlowLaunchHandler, DagActionStore, and related classes for clarity (#3927)
Rework `FlowLaunchHandler`, `DagActionStore`, and related class' javadoc and method naming for clarity
1 parent 36cdf2b commit a74d17a

File tree

11 files changed

+109
-94
lines changed

11 files changed

+109
-94
lines changed

gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String
395395

396396
try {
397397
// Schedule the Quartz job with a trigger built from the job configuration
398-
Trigger trigger = createTriggerForJob(job.getKey(), jobProps, Optional.absent());
398+
Trigger trigger = createTriggerForJob(job.getKey(), jobProps, java.util.Optional.empty());
399399
this.scheduler.getScheduler().scheduleJob(job, trigger);
400400
logNewlyScheduledJob(job, trigger);
401401
} catch (SchedulerException se) {
@@ -585,11 +585,11 @@ public void close() throws IOException {
585585
* Get a {@link org.quartz.Trigger} from the given job configuration properties. If triggerSuffix is provided, appends
586586
* it to the end of the flow name. The suffix is used to add multiple unique triggers associated with the same job
587587
*/
588-
public static Trigger createTriggerForJob(JobKey jobKey, Properties jobProps, Optional<String> triggerSuffix) {
588+
public static Trigger createTriggerForJob(JobKey jobKey, Properties jobProps, java.util.Optional<String> triggerSuffix) {
589589
// Build a trigger for the job with the given cron-style schedule
590590
return TriggerBuilder.newTrigger()
591591
.withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)
592-
+ triggerSuffix.transform(s -> "_" + s).or(""),
592+
+ triggerSuffix.map(s -> "_" + s).orElse(""),
593593
Strings.nullToEmpty(jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY)))
594594
.forJob(jobKey)
595595
.withSchedule(CronScheduleBuilder.cronSchedule(jobProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))

gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.gobblin.scheduler;
1919

20-
import com.google.common.base.Optional;
20+
import java.util.Optional;
2121
import java.util.Properties;
2222
import org.apache.gobblin.configuration.ConfigurationKeys;
2323
import org.junit.Assert;
@@ -39,7 +39,7 @@ public void testCreateUniqueTriggersForJob() {
3939
jobProps.put(ConfigurationKeys.JOB_GROUP_KEY, jobGroup);
4040
jobProps.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?");
4141

42-
Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.absent());
42+
Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.empty());
4343
Trigger trigger2 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.of("suffix"));
4444

4545
Assert.assertFalse(trigger1.getKey().equals(trigger2.getKey()));

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ class DagAction {
4646
final String jobName;
4747
final DagActionType dagActionType;
4848

49+
public static DagAction forFlow(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) {
50+
return new DagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType);
51+
}
52+
4953
public FlowId getFlowId() {
5054
return new FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
5155
}
@@ -90,6 +94,16 @@ public DagNodeId getDagNodeId() {
9094
*/
9195
boolean exists(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException, SQLException;
9296

97+
/** Persist the {@link DagAction} in {@link DagActionStore} for durability */
98+
default void addDagAction(DagAction dagAction) throws IOException {
99+
addJobDagAction(
100+
dagAction.getFlowGroup(),
101+
dagAction.getFlowName(),
102+
dagAction.getFlowExecutionId(),
103+
dagAction.getJobName(),
104+
dagAction.getDagActionType());
105+
}
106+
93107
/**
94108
* Persist the dag action in {@link DagActionStore} for durability
95109
* @param flowGroup flow group for the dag action
@@ -109,11 +123,13 @@ public DagNodeId getDagNodeId() {
109123
* @param dagActionType the value of the dag action
110124
* @throws IOException
111125
*/
112-
void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException;
126+
default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException {
127+
addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType));
128+
}
113129

114130
/**
115131
* delete the dag action from {@link DagActionStore}
116-
* @param DagAction containing all information needed to identify dag and specific action value
132+
* @param dagAction containing all information needed to identify dag and specific action value
117133
* @throws IOException
118134
* @return true if we successfully delete one record, return false if the record does not exist
119135
*/

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt
177177
*/
178178
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
179179
throws SchedulerException {
180-
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
180+
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
181181
leaseStatus.getMinimumLingerDurationMillis());
182182
}
183183
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.time.temporal.ChronoUnit;
2525
import java.util.Date;
2626
import java.util.Locale;
27+
import java.util.Optional;
2728
import java.util.Properties;
2829
import java.util.Random;
2930

@@ -35,7 +36,6 @@
3536
import org.quartz.impl.JobDetailImpl;
3637

3738
import com.google.common.annotations.VisibleForTesting;
38-
import com.google.common.base.Optional;
3939
import com.typesafe.config.Config;
4040

4141
import javax.inject.Inject;
@@ -68,7 +68,7 @@
6868
@Slf4j
6969
public class FlowLaunchHandler {
7070
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
71-
private Optional<DagActionStore> dagActionStore;
71+
private DagActionStore dagActionStore;
7272
private final MetricContext metricContext;
7373
private final int schedulerMaxBackoffMillis;
7474
private static Random random = new Random();
@@ -80,9 +80,14 @@ public class FlowLaunchHandler {
8080
@Inject
8181
public FlowLaunchHandler(Config config,
8282
@Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter leaseArbiter,
83-
SchedulerService schedulerService, Optional<DagActionStore> dagActionStore) {
83+
SchedulerService schedulerService, com.google.common.base.Optional<DagActionStore> optDagActionStore) {
8484
this.multiActiveLeaseArbiter = leaseArbiter;
85-
this.dagActionStore = dagActionStore;
85+
86+
if (!optDagActionStore.isPresent()) {
87+
throw new RuntimeException("DagActionStore MUST be present for flow launch handling!");
88+
}
89+
this.dagActionStore = optDagActionStore.get();
90+
8691
this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
8792
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
8893
this.schedulerService = schedulerService;
@@ -99,51 +104,46 @@ public FlowLaunchHandler(Config config,
99104
* This method is used in the multi-active scheduler case for one or more hosts to respond to a launch dag action
100105
* event triggered by the scheduler by attempting a lease for the launch event and processing the result depending on
101106
* the status of the attempt.
102-
* @param jobProps
103-
* @param dagAction
104-
* @param eventTimeMillis
105-
* @param isReminderEvent
106-
* @param skipFlowExecutionIdReplacement
107-
* @throws IOException
108107
*/
109108
public void handleFlowLaunchTriggerEvent(Properties jobProps, DagActionStore.DagAction dagAction,
110-
long eventTimeMillis, boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws IOException {
111-
LeaseAttemptStatus
112-
leaseAttemptStatus = this.multiActiveLeaseArbiter
113-
.tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent, skipFlowExecutionIdReplacement);
114-
if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
115-
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
116-
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
117-
if (persistDagAction(leaseObtainedStatus)) {
118-
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseObtainedStatus.getDagAction(),
119-
leaseObtainedStatus.getEventTimeMillis());
120-
return;
121-
}
122-
// If persisting the dag action failed, then we set another trigger for this event to occur immediately to
123-
// re-attempt handling the event
124-
scheduleReminderForEvent(jobProps,
125-
new LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(), 0L), eventTimeMillis);
126-
} else if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeasedToAnotherStatus) {
127-
scheduleReminderForEvent(jobProps, (LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
128-
eventTimeMillis);
129-
}
130-
// Otherwise leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything
109+
long eventTimeMillis, boolean isReminderEvent, boolean adoptConsensusFlowExecutionId) throws IOException {
110+
LeaseAttemptStatus leaseAttempt = this.multiActiveLeaseArbiter.tryAcquireLease(
111+
dagAction, eventTimeMillis, isReminderEvent, adoptConsensusFlowExecutionId);
112+
if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
113+
&& persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)) {
114+
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseAttempt.getConsensusDagAction(),
115+
((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis());
116+
} else { // when NOT successfully `persistDagAction`, set a reminder to re-attempt handling (unless leasing finished)
117+
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
118+
scheduleReminderForEvent(jobProps, leasedToAnother, eventTimeMillis));
119+
}
131120
}
132121

133-
// Called after obtaining a lease to persist the dag action to {@link DagActionStore} and mark the lease as done
134-
private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
135-
if (this.dagActionStore.isPresent()) {
136-
try {
137-
DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
138-
this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId(), dagAction.getDagActionType());
139-
// If the dag action has been persisted to the {@link DagActionStore} we can close the lease
140-
this.numFlowsSubmitted.mark();
141-
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
142-
} catch (IOException e) {
143-
throw new RuntimeException(e);
144-
}
122+
/** @return {@link Optional} status for reminding, unless {@link LeaseAttemptStatus.NoLongerLeasingStatus} (hence nothing to do) */
123+
private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> calcLeasedToAnotherStatusForReminder(LeaseAttemptStatus leaseAttempt) {
124+
if (leaseAttempt instanceof LeaseAttemptStatus.NoLongerLeasingStatus) { // all done: nothing to remind about
125+
return Optional.empty();
126+
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
127+
return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttempt);
128+
} else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) { // remind w/o delay to immediately re-attempt handling
129+
return Optional.of(new LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 0L));
145130
} else {
146-
throw new RuntimeException("DagActionStore is " + (this.dagActionStore.isPresent() ? "" : "NOT") + " present.");
131+
throw new RuntimeException("unexpected `LeaseAttemptStatus` derived type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
132+
}
133+
}
134+
135+
/**
136+
* Called after obtaining a lease to both persist to the {@link DagActionStore} and
137+
* {@link MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
138+
*/
139+
private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
140+
try {
141+
this.dagActionStore.addDagAction(leaseStatus.getConsensusDagAction());
142+
this.numFlowsSubmitted.mark();
143+
// after successfully persisting, close the lease
144+
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
145+
} catch (IOException e) {
146+
throw new RuntimeException(e);
147147
}
148148
}
149149

@@ -156,7 +156,7 @@ private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseSta
156156
*/
157157
private void scheduleReminderForEvent(Properties jobProps, LeaseAttemptStatus.LeasedToAnotherStatus status,
158158
long triggerEventTimeMillis) {
159-
DagActionStore.DagAction dagAction = status.getDagAction();
159+
DagActionStore.DagAction dagAction = status.getConsensusDagAction();
160160
JobKey origJobKey = new JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
161161
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job group>>"));
162162
try {
@@ -196,7 +196,7 @@ protected Trigger createAndScheduleReminder(JobKey origJobKey, LeaseAttemptStatu
196196
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, getJobPropertiesFromJobDetail(jobDetail),
197197
Optional.of(reminderSuffix));
198198
log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] - attempting to schedule reminder for event {} with "
199-
+ "reminderJobKey {} and reminderTriggerKey {}", status.getDagAction(), triggerEventTimeMillis,
199+
+ "reminderJobKey {} and reminderTriggerKey {}", status.getConsensusDagAction(), triggerEventTimeMillis,
200200
status.getEventTimeMillis(), reminderJobKey, reminderTrigger.getKey());
201201
this.schedulerService.getScheduler().scheduleJob(jobDetail, reminderTrigger);
202202
return reminderTrigger;
@@ -258,7 +258,7 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
258258
// Saves the following properties in jobProps to retrieve when the trigger fires
259259
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
260260
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
261-
// Use the db laundered timestamp for the reminder to ensure consensus between hosts. Participant trigger timestamps
261+
// Use the db consensus timestamp for the reminder to ensure inter-host agreement. Participant trigger timestamps
262262
// can differ between participants and be interpreted as a reminder for a distinct flow trigger which will cause
263263
// excess flows to be triggered by the reminder functionality.
264264
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,18 @@
2525

2626

2727
/**
28-
Class used to encapsulate status of lease acquisition attempts made by {@link MultiActiveLeaseArbiter} and contains
29-
information specific to the status that results. The {@link LeaseAttemptStatus#getDagAction} and
30-
{@link LeaseAttemptStatus#getMinimumLingerDurationMillis} are meant to be
31-
overridden and used by relevant derived classes.
28+
* Hierarchy to convey the specific outcome of attempted lease acquisition via the {@link MultiActiveLeaseArbiter},
29+
* with each derived type carrying outcome-specific status info.
30+
*
31+
* IMPL. NOTE: {@link LeaseAttemptStatus#getConsensusDagAction} and {@link LeaseAttemptStatus#getMinimumLingerDurationMillis}
32+
* intended for `@Override`.
3233
*/
3334
public abstract class LeaseAttemptStatus {
34-
public DagActionStore.DagAction getDagAction() {
35+
/**
36+
* @return the {@link DagActionStore.DagAction}, which may now have an updated flowExecutionId that MUST henceforth be
37+
* used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
38+
*/
39+
public DagActionStore.DagAction getConsensusDagAction() {
3540
return null;
3641
}
3742

@@ -53,7 +58,7 @@ public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
5358
*/
5459
@Data
5560
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
56-
private final DagActionStore.DagAction dagAction;
61+
private final DagActionStore.DagAction consensusDagAction;
5762
private final long leaseAcquisitionTimestamp;
5863
private final long minimumLingerDurationMillis;
5964
@Getter(AccessLevel.NONE)
@@ -63,7 +68,7 @@ public static class LeaseObtainedStatus extends LeaseAttemptStatus {
6368
* @return event time in millis since epoch for the event of this lease acquisition
6469
*/
6570
public long getEventTimeMillis() {
66-
return Long.parseLong(dagAction.getFlowExecutionId());
71+
return Long.parseLong(consensusDagAction.getFlowExecutionId());
6772
}
6873

6974
/**
@@ -85,15 +90,15 @@ public boolean completeLease() throws IOException {
8590
*/
8691
@Data
8792
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
88-
private final DagActionStore.DagAction dagAction;
93+
private final DagActionStore.DagAction consensusDagAction;
8994
private final long minimumLingerDurationMillis;
9095

9196
/**
9297
* Returns event time in millis since epoch for the event whose lease was obtained by another participant.
9398
* @return
9499
*/
95100
public long getEventTimeMillis() {
96-
return Long.parseLong(dagAction.getFlowExecutionId());
101+
return Long.parseLong(consensusDagAction.getFlowExecutionId());
97102
}
98103
}
99104
}

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ public interface MultiActiveLeaseArbiter {
5252
* @param eventTimeMillis is the time this dag action was triggered
5353
* @param isReminderEvent true if the dag action event we're checking on is a reminder event
5454
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction flowExecutionId returned in
55-
* LeaseAttemptStatuses with the consensus eventTime
55+
* LeaseAttemptStatuses with the consensus eventTime, accessed via
56+
* {@link LeaseAttemptStatus#getConsensusDagAction()}
5657
*
57-
* @return LeaseAttemptStatus, containing a dag action that will have an updated flow execution id if `
58-
* adoptConsensusFlowExecutionId` is true. The caller should use the newer version of the dag action to easily track
59-
* the action moving forward.
58+
* @return {@link LeaseAttemptStatus}, containing, when `adoptConsensusFlowExecutionId`, a universally-agreed-upon
59+
* {@link DagActionStore.DagAction} with a possibly updated ("laundered") flow execution id that MUST be used thereafter
6060
* @throws IOException
6161
*/
6262
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, long eventTimeMillis, boolean isReminderEvent,

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,6 @@ public void addJobDagAction(String flowGroup, String flowName, String flowExecut
144144
}}, true);
145145
}
146146

147-
@Override
148-
public void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType)
149-
throws IOException {
150-
addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType);
151-
}
152-
153147
@Override
154148
public boolean deleteDagAction(DagAction dagAction) throws IOException {
155149
return dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, tableName), deleteStatement -> {

gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ protected static void completeUpdatePreparedStatement(PreparedStatement statemen
585585
@Override
586586
public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status)
587587
throws IOException {
588-
DagActionStore.DagAction dagAction = status.getDagAction();
588+
DagActionStore.DagAction dagAction = status.getConsensusDagAction();
589589
return dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, leaseArbiterTableName),
590590
updateStatement -> {
591591
int i = 0;

0 commit comments

Comments
 (0)