diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 74442b96..fb6385c2 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -95,6 +95,7 @@ def __init__( self._lookback_window: int = 0 self._parent_state: Optional[StreamState] = None self._over_limit: int = 0 + self._use_global_cursor: bool = False self._partition_serializer = PerPartitionKeySerializer() self._set_initial_state(stream_state) @@ -105,16 +106,18 @@ def cursor_field(self) -> CursorField: @property def state(self) -> MutableMapping[str, Any]: - states = [] - for partition_tuple, cursor in self._cursor_per_partition.items(): - if cursor.state: - states.append( - { - "partition": self._to_dict(partition_tuple), - "cursor": copy.deepcopy(cursor.state), - } - ) - state: dict[str, Any] = {self._PERPARTITION_STATE_KEY: states} + state: dict[str, Any] = {"use_global_cursor": self._use_global_cursor} + if not self._use_global_cursor: + states = [] + for partition_tuple, cursor in self._cursor_per_partition.items(): + if cursor.state: + states.append( + { + "partition": self._to_dict(partition_tuple), + "cursor": copy.deepcopy(cursor.state), + } + ) + state[self._PERPARTITION_STATE_KEY] = states if self._global_cursor: state[self._GLOBAL_STATE_KEY] = self._global_cursor @@ -147,7 +150,8 @@ def close_partition(self, partition: Partition) -> None: < cursor.state[self.cursor_field.cursor_field_key] ): self._new_global_cursor = copy.deepcopy(cursor.state) - self._emit_state_message() + if not self._use_global_cursor: + self._emit_state_message() def ensure_at_least_one_state_emitted(self) -> None: """ @@ -225,14 +229,18 @@ def _ensure_partition_limit(self) -> None: """ with self._lock: while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + self._over_limit += 1 # Try removing finished partitions first for partition_key in list(self._cursor_per_partition.keys()): - if partition_key in self._finished_partitions: + if ( + partition_key in self._finished_partitions + and self._semaphore_per_partition[partition_key]._value == 0 + ): oldest_partition = self._cursor_per_partition.pop( partition_key ) # Remove the oldest partition logger.warning( - f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." + f"The maximum number of partitions has been reached. Dropping the oldest finished partition: {oldest_partition}. Over limit: {self._over_limit}." ) break else: @@ -297,6 +305,8 @@ def _set_initial_state(self, stream_state: StreamState) -> None: self._new_global_cursor = deepcopy(stream_state) else: + self._use_global_cursor = stream_state.get("use_global_cursor", False) + self._lookback_window = int(stream_state.get("lookback_window", 0)) for state in stream_state.get(self._PERPARTITION_STATE_KEY, []): @@ -320,6 +330,9 @@ def _set_initial_state(self, stream_state: StreamState) -> None: self._partition_router.set_initial_state(stream_state) def observe(self, record: Record) -> None: + if not self._use_global_cursor and self.limit_reached(): + self._use_global_cursor = True + if not record.associated_slice: raise ValueError( "Invalid state as stream slices that are emitted should refer to an existing cursor" @@ -358,3 +371,6 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor: ) cursor = self._cursor_per_partition[partition_key] return cursor + + def limit_reached(self) -> bool: + return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 3fc9e001..3d7f9812 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -1,5 +1,5 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. - +import copy from copy import deepcopy from datetime import datetime, timedelta from typing import Any, List, Mapping, MutableMapping, Optional, Union @@ -721,6 +721,7 @@ def _run_read( "cursor": {"created_at": VOTE_300_CREATED_AT}, }, ], + "use_global_cursor": False, "lookback_window": 1, "parent_state": {}, "state": {"created_at": VOTE_100_CREATED_AT}, @@ -1121,6 +1122,7 @@ def run_incremental_parent_state_test( } }, "lookback_window": 1, + "use_global_cursor": False, "states": [ { "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, @@ -1170,8 +1172,66 @@ def test_incremental_parent_state( ) +STATE_MIGRATION_EXPECTED_STATE = { + "state": {"created_at": VOTE_100_CREATED_AT}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, + "parent_state": {"posts": {"updated_at": POST_1_UPDATED_AT}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "use_global_cursor": False, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": PARTITION_SYNC_START_TIME}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_200_CREATED_AT}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_210_CREATED_AT}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], +} +STATE_MIGRATION_GLOBAL_EXPECTED_STATE = copy.deepcopy(STATE_MIGRATION_EXPECTED_STATE) +del STATE_MIGRATION_GLOBAL_EXPECTED_STATE["states"] +STATE_MIGRATION_GLOBAL_EXPECTED_STATE["use_global_cursor"] = True + + @pytest.mark.parametrize( - "test_name, manifest, mock_requests, expected_records, expected_state", + "test_name, manifest, mock_requests, expected_records", [ ( "test_incremental_parent_state", @@ -1326,80 +1386,45 @@ def test_incremental_parent_state( "id": 300, }, ], - # Expected state + ), + ], +) +@pytest.mark.parametrize( + "initial_state, expected_state", + [ + ({"created_at": PARTITION_SYNC_START_TIME}, STATE_MIGRATION_EXPECTED_STATE), + ( { - "state": {"created_at": VOTE_100_CREATED_AT}, + "state": {"created_at": PARTITION_SYNC_START_TIME}, + "lookback_window": 0, + "use_global_cursor": False, "parent_state": { "post_comments": { - "use_global_cursor": False, - "state": {"updated_at": COMMENT_10_UPDATED_AT}, - "parent_state": {"posts": {"updated_at": POST_1_UPDATED_AT}}, - "lookback_window": 1, - "states": [ - { - "partition": {"id": 1, "parent_slice": {}}, - "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, - }, - { - "partition": {"id": 2, "parent_slice": {}}, - "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, - }, - { - "partition": {"id": 3, "parent_slice": {}}, - "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, - }, - ], + "state": {"updated_at": PARTITION_SYNC_START_TIME}, + "parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}}, + "lookback_window": 0, } }, - "lookback_window": 1, - "states": [ - { - "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, - "cursor": {"created_at": VOTE_100_CREATED_AT}, - }, - { - "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, - "cursor": {"created_at": VOTE_111_CREATED_AT}, - }, - { - "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, - "cursor": {"created_at": PARTITION_SYNC_START_TIME}, - }, - { - "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, - "cursor": {"created_at": VOTE_200_CREATED_AT}, - }, - { - "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, - "cursor": {"created_at": VOTE_210_CREATED_AT}, - }, - { - "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, - "cursor": {"created_at": VOTE_300_CREATED_AT}, - }, - ], }, + STATE_MIGRATION_EXPECTED_STATE, ), - ], -) -@pytest.mark.parametrize( - "initial_state", - [ - {"created_at": PARTITION_SYNC_START_TIME}, - { - "state": {"created_at": PARTITION_SYNC_START_TIME}, - "lookback_window": 0, - "use_global_cursor": True, - "parent_state": { - "post_comments": { - "state": {"updated_at": PARTITION_SYNC_START_TIME}, - "parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}}, - "lookback_window": 0, - } + ( + { + "state": {"created_at": PARTITION_SYNC_START_TIME}, + "lookback_window": 0, + "use_global_cursor": True, + "parent_state": { + "post_comments": { + "state": {"updated_at": PARTITION_SYNC_START_TIME}, + "parent_state": {"posts": {"updated_at": PARTITION_SYNC_START_TIME}}, + "lookback_window": 0, + } + }, }, - }, + STATE_MIGRATION_GLOBAL_EXPECTED_STATE, + ), ], - ids=["legacy_python_format", "low_code_global_format"], + ids=["legacy_python_format", "low_code_per_partition_state", "low_code_global_format"], ) def test_incremental_parent_state_migration( test_name, manifest, mock_requests, expected_records, initial_state, expected_state @@ -1510,6 +1535,7 @@ def test_incremental_parent_state_migration( ], "state": {"created_at": INITIAL_GLOBAL_CURSOR}, "lookback_window": 1, + "use_global_cursor": False, }, ), ], @@ -1677,13 +1703,14 @@ def test_incremental_parent_state_no_slices( "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, }, ], - "use_global_cursor": True, + "use_global_cursor": False, "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, "lookback_window": 0, }, # Expected state { "lookback_window": 1, + "use_global_cursor": False, "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, "states": [ { @@ -1953,6 +1980,7 @@ def test_incremental_parent_state_no_records( }, "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, "lookback_window": 86400, + "use_global_cursor": False, "states": [ { "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, @@ -2220,6 +2248,7 @@ def test_incremental_substream_error( }, # Expected state { + "use_global_cursor": False, "lookback_window": 1, "state": {"updated_at": "2024-01-25T00:00:00Z"}, "states": [ @@ -2318,6 +2347,7 @@ def test_incremental_list_partition_router( # Expected state { "lookback_window": 0, + "use_global_cursor": False, "state": {"updated_at": "2024-01-08T00:00:00Z"}, "states": [ {"cursor": {"updated_at": "2024-01-20T00:00:00Z"}, "partition": {"id": "1"}}, @@ -2845,6 +2875,7 @@ def test_incremental_error( } }, "lookback_window": 1, + "use_global_cursor": False, "states": [ { "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}},