Skip to content

Commit

Permalink
include limit and offset in request body schema for List task instanc…
Browse files Browse the repository at this point in the history
…es (batch) endpoint (apache#42870)

* add offset and limit

* add offset and limit

* add offset and limit

* add tests

* add test for default offset and limit

* fix

* Update airflow/api_connexion/endpoints/task_instance_endpoint.py

Co-authored-by: Ephraim Anierobi <[email protected]>

* Update tests/api_connexion/endpoints/test_task_instance_endpoint.py

Co-authored-by: Ephraim Anierobi <[email protected]>

* fix test

---------

Co-authored-by: Ephraim Anierobi <[email protected]>
  • Loading branch information
rawwar and ephraimbuddy authored Oct 28, 2024
1 parent 8a6b1fc commit 7b85d4e
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse:
except _UnsupportedOrderBy as e:
raise BadRequest(detail=f"Ordering with {e.order_by!r} is not supported")

ti_query = ti_query.offset(data["page_offset"]).limit(data["page_limit"])
task_instances = session.scalars(ti_query)

return task_instance_collection_schema.dump(
Expand Down
9 changes: 9 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5038,6 +5038,15 @@ components:
ListTaskInstanceForm:
type: object
properties:
page_offset:
type: integer
minimum: 0
description: The number of items to skip before starting to collect the result set.
page_limit:
type: integer
minimum: 1
default: 100
description: The numbers of items to return.
dag_ids:
type: array
items:
Expand Down
7 changes: 7 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,13 @@ export interface components {
end_date_lte?: string;
};
ListTaskInstanceForm: {
/** @description The number of items to skip before starting to collect the result set. */
page_offset?: number;
/**
* @description The numbers of items to return.
* @default 100
*/
page_limit?: number;
/**
* @description Return objects with specific DAG IDs.
* The value can be repeated to retrieve multiple matching values (OR condition).
Expand Down
50 changes: 50 additions & 0 deletions tests/api_connexion/endpoints/test_task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,56 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
assert response.status_code == 400
assert expected in response.json["detail"]

def test_should_respond_200_for_pagination(self, session):
dag_id = "example_python_operator"

self.create_task_instances(
session,
task_instances=[
{"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(10)
],
dag_id=dag_id,
)

# First 5 items
response_batch1 = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": "test"},
json={"page_limit": 5, "page_offset": 0},
)
assert response_batch1.status_code == 200, response_batch1.json
num_entries_batch1 = len(response_batch1.json["task_instances"])
assert num_entries_batch1 == 5
assert len(response_batch1.json["task_instances"]) == 5

# 5 items after that
response_batch2 = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": "test"},
json={"page_limit": 5, "page_offset": 5},
)
assert response_batch2.status_code == 200, response_batch2.json
num_entries_batch2 = len(response_batch2.json["task_instances"])
assert num_entries_batch2 > 0
assert len(response_batch2.json["task_instances"]) > 0

# Match
ti_count = 9
assert response_batch1.json["total_entries"] == response_batch2.json["total_entries"] == ti_count
assert (num_entries_batch1 + num_entries_batch2) == ti_count
assert response_batch1 != response_batch2

# default limit and offset
response_batch3 = self.client.post(
"/api/v1/dags/~/dagRuns/~/taskInstances/list",
environ_overrides={"REMOTE_USER": "test"},
json={},
)

num_entries_batch3 = len(response_batch3.json["task_instances"])
assert num_entries_batch3 == ti_count
assert len(response_batch3.json["task_instances"]) == ti_count


class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
@pytest.mark.parametrize(
Expand Down

0 comments on commit 7b85d4e

Please sign in to comment.