Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Low-Code Concurrent CDK): Add ConcurrentPerPartitionCursor #111

Open
wants to merge 37 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d326a26
Add concurrent PerPartitionCursor
tolik0 Dec 3, 2024
37efbae
Merge branch 'main' into tolik0/concurrent-perpartitioncursor
tolik0 Dec 3, 2024
a3304b9
Use request options provider for ConcurrentPerPartitionCursor
tolik0 Dec 4, 2024
4ddbb84
Delete unused DeclarativePartitionFactory
tolik0 Dec 4, 2024
41b029d
Fixed record filter
tolik0 Dec 5, 2024
eb8eec8
Merge branch 'main' into tolik0/concurrent-perpartitioncursor
tolik0 Dec 5, 2024
dfcf17f
Add unit test
tolik0 Dec 5, 2024
b84e68a
Fix record filter unit tests
tolik0 Dec 6, 2024
2038075
Update poetry lock
tolik0 Dec 6, 2024
c77b9a2
Merge branch 'main' into tolik0/concurrent-perpartitioncursor
tolik0 Dec 6, 2024
c59ed5a
Update poetry lock again
tolik0 Dec 6, 2024
a01c0b5
Fix mypy error
tolik0 Dec 9, 2024
357a925
Add global cursor with fallback
tolik0 Dec 18, 2024
79ffb77
Merge branch 'main' into tolik0/concurrent-perpartitioncursor
tolik0 Dec 18, 2024
a36726b
Auto-fix lint and format issues
Dec 18, 2024
5ee05f1
Fix parent state update in case of error
tolik0 Dec 18, 2024
24268e2
Fix error in tests
tolik0 Dec 26, 2024
660da93
Fix unit tests
tolik0 Jan 13, 2025
23d3059
Add request_option_provider to _request_headers of SimpleRetriever
tolik0 Jan 13, 2025
f3a00ff
Merge branch 'main' into tolik0/concurrent-perpartitioncursor
tolik0 Jan 14, 2025
d6bec35
Maxi297/fix simple retriever request headers (#217)
maxi297 Jan 14, 2025
11b86a9
Merge branch 'tolik0/fix-simple-retriever-request-headers' into tolik…
tolik0 Jan 14, 2025
871f1fe
Fix format
tolik0 Jan 14, 2025
6d2343a
Merge branch 'tolik0/fix-simple-retriever-request-headers' into tolik…
tolik0 Jan 14, 2025
ed687f5
Fix merge conflict
tolik0 Jan 14, 2025
cfef872
Add lookback window handling
tolik0 Jan 14, 2025
4260415
Fix state handling in concurrent cursor
tolik0 Jan 14, 2025
089137f
Fix unit test
tolik0 Jan 14, 2025
3489c7a
Fix mypy errors
tolik0 Jan 15, 2025
301bd31
Add error test, fix mypy errors
tolik0 Jan 15, 2025
9574f8c
Fix stream slice mypy errors
tolik0 Jan 15, 2025
5ab4ee3
Fix lookback window
tolik0 Jan 16, 2025
36c4992
Refactor unit tests
tolik0 Jan 16, 2025
b6707ef
Refactor to add helper to get retriever
tolik0 Jan 16, 2025
cf5107f
Refactor to add helper to get retriever
tolik0 Jan 16, 2025
df0993e
Add class variable for state keys
tolik0 Jan 16, 2025
daa6873
Add exception if stream_slices was executed two times
tolik0 Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 80 additions & 16 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import (
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
Expand All @@ -32,7 +35,7 @@
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
DeclarativePartitionFactory,
StreamSlicerPartitionGenerator,
Expand Down Expand Up @@ -230,21 +233,7 @@ def _group_streams(
stream_state=stream_state,
)

retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None
retriever = self._get_retriever(declarative_stream, stream_state)

partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
Expand Down Expand Up @@ -304,6 +293,60 @@ def _group_streams(
cursor=final_state_cursor,
)
)
elif (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we filter only on list partition router until we support the global cursor part?

incremental_sync_component_definition
and incremental_sync_component_definition.get("type", "")
== DatetimeBasedCursorModel.__name__
and self._stream_supports_concurrent_partition_processing(
declarative_stream=declarative_stream
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(
declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor
)
):
stream_state = state_manager.get_stream_state(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inside of the elif clause seems very similar to the one for if self._is_datetime_incremental_without_partition_routing(declarative_stream, incremental_sync_component_definition). Should we merge them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added helper method

stream_name=declarative_stream.name, namespace=declarative_stream.namespace
)
partition_router = declarative_stream.retriever.stream_slicer._partition_router

perpartition_cursor = (
self._constructor.create_concurrent_cursor_from_perpartition_cursor(
state_manager=state_manager,
model_type=DatetimeBasedCursorModel,
component_definition=incremental_sync_component_definition,
stream_name=declarative_stream.name,
stream_namespace=declarative_stream.namespace,
config=config or {},
stream_state=stream_state,
partition_router=partition_router,
)
)

retriever = self._get_retriever(declarative_stream, stream_state)

partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
retriever,
self.message_repository,
),
perpartition_cursor,
)

concurrent_streams.append(
DefaultStream(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
cursor=perpartition_cursor,
)
)
else:
synchronous_streams.append(declarative_stream)
else:
Expand Down Expand Up @@ -394,6 +437,27 @@ def _stream_supports_concurrent_partition_processing(
return False
return True

def _get_retriever(
self, declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
) -> Retriever:
retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
# still rely on a DatetimeBasedCursor that is properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None

return retriever

@staticmethod
def _select_streams(
streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
Expand Down
8 changes: 3 additions & 5 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):

def __init__(
self,
date_time_based_cursor: DatetimeBasedCursor,
substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]],
cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
**kwargs: Any,
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._substream_cursor = substream_cursor
self._cursor = cursor

def filter_records(
self,
Expand All @@ -77,7 +75,7 @@ def filter_records(
records = (
record
for record in records
if (self._substream_cursor or self._date_time_based_cursor).should_be_synced(
if self._cursor.should_be_synced(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is so beautiful that I want to cry ❤️

# Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
# Record stream name is empty cause it is not used durig the filtering
Record(data=record, associated_slice=stream_slice, stream_name="")
Expand Down
6 changes: 6 additions & 0 deletions airbyte_cdk/sources/declarative/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
ConcurrentCursorFactory,
ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
Expand All @@ -21,6 +25,8 @@

__all__ = [
"CursorFactory",
"ConcurrentCursorFactory",
"ConcurrentPerPartitionCursor",
"DatetimeBasedCursor",
"DeclarativeCursor",
"GlobalSubstreamCursor",
Expand Down
Loading
Loading