diff --git a/astronomer_starship/__init__.py b/astronomer_starship/__init__.py
index 0741d2a..d031897 100644
--- a/astronomer_starship/__init__.py
+++ b/astronomer_starship/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "2.1.0"
+__version__ = "2.2.0"
def get_provider_info():
diff --git a/astronomer_starship/compat/starship_compatability.py b/astronomer_starship/compat/starship_compatability.py
index 413e595..f6fa864 100644
--- a/astronomer_starship/compat/starship_compatability.py
+++ b/astronomer_starship/compat/starship_compatability.py
@@ -1,6 +1,7 @@
import json
+import logging
import os
-from flask import jsonify
+from flask import jsonify, Response
from sqlalchemy.orm import Session
from typing import TYPE_CHECKING
@@ -12,6 +13,9 @@
import pytz
+logger = logging.getLogger(__name__)
+
+
def get_from_request(args, json, key, required: bool = False) -> "Any":
val = json.get(key, args.get(key))
if val is None and required:
@@ -146,6 +150,24 @@ def generic_set_one(session: Session, qualname: str, attrs: dict, **kwargs):
raise e
+def generic_delete(session: Session, qualname: str, **kwargs) -> Response:
+ from http import HTTPStatus
+ from sqlalchemy import delete
+
+ (_, thing_cls) = import_from_qualname(qualname)
+
+ try:
+ filters = [getattr(thing_cls, attr) == val for attr, val in kwargs.items()]
+ deleted_rows = session.execute(delete(thing_cls).where(*filters)).rowcount
+ session.commit()
+ logger.info(f"Deleted {deleted_rows} rows for table {qualname}")
+ return Response(status=HTTPStatus.NO_CONTENT)
+ except Exception as e:
+ logger.error(f"Error deleting row(s) for table {qualname}: {e}")
+ session.rollback()
+ raise e
+
+
def get_test_data(attrs: dict, method: "Union[str, None]" = None) -> "Dict[str, Any]":
"""
>>> get_test_data(method="POST", attrs={"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}})
@@ -195,10 +217,21 @@ def set_env_vars(cls):
res.status_code = 409
raise NotImplementedError()
+ @classmethod
+ def delete_env_vars(cls):
+ """This is not possible to do via API, so return an error"""
+ res = jsonify({"error": "Not implemented"})
+ res.status_code = 405
+ raise NotImplementedError()
+
@classmethod
def variable_attrs(cls) -> "Dict[str, AttrDesc]":
return {
- "key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"},
+ "key": {
+ "attr": "key",
+ "methods": [("POST", True), ("DELETE", True)],
+ "test_value": "key",
+ },
"val": {"attr": "val", "methods": [("POST", True)], "test_value": "val"},
"description": {
"attr": "description",
@@ -217,12 +250,16 @@ def set_variable(self, **kwargs):
self.session, "airflow.models.Variable", self.variable_attrs(), **kwargs
)
+ def delete_variable(self, **kwargs):
+ attrs = {self.variable_attrs()[k]["attr"]: v for k, v in kwargs.items()}
+ return generic_delete(self.session, "airflow.models.Variable", **attrs)
+
@classmethod
def pool_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"name": {
"attr": "pool",
- "methods": [("POST", True)],
+ "methods": [("POST", True), ("DELETE", True)],
"test_value": "test_name",
},
"slots": {"attr": "slots", "methods": [("POST", True)], "test_value": 1},
@@ -241,12 +278,20 @@ def set_pool(self, **kwargs):
self.session, "airflow.models.Pool", self.pool_attrs(), **kwargs
)
+ def delete_pool(self, **kwargs):
+ attrs = {
+ self.pool_attrs()[k]["attr"]: v
+ for k, v in kwargs.items()
+ if k in self.pool_attrs()
+ }
+ return generic_delete(self.session, "airflow.models.Pool", **attrs)
+
@classmethod
def connection_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"conn_id": {
"attr": "conn_id",
- "methods": [("POST", True)],
+ "methods": [("POST", True), ("DELETE", True)],
"test_value": "conn_id",
},
"conn_type": {
@@ -301,6 +346,10 @@ def set_connection(self, **kwargs):
self.session, "airflow.models.Connection", self.connection_attrs(), **kwargs
)
+ def delete_connection(self, **kwargs):
+ attrs = {self.connection_attrs()[k]["attr"]: v for k, v in kwargs.items()}
+ return generic_delete(self.session, "airflow.models.Connection", **attrs)
+
@classmethod
def dag_attrs(cls) -> "Dict[str, AttrDesc]":
return {
@@ -442,7 +491,7 @@ def dag_runs_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"dag_id": {
"attr": "dag_id",
- "methods": [("GET", True)],
+ "methods": [("GET", True), ("DELETE", True)],
"test_value": "dag_0",
},
# Limit is the number of rows to return.
@@ -591,6 +640,10 @@ def set_dag_runs(self, dag_runs: list):
dag_runs = self.insert_directly("dag_run", dag_runs)
return {"dag_runs": dag_runs, "dag_run_count": self._get_dag_run_count(dag_id)}
+ def delete_dag_runs(self, **kwargs):
+ attrs = {self.dag_runs_attrs()[k]["attr"]: v for k, v in kwargs.items()}
+ return generic_delete(self.session, "airflow.models.DagRun", **attrs)
+
@classmethod
def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
epoch = datetime.datetime(1970, 1, 1, 0, 0)
@@ -600,7 +653,7 @@ def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"dag_id": {
"attr": "dag_id",
- "methods": [("GET", True)],
+ "methods": [("GET", True), ("DELETE", True)],
"test_value": "dag_0",
},
# Limit is the number of rows to return.
@@ -853,6 +906,10 @@ def set_task_instances(self, task_instances: list):
task_instances = self.insert_directly("task_instance", task_instances)
return {"task_instances": task_instances}
+ def delete_task_instances(self, **kwargs):
+ attrs = {self.task_instances_attrs()[k]["attr"]: v for k, v in kwargs.items()}
+ return generic_delete(self.session, "airflow.models.TaskInstance", **attrs)
+
def insert_directly(self, table_name, items):
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy import MetaData
diff --git a/astronomer_starship/src/component/MigrateButton.jsx b/astronomer_starship/src/component/MigrateButton.jsx
index a68644c..08829b6 100644
--- a/astronomer_starship/src/component/MigrateButton.jsx
+++ b/astronomer_starship/src/component/MigrateButton.jsx
@@ -2,11 +2,16 @@
import React, { useState } from 'react';
import axios from 'axios';
import { Button, useToast } from '@chakra-ui/react';
-import { MdErrorOutline } from 'react-icons/md';
-import { FaCheck } from 'react-icons/fa';
+import { MdErrorOutline, MdDeleteForever } from 'react-icons/md';
import { GoUpload } from 'react-icons/go';
import PropTypes from 'prop-types';
+function checkStatus(status, exists) {
+ if (status === 204)
+ return false;
+ return status === 200 || exists;
+}
+
export default function MigrateButton({
route, headers, existsInRemote, sendData, isDisabled,
}) {
@@ -16,13 +21,23 @@ export default function MigrateButton({
const [exists, setExists] = useState(existsInRemote);
function handleClick() {
setLoading(true);
- axios.post(route, sendData, { headers })
+ axios({
+ method: exists ? 'delete' : 'post',
+ url: route,
+ headers,
+ params: sendData,
+ })
.then((res) => {
setLoading(false);
- setExists(res.status === 200);
+ setExists(checkStatus(res.status, exists));
+ toast({
+ title: 'Success',
+ status: 'success',
+ isClosable: true,
+ })
})
.catch((err) => {
- setExists(false);
+ setExists(exists);
setLoading(false);
toast({
title: err.response?.data?.error || err.response?.data || err.message,
@@ -34,19 +49,19 @@ export default function MigrateButton({
}
return (
: exists ? : !loading ? :
+ error ? : exists ? : !loading ? :
)}
colorScheme={
- exists ? 'green' : loading ? 'teal' : error ? 'red' : 'teal'
+ exists ? 'red' : loading ? 'teal' : error ? 'red' : 'teal'
}
onClick={() => handleClick()}
>
- {exists ? 'Ok' : loading ? '' : error ? 'Error!' : 'Migrate'}
+ {exists ? 'Delete' : loading ? '' : error ? 'Error!' : 'Migrate'}
);
}
diff --git a/astronomer_starship/src/pages/DAGHistoryPage.jsx b/astronomer_starship/src/pages/DAGHistoryPage.jsx
index 1f73a8e..03d6cf3 100644
--- a/astronomer_starship/src/pages/DAGHistoryPage.jsx
+++ b/astronomer_starship/src/pages/DAGHistoryPage.jsx
@@ -19,9 +19,8 @@ import {
} from '@chakra-ui/react';
import PropTypes from 'prop-types';
import axios from 'axios';
-import { MdErrorOutline } from 'react-icons/md';
+import { MdErrorOutline, MdDeleteForever } from 'react-icons/md';
import { GrDocumentMissing } from 'react-icons/gr';
-import { FaCheck } from 'react-icons/fa';
import { GoUpload } from 'react-icons/go';
import humanFormat from 'human-format';
import { ExternalLinkIcon, RepeatIcon } from '@chakra-ui/icons';
@@ -56,6 +55,44 @@ function DAGHistoryMigrateButton({
const percent = 100;
function handleClick() {
+
+ function deleteRuns() {
+ setLoadPerc(percent * 0.5);
+ axios({
+ method: 'delete',
+ url: proxyUrl(url + constants.DAG_RUNS_ROUTE),
+ headers: proxyHeaders(token),
+ params: { dag_id: dagId },
+ }).then((res) => {
+ setExists(!(res.status === 204));
+ dispatch({
+ type: 'set-dags-data',
+ dagsData: {
+ [dagId]: {
+ remote: {
+ dag_run_count: 0,
+ },
+ },
+ },
+ });
+ setLoadPerc(percent * 1);
+ setLoadPerc(0);
+ }).catch((err) => {
+ setExists(false);
+ setLoadPerc(percent * 0);
+ toast({
+ title: err.response?.data?.error || err.response?.data || err.message,
+ status: 'error',
+ isClosable: true,
+ });
+ setError(err);
+ });
+ }
+
+ if (exists) {
+ deleteRuns();
+ return;
+ }
const errFn = (err) => {
setExists(false);
// noinspection PointlessArithmeticExpressionJS
@@ -117,23 +154,23 @@ function DAGHistoryMigrateButton({
return (
- : exists ?
+ : exists ?
: isDisabled ?
: !loadPerc ?
:
)}
colorScheme={
- exists ? 'green' : isDisabled ? 'gray' : error ? 'red' : 'teal'
+ exists ? 'red' : isDisabled ? 'gray' : error ? 'red' : 'teal'
}
onClick={() => handleClick()}
>
- {exists ? 'Ok'
+ {exists ? 'Delete'
: loadPerc ? (
)
@@ -326,8 +363,7 @@ export default function DAGHistoryPage({ state, dispatch }) {
isDisabled={
!info.row.original.remote?.dag_id ? 'DAG not found in remote'
: !info.row.original.local.dag_run_count ? 'No DAG Runs to migrate'
- : info.row.original.remote?.dag_run_count ? 'DAG Runs already exist in remote'
- : false
+ : false
}
dispatch={dispatch}
/>
diff --git a/astronomer_starship/starship.py b/astronomer_starship/starship.py
index 28d607c..653d11a 100644
--- a/astronomer_starship/starship.py
+++ b/astronomer_starship/starship.py
@@ -13,7 +13,7 @@
from airflow.security import permissions
from airflow.www import auth
-ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH"]
+ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH", "DELETE"]
class Starship(BaseView):
diff --git a/astronomer_starship/starship_api.py b/astronomer_starship/starship_api.py
index 0d99cd5..b80b20f 100644
--- a/astronomer_starship/starship_api.py
+++ b/astronomer_starship/starship_api.py
@@ -266,7 +266,7 @@ def env_vars(self):
return starship_route(get=starship_compat.get_env_vars)
# @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_POOL)])
- @expose("/pools", methods=["GET", "POST"])
+ @expose("/pools", methods=["GET", "POST", "DELETE"])
@csrf.exempt
def pools(self):
"""
@@ -306,15 +306,26 @@ def pools(self):
| include_deferred* | >=2.7 | bool | True |
**Response:** List of Pools, as `GET` Response
+
+ ### DELETE /api/starship/pools
+
+ **Parameters:** Args
+
+ | Field (*=Required) | Version | Type | Example |
+ |---------------------|---------|------|---------|
+ | name* | | str | my_pool |
+
+ **Response:** None
"""
return starship_route(
get=starship_compat.get_pools,
post=starship_compat.set_pool,
+ delete=starship_compat.delete_pool,
kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.pool_attrs()),
)
# @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_VARIABLE)])
- @expose("/variables", methods=["GET", "POST"])
+ @expose("/variables", methods=["GET", "POST", "DELETE"])
@csrf.exempt
def variables(self):
"""
@@ -353,15 +364,26 @@ def variables(self):
| description | | str | My Var |
**Response:** List of Variables, as `GET` Response
+
+ ### `DELETE /api/starship/variable`
+
+ **Parameters:** Args
+
+ | Field (*=Required) | Version | Type | Example |
+ |---------------------|---------|------|---------|
+ | key* | | str | key |
+
+ **Response:** None
"""
return starship_route(
get=starship_compat.get_variables,
post=starship_compat.set_variable,
+ delete=starship_compat.delete_variable,
kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.variable_attrs()),
)
# @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_CONNECTION)])
- @expose("/connections", methods=["GET", "POST"])
+ @expose("/connections", methods=["GET", "POST", "DELETE"])
@csrf.exempt
def connections(self):
"""
@@ -415,10 +437,21 @@ def connections(self):
| description | | str | My Conn |
**Response:** List of Connections, as `GET` Response
+
+ ### DELETE /api/starship/connections
+
+ **Parameters:** Args
+
+ | Field (*=Required) | Version | Type | Example |
+ |---------------------|---------|------|---------|
+ | conn_id* | | str | my_conn |
+
+ **Response:** None
"""
return starship_route(
get=starship_compat.get_connections,
post=starship_compat.set_connection,
+ delete=starship_compat.delete_connection,
kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.connection_attrs()),
)
@@ -479,7 +512,7 @@ def dags(self):
)
# @auth.has_access([(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN)])
- @expose("/dag_runs", methods=["GET", "POST"])
+ @expose("/dag_runs", methods=["GET", "POST", "DELETE"])
@csrf.exempt
def dag_runs(self):
"""
@@ -563,10 +596,21 @@ def dag_runs(self):
| last_scheduling_decision | | date | 1970-01-01T00:00:00+00:00 |
| dag_hash | | str | ... |
| clear_number | >=2.8 | int | 0 |
+
+ ### DELETE /api/starship/dag_runs
+
+ **Parameters:** Args
+
+ | Field (*=Required) | Version | Type | Example |
+ |--------------------------|---------|--------------------|-----------------------------------|
+ | dag_id* | | str | dag_0 |
+
+ **Response:** None
"""
return starship_route(
get=starship_compat.get_dag_runs,
post=starship_compat.set_dag_runs,
+ delete=starship_compat.delete_dag_runs,
kwargs_fn=partial(get_kwargs_fn, attrs=starship_compat.dag_runs_attrs()),
)
diff --git a/tests/api_integration_test.py b/tests/api_integration_test.py
index 3a1bb26..119db48 100644
--- a/tests/api_integration_test.py
+++ b/tests/api_integration_test.py
@@ -46,6 +46,13 @@ def set_and_get(route, test_input, token, url):
assert test_input in actual.json(), actual.text
+def delete(route, test_input, token, url):
+ actual = requests.post(
+ f"{url}/{route}", params=test_input, **get_extras(url, token)
+ )
+ assert actual.status_code == 204, actual.text
+
+
@pytest.fixture(
params=list(URLS_AND_TOKENS.values()),
ids=list(URLS_AND_TOKENS.keys()),
@@ -64,6 +71,7 @@ def test_integration_variables(url_and_token_and_starship):
route = "api/starship/variables"
test_input = get_test_data(method="POST", attrs=starship.variable_attrs())
set_and_get(route, test_input, token, url)
+ delete(route, test_input, token, url)
@manual_tests
@@ -72,6 +80,7 @@ def test_integration_pools(url_and_token_and_starship):
route = "api/starship/pools"
test_input = get_test_data(method="POST", attrs=starship.pool_attrs())
set_and_get(route, test_input, token, url)
+ delete(route, test_input, token, url)
@manual_tests
@@ -80,6 +89,7 @@ def test_integration_connections(url_and_token_and_starship):
route = "api/starship/connections"
test_input = get_test_data(method="POST", attrs=starship.connection_attrs())
set_and_get(route, test_input, token, url)
+ delete(route, test_input, token, url)
@manual_tests
@@ -160,8 +170,10 @@ def test_integration_dag_runs_and_task_instances(url_and_token_and_starship):
}
assert test_input["dag_runs"][0] == actual_dag_run, actual_dag_run
+ # Delete test
+ delete(route, test_input, token, url)
+
route = "api/starship/task_instances"
- requests.delete(f"{url}/api/v1/dags/{dag_id}", **get_extras(url, token))
test_input = get_test_data(method="POST", attrs=starship.task_instances_attrs())
test_input = json.loads(json.dumps(test_input, default=str))
diff --git a/tests/docker_test/docker_test.py b/tests/docker_test/docker_test.py
index 930f475..bcfa0fe 100644
--- a/tests/docker_test/docker_test.py
+++ b/tests/docker_test/docker_test.py
@@ -3,6 +3,8 @@
import os
import pytest
+from http import HTTPStatus
+
from astronomer_starship.compat.starship_compatability import (
StarshipCompatabilityLayer,
get_test_data,
@@ -27,6 +29,10 @@ def test_variables(starship):
actual = starship.get_variables()
assert test_input in actual, actual
+ test_input = get_test_data(method="DELETE", attrs=starship.variable_attrs())
+ actual = starship.delete_variable(**test_input)
+ assert actual.status_code == HTTPStatus.NO_CONTENT, actual
+
@docker_test
def test_pools(starship):
@@ -45,6 +51,10 @@ def test_pools(starship):
actual = starship.get_pools()
assert expected in actual, actual
+ test_input = get_test_data(method="DELETE", attrs=starship.pool_attrs())
+ actual = starship.delete_pool(**test_input)
+ assert actual.status_code == HTTPStatus.NO_CONTENT, actual
+
@docker_test
def test_connections(starship):
@@ -55,6 +65,10 @@ def test_connections(starship):
actual = starship.get_connections()
assert test_input in actual, actual
+ test_input = get_test_data(method="DELETE", attrs=starship.connection_attrs())
+ actual = starship.delete_connection(**test_input)
+ assert actual.status_code == HTTPStatus.NO_CONTENT, actual
+
@docker_test
def test_dags(starship):
@@ -118,3 +132,7 @@ def test_dag_runs_and_task_instances(starship):
assert json.dumps(actual_task_instances, default=str) in json.dumps(
test_input["task_instances"], default=str
), actual_task_instances
+
+ test_input = get_test_data(method="DELETE", attrs=starship.dag_runs_attrs())
+ actual = starship.delete_dag_runs(**test_input)
+ assert actual.status_code == HTTPStatus.NO_CONTENT, actual