Skip to content

Commit

Permalink
Update trigger_timeout column in task_instance table to use UtcDateTi…
Browse files Browse the repository at this point in the history
…me (apache#44507)

* update task instance trigger timeout to utcdatetime

* update task instance trigger timeout to utcdatetime

* update task instance trigger timeout to utcdatetime

* updatng task_instance file

* update migration desc

* fixing execution api test

* adding migration condition to run only for postgresql

* fix static check

* remove postgresql condition
  • Loading branch information
vatsrahul1001 authored Dec 12, 2024
1 parent 3e2b5e2 commit b7df463
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
update trigger_timeout column in task_instance table to UTC.
Revision ID: 038dc8bc6284
Revises: e229247a6cb1
Create Date: 2024-11-30 10:47:17.542690
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
revision = "038dc8bc6284"
down_revision = "e229247a6cb1"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply update task instance trigger timeout to utcdatetime."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"trigger_timeout",
existing_type=sa.DateTime(),
type_=TIMESTAMP(timezone=True),
existing_nullable=True,
)


def downgrade():
"""Unapply update task instance trigger timeout to utcdatetime."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"trigger_timeout",
existing_type=TIMESTAMP(timezone=True),
type_=sa.DateTime(),
existing_nullable=True,
)
7 changes: 2 additions & 5 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1715,11 +1715,8 @@ class TaskInstance(Base, LoggingMixin):
# The trigger to resume on if we are in state DEFERRED
trigger_id = Column(Integer)

# Optional timeout datetime for the trigger (past this, we'll fail)
trigger_timeout = Column(DateTime)
# The trigger_timeout should be TIMESTAMP(using UtcDateTime) but for ease of
# migration, we are keeping it as DateTime pending a change where expensive
# migration is inevitable.
# Optional timeout utcdatetime for the trigger (past this, we'll fail)
trigger_timeout = Column(UtcDateTime)

# The method to call next, and any extra arguments to pass to it.
# Usually used when resuming from DEFERRED.
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "e229247a6cb1",
"3.0.0": "038dc8bc6284",
}


Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
95b59583f58fa1dad1ae07cac699b0a72143abae9beaa33f79c3ff0f24e52c7d
8f2fd91375c546b297490e701dc3853d7ba53c7cd1422ed7f7e57b9ac86f6eca
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``e229247a6cb1`` (head) | ``eed27faa34e3`` | ``3.0.0`` | Add DagBundleModel. |
| ``038dc8bc6284`` (head) | ``e229247a6cb1`` | ``3.0.0`` | update trigger_timeout column in task_instance table to UTC. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``e229247a6cb1`` | ``eed27faa34e3`` | ``3.0.0`` | Add DagBundleModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``eed27faa34e3`` | ``9fc3fc5de720`` | ``3.0.0`` | Remove pickled data from xcom table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
4 changes: 2 additions & 2 deletions tests/api_fastapi/execution_api/routes/test_task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from __future__ import annotations

from datetime import datetime
from unittest import mock

import pytest
Expand Down Expand Up @@ -234,8 +235,7 @@ def test_ti_update_state_to_deferred(self, client, session, create_task_instance
assert tis[0].state == TaskInstanceState.DEFERRED
assert tis[0].next_method == "execute_callback"
assert tis[0].next_kwargs == {"key": "value"}
# TODO: Make TI.trigger_timeout a UtcDateTime instead of DateTime
assert tis[0].trigger_timeout == timezone.datetime(2024, 11, 23).replace(tzinfo=None)
assert tis[0].trigger_timeout == timezone.make_aware(datetime(2024, 11, 23), timezone=timezone.utc)

t = session.query(Trigger).all()
assert len(t) == 1
Expand Down

0 comments on commit b7df463

Please sign in to comment.