
Commit ff31b0b

Streams without SimpleRetriever are not concurrent
maxi297 committed Dec 4, 2024
1 parent 6ca33fd commit ff31b0b
Showing 2 changed files with 79 additions and 14 deletions.
@@ -200,7 +200,11 @@ def _group_streams(
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
# so we need to treat them as synchronous
-if isinstance(declarative_stream, DeclarativeStream):
+if (
+    isinstance(declarative_stream, DeclarativeStream)
+    and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"]
+    == "SimpleRetriever"
+):
incremental_sync_component_definition = name_to_stream_mapping[
declarative_stream.name
].get("incremental_sync")
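For readers skimming the diff: a minimal sketch of the rule this hunk encodes, assuming nothing beyond the standard library. A declarative stream is only considered for the concurrent path when its manifest retriever is a SimpleRetriever; anything else (for example the AsyncRetriever-based stream added to the test manifest below) falls back to the synchronous path. The helper name uses_simple_retriever and the inline dicts are illustrative only, not CDK code.

from typing import Any, Mapping


def uses_simple_retriever(stream_definition: Mapping[str, Any]) -> bool:
    # Stand-in for one entry of name_to_stream_mapping in the check above.
    retriever = stream_definition.get("retriever") or {}
    return retriever.get("type") == "SimpleRetriever"


# Eligible for the concurrent path (subject to the other checks in _group_streams) ...
assert uses_simple_retriever({"retriever": {"type": "SimpleRetriever"}})
# ... while a stream built on another retriever stays synchronous.
assert not uses_simple_retriever({"retriever": {"type": "AsyncRetriever"}})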
@@ -276,6 +276,69 @@
},
},
},
"async_job_stream": {
"$ref": "#/definitions/base_stream",
"$parameters": {
"name": "async_job_stream",
"primary_key": "id",
"url_base": "https://persona.metaverse.com",
},
"retriever": {
"type": "AsyncRetriever",
"status_mapping": {
"failed": ["failed"],
"running": ["pending"],
"timeout": ["timeout"],
"completed": ["ready"],
},
"urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"status_extractor": {"type": "DpathExtractor", "field_path": ["status"]},
"polling_requester": {
"type": "HttpRequester",
"path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}",
"http_method": "GET",
"authenticator": {
"type": "BearerAuthenticator",
"api_token": "{{ config['api_key'] }}",
},
},
"creation_requester": {
"type": "HttpRequester",
"path": "async_job",
"http_method": "POST",
"authenticator": {
"type": "BearerAuthenticator",
"api_token": "{{ config['api_key'] }}",
},
},
"download_requester": {
"type": "HttpRequester",
"path": "{{stream_slice['url']}}",
"http_method": "GET",
},
},
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"description": "The identifier",
"type": ["null", "string"],
},
"name": {
"description": "The name of the metaverse palace",
"type": ["null", "string"],
},
},
},
},
},
"locations_stream": {
"$ref": "#/definitions/base_incremental_stream",
"retriever": {
@@ -463,6 +526,7 @@
"#/definitions/party_members_skills_stream",
"#/definitions/arcana_personas_stream",
"#/definitions/palace_enemies_stream",
"#/definitions/async_job_stream",
],
"check": {"stream_names": ["party_members", "locations"]},
"concurrency_level": {
@@ -606,10 +670,12 @@ def test_group_streams():
assert isinstance(concurrent_stream_4, DefaultStream)
assert concurrent_stream_4.name == "arcana_personas"

-# 1 substream w/ incremental
-assert len(synchronous_streams) == 1
+# 1 substream w/ incremental, 1 stream with async retriever
+assert len(synchronous_streams) == 2
assert isinstance(synchronous_streams[0], DeclarativeStream)
assert synchronous_streams[0].name == "palace_enemies"
+assert isinstance(synchronous_streams[1], DeclarativeStream)
+assert synchronous_streams[1].name == "async_job_stream"


@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc))
@@ -725,28 +791,23 @@ def test_discover():
"""
Verifies that the ConcurrentDeclarativeSource discover command returns concurrent and synchronous catalog definitions
"""
-expected_stream_names = [
+expected_stream_names = {
"party_members",
"palaces",
"locations",
"party_members_skills",
"arcana_personas",
"palace_enemies",
-]
+"async_job_stream",
+}

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
)

actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

-assert len(actual_catalog.streams) == 6
-assert actual_catalog.streams[0].name in expected_stream_names
-assert actual_catalog.streams[1].name in expected_stream_names
-assert actual_catalog.streams[2].name in expected_stream_names
-assert actual_catalog.streams[3].name in expected_stream_names
-assert actual_catalog.streams[4].name in expected_stream_names
-assert actual_catalog.streams[5].name in expected_stream_names
+assert set(map(lambda stream: stream.name, actual_catalog.streams)) == expected_stream_names


def _mock_requests(
@@ -1371,8 +1432,8 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous():

# 1 full refresh stream, 2 with parent stream without incremental dependency
assert len(source._concurrent_streams) == 3
-# 1 incremental with parent stream (palace_enemies), 2 incremental stream with interpolation on state (locations and party_members)
-assert len(source._synchronous_streams) == 3
+# 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies), 1 stream with async retriever
+assert len(source._synchronous_streams) == 4


def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurrent():
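Taken together, the updated tests reduce to the following usage pattern: build a ConcurrentDeclarativeSource and inspect how it split the streams. This is only a sketch reusing the _MANIFEST/_CONFIG fixtures and the private _concurrent_streams/_synchronous_streams attributes that the tests above rely on; the import path is assumed from the class name.

from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)

source = ConcurrentDeclarativeSource(
    source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
)

# Streams without a SimpleRetriever (here, the new async_job_stream) land on the
# synchronous side; SimpleRetriever-based streams remain eligible for the
# concurrent path.
synchronous_names = {stream.name for stream in source._synchronous_streams}
concurrent_names = {stream.name for stream in source._concurrent_streams}
assert "async_job_stream" in synchronous_names
assert "async_job_stream" not in concurrent_names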

