Skip to content

Commit

Permalink
Avoid changing attributes in the constructor of Apache Beam operators (
Browse files Browse the repository at this point in the history
…apache#37934)

* Avoid changing attributes in the init method of Apache Beam operators

* Remove commented out code

* Remove useless constants
  • Loading branch information
Taragolis authored Mar 8, 2024
1 parent 804ba59 commit 2ce28d5
Show file tree
Hide file tree
Showing 2 changed files with 345 additions and 284 deletions.
42 changes: 23 additions & 19 deletions airflow/providers/apache/beam/operators/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,19 @@ def __init__(
self.runner = runner
self.default_pipeline_options = default_pipeline_options or {}
self.pipeline_options = pipeline_options or {}
# ``dataflow_config`` type will resolve into the execute method
self.dataflow_config = dataflow_config or {} # type: ignore[assignment]
self.gcp_conn_id = gcp_conn_id
if isinstance(dataflow_config, dict):
self.dataflow_config = DataflowConfiguration(**dataflow_config)
else:
self.dataflow_config = dataflow_config or DataflowConfiguration()
self.beam_hook: BeamHook
self.dataflow_hook: DataflowHook | None = None
self.dataflow_job_id: str | None = None

def _cast_dataflow_config(self):
if isinstance(self.dataflow_config, dict):
self.dataflow_config = DataflowConfiguration(**self.dataflow_config)
else:
self.dataflow_config = self.dataflow_config or DataflowConfiguration()

if self.dataflow_config and self.runner.lower() != BeamRunnerType.DataflowRunner.lower():
self.log.warning(
"dataflow_config is defined but runner is different than DataflowRunner (%s)", self.runner
Expand Down Expand Up @@ -333,14 +337,14 @@ def __init__(
self.py_interpreter = py_interpreter
self.py_requirements = py_requirements
self.py_system_site_packages = py_system_site_packages
self.pipeline_options = copy.deepcopy(self.pipeline_options)
self.pipeline_options.setdefault("labels", {}).update(
{"airflow-version": "v" + version.replace(".", "-").replace("+", "-")}
)
self.deferrable = deferrable

def execute(self, context: Context):
"""Execute the Apache Beam Python Pipeline."""
self._cast_dataflow_config()
self.pipeline_options.setdefault("labels", {}).update(
{"airflow-version": "v" + version.replace(".", "-").replace("+", "-")}
)
(
self.is_dataflow,
self.dataflow_job_name,
Expand Down Expand Up @@ -527,6 +531,7 @@ def __init__(

def execute(self, context: Context):
"""Execute the Apache Beam Python Pipeline."""
self._cast_dataflow_config()
(
self.is_dataflow,
self.dataflow_job_name,
Expand Down Expand Up @@ -727,27 +732,26 @@ def __init__(
dataflow_config=dataflow_config,
**kwargs,
)
self.go_file = go_file
self.launcher_binary = launcher_binary
self.worker_binary = worker_binary or launcher_binary

def execute(self, context: Context):
"""Execute the Apache Beam Pipeline."""
if not exactly_one(self.go_file, self.launcher_binary):
raise ValueError("Exactly one of `go_file` and `launcher_binary` must be set")

self._cast_dataflow_config()
if self.dataflow_config.impersonation_chain:
self.log.info(
self.log.warning(
"Impersonation chain parameter is not supported for Apache Beam GO SDK and will be skipped "
"in the execution"
)
self.dataflow_support_impersonation = False

if not exactly_one(go_file, launcher_binary):
raise ValueError("Exactly one of `go_file` and `launcher_binary` must be set")

self.go_file = go_file
self.launcher_binary = launcher_binary
self.worker_binary = worker_binary or launcher_binary
self.pipeline_options = copy.deepcopy(self.pipeline_options)
self.pipeline_options.setdefault("labels", {}).update(
{"airflow-version": "v" + version.replace(".", "-").replace("+", "-")}
)

def execute(self, context: Context):
"""Execute the Apache Beam Pipeline."""
(
is_dataflow,
dataflow_job_name,
Expand Down
Loading

0 comments on commit 2ce28d5

Please sign in to comment.