Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 committed Nov 28, 2024
1 parent a48323f commit 3acbd4c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def _group_streams(
declarative_stream.name
].get("incremental_sync")

is_without_partition_router_nor_cursor = not bool(
is_without_partition_router_or_cursor = not bool(
datetime_based_cursor_component_definition
) and not (
name_to_stream_mapping[declarative_stream.name]
Expand All @@ -207,7 +207,7 @@ def _group_streams(
)
)
if (
is_without_partition_router_nor_cursor
is_without_partition_router_or_cursor
or is_datetime_incremental_without_partition_routing
):
stream_state = state_manager.get_stream_state(
Expand Down Expand Up @@ -256,6 +256,9 @@ def _group_streams(
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
and hasattr(
cursor.cursor_field, "cursor_field_key"
) # FIXME this will need to be updated once we do the per partition
else None,
logger=self.logger,
cursor=cursor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def test_group_streams():
concurrent_streams = source._concurrent_streams
synchronous_streams = source._synchronous_streams

# 2 incremental streams
# 1 substream, 2 incremental streams
assert len(concurrent_streams) == 3
concurrent_stream_0, concurrent_stream_1, concurrent_stream_2 = concurrent_streams
assert isinstance(concurrent_stream_0, DefaultStream)
Expand All @@ -488,7 +488,7 @@ def test_group_streams():
assert isinstance(concurrent_stream_2, DefaultStream)
assert concurrent_stream_2.name == "locations"

# 1 full refresh stream, 1 substream
# 1 full refresh stream
assert len(synchronous_streams) == 1
assert isinstance(synchronous_streams[0], DeclarativeStream)
assert synchronous_streams[0].name == "party_members_skills"
Expand Down Expand Up @@ -1274,7 +1274,9 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous():
state=None,
)

# 1 stream with parent stream
assert len(source._concurrent_streams) == 1
# 1 full refresh stream, 2 incremental stream with interpolation on state
assert len(source._synchronous_streams) == 3


Expand Down

0 comments on commit 3acbd4c

Please sign in to comment.