Skip to content

Commit

Permalink
fix: disabled_for_operators now stops whole event emission (apache#38033
Browse files Browse the repository at this point in the history
)

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Mar 25, 2024
1 parent 32ed83b commit 9c4e333
Show file tree
Hide file tree
Showing 20 changed files with 913 additions and 355 deletions.
6 changes: 3 additions & 3 deletions airflow/providers/dbt/cloud/utils/openlineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ def generate_openlineage_events_from_dbt_cloud_run(
"""
from openlineage.common.provider.dbt import DbtCloudArtifactProcessor, ParentRunMetadata

from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.plugins.adapter import (
_DAG_NAMESPACE,
_PRODUCER,
OpenLineageAdapter,
)
Expand Down Expand Up @@ -110,7 +110,7 @@ async def get_artifacts_for_steps(steps, artifacts):

processor = DbtCloudArtifactProcessor(
producer=_PRODUCER,
job_namespace=_DAG_NAMESPACE,
job_namespace=namespace(),
skip_errors=False,
logger=operator.log,
manifest=manifest,
Expand All @@ -130,7 +130,7 @@ async def get_artifacts_for_steps(steps, artifacts):
parent_job = ParentRunMetadata(
run_id=parent_run_id,
job_name=f"{task_instance.dag_id}.{task_instance.task_id}",
job_namespace=_DAG_NAMESPACE,
job_namespace=namespace(),
)
processor.dbt_run_metadata = parent_job

Expand Down
98 changes: 98 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import os
from typing import Any

from airflow.compat.functools import cache
from airflow.configuration import conf

_CONFIG_SECTION = "openlineage"


@cache
def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
option = conf.get(_CONFIG_SECTION, "config_path", fallback="")
if check_legacy_env_var and not option:
option = os.getenv("OPENLINEAGE_CONFIG", "")
return option


@cache
def is_source_enabled() -> bool:
"""[openlineage] disable_source_code."""
option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
return option.lower() not in ("true", "1", "t")


@cache
def disabled_operators() -> set[str]:
"""[openlineage] disabled_for_operators."""
option = conf.get(_CONFIG_SECTION, "disabled_for_operators", fallback="")
return set(operator.strip() for operator in option.split(";") if operator.strip())


@cache
def custom_extractors() -> set[str]:
"""[openlineage] extractors."""
option = conf.get(_CONFIG_SECTION, "extractors", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_EXTRACTORS", "")
return set(extractor.strip() for extractor in option.split(";") if extractor.strip())


@cache
def namespace() -> str:
"""[openlineage] namespace."""
option = conf.get(_CONFIG_SECTION, "namespace", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_NAMESPACE", "default")
return option


@cache
def transport() -> dict[str, Any]:
"""[openlineage] transport."""
option = conf.getjson(_CONFIG_SECTION, "transport", fallback={})
if not isinstance(option, dict):
raise ValueError(f"OpenLineage transport `{option}` is not a dict")
return option


@cache
def is_disabled() -> bool:
"""[openlineage] disabled + some extra checks."""

def _is_true(val):
return str(val).lower().strip() in ("true", "1", "t")

option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
if _is_true(option):
return True

option = os.getenv("OPENLINEAGE_DISABLED", "")
if _is_true(option):
return True

# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""
30 changes: 0 additions & 30 deletions airflow/providers/openlineage/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING

from attrs import Factory, define

from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -64,31 +62,10 @@ def get_operator_classnames(cls) -> list[str]:
"""
raise NotImplementedError()

@cached_property
def disabled_operators(self) -> set[str]:
return set(
operator.strip()
for operator in conf.get("openlineage", "disabled_for_operators", fallback="").split(";")
)

@cached_property
def _is_operator_disabled(self) -> bool:
fully_qualified_class_name = (
self.operator.__class__.__module__ + "." + self.operator.__class__.__name__
)
return fully_qualified_class_name in self.disabled_operators

@abstractmethod
def _execute_extraction(self) -> OperatorLineage | None: ...

def extract(self) -> OperatorLineage | None:
if self._is_operator_disabled:
self.log.debug(
"Skipping extraction for operator %s "
"due to its presence in [openlineage] openlineage_disabled_for_operators.",
self.operator.task_type,
)
return None
return self._execute_extraction()

def extract_on_complete(self, task_instance) -> OperatorLineage | None:
Expand Down Expand Up @@ -125,13 +102,6 @@ def _execute_extraction(self) -> OperatorLineage | None:
return None

def extract_on_complete(self, task_instance) -> OperatorLineage | None:
if self._is_operator_disabled:
self.log.debug(
"Skipping extraction for operator %s "
"due to its presence in [openlineage] openlineage_disabled_for_operators.",
self.operator.task_type,
)
return None
if task_instance.state == TaskInstanceState.FAILED:
on_failed = getattr(self.operator, "get_openlineage_facets_on_failure", None)
if on_failed and callable(on_failed):
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/openlineage/extractors/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

from openlineage.client.facet import SourceCodeJobFacet

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import (
get_filtered_unknown_operator_keys,
is_source_enabled,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys

"""
:meta private:
Expand All @@ -51,7 +49,7 @@ def get_operator_classnames(cls) -> list[str]:

def _execute_extraction(self) -> OperatorLineage | None:
job_facets: dict = {}
if is_source_enabled():
if conf.is_source_enabled():
job_facets = {
"sourceCode": SourceCodeJobFacet(
language="bash",
Expand Down
30 changes: 12 additions & 18 deletions airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
# under the License.
from __future__ import annotations

import os
from contextlib import suppress
from typing import TYPE_CHECKING, Iterator

from airflow.configuration import conf
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.bash import BashExtractor
Expand Down Expand Up @@ -65,22 +64,17 @@ def __init__(self):
for operator_class in extractor.get_operator_classnames():
self.extractors[operator_class] = extractor

# Semicolon-separated extractors in Airflow configuration or OPENLINEAGE_EXTRACTORS variable.
# Extractors should implement BaseExtractor
env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
# skip either when it's empty string or None
if env_extractors:
for extractor in env_extractors.split(";"):
extractor: type[BaseExtractor] = try_import_from_string(extractor.strip())
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
operator_class,
extractor,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor
for extractor_path in conf.custom_extractors():
extractor: type[BaseExtractor] = try_import_from_string(extractor_path)
for operator_class in extractor.get_operator_classnames():
if operator_class in self.extractors:
self.log.debug(
"Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
operator_class,
extractor_path,
self.extractors[operator_class],
)
self.extractors[operator_class] = extractor

def add_extractor(self, operator_class: str, extractor: type[BaseExtractor]):
self.extractors[operator_class] = extractor
Expand Down
8 changes: 3 additions & 5 deletions airflow/providers/openlineage/extractors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@

from openlineage.client.facet import SourceCodeJobFacet

from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.openlineage.plugins.facets import (
UnknownOperatorAttributeRunFacet,
UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.utils import (
get_filtered_unknown_operator_keys,
is_source_enabled,
)
from airflow.providers.openlineage.utils.utils import get_filtered_unknown_operator_keys

"""
:meta private:
Expand All @@ -55,7 +53,7 @@ def get_operator_classnames(cls) -> list[str]:
def _execute_extraction(self) -> OperatorLineage | None:
source_code = self.get_source_code(self.operator.python_callable)
job_facet: dict = {}
if is_source_enabled() and source_code:
if conf.is_source_enabled() and source_code:
job_facet = {
"sourceCode": SourceCodeJobFacet(
language="python",
Expand Down
28 changes: 9 additions & 19 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import os
import uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING
Expand All @@ -37,8 +36,7 @@
)
from openlineage.client.run import Job, Run, RunEvent, RunState

from airflow.configuration import conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
Expand All @@ -48,12 +46,6 @@
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker

_DAG_DEFAULT_NAMESPACE = "default"

_DAG_NAMESPACE = conf.get(
"openlineage", "namespace", fallback=os.getenv("OPENLINEAGE_NAMESPACE", _DAG_DEFAULT_NAMESPACE)
)

_PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"

set_producer(_PRODUCER)
Expand Down Expand Up @@ -88,33 +80,31 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:

def get_openlineage_config(self) -> dict | None:
# First, try to read from YAML file
openlineage_config_path = conf.get("openlineage", "config_path", fallback="")
openlineage_config_path = conf.config_path(check_legacy_env_var=False)
if openlineage_config_path:
config = self._read_yaml_config(openlineage_config_path)
if config:
return config.get("transport", None)
# Second, try to get transport config
transport = conf.getjson("openlineage", "transport", fallback="")
if not transport:
transport_config = conf.transport()
if not transport_config:
return None
elif not isinstance(transport, dict):
raise ValueError(f"{transport} is not a dict")
return transport
return transport_config

def _read_yaml_config(self, path: str) -> dict | None:
with open(path) as config_file:
return yaml.safe_load(config_file)

@staticmethod
def build_dag_run_id(dag_id, dag_run_id):
return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{dag_run_id}"))
return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{conf.namespace()}.{dag_id}.{dag_run_id}"))

@staticmethod
def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
return str(
uuid.uuid3(
uuid.NAMESPACE_URL,
f"{_DAG_NAMESPACE}.{dag_id}.{task_id}.{execution_date}.{try_number}",
f"{conf.namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}",
)
)

Expand Down Expand Up @@ -353,7 +343,7 @@ def _build_run(
if parent_run_id:
parent_run_facet = ParentRunFacet.create(
runId=parent_run_id,
namespace=_DAG_NAMESPACE,
namespace=conf.namespace(),
name=parent_job_name or job_name,
)
facets.update(
Expand Down Expand Up @@ -396,4 +386,4 @@ def _build_job(

facets.update({"jobType": job_type})

return Job(_DAG_NAMESPACE, job_name, facets)
return Job(conf.namespace(), job_name, facets)
Loading

0 comments on commit 9c4e333

Please sign in to comment.