From d28caa763de2ac74713a6852af1b817c340ab4ee Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Sat, 14 Dec 2024 00:07:42 +0100 Subject: [PATCH] Fix offline SQL generation of migrations (#44790) * Fix offline SQL generation of migrations Offline SQL mode is broken, and this provides a fix for it, including a test to run the offline migration through the CI. Changes: - refactored the migrations to remove dependency on inspectors and sessions - Updated the minimum version of the offline migration to 2.7.0, which is the oldest migration file we have in AF 3 * fixup! Fix offline SQL generation of migrations * fixup! fixup! Fix offline SQL generation of migrations * properly handle index dropping and fix raw sql issue * sqlite lite is not supported for offline mode * remove sa.text in op.execute * check foreign key doesn't exist before creating it * Apply suggestions from code review Co-authored-by: Kaxil Naik * use procedure for mysql --------- Co-authored-by: Kaxil Naik --- .github/actions/migration_tests/action.yml | 14 + airflow/migrations/utils.py | 45 + ...schedule_dataset_alias_reference_naming.py | 265 +- .../versions/0029_3_0_0_remove_is_subdag.py | 34 +- ...6_3_0_0_add_name_field_to_dataset_model.py | 13 +- .../versions/0038_3_0_0_add_asset_active.py | 25 +- ..._0_tweak_assetaliasmodel_to_match_asset.py | 10 +- .../versions/0047_3_0_0_add_dag_versioning.py | 41 +- airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 3688 ++++++++--------- 11 files changed, 2156 insertions(+), 1983 deletions(-) diff --git a/.github/actions/migration_tests/action.yml b/.github/actions/migration_tests/action.yml index ee746b7f0f930..f6a7b81a2c203 100644 --- a/.github/actions/migration_tests/action.yml +++ b/.github/actions/migration_tests/action.yml @@ -45,6 +45,20 @@ runs: COMPOSE_PROJECT_NAME: "docker-compose" env: COMPOSE_PROJECT_NAME: "docker-compose" + - name: "Bring compose down again" + shell: bash + run: breeze down + env: + COMPOSE_PROJECT_NAME: "docker-compose" + - name: "Test offline migration ${{env.BACKEND}}" + shell: bash + run: > + breeze shell "airflow db reset -y && + airflow db downgrade -n 2.7.0 -y && + airflow db migrate -s" + env: + COMPOSE_PROJECT_NAME: "docker-compose" + if: env.BACKEND != 'sqlite' - name: "Bring any containers left down" shell: bash run: breeze down diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py index bc31c8f70c5ed..9305606873549 100644 --- a/airflow/migrations/utils.py +++ b/airflow/migrations/utils.py @@ -58,3 +58,48 @@ def disable_sqlite_fkeys(op): op.execute("PRAGMA foreign_keys=on") else: yield op + + +def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): + """Older Mysql versions do not support DROP FOREIGN KEY IF EXISTS.""" + op.execute(f""" + CREATE PROCEDURE DropForeignKeyIfExists() + BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.TABLE_CONSTRAINTS + WHERE + CONSTRAINT_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + CONSTRAINT_NAME = '{constraint_name}' AND + CONSTRAINT_TYPE = 'FOREIGN KEY' + ) THEN + ALTER TABLE {table_name} + DROP CONSTRAINT {constraint_name}; + ELSE + SELECT 1; + END IF; + END; + CALL DropForeignKeyIfExists(); + DROP PROCEDURE DropForeignKeyIfExists; + """) + + +def mysql_drop_index_if_exists(index_name, table_name, op): + """Older Mysql versions do not support DROP INDEX IF EXISTS.""" + op.execute(f""" + IF EXISTS ( + SELECT 1 + FROM information_schema.TABLE_CONSTRAINTS + WHERE + CONSTRAINT_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + CONSTRAINT_NAME = '{index_name}' AND + CONSTRAINT_TYPE = 'INDEX' + ) THEN + ALTER TABLE {table_name} + DROP INDEX {index_name}; + ELSE + SELECT 1; + END IF; + """) diff --git a/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py b/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py index 8fb02d3dcf193..4395a069ad2a8 100644 --- a/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py +++ b/airflow/migrations/versions/0027_2_10_3_fix_dag_schedule_dataset_alias_reference_naming.py @@ -27,10 +27,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING - +import sqlalchemy as sa from alembic import op -from sqlalchemy import inspect + +from airflow.migrations.utils import mysql_drop_foreignkey_if_exists +from airflow.models import ID_LEN +from airflow.utils.sqlalchemy import UtcDateTime # revision identifiers, used by Alembic. revision = "5f2621c13b39" @@ -39,91 +41,198 @@ depends_on = None airflow_version = "2.10.3" -if TYPE_CHECKING: - from alembic.operations.base import BatchOperations - from sqlalchemy.sql.elements import conv - -def _rename_fk_constraint( - *, - batch_op: BatchOperations, - original_name: str | conv, - new_name: str | conv, - referent_table: str, - local_cols: list[str], - remote_cols: list[str], - ondelete: str, -) -> None: - batch_op.drop_constraint(original_name, type_="foreignkey") - batch_op.create_foreign_key( - constraint_name=new_name, - referent_table=referent_table, - local_cols=local_cols, - remote_cols=remote_cols, - ondelete=ondelete, - ) +def mysql_create_foreignkey_if_not_exists( + constraint_name, table_name, column_name, ref_table, ref_column, op +): + op.execute(f""" + CREATE PROCEDURE create_foreign_key_if_not_exists() + BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.TABLE_CONSTRAINTS + WHERE + CONSTRAINT_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + CONSTRAINT_NAME = '{constraint_name}' AND + CONSTRAINT_TYPE = 'FOREIGN KEY' + ) THEN + SELECT 1; + ELSE + ALTER TABLE {table_name} + ADD CONSTRAINT {constraint_name} FOREIGN KEY ({column_name}) + REFERENCES {ref_table}({ref_column}) + ON DELETE CASCADE; + END IF; + END; + CALL create_foreign_key_if_not_exists(); + DROP PROCEDURE create_foreign_key_if_not_exists; + """) + + +def postgres_create_foreignkey_if_not_exists( + constraint_name, table_name, column_name, ref_table, ref_column, op +): + op.execute(f""" + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM information_schema.table_constraints + WHERE constraint_type = 'FOREIGN KEY' + AND constraint_name = '{constraint_name}' + ) THEN + ALTER TABLE {table_name} + ADD CONSTRAINT {constraint_name} + FOREIGN KEY ({column_name}) + REFERENCES {ref_table} ({ref_column}) + ON DELETE CASCADE; + END IF; + END $$; + """) def upgrade(): """Rename dag_schedule_dataset_alias_reference constraint.""" - with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op: - bind = op.get_context().bind - insp = inspect(bind) - fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")] - - # "dsdar_dataset_alias_fkey" was the constraint name defined in the model while "dsdar_dataset_fkey" is the one - # defined in the previous migration. - # Rename this constraint name if user is using the name "dsdar_dataset_fkey". - if "dsdar_dataset_fkey" in fk_constraints: - _rename_fk_constraint( - batch_op=batch_op, - original_name="dsdar_dataset_fkey", - new_name="dsdar_dataset_alias_fkey", - referent_table="dataset_alias", - local_cols=["alias_id"], - remote_cols=["id"], + dialect = op.get_context().dialect.name + if dialect == "sqlite": + op.create_table( + "new_table", + sa.Column("alias_id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("dag_id", sa.String(ID_LEN), primary_key=True, nullable=False), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint( + ("alias_id",), + ["dataset_alias.id"], + name="dsdar_dataset_alias_fkey", ondelete="CASCADE", - ) - - # "dsdar_dag_fkey" was the constraint name defined in the model while "dsdar_dag_id_fkey" is the one - # defined in the previous migration. - # Rename this constraint name if user is using the name "dsdar_dag_fkey". - if "dsdar_dag_fkey" in fk_constraints: - _rename_fk_constraint( - batch_op=batch_op, - original_name="dsdar_dag_fkey", - new_name="dsdar_dag_id_fkey", - referent_table="dataset_alias", - local_cols=["alias_id"], - remote_cols=["id"], + ), + sa.ForeignKeyConstraint( + columns=("dag_id",), + refcolumns=["dag.dag_id"], + name="dsdar_dag_id_fkey", ondelete="CASCADE", - ) + ), + sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"), + ) + op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference") + op.drop_table("dag_schedule_dataset_alias_reference") + op.rename_table("new_table", "dag_schedule_dataset_alias_reference") + op.create_index( + "idx_dag_schedule_dataset_alias_reference_dag_id", + "dag_schedule_dataset_alias_reference", + ["dag_id"], + unique=False, + ) + if dialect == "postgresql": + op.execute( + "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dataset_fkey" + ) + op.execute( + "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dag_fkey" + ) + postgres_create_foreignkey_if_not_exists( + "dsdar_dataset_alias_fkey", + "dag_schedule_dataset_alias_reference", + "alias_id", + "dataset_alias", + "id", + op, + ) + postgres_create_foreignkey_if_not_exists( + "dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference", "alias_id", "dataset_alias", "id", op + ) + if dialect == "mysql": + mysql_drop_foreignkey_if_exists("dsdar_dataset_fkey", "dag_schedule_dataset_alias_reference", op) + mysql_drop_foreignkey_if_exists("dsdar_dag_fkey", "dag_schedule_dataset_alias_reference", op) + mysql_create_foreignkey_if_not_exists( + "dsdar_dataset_alias_fkey", + "dag_schedule_dataset_alias_reference", + "alias_id", + "dataset_alias", + "id", + op, + ) + mysql_create_foreignkey_if_not_exists( + "dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference", "alias_id", "dataset_alias", "id", op + ) def downgrade(): """Undo dag_schedule_dataset_alias_reference constraint rename.""" - with op.batch_alter_table("dag_schedule_dataset_alias_reference", schema=None) as batch_op: - bind = op.get_context().bind - insp = inspect(bind) - fk_constraints = [fk["name"] for fk in insp.get_foreign_keys("dag_schedule_dataset_alias_reference")] - if "dsdar_dataset_alias_fkey" in fk_constraints: - _rename_fk_constraint( - batch_op=batch_op, - original_name="dsdar_dataset_alias_fkey", - new_name="dsdar_dataset_fkey", - referent_table="dataset_alias", - local_cols=["alias_id"], - remote_cols=["id"], + dialect = op.get_context().dialect.name + if dialect == "postgresql": + op.execute( + "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dataset_alias_fkey" + ) + op.execute( + "ALTER TABLE dag_schedule_dataset_alias_reference DROP CONSTRAINT IF EXISTS dsdar_dag_id_fkey" + ) + postgres_create_foreignkey_if_not_exists( + "dsdar_dataset_fkey", + "dag_schedule_dataset_alias_reference", + "alias_id", + "dataset_alias", + "id", + op, + ) + postgres_create_foreignkey_if_not_exists( + "dsdar_dag_fkey", + "dag_schedule_dataset_alias_reference", + "alias_id", + "dataset_alias", + "id", + op, + ) + if dialect == "mysql": + mysql_drop_foreignkey_if_exists( + "dsdar_dataset_alias_fkey", "dag_schedule_dataset_alias_reference", op + ) + mysql_drop_foreignkey_if_exists("dsdar_dag_id_fkey", "dag_schedule_dataset_alias_reference", op) + mysql_create_foreignkey_if_not_exists( + "dsdar_dataset_fkey", + "dag_schedule_dataset_alias_reference", + "alias_id", + "dataset_alias", + "id", + op, + ) + mysql_create_foreignkey_if_not_exists( + "dsdar_dag_fkey", + "dag_schedule_dataset_alias_reference", + "alias_id", + "dataset_alias", + "id", + op, + ) + if dialect == "sqlite": + op.create_table( + "new_table", + sa.Column("alias_id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("dag_id", sa.String(ID_LEN), primary_key=True, nullable=False), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.Column("updated_at", UtcDateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint( + ("alias_id",), + ["dataset_alias.id"], + name="dsdar_dataset_fkey", ondelete="CASCADE", - ) - - if "dsdar_dag_id_fkey" in fk_constraints: - _rename_fk_constraint( - batch_op=batch_op, - original_name="dsdar_dag_id_fkey", - new_name="dsdar_dag_fkey", - referent_table="dataset_alias", - local_cols=["alias_id"], - remote_cols=["id"], + ), + sa.ForeignKeyConstraint( + columns=("dag_id",), + refcolumns=["dag.dag_id"], + name="dsdar_dag_fkey", ondelete="CASCADE", - ) + ), + sa.PrimaryKeyConstraint("alias_id", "dag_id", name="dsdar_pkey"), + ) + op.execute("INSERT INTO new_table SELECT * FROM dag_schedule_dataset_alias_reference") + op.drop_table("dag_schedule_dataset_alias_reference") + op.rename_table("new_table", "dag_schedule_dataset_alias_reference") + op.create_index( + "idx_dag_schedule_dataset_alias_reference_dag_id", + "dag_schedule_dataset_alias_reference", + ["dag_id"], + unique=False, + ) diff --git a/airflow/migrations/versions/0029_3_0_0_remove_is_subdag.py b/airflow/migrations/versions/0029_3_0_0_remove_is_subdag.py index eab9954b329ab..cb652f647d10c 100644 --- a/airflow/migrations/versions/0029_3_0_0_remove_is_subdag.py +++ b/airflow/migrations/versions/0029_3_0_0_remove_is_subdag.py @@ -40,37 +40,17 @@ airflow_version = "3.0.0" -def _column_exists(inspector, column_name): - return column_name in [col["name"] for col in inspector.get_columns("dag")] - - -def _index_exists(inspector, index_name): - return index_name in [index["name"] for index in inspector.get_indexes("dag")] - - def upgrade(): """Remove ``is_subdag`` column from DAGs table.""" - conn = op.get_bind() - inspector = sa.inspect(conn) - - with op.batch_alter_table("dag", schema=None) as batch_op: - if _index_exists(inspector, "idx_root_dag_id"): - batch_op.drop_index("idx_root_dag_id") - if _column_exists(inspector, "is_subdag"): - batch_op.drop_column("is_subdag") - if _column_exists(inspector, "root_dag_id"): - batch_op.drop_column("root_dag_id") + with op.batch_alter_table("dag") as batch_op: + batch_op.drop_column("is_subdag") + batch_op.drop_index("idx_root_dag_id") + batch_op.drop_column("root_dag_id") def downgrade(): """Add ``is_subdag`` column in DAGs table.""" - conn = op.get_bind() - inspector = sa.inspect(conn) - with op.batch_alter_table("dag", schema=None) as batch_op: - if not _column_exists(inspector, "is_subdag"): - batch_op.add_column(sa.Column("is_subdag", sa.BOOLEAN(), nullable=True)) - if not _column_exists(inspector, "root_dag_id"): - batch_op.add_column(sa.Column("root_dag_id", StringID(), nullable=True)) - if not _index_exists(inspector, "idx_root_dag_id"): - batch_op.create_index("idx_root_dag_id", ["root_dag_id"], unique=False) + batch_op.add_column(sa.Column("is_subdag", sa.BOOLEAN(), nullable=True)) + batch_op.add_column(sa.Column("root_dag_id", StringID(), nullable=True)) + batch_op.create_index("idx_root_dag_id", ["root_dag_id"], unique=False) diff --git a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py index c3e8edfc3b928..04dcc2596ed6a 100644 --- a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py +++ b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py @@ -38,7 +38,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy.orm import Session # revision identifiers, used by Alembic. revision = "0d9e73a75ee4" @@ -54,6 +53,7 @@ def upgrade(): + dialect = op.get_bind().dialect.name # Fix index name on DatasetAlias. with op.batch_alter_table("dataset_alias", schema=None) as batch_op: batch_op.drop_index("idx_name_unique") @@ -63,11 +63,12 @@ def upgrade(): batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE)) batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE)) # Fill name from uri column, and group to 'asset'. - dataset_table = sa.table("dataset", sa.column("name"), sa.column("uri"), sa.column("group")) - with Session(bind=op.get_bind()) as session: - session.execute(sa.update(dataset_table).values(name=dataset_table.c.uri, group="asset")) - session.commit() - # Set the name and group columns non-nullable. + if dialect == "mysql": + stmt = "UPDATE dataset SET name = uri, `group` = 'asset'" + else: + stmt = "UPDATE dataset SET name = uri, \"group\" = 'asset'" + op.execute(stmt) + # Set the name column non-nullable. # Now with values in there, we can create the new unique constraint and index. # Due to MySQL restrictions, we are also reducing the length on uri. with op.batch_alter_table("dataset", schema=None) as batch_op: diff --git a/airflow/migrations/versions/0038_3_0_0_add_asset_active.py b/airflow/migrations/versions/0038_3_0_0_add_asset_active.py index 422bc440dba85..0d86f8c28d6c6 100644 --- a/airflow/migrations/versions/0038_3_0_0_add_asset_active.py +++ b/airflow/migrations/versions/0038_3_0_0_add_asset_active.py @@ -29,7 +29,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy.orm import Session # revision identifiers, used by Alembic. revision = "5a5d66100783" @@ -59,14 +58,7 @@ def upgrade(): sa.Index("idx_asset_active_name_unique", "name", unique=True), sa.Index("idx_asset_active_uri_unique", "uri", unique=True), ) - with Session(bind=op.get_bind()) as session: - session.execute( - sa.text( - "insert into asset_active (name, uri) " - "select name, uri from dataset where is_orphaned = false" - ) - ) - session.commit() + op.execute("insert into asset_active (name, uri) select name, uri from dataset where is_orphaned = false") with op.batch_alter_table("dataset", schema=None) as batch_op: batch_op.drop_column("is_orphaned") @@ -76,13 +68,10 @@ def downgrade(): batch_op.add_column( sa.Column("is_orphaned", sa.Boolean, default=False, nullable=False, server_default="0") ) - with Session(bind=op.get_bind()) as session: - session.execute( - sa.text( - "update dataset set is_orphaned = true " - "where exists (select 1 from asset_active " - "where dataset.name = asset_active.name and dataset.uri = asset_active.uri)" - ) - ) - session.commit() + op.execute( + "update dataset set is_orphaned = true " + "where exists (select 1 from asset_active " + "where dataset.name = asset_active.name and dataset.uri = asset_active.uri)" + ) + op.drop_table("asset_active") diff --git a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py index 25b4f1871938d..68442d0f7c2d5 100644 --- a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py +++ b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py @@ -41,7 +41,6 @@ import sqlalchemy as sa from alembic import op -from sqlalchemy.orm import Session # Revision identifiers, used by Alembic. revision = "fb2d4922cd79" @@ -61,10 +60,11 @@ def upgrade(): with op.batch_alter_table("dataset_alias", schema=None) as batch_op: batch_op.alter_column("name", type_=_STRING_COLUMN_TYPE, nullable=False) batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE)) - dataset_alias_table = sa.table("dataset_alias", sa.column("group")) - with Session(bind=op.get_bind()) as session: - session.execute(sa.update(dataset_alias_table).values(group="asset")) - session.commit() + if op.get_bind().dialect.name == "mysql": + stmt = "UPDATE dataset SET `group` = 'asset'" + else: + stmt = "UPDATE dataset SET \"group\" = 'asset'" + op.execute(stmt) with op.batch_alter_table("dataset_alias", schema=None) as batch_op: batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False) diff --git a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py index 35234894367dd..7ef506df2ba9a 100644 --- a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py +++ b/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py @@ -28,11 +28,15 @@ from __future__ import annotations import sqlalchemy as sa +import sqlalchemy_jsonfield from alembic import op +from sqlalchemy import LargeBinary, Table +from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlalchemy_utils import UUIDType from airflow.migrations.db_types import StringID -from airflow.models.base import naming_convention +from airflow.models.base import ID_LEN, naming_convention +from airflow.settings import json from airflow.utils import timezone from airflow.utils.sqlalchemy import UtcDateTime @@ -49,6 +53,32 @@ def _delete_serdag_and_code(): op.execute(sa.text("DELETE FROM dag_code")) +# The below tables helps us use the recreate_always feature of batch_alter_table and makes +# this migration work in offline mode. +old_dagcode_table = Table( + "dag_code", + sa.MetaData(naming_convention=naming_convention), + sa.Column("fileloc_hash", sa.BigInteger(), nullable=False, primary_key=True), + sa.Column("fileloc", sa.String(length=2000), nullable=False), + sa.Column("last_updated", UtcDateTime(), nullable=False), + sa.Column("source_code", sa.Text().with_variant(MEDIUMTEXT(), "mysql"), nullable=False), +) + +old_serialized_table = Table( + "serialized_dag", + sa.MetaData(naming_convention=naming_convention), + sa.Column("dag_id", sa.String(ID_LEN), nullable=False, primary_key=True), + sa.Column("fileloc", sa.String(length=2000), nullable=False), + sa.Column("fileloc_hash", sa.BigInteger(), nullable=False), + sa.Column("data", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + sa.Column("data_compressed", LargeBinary, nullable=True), + sa.Column("dag_hash", sa.String(32), nullable=False), + sa.Column("last_updated", UtcDateTime(), nullable=False), + sa.Column("processor_subdir", sa.String(2000)), + sa.Index("idx_fileloc_hash", "fileloc_hash", unique=False), +) + + def upgrade(): """Apply add dag versioning.""" # Before creating the dag_version table, we need to delete the existing serialized_dag and dag_code tables @@ -65,7 +95,9 @@ def upgrade(): sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")), sa.UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"), ) - with op.batch_alter_table("dag_code", recreate="always", naming_convention=naming_convention) as batch_op: + with op.batch_alter_table( + "dag_code", recreate="always", naming_convention=naming_convention, copy_from=old_dagcode_table + ) as batch_op: batch_op.drop_constraint("dag_code_pkey", type_="primary") batch_op.add_column( sa.Column("id", UUIDType(binary=False), primary_key=True), insert_before="fileloc_hash" @@ -85,7 +117,10 @@ def upgrade(): batch_op.add_column(sa.Column("dag_id", sa.String(length=250), nullable=False)) with op.batch_alter_table( - "serialized_dag", recreate="always", naming_convention=naming_convention + "serialized_dag", + recreate="always", + naming_convention=naming_convention, + copy_from=old_serialized_table, ) as batch_op: batch_op.drop_constraint("serialized_dag_pkey", type_="primary") batch_op.add_column(sa.Column("id", UUIDType(binary=False), primary_key=True)) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index fe5c4c0a7916c..3cd3c206a66d2 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1086,7 +1086,7 @@ def _revisions_above_min_for_offline(config, revisions) -> None: dbname = settings.engine.dialect.name if dbname == "sqlite": raise SystemExit("Offline migration not supported for SQLite.") - min_version, min_revision = ("3.0.0", "22ed7efa9da2") + min_version, min_revision = ("2.7.0", "937cbd173ca1") # Check if there is history between the revisions and the start revision # This ensures that the revisions are above `min_revision` diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 10b180a41aee8..6ee5972cc1624 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -8f2fd91375c546b297490e701dc3853d7ba53c7cd1422ed7f7e57b9ac86f6eca \ No newline at end of file +ccb8ef5583b2a6b3ee3ab4212139c112b92953675655010a6775fffb4945b206 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index bdbd97db8efc8..9c37f5c320686 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + log @@ -143,2298 +143,2298 @@ [VARCHAR(2000)] NOT NULL - + +job + +job + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +end_date + + [TIMESTAMP] + +executor_class + + [VARCHAR(500)] + +hostname + + [VARCHAR(500)] + +job_type + + [VARCHAR(30)] + +latest_heartbeat + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +unixname + + [VARCHAR(1000)] + + + connection - -connection - -id - - [INTEGER] - NOT NULL - -conn_id - - [VARCHAR(250)] - NOT NULL - -conn_type - - [VARCHAR(500)] - NOT NULL - -description - - [TEXT] - -extra - - [TEXT] - -host - - [VARCHAR(500)] - -is_encrypted - - [BOOLEAN] - -is_extra_encrypted - - [BOOLEAN] - -login - - [TEXT] - -password - - [TEXT] - -port - - [INTEGER] - -schema - - [VARCHAR(500)] + +connection + +id + + [INTEGER] + NOT NULL + +conn_id + + [VARCHAR(250)] + NOT NULL + +conn_type + + [VARCHAR(500)] + NOT NULL + +description + + [TEXT] + +extra + + [TEXT] + +host + + [VARCHAR(500)] + +is_encrypted + + [BOOLEAN] + +is_extra_encrypted + + [BOOLEAN] + +login + + [TEXT] + +password + + [TEXT] + +port + + [INTEGER] + +schema + + [VARCHAR(500)] - + variable - -variable - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -is_encrypted - - [BOOLEAN] - -key - - [VARCHAR(250)] - -val - - [TEXT] + +variable + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +is_encrypted + + [BOOLEAN] + +key + + [VARCHAR(250)] + +val + + [TEXT] - -import_error - -import_error - -id - - [INTEGER] - NOT NULL - -filename - - [VARCHAR(1024)] - -processor_subdir - - [VARCHAR(2000)] - -stacktrace - - [TEXT] - -timestamp - - [TIMESTAMP] - - -job - -job - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -end_date - - [TIMESTAMP] - -executor_class - - [VARCHAR(500)] - -hostname - - [VARCHAR(500)] - -job_type - - [VARCHAR(30)] - -latest_heartbeat - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -unixname - - [VARCHAR(1000)] +import_error + +import_error + +id + + [INTEGER] + NOT NULL + +filename + + [VARCHAR(1024)] + +processor_subdir + + [VARCHAR(2000)] + +stacktrace + + [TEXT] + +timestamp + + [TIMESTAMP] asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 asset - -asset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_trigger - -asset_trigger - -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - [INTEGER] - NOT NULL + +asset_trigger + +asset_id + + [INTEGER] + NOT NULL + +trigger_id + + [INTEGER] + NOT NULL asset--asset_trigger - -0..N -1 + +0..N +1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--asset_trigger - -0..N -1 + +0..N +1 task_instance - -task_instance - -id - - [UUID] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 task_instance--task_reschedule - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSON] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSON] + +length + + [INTEGER] + NOT NULL task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 task_instance--task_map - -0..N -1 + +0..N +1 xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [JSONB] task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance--xcom - -0..N -1 + +0..N +1 task_instance_note - -task_instance_note - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance--task_instance_note - -0..N -1 + +0..N +1 task_instance_history - -task_instance_history - -id - - [INTEGER] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +id + + [INTEGER] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 task_instance--task_instance_history - -0..N -1 + +0..N +1 dag_bundle - -dag_bundle - -id - - [UUID] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -kwargs - - [JSON] - -last_refreshed - - [TIMESTAMP] - -latest_version - - [VARCHAR(200)] - -name - - [VARCHAR(250)] - NOT NULL - -refresh_interval - - [INTEGER] + +dag_bundle + +id + + [UUID] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +kwargs + + [JSON] + +last_refreshed + + [TIMESTAMP] + +latest_version + + [VARCHAR(200)] + +name + + [VARCHAR(250)] + NOT NULL + +refresh_interval + + [INTEGER] dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -asset_expression - - [JSON] - -bundle_id - - [UUID] - -dag_display_name - - [VARCHAR(2000)] - -default_view - - [VARCHAR(25)] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_active - - [BOOLEAN] - -is_paused - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parsed_time - - [TIMESTAMP] - -latest_bundle_version - - [VARCHAR(200)] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -processor_subdir - - [VARCHAR(2000)] - -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +asset_expression + + [JSON] + +bundle_id + + [UUID] + +dag_display_name + + [VARCHAR(2000)] + +default_view + + [VARCHAR(25)] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_active + + [BOOLEAN] + +is_paused + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parsed_time + + [TIMESTAMP] + +latest_bundle_version + + [VARCHAR(200)] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +processor_subdir + + [VARCHAR(2000)] + +timetable_description + + [VARCHAR(1000)] + +timetable_summary + + [TEXT] dag_bundle--dag - -0..N -{0,1} + +0..N +{0,1} dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 dag--asset_dag_run_queue - -0..N -1 + +0..N +1 dag_version - -dag_version - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +version_number + + [INTEGER] + NOT NULL dag--dag_version - -0..N -1 + +0..N +1 dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 dag_version--task_instance - -0..N -{0,1} + +0..N +{0,1} - + -dag_code - -dag_code - -id - - [UUID] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL +dag_run + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [BYTEA] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] - + -dag_version--dag_code - -0..N -1 +dag_version--dag_run + +0..N +{0,1} - + -dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [BYTEA] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] +dag_code + +dag_code + +id + + [UUID] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +source_code + + [TEXT] + NOT NULL + +source_code_hash + + [VARCHAR(32)] + NOT NULL - + -dag_version--dag_run - -0..N -{0,1} +dag_version--dag_code + +0..N +1 serialized_dag - -serialized_dag - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -data - - [JSON] - -data_compressed - - [BYTEA] - -processor_subdir - - [VARCHAR(2000)] + +serialized_dag + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +data + + [JSON] + +data_compressed + + [BYTEA] + +processor_subdir + + [VARCHAR(2000)] dag_version--serialized_dag - -0..N -1 + +0..N +1 dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + +registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL