diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py
index ed217bdc7aa23..dd9be69d50203 100644
--- a/airflow/providers/openlineage/utils/utils.py
+++ b/airflow/providers/openlineage/utils/utils.py
@@ -150,7 +150,7 @@ def _cast_basic_types(value):
             return value.isoformat()
         if isinstance(value, datetime.timedelta):
             return f"{value.total_seconds()} seconds"
-        if isinstance(value, (set, list, tuple)):
+        if isinstance(value, (set, tuple)):
             return str(list(value))
         return value
 
@@ -214,6 +214,12 @@ class TaskInstanceInfo(InfoJsonEncodable):
     }
 
 
+class DatasetInfo(InfoJsonEncodable):
+    """Defines encoding Airflow Dataset object to JSON."""
+
+    includes = ["uri", "extra"]
+
+
 class TaskInfo(InfoJsonEncodable):
     """Defines encoding BaseOperator/AbstractOperator object to JSON."""
 
@@ -242,6 +248,9 @@ class TaskInfo(InfoJsonEncodable):
         "run_as_user",
         "sla",
         "task_id",
+        "trigger_dag_id",
+        "external_dag_id",
+        "external_task_id",
         "trigger_rule",
         "upstream_task_ids",
         "wait_for_downstream",
@@ -255,6 +264,8 @@ class TaskInfo(InfoJsonEncodable):
             if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None)
             else None
         ),
+        "inlets": lambda task: [DatasetInfo(inlet) for inlet in task.inlets],
+        "outlets": lambda task: [DatasetInfo(outlet) for outlet in task.outlets],
     }
 
 
diff --git a/tests/providers/openlineage/plugins/test_utils.py b/tests/providers/openlineage/plugins/test_utils.py
index d3e54504edc72..016bb99d4e6b0 100644
--- a/tests/providers/openlineage/plugins/test_utils.py
+++ b/tests/providers/openlineage/plugins/test_utils.py
@@ -117,6 +117,34 @@ class Test:
     }
 
 
+def test_info_json_encodable_list_does_not_flatten():
+    """A list attribute is encoded as a JSON array, not as its string form."""
+    class TestInfo(InfoJsonEncodable):
+        includes = ["alist"]
+
+    @define(slots=False)
+    class Test:
+        alist: list[str]
+
+    obj = Test(["a", "b", "c"])
+
+    assert json.loads(json.dumps(TestInfo(obj))) == {"alist": ["a", "b", "c"]}
+
+
+def test_info_json_encodable_list_does_not_include_nonexisting():
+    """Names in ``includes`` that are missing on the object are left out of the output."""
+    class TestInfo(InfoJsonEncodable):
+        includes = ["exists", "doesnotexist"]
+
+    @define(slots=False)
+    class Test:
+        exists: str
+
+    obj = Test("something")
+
+    assert json.loads(json.dumps(TestInfo(obj))) == {"exists": "something"}
+
+
 def test_is_name_redactable():
     class NotMixin:
         def __init__(self):