Skip to content

Commit

Permalink
Add task SLA and queued datetime information to AirflowRunFacet (apac…
Browse files Browse the repository at this point in the history
…he#40091)

Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jun 6, 2024
1 parent 0776bdb commit 1a61303
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
7 changes: 7 additions & 0 deletions airflow/providers/openlineage/facets/AirflowRunFacet.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@
"run_as_user": {
"type": "string"
},
"sla": {
"type": "number"
},
"task_id": {
"type": "string"
},
Expand Down Expand Up @@ -198,6 +201,10 @@
},
"try_number": {
"type": "integer"
},
"queued_dttm": {
"type": "string",
"format": "date-time"
}
},
"additionalProperties": true,
Expand Down
5 changes: 4 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def __init__(self, obj):
def _cast_basic_types(value):
if isinstance(value, datetime.datetime):
return value.isoformat()
if isinstance(value, datetime.timedelta):
return f"{value.total_seconds()} seconds"
if isinstance(value, (set, list, tuple)):
return str(list(value))
return value
Expand Down Expand Up @@ -201,7 +203,7 @@ class DagRunInfo(InfoJsonEncodable):
class TaskInstanceInfo(InfoJsonEncodable):
"""Defines encoding TaskInstance object to JSON."""

includes = ["duration", "try_number", "pool"]
includes = ["duration", "try_number", "pool", "queued_dttm"]
casts = {
"map_index": lambda ti: (
ti.map_index if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1 else None
Expand Down Expand Up @@ -235,6 +237,7 @@ class TaskInfo(InfoJsonEncodable):
"retries",
"retry_exponential_backoff",
"run_as_user",
"sla",
"task_id",
"trigger_rule",
"upstream_task_ids",
Expand Down

0 comments on commit 1a61303

Please sign in to comment.