From 7b85d4e6bdc5a9f80cd8ccf1d65c248623b5e1d4 Mon Sep 17 00:00:00 2001 From: Kalyan R Date: Mon, 28 Oct 2024 18:09:22 +0530 Subject: [PATCH] include limit and offset in request body schema for List task instances (batch) endpoint (#42870) * add offset and limit * add offset and limit * add offset and limit * add tests * add test for default offset and limit * fix * Update airflow/api_connexion/endpoints/task_instance_endpoint.py Co-authored-by: Ephraim Anierobi * Update tests/api_connexion/endpoints/test_task_instance_endpoint.py Co-authored-by: Ephraim Anierobi * fix test --------- Co-authored-by: Ephraim Anierobi --- .../endpoints/task_instance_endpoint.py | 1 + airflow/api_connexion/openapi/v1.yaml | 9 ++++ airflow/www/static/js/types/api-generated.ts | 7 +++ .../endpoints/test_task_instance_endpoint.py | 50 +++++++++++++++++++ 4 files changed, 67 insertions(+) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 0e98173f68a5f..2c32bad9f347d 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -425,6 +425,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse: except _UnsupportedOrderBy as e: raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported") + ti_query = ti_query.offset(data["page_offset"]).limit(data["page_limit"]) task_instances = session.scalars(ti_query) return task_instance_collection_schema.dump( diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 62d42264073ff..252d4cd6ed3eb 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -5038,6 +5038,15 @@ components: ListTaskInstanceForm: type: object properties: + page_offset: + type: integer + minimum: 0 + description: The number of items to skip before starting to collect the result set. 
+ page_limit: + type: integer + minimum: 1 + default: 100 + description: The number of items to return. dag_ids: type: array items: diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 930c6834d7376..e5c3434ae9003 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -2256,6 +2256,13 @@ export interface components { end_date_lte?: string; }; ListTaskInstanceForm: { + /** @description The number of items to skip before starting to collect the result set. */ + page_offset?: number; + /** + * @description The number of items to return. + * @default 100 + */ + page_limit?: number; /** * @description Return objects with specific DAG IDs. * The value can be repeated to retrieve multiple matching values (OR condition). diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 1051aa21e0cc9..d8b9078bbeb4e 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -1032,6 +1032,56 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se assert response.status_code == 400 assert expected in response.json["detail"] + def test_should_respond_200_for_pagination(self, session): + dag_id = "example_python_operator" + + self.create_task_instances( + session, + task_instances=[ + {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(10) + ], + dag_id=dag_id, + ) + + # First 5 items + response_batch1 = self.client.post( + "/api/v1/dags/~/dagRuns/~/taskInstances/list", + environ_overrides={"REMOTE_USER": "test"}, + json={"page_limit": 5, "page_offset": 0}, + ) + assert response_batch1.status_code == 200, response_batch1.json + num_entries_batch1 = len(response_batch1.json["task_instances"]) + assert num_entries_batch1 == 5 + assert 
len(response_batch1.json["task_instances"]) == 5 + + # 5 items after that + response_batch2 = self.client.post( + "/api/v1/dags/~/dagRuns/~/taskInstances/list", + environ_overrides={"REMOTE_USER": "test"}, + json={"page_limit": 5, "page_offset": 5}, + ) + assert response_batch2.status_code == 200, response_batch2.json + num_entries_batch2 = len(response_batch2.json["task_instances"]) + assert num_entries_batch2 > 0 + assert len(response_batch2.json["task_instances"]) > 0 + + # Both pages report the same total, together cover it, and return different rows + ti_count = 9 + assert response_batch1.json["total_entries"] == response_batch2.json["total_entries"] == ti_count + assert (num_entries_batch1 + num_entries_batch2) == ti_count + assert response_batch1.json["task_instances"] != response_batch2.json["task_instances"] + + # Default limit and offset: an empty request body returns all task instances + response_batch3 = self.client.post( + "/api/v1/dags/~/dagRuns/~/taskInstances/list", + environ_overrides={"REMOTE_USER": "test"}, + json={}, + ) + assert response_batch3.status_code == 200, response_batch3.json + num_entries_batch3 = len(response_batch3.json["task_instances"]) + assert num_entries_batch3 == ti_count + assert len(response_batch3.json["task_instances"]) == ti_count + class TestPostClearTaskInstances(TestTaskInstanceEndpoint): @pytest.mark.parametrize(