Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Depreacate EmrContainerSensorAsync and EmrStepSensorAsync #1390

Merged
merged 4 commits into from
Dec 21, 2023

Conversation

Lee-W
Copy link
Contributor

@Lee-W Lee-W commented Dec 20, 2023

What's changed

Deprecate EmrContainerSensorAsync and EmrStepSensorAsync and fallback to EmrContainerSensor and EmrStepSensor with deferrable=True

Why this change

The logic of EmrContainerSensorAsync and EmrStepSensorAsync has been contributed back to OSS airflow as part of EmrContainerSensor and EmrStepSensor

EmrContainerSensorAsync

def execute(self, context: Context) -> None:
"""Defers trigger class to poll for state of the job run until it reaches a failure state or success state"""
if not poke(self, context):
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=EmrContainerSensorTrigger(
virtual_cluster_id=self.virtual_cluster_id,
job_id=self.job_id,
max_tries=self.max_retries,
aws_conn_id=self.aws_conn_id,
poll_interval=self.poll_interval,
),
method_name="execute_complete",
)
# Ignoring the override type check because the parent class specifies "context: Any" but specifying it as
# "context: Context" is accurate as it's more specific.
def execute_complete(self, context: Context, event: dict[str, str]) -> None: # type: ignore[override]
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event:
if event["status"] == "error":
raise_error_or_skip_exception(self.soft_fail, event["message"])
self.log.info(event["message"])
return None

EmrContainerSensor

https://github.com/apache/airflow/blob/999b70178a1f5d891fd2c88af4831a4ba4c2cbc9/airflow/providers/amazon/aws/sensors/emr.py#L318-L346

EmrStepSensorAsync

def execute(self, context: Context) -> None:
"""Deferred and give control to trigger"""
if not poke(self, context):
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=EmrStepSensorTrigger(
job_flow_id=self.job_flow_id,
step_id=self.step_id,
target_states=self.target_states,
failed_states=self.failed_states,
aws_conn_id=self.aws_conn_id,
poke_interval=self.poke_interval,
),
method_name="execute_complete",
)
def execute_complete(self, context: Context, event: dict[str, Any]) -> None: # type: ignore[override]
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event:
if event["status"] == "error":
raise_error_or_skip_exception(self.soft_fail, event["message"])
self.log.info(event.get("message"))
self.log.info("%s completed successfully.", self.job_flow_id)

EmrStepSensor

https://github.com/apache/airflow/blob/999b70178a1f5d891fd2c88af4831a4ba4c2cbc9/airflow/providers/amazon/aws/sensors/emr.py#L644-L668

@Lee-W Lee-W force-pushed the depreacate-emr-sensors branch from 549c535 to b99adf0 Compare December 20, 2023 14:31
@Lee-W Lee-W marked this pull request as ready for review December 20, 2023 14:32
Copy link

codecov bot commented Dec 20, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (6781996) 98.57% compared to head (148fba4) 98.55%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1390      +/-   ##
==========================================
- Coverage   98.57%   98.55%   -0.02%     
==========================================
  Files          91       91              
  Lines        5460     5403      -57     
==========================================
- Hits         5382     5325      -57     
  Misses         78       78              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Collaborator

@phanikumv phanikumv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the integration DAGs that might be using these classes, are we planning to remove them at a later point of time?

@phanikumv
Copy link
Collaborator

Please add description about why you are doing this change

Copy link
Collaborator

@pankajkoti pankajkoti left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will also need to remove corresponding trigger modules, no?

@Lee-W
Copy link
Contributor Author

Lee-W commented Dec 21, 2023

what about the integration DAGs that might be using this class, are we planning to remove them at a later point of time?

Yes, I think we should keep it for a while and remove it after we deprecate the whole repo.

Please add description about why you are doing this change

Sure. let me reformat and reword it to make it easier to read.

@Lee-W
Copy link
Contributor Author

Lee-W commented Dec 21, 2023

We will also need to remove corresponding trigger modules, no?

Yep, we should remove it as well. Thanks for reminding me!

@Lee-W Lee-W merged commit f0061d3 into main Dec 21, 2023
11 checks passed
@Lee-W Lee-W deleted the depreacate-emr-sensors branch December 21, 2023 06:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants