Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PipesEMRClient looses connection with EMR machine #27469

Open
Rahkovsky opened this issue Jan 31, 2025 · 0 comments
Open

PipesEMRClient looses connection with EMR machine #27469

Rahkovsky opened this issue Jan 31, 2025 · 0 comments
Labels
type: bug Something isn't working

Comments

@Rahkovsky
Copy link

Rahkovsky commented Jan 31, 2025

What's the issue?

I use dagster to start EMR machine and monitor the logs via PipesEMRClient. Sometimes it is working fine, however for longer jobs (> 40 min) the asset creation fails because PipesEMRClient losses connection with EMR thinking that it is terminated. The EMR keeps running fine, but dagster reports asset failure.

When I googled a similar problem I found suggestion to increase Delay and MaxAttempts on client.water.

Do you have any suggestions?

What did you expect to happen?

I expect PipesEMRClient not to lose connection with EMR

How to reproduce?

It is hard to reproduce because I use large confidential data pipeline and the error inconsistently.

Dagster version

1.9.8

Deployment type

Local

Deployment details

Python=3.11.6
Dagster=1.9.8

Additional information

Asset that creates EMR run:

pipes_emr_resource = PipesEMRClient(
    message_reader=PipesS3MessageReader(
        client=boto3.client("s3"),
        bucket=EnvVar("DS_BUCKET").get_value(),  # Bucket name only
        include_stdio_in_messages=True
    )
)

def build_emr_trained_model(context: AssetExecutionContext):
    """

    """
    duckdb_resource, asset_name, args = duck_factory_setup(context)  # DuckDB session, asset/duck table name
    pipes_emr_client: PipesEMRClient = context.resources.pipes_emr_client  # Piper EMR client to view EMR logs in real-time

    if args.emr_folded_data is True:
        folds_array = np.array(args.folds, dtype=np.int_)
        test_fold = np.array([args.fold], dtype=np.int_)  # Automatically handles type conversion
        train_folds = np.setdiff1d(folds_array, test_fold)
        args.train_data = args.file_pattern.replace("{fold}", f"[{','.join(train_folds.astype(str))}]")
        args.test_data = args.file_pattern.replace("{fold}", f"[{test_fold[0]}]")
        context.log.info(f"train data {args.train_data}")
        context.log.info(f"test data {args.test_data}")
    model_args_list = []
    keys = vars(
        CatBoostModelConfig(model="model").training_args_func[0]).keys()  # get list of args from training scripts
    for name in keys:  # Construct list of keys and arguments for CLI command
        value = getattr(args, name)  # get values from build_config.yaml and common defaults
        if (value is not None) and (value != []) and (value is not False):
            model_args_list.append(f"--{name.replace('_', '-')}")
            if type(value) != bool:  # for bool arguments we don't need to add the value
                if (type(value) == list) or (type(value) == tuple):
                    model_args_list = model_args_list + list(value)  # add list of values
                    context.log.info(model_args_list)
                else:
                    model_args_list.append(str(value))  # add single value
    context.log.info(model_args_list)
    train_script = CatBoostModelConfig(model=args.model).script_file  # training script to run on EMR

    emr_args = [
                   EnvVar("EMR_RUN_SCRIPT_PATH").get_value(),
                   train_script,
                   context.op_config["git_branch"],
                   context.op_config["s3_data_loc"],
               ] + model_args_list
    context.log.info(emr_args)
    job_flow_params = dict(
        Name=f"{args.emr_job_name} fold {args.fold} on {args.emr_machine}",
        EbsRootVolumeSize=100,
        LogUri=EnvVar("LOG_S3_LOCATION").get_value(),
        ReleaseLabel="emr-7.5.0",
        Instances={
            "InstanceGroups": [
                {
                    "Name": "Single Node Cluster",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "MASTER",
                    "InstanceType": args.emr_machine,
                    "InstanceCount": 1,
                    "EbsConfiguration": {
                        "EbsBlockDeviceConfigs": [
                            {
                                "VolumeSpecification": {
                                    "VolumeType": "gp3",
                                    "Iops": 3000,
                                    "SizeInGB": 100,
                                },
                                "VolumesPerInstance": 1,
                            }
                        ],
                        "EbsOptimized": True,
                    },
                }
            ],
            "Ec2KeyName": "...",
            "KeepJobFlowAliveWhenNoSteps": False,
            "TerminationProtected": False,
            "Ec2SubnetId": EnvVar("EC2_SUBNET_ID").get_value(),
            "EmrManagedMasterSecurityGroup": EnvVar("EMR_MANAGED_MASTER_SECURITY_GROUP").get_value(),
            "EmrManagedSlaveSecurityGroup": EnvVar("EMR_MANAGED_SLAVE_SECURITY_GROUP").get_value(),
            "AdditionalMasterSecurityGroups": [EnvVar("ADDITIONAL_MASTER_SECURITY_GROUP").get_value()],
            "AdditionalSlaveSecurityGroups": [EnvVar("ADDITIONAL_SLAVE_SECURITY_GROUP").get_value()],
        },
        Steps=[
            {
                "Name": "Job",
                "ActionOnFailure": "TERMINATE_CLUSTER",
                "HadoopJarStep": {
                    "Jar": "s3://us-west-2.elasticmapreduce/libs/script-runner/script-runner.jar",
                    "Args": emr_args,
                },
            }
        ],
        BootstrapActions=[
            {
                "Name": "bootstrap action",
                "ScriptBootstrapAction": {
                    "Path": EnvVar("EMR_BOOSTRAP_SCRIPT_PATH").get_value(),
                },
            }
        ],
        VisibleToAllUsers=True,
        JobFlowRole=EnvVar("JOB_FLOW_ROLE").get_value(),
        ServiceRole=EnvVar("SERVICE_ROLE").get_value(),
        Configurations=[
            {
                "Classification": "yarn-site",
                "Properties": {
                    "yarn.nodemanager.vmem-check-enabled": "false",
                    "yarn.log-aggregation-enable": "true",
                },
            },
            {
                "Classification": "core-site",
                "Properties": {
                    "fs.s3a.connection.maximum": "500",
                    "fs.s3a.connection.timeout": "2000",
                },
            },
        ],
    )

    result = pipes_emr_client.run(
        context=context,
        run_job_flow_params=job_flow_params,
        extras={"verbose_logging": True},
    ).get_materialize_result()

    LOG_WAIT_TIME = 30  # Adjust as needed
    context.log.info(f"Waiting {LOG_WAIT_TIME} seconds for logs to sync to S3...")
    time.sleep(LOG_WAIT_TIME)
    context.log.info(f"result: {result}")

    metadata = result.metadata or {}

    mlflow_run = metadata.get("mlflow_artifact_uri")
    mlflow_hash = metadata.get("mlflow_hash").text 
    log_dagster_on_mlflow(context, mlflow_hash)  # log dagster run id & build_config.yaml on mlflow

    df = pd.DataFrame({"mlflow_artifact_uri": mlflow_run, "fold": [args.fold]})

    with duckdb_locked_connection(
            duckdb_resource) as conn:  # return table with fold number and mlflow links of trained models
        conn.register('mlflow_df', df)
        conn.sql(f"CREATE OR REPLACE TABLE {asset_name} AS SELECT * FROM mlflow_df")

    return get_duck_metadata(duckdb_resource, context, asset_name)

Error:


Debug info: reader for stdout of EMR step...
  warnings.warn(
/.../python3.11/site-packages/dagster/_core/pipes/utils.py:501: UserWarning: [pipes] Attempted to read log for reader PipesS3LogReader(.../stderr.txt.gz) but log was still not written 60 seconds after session close. Abandoning reader 1.
Debug info: reader for stderr of EMR step ...
  warnings.warn(
2025-01-31 20:56:17 +0000 - dagster - WARNING - ... - ... - ... - [pipes] did not receive closed message from external process. Buffered messages may have been discarded without being delivered. Use `open_dagster_pipes` as a context manager (a with block) to ensure that cleanup is successfully completed. If that is not possible, manually call `PipesContext.close()` before process exit.
2025-01-31 20:56:17 +0000 - dagster - ERROR - job- 077adc84-4680-47bf-80ee-ef1e7d054ab9 - 3057636 - ... - STEP_FAILURE - Execution of step "..." failed.

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "..."::

botocore.exceptions.WaiterError: Waiter ClusterTerminated failed: Max attempts exceeded

Stack Trace:
  File "/....env/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/.../.env/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 490, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/.../pipelines/assets/factory.py", line 56, in asset_function
    yield from factory_function(context, **inputs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../pipelines/assets/logic.py", line 966, in build_emr_trained_model
    result = pipes_emr_client.run(
             ^^^^^^^^^^^^^^^^^^^^^
  File ".../.env/lib/python3.11/site-packages/dagster_aws/pipes/clients/emr.py", line 150, in run
    wait_response = self._wait_for_completion(context, start_response)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../.env/lib/python3.11/site-packages/dagster_aws/pipes/clients/emr.py", line 235, in _wait_for_completion
    self._client.get_waiter("cluster_terminated").wait(ClusterId=cluster_id)
  File ".../.env/lib/python3.11/site-packages/botocore/waiter.py", line 55, in wait
    Waiter.wait(self, **kwargs)
  File ".../.env/lib/python3.11/site-packages/botocore/waiter.py", line 387, in wait
    raise WaiterError(

2025-01-31 20:56:17 +0000 - dagster - DEBUG - ... - 077adc84-4680-47bf-80ee-ef1e7d054ab9 - emr_claims_train_0 - HOOK_SKIPPED - Skipped the execution of hook "log_configs". It did not meet its triggering condition during the execution of "emr_claims_train_0".

Image

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

@Rahkovsky Rahkovsky added the type: bug Something isn't working label Jan 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type: bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant