Make sample query compatible with views #50437

Open
wants to merge 6 commits into
base: master
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mysql/metadata.yaml
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
- dockerImageTag: 3.10.0-rc.4
Contributor

I wondered why we used an rc tag in the previous version. We could just use the minor version.

Contributor Author

@marius was running a progressive rollout right before the break for the datatype conversion fixes (49918).
I'll make sure this doesn't conflict with the rollout before merging my change.

Contributor

yes, this is why

+ dockerImageTag: 3.11.0
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
@@ -70,5 +70,5 @@ data:
message: Add default cursor for cdc
upgradeDeadline: "2023-08-17"
rolloutConfiguration:
- enableProgressiveRollout: true
+ enableProgressiveRollout: false
metadataSpecVersion: "1.0"
@@ -208,13 +208,16 @@ class MySqlSourceOperations :
// chance of a row getting picked. This comes at a price of bias to the beginning
// of table on very large tables ( > 100s million of rows)
val greatestRate: String = 0.00005.toString()
// We only do a full count in case information schema contains no row count.
// This is the case for views.
val fullCount = "SELECT COUNT(*) FROM `$namespace`.`$name`"
// Quick approximation to "select count(*) from table" which doesn't require
// full table scan. However, note this could give delayed summary info about a table
// and thus a new table could be treated as empty despite we recently added rows.
// To prevent that from happening and resulted for skipping the table altogether,
// the minimum count is set to 10.
val quickCount =
- "SELECT GREATEST(10, table_rows) FROM information_schema.tables WHERE table_schema = '$namespace' AND table_name = '$name'"
+ "SELECT GREATEST(10, COALESCE(table_rows, ($fullCount))) FROM information_schema.tables WHERE table_schema = '$namespace' AND table_name = '$name'"
val greatest = "GREATEST($greatestRate, $sampleSize / ($quickCount))"
// Rand returns a value between 0 and 1
val where = "WHERE RAND() < $greatest "
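
To see why the `COALESCE` fallback matters for views, here is a minimal Kotlin sketch of the sampling-rate arithmetic. It is a model of the SQL expression, not the connector's code. It assumes MySQL's documented behavior that `GREATEST` returns `NULL` when any argument is `NULL`, and it represents `table_rows` as a nullable `Long` (`null` for a view). The names `greatestOrNull`, `samplingRate`, `useFallback`, and `countRows` are hypothetical.

```kotlin
// Hypothetical model of the SQL arithmetic; not the connector's implementation.

// Mirrors MySQL GREATEST: the result is NULL if any argument is NULL.
fun greatestOrNull(a: Double?, b: Double?): Double? =
    if (a == null || b == null) null else maxOf(a, b)

// tableRows models information_schema.tables.table_rows (null for a view).
// countRows models the SELECT COUNT(*) fallback; in this sketch it is only
// invoked when useFallback is true and tableRows is null.
fun samplingRate(
    sampleSize: Int,
    tableRows: Long?,
    useFallback: Boolean,
    countRows: () -> Long,
): Double? {
    val minRate = 0.00005 // same floor as greatestRate in the diff
    val rows: Long? = if (useFallback) tableRows ?: countRows() else tableRows
    val quickCount: Double? = greatestOrNull(10.0, rows?.toDouble())
    val ratio: Double? = quickCount?.let { sampleSize / it }
    return greatestOrNull(minRate, ratio)
}

fun main() {
    // Old query on a view: NULL propagates, so RAND() < NULL matches no rows.
    println(samplingRate(1024, null, useFallback = false) { 500L })
    // New query on a view: the full count replaces the missing estimate.
    println(samplingRate(1024, null, useFallback = true) { 500L })
    // Table with statistics: the estimate is used and the fallback is not invoked.
    println(samplingRate(1024, 1_000_000L, useFallback = true) { error("not called") })
}
```

Under these assumptions, the first call yields `null`, which corresponds to a `WHERE RAND() < NULL` predicate that selects nothing. That is the case the added `COALESCE` is meant to avoid.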
@@ -159,6 +159,24 @@ class MySqlSourceCursorBasedIntegrationTest {
assertTrue(run2.records().isEmpty())
}

@Test
fun testCursorBasedViewRead() {
provisionView(connectionFactory)
val catalog = getConfiguredCatalog()
catalog.streams[0].stream.name = viewName
val run1: BufferingOutputConsumer = CliRunner.source("read", config, catalog).run()
val lastStateMessageFromRun1 = run1.states().last()
val lastStreamStateFromRun1 = lastStateMessageFromRun1.stream.streamState

assertEquals("20", lastStreamStateFromRun1.get("cursor").textValue())
assertEquals(2, lastStreamStateFromRun1.get("version").intValue())
assertEquals("cursor_based", lastStreamStateFromRun1.get("state_type").asText())
assertEquals(viewName, lastStreamStateFromRun1.get("stream_name").asText())
assertEquals(listOf("k"), lastStreamStateFromRun1.get("cursor_field").map { it.asText() })
assertEquals("test", lastStreamStateFromRun1.get("stream_namespace").asText())
assertEquals(0, lastStreamStateFromRun1.get("cursor_record_count").asInt())
}

companion object {
val log = KotlinLogging.logger {}
val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:8.0")
@@ -206,6 +224,17 @@ class MySqlSourceCursorBasedIntegrationTest {
}
}
}

lateinit var viewName: String
fun provisionView(targetConnectionFactory: JdbcConnectionFactory) {
viewName = "$tableName-view"
targetConnectionFactory.get().use { connection: Connection ->
connection.isReadOnly = false
connection.createStatement().use { stmt: Statement ->
stmt.execute("CREATE VIEW test.`$viewName` AS SELECT * FROM test.`$tableName`")
}
}
}
}
val V1_STATE: String =
"""
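
The view name built in `provisionView` contains a hyphen (`$tableName-view`), so the `CREATE VIEW` statement has to wrap its identifiers in backticks. As an illustration only, a helper along the following lines could quote identifiers before interpolating them into SQL. `quoteIdentifier` and `createViewSql` are hypothetical names that do not appear in the connector or its tests, and `tbl1` is an invented table name.

```kotlin
// Hypothetical helper: wraps a MySQL identifier in backticks and doubles
// any embedded backtick, which is how MySQL escapes it inside a quoted name.
fun quoteIdentifier(name: String): String = "`" + name.replace("`", "``") + "`"

// Builds a CREATE VIEW statement similar in shape to the one in provisionView.
fun createViewSql(schema: String, table: String): String {
    val view = "$table-view"
    return "CREATE VIEW ${quoteIdentifier(schema)}.${quoteIdentifier(view)} " +
        "AS SELECT * FROM ${quoteIdentifier(schema)}.${quoteIdentifier(table)}"
}

fun main() {
    println(createViewSql("test", "tbl1"))
}
```

The test in this PR quotes the names inline instead, which is equivalent as long as the generated names never contain a backtick.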
3 changes: 2 additions & 1 deletion docs/integrations/sources/mysql.md
@@ -226,7 +226,8 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.11.0 | 2024-12-27 | [50437](https://github.com/airbytehq/airbyte/pull/50437) | Compatibility with MySQL Views. |
| 3.10.0-rc.4 | 2024-12-23 | [48587](https://github.com/airbytehq/airbyte/pull/48587) | Fix minor state counting mechanism. |
| 3.10.0-rc.3 | 2024-12-20 | [49918](https://github.com/airbytehq/airbyte/pull/49918) | Fix minor datatype handling and conversion bugs, maintain big number precision. |
| 3.10.0-rc.2 | 2024-12-20 | [49950](https://github.com/airbytehq/airbyte/pull/49950) | Remove unused configuration field, streamline SSL certificate key store logic. |
| 3.10.0-rc.1 | 2024-12-20 | [49948](https://github.com/airbytehq/airbyte/pull/49948) | Pin Bulk CDK version to 231, adopt required changes. |