Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sample query compatible with views #50437

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.10.0-rc.6
dockerImageTag: 3.10.0-rc.7
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,16 @@ class MySqlSourceOperations :
// chance of a row getting picked. This comes at the price of a bias toward the beginning
// of the table on very large tables (> 100s of millions of rows)
val greatestRate: String = 0.00005.toString()
// We only do a full count in case information schema contains no row count.
// This is the case for views.
val fullCount = "SELECT COUNT(*) FROM `$namespace`.`$name`"
// Quick approximation of "select count(*) from table" that doesn't require a
// full table scan. However, note that this summary info can lag behind the table,
// so a new table could be treated as empty even though rows were recently added.
// To prevent that from happening and causing the table to be skipped altogether,
// the minimum count is set to 10.
val quickCount =
"SELECT GREATEST(10, table_rows) FROM information_schema.tables WHERE table_schema = '$namespace' AND table_name = '$name'"
"SELECT GREATEST(10, COALESCE(table_rows, ($fullCount))) FROM information_schema.tables WHERE table_schema = '$namespace' AND table_name = '$name'"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the query optimizer really clever enough to perform a full count only when table_rows is NULL and skip it otherwise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that it is.
It goes over the values one by one until it finds a non-null one.

val greatest = "GREATEST($greatestRate, $sampleSize / ($quickCount))"
// Rand returns a value between 0 and 1
val where = "WHERE RAND() < $greatest "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,24 @@ class MySqlSourceCursorBasedIntegrationTest {
assertTrue(run2.records().isEmpty())
}

@Test
fun testCursorBasedViewRead() {
    // Create a view over the test table and point the first configured stream at it,
    // so the cursor-based read runs against the view rather than the table.
    provisionView(connectionFactory)
    val viewCatalog = getConfiguredCatalog()
    viewCatalog.streams[0].stream.name = viewName

    val readOutput: BufferingOutputConsumer = CliRunner.source("read", config, viewCatalog).run()

    // Only the final state message of the run is inspected.
    val finalStreamState = readOutput.states().last().stream.streamState

    assertEquals("20", finalStreamState.get("cursor").textValue())
    assertEquals(2, finalStreamState.get("version").intValue())
    assertEquals("cursor_based", finalStreamState.get("state_type").asText())
    assertEquals(viewName, finalStreamState.get("stream_name").asText())
    assertEquals(listOf("k"), finalStreamState.get("cursor_field").map { it.asText() })
    assertEquals("test", finalStreamState.get("stream_namespace").asText())
    assertEquals(0, finalStreamState.get("cursor_record_count").asInt())
}

companion object {
val log = KotlinLogging.logger {}
val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:8.0")
Expand Down Expand Up @@ -206,6 +224,17 @@ class MySqlSourceCursorBasedIntegrationTest {
}
}
}

lateinit var viewName: String
/**
 * Creates a view in the `test` schema that selects every column of the test table,
 * and records the view's name in [viewName].
 *
 * @param targetConnectionFactory factory used to obtain the JDBC connection on which
 *   the `CREATE VIEW` statement is executed.
 */
fun provisionView(targetConnectionFactory: JdbcConnectionFactory) {
    viewName = "$tableName-view"
    val createViewSql = "CREATE VIEW test.`$viewName` AS SELECT * FROM test.`$tableName`"
    targetConnectionFactory.get().use { conn: Connection ->
        // DDL needs a writable connection.
        conn.isReadOnly = false
        conn.createStatement().use { ddl: Statement -> ddl.execute(createViewSql) }
    }
}
}
val V1_STATE: String =
"""
Expand Down
6 changes: 3 additions & 3 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.10.0-rc.6 | 2024-12-18 | [49892](https://github.com/airbytehq/airbyte/pull/49892) | Use a base image: airbyte/java-connector-base:1.0.0 |

| 3.10.0-rc.7 | 2024-12-27 | [50437](https://github.com/airbytehq/airbyte/pull/50437) | Compatibility with MySQL Views. |
| 3.10.0-rc.6 | 2024-12-18 | [49892](https://github.com/airbytehq/airbyte/pull/49892) | Use a base image: airbyte/java-connector-base:1.0.0 |
| 3.10.0-rc.5 | 2025-01-03 | [50868](https://github.com/airbytehq/airbyte/pull/50868) | Fix exception handling rules declaration. |
| 3.10.0-rc.4 | 2024-12-23 | [48587](https://github.com/airbytehq/airbyte/pull/48587) | Fix minor state counting mechanism. |
| 3.10.0-rc.4 | 2024-12-23 | [48587](https://github.com/airbytehq/airbyte/pull/48587) | Fix minor state counting mechanism. |
| 3.10.0-rc.3 | 2024-12-20 | [49918](https://github.com/airbytehq/airbyte/pull/49918) | Fix minor datatype handling and conversion bugs, maintain big number precision. |
| 3.10.0-rc.2 | 2024-12-20 | [49950](https://github.com/airbytehq/airbyte/pull/49950) | Remove unused configuration field, streamline SSL certificate key store logic. |
| 3.10.0-rc.1 | 2024-12-20 | [49948](https://github.com/airbytehq/airbyte/pull/49948) | Pin Bulk CDK version to 231, adopt required changes. |
Expand Down
Loading