Skip to content

Commit

Permalink
Add DAG bundles model (apache#44586)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedcunningham authored Dec 3, 2024
1 parent 31ba41e commit 6ed71fa
Show file tree
Hide file tree
Showing 9 changed files with 2,192 additions and 1,991 deletions.
68 changes: 68 additions & 0 deletions airflow/migrations/versions/0050_3_0_0_add_dagbundlemodel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Add DagBundleModel.
Revision ID: e229247a6cb1
Revises: eed27faa34e3
Create Date: 2024-12-02 22:06:40.587330
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy_utils import UUIDType

from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime

revision = "e229247a6cb1"
down_revision = "eed27faa34e3"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
    """Apply Add DagBundleModel."""
    # Columns of the new ``dag_bundle`` table.
    bundle_columns = [
        sa.Column("id", UUIDType(binary=False), nullable=False),
        sa.Column("name", sa.String(length=200), nullable=False),
        sa.Column("classpath", sa.String(length=1000), nullable=False),
        sa.Column("kwargs", ExtendedJSON(), nullable=True),
        sa.Column("refresh_interval", sa.Integer(), nullable=True),
        sa.Column("latest_version", sa.String(length=200), nullable=True),
        sa.Column("last_refreshed", UtcDateTime(timezone=True), nullable=True),
    ]
    # Table-level constraints: primary key on ``id``, uniqueness on ``name``.
    bundle_constraints = [
        sa.PrimaryKeyConstraint("id", name=op.f("dag_bundle_pkey")),
        sa.UniqueConstraint("name", name=op.f("dag_bundle_name_uq")),
    ]
    op.create_table("dag_bundle", *bundle_columns, *bundle_constraints)

    # ``dag_bundle`` is created first so the foreign key added to ``dag``
    # below has a table to reference.
    with op.batch_alter_table("dag", schema=None) as dag_batch:
        dag_batch.add_column(
            sa.Column("bundle_id", UUIDType(binary=False), nullable=True)
        )
        dag_batch.add_column(
            sa.Column("latest_bundle_version", sa.String(length=200), nullable=True)
        )
        dag_batch.create_foreign_key(
            dag_batch.f("dag_bundle_id_fkey"),
            "dag_bundle",
            ["bundle_id"],
            ["id"],
        )


def downgrade():
    """
    Unapply Add DagBundleModel.

    Removes the bundle foreign key and the two bundle columns from ``dag``,
    then drops the ``dag_bundle`` table.
    """
    with op.batch_alter_table("dag", schema=None) as dag_batch:
        fk_name = dag_batch.f("dag_bundle_id_fkey")
        # The constraint goes first, before the column it is defined on.
        dag_batch.drop_constraint(fk_name, type_="foreignkey")
        for column_name in ("latest_bundle_version", "bundle_id"):
            dag_batch.drop_column(column_name)

    # Dropped last, after ``dag`` no longer references it.
    op.drop_table("dag_bundle")
1 change: 1 addition & 0 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def import_all_models():
import airflow.models.asset
import airflow.models.backfill
import airflow.models.dag_version
import airflow.models.dagbundle
import airflow.models.dagwarning
import airflow.models.errors
import airflow.models.serialized_dag
Expand Down
4 changes: 4 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import backref, relationship
from sqlalchemy.sql import Select, expression
from sqlalchemy_utils import UUIDType

from airflow import settings, utils
from airflow.configuration import conf as airflow_conf, secrets_backend_list
Expand Down Expand Up @@ -2026,6 +2027,9 @@ class DagModel(Base):
fileloc = Column(String(2000))
# The base directory used by Dag Processor that parsed this dag.
processor_subdir = Column(String(2000), nullable=True)
bundle_id = Column(UUIDType(binary=False), ForeignKey("dag_bundle.id"), nullable=True)
# The version of the bundle the last time the DAG was parsed
latest_bundle_version = Column(String(200), nullable=True)
# String representing the owners
owners = Column(String(2000))
# Display name of the dag
Expand Down
73 changes: 73 additions & 0 deletions airflow/models/dagbundle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

import uuid6
from sqlalchemy import Column, Integer, String
from sqlalchemy_utils import UUIDType

from airflow.models.base import Base
from airflow.utils.module_loading import import_string
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.dag_processing.bundles.base import BaseDagBundle


class DagBundleModel(Base):
    """
    A table for DAG Bundle config.

    Each row holds what ``get_all_dag_bundles`` needs to build one bundle
    object: a unique ``name``, the import path of the bundle class
    (``classpath``) and optional keyword arguments for its constructor
    (``kwargs``).
    """

    __tablename__ = "dag_bundle"
    # Primary key; ``uuid6.uuid7`` is the column default when no id is given.
    id = Column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
    # Unique bundle name; also passed to the bundle class as ``name=``.
    name = Column(String(200), nullable=False, unique=True)
    # Import path resolved with ``import_string`` to obtain the bundle class.
    classpath = Column(String(1000), nullable=False)
    # Extra constructor keyword arguments for the bundle class; may be NULL.
    kwargs = Column(ExtendedJSON, nullable=True)
    # NOTE(review): unit is not stated anywhere in this file -- presumably seconds; confirm.
    refresh_interval = Column(Integer, nullable=True)
    # Not populated by ``__init__``; left NULL until something else sets them.
    latest_version = Column(String(200), nullable=True)
    last_refreshed = Column(UtcDateTime, nullable=True)

    def __init__(self, *, name, classpath, kwargs, refresh_interval):
        """
        Create a bundle config row.

        :param name: Unique name of the bundle.
        :param classpath: Import path of the bundle class.
        :param kwargs: Keyword arguments for the bundle class, or ``None``.
        :param refresh_interval: Refresh interval for the bundle, or ``None``.
        """
        self.name = name
        self.classpath = classpath
        self.kwargs = kwargs
        self.refresh_interval = refresh_interval

    @classmethod
    @provide_session
    def get_all_dag_bundles(
        cls, *, session: Session = NEW_SESSION
    ) -> list[tuple[DagBundleModel, BaseDagBundle]]:
        """
        Get all DAG bundles.

        Every stored config row is loaded, its ``classpath`` is imported and
        the resulting class is instantiated with the row's ``name`` and
        ``kwargs``. Import and construction errors are not caught here and
        propagate to the caller.

        :param session: A database session.
        :return: list of ``(bundle config row, instantiated bundle)`` tuples.
        """
        bundle_configs = session.query(cls).all()

        bundles = []
        for bundle_config in bundle_configs:
            bundle_class = import_string(bundle_config.classpath)
            # ``kwargs`` is a nullable column; ``**None`` raises TypeError, so
            # treat a missing value as "no extra arguments".
            bundle_kwargs = bundle_config.kwargs or {}
            bundle = bundle_class(name=bundle_config.name, **bundle_kwargs)
            bundles.append((bundle_config, bundle))

        return bundles
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "eed27faa34e3",
"3.0.0": "e229247a6cb1",
}


Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3784bc5f7a8a5d39b5b757e6e07e5026b9c5d4fc978510f435ff64f4c9154f13
ae1ba5e05c5775211358665dc956b1e6b9793596be1c955672bbe31a285e56cb
Loading

0 comments on commit 6ed71fa

Please sign in to comment.