diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 7809e785f57..9edc5e3525e 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -58,6 +58,7 @@ public class ServiceMetricNames { public static final String CREATE_FLOW_METER = "CreateFlow"; public static final String DELETE_FLOW_METER = "DeleteFlow"; + public static final String FLOW_SPEC_EXISTS_FOR_ADHOC_FLOW = "flowSpecExistsForAdhocFlow"; public static final String RUN_IMMEDIATELY_FLOW_METER = "RunImmediatelyFlow"; public static final String SUCCESSFUL_FLOW_METER = "SuccessfulFlows"; public static final String START_SLA_EXCEEDED_FLOWS_METER = "StartSLAExceededFlows"; @@ -96,6 +97,8 @@ public class ServiceMetricNames { public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed."; public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded."; public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed."; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded."; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalFailed."; public static final String DAG_ACTIONS_CONCLUDE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeSucceeded."; public static final String DAG_ACTIONS_DELETE_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteSucceeded."; public static final String DAG_ACTIONS_DELETE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsDeleteFailed."; diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java index 055724d83d6..1e85169de41 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java @@ -86,6 +86,7 @@ public class FlowConfigsV2ResourceHandler implements FlowConfigsResourceHandlerI protected FlowCatalog flowCatalog; protected final ContextAwareMeter createFlow; protected final ContextAwareMeter deleteFlow; + protected final ContextAwareMeter flowSpecExistsForAdhocFlow; protected final ContextAwareMeter runImmediatelyFlow; @Inject @@ -100,6 +101,8 @@ public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER)); this.runImmediatelyFlow = metricContext.contextAwareMeter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER)); + this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter( + MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.FLOW_SPEC_EXISTS_FOR_ADHOC_FLOW)); } public FlowConfig getFlowConfig(FlowId flowId) @@ -248,8 +251,22 @@ public CreateKVResponse, FlowConfig> cr // Return conflict and take no action if flowSpec has already been created if (this.flowCatalog.exists(flowSpec.getUri())) { log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri()); - return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, - "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); + try { + FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); + if (!storedFlowSpec.isScheduled()) { + log.warn("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); + if (!flowSpec.isScheduled()) { + flowSpecExistsForAdhocFlow.mark(); + } + } else { + log.warn("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri()); + } + } catch (SpecNotFoundException e) { + log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri()); + } finally { + return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, + "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); + } } Map responseMap; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java index 25412b1aeb6..be39a7d27ee 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java @@ -59,6 +59,10 @@ public class DagProcessingEngineMetrics { private final HashMap dagActionsActSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsConcludeSucceededMeterByDagActionType = new HashMap<>(); + private final HashMap + dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType = new HashMap<>(); + private final HashMap + dagActionsConcludeFlowSpecRemovalFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteFailedMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsDeleteSucceededMeterByDagActionType = new HashMap<>(); private final HashMap dagActionsAverageProcessingDelayMillisMeterByDagActionType = new HashMap<>(); @@ -89,6 +93,8 @@ public void registerAllMetrics() { registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED); registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_FAILED); registerMetricForEachDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_DELETE_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsAverageProcessingDelayMillisMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_AVERAGE_PROCESSING_DELAY_MILLIS); @@ -163,6 +169,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b } } + public void markDagActionsConcludeFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { + if (succeeded) { + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMeterByDagActionType, dagActionType); + } else { + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalFailedMeterByDagActionType, dagActionType); + } + } + public void markDagActionsDeleted(DagActionStore.DagActionType dagActionType, boolean succeeded) { if (succeeded) { updateMetricForDagActionType(this.dagActionsDeleteSucceededMeterByDagActionType, dagActionType); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java index e00a59df45a..54bb6ff3108 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java @@ -40,7 +40,7 @@ public abstract class DagTask { @Getter public final DagActionStore.DagAction dagAction; protected final DagManagementStateStore dagManagementStateStore; private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus; - private final DagProcessingEngineMetrics dagProcEngineMetrics; + protected final DagProcessingEngineMetrics dagProcEngineMetrics; public DagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, DagManagementStateStore dagManagementStateStore, DagProcessingEngineMetrics dagProcEngineMetrics) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java index fa3f291d92c..183f394f69b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java @@ -54,7 +54,15 @@ public final boolean conclude() { FlowSpec flowSpec = this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId())); if (!flowSpec.isScheduled()) { - dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false); + try { + //This can throw Runtime, IllegalState and IO Exceptions which are not caught here. + dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false); + } catch (Exception e) { + super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false); + log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri()); + return false; + } + super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), true); } return true; }