Skip to content

Commit

Permalink
bulk-cdk-core-extract: fix mistaken assumption about non-global strea…
Browse files Browse the repository at this point in the history
…ms (#48466)
  • Loading branch information
postamar authored Nov 12, 2024
1 parent 31d3146 commit 4ae0ce6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ class StateManager(
.mapKeys { it.key.id }
} else {
val globalStreams: Map<Stream, OpaqueStateValue?> =
global.streams.associateWith { initialStreamStates[it] }
global.streams.associateWith { initialStreamStates[it] } +
initialStreamStates.filterKeys { global.streams.contains(it).not() }
this.global =
GlobalStateManager(
global = global,
initialGlobalState = initialGlobalState,
initialStreamStates = globalStreams,
)
nonGlobal =
initialStreamStates
.filterKeys { !globalStreams.containsKey(it) }
.mapValues { NonGlobalStreamStateManager(it.key, it.value) }
.mapKeys { it.key.id }
nonGlobal = emptyMap()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,11 @@ class StateManagerGlobalStatesTest {
|"global":{"shared_state":{"cdc":"starting"},
|"stream_states":[
|{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"},
|"stream_state":{"initial_sync":"ongoing"}}
|"stream_state":{"initial_sync":"ongoing"}},
|{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"},
|"stream_state":{"full_refresh":"ongoing"}}
|]},
|"sourceStats":{"recordCount":123.0}
|}
""".trimMargin(),
"""{
|"type":"STREAM",
|"stream":{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"},
|"stream_state":{"full_refresh":"ongoing"}},
|"sourceStats":{"recordCount":456.0}
|"sourceStats":{"recordCount":579.0}
|}
""".trimMargin(),
)
Expand Down Expand Up @@ -124,7 +119,9 @@ class StateManagerGlobalStatesTest {
|"global":{"shared_state":{"cdc":"starting"},
|"stream_states":[
|{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"},
|"stream_state":{"initial_sync":"ongoing"}}
|"stream_state":{"initial_sync":"ongoing"}},
|{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"},
|"stream_state":{}}
|]},"sourceStats":{"recordCount":123.0}
|}
""".trimMargin(),
Expand All @@ -147,7 +144,9 @@ class StateManagerGlobalStatesTest {
|"global":{"shared_state":{"cdc":"starting"},
|"stream_states":[
|{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"},
|"stream_state":{"initial_sync":"completed"}}
|"stream_state":{"initial_sync":"completed"}},
|{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"},
|"stream_state":{}}
|]},"sourceStats":{"recordCount":1245.0}
|}
""".trimMargin(),
Expand Down Expand Up @@ -197,7 +196,9 @@ class StateManagerGlobalStatesTest {
|"global":{"shared_state":{"cdc":"starting"},
|"stream_states":[
|{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"},
|"stream_state":{"initial_sync":"completed"}}
|"stream_state":{"initial_sync":"completed"}},
|{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"},
|"stream_state":{}}
|]},"sourceStats":{"recordCount":789.0}
|}
""".trimMargin(),
Expand Down Expand Up @@ -245,7 +246,9 @@ class StateManagerGlobalStatesTest {
|"global":{"shared_state":{"cdc":"ongoing"},
|"stream_states":[
|{"stream_descriptor":{"name":"KV","namespace":"PUBLIC"},
|"stream_state":{"initial_sync":"completed"}}
|"stream_state":{"initial_sync":"completed"}},
|{"stream_descriptor":{"name":"EVENTS","namespace":"PUBLIC"},
|"stream_state":{}}
|]},
|"sourceStats":{"recordCount":741.0}
|}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.0-rc.5
dockerImageTag: 3.9.0-rc.6
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import java.net.MalformedURLException
import java.net.URI
import java.nio.file.FileSystems
import java.util.*
import java.util.UUID

private val log = KotlinLogging.logger {}

Expand Down

0 comments on commit 4ae0ce6

Please sign in to comment.