From 99769c4682a84aa8d72ed9db39928cd8a5cb8788 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 1 Dec 2023 14:55:22 -0800 Subject: [PATCH 1/5] Add logging and metrics for flow compilation validation --- .../apache/gobblin/metrics/ServiceMetricNames.java | 1 + .../modules/flow/IdentityFlowToJobSpecCompiler.java | 4 ++++ .../service/modules/orchestration/DagManager.java | 1 + .../modules/orchestration/DagManagerMetrics.java | 13 ++++++++++++- .../utils/FlowCompilationValidationHelper.java | 6 ++++++ 5 files changed, 24 insertions(+), 1 deletion(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index 866644f50f0..e41bda2d7bb 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -51,6 +51,7 @@ public class ServiceMetricNames { public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager"; public static final String DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount"; + public static final String DAG_MANAGER_LAUNCH_EVENTS_ATTEMPTED_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".launchEventsAttemptedOnStartupCount"; public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount"; //Job status poll timer diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java index d3b25f54e17..473c6ad0ee9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java @@ -84,6 +84,10 @@ public Dag compileFlow(Spec spec) { List jobExecutionPlans; try { jobExecutionPlans = getJobExecutionPlans(source, destination, jobSpec); + if (jobExecutionPlans.isEmpty()) { + flowSpec.addCompilationError(source, destination, + String.format("Could not find path between source: %s and destination: %s", source, destination)); + } } catch (InterruptedException | ExecutionException e) { Instrumented.markMeter(this.flowCompilationFailedMeter); throw new RuntimeException("Cannot determine topology capabilities", e); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index c90841b50a1..b6b4eafbefb 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -498,6 +498,7 @@ public synchronized void setActive(boolean active) { */ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { Preconditions.checkArgument(launchAction.getFlowActionType() == DagActionStore.FlowActionType.LAUNCH); + this.dagManagerMetrics.incrementLaunchAttemptCount(); log.info("Handle launch flow event for action {}", launchAction); FlowId flowId = launchAction.getFlowId(); try { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index 6d6c545b5b4..b29cd0450a3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -76,8 +76,9 @@ public class DagManagerMetrics { private final Map executorSlaExceededMeters = Maps.newConcurrentMap(); private final Map executorJobSentMeters = Maps.newConcurrentMap(); - // Metrics for unexpected flow handling failures + // Metric for unexpected flow handling failures private ContextAwareCounter failedLaunchEventsOnActivationCount; + private ContextAwareCounter launchEventsAttemptedOnActivationCount; MetricContext metricContext; public DagManagerMetrics(MetricContext metricContext) { @@ -103,9 +104,12 @@ public void activate() { ServiceMetricNames.SLA_EXCEEDED_FLOWS_METER)); allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR)); + // TODO: remove duplicate use of 'GOBBLIN_SERVICE_PREFIX' once startup functionality is stable failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT)); + launchEventsAttemptedOnActivationCount = metricContext.contextAwareCounter( + ServiceMetricNames.DAG_MANAGER_LAUNCH_EVENTS_ATTEMPTED_ON_STARTUP_COUNT); } } @@ -212,6 +216,13 @@ public void incrementFailedLaunchCount() { } } + // Increment the count for launches attempted during leader activation + public void incrementLaunchAttemptCount() { + if (this.metricContext != null) { + this.launchEventsAttemptedOnActivationCount.inc(); + } + } + private List getRunningJobsCounterForUser(Dag.DagNode dagNode) { Config configs = dagNode.getValue().getJobSpec().getConfig(); String proxy = ConfigUtils.getString(configs, AzkabanProjectConfig.USER_TO_PROXY, null); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java index 78b5446bf7c..f19abee927a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java @@ -86,10 +86,16 @@ public Optional> createExecutionPlanIfValid(FlowSpec flowS Map flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec); if (!jobExecutionPlanDagOptional.isPresent()) { + log.warn("No dag execution plan created for flowGroup: {} flowName: {}", + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD), + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)); return Optional.absent(); } if (jobExecutionPlanDagOptional.get() == null || jobExecutionPlanDagOptional.get().isEmpty()) { + log.warn("Null or empty dag execution plan created for flowGroup: {} flowName: {}", + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD), + flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)); populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata); return Optional.absent(); } From 37654fff5f8525f374bc90aec7ac948146754763 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 1 Dec 2023 15:32:52 -0800 Subject: [PATCH 2/5] Change metric to count successful flow compilations --- .../apache/gobblin/metrics/ServiceMetricNames.java | 2 +- .../service/modules/orchestration/DagManager.java | 2 +- .../modules/orchestration/DagManagerMetrics.java | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index e41bda2d7bb..a94dafeee0f 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -51,7 +51,7 @@ public class ServiceMetricNames { public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager"; public static final String DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount"; - public static final String DAG_MANAGER_LAUNCH_EVENTS_ATTEMPTED_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".launchEventsAttemptedOnStartupCount"; + public static final String DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".successfulLaunchEventsOnActivationCount"; public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount"; //Job status poll timer diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index b6b4eafbefb..9529697d6c6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -498,7 +498,6 @@ public synchronized void setActive(boolean active) { */ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { Preconditions.checkArgument(launchAction.getFlowActionType() == DagActionStore.FlowActionType.LAUNCH); - this.dagManagerMetrics.incrementLaunchAttemptCount(); log.info("Handle launch flow event for action {}", launchAction); FlowId flowId = launchAction.getFlowId(); try { @@ -508,6 +507,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec, Optional.absent()); if (optionalJobExecutionPlanDag.isPresent()) { addDag(optionalJobExecutionPlanDag.get(), true, true); + this.dagManagerMetrics.incrementSuccessfulLaunchAttemptCount(); } else { log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId); this.dagManagerMetrics.incrementFailedLaunchCount(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index b29cd0450a3..73e77265ab8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -78,7 +78,7 @@ public class DagManagerMetrics { // Metric for unexpected flow handling failures private ContextAwareCounter failedLaunchEventsOnActivationCount; - private ContextAwareCounter launchEventsAttemptedOnActivationCount; + private ContextAwareCounter successfulLaunchEventsOnActivationCount; MetricContext metricContext; public DagManagerMetrics(MetricContext metricContext) { @@ -108,8 +108,8 @@ public void activate() { failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT)); - launchEventsAttemptedOnActivationCount = metricContext.contextAwareCounter( - ServiceMetricNames.DAG_MANAGER_LAUNCH_EVENTS_ATTEMPTED_ON_STARTUP_COUNT); + successfulLaunchEventsOnActivationCount = metricContext.contextAwareCounter( + ServiceMetricNames.DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT); } } @@ -216,10 +216,10 @@ public void incrementFailedLaunchCount() { } } - // Increment the count for launches attempted during leader activation - public void incrementLaunchAttemptCount() { + // Increment the count for num of successful launches attempted during leader activation + public void incrementSuccessfulLaunchAttemptCount() { if (this.metricContext != null) { - this.launchEventsAttemptedOnActivationCount.inc(); + this.successfulLaunchEventsOnActivationCount.inc(); } } From 33ada42a8839c046f2b8171cc53ee51edc254678 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 1 Dec 2023 16:00:39 -0800 Subject: [PATCH 3/5] Initialize FlowCatalog before DagManager started --- .../modules/core/GobblinServiceGuiceModule.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index af0c3461adf..53a2e185a9e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -200,6 +200,12 @@ public void configure(Binder binder) { ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY)); } + // Initialize flow catalog before DagManager is enabled so templates are loaded when initializing the DagManager + OptionalBinder.newOptionalBinder(binder, FlowCatalog.class); + if (serviceConfig.isFlowCatalogEnabled()) { + binder.bind(FlowCatalog.class); + } + OptionalBinder.newOptionalBinder(binder, DagManager.class); if (serviceConfig.isDagManagerEnabled()) { binder.bind(DagManager.class); @@ -214,11 +220,6 @@ public void configure(Binder binder) { LOGGER.info("No ZooKeeper connection string. Running in single instance mode."); } - OptionalBinder.newOptionalBinder(binder, FlowCatalog.class); - if (serviceConfig.isFlowCatalogEnabled()) { - binder.bind(FlowCatalog.class); - } - if (serviceConfig.isJobStatusMonitorEnabled()) { binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class); } From 028e21bb2496b99833e69f257c54e32229a8b355 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 1 Dec 2023 16:20:14 -0800 Subject: [PATCH 4/5] Revert "Initialize FlowCatalog before DagManager started" This reverts commit 33ada42a8839c046f2b8171cc53ee51edc254678. --- .../modules/core/GobblinServiceGuiceModule.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java index 53a2e185a9e..af0c3461adf 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java @@ -200,12 +200,6 @@ public void configure(Binder binder) { ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY)); } - // Initialize flow catalog before DagManager is enabled so templates are loaded when initializing the DagManager - OptionalBinder.newOptionalBinder(binder, FlowCatalog.class); - if (serviceConfig.isFlowCatalogEnabled()) { - binder.bind(FlowCatalog.class); - } - OptionalBinder.newOptionalBinder(binder, DagManager.class); if (serviceConfig.isDagManagerEnabled()) { binder.bind(DagManager.class); @@ -220,6 +214,11 @@ public void configure(Binder binder) { LOGGER.info("No ZooKeeper connection string. Running in single instance mode."); } + OptionalBinder.newOptionalBinder(binder, FlowCatalog.class); + if (serviceConfig.isFlowCatalogEnabled()) { + binder.bind(FlowCatalog.class); + } + if (serviceConfig.isJobStatusMonitorEnabled()) { binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class); } From ebbdad32f66bf88292f431c6e093df325664adb7 Mon Sep 17 00:00:00 2001 From: Urmi Mustafi Date: Fri, 1 Dec 2023 16:25:23 -0800 Subject: [PATCH 5/5] Rename metrics to be uniform --- .../gobblin/metrics/ServiceMetricNames.java | 2 +- .../modules/orchestration/DagManager.java | 2 +- .../orchestration/DagManagerMetrics.java | 24 +++++++++++-------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java index a94dafeee0f..780086c1e69 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java @@ -51,7 +51,7 @@ public class ServiceMetricNames { public static final String DAG_MANAGER_PREFIX = GOBBLIN_SERVICE_PREFIX_WITH_DELIMITER + "dagManager"; public static final String DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".failedLaunchEventsOnStartupCount"; - public static final String DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".successfulLaunchEventsOnActivationCount"; + public static final String DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT = DAG_MANAGER_PREFIX + ".successfulLaunchEventsOnStartupCount"; public static final String FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT = DAG_MANAGER_PREFIX + ".flowFailedForwardToDagManagerCount"; //Job status poll timer diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 9529697d6c6..0b2440212b5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -507,7 +507,7 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) { this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec, Optional.absent()); if (optionalJobExecutionPlanDag.isPresent()) { addDag(optionalJobExecutionPlanDag.get(), true, true); - this.dagManagerMetrics.incrementSuccessfulLaunchAttemptCount(); + this.dagManagerMetrics.incrementSuccessfulLaunchCount(); } else { log.warn("Failed flow compilation of spec causing launch flow event to be skipped on startup. Flow {}", flowId); this.dagManagerMetrics.incrementFailedLaunchCount(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java index 73e77265ab8..79bc5bf1d5d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java @@ -76,9 +76,9 @@ public class DagManagerMetrics { private final Map executorSlaExceededMeters = Maps.newConcurrentMap(); private final Map executorJobSentMeters = Maps.newConcurrentMap(); - // Metric for unexpected flow handling failures - private ContextAwareCounter failedLaunchEventsOnActivationCount; - private ContextAwareCounter successfulLaunchEventsOnActivationCount; + // Metric for unexpected flow handling outcomes + private ContextAwareCounter failedLaunchEventsOnStartupCount; + private ContextAwareCounter successfulLaunchEventsOnStartupCount; MetricContext metricContext; public DagManagerMetrics(MetricContext metricContext) { @@ -105,10 +105,10 @@ public void activate() { allRunningMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR)); // TODO: remove duplicate use of 'GOBBLIN_SERVICE_PREFIX' once startup functionality is stable - failedLaunchEventsOnActivationCount = metricContext.contextAwareCounter( + failedLaunchEventsOnStartupCount = metricContext.contextAwareCounter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DAG_MANAGER_FAILED_LAUNCH_EVENTS_ON_STARTUP_COUNT)); - successfulLaunchEventsOnActivationCount = metricContext.contextAwareCounter( + successfulLaunchEventsOnStartupCount = metricContext.contextAwareCounter( ServiceMetricNames.DAG_MANAGER_SUCCESSFUL_LAUNCH_EVENTS_ON_STARTUP_COUNT); } } @@ -209,17 +209,21 @@ public void incrementCountsStartSlaExceeded(Dag.DagNode node) } } - // Increment the count for num of failed launches during leader activation + /** + * Increment the count for num of failed launches during system startup + */ public void incrementFailedLaunchCount() { if (this.metricContext != null) { - this.failedLaunchEventsOnActivationCount.inc(); + this.failedLaunchEventsOnStartupCount.inc(); } } - // Increment the count for num of successful launches attempted during leader activation - public void incrementSuccessfulLaunchAttemptCount() { + /** + * Increment the count for num of successful launches during system startup + */ + public void incrementSuccessfulLaunchCount() { if (this.metricContext != null) { - this.successfulLaunchEventsOnActivationCount.inc(); + this.successfulLaunchEventsOnStartupCount.inc(); } }