Skip to content

Commit

Permalink
Merge branch 'master' into brian/concurrent_declarative_source
Browse files Browse the repository at this point in the history
  • Loading branch information
brianjlai committed Oct 17, 2024
2 parents 9116da6 + 3697303 commit 80186ca
Show file tree
Hide file tree
Showing 192 changed files with 15,997 additions and 1,462 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_5 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand All @@ -98,7 +98,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_5 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cdk_connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: ${{ github.event_name == 'pull_request' && 'pull_request' || 'manual' }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_5 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/community_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_4 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down Expand Up @@ -237,7 +237,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_4 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_nightly_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
with:
context: "master"
ci_job_key: "nightly_builds"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand All @@ -121,7 +121,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_up_to_date.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_version_increment_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
# dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }} Commenting this out as we believe Dagger cloud caching is causing excessively long jobs for such a small check
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/connectors_weekly_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
with:
context: "master"
ci_job_key: "weekly_alpha_test"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/finalize_rollout.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ on:
options: ["promote", "rollback"]
jobs:
finalize_rollout:
name: Finalize connector rollout
name: Finalize connector rollout for ${{ github.event.inputs.connector_name }}
runs-on: connector-publish-large
env:
ACTION: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.action || github.event.client_payload.action }}
Expand All @@ -28,13 +28,13 @@ jobs:
shell: bash
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Promote {{ github.event.inputs.connector_name }} release candidate
- name: Promote ${{ github.event.inputs.connector_name }} release candidate
id: promote-release-candidate
if: ${{ env.ACTION == 'promote' }}
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand All @@ -45,13 +45,13 @@ jobs:
slack_webhook_url: ${{ secrets.PUBLISH_ON_MERGE_SLACK_WEBHOOK }}
spec_cache_gcs_credentials: ${{ secrets.SPEC_CACHE_SERVICE_ACCOUNT_KEY_PUBLISH }}
subcommand: "connectors --name=${{ github.event.inputs.connector_name }} publish --promote-release-candidate"
- name: Rollback {{ github.event.inputs.connector_name }} release candidate
- name: Rollback ${{ github.event.inputs.connector_name }} release candidate
id: rollback-release-candidate
if: ${{ env.ACTION == 'rollback' }}
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/format-fix-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "format fix all"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_2 }}

# This is helpful in the case that we change a previously committed generated file to be ignored by git.
- name: Remove any files that have been gitignored
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/format_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
context: "pull_request"
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
subcommand: "format check all"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_2 }}

- name: Run airbyte-ci format check [WORKFLOW DISPATCH]
id: airbyte_ci_format_check_all_manual
Expand All @@ -52,6 +53,7 @@ jobs:
context: "manual"
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
subcommand: "format check all"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_2 }}

- name: Match GitHub User to Slack User [MASTER]
if: github.ref == 'refs/heads/master'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/live_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_4 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
4 changes: 1 addition & 3 deletions .github/workflows/publish-cdk-command-manually.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "master" # TODO: figure out why changing this yells with `The ci_gcs_credentials was not set on this PipelineContext.`
# Disable the dagger_cloud_token to disable remote cache access.
# See https://github.com/airbytehq/airbyte-internal-issues/issues/6439#issuecomment-2109503985 for context
#dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_5 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "master"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand All @@ -53,7 +53,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_3 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/regression_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
#dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_CACHE_4 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ _Screenshot taken from [Airbyte Cloud](https://cloud.airbyte.com/signup)_.
- [Deploy Airbyte Open Source](https://docs.airbyte.com/quickstart/deploy-airbyte) or set up [Airbyte Cloud](https://docs.airbyte.com/cloud/getting-started-with-airbyte-cloud) to start centralizing your data.
- Create connectors in minutes with our [no-code Connector Builder](https://docs.airbyte.com/connector-development/connector-builder-ui/overview) or [low-code CDK](https://docs.airbyte.com/connector-development/config-based/low-code-cdk-overview).
- Explore popular use cases in our [tutorials](https://airbyte.com/tutorials).
- Orchestrate Airbyte syncs with [Airflow](https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator), [Prefect](https://docs.airbyte.com/operator-guides/using-prefect-task), [Dagster](https://docs.airbyte.com/operator-guides/using-dagster-integration), [Kestra](https://docs.airbyte.com/operator-guides/using-kestra-plugin) or the [Airbyte API](https://reference.airbyte.com/reference/start).
- Orchestrate Airbyte syncs with [Airflow](https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator), [Prefect](https://docs.airbyte.com/operator-guides/using-prefect-task), [Dagster](https://docs.airbyte.com/operator-guides/using-dagster-integration), [Kestra](https://docs.airbyte.com/operator-guides/using-kestra-plugin), or the [Airbyte API](https://reference.airbyte.com/reference/start).

Try it out yourself with our [demo app](https://demo.airbyte.io/), visit our [full documentation](https://docs.airbyte.com/) and learn more about [recent announcements](https://airbyte.com/blog-categories/company-updates). See our [registry](https://connectors.airbyte.com/files/generated_reports/connector_registry_report.html) for a full list of connectors already available in Airbyte or Airbyte Cloud.
Try it out yourself with our [demo app](https://demo.airbyte.io/), visit our [full documentation](https://docs.airbyte.com/), and learn more about [recent announcements](https://airbyte.com/blog-categories/company-updates). See our [registry](https://connectors.airbyte.com/files/generated_reports/connector_registry_report.html) for a full list of connectors already available in Airbyte or Airbyte Cloud.

### Join the Airbyte Community

The Airbyte community can be found in the [Airbyte Community Slack](https://airbyte.com/community), where you can ask questions and voice ideas. You can also ask for help in our [Airbyte Forum](https://github.com/airbytehq/airbyte/discussions), or join our [Office Hours](https://airbyte.io/daily-office-hours/). Airbyte's roadmap is publicly viewable on [GitHub](https://github.com/orgs/airbytehq/projects/37/views/1?pane=issue&itemId=26937554).

For videos and blogs on data engineering and building your data stack, check out Airbyte's [Content Hub](https://airbyte.com/content-hub), [Youtube](https://www.youtube.com/c/AirbyteHQ), and sign up for our [newsletter](https://airbyte.com/newsletter).
For videos and blogs on data engineering and building your data stack, check out Airbyte's [Content Hub](https://airbyte.com/content-hub), [YouTube](https://www.youtube.com/c/AirbyteHQ), and sign up for our [newsletter](https://airbyte.com/newsletter).

### Contributing

Expand All @@ -56,7 +56,7 @@ If you've found a problem with Airbyte, please open a [GitHub issue](https://git

Airbyte takes security issues very seriously. **Please do not file GitHub issues or post on our public forum for security vulnerabilities**. Email `[email protected]` if you believe you have uncovered a vulnerability. In the message, try to provide a description of the issue and ideally a way of reproducing it. The security team will get back to you as soon as possible.

[Airbyte Enterprise](https://airbyte.com/airbyte-enterprise) also offers additional security features (among others) on top of Airbyte Open Source.
[Airbyte Enterprise](https://airbyte.com/airbyte-enterprise) also offers additional security features (among others) on top of Airbyte open-source.

### License

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ data class DestinationStream(
it.namespace = namespace
}
}

fun toPrettyString() = "$namespace.$name"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ class DefaultSyncManager(
}

override suspend fun markInputConsumed() {
val incompleteStreams =
streamManagers
.filter { (_, manager) -> !manager.endOfStreamRead() }
.map { (stream, _) -> stream }
if (incompleteStreams.isNotEmpty()) {
val prettyStreams = incompleteStreams.map { it.toPrettyString() }
throw IllegalStateException(
"Input was fully read, but some streams did not receive a terminal stream status message. This likely indicates an error in the source or platform. Streams without a status message: $prettyStreams"
)
}
inputConsumed.complete(true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@ class DefaultFailSyncTask(
private val destinationWriter: DestinationWriter,
private val exception: Exception,
private val syncManager: SyncManager,
private val checkpointManager: CheckpointManager<*, *>
private val checkpointManager: CheckpointManager<*, *>,
private val syncFailedHasRun: AtomicBoolean,
) : FailSyncTask {
private val log = KotlinLogging.logger {}

companion object {
private val syncFailedHasRun = AtomicBoolean(false)
}

override suspend fun execute() {
if (syncFailedHasRun.setOnce()) {
// Ensure any remaining ready state gets captured: don't waste work!
Expand Down Expand Up @@ -63,6 +60,7 @@ class DefaultFailSyncTaskFactory(
private val checkpointManager: CheckpointManager<*, *>,
private val destinationWriter: DestinationWriter
) : FailSyncTaskFactory {
private val syncFailedHasRun = AtomicBoolean(false)

override fun make(
exceptionHandler: DestinationTaskExceptionHandler<*, *>,
Expand All @@ -73,7 +71,8 @@ class DefaultFailSyncTaskFactory(
destinationWriter,
exception,
syncManager,
checkpointManager
checkpointManager,
syncFailedHasRun,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@ class DefaultTeardownTask(
private val checkpointManager: CheckpointManager<*, *>,
private val syncManager: SyncManager,
private val destination: DestinationWriter,
private val taskLauncher: DestinationTaskLauncher
private val taskLauncher: DestinationTaskLauncher,
private val teardownHasRun: AtomicBoolean,
) : TeardownTask {
val log = KotlinLogging.logger {}

companion object {
private val teardownHasRun = AtomicBoolean(false)
}

override suspend fun execute() {
// Run the task exactly once, and only after all streams have closed.
if (teardownHasRun.setOnce()) {
Expand Down Expand Up @@ -69,8 +66,15 @@ class DefaultTeardownTaskFactory(
private val syncManager: SyncManager,
private val destination: DestinationWriter,
) : TeardownTaskFactory {
private val teardownHasRun = AtomicBoolean(false)

override fun make(taskLauncher: DestinationTaskLauncher): TeardownTask {
return DefaultTeardownTask(checkpointManager, syncManager, destination, taskLauncher)
return DefaultTeardownTask(
checkpointManager,
syncManager,
destination,
taskLauncher,
teardownHasRun
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.strea
import io.airbyte.cdk.load.test.util.CoroutineTestUtils
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import jakarta.inject.Inject
import kotlin.test.assertEquals
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows

@MicronautTest(
rebuildContext = true,
Expand Down Expand Up @@ -125,4 +127,17 @@ class SyncManagerTest {
syncManager.markSucceeded()
Assertions.assertEquals(SyncSuccess, completionChannel.receive())
}

@Test
fun testCrashOnNoEndOfStream() = runTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
manager1.markEndOfStream()
// This should fail, because stream2 was not marked with end of stream
val e = assertThrows<IllegalStateException> { syncManager.markInputConsumed() }
assertEquals(
// stream1 is fine, so the message only includes stream2
"Input was fully read, but some streams did not receive a terminal stream status message. This likely indicates an error in the source or platform. Streams without a status message: [test.stream2]",
e.message
)
}
}
Loading

0 comments on commit 80186ca

Please sign in to comment.