From 97419d04ea2dcfaf3154f33e748fab9b3dd36f70 Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk <35109939+tolik0@users.noreply.github.com> Date: Fri, 24 Jan 2025 01:41:14 +0200 Subject: [PATCH 1/2] fix(low-code): Fix legacy state migration in SubstreamPartitionRouter (#261) --- .../substream_partition_router.py | 8 +- .../test_substream_partition_router.py | 109 ++++++++++++++++++ 2 files changed, 115 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 1c7bb6961..ae558c634 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -296,8 +296,12 @@ def set_initial_state(self, stream_state: StreamState) -> None: if not parent_state and incremental_dependency: # Attempt to retrieve child state - substream_state = list(stream_state.values()) - substream_state = substream_state[0] if substream_state else {} # type: ignore [assignment] # Incorrect type for assignment + substream_state_values = list(stream_state.values()) + substream_state = substream_state_values[0] if substream_state_values else {} + # Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state} + if isinstance(substream_state, (list, dict)): + substream_state = {} + parent_state = {} # Copy child state to parent streams with incremental dependencies diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index 5814805e2..52306d348 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -402,6 +402,115 @@ def test_substream_partition_router_invalid_parent_record_type(): _ = [s for s in partition_router.stream_slices()] +@pytest.mark.parametrize( + "initial_state, expected_parent_state", + [ + # Case 1: Empty initial state, no parent state expected + ({}, {}), + # Case 2: Initial state with no `parent_state`, migrate `updated_at` to `parent_stream_cursor` + ( + {"updated_at": "2023-05-27T00:00:00Z"}, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + ), + # Case 3: Initial state with global `state`, no migration expected + ( + {"state": {"updated": "2023-05-27T00:00:00Z"}}, + {}, + ), + # Case 4: Initial state with per-partition `states`, no migration expected + ( + { + "states": [ + { + "partition": { + "issue_id": "10012", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + { + "partition": { + "issue_id": "10019", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + { + "partition": { + "issue_id": "10000", + "parent_slice": { + "parent_slice": {}, + "project_id": "10000", + }, + }, + "cursor": {"updated": "2021-01-01T00:00:00+0000"}, + }, + ] + }, + {}, + ), + # Case 5: Initial state with `parent_state`, existing parent state persists + ( + { + "parent_state": { + "parent_stream_name1": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + }, + }, + {"parent_stream_cursor": "2023-05-27T00:00:00Z"}, + ), + ], + ids=[ + "empty_initial_state", + "initial_state_no_parent_legacy_state", + "initial_state_no_parent_global_state", + "initial_state_no_parent_per_partition_state", + "initial_state_with_parent_state", + ], +) +def test_set_initial_state(initial_state, expected_parent_state): + """ + Test the `set_initial_state` method of SubstreamPartitionRouter. + + This test verifies that the method correctly handles different initial state formats + and sets the appropriate parent stream state. + """ + parent_stream = MockStream( + slices=[{}], + records=[], + name="parent_stream_name1", + cursor_field="parent_stream_cursor", + ) + parent_stream.state = {} + parent_stream_config = ParentStreamConfig( + stream=parent_stream, + parent_key="id", + partition_field="parent_stream_id", + parameters={}, + config={}, + incremental_dependency=True, + ) + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[parent_stream_config], + parameters={}, + config={}, + ) + + partition_router.set_initial_state(initial_state) + + # Assert the state of the parent stream + assert parent_stream.state == expected_parent_state, ( + f"Unexpected parent state. Initial state: {initial_state}, " + f"Expected: {expected_parent_state}, Got: {parent_stream.state}" + ) + + @pytest.mark.parametrize( "parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data", [ From 5da0a7f91df8a7237f790caee6ff773d5c40837a Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 24 Jan 2025 01:47:27 +0000 Subject: [PATCH 2/2] ci: post direct links to html connector test reports (#252) (#263) Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Aaron Steers --- .github/workflows/connector-tests.yml | 55 ++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/.github/workflows/connector-tests.yml b/.github/workflows/connector-tests.yml index b4e02fecb..4f6cedee0 100644 --- a/.github/workflows/connector-tests.yml +++ b/.github/workflows/connector-tests.yml @@ -25,7 +25,7 @@ concurrency: jobs: cdk_changes: name: Get Changes - runs-on: ubuntu-24.04 + runs-on: ubuntu-22.04 permissions: statuses: write pull-requests: read @@ -62,7 +62,7 @@ jobs: # Forked PRs are handled by the community_ci.yml workflow # If the condition is not met the job will be skipped (it will not fail) # runs-on: connector-test-large - runs-on: ubuntu-24.04 + runs-on: ubuntu-22.04 timeout-minutes: 360 # 6 hours strategy: fail-fast: false @@ -96,6 +96,8 @@ jobs: name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs['src'] == 'false' || needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})" permissions: checks: write + contents: write # Required for creating commit statuses + pull-requests: read steps: - name: Abort if extra not changed (${{matrix.cdk_extra}}) id: no_changes @@ -127,6 +129,22 @@ jobs: uses: actions/setup-python@v5 with: python-version: "3.10" + # Create initial pending status for test report + - name: Create Pending Test Report Status + if: steps.no_changes.outputs.status != 'cancelled' + env: + GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + run: | + HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}" + gh api \ + --method POST \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + repos/${{ github.repository }}/statuses/$HEAD_SHA \ + -f state="pending" \ + -f description="Running connector tests..." \ + -f context="${{ matrix.connector }} Test Report" + - name: Test Connector if: steps.no_changes.outputs.status != 'cancelled' timeout-minutes: 90 @@ -173,6 +191,39 @@ jobs: echo "success=${success}" >> $GITHUB_OUTPUT echo "html_report_url=${html_report_url}" >> $GITHUB_OUTPUT + # Update the test report status with results + - name: Update Test Report Status + if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome == 'success' + env: + GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + run: | + HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}" + gh api \ + --method POST \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + repos/${{ github.repository }}/statuses/$HEAD_SHA \ + -f state="${{ steps.evaluate_output.outputs.success == 'true' && 'success' || 'failure' }}" \ + -f target_url="${{ steps.evaluate_output.outputs.html_report_url }}" \ + -f description="Click Details to view the test report" \ + -f context="${{ matrix.connector }} Test Report" + + # Create failure status if report generation failed + - name: Create Report Generation Failed Status + if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome != 'success' + env: + GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }} + run: | + HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}" + gh api \ + --method POST \ + -H "Accept: application/vnd.github+json" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + repos/${{ github.repository }}/statuses/$HEAD_SHA \ + -f state="failure" \ + -f description="Failed to run connector tests." \ + -f context="${{ matrix.connector }} Test Report" + # Upload the job output to the artifacts - name: Upload Job Output id: upload_job_output