Skip to content

Commit

Permalink
Merge branch 'main' into maxi297/asyncretriever-with-concurrent-cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Jan 24, 2025
2 parents 03d1958 + 5da0a7f commit 49690c7
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 4 deletions.
55 changes: 53 additions & 2 deletions .github/workflows/connector-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ concurrency:
jobs:
cdk_changes:
name: Get Changes
runs-on: ubuntu-24.04
runs-on: ubuntu-22.04
permissions:
statuses: write
pull-requests: read
Expand Down Expand Up @@ -62,7 +62,7 @@ jobs:
# Forked PRs are handled by the community_ci.yml workflow
# If the condition is not met the job will be skipped (it will not fail)
# runs-on: connector-test-large
runs-on: ubuntu-24.04
runs-on: ubuntu-22.04
timeout-minutes: 360 # 6 hours
strategy:
fail-fast: false
Expand Down Expand Up @@ -96,6 +96,8 @@ jobs:
name: "Check: '${{matrix.connector}}' (skip=${{needs.cdk_changes.outputs['src'] == 'false' || needs.cdk_changes.outputs[matrix.cdk_extra] == 'false'}})"
permissions:
checks: write
contents: write # Required for creating commit statuses
pull-requests: read
steps:
- name: Abort if extra not changed (${{matrix.cdk_extra}})
id: no_changes
Expand Down Expand Up @@ -127,6 +129,22 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: "3.10"
# Create initial pending status for test report
- name: Create Pending Test Report Status
if: steps.no_changes.outputs.status != 'cancelled'
env:
GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
run: |
HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}"
gh api \
--method POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
repos/${{ github.repository }}/statuses/$HEAD_SHA \
-f state="pending" \
-f description="Running connector tests..." \
-f context="${{ matrix.connector }} Test Report"
- name: Test Connector
if: steps.no_changes.outputs.status != 'cancelled'
timeout-minutes: 90
Expand Down Expand Up @@ -173,6 +191,39 @@ jobs:
echo "success=${success}" >> $GITHUB_OUTPUT
echo "html_report_url=${html_report_url}" >> $GITHUB_OUTPUT
# Update the test report status with results
- name: Update Test Report Status
if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome == 'success'
env:
GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
run: |
HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}"
gh api \
--method POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
repos/${{ github.repository }}/statuses/$HEAD_SHA \
-f state="${{ steps.evaluate_output.outputs.success == 'true' && 'success' || 'failure' }}" \
-f target_url="${{ steps.evaluate_output.outputs.html_report_url }}" \
-f description="Click Details to view the test report" \
-f context="${{ matrix.connector }} Test Report"
# Create failure status if report generation failed
- name: Create Report Generation Failed Status
if: always() && steps.no_changes.outputs.status != 'cancelled' && steps.evaluate_output.outcome != 'success'
env:
GH_TOKEN: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
run: |
HEAD_SHA="${{ github.event.pull_request.head.sha || github.sha }}"
gh api \
--method POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
repos/${{ github.repository }}/statuses/$HEAD_SHA \
-f state="failure" \
-f description="Failed to run connector tests." \
-f context="${{ matrix.connector }} Test Report"
# Upload the job output to the artifacts
- name: Upload Job Output
id: upload_job_output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,12 @@ def set_initial_state(self, stream_state: StreamState) -> None:

if not parent_state and incremental_dependency:
# Attempt to retrieve child state
substream_state = list(stream_state.values())
substream_state = substream_state[0] if substream_state else {} # type: ignore [assignment] # Incorrect type for assignment
substream_state_values = list(stream_state.values())
substream_state = substream_state_values[0] if substream_state_values else {}
# Filter out per partition state. Because we pass the state to the parent stream in the format {cursor_field: substream_state}
if isinstance(substream_state, (list, dict)):
substream_state = {}

parent_state = {}

# Copy child state to parent streams with incremental dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,115 @@ def test_substream_partition_router_invalid_parent_record_type():
_ = [s for s in partition_router.stream_slices()]


@pytest.mark.parametrize(
"initial_state, expected_parent_state",
[
# Case 1: Empty initial state, no parent state expected
({}, {}),
# Case 2: Initial state with no `parent_state`, migrate `updated_at` to `parent_stream_cursor`
(
{"updated_at": "2023-05-27T00:00:00Z"},
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
),
# Case 3: Initial state with global `state`, no migration expected
(
{"state": {"updated": "2023-05-27T00:00:00Z"}},
{},
),
# Case 4: Initial state with per-partition `states`, no migration expected
(
{
"states": [
{
"partition": {
"issue_id": "10012",
"parent_slice": {
"parent_slice": {},
"project_id": "10000",
},
},
"cursor": {"updated": "2021-01-01T00:00:00+0000"},
},
{
"partition": {
"issue_id": "10019",
"parent_slice": {
"parent_slice": {},
"project_id": "10000",
},
},
"cursor": {"updated": "2021-01-01T00:00:00+0000"},
},
{
"partition": {
"issue_id": "10000",
"parent_slice": {
"parent_slice": {},
"project_id": "10000",
},
},
"cursor": {"updated": "2021-01-01T00:00:00+0000"},
},
]
},
{},
),
# Case 5: Initial state with `parent_state`, existing parent state persists
(
{
"parent_state": {
"parent_stream_name1": {"parent_stream_cursor": "2023-05-27T00:00:00Z"},
},
},
{"parent_stream_cursor": "2023-05-27T00:00:00Z"},
),
],
ids=[
"empty_initial_state",
"initial_state_no_parent_legacy_state",
"initial_state_no_parent_global_state",
"initial_state_no_parent_per_partition_state",
"initial_state_with_parent_state",
],
)
def test_set_initial_state(initial_state, expected_parent_state):
"""
Test the `set_initial_state` method of SubstreamPartitionRouter.
This test verifies that the method correctly handles different initial state formats
and sets the appropriate parent stream state.
"""
parent_stream = MockStream(
slices=[{}],
records=[],
name="parent_stream_name1",
cursor_field="parent_stream_cursor",
)
parent_stream.state = {}
parent_stream_config = ParentStreamConfig(
stream=parent_stream,
parent_key="id",
partition_field="parent_stream_id",
parameters={},
config={},
incremental_dependency=True,
)

partition_router = SubstreamPartitionRouter(
parent_stream_configs=[parent_stream_config],
parameters={},
config={},
)

partition_router.set_initial_state(initial_state)

# Assert the state of the parent stream
assert parent_stream.state == expected_parent_state, (
f"Unexpected parent state. Initial state: {initial_state}, "
f"Expected: {expected_parent_state}, Got: {parent_stream.state}"
)


@pytest.mark.parametrize(
"parent_stream_request_parameters, expected_req_params, expected_headers, expected_body_json, expected_body_data",
[
Expand Down

0 comments on commit 49690c7

Please sign in to comment.