workUnitsToAdd) {
+ LOGGER.info("Temporal addTasksToCurrentJob");
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalJobScheduler.java
new file mode 100644
index 00000000000..b2b6fb24fc5
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalJobScheduler.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.temporal;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gobblin.cluster.*;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.JobException;
+import org.apache.gobblin.runtime.JobLauncher;
+import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
+
+/**
+ * An extension to {@link JobScheduler} that schedules and runs
+ * Gobblin jobs on Temporal.
+ *
+ * If the job should be launched from the scheduler node,
+ * {@link GobblinTemporalJobLauncher} is invoked.
+ * TODO(yiyang): this file should be cleaned up with HelixJobScheduler.
+ */
+@Alpha
+public class GobblinTemporalJobScheduler extends JobScheduler implements StandardMetricsBridge {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTemporalJobScheduler.class);
+ private static final String COMMON_JOB_PROPS = "gobblin.common.job.props";
+
+ private final Properties commonJobProperties;
+ private final EventBus eventBus;
+ private final Path appWorkDir;
+ private final List extends Tag>> metadataTags;
+ private final ConcurrentHashMap jobRunningMap;
+ private final MetricContext metricContext;
+ final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics;
+ final GobblinHelixJobLauncherMetrics launcherMetrics;
+ final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
+ final HelixJobsMapping jobsMapping;
+ private boolean startServicesCompleted;
+
+ public GobblinTemporalJobScheduler(Config sysConfig,
+ EventBus eventBus,
+ Path appWorkDir, List extends Tag>> metadataTags,
+ SchedulerService schedulerService) throws Exception {
+
+ super(ConfigUtils.configToProperties(sysConfig), schedulerService);
+ this.commonJobProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(sysConfig, COMMON_JOB_PROPS));
+ this.eventBus = eventBus;
+ this.jobRunningMap = new ConcurrentHashMap<>();
+ this.appWorkDir = appWorkDir;
+ this.metadataTags = metadataTags;
+ this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass());
+
+ int metricsWindowSizeInMin = ConfigUtils.getInt(sysConfig,
+ ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
+ ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
+
+ this.launcherMetrics = new GobblinHelixJobLauncherMetrics("launcherInScheduler",
+ this.metricContext,
+ metricsWindowSizeInMin);
+
+ this.jobSchedulerMetrics = new GobblinHelixJobSchedulerMetrics(this.jobExecutor,
+ this.metricContext,
+ metricsWindowSizeInMin);
+
+ this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(properties),
+ PathUtils.getRootPath(appWorkDir).toUri(),
+ appWorkDir.toString());
+
+ this.planningJobLauncherMetrics = new GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler",
+ this.metricContext,
+ metricsWindowSizeInMin, this.jobsMapping);
+
+ this.startServicesCompleted = false;
+ }
+
+ @Override
+ public Collection getStandardMetricsCollection() {
+ return ImmutableList.of(this.launcherMetrics,
+ this.jobSchedulerMetrics,
+ this.planningJobLauncherMetrics);
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ this.eventBus.register(this);
+ super.startUp();
+ this.startServicesCompleted = true;
+ }
+
+ @Override
+ public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
+ try {
+ while (!startServicesCompleted) {
+ LOGGER.info("{} service is not fully up, waiting here...", this.getClass().getName());
+ Thread.sleep(1000);
+ }
+
+ scheduleJob(jobProps,
+ jobListener,
+ Maps.newHashMap(),
+ GobblinHelixJob.class);
+
+ } catch (Exception e) {
+ throw new JobException("Failed to schedule job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ }
+ }
+
+ @Override
+ protected void startServices() throws Exception {
+
+ boolean cleanAllDistJobs = PropertiesUtils.getPropAsBoolean(this.properties,
+ GobblinClusterConfigurationKeys.CLEAN_ALL_DIST_JOBS,
+ String.valueOf(GobblinClusterConfigurationKeys.DEFAULT_CLEAN_ALL_DIST_JOBS));
+
+ if (cleanAllDistJobs) {
+ for (org.apache.gobblin.configuration.State state : this.jobsMapping.getAllStates()) {
+ String jobUri = state.getId();
+ LOGGER.info("Delete mapping for job " + jobUri);
+ this.jobsMapping.deleteMapping(jobUri);
+ }
+ }
+ }
+
+ @Override
+ public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
+ }
+
+ @Override
+ public GobblinTemporalJobLauncher buildJobLauncher(Properties jobProps)
+ throws Exception {
+ Properties combinedProps = new Properties();
+ combinedProps.putAll(properties);
+ combinedProps.putAll(jobProps);
+
+ return new GobblinTemporalJobLauncher(combinedProps,
+ this.appWorkDir,
+ this.metadataTags,
+ this.jobRunningMap);
+ }
+
+ @Subscribe
+ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
+ String jobUri = newJobArrival.getJobName();
+ LOGGER.info("Received new job configuration of job " + jobUri);
+ try {
+ Properties jobProps = new Properties();
+ jobProps.putAll(this.commonJobProperties);
+ jobProps.putAll(newJobArrival.getJobConfig());
+
+ // set uri so that we can delete this job later
+ jobProps.setProperty(GobblinClusterConfigurationKeys.JOB_SPEC_URI, jobUri);
+
+ this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps);
+
+ if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ LOGGER.info("Scheduling job " + jobUri);
+ scheduleJob(jobProps,
+ new GobblinHelixJobLauncherListener(this.launcherMetrics));
+ } else {
+ LOGGER.info("No job schedule found, so running job " + jobUri);
+ GobblinHelixJobLauncherListener listener = new GobblinHelixJobLauncherListener(this.launcherMetrics);
+ JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
+ launcher.launchJob(listener);
+ }
+ } catch (Exception je) {
+ LOGGER.error("Failed to schedule or run job " + jobUri, je);
+ }
+ }
+
+ @Subscribe
+ public void handleUpdateJobConfigArrival(UpdateJobConfigArrivalEvent updateJobArrival) {
+ LOGGER.info("Received update for job configuration of job " + updateJobArrival.getJobName());
+ try {
+ handleDeleteJobConfigArrival(new DeleteJobConfigArrivalEvent(updateJobArrival.getJobName(),
+ updateJobArrival.getJobConfig()));
+ } catch (Exception je) {
+ LOGGER.error("Failed to update job " + updateJobArrival.getJobName(), je);
+ }
+
+ try {
+ handleNewJobConfigArrival(new NewJobConfigArrivalEvent(updateJobArrival.getJobName(),
+ updateJobArrival.getJobConfig()));
+ } catch (Exception je) {
+ LOGGER.error("Failed to update job " + updateJobArrival.getJobName(), je);
+ }
+ }
+
+ private void waitForJobCompletion(String jobName) {
+ while (this.jobRunningMap.getOrDefault(jobName, false)) {
+ LOGGER.info("Waiting for job {} to stop...", jobName);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted exception encountered: ", e);
+ }
+ }
+ }
+
+ @Subscribe
+ public void handleDeleteJobConfigArrival(DeleteJobConfigArrivalEvent deleteJobArrival) throws InterruptedException {
+ LOGGER.info("Received delete for job configuration of job " + deleteJobArrival.getJobName());
+ try {
+ unscheduleJob(deleteJobArrival.getJobName());
+ } catch (JobException je) {
+ LOGGER.error("Failed to unschedule job " + deleteJobArrival.getJobName());
+ }
+ }
+
+ @Subscribe
+ public void handleCancelJobConfigArrival(CancelJobConfigArrivalEvent cancelJobArrival)
+ throws InterruptedException {
+ }
+
+ /**
+ * This class is responsible for running non-scheduled jobs.
+ */
+ class NonScheduledJobRunner implements Runnable {
+ private final Properties jobProps;
+ private final GobblinHelixJobLauncherListener jobListener;
+ private final Long creationTimeInMillis;
+
+ public NonScheduledJobRunner(Properties jobProps,
+ GobblinHelixJobLauncherListener jobListener) {
+
+ this.jobProps = jobProps;
+ this.jobListener = jobListener;
+ this.creationTimeInMillis = System.currentTimeMillis();
+ }
+
+ @Override
+ public void run() {
+ try {
+ GobblinTemporalJobScheduler.this.jobSchedulerMetrics.updateTimeBeforeJobLaunching(this.jobProps);
+ GobblinTemporalJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis());
+ GobblinTemporalJobScheduler.this.runJob(this.jobProps, this.jobListener);
+ } catch (JobException je) {
+ LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+ }
+ }
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalTaskRunner.java
new file mode 100644
index 00000000000..74218bc0681
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/GobblinTemporalTaskRunner.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.temporal;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.cluster.*;
+import org.apache.gobblin.cluster.temporal.NestingExecWorker;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.api.client.repackaged.com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowClientOptions;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import io.temporal.worker.Worker;
+import io.temporal.worker.WorkerFactory;
+import io.temporal.worker.WorkerOptions;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.temporal.Shared;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.instrumented.StandardMetricsBridge;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+import org.apache.gobblin.runtime.api.TaskEventMetadataGenerator;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.TaskEventMetadataUtils;
+import org.apache.gobblin.util.event.ContainerHealthCheckFailureEvent;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import static org.apache.gobblin.cluster.temporal.TemporalWorkflowClientFactory.createClientInstance;
+import static org.apache.gobblin.cluster.temporal.TemporalWorkflowClientFactory.createServiceInstance;
+
+
+/**
+ * The main class running in the containers managing services for running Gobblin
+ * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
+ *
+ *
+ * If for some reason, the container exits or gets killed, the {@link GobblinClusterManager} will
+ * be notified for the completion of the container and will start a new container to replace this one.
+ *
+ *
+ * @author Yinan Li
+ */
+@Alpha
+public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
+ // Working directory key for applications. This config is set dynamically.
+ public static final String CLUSTER_APP_WORK_DIR = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
+
+ private static final Logger logger = LoggerFactory.getLogger(GobblinTemporalTaskRunner.class);
+
+ static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf");
+
+ private static TaskRunnerSuiteBase.Builder builder;
+ private final Optional containerMetrics;
+ private final Path appWorkPath;
+ private boolean isTaskDriver;
+ @Getter
+ private volatile boolean started = false;
+ private volatile boolean stopInProgress = false;
+ private volatile boolean isStopped = false;
+ @Getter
+ @Setter
+ private volatile boolean healthCheckFailed = false;
+
+ protected final String taskRunnerId;
+ protected final EventBus eventBus = new EventBus(GobblinTemporalTaskRunner.class.getSimpleName());
+ protected final Config clusterConfig;
+ @Getter
+ protected final FileSystem fs;
+ protected final String applicationName;
+ protected final String applicationId;
+ protected final int temporalWorkerSize;
+ private final boolean isMetricReportingFailureFatal;
+ private final boolean isEventReportingFailureFatal;
+
+ public GobblinTemporalTaskRunner(String applicationName,
+ String applicationId,
+ String taskRunnerId,
+ Config config,
+ Optional appWorkDirOptional) throws Exception {
+ GobblinClusterUtils.setSystemProperties(config);
+
+ //Add dynamic config
+ config = GobblinClusterUtils.addDynamicConfig(config);
+
+ this.isTaskDriver = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false);
+ this.taskRunnerId = taskRunnerId;
+ this.applicationName = applicationName;
+ this.applicationId = applicationId;
+ Configuration conf = HadoopUtils.newConfiguration();
+ this.fs = GobblinClusterUtils.buildFileSystem(config, conf);
+ this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
+ this.clusterConfig = saveConfigToFile(config);
+
+ logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
+
+ this.containerMetrics = buildContainerMetrics();
+ this.builder = initBuilder();
+ // The default worker size would be 1
+ this.temporalWorkerSize = ConfigUtils.getInt(config, GobblinClusterConfigurationKeys.TEMPORAL_WORKER_SIZE,1);
+
+ this.isMetricReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
+ ConfigurationKeys.GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL,
+ ConfigurationKeys.DEFAULT_GOBBLIN_TASK_METRIC_REPORTING_FAILURE_FATAL);
+
+ this.isEventReportingFailureFatal = ConfigUtils.getBoolean(this.clusterConfig,
+ ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
+ ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
+
+ logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
+ this.isTaskDriver ? "taskDriver" : "worker",
+ applicationName,
+ applicationId,
+ taskRunnerId,
+ config,
+ appWorkDirOptional);
+ }
+
+ public static TaskRunnerSuiteBase.Builder getBuilder() {
+ return builder;
+ }
+
+ private TaskRunnerSuiteBase.Builder initBuilder() throws ReflectiveOperationException {
+ String builderStr = ConfigUtils.getString(this.clusterConfig,
+ GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
+ TaskRunnerSuiteBase.Builder.class.getName());
+
+ String hostName = "";
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ logger.warn("Cannot find host name for Helix instance: {}");
+ }
+
+ TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.invokeLongestConstructor(
+ new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
+ .resolveClass(builderStr), this.clusterConfig);
+
+ return builder.setAppWorkPath(this.appWorkPath)
+ .setContainerMetrics(this.containerMetrics)
+ .setFileSystem(this.fs)
+ .setApplicationId(applicationId)
+ .setApplicationName(applicationName)
+ .setContainerId(taskRunnerId)
+ .setHostName(hostName);
+ }
+
+ private Path initAppWorkDir(Config config, Optional appWorkDirOptional) {
+ return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : GobblinClusterUtils
+ .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, this.applicationId);
+ }
+
+ private Config saveConfigToFile(Config config)
+ throws IOException {
+ Config newConf = config
+ .withValue(CLUSTER_APP_WORK_DIR, ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+ ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+ configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
+ return newConf;
+ }
+
+ /**
+ * Start this {@link GobblinTemporalTaskRunner} instance.
+ */
+ public void start()
+ throws ContainerHealthCheckException {
+ logger.info("Calling start method in GobblinTemporalTaskRunner");
+ logger.info(String.format("Starting in container %s", this.taskRunnerId));
+
+ // Start metric reporting
+ initMetricReporter();
+
+ // Add a shutdown hook so the task scheduler gets properly shutdown
+ addShutdownHook();
+
+ try {
+ for (int i = 0; i < this.temporalWorkerSize; i++) {
+ initiateWorker();
+ }
+ }catch (Exception e) {
+ logger.info(e + " for initiate workers");
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initiateWorker() throws Exception{
+ logger.info("Starting Temporal Worker");
+
+ WorkflowServiceStubs service = createServiceInstance();
+ WorkflowClient client = createClientInstance(service);
+
+ WorkerOptions workerOptions = WorkerOptions.newBuilder()
+ .setMaxConcurrentWorkflowTaskExecutionSize(1)
+ .setMaxConcurrentActivityExecutionSize(1)
+ .build();
+
+ NestingExecWorker worker = new NestingExecWorker(client, Shared.GOBBLIN_TEMPORAL_TASK_QUEUE);
+ worker.start();
+ logger.info("A new worker is started.");
+ }
+
+ private void initMetricReporter() {
+ if (this.containerMetrics.isPresent()) {
+ try {
+ this.containerMetrics.get()
+ .startMetricReportingWithFileSuffix(ConfigUtils.configToState(this.clusterConfig), this.taskRunnerId);
+ } catch (MultiReporterException ex) {
+ if (MetricReportUtils.shouldThrowException(logger, ex, this.isMetricReportingFailureFatal, this.isEventReportingFailureFatal)) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ }
+
+ public synchronized void stop() {
+ if (this.isStopped) {
+ logger.info("Gobblin Task runner is already stopped.");
+ return;
+ }
+
+ if (this.stopInProgress) {
+ logger.info("Gobblin Task runner stop already in progress.");
+ return;
+ }
+
+ this.stopInProgress = true;
+
+ logger.info("Stopping the Gobblin Task runner");
+
+ // Stop metric reporting
+ if (this.containerMetrics.isPresent()) {
+ this.containerMetrics.get().stopMetricsReporting();
+ }
+
+ logger.info("All services are stopped.");
+
+ this.isStopped = true;
+ }
+
+ /**
+ * Creates and returns a {@link List} of additional {@link Service}s that should be run in this
+ * {@link GobblinTemporalTaskRunner}. Sub-classes that need additional {@link Service}s to run, should override this method
+ *
+ * @return a {@link List} of additional {@link Service}s to run.
+ */
+ protected List getServices() {
+ List serviceList = new ArrayList<>();
+ if (ConfigUtils.getBoolean(this.clusterConfig, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+ GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
+ serviceList.add(new ContainerHealthMetricsService(clusterConfig));
+ }
+ return serviceList;
+ }
+
+ @VisibleForTesting
+ boolean isStopped() {
+ return this.isStopped;
+ }
+
+ private void addShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+
+ @Override
+ public void run() {
+ logger.info("Running the shutdown hook");
+ GobblinTemporalTaskRunner.this.stop();
+ }
+ });
+ }
+
+ private Optional buildContainerMetrics() {
+ Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
+ if (GobblinMetrics.isEnabled(properties)) {
+ logger.info("Container metrics are enabled");
+ return Optional.of(ContainerMetrics
+ .get(ConfigUtils.configToState(clusterConfig), this.applicationName, this.taskRunnerId));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ // hard coded for now
+ @Override
+ public Collection getStandardMetricsCollection() {
+ return null;
+ }
+
+ @Subscribe
+ public void handleContainerHealthCheckFailureEvent(ContainerHealthCheckFailureEvent event) {
+ logger.error("Received {} from: {}", event.getClass().getSimpleName(), event.getClassName());
+ logger.error("Submitting a ContainerHealthCheckFailureEvent..");
+ submitEvent(event);
+ logger.error("Stopping GobblinTaskRunner...");
+ GobblinTemporalTaskRunner.this.setHealthCheckFailed(true);
+ GobblinTemporalTaskRunner.this.stop();
+ }
+
+ private void submitEvent(ContainerHealthCheckFailureEvent event) {
+ EventSubmitter eventSubmitter = new EventSubmitter.Builder(RootMetricContext.get(), getClass().getPackage().getName()).build();
+ GobblinEventBuilder eventBuilder = new GobblinEventBuilder(event.getClass().getSimpleName());
+ State taskState = ConfigUtils.configToState(event.getConfig());
+ //Add task metadata such as taskId, containerId, and workflowId if configured
+ TaskEventMetadataGenerator taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(taskState);
+ eventBuilder.addAdditionalMetadata(taskEventMetadataGenerator.getMetadata(taskState, event.getClass().getSimpleName()));
+ eventBuilder.addAdditionalMetadata(event.getMetadata());
+ eventSubmitter.submit(eventBuilder);
+ }
+
+ private static String getApplicationId() {
+ return "1";
+ }
+
+ private static String getTaskRunnerId() {
+ return UUID.randomUUID().toString();
+ }
+
+ public static Options buildOptions() {
+ Options options = new Options();
+ options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true,
+ "Application name");
+ options.addOption("d", GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true,
+ "Application id");
+ options.addOption("i", GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME, true,
+ "Helix instance name");
+ options.addOption(Option.builder("t").longOpt(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+ .hasArg(true).required(false).desc("Helix instance tags").build());
+ return options;
+ }
+
+ public static void printUsage(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(GobblinClusterManager.class.getSimpleName(), options);
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ Options options = buildOptions();
+ try {
+ CommandLine cmd = new DefaultParser().parse(options, args);
+ if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)) {
+ printUsage(options);
+ System.exit(1);
+ }
+
+ logger.info(JvmUtils.getJvmInputArguments());
+
+ String applicationName =
+ cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+ GobblinTemporalTaskRunner gobblinWorkUnitRunner =
+ new GobblinTemporalTaskRunner(applicationName, getApplicationId(),
+ getTaskRunnerId(), ConfigFactory.load(), Optional.absent());
+ gobblinWorkUnitRunner.start();
+ } catch (ParseException pe) {
+ printUsage(options);
+ System.exit(1);
+ }
+ }
+}
+
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTask.java
new file mode 100644
index 00000000000..43f1e6bd6f0
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTask.java
@@ -0,0 +1,16 @@
+package org.apache.gobblin.cluster.temporal;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Generally, this would specify what "work" needs performing plus how to perform; now just a unique name (to log) */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class IllustrationTask {
+ @NonNull
+ private String name;
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTaskActivity.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTaskActivity.java
new file mode 100644
index 00000000000..d24217fe45f
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTaskActivity.java
@@ -0,0 +1,16 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+/**
+ * Activity for processing {@link IllustrationTask}s
+ *
+ * CAUTION/FINDING: an `@ActivityInterface` must not be parameterized (e.g. here, by TASK), as doing so results in:
+ * io.temporal.failure.ApplicationFailure: message='class java.util.LinkedHashMap cannot be cast to class
+ * com.linkedin.temporal.app.work.IllustrationTask', type='java.lang.ClassCastException'
+ */
+@ActivityInterface
+public interface IllustrationTaskActivity {
+ @ActivityMethod
+ String doTask(IllustrationTask task);
+}
\ No newline at end of file
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTaskActivityImpl.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTaskActivityImpl.java
new file mode 100644
index 00000000000..c170829f09e
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/IllustrationTaskActivityImpl.java
@@ -0,0 +1,12 @@
+package org.apache.gobblin.cluster.temporal;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class IllustrationTaskActivityImpl implements IllustrationTaskActivity {
+ @Override
+ public String doTask(final IllustrationTask task) {
+ log.info("Now performing - '" + task.getName() + "'");
+ return task.getName();
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorker.java
new file mode 100644
index 00000000000..327a28c8d91
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorker.java
@@ -0,0 +1,21 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.client.WorkflowClient;
+import io.temporal.worker.Worker;
+import io.temporal.worker.WorkerOptions;
+import io.temporal.worker.WorkerFactory;
+public class NestingExecWorker extends AbstractTemporalWorker{
+ public NestingExecWorker(WorkflowClient workflowClient, String queueName) {
+ super(workflowClient, queueName);
+ }
+
+ @Override
+ protected Class>[] getWorkflowImplClasses() {
+ return new Class[] { NestingExecWorkflowImpl.class };
+ }
+
+ @Override
+ protected Object[] getActivityImplInstances() {
+ return new Object[] { new IllustrationTaskActivityImpl() };
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflow.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflow.java
new file mode 100644
index 00000000000..8887d71b74c
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflow.java
@@ -0,0 +1,30 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import java.util.Optional;
+
+/**
+ * Process all `TASK`s of `workload`, from `startIndex` to the end by creating child workflows, where this and
+ * descendants should have at most `maxBranchesPerTree`, with at most `maxSubTreesPerTree` of those being child
+ * workflows. (Non-child-workflow branches being activities.)
+ *
+ * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a good rule-of-thumb; `maxSubTreesPerTree
+ * should not exceed that.
+ *
+ * @param the type of task for which to invoke an appropriate activity
+ * @param maxSubTreesForCurrentTreeOverride when the current tree should use different max sub-trees than descendants
+ */
+
+@WorkflowInterface
+public interface NestingExecWorkflow {
+ @WorkflowMethod
+ int performWork(
+ WFAddr addr,
+ Workload workload,
+ int startIndex,
+ int maxBranchesPerTree,
+ int maxSubTreesPerTree,
+ Optional maxSubTreesForCurrentTreeOverride
+ );
+}
\ No newline at end of file
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflowImpl.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflowImpl.java
new file mode 100644
index 00000000000..4f16c1101cc
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/NestingExecWorkflowImpl.java
@@ -0,0 +1,34 @@
+package org.apache.gobblin.cluster.temporal;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Async;
+import io.temporal.workflow.Promise;
+import io.temporal.workflow.Workflow;
+import java.time.Duration;
+
+/** {@link com.linkedin.temporal.app.workflow.nesting.NestingExecWorkflow} for {@link IllustrationTask} */
+public class NestingExecWorkflowImpl
+ extends AbstractNestingExecWorkflowImpl {
+
+ // RetryOptions specify how to automatically handle retries when Activities fail.
+ private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(1))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(3)
+ .build();
+
+ private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofSeconds(10))
+ .setRetryOptions(ACTIVITY_RETRY_OPTS)
+ .build();
+
+ private final IllustrationTaskActivity activityStub =
+ Workflow.newActivityStub(IllustrationTaskActivity.class, ACTIVITY_OPTS);
+
+ @Override
+ protected Promise launchAsyncActivity(final IllustrationTask t) {
+ return Async.function(activityStub::doTask, t);
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Shared.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Shared.java
new file mode 100644
index 00000000000..3d7e51a459e
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Shared.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster.temporal;
+
+public interface Shared {
+
+ // Define the task queue name
+ final String GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue";
+}
\ No newline at end of file
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/SimpleGeneratedWorkload.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/SimpleGeneratedWorkload.java
new file mode 100644
index 00000000000..93461bfe27d
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/SimpleGeneratedWorkload.java
@@ -0,0 +1,44 @@
+package org.apache.gobblin.cluster.temporal;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.AccessLevel;
+
+/** Example, illustration workload that synthesizes tasks; genuine {@link Workload}s likely arise from query/calc */
+@lombok.AllArgsConstructor(access = AccessLevel.PRIVATE)
+@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@lombok.ToString
+public class SimpleGeneratedWorkload implements Workload {
+ private int numTasks;
+
+ /** Factory method */
+ public static SimpleGeneratedWorkload createAs(final int numTasks) {
+ return new SimpleGeneratedWorkload(numTasks);
+ }
+
+ @Override
+ public Optional> getSpan(final int startIndex, final int numElements) {
+ if (startIndex >= numTasks || startIndex < 0) {
+ return Optional.empty();
+ } else {
+ List elems = IntStream.range(startIndex, Math.min(startIndex + numElements, numTasks))
+ .mapToObj(n -> new IllustrationTask("task-" + n + "-of-" + numTasks))
+ .collect(Collectors.toList());
+ return Optional.of(new CollectionBackedTaskSpan<>(elems, startIndex));
+ }
+ }
+
+ @Override
+ public boolean isIndexKnownToExceed(final int index) {
+ return isDefiniteSize() && index >= numTasks;
+ }
+
+ @Override
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public boolean isDefiniteSize() {
+ return true;
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/TemporalWorkflowClientFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/TemporalWorkflowClientFactory.java
new file mode 100644
index 00000000000..505d7b4d158
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/TemporalWorkflowClientFactory.java
@@ -0,0 +1,91 @@
+package org.apache.gobblin.cluster.temporal;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowClientOptions;
+import io.temporal.serviceclient.WorkflowServiceStubs;
+import io.temporal.serviceclient.WorkflowServiceStubsOptions;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.File;
+import java.security.KeyStore;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.gobblin.security.ssl.SSLContextFactory.toInputStream;
+
+public class TemporalWorkflowClientFactory {
+ public static WorkflowServiceStubs createServiceInstance() throws Exception {
+ GobblinClusterUtils.setSystemProperties(ConfigFactory.load());
+ Config config = GobblinClusterUtils.addDynamicConfig(ConfigFactory.load());
+ String SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT = "gobblin.kafka.sharedConfig.";
+ String SSL_KEYMANAGER_ALGORITHM = SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT + "ssl.keymanager.algorithm";
+ String SSL_KEYSTORE_TYPE = SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT + "ssl.keystore.type";
+ String SSL_KEYSTORE_LOCATION = SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT + "ssl.keystore.location";
+ String SSL_KEY_PASSWORD = SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT + "ssl.key.password";
+ String SSL_TRUSTSTORE_LOCATION = SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT + "ssl.truststore.location";
+ String SSL_TRUSTSTORE_PASSWORD = SHARED_KAFKA_CONFIG_PREFIX_WITH_DOT + "ssl.truststore.password";
+
+ List SSL_CONFIG_DEFAULT_SSL_PROTOCOLS = Collections.unmodifiableList(
+ Arrays.asList("TLSv1.2"));
+ List SSL_CONFIG_DEFAULT_CIPHER_SUITES = Collections.unmodifiableList(Arrays.asList(
+ // The following list is from https://github.com/netty/netty/blob/4.1/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2SecurityUtil.java#L50
+ "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
+
+ /* REQUIRED BY HTTP/2 SPEC */
+ "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
+ /* REQUIRED BY HTTP/2 SPEC */
+
+ "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
+ "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
+ "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256",
+ "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256"
+ ));
+
+ String keyStoreType = config.getString(SSL_KEYSTORE_TYPE);
+ File keyStoreFile = new File(config.getString(SSL_KEYSTORE_LOCATION));
+ String keyStorePassword = config.getString(SSL_KEY_PASSWORD);
+
+ KeyStore keyStore = KeyStore.getInstance(keyStoreType);
+ keyStore.load(toInputStream(keyStoreFile), keyStorePassword.toCharArray());
+
+ // Set key manager from key store
+ String sslKeyManagerAlgorithm = config.getString(SSL_KEYMANAGER_ALGORITHM);
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(sslKeyManagerAlgorithm);
+ keyManagerFactory.init(keyStore, keyStorePassword.toCharArray());
+
+ // Set trust manager from trust store
+ KeyStore trustStore = KeyStore.getInstance("JKS");
+ File trustStoreFile = new File(config.getString(SSL_TRUSTSTORE_LOCATION));
+
+ String trustStorePassword = config.getString(SSL_TRUSTSTORE_PASSWORD);
+ trustStore.load(toInputStream(trustStoreFile), trustStorePassword.toCharArray());
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
+ trustManagerFactory.init(trustStore);
+
+ SslContext sslContext = GrpcSslContexts.forClient()
+ .keyManager(keyManagerFactory)
+ .trustManager(trustManagerFactory)
+ .protocols(SSL_CONFIG_DEFAULT_SSL_PROTOCOLS)
+ .ciphers(SSL_CONFIG_DEFAULT_CIPHER_SUITES)
+ .build();
+
+ WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
+ .setTarget("1.nephos-temporal.corp-lca1.atd.corp.linkedin.com:7233")
+ .setEnableHttps(true)
+ .setSslContext(sslContext)
+ .build();
+ return WorkflowServiceStubs.newServiceStubs(options);
+ }
+
+ public static WorkflowClient createClientInstance(WorkflowServiceStubs service) {
+ WorkflowClientOptions options = WorkflowClientOptions.newBuilder().setNamespace("gobblin-fastingest-internpoc").build();
+ return WorkflowClient.newInstance(service, options);
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/WFAddr.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/WFAddr.java
new file mode 100644
index 00000000000..e7de7f347c0
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/WFAddr.java
@@ -0,0 +1,48 @@
+package org.apache.gobblin.cluster.temporal;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/** Hierarchical address for nesting workflows (0-based). */
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class WFAddr {
+ public static final String SEP = ".";
+
+ /** initial, top-level address */
+ public static final WFAddr ROOT = new WFAddr(0);
+
+ @Getter
+ @NonNull // IMPORTANT: for jackson (de)serialization (which won't permit `final`)
+ private List segments;
+
+ public WFAddr(final int firstLevelOnly) {
+ this(Lists.newArrayList(firstLevelOnly));
+ }
+
+ /** @return 0-based depth */
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public int getDepth() {
+ return segments.size() - 1;
+ }
+
+ /** Create a child of the current `WFAddr` */
+ public WFAddr createChild(int childLevel) {
+ final List copy = new ArrayList<>(segments);
+ copy.add(childLevel);
+ return new WFAddr(copy);
+ }
+
+ @Override
+ public String toString() {
+ return Joiner.on(SEP).join(segments);
+ }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Workload.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Workload.java
new file mode 100644
index 00000000000..2957eeb2a7f
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/temporal/Workload.java
@@ -0,0 +1,37 @@
+package org.apache.gobblin.cluster.temporal;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import java.util.Iterator;
+import java.util.Optional;
+
+
+/**
+ * An assemblage of "work", modeled as sequential "task" specifications. Given Temporal's required determinism, tasks
+ * and task spans should remain unchanged, with stable sequential ordering. This need not constrain `Workload`s to
+ * eager, advance elaboration: "streaming" definition is possible, so long as producing a deterministic result.
+ *
+ * A actual, real-world workload might correspond to datastore contents, such as records serialized into HDFS files
+ * or ordered DB query results.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls
+
+public interface Workload {
+
+ /**
+ * @return a sequential sub-sequence, from `startIndex` (0-based), unless it falls beyond the underlying sequence
+ * NOTE: this is a blocking call that forces elaboration: `TaskSpan.getNumElems() < numElements` signifies end of seq
+ */
+ Optional> getSpan(int startIndex, int numElements);
+
+ /** Non-blocking, best-effort advice: to support non-strict elaboration, does NOT guarantee `index` will not exceed */
+ boolean isIndexKnownToExceed(int index);
+
+ default boolean isDefiniteSize() {
+ return false;
+ }
+
+ /** Logical sub-sequence 'slice' of contiguous "tasks" */
+ public interface TaskSpan extends Iterator {
+ int getNumElems();
+ }
+}
\ No newline at end of file
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/security/ssl/SSLContextFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/security/ssl/SSLContextFactory.java
index e7677fe76fc..a940090cf99 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/security/ssl/SSLContextFactory.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/security/ssl/SSLContextFactory.java
@@ -119,7 +119,7 @@ public static SSLContext createInstance(Config srcConfig) {
new File(trustStoreFilePath), trustStorePassword);
}
- private static InputStream toInputStream(File storeFile)
+ public static InputStream toInputStream(File storeFile)
throws IOException {
byte[] data = FileUtils.readFileToByteArray(storeFile);
return new ByteArrayInputStream(data);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index 7a851e4a15e..bb8186f22f0 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -211,6 +211,7 @@ public void submitEvent(GobblinTrackingEvent nonReusableEvent) {
EventNotification notification = new EventNotification(nonReusableEvent);
sendNotification(notification);
+ LOG.info("EventBuilder {} is submitted.", nonReusableEvent);
}
/**
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 891f980b7f9..8fee12918fd 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -27,6 +27,8 @@
import org.apache.gobblin.metrics.MetricContext;
import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -38,7 +40,7 @@
*
*/
public class EventSubmitter {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventSubmitter.class);
public static final String EVENT_TYPE = "eventType";
private final Map metadata;
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 0221c01b4ef..8594245a67f 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -59,7 +59,7 @@ dependencies {
compile (externalDependency.helix) {
exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
}
-
+ compile externalDependency."temporal-sdk"
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 48ac8947972..b761138593c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -38,6 +38,7 @@
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.commons.mail.EmailException;
+import org.apache.gobblin.yarn.GobblinTemporalApplicationMaster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -801,23 +802,25 @@ private void addJobConfPackage(String jobConfPackagePath, Path destDir, Map").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
- appMasterClassName).append(".").append(ApplicationConstants.STDOUT)
+ gobblinAppMasterClassName).append(".").append(ApplicationConstants.STDOUT)
.append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
- appMasterClassName).append(".").append(ApplicationConstants.STDERR)
+ gobblinAppMasterClassName).append(".").append(ApplicationConstants.STDERR)
.toString();
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 7088dfa996c..490ed72b94f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -151,4 +151,5 @@ public class GobblinYarnConfigurationKeys {
//Config to control Heartbeat interval for Yarn AMRM client.
public static final String AMRM_HEARTBEAT_INTERVAL_SECS = GOBBLIN_YARN_PREFIX + "amRmHeartbeatIntervalSecs";
public static final Integer DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS = 15;
+ public static final String TEMPORAL_WORKERPOOL_SIZE = "temporal.workerpool.size";
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/GobblinTemporalApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/GobblinTemporalApplicationMaster.java
new file mode 100644
index 00000000000..716d53f5de2
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/GobblinTemporalApplicationMaster.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.gobblin.cluster.GobblinClusterManager;
+import org.apache.gobblin.cluster.temporal.GobblinTemporalClusterManager;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.yarn.YarnTemporalService;
+import org.apache.gobblin.yarn.YarnTemporalAppMasterSecurityManager;
+
+/**
+ * The Yarn ApplicationMaster class for Gobblin using Temporal.
+ *
+ *
+ * This class runs the {@link YarnTemporalService} for all Yarn-related stuffs like ApplicationMaster registration
+ * and un-registration and Yarn container provisioning.
+ *
+ *
+ * @author Yinan Li
+ */
+@Alpha
+public class GobblinTemporalApplicationMaster extends GobblinTemporalClusterManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTemporalApplicationMaster.class);
+
+ @Getter
+ private final YarnTemporalService _yarnTemporalService;
+ private LogCopier logCopier;
+
+ public GobblinTemporalApplicationMaster(String applicationName, String applicationId, ContainerId containerId, Config config,
+ YarnConfiguration yarnConfiguration) throws Exception {
+ super(applicationName, applicationId, config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+ ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString()))),
+ Optional.absent());
+
+ String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
+ GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
+ if (gobblinYarnLogSource.isLogSourcePresent()) {
+ Path appWorkDir = PathUtils.combinePaths(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.clusterName, this.applicationId), "AppMaster");
+ logCopier = gobblinYarnLogSource.buildLogCopier(this.config, containerId.toString(), this.fs, appWorkDir);
+ this.applicationLauncher
+ .addService(logCopier);
+ }
+ YarnHelixUtils.setYarnClassPath(config, yarnConfiguration);
+ YarnHelixUtils.setAdditionalYarnClassPath(config, yarnConfiguration);
+ this._yarnTemporalService = buildTemporalYarnService(this.config, applicationName, this.applicationId, yarnConfiguration, this.fs);
+ this.applicationLauncher.addService(this._yarnTemporalService);
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
+ this.applicationLauncher.addService(buildYarnContainerSecurityManager(this.config, this.fs));
+ }
+
+ // Add additional services
+ List serviceClassNames = ConfigUtils.getStringList(this.config,
+ GobblinYarnConfigurationKeys.APP_MASTER_SERVICE_CLASSES);
+
+ for (String serviceClassName : serviceClassNames) {
+ Class> serviceClass = Class.forName(serviceClassName);
+ this.applicationLauncher.addService((Service) GobblinConstructorUtils.invokeLongestConstructor(serviceClass, this));
+ }
+ }
+
+ /**
+ * Build the {@link YarnTemporalService} for the Application Master.
+ */
+ protected YarnTemporalService buildTemporalYarnService(Config config, String applicationName, String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs)
+ throws Exception {
+ return new YarnTemporalService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
+ }
+
+ /**
+ * Build the {@link YarnAppMasterSecurityManager} for the Application Master.
+ */
+ private YarnContainerSecurityManager buildYarnContainerSecurityManager(Config config, FileSystem fs) {
+ return new YarnTemporalAppMasterSecurityManager(config, fs, this.eventBus, this.logCopier, this._yarnTemporalService);
+ }
+
+ private static Options buildOptions() {
+ Options options = new Options();
+ options.addOption("a", GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME, true, "Yarn application name");
+ options.addOption("d", GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME, true, "Yarn application id");
+ return options;
+ }
+
+ private static void printUsage(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(GobblinTemporalApplicationMaster.class.getSimpleName(), options);
+ }
+
+ public static void main(String[] args) throws Exception {
+ Options options = buildOptions();
+ try {
+ CommandLine cmd = new DefaultParser().parse(options, args);
+ if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) ||
+ (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME))) {
+ printUsage(options);
+ System.exit(1);
+ }
+
+ //Because AM is restarted with the original AppSubmissionContext, it may have outdated delegation tokens.
+ //So the refreshed tokens should be added into the container's UGI before any HDFS/Hive/RM access is performed.
+ YarnHelixUtils.updateToken(GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
+
+ Log4jConfigurationHelper.updateLog4jConfiguration(GobblinTemporalApplicationMaster.class,
+ GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE,
+ GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE);
+
+ LOGGER.info(JvmUtils.getJvmInputArguments());
+
+ ContainerId containerId =
+ ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
+
+ try (GobblinTemporalApplicationMaster applicationMaster = new GobblinTemporalApplicationMaster(
+ cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
+ cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME), containerId,
+ ConfigFactory.load(), new YarnConfiguration())) {
+
+ applicationMaster.start();
+ }
+ } catch (ParseException pe) {
+ printUsage(options);
+ System.exit(1);
+ }
+ }
+}
+
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/GobblinTemporalYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/GobblinTemporalYarnTaskRunner.java
new file mode 100644
index 00000000000..26f57f4db2b
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/GobblinTemporalYarnTaskRunner.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.GobblinTaskRunner;
+import org.apache.gobblin.cluster.temporal.GobblinTemporalTaskRunner;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.logs.LogCopier;
+
+
+public class GobblinTemporalYarnTaskRunner extends GobblinTemporalTaskRunner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
+ public GobblinTemporalYarnTaskRunner(String applicationName, String applicationId, ContainerId containerId, Config config,
+ Optional appWorkDirOptional) throws Exception {
+ super(applicationName, applicationId, getTaskRunnerId(containerId), config
+ .withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+ ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString()))), appWorkDirOptional);
+ }
+
+ @Override
+ public List getServices() {
+ List services = new ArrayList<>();
+ services.addAll(super.getServices());
+ LogCopier logCopier = null;
+ if (clusterConfig.hasPath(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY)) {
+ GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
+ String containerLogDir = clusterConfig.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
+
+ if (gobblinYarnLogSource.isLogSourcePresent()) {
+ try {
+ logCopier = gobblinYarnLogSource.buildLogCopier(this.clusterConfig, this.taskRunnerId, this.fs,
+ new Path(containerLogDir, GobblinClusterUtils.getAppWorkDirPath(this.applicationName, this.applicationId)));
+ services.add(logCopier);
+ } catch (Exception e) {
+ LOGGER.warn("Cannot add LogCopier service to the service manager due to", e);
+ }
+ }
+ }
+ if (UserGroupInformation.isSecurityEnabled()) {
+ LOGGER.info("Adding YarnContainerSecurityManager since security is enabled");
+ services.add(new YarnContainerSecurityManager(this.clusterConfig, this.fs, this.eventBus, logCopier));
+ }
+ return services;
+ }
+
+
+ private static String getApplicationId(ContainerId containerId) {
+ return containerId.getApplicationAttemptId().getApplicationId().toString();
+ }
+
+ private static String getTaskRunnerId(ContainerId containerId) {
+ return containerId.toString();
+ }
+
+ public static void main(String[] args) {
+ LOGGER.info("Starting GobblinTemporalYarnTaskRunner");
+ Options options = buildOptions();
+ try {
+ CommandLine cmd = new DefaultParser().parse(options, args);
+ if (!cmd.hasOption(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) || !cmd
+ .hasOption(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)) {
+ printUsage(options);
+ System.exit(1);
+ }
+
+ Log4jConfigurationHelper.updateLog4jConfiguration(GobblinTaskRunner.class,
+ GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE,
+ GobblinYarnConfigurationKeys.GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE);
+
+ LOGGER.info(JvmUtils.getJvmInputArguments());
+
+ ContainerId containerId =
+ ConverterUtils.toContainerId(System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.key()));
+ String applicationName = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME);
+ String applicationId = cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME);
+ Config config = ConfigFactory.load();
+
+ GobblinTemporalTaskRunner gobblinTemporalTaskRunner =
+ new GobblinTemporalYarnTaskRunner(applicationName, applicationId, containerId, config,
+ Optional.absent());
+ gobblinTemporalTaskRunner.start();
+
+ } catch (ParseException pe) {
+ printUsage(options);
+ System.exit(1);
+ } catch (Throwable t) {
+ // Ideally, we should not be catching non-recoverable exceptions and errors. However,
+ // simply propagating the exception may prevent the container exit due to the presence of non-daemon threads present
+ // in the application. Hence, we catch this exception to invoke System.exit() which in turn ensures that all non-daemon threads are killed.
+ LOGGER.error("Exception encountered: {}", t);
+ System.exit(1);
+ }
+ }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAppMasterSecurityManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAppMasterSecurityManager.java
new file mode 100644
index 00000000000..eaf2505e7d0
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAppMasterSecurityManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Throwables;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.logs.LogCopier;
+import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
+
+
+public class YarnTemporalAppMasterSecurityManager extends YarnContainerSecurityManager{
+
+ private YarnTemporalService _yarnTemporalService;
+ public YarnTemporalAppMasterSecurityManager(Config config, FileSystem fs, EventBus eventBus, LogCopier logCopier, YarnTemporalService yarnTemporalService) {
+ super(config, fs, eventBus, logCopier);
+ this._yarnTemporalService = yarnTemporalService;
+ }
+
+ @Override
+ public void handleTokenFileUpdatedEvent(DelegationTokenUpdatedEvent delegationTokenUpdatedEvent) {
+ super.handleTokenFileUpdatedEvent(delegationTokenUpdatedEvent);
+ try {
+ _yarnTemporalService.updateToken();
+ } catch (IOException ioe) {
+ throw Throwables.propagate(ioe);
+ }
+ }
+}
+
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAutoScalingManager.java
new file mode 100644
index 00000000000..a37d0b44256
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalAutoScalingManager.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.compress.utils.Sets;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * The autoscaling manager is responsible for figuring out how many containers are required for the workload and
+ * requesting the {@link YarnTemporalService} to request that many containers.
+ */
+@Slf4j
+public class YarnTemporalAutoScalingManager extends AbstractIdleService {
+ private final String AUTO_SCALING_PREFIX = GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
+ private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
+ AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+ private static final int THRESHOLD_NUMBER_OF_ATTEMPTS_FOR_LOGGING = 20;
+ private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
+ // Only one container will be requested for each N partitions of work
+ private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer";
+ private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
+ private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor";
+ private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0;
+ // The cluster level default tags for Helix instances
+ private final String defaultHelixInstanceTags;
+ private final int defaultContainerMemoryMbs;
+ private final int defaultContainerCores;
+
+ private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
+ private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
+
+ private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize";
+
+ public final static int DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+
+ private final Config config;
+ // private final HelixManager helixManager;
+ private final ScheduledExecutorService autoScalingExecutor;
+ private final YarnTemporalService _yarnTemporalService;
+ private final int partitionsPerContainer;
+ private final double overProvisionFactor;
+ private final SlidingWindowReservoir slidingFixedSizeWindow;
+ private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
+ private static final HashSet
+ UNUSUAL_HELIX_TASK_STATES = Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.DROPPED, TaskPartitionState.COMPLETED, TaskPartitionState.TIMED_OUT);
+
+ public YarnTemporalAutoScalingManager(GobblinTemporalApplicationMaster appMaster) {
+ this.config = appMaster.getConfig();
+ this._yarnTemporalService = appMaster.get_yarnTemporalService();
+ this.partitionsPerContainer = ConfigUtils.getInt(this.config, AUTO_SCALING_PARTITIONS_PER_CONTAINER,
+ DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER);
+
+ Preconditions.checkArgument(this.partitionsPerContainer > 0,
+ AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
+
+ this.overProvisionFactor = ConfigUtils.getDouble(this.config, AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
+ DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
+
+ this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
+ ? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE), Integer.MAX_VALUE)
+ : new SlidingWindowReservoir(Integer.MAX_VALUE);
+
+ this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("AutoScalingExecutor")));
+
+ this.defaultHelixInstanceTags = ConfigUtils.getString(config,
+ GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+ this.defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ this.defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config, AUTO_SCALING_POLLING_INTERVAL_SECS,
+ DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
+ int initialDelay = ConfigUtils.getInt(this.config, AUTO_SCALING_INITIAL_DELAY,
+ DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS);
+ log.info("Starting the " + YarnTemporalAutoScalingManager.class.getSimpleName());
+ log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);
+
+// this.autoScalingExecutor.scheduleAtFixedRate(new TemporalYarnAutoScalingRunnable(new TaskDriver(this.helixManager),
+// this.temporalYarnService, this.partitionsPerContainer, this.overProvisionFactor,
+// this.slidingFixedSizeWindow, this.defaultHelixInstanceTags,
+// this.defaultContainerMemoryMbs, this.defaultContainerCores),
+// initialDelay, scheduleInterval, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void shutDown() {
+ log.info("Stopping the " + YarnTemporalAutoScalingManager.class.getSimpleName());
+
+ ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor, Optional.of(log));
+ }
+
+ /**
+ * A {@link Runnable} that figures out the number of containers required for the workload
+ * and requests those containers.
+ */
+ @VisibleForTesting
+ @AllArgsConstructor
+ static class TemporalYarnAutoScalingRunnable implements Runnable {
+ private final TaskDriver taskDriver;
+ private final YarnTemporalService _yarnTemporalService;
+ private final int partitionsPerContainer;
+ private final double overProvisionFactor;
+ private final SlidingWindowReservoir slidingWindowReservoir;
+ private final String defaultHelixInstanceTags;
+ private final int defaultContainerMemoryMbs;
+ private final int defaultContainerCores;
+
+ /**
+ * A static map that keep track of an idle instance and its latest beginning idle time.
+ * If an instance is no longer idle when inspected, it will be dropped from this map.
+ */
+ private static final Map instanceIdleSince = new HashMap<>();
+
+
+ @Override
+ public void run() {
+ // Suppress errors to avoid interrupting any scheduled executions of this Runnable
+ try {
+ runInternal();
+ } catch (Throwable t) {
+ log.warn("Suppressing error from YarnAutoScalingRunnable.run()", t);
+ }
+ }
+
+ /**
+ * Iterate through the workflows configured in Helix to figure out the number of required partitions
+ * and request the {@link YarnTemporalService} to scale to the desired number of containers.
+ */
+ @VisibleForTesting
+ void runInternal() {
+ Set inUseInstances = new HashSet<>();
+ YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();
+ for (Map.Entry workFlowEntry : taskDriver.getWorkflows().entrySet()) {
+ WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey());
+
+ // Only allocate for active workflows
+ if (workflowContext == null || !workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
+ continue;
+ }
+
+ log.debug("Workflow name {} config {} context {}", workFlowEntry.getKey(), workFlowEntry.getValue(),
+ workflowContext);
+
+ WorkflowConfig workflowConfig = workFlowEntry.getValue();
+ JobDag jobDag = workflowConfig.getJobDag();
+ Set jobs = jobDag.getAllNodes();
+
+ // sum up the number of partitions
+ for (String jobName : jobs) {
+ JobContext jobContext = taskDriver.getJobContext(jobName);
+ JobConfig jobConfig = taskDriver.getJobConfig(jobName);
+ Resource resource = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
+ int numPartitions = 0;
+ String jobTag = defaultHelixInstanceTags;
+ if (jobContext != null) {
+ log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());
+
+ numPartitions = jobContext.getPartitionSet().size();
+ // Job level config for helix instance tags takes precedence over other tag configurations
+ if (jobConfig != null) {
+ if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
+ jobTag = jobConfig.getInstanceGroupTag();
+ }
+ Map jobCommandConfigMap = jobConfig.getJobCommandConfigMap();
+ if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
+ resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
+ }
+ if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
+ resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
+ }
+ }
+ }
+ // compute the container count as a ceiling of number of partitions divided by the number of containers
+ // per partition. Scale the result by a constant overprovision factor.
+ int containerCount = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
+ yarnContainerRequestBundle.add(jobTag, containerCount, resource);
+ log.info("jobName={}, jobTag={}, numPartitions={}, targetNumContainers={}",
+ jobName, jobTag, numPartitions, containerCount);
+ }
+ }
+ slidingWindowReservoir.add(yarnContainerRequestBundle);
+
+ log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
+ yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
+ yarnContainerRequestBundle.getHelixTagResourceMap());
+
+ this._yarnTemporalService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
+ }
+
+ @VisibleForTesting
+ /**
+ * Return true is the condition for tagging an instance as "unused" holds.
+ * The condition, by default is that if an instance went back to
+ * active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+ * not tag that instance as "unused" and have that as the candidate for scaling down.
+ */
+ boolean isInstanceUnused(String participant){
+ return System.currentTimeMillis() - instanceIdleSince.get(participant) >
+ TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
+ }
+ }
+
+ /**
+ * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+ * This data structure prevents temporary fluctuation in the number of active helix partitions as the size of queue
+ * grows and will be less sensitive when scaling down is actually required.
+ *
+ * The interface for this is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+ * which captures max value. It is NOT built for general purpose.
+ */
+ static class SlidingWindowReservoir {
+ private ArrayDeque fifoQueue;
+ private PriorityQueue priorityQueue;
+
+ // Queue Size
+ private int maxSize;
+ private static final int DEFAULT_MAX_SIZE = 10;
+
+ // Upper-bound of value within the queue.
+ private int upperBound;
+
+ public SlidingWindowReservoir(int maxSize, int upperBound) {
+ Preconditions.checkArgument(maxSize > 0, "maxSize has to be a value larger than 0");
+
+ this.maxSize = maxSize;
+ this.upperBound = upperBound;
+ this.fifoQueue = new ArrayDeque<>(maxSize);
+ this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator() {
+ @Override
+ public int compare(YarnContainerRequestBundle o1, YarnContainerRequestBundle o2) {
+ Integer i2 = o2.getTotalContainers();
+ return i2.compareTo(o1.getTotalContainers());
+ }
+ });
+ }
+
+ public SlidingWindowReservoir(int upperBound) {
+ this(DEFAULT_MAX_SIZE, upperBound);
+ }
+
+ /**
+ * Add element into data structure.
+ * When a new element is larger than upperbound, reject the value since we may request too many Yarn containers.
+ * When queue is full, evict head of FIFO-queue (In FIFO queue, elements are inserted from tail).
+ */
+ public void add(YarnContainerRequestBundle e) {
+ if (e.getTotalContainers() > upperBound) {
+ log.error(String.format("Request of getting %s containers seems to be excessive, rejected", e));
+ return;
+ }
+
+ if (fifoQueue.size() == maxSize) {
+ YarnContainerRequestBundle removedElement = fifoQueue.remove();
+ priorityQueue.remove(removedElement);
+ }
+
+ if (fifoQueue.size() == priorityQueue.size()) {
+ fifoQueue.add(e);
+ priorityQueue.add(e);
+ } else {
+ throw new IllegalStateException("Queue has its internal data structure being inconsistent.");
+ }
+ }
+
+ /**
+ * If queue is empty, throw {@link IllegalStateException}.
+ */
+ public YarnContainerRequestBundle getMax() {
+ if (priorityQueue.size() > 0) {
+ return this.priorityQueue.peek();
+ } else {
+ throw new IllegalStateException("Queried before elements added into the queue.");
+ }
+ }
+ }
+}
+
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalService.java
new file mode 100644
index 00000000000..4644a5561dd
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/temporal/YarnTemporalService.java
@@ -0,0 +1,982 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.GobblinClusterMetricTagNames;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricReporterException;
+import org.apache.gobblin.metrics.MultiReporterException;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.JvmUtils;
+import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
+
+/**
+ * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
+ * ApplicationMaster un-registration, Yarn container management, etc.
+ *
+ * @author Yinan Li
+ */
+public class YarnTemporalService extends AbstractIdleService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(YarnTemporalService.class);
+
+ private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN";
+
+ private final String applicationName;
+ private final String applicationId;
+ private final String appViewAcl;
+ //Default helix instance tag derived from cluster level config
+ private final String helixInstanceTags;
+
+ private final Config config;
+
+ private final EventBus eventBus;
+
+ private final Configuration yarnConfiguration;
+ private final FileSystem fs;
+
+ private final Optional gobblinMetrics;
+ private final Optional eventSubmitter;
+
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
+ private final AMRMClientAsync amrmClientAsync;
+ private final NMClientAsync nmClientAsync;
+ private final ExecutorService containerLaunchExecutor;
+
+ private final int initialContainers;
+ private final int requestedContainerMemoryMbs;
+ private final int requestedContainerCores;
+ private final int jvmMemoryOverheadMbs;
+ private final double jvmMemoryXmxRatio;
+ private final boolean containerHostAffinityEnabled;
+
+ private final int helixInstanceMaxRetries;
+
+ private final Optional containerJvmArgs;
+ private final String containerTimezone;
+
+ @Getter(AccessLevel.PROTECTED)
+ private volatile Optional maxResourceCapacity = Optional.absent();
+
+ // Security tokens for accessing HDFS
+ private ByteBuffer tokens;
+
+ private final Closer closer = Closer.create();
+
+ private final Object allContainersStopped = new Object();
+
+ // A map from container IDs to Container instances, Helix participant IDs of the containers and Helix Tag
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
+ private final ConcurrentMap containerMap = Maps.newConcurrentMap();
+
+ // A cache of the containers with an outstanding container release request.
+ // This is a cache instead of a set to get the automatic cleanup in case a container completes before the requested
+ // release.
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
+ private final Cache releasedContainerCache;
+
+ // A map from Helix instance names to the number times the instances are retried to be started
+ private final ConcurrentMap helixInstanceRetryCount = Maps.newConcurrentMap();
+
+ // A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put
+ // into the set if the container running the instance completes. Unused Helix
+ // instance names get picked up when replacement containers get allocated.
+ private final Set unusedHelixInstanceNames = ConcurrentHashMap.newKeySet();
+
+ // The map from helix tag to allocated container count
+ private final ConcurrentMap allocatedContainerCountMap = Maps.newConcurrentMap();
+ private final ConcurrentMap removedContainerID = Maps.newConcurrentMap();
+
+ private volatile YarnContainerRequestBundle yarnContainerRequest;
+ private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
+ private final Map resourcePriorityMap = new HashMap<>();
+
+ private volatile boolean shutdownInProgress = false;
+
+ public YarnTemporalService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
+ FileSystem fs, EventBus eventBus) throws Exception {
+ this.applicationName = applicationName;
+ this.applicationId = applicationId;
+
+ this.config = config;
+
+ this.eventBus = eventBus;
+
+ this.gobblinMetrics = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
+ Optional.of(buildGobblinMetrics()) : Optional.absent();
+
+ this.eventSubmitter = config.getBoolean(ConfigurationKeys.METRICS_ENABLED_KEY) ?
+ Optional.of(buildEventSubmitter()) : Optional.absent();
+
+ this.yarnConfiguration = yarnConfiguration;
+ this.fs = fs;
+
+ int amRmHeartbeatIntervalMillis = Long.valueOf(TimeUnit.SECONDS.toMillis(
+ ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.AMRM_HEARTBEAT_INTERVAL_SECS,
+ GobblinYarnConfigurationKeys.DEFAULT_AMRM_HEARTBEAT_INTERVAL_SECS))).intValue();
+ this.amrmClientAsync = closer.register(
+ AMRMClientAsync.createAMRMClientAsync(amRmHeartbeatIntervalMillis, new AMRMClientCallbackHandler()));
+ this.amrmClientAsync.init(this.yarnConfiguration);
+ this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
+ this.nmClientAsync.init(this.yarnConfiguration);
+
+ this.initialContainers = config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
+ this.requestedContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ this.requestedContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+ this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
+
+ this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
+ this.helixInstanceTags = ConfigUtils.getString(config,
+ GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+
+ this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
+ Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
+ Optional.absent();
+
+ int numContainerLaunchThreads =
+ ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.MAX_CONTAINER_LAUNCH_THREADS_KEY,
+ GobblinYarnConfigurationKeys.DEFAULT_MAX_CONTAINER_LAUNCH_THREADS);
+ this.containerLaunchExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(5, numContainerLaunchThreads, 0L,
+ ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("ContainerLaunchExecutor")));
+
+ this.tokens = getSecurityTokens();
+
+ this.releasedContainerCache = CacheBuilder.newBuilder().expireAfterAccess(ConfigUtils.getInt(config,
+ GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
+ GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build();
+
+ this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
+ GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
+
+ Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1,
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive");
+
+ this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
+ GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
+
+ Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than "
+ + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
+ + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
+
+ this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL,
+ GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
+ this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
+ GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
+ }
+
+ @SuppressWarnings("unused")
+ @Subscribe
+ public void handleNewContainerRequest(NewContainerRequest newContainerRequest) {
+ if (!this.maxResourceCapacity.isPresent()) {
+ LOGGER.error(String.format(
+ "Unable to handle new container request as maximum resource capacity is not available: "
+ + "[memory (MBs) requested = %d, vcores requested = %d]", this.requestedContainerMemoryMbs,
+ this.requestedContainerCores));
+ return;
+ }
+ requestContainer(newContainerRequest.getReplacedContainer().transform(container -> container.getNodeId().getHost()),
+ newContainerRequest.getResource());
+ }
+
+ protected NMClientCallbackHandler getNMClientCallbackHandler() {
+ return new NMClientCallbackHandler();
+ }
+
+ @SuppressWarnings("unused")
+ @Subscribe
+ public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) {
+ for (Container container : containerShutdownRequest.getContainers()) {
+ LOGGER.info(String.format("Stopping container %s running on %s", container.getId(), container.getNodeId()));
+ this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId());
+ }
+ }
+
+ /**
+ * Request the Resource Manager to release the container
+ * @param containerReleaseRequest containers to release
+ */
+ @Subscribe
+ public void handleContainerReleaseRequest(ContainerReleaseRequest containerReleaseRequest) {
+ for (Container container : containerReleaseRequest.getContainers()) {
+ LOGGER.info(String.format("Releasing container %s running on %s", container.getId(), container.getNodeId()));
+
+ // Record that this container was explicitly released so that a new one is not spawned to replace it
+ // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion()
+ // can check for the container id and skip spawning a replacement container.
+ // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently
+ // with the release call. So in some cases a replacement container may have already been spawned before
+ // the container is put into the black list.
+ this.releasedContainerCache.put(container.getId(), "");
+ this.amrmClientAsync.releaseAssignedContainer(container.getId());
+ }
+ }
+
+ @Override
+ protected synchronized void startUp() throws Exception {
+ LOGGER.info("Starting the TemporalYarnService");
+
+ // Register itself with the EventBus for container-related requests
+ this.eventBus.register(this);
+
+ this.amrmClientAsync.start();
+ this.nmClientAsync.start();
+
+ // The ApplicationMaster registration response is used to determine the maximum resource capacity of the cluster
+ RegisterApplicationMasterResponse response = this.amrmClientAsync.registerApplicationMaster(
+ GobblinClusterUtils.getHostname(), -1, "");
+ LOGGER.info("ApplicationMaster registration response: " + response);
+ this.maxResourceCapacity = Optional.of(response.getMaximumResourceCapability());
+
+ LOGGER.info("Requesting initial containers");
+ requestInitialContainers(this.initialContainers);
+ }
+
+ @Override
+ protected void shutDown() throws IOException {
+ LOGGER.info("Stopping the TemporalYarnService");
+
+ this.shutdownInProgress = true;
+
+ try {
+ ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor, Optional.of(LOGGER));
+
+ // Stop the running containers
+ for (ContainerInfo containerInfo : this.containerMap.values()) {
+ LOGGER.info("Stopping container {} running participant {}", containerInfo.getContainer().getId(),
+ containerInfo.getHelixParticipantId());
+ this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), containerInfo.getContainer().getNodeId());
+ }
+
+ if (!this.containerMap.isEmpty()) {
+ synchronized (this.allContainersStopped) {
+ try {
+ // Wait 5 minutes for the containers to stop
+ Duration waitTimeout = Duration.ofMinutes(5);
+ this.allContainersStopped.wait(waitTimeout.toMillis());
+ LOGGER.info("All of the containers have been stopped");
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
+ } catch (IOException | YarnException e) {
+ LOGGER.error("Failed to unregister the ApplicationMaster", e);
+ } finally {
+ try {
+ this.closer.close();
+ } finally {
+ if (this.gobblinMetrics.isPresent()) {
+ this.gobblinMetrics.get().stopMetricsReporting();
+ }
+ }
+ }
+ }
+
+ public void updateToken() throws IOException{
+ this.tokens = getSecurityTokens();
+ }
+
+ private GobblinMetrics buildGobblinMetrics() {
+ // Create tags list
+ ImmutableList.Builder> tags = new ImmutableList.Builder<>();
+ tags.add(new Tag<>(GobblinClusterMetricTagNames.APPLICATION_ID, this.applicationId));
+ tags.add(new Tag<>(GobblinClusterMetricTagNames.APPLICATION_NAME, this.applicationName));
+
+ // Intialize Gobblin metrics and start reporters
+ GobblinMetrics gobblinMetrics = GobblinMetrics.get(this.applicationId, null, tags.build());
+ try {
+ gobblinMetrics.startMetricReporting(ConfigUtils.configToProperties(config));
+ } catch (MultiReporterException ex) {
+ for (MetricReporterException e: ex.getExceptions()) {
+ LOGGER.error("Failed to start {} {} reporter.", e.getSinkType().name(), e.getReporterType().name(), e);
+ }
+ }
+
+ return gobblinMetrics;
+ }
+
+ private EventSubmitter buildEventSubmitter() {
+ return new EventSubmitter.Builder(this.gobblinMetrics.get().getMetricContext(),
+ GobblinYarnEventConstants.EVENT_NAMESPACE)
+ .build();
+ }
+
+ /**
+ * Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number
+ * of containers then additional containers are requested.
+ *
+ * If numTargetContainers is less than the current number of allocated containers then release free containers.
+ * Shrinking is relative to the number of currently allocated containers since it takes time for containers
+ * to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned
+ * work. This means that a container may not be released even though numTargetContainers is less than the requested
+ * number of containers. The intended usage is for the caller of this method to make periodic calls to attempt to
+ * adjust the cluster towards the desired number of containers.
+ *
+ * @param yarnContainerRequestBundle the desired containers information, including numbers, resource and helix tag
+ * @param inUseInstances a set of in use instances
+ * @return whether successfully requested the target number of containers
+ */
+ public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set inUseInstances) {
+ int defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. CONTAINER_CORES_KEY);
+ // making workerPoolSize configurable, the default value would be 10
+ int workerPoolSize = ConfigUtils.getInt(config, GobblinYarnConfigurationKeys.TEMPORAL_WORKERPOOL_SIZE,10);
+
+ LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
+ workerPoolSize, inUseInstances.size(), this.containerMap.size());
+
+ requestContainers(workerPoolSize, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
+
+ this.yarnContainerRequest = yarnContainerRequestBundle;
+ LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}",
+ yarnContainerRequestBundle.getHelixTagContainerCountMap(), this.allocatedContainerCountMap);
+ return true;
+ }
+
+ // Request initial containers with default resource and helix tag
+ private void requestInitialContainers(int containersRequested) {
+ YarnContainerRequestBundle initialYarnContainerRequest = new YarnContainerRequestBundle();
+ Resource capability = Resource.newInstance(this.requestedContainerMemoryMbs, this.requestedContainerCores);
+ initialYarnContainerRequest.add(this.helixInstanceTags, containersRequested, capability);
+ requestTargetNumberOfContainers(initialYarnContainerRequest, Collections.EMPTY_SET);
+ }
+
+ private void requestContainer(Optional preferredNode, Optional resourceOptional) {
+ Resource desiredResource = resourceOptional.or(Resource.newInstance(
+ this.requestedContainerMemoryMbs, this.requestedContainerCores));
+ requestContainer(preferredNode, desiredResource);
+ }
+
+ /**
+ * Request {@param numContainers} from yarn with the specified resource. Resources will be allocated without a preferred
+ * node
+ * @param numContainers
+ * @param resource
+ */
+ private void requestContainers(int numContainers, Resource resource) {
+ LOGGER.info("Requesting {} containers with resource={}", numContainers, resource);
+ IntStream.range(0, numContainers)
+ .forEach(i -> requestContainer(Optional.absent(), resource));
+ }
+
+ // Request containers with specific resource requirement
+ private void requestContainer(Optional preferredNode, Resource resource) {
+ // Fail if Yarn cannot meet container resource requirements
+ Preconditions.checkArgument(resource.getMemory() <= this.maxResourceCapacity.get().getMemory() &&
+ resource.getVirtualCores() <= this.maxResourceCapacity.get().getVirtualCores(),
+ "Resource requirement must less than the max resource capacity. Requested resource" + resource.toString()
+ + " exceed the max resource limit " + this.maxResourceCapacity.get().toString());
+
+ // Due to YARN-314, different resource capacity needs different priority, otherwise Yarn will not allocate container
+ Priority priority = Records.newRecord(Priority.class);
+ if(!resourcePriorityMap.containsKey(resource.toString())) {
+ resourcePriorityMap.put(resource.toString(), priorityNumGenerator.getAndIncrement());
+ }
+ int priorityNum = resourcePriorityMap.get(resource.toString());
+ priority.setPriority(priorityNum);
+
+ String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null;
+ this.amrmClientAsync.addContainerRequest(
+ new AMRMClient.ContainerRequest(resource, preferredNodes, null, priority));
+ }
+
+ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
+ throws IOException {
+ Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
+ Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+
+ Map resourceMap = Maps.newHashMap();
+
+ addContainerLocalResources(new Path(appWorkDir, GobblinYarnConfigurationKeys.LIB_JARS_DIR_NAME), resourceMap);
+ addContainerLocalResources(new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_JARS_DIR_NAME), resourceMap);
+ addContainerLocalResources(
+ new Path(containerWorkDir, GobblinYarnConfigurationKeys.APP_FILES_DIR_NAME), resourceMap);
+
+ if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY)) {
+ YarnHelixUtils.addRemoteFilesToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_FILES_REMOTE_KEY),
+ resourceMap, yarnConfiguration);
+ }
+ if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_ZIPS_REMOTE_KEY)) {
+ YarnHelixUtils.addRemoteZipsToLocalResources(this.config.getString(GobblinYarnConfigurationKeys.CONTAINER_ZIPS_REMOTE_KEY),
+ resourceMap, yarnConfiguration);
+ }
+ ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
+ containerLaunchContext.setLocalResources(resourceMap);
+ containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
+ containerLaunchContext.setCommands(Arrays.asList(containerInfo.getStartupCommand()));
+
+ Map acls = new HashMap<>(1);
+ acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
+ containerLaunchContext.setApplicationACLs(acls);
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ containerLaunchContext.setTokens(this.tokens.duplicate());
+ }
+
+ return containerLaunchContext;
+ }
+
+ private void addContainerLocalResources(Path destDir, Map resourceMap) throws IOException {
+ if (!this.fs.exists(destDir)) {
+ LOGGER.warn(String.format("Path %s does not exist so no container LocalResource to add", destDir));
+ return;
+ }
+
+ FileStatus[] statuses = this.fs.listStatus(destDir);
+ if (statuses != null) {
+ for (FileStatus status : statuses) {
+ YarnHelixUtils.addFileAsLocalResource(this.fs, status.getPath(), LocalResourceType.FILE, resourceMap);
+ }
+ }
+ }
+
+
+ protected ByteBuffer getSecurityTokens() throws IOException {
+ Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+ Closer closer = Closer.create();
+ try {
+ DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
+ credentials.writeTokenStorageToStream(dataOutputBuffer);
+
+ // Remove the AM->RM token so that containers cannot access it
+ Iterator> tokenIterator = credentials.getAllTokens().iterator();
+ while (tokenIterator.hasNext()) {
+ Token> token = tokenIterator.next();
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ tokenIterator.remove();
+ }
+ }
+
+ return ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
+ } catch (Throwable t) {
+ throw closer.rethrow(t);
+ } finally {
+ closer.close();
+ }
+ }
+
+ @VisibleForTesting
+ protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) {
+ String containerProcessName = GobblinTemporalYarnTaskRunner.class.getSimpleName();
+ StringBuilder containerCommand = new StringBuilder()
+ .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
+ .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
+ this.jvmMemoryOverheadMbs).append("M")
+ .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
+ .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+ .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
+ .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
+ .append(" ").append(GobblinTemporalYarnTaskRunner.class.getName())
+ .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
+ .append(" ").append(this.applicationName)
+ .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
+ .append(" ").append(this.applicationId)
+ .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
+ .append(" ").append(helixParticipantId);
+
+ if (!Strings.isNullOrEmpty(helixInstanceTag)) {
+ containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+ .append(" ").append(helixInstanceTag);
+ }
+
+ LOGGER.info("Building " + containerProcessName);
+ return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
+ containerProcessName).append(".").append(ApplicationConstants.STDOUT)
+ .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
+ containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
+ }
+
+ /**
+ * Check the exit status of a completed container and see if the replacement container
+ * should try to be started on the same node. Some exit status indicates a disk or
+ * node failure and in such cases the replacement container should try to be started on
+ * a different node.
+ */
+ private boolean shouldStickToTheSameNode(int containerExitStatus) {
+ switch (containerExitStatus) {
+ case ContainerExitStatus.DISKS_FAILED:
+ return false;
+ case ContainerExitStatus.ABORTED:
+ // Mostly likely this exit status is due to node failures because the
+ // application itself will not release containers.
+ return false;
+ default:
+ // Stick to the same node for other cases if host affinity is enabled.
+ return this.containerHostAffinityEnabled;
+ }
+ }
+
+ /**
+ * Handle the completion of a container. A new container will be requested to replace the one
+ * that just exited. Depending on the exit status and if container host affinity is enabled,
+ * the new container may or may not try to be started on the same node.
+ *
+ * A container completes in either of the following conditions: 1) some error happens in the
+ * container and caused the container to exit, 2) the container gets killed due to some reason,
+ * for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets
+ * preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster.
+ * A replacement container is needed in all but the last case.
+ */
+ protected void handleContainerCompletion(ContainerStatus containerStatus) {
+ ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId());
+ //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
+ //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
+ //containerId missing from the containersMap.
+ // We use removedContainerID to remember these containers and remove them from containerMap later when we call requestTargetNumberOfContainers method
+ if (completedContainerInfo == null) {
+ removedContainerID.putIfAbsent(containerStatus.getContainerId(), "");
+ }
+ String completedInstanceName = UNKNOWN_HELIX_INSTANCE;
+
+ String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag();
+ if (completedContainerInfo != null) {
+ allocatedContainerCountMap.get(helixTag).decrementAndGet();
+ }
+
+ LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d",
+ containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus()));
+
+ if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
+ LOGGER.info(String.format("Received the following diagnostics information for container %s: %s",
+ containerStatus.getContainerId(), containerStatus.getDiagnostics()));
+ }
+
+ switch(containerStatus.getExitStatus()) {
+ case(ContainerExitStatus.ABORTED):
+ if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) {
+ return;
+ }
+ break;
+ case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed
+ LOGGER.info("Exit status 1. CompletedContainerInfo={}", completedContainerInfo);
+ break;
+ default:
+ break;
+ }
+
+ if (this.shutdownInProgress) {
+ return;
+ }
+ if(completedContainerInfo != null) {
+ this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
+ int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
+
+ // Populate event metadata
+ Optional> eventMetadataBuilder = Optional.absent();
+ if (this.eventSubmitter.isPresent()) {
+ eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus));
+ eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName);
+ eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + "");
+ }
+
+ if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) {
+ if (this.eventSubmitter.isPresent()) {
+ this.eventSubmitter.get()
+ .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
+ }
+
+ LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName);
+ return;
+ }
+
+ // Add the Helix instance name of the completed container to the set of unused
+ // instance names so they can be reused by a replacement container.
+ LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+ this.unusedHelixInstanceNames.add(completedInstanceName);
+
+ if (this.eventSubmitter.isPresent()) {
+ this.eventSubmitter.get()
+ .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
+ }
+ }
+ Optional newContainerResource = completedContainerInfo != null ?
+ Optional.of(completedContainerInfo.getContainer().getResource()) : Optional.absent();
+ LOGGER.info("Requesting a new container to replace {} to run Helix instance {} with helix tag {} and resource {}",
+ containerStatus.getContainerId(), completedInstanceName, helixTag, newContainerResource.orNull());
+ this.eventBus.post(new NewContainerRequest(
+ shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerInfo != null ?
+ Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
+ }
+
+ private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo,
+ String completedInstanceName) {
+ if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
+ LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
+ if (completedContainerInfo != null) {
+ LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
+ this.unusedHelixInstanceNames.add(completedInstanceName);
+ }
+ return true;
+ }
+ LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId());
+ return false;
+ }
+
+ private ImmutableMap.Builder buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
+ ImmutableMap.Builder eventMetadataBuilder = new ImmutableMap.Builder<>();
+ eventMetadataBuilder.put(GobblinYarnMetricTagNames.CONTAINER_ID, containerStatus.getContainerId().toString());
+ eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_CONTAINER_STATE,
+ containerStatus.getState().toString());
+ if (ContainerExitStatus.INVALID != containerStatus.getExitStatus()) {
+ eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_EXIT_STATUS,
+ containerStatus.getExitStatus() + "");
+ }
+ if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
+ eventMetadataBuilder.put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_EXIT_DIAGNOSTICS,
+ containerStatus.getDiagnostics());
+ }
+
+ return eventMetadataBuilder;
+ }
+
+ /**
+ * Get the number of matching container requests for the specified resource memory and cores.
+ * Due to YARN-1902 and YARN-660, this API is not 100% accurate. {@link AMRMClientCallbackHandler#onContainersAllocated(List)}
+ * contains logic for best effort clean up of requests, and the resource tend to match the allocated container. So in practice the count is pretty accurate.
+ *
+ * This API call gets the count of container requests for containers that are > resource if there is no request with the exact same resource
+ * The RM can return containers that are larger (because of normalization etc).
+ * Container may be larger by memory or cpu (e.g. container (1000M, 3cpu) can fit request (1000M, 1cpu) or request (500M, 3cpu).
+ *
+ */
+ private int getMatchingRequestsCount(Resource resource) {
+ Integer priorityNum = resourcePriorityMap.get(resource.toString());
+ if (priorityNum == null) { // request has never been made with this resource
+ return 0;
+ }
+ Priority priority = Priority.newInstance(priorityNum);
+
+ // Each collection in the list represents a set of requests with each with the same resource requirement.
+ // The reason for differing resources can be due to normalization
+ List extends Collection> outstandingRequests = getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource);
+ return outstandingRequests == null ? 0 : outstandingRequests.stream()
+ .filter(Objects::nonNull)
+ .mapToInt(Collection::size)
+ .sum();
+ }
+
+ /**
+ * A custom implementation of {@link AMRMClientAsync.CallbackHandler}.
+ */
+ private class AMRMClientCallbackHandler implements AMRMClientAsync.CallbackHandler {
+
+ private volatile boolean done = false;
+
+ @Override
+ public void onContainersCompleted(List statuses) {
+ for (ContainerStatus containerStatus : statuses) {
+ handleContainerCompletion(containerStatus);
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List containers) {
+ for (final Container container : containers) {
+ String containerId = container.getId().toString();
+ String containerHelixTag = YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap, yarnContainerRequest);
+ if (Strings.isNullOrEmpty(containerHelixTag)) {
+ containerHelixTag = helixInstanceTags;
+ }
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
+ }
+
+ LOGGER.info("Container {} has been allocated with resource {} for helix tag {}",
+ container.getId(), container.getResource(), containerHelixTag);
+
+ //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live.
+ //Once we find a candidate instance, it is removed from the set.
+ String instanceName = null;
+
+ //Ensure that updates to unusedHelixInstanceNames are visible to other threads that might concurrently
+ //invoke the callback on container allocation.
+ synchronized (this) {
+ Iterator iterator = unusedHelixInstanceNames.iterator();
+ while (iterator.hasNext()) {
+ instanceName = iterator.next();
+ }
+ }
+
+ ContainerInfo containerInfo = new ContainerInfo(container, instanceName, containerHelixTag);
+ containerMap.put(container.getId(), containerInfo);
+ allocatedContainerCountMap.putIfAbsent(containerHelixTag, new AtomicInteger(0));
+ allocatedContainerCountMap.get(containerHelixTag).incrementAndGet();
+
+ // Find matching requests and remove the request (YARN-660). We the scheduler are responsible
+ // for cleaning up requests after allocation based on the design in the described ticket.
+ // YARN does not have a delta request API and the requests are not cleaned up automatically.
+ // Try finding a match first with the host as the resource name then fall back to any resource match.
+ // Also see YARN-1902. Container count will explode without this logic for removing container requests.
+ List extends Collection> matchingRequests = amrmClientAsync
+ .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource());
+
+ if (matchingRequests.isEmpty()) {
+ LOGGER.debug("Matching request by host {} not found", container.getNodeHttpAddress());
+
+ matchingRequests = amrmClientAsync
+ .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource());
+ }
+
+ if (!matchingRequests.isEmpty()) {
+ AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequests.get(0).iterator().next();
+ LOGGER.debug("Found matching requests {}, removing first matching request {}",
+ matchingRequests, firstMatchingContainerRequest);
+
+ amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+ }
+
+ containerLaunchExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOGGER.info("Starting container " + containerId);
+
+ nmClientAsync.startContainerAsync(container, newContainerLaunchContext(containerInfo));
+ } catch (IOException ioe) {
+ LOGGER.error("Failed to start container " + containerId, ioe);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void onShutdownRequest() {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.SHUTDOWN_REQUEST);
+ }
+
+ LOGGER.info("Received shutdown request from the ResourceManager");
+ this.done = true;
+ eventBus.post(new ClusterManagerShutdownRequest());
+ }
+
+ @Override
+ public void onNodesUpdated(List updatedNodes) {
+ for (NodeReport nodeReport : updatedNodes) {
+ LOGGER.info("Received node update report: " + nodeReport);
+ }
+ }
+
+ @Override
+ public float getProgress() {
+ return this.done ? 1.0f : 0.0f;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.ERROR,
+ GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
+ }
+
+ LOGGER.error("Received error: " + t, t);
+ this.done = true;
+ eventBus.post(new ClusterManagerShutdownRequest());
+ }
+ }
+
+ /**
+ * A custom implementation of {@link NMClientAsync.CallbackHandler}.
+ */
+ class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
+
+ @Override
+ public void onContainerStarted(ContainerId containerId, Map allServiceResponse) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STARTED,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString());
+ }
+
+ LOGGER.info(String.format("Container %s has been started", containerId));
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STATUS_RECEIVED,
+ buildContainerStatusEventMetadata(containerStatus).build());
+ }
+
+ LOGGER.info(String.format("Received container status for container %s: %s", containerId, containerStatus));
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STOPPED,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString());
+ }
+
+ LOGGER.info(String.format("Container %s has been stopped", containerId));
+ if (containerMap.isEmpty()) {
+ synchronized (allContainersStopped) {
+ allContainersStopped.notify();
+ }
+ }
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_START_ERROR,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString(),
+ GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
+ }
+
+ LOGGER.error(String.format("Failed to start container %s due to error %s", containerId, t));
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_GET_STATUS_ERROR,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString(),
+ GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
+ }
+
+ LOGGER.error(String.format("Failed to get status for container %s due to error %s", containerId, t));
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ if (eventSubmitter.isPresent()) {
+ eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_STOP_ERROR,
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId.toString(),
+ GobblinYarnEventConstants.EventMetadata.ERROR_EXCEPTION, Throwables.getStackTraceAsString(t));
+ }
+
+ LOGGER.error(String.format("Failed to stop container %s due to error %s", containerId, t));
+ }
+ }
+
+ // Class encapsulates Container instances, Helix participant IDs of the containers, Helix Tag, and
+ // initial startup command
+ @Getter
+ class ContainerInfo {
+ private final Container container;
+ private final String helixParticipantId;
+ private final String helixTag;
+ private final String startupCommand;
+
+ public ContainerInfo(Container container, String helixParticipantId, String helixTag) {
+ this.container = container;
+ this.helixParticipantId = helixParticipantId;
+ this.helixTag = helixTag;
+ this.startupCommand = YarnTemporalService.this.buildContainerCommand(container, helixParticipantId, helixTag);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("ContainerInfo{ container=%s, helixParticipantId=%s, helixTag=%s, startupCommand=%s }",
+ container.getId(), helixParticipantId, helixTag, startupCommand);
+ }
+ }
+}
+
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index cee0abedb3e..8a395926bb3 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -102,6 +102,7 @@ ext.externalDependency = [
"confluentJsonSerializer": "io.confluent:kafka-json-serializer:" + confluentVersion,
"zkClient": "com.101tec:zkclient:0.7",
"quartz": "org.quartz-scheduler:quartz:2.2.3",
+ "temporal-sdk": "io.temporal:temporal-sdk:1.18.1",
"testng": "org.testng:testng:6.14.3",
"junit": "junit:junit:4.13.2",
"mockserver":"org.mock-server:mockserver-netty:3.10.4",