Skip to content

Commit 77f9d38

Browse files
authored
[GOBBLIN-2054] Fix CommitActivityImpl to succeed for TaskStates that did not originate with CopySource (#3934)
1 parent 773da76 commit 77f9d38

File tree

5 files changed

+63
-68
lines changed

5 files changed

+63
-68
lines changed

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,8 +544,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
544544
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
545545
*/
546546
public static boolean shouldCommitDataInJob(State state) {
547-
boolean jobCommitPolicyIsFull =
548-
JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
547+
boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
549548
boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
550549
ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL);
551550
boolean jobDataPublisherSpecified =

gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,16 +392,21 @@ public List<TaskState> getTaskProgress() {
392392
* @return a {@link Map} from dataset URNs to {@link DatasetState}s representing the dataset states
393393
*/
394394
public Map<String, DatasetState> createDatasetStatesByUrns() {
395+
return calculateDatasetStatesByUrns(this.taskStates.values(), this.skippedTaskStates.values());
396+
}
397+
398+
/** {@see JobState#createDatasetStatesByUrns} */
399+
public Map<String, DatasetState> calculateDatasetStatesByUrns(Collection<TaskState> allTaskStates, Collection<TaskState> allSkippedTaskStates) {
395400
Map<String, DatasetState> datasetStatesByUrns = Maps.newHashMap();
396401

397-
for (TaskState taskState : this.taskStates.values()) {
402+
for (TaskState taskState : allTaskStates) {
398403
String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
399404

400405
datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
401406
datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
402407
}
403408

404-
for (TaskState taskState : this.skippedTaskStates.values()) {
409+
for (TaskState taskState : allSkippedTaskStates) {
405410
String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
406411
datasetStatesByUrns.get(datasetUrn).addSkippedTaskState(taskState);
407412
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java

Lines changed: 53 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,7 @@
1717

1818
package org.apache.gobblin.temporal.ddm.activity.impl;
1919

20-
import com.google.common.base.Function;
21-
import com.google.common.base.Strings;
22-
import com.google.common.collect.ImmutableList;
23-
import com.google.common.collect.Iterables;
24-
import com.google.common.collect.Maps;
25-
import io.temporal.failure.ApplicationFailure;
2620
import java.io.IOException;
27-
import java.util.Collection;
2821
import java.util.HashSet;
2922
import java.util.List;
3023
import java.util.Map;
@@ -33,13 +26,24 @@
3326
import java.util.Set;
3427
import java.util.concurrent.Callable;
3528
import java.util.concurrent.ExecutionException;
29+
import java.util.stream.Collectors;
30+
3631
import javax.annotation.Nullable;
3732
import lombok.extern.slf4j.Slf4j;
33+
34+
import com.google.api.client.util.Lists;
35+
import com.google.common.base.Function;
36+
import com.google.common.base.Strings;
37+
import com.google.common.collect.Iterables;
38+
import io.temporal.failure.ApplicationFailure;
39+
40+
import org.apache.hadoop.fs.FileSystem;
41+
import org.apache.hadoop.fs.Path;
42+
3843
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
3944
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
4045
import org.apache.gobblin.commit.DeliverySemantics;
4146
import org.apache.gobblin.configuration.ConfigurationKeys;
42-
import org.apache.gobblin.configuration.State;
4347
import org.apache.gobblin.metastore.StateStore;
4448
import org.apache.gobblin.metrics.event.EventSubmitter;
4549
import org.apache.gobblin.runtime.JobContext;
@@ -57,8 +61,6 @@
5761
import org.apache.gobblin.util.Either;
5862
import org.apache.gobblin.util.ExecutorsUtils;
5963
import org.apache.gobblin.util.executors.IteratorExecutor;
60-
import org.apache.hadoop.fs.FileSystem;
61-
import org.apache.hadoop.fs.Path;
6264

6365

6466
@Slf4j
@@ -72,37 +74,27 @@ public class CommitActivityImpl implements CommitActivity {
7274
public int commit(WUProcessingSpec workSpec) {
7375
// TODO: Make this configurable
7476
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
75-
String jobName = UNDEFINED_JOB_NAME;
77+
Optional<String> optJobName = Optional.empty();
7678
AutomaticTroubleshooter troubleshooter = null;
7779
try {
7880
FileSystem fs = Help.loadFileSystem(workSpec);
7981
JobState jobState = Help.loadJobState(workSpec, fs);
80-
jobName = jobState.getJobName();
82+
optJobName = Optional.ofNullable(jobState.getJobName());
8183
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
8284
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
8385
troubleshooter.start();
84-
JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
85-
// TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
86-
Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
87-
Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
88-
log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
89-
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
90-
Optional<Queue<TaskState>> taskStateQueueOpt =
91-
TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads);
92-
if (!taskStateQueueOpt.isPresent()) {
93-
log.error("No task states found at " + jobOutputPath);
94-
return 0;
86+
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
87+
if (!taskStates.isEmpty()) {
88+
JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
89+
commitTaskStates(jobState, taskStates, jobContext);
9590
}
96-
Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
97-
commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
98-
return taskStateQueue.size();
91+
return taskStates.size();
9992
} catch (Exception e) {
10093
//TODO: IMPROVE GRANULARITY OF RETRIES
10194
throw ApplicationFailure.newNonRetryableFailureWithCause(
102-
String.format("Failed to commit dataset state for some dataset(s) of job %s", jobName),
95+
String.format("Failed to commit dataset state for some dataset(s) of job %s", optJobName.orElse(UNDEFINED_JOB_NAME)),
10396
IOException.class.toString(),
104-
new IOException(e),
105-
null
97+
new IOException(e)
10698
);
10799
} finally {
108100
String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec));
@@ -118,8 +110,13 @@ public int commit(WUProcessingSpec workSpec) {
118110
* @param jobContext
119111
* @throws IOException
120112
*/
121-
private void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
122-
Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
113+
private void commitTaskStates(JobState jobState, List<TaskState> taskStates, JobContext jobContext) throws IOException {
114+
if (!taskStates.isEmpty()) {
115+
TaskState firstTaskState = taskStates.get(0);
116+
log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
117+
}
118+
//TODO: handle skipped tasks?
119+
Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
123120
final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
124121
final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
125122
//TODO: Make this configurable
@@ -148,7 +145,7 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
148145
}).iterator(), numCommitThreads,
149146
// TODO: Rewrite executorUtils to use java util optional
150147
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), com.google.common.base.Optional.of("Commit-thread-%d")))
151-
.executeAndGetResults();
148+
.executeAndGetResults();
152149

153150
IteratorExecutor.logFailures(result, null, 10);
154151

@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
166163
}
167164
if (!IteratorExecutor.verifyAllSuccessful(result)) {
168165
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
169-
String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
166+
String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
170167
throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
171168
}
172169
} catch (InterruptedException exc) {
173170
throw new IOException(exc);
174171
}
175172
}
176173

174+
/** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
175+
private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
176+
// TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
177+
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
178+
// NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
179+
String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
180+
log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
181+
Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
182+
return taskStateQueueOpt.map(taskStateQueue ->
183+
taskStateQueue.stream().peek(taskState ->
184+
// CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
185+
// already themselves contain every prop of their `JobState`, not all do.
186+
// `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
187+
// given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
188+
taskState.setJobState(jobState)
189+
// TODO - decide whether something akin necessary to streamline cumulative in-memory size of all issues: consumeTaskIssues(taskState);
190+
).collect(Collectors.toList())
191+
).orElseGet(() -> {
192+
log.error("TaskStateStore successfully opened, but no task states found under (name) '{}'", jobIdPathName);
193+
return Lists.newArrayList();
194+
});
195+
}
196+
177197
/** @return id/correlator for this particular commit activity */
178198
private static String calcCommitId(WUProcessingSpec workSpec) {
179199
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
180200
}
181-
182-
/**
183-
* Organize task states by dataset urns.
184-
* @param taskStates
185-
* @return A map of dataset urns to dataset task states.
186-
*/
187-
public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
188-
Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
189-
190-
//TODO: handle skipped tasks?
191-
for (TaskState taskState : taskStates) {
192-
String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
193-
datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
194-
datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
195-
}
196-
197-
return datasetStatesByUrns;
198-
}
199-
200-
private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
201-
String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
202-
if (!datasetStatesByUrns.containsKey(datasetUrn)) {
203-
JobState.DatasetState datasetState = new JobState.DatasetState();
204-
datasetState.setDatasetUrn(datasetUrn);
205-
datasetStatesByUrns.put(datasetUrn, datasetState);
206-
}
207-
return datasetUrn;
208-
}
209201
}

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void onTaskCommitCompletion(Task task) {
138138
// TODO: if metrics configured, report them now
139139
log.info("WU [{} = {}] - finished commit after {}ms with state {}{}", wu.getCorrelator(), task.getTaskId(),
140140
taskState.getTaskDuration(), taskState.getWorkingState(),
141-
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL)
141+
taskState.getWorkingState().equals(WorkUnitState.WorkingState.SUCCESSFUL) && taskState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)
142142
? (" to: " + taskState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)) : "");
143143
log.debug("WU [{} = {}] - task state: {}", wu.getCorrelator(), task.getTaskId(),
144144
taskState.toJsonString(shouldUseExtendedLogging(wu)));

gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ public int execute(Properties jobProps, EventSubmitterContext eventSubmitterCont
104104
throw ApplicationFailure.newNonRetryableFailureWithCause(
105105
String.format("Failed Gobblin job %s", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
106106
e.getClass().toString(),
107-
e,
108-
null
107+
e
109108
);
110109
}
111110
return numWUsGenerated;

0 commit comments

Comments
 (0)