-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DV2 TyperDeduper: Extract migrations to separate method #35376
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Ignored Deployment
|
796b41f
to
98802c6
Compare
Current dependencies on/for this PR:
This stack of pull requests is managed by Graphite. |
e596cd7
to
d0fbcd8
Compare
fb3ce67
to
52ef29f
Compare
e282bf3
to
71904f4
Compare
829841d
to
dcb79ff
Compare
if (overwriteStreamsWithTmpTable != null) { | ||
throw new IllegalStateException("Tables were already prepared."); | ||
} | ||
overwriteStreamsWithTmpTable = ConcurrentHashMap.newKeySet(); | ||
LOGGER.info("Preparing tables"); | ||
|
||
// This is intentionally not done in parallel to avoid rate limits in some destinations. | ||
prepareSchemas(parsedCatalog); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this call now happens in prepareSchemasAndRawTables
// This is intentionally not done in parallel to avoid rate limits in some destinations. | ||
prepareSchemas(parsedCatalog); | ||
|
||
// TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this logic now happens in TyperDeduperUtil.executeRawTableMigrations
import java.util.concurrent.ExecutorService | ||
|
||
|
||
class TyperDeduperUtil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this class is partly taken from @jbfbell 's https://github.com/airbytehq/airbyte/pull/34637/files#diff-c9e7bf1074e965f63b135422dd2a7ac6ba196046b226a0ffe3a855a4eb82ed3b
it'll have some merge conflicts b/c I reordered the arguments in prepareAllSchemas
but 🤷 just wanted to be consistent across the methods, and I generally prefer infra-ish params (executorservice, destinationhandler, etc.) to come before functional-ish params (parsedcatalog, streamconfig, etc.)
dcb79ff
to
ff34c69
Compare
// with current state of raw tables & final tables. This is done first before gather initial state | ||
// to avoid recreating | ||
// final tables later again. | ||
val runMigrationsResult = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as noted above - this code originally lived in DefaultTyperDeduper, I just extracted it to a separate class.
@@ -161,12 +165,14 @@ void existingEmptyTableMatchingSchema() throws Exception { | |||
initialStates.forEach(initialState -> { | |||
when(initialState.isFinalTablePresent()).thenReturn(true); | |||
when(initialState.isFinalTableEmpty()).thenReturn(true); | |||
when(initialState.isSchemaMismatch()).thenReturn(true); | |||
when(initialState.isSchemaMismatch()).thenReturn(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test previously didn't actually assert anything, and was secretly broken. This is the correct version.
(specifically: if there are no final tables, isSchemaMismatch is supposed to return false)
ff34c69
to
d2c07e5
Compare
d84938e
to
71d4578
Compare
d2c07e5
to
d307cb7
Compare
2aab345
to
16843df
Compare
06ac33d
to
90b10f3
Compare
90b10f3
to
d6199c6
Compare
d6199c6
to
f0a9727
Compare
f0a9727
to
c7e2466
Compare
c7e2466
to
d6199c6
Compare
Signed-off-by: Gireesh Sreepathi <[email protected]>
Signed-off-by: Gireesh Sreepathi <[email protected]>
/publish-java-cdk
|
Based on top of #35342
Coincidentally, closes https://github.com/airbytehq/airbyte-internal-issues/issues/2469. After this PR, we'll trigger soft resets after raw table creation.
Extract some common code from the DefaultTyperDeduper and NoopTyperDeduperWithMigrations classes. This is just a refactor to simplify some future work.
It also fixes some tests in DefaultTyperDeduperTest. Previously some tests weren't actually doing anything, because they called clearInvocations() followed immediately by verifyNoMoreInteractions(), which is effectively just assertTrue(true).
The CDK build.gradle changes are just copied out of #34637. Once that's merged, there should be no diff with master.