From ff31b0b0d63d7c22410f88a3a94611ad0df13e5b Mon Sep 17 00:00:00 2001 From: maxi297 Date: Tue, 3 Dec 2024 22:18:45 -0500 Subject: [PATCH] Streams without SimpleRetriever are not concurrent --- .../concurrent_declarative_source.py | 6 +- .../test_concurrent_declarative_source.py | 87 ++++++++++++++++--- 2 files changed, 79 insertions(+), 14 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 32970233c..85bce965d 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -200,7 +200,11 @@ def _group_streams( # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, # so we need to treat them as synchronous - if isinstance(declarative_stream, DeclarativeStream): + if ( + isinstance(declarative_stream, DeclarativeStream) + and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"] + == "SimpleRetriever" + ): incremental_sync_component_definition = name_to_stream_mapping[ declarative_stream.name ].get("incremental_sync") diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 932274879..dfaca8ca0 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -276,6 +276,69 @@ }, }, }, + "async_job_stream": { + "$ref": "#/definitions/base_stream", + "$parameters": { + "name": "async_job_stream", + "primary_key": "id", + "url_base": "https://persona.metaverse.com", + }, + "retriever": { + "type": "AsyncRetriever", + "status_mapping": { + "failed": ["failed"], + "running": ["pending"], + "timeout": ["timeout"], + "completed": ["ready"], + }, + "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, + "polling_requester": { + "type": "HttpRequester", + "path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}", + "http_method": "GET", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "creation_requester": { + "type": "HttpRequester", + "path": "async_job", + "http_method": "POST", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "download_requester": { + "type": "HttpRequester", + "path": "{{stream_slice['url']}}", + "http_method": "GET", + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the metaverse palace", + "type": ["null", "string"], + }, + }, + }, + }, + }, "locations_stream": { "$ref": "#/definitions/base_incremental_stream", "retriever": { @@ -463,6 +526,7 @@ "#/definitions/party_members_skills_stream", "#/definitions/arcana_personas_stream", "#/definitions/palace_enemies_stream", + "#/definitions/async_job_stream", ], "check": {"stream_names": ["party_members", "locations"]}, "concurrency_level": { @@ -606,10 +670,12 @@ def test_group_streams(): assert isinstance(concurrent_stream_4, DefaultStream) assert concurrent_stream_4.name == "arcana_personas" - # 1 substream w/ incremental - assert len(synchronous_streams) == 1 + # 1 substream w/ incremental, 1 stream with async retriever + assert len(synchronous_streams) == 2 assert isinstance(synchronous_streams[0], DeclarativeStream) assert synchronous_streams[0].name == "palace_enemies" + assert isinstance(synchronous_streams[1], DeclarativeStream) + assert synchronous_streams[1].name == "async_job_stream" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @@ -725,14 +791,15 @@ def test_discover(): """ Verifies that the ConcurrentDeclarativeSource discover command returns concurrent and synchronous catalog definitions """ - expected_stream_names = [ + expected_stream_names = { "party_members", "palaces", "locations", "party_members_skills", "arcana_personas", "palace_enemies", - ] + "async_job_stream", + } source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None @@ -740,13 +807,7 @@ def test_discover(): actual_catalog = source.discover(logger=source.logger, config=_CONFIG) - assert len(actual_catalog.streams) == 6 - assert actual_catalog.streams[0].name in expected_stream_names - assert actual_catalog.streams[1].name in expected_stream_names - assert actual_catalog.streams[2].name in expected_stream_names - assert actual_catalog.streams[3].name in expected_stream_names - assert actual_catalog.streams[4].name in expected_stream_names - assert actual_catalog.streams[5].name in expected_stream_names + assert set(map(lambda stream: stream.name, actual_catalog.streams)) == expected_stream_names def _mock_requests( @@ -1371,8 +1432,8 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous(): # 1 full refresh stream, 2 with parent stream without incremental dependency assert len(source._concurrent_streams) == 3 - # 1 incremental with parent stream (palace_enemies), 2 incremental stream with interpolation on state (locations and party_members) - assert len(source._synchronous_streams) == 3 + # 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies), 1 stream with async retriever + assert len(source._synchronous_streams) == 4 def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurrent():