Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 source-mysql Support special chars in dbname #34580

Merged
merged 27 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f875c01
support special chars in dbname
xiaohansong Jan 26, 2024
247a775
format
xiaohansong Jan 26, 2024
b3317ae
Merge branch 'master' into xiaohan/topic
xiaohansong Jan 26, 2024
05277a3
Merge remote-tracking branch 'origin/master' into xiaohan/topic
xiaohansong Jan 29, 2024
d81db6b
fix test
xiaohansong Jan 29, 2024
1adb231
fix one more spot
xiaohansong Jan 29, 2024
8f037ea
add comment
xiaohansong Feb 5, 2024
5451425
Merge remote-tracking branch 'origin/master' into xiaohan/topic
xiaohansong Feb 5, 2024
5b54342
bump cdk
xiaohansong Feb 5, 2024
7b0b608
use new cdk
xiaohansong Feb 5, 2024
c7b8682
overwrite
xiaohansong Feb 5, 2024
09ed4ff
remove all backticks
xiaohansong Feb 5, 2024
2efa38b
bump version and doc
xiaohansong Feb 5, 2024
c2532a1
Try different way to create user
xiaohansong Feb 6, 2024
68a2c88
test create user with wildcard
xiaohansong Feb 6, 2024
e1fdf69
try create user with wildcard (need double)
xiaohansong Feb 6, 2024
2218719
try use 8.0.36 tag
xiaohansong Feb 6, 2024
27c2ee5
Merge remote-tracking branch 'origin/master' into xiaohan/topic
xiaohansong Feb 9, 2024
3168937
fix test
xiaohansong Feb 9, 2024
263adb9
remove unrelated p12 files
xiaohansong Feb 9, 2024
b0ba72a
cleanup and upgrade
xiaohansong Feb 9, 2024
173d308
try to avoid concurrency error
xiaohansong Feb 10, 2024
93c536d
Merge remote-tracking branch 'origin/master' into xiaohan/topic
xiaohansong Feb 12, 2024
53ac16b
format and cdk versionbump
xiaohansong Feb 12, 2024
e55f417
merge
xiaohansong Feb 12, 2024
d43eb6f
bump dependency and update version
xiaohansong Feb 12, 2024
5198b54
bump dependency and update version
xiaohansong Feb 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. |
| 0.20.2 | 2024-02-12 | [\#35111](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. |
| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. |
| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.20.2
version=0.20.3
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.debezium.spi.common.ReplacementFunction;
import java.util.Optional;
import java.util.Properties;

Expand Down Expand Up @@ -76,15 +77,42 @@ public Properties getDebeziumProperties(
props.setProperty("max.queue.size.in.bytes", BYTE_VALUE_256_MB);

// WARNING : Never change the value of this otherwise all the connectors would start syncing from
// scratch
props.setProperty(TOPIC_PREFIX_KEY, getName(config));
// scratch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any potential re-sync issues with existing syncs? I assume that because in these cases syncs are already in a broken state it shouldn't be an issue, but wanted to confirm

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's what I'm thinking too. The reformatting code is just borrowed from debezium sanitize code so I would expect them to recognize invalid name in the same way.

props.setProperty(TOPIC_PREFIX_KEY, sanitizeTopicPrefix(getName(config)));

// includes
props.putAll(getIncludeConfiguration(catalog, config));

return props;
}

public static String sanitizeTopicPrefix(final String topicName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit :

I think you could simplify using regex:

`public static String sanitizeTopicPrefix(final String topicName) {
// Regex to match invalid characters (i.e., characters not allowed)
String regexPattern = "[^\\.\\-_A-Za-z0-9]";

// Replace all invalid characters with underscores
String sanitized = topicName.replaceAll(regexPattern, "_");

// Return the sanitized string
return sanitized;

}
`

but will leave it to you, I haven't really tested this (ChatGPT generated 😅 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I borrowed it from the following and there is no good way to directly use them though:

https://github.com/debezium/debezium/blob/c51ef3099a688efb41204702d3aa6d4722bb4825/debezium-core/src/main/java/io/debezium/schema/AbstractTopicNamingStrategy.java#L178

And I guess we want to keep them the same. I'll leave some breadcrumb in the comment for better readability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akashkulk I do prefer the replaceAll as well. It's simpler, and we'll use it infrequently enough that perf don't matter
But with a correct one : "[^a-zA-Z0-9._-]" ('.' loses its special meaning inside square brackets and '-' needs to be at the end)

StringBuilder sanitizedNameBuilder = new StringBuilder(topicName.length());
boolean changed = false;

for (int i = 0; i < topicName.length(); ++i) {
char c = topicName.charAt(i);
if (isValidCharacter(c)) {
sanitizedNameBuilder.append(c);
} else {
sanitizedNameBuilder.append(ReplacementFunction.UNDERSCORE_REPLACEMENT.replace(c));
changed = true;
}
}

if (changed) {
return sanitizedNameBuilder.toString();
} else {
return topicName;
}
}

// We need to keep the validation rule the same as debezium engine, which is defined here:
// https://github.com/debezium/debezium/blob/c51ef3099a688efb41204702d3aa6d4722bb4825/debezium-core/src/main/java/io/debezium/schema/AbstractTopicNamingStrategy.java#L178
private static boolean isValidCharacter(char c) {
return c == '.' || c == '_' || c == '-' || c >= 'A' && c <= 'Z' || c >= 'a' && c <= 'z' || c >= '0' && c <= '9';
}

protected abstract Properties getConnectionConfiguration(final JsonNode config);

protected abstract String getName(final JsonNode config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,19 @@ protected void writeRecords(
recordJson.get(modelCol).asText());
}

protected void deleteMessageOnIdCol(final String streamName, final String idCol, final int idValue) {
testdb.with("DELETE FROM %s.%s WHERE %s = %s", modelsSchema(), streamName, idCol, idValue);
}

protected void deleteCommand(final String streamName) {
testdb.with("DELETE FROM %s.%s", modelsSchema(), streamName);
}

protected void updateCommand(final String streamName, final String modelCol, final String modelVal, final String idCol, final int idValue) {
testdb.with("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", modelsSchema(), streamName,
modelCol, modelVal, COL_ID, 11);
}

static protected Set<AirbyteRecordMessage> removeDuplicates(final Set<AirbyteRecordMessage> messages) {
final Set<JsonNode> existingDataRecordsWithoutUpdated = new HashSet<>();
final Set<AirbyteRecordMessage> output = new HashSet<>();
Expand Down Expand Up @@ -346,7 +359,7 @@ void testDelete() throws Exception {
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertExpectedStateMessages(stateMessages1);

testdb.with("DELETE FROM %s.%s WHERE %s = %s", modelsSchema(), MODELS_STREAM_NAME, COL_ID, 11);
deleteMessageOnIdCol(MODELS_STREAM_NAME, COL_ID, 11);

final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> read2 = source()
Expand Down Expand Up @@ -375,8 +388,7 @@ void testUpdate() throws Exception {
final List<AirbyteStateMessage> stateMessages1 = extractStateMessages(actualRecords1);
assertExpectedStateMessages(stateMessages1);

testdb.with("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", modelsSchema(), MODELS_STREAM_NAME,
COL_MODEL, updatedModel, COL_ID, 11);
updateCommand(MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11);

final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateMessages1.get(stateMessages1.size() - 1)));
final AutoCloseableIterator<AirbyteMessage> read2 = source()
Expand Down Expand Up @@ -536,8 +548,7 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
@DisplayName("When no records exist, no records are returned.")
void testNoData() throws Exception {

testdb.with("DELETE FROM %s.%s", modelsSchema(), MODELS_STREAM_NAME);

deleteCommand(MODELS_STREAM_NAME);
final AutoCloseableIterator<AirbyteMessage> read = source().read(config(), getConfiguredCatalog(), null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.19.0'
cdkVersionRequired = '0.20.3'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.3.4
dockerImageTag: 3.3.5
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,11 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
// We use the schema_only_recovery property cause using this mode will instruct Debezium to
// construct the db schema history.
properties.setProperty("snapshot.mode", "schema_only_recovery");
final String dbName = database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText();
// Topic.prefix is sanitized version of database name. At this stage properties does not have this
// value - it's set in RelationalDbDebeziumPropertiesManager.
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
constructBinlogOffset(database, database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText()),
constructBinlogOffset(database, dbName, DebeziumPropertiesManager.sanitizeTopicPrefix(dbName)),
Optional.empty());
final AirbyteSchemaHistoryStorage schemaHistoryStorage =
AirbyteSchemaHistoryStorage.initializeDBHistory(new SchemaHistory<>(Optional.empty(), false), COMPRESSION_ENABLED);
Expand Down Expand Up @@ -303,13 +306,13 @@ public static JsonNode serialize(final Map<String, String> offset, final SchemaH
* Method to construct initial Debezium state which can be passed onto Debezium engine to make it
* process binlogs from a specific file and position and skip snapshot phase
*/
private JsonNode constructBinlogOffset(final JdbcDatabase database, final String dbName) {
return format(getStateAttributesFromDB(database), dbName, Instant.now());
private JsonNode constructBinlogOffset(final JdbcDatabase database, final String debeziumName, final String topicPrefixName) {
return format(getStateAttributesFromDB(database), debeziumName, topicPrefixName, Instant.now());
}

@VisibleForTesting
public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String dbName, final Instant time) {
final String key = "[\"" + dbName + "\",{\"server\":\"" + dbName + "\"}]";
public JsonNode format(final MysqlDebeziumStateAttributes attributes, final String debeziumName, final String topicPrefixName, final Instant time) {
final String key = "[\"" + debeziumName + "\",{\"server\":\"" + topicPrefixName + "\"}]";
final String gtidSet = attributes.gtidSet().isPresent() ? ",\"gtids\":\"" + attributes.gtidSet().get() + "\"" : "";
final String value =
"{\"transaction_id\":null,\"ts_sec\":" + time.getEpochSecond() + ",\"file\":\"" + attributes.binlogFilename() + "\",\"pos\":"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ protected void purgeAllBinaryLogs() {

@Override
protected String createSchemaSqlFmt() {
return "CREATE DATABASE IF NOT EXISTS %s;";
return "CREATE DATABASE IF NOT EXISTS `%s`;";
}

@Override
protected String createTableSqlFmt() {
return "CREATE TABLE `%s`.`%s`(%s);";
}

@Override
Expand Down Expand Up @@ -176,6 +181,36 @@ protected void addCdcDefaultCursorField(final AirbyteStream stream) {
}
}

@Override
protected void writeRecords(
final JsonNode recordJson,
final String dbName,
final String streamName,
final String idCol,
final String makeIdCol,
final String modelCol) {
testdb.with("INSERT INTO `%s` .`%s` (%s, %s, %s) VALUES (%s, %s, '%s');", dbName, streamName,
idCol, makeIdCol, modelCol,
recordJson.get(idCol).asInt(), recordJson.get(makeIdCol).asInt(),
recordJson.get(modelCol).asText());
}

@Override
protected void deleteMessageOnIdCol(final String streamName, final String idCol, final int idValue) {
testdb.with("DELETE FROM `%s`.`%s` WHERE %s = %s", modelsSchema(), streamName, idCol, idValue);
}

@Override
protected void deleteCommand(final String streamName) {
testdb.with("DELETE FROM `%s`.`%s`", modelsSchema(), streamName);
}

@Override
protected void updateCommand(final String streamName, final String modelCol, final String modelVal, final String idCol, final int idValue) {
testdb.with("UPDATE `%s`.`%s` SET %s = '%s' WHERE %s = %s", modelsSchema(), streamName,
modelCol, modelVal, COL_ID, 11);
}

@Test
protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception {
testdb.with("REVOKE REPLICATION CLIENT ON *.* FROM %s@'%%';", testdb.getUserName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql;

import io.airbyte.integrations.source.mysql.MySQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.ContainerModifier;

public class CdcMysqlSourceWithSpecialDbNameTest extends CdcMysqlSourceTest {

public static final String INVALID_DB_NAME = "invalid@name";

@Override
protected MySQLTestDatabase createTestDatabase() {
return MySQLTestDatabase.inWithDbName(BaseImage.MYSQL_8, INVALID_DB_NAME, ContainerModifier.INVALID_TIMEZONE_CEST, ContainerModifier.CUSTOM_NAME)
.withCdcPermissions();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void debeziumInitialStateConstructTest() throws SQLException {
public void formatTestWithGtid() {
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MysqlDebeziumStateAttributes("binlog.000002", 633,
Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
Optional.of("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5")), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
Assertions.assertEquals(1, stateAsMap.size());
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
Expand All @@ -113,15 +113,15 @@ public void formatTestWithGtid() {
debeziumState, config);
Assertions.assertTrue(parsedOffset.isPresent());
final JsonNode stateGeneratedUsingParsedOffset =
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
Assertions.assertEquals(debeziumState, stateGeneratedUsingParsedOffset);
}

@Test
public void formatTestWithoutGtid() {
final MySqlDebeziumStateUtil mySqlDebeziumStateUtil = new MySqlDebeziumStateUtil();
final JsonNode debeziumState = mySqlDebeziumStateUtil.format(new MysqlDebeziumStateAttributes("binlog.000002", 633,
Optional.empty()), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
Optional.empty()), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
final Map<String, String> stateAsMap = Jsons.object(debeziumState, Map.class);
Assertions.assertEquals(1, stateAsMap.size());
Assertions.assertTrue(stateAsMap.containsKey("[\"db_fgnfxvllud\",{\"server\":\"db_fgnfxvllud\"}]"));
Expand All @@ -141,7 +141,7 @@ public void formatTestWithoutGtid() {
debeziumState, config);
Assertions.assertTrue(parsedOffset.isPresent());
final JsonNode stateGeneratedUsingParsedOffset =
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
mySqlDebeziumStateUtil.format(parsedOffset.get(), "db_fgnfxvllud", "db_fgnfxvllud", Instant.parse("2023-06-06T08:36:10.341842Z"));
Assertions.assertEquals(debeziumState, stateGeneratedUsingParsedOffset);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public void withMoscowTimezone(MySQLContainer<?> container) {
container.withEnv("TZ", "Europe/Moscow");
}

public void withCustomName(MySQLContainer<?> container) {} // do nothing

public void withRootAndServerCertificates(MySQLContainer<?> container) {
execInContainer(container,
"sed -i '31 a ssl' /etc/my.cnf",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum ContainerModifier {
ROOT_AND_SERVER_CERTIFICATES("withRootAndServerCertificates"),
CLIENT_CERTITICATE("withClientCertificate"),
NETWORK("withNetwork"),
;

CUSTOM_NAME("withCustomName");

public final String methodName;

Expand All @@ -53,6 +54,15 @@ static public MySQLTestDatabase in(BaseImage baseImage, ContainerModifier... met
return new MySQLTestDatabase(container).initialized();
}

static public MySQLTestDatabase inWithDbName(BaseImage baseImage, String dbName, ContainerModifier... methods) {
String[] methodNames = Stream.of(methods).map(im -> im.methodName).toList().toArray(new String[0]);
final var container = new MySQLContainerFactory().shared(baseImage.reference, methodNames);
MySQLTestDatabase db = new MySQLTestDatabase(container);
db.setDatabaseName(dbName);
db.initialized();
return db;
}

public MySQLTestDatabase(MySQLContainer<?> container) {
super(container);
}
Expand All @@ -70,6 +80,26 @@ public MySQLTestDatabase withoutStrictMode() {
}

static private final int MAX_CONNECTIONS = 1000;
private String databaseName = "";

@Override
public String getDatabaseName() {
if (databaseName.isBlank()) {
return super.getDatabaseName();
} else {
return databaseName;
}
}

@Override
public void close() {
super.close();
databaseName = "";
}

public void setDatabaseName(final String databaseName) {
this.databaseName = databaseName;
}

@Override
protected Stream<Stream<String>> inContainerBootstrapCmd() {
Expand All @@ -80,18 +110,19 @@ protected Stream<Stream<String>> inContainerBootstrapCmd() {
"sh", "-c", "ln -s -f /var/lib/mysql/mysql.sock /var/run/mysqld/mysqld.sock"),
mysqlCmd(Stream.of(
String.format("SET GLOBAL max_connections=%d", MAX_CONNECTIONS),
String.format("CREATE DATABASE %s", getDatabaseName()),
String.format("CREATE DATABASE \\`%s\\`", getDatabaseName()),
String.format("CREATE USER '%s' IDENTIFIED BY '%s'", getUserName(), getPassword()),
// Grant privileges also to the container's user, which is not root.
String.format("GRANT ALL PRIVILEGES ON *.* TO '%s', '%s' WITH GRANT OPTION", getUserName(),
getContainer().getUsername()))));

}

@Override
protected Stream<String> inContainerUndoBootstrapCmd() {
return mysqlCmd(Stream.of(
String.format("DROP USER '%s'", getUserName()),
String.format("DROP DATABASE %s", getDatabaseName())));
String.format("DROP DATABASE \\`%s\\`", getDatabaseName())));
}

@Override
Expand Down
Loading
Loading