From bace22750a0eabd4526120084dd077e1693ec067 Mon Sep 17 00:00:00 2001 From: qsm12 Date: Mon, 23 Oct 2023 14:36:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=BF=90=E8=A1=8C=E5=8F=82?= =?UTF-8?q?=E6=95=B0=20--use-lower-case=20true=20=E8=AE=A9=E7=94=9F?= =?UTF-8?q?=E6=88=90=E7=9A=84Doris=E8=A1=A8=E5=90=8D=E4=B8=8E=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=90=8D=E4=BD=BF=E7=94=A8=E5=B0=8F=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../doris/flink/catalog/DorisCatalog.java | 2 +- .../flink/catalog/doris/DorisSystem.java | 47 +++++++++++-------- .../flink/catalog/doris/TableSchema.java | 1 + .../doris/flink/tools/cdc/CdcTools.java | 3 +- .../doris/flink/tools/cdc/DatabaseSync.java | 26 +++++----- .../doris/flink/tools/cdc/SourceSchema.java | 8 ++-- .../tools/cdc/CdcMysqlSyncDatabaseCase.java | 37 ++++++++------- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 35 +++++++------- .../cdc/CdcPostgresSyncDatabaseCase.java | 37 ++++++++------- .../cdc/CdcSqlServerSyncDatabaseCase.java | 31 ++++++------ 10 files changed, 122 insertions(+), 105 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java index e49514687..5c38497a3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java @@ -338,7 +338,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig schema.setDistributeKeys(primaryKeys); schema.setProperties(getCreateTableProps(options)); - dorisSystem.createTable(schema); + dorisSystem.createTable(schema, false); } public List getCreateDorisKeys(org.apache.flink.table.api.TableSchema schema){ diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 77584d3c0..afe04c69c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -47,6 +47,8 @@ */ @Public public class DorisSystem { + + private boolean useLowerCase; private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); private JdbcConnectionProvider jdbcConnectionProvider; private static final List builtinDatabases = Arrays.asList("information_schema"); @@ -77,7 +79,7 @@ public boolean dropDatabase(String database) { return true; } - public boolean tableExists(String database, String table){ + public boolean tableExists(String database, String table) { return databaseExists(database) && listTables(database).contains(table); } @@ -97,8 +99,9 @@ public void dropTable(String tableName) { execute(String.format("DROP TABLE IF EXISTS %s", tableName)); } - public void createTable(TableSchema schema) { - String ddl = buildCreateTableDDL(schema); + public void createTable(TableSchema schema, boolean useLowerCase) { + this.useLowerCase = useLowerCase; + String ddl = buildCreateTableDDL(schema, useLowerCase); LOG.info("Create table with ddl:{}", ddl); execute(ddl); } @@ -106,7 +109,7 @@ public void createTable(TableSchema schema) { public void execute(String sql) { try (Statement statement = jdbcConnectionProvider.getOrEstablishConnection().createStatement()) { statement.execute(sql); - } catch (Exception e){ + } catch (Exception e) { throw new DorisSystemException(String.format("SQL query could not be executed: %s", sql), e); } } @@ -140,18 +143,25 @@ public List extractColumnValuesBySQL( } } - public String buildCreateTableDDL(TableSchema schema) { + public String buildCreateTableDDL(TableSchema schema, boolean useLowerCase) { StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS "); sb.append(identifier(schema.getDatabase())) .append(".") .append(identifier(schema.getTable())) .append("("); + // 待验证, 生成Doris DDL语句时,排除不支持的字段名, 该规则尚未匹配 源数据对应字段名剔除 + // Map fields = new Map; + // schema.getFields().forEach((key, val) -> { + // if (key.matches("^[a-zA-Z][a-zA-Z0-9-_]*$")) { + // fields.put(key, val); + // } + // }); Map fields = schema.getFields(); List keys = schema.getKeys(); //append keys - for(String key : keys){ - if(!fields.containsKey(key)){ + for (String key : keys) { + if (!fields.containsKey(key)) { throw new CreateTableException("key " + key + " not found in column list"); } FieldSchema field = fields.get(key); @@ -160,17 +170,17 @@ public String buildCreateTableDDL(TableSchema schema) { //append values for (Map.Entry entry : fields.entrySet()) { - if(keys.contains(entry.getKey())){ + if (keys.contains(entry.getKey())) { continue; } FieldSchema field = entry.getValue(); buildColumn(sb, field, false); } - sb = sb.deleteCharAt(sb.length() -1); + sb = sb.deleteCharAt(sb.length() - 1); sb.append(" ) "); //append uniq model - if(DataModel.UNIQUE.equals(schema.getModel())){ + if (DataModel.UNIQUE.equals(schema.getModel())) { sb.append(schema.getModel().name()) .append(" KEY(") .append(String.join(",", identifier(schema.getKeys()))) @@ -178,7 +188,7 @@ public String buildCreateTableDDL(TableSchema schema) { } //append table comment - if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){ + if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) { sb.append(" COMMENT '") .append(quoteComment(schema.getTableComment())) .append("' "); @@ -210,9 +220,9 @@ public String buildCreateTableDDL(TableSchema schema) { return sb.toString(); } - private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ + private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) { String fieldType = field.getTypeString(); - if(isKey && DorisType.STRING.equals(fieldType)){ + if (isKey && DorisType.STRING.equals(fieldType)) { fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533); } sql.append(identifier(field.getName())) @@ -223,21 +233,20 @@ private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){ .append("',"); } - private String quoteComment(String comment){ - if(comment == null){ + private String quoteComment(String comment) { + if (comment == null) { return ""; } else { - return comment.replaceAll("'","\\\\'"); + return comment.replaceAll("'", "\\\\'"); } } private List identifier(List name) { - List result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); - return result; + return name.stream().map(m -> identifier(m)).collect(Collectors.toList()); } private String identifier(String name) { - return "`" + name + "`"; + return "`" + (this.useLowerCase ? name.toLowerCase() : name) + "`"; } private String quoteProperties(String name) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java index 8f047051c..e2b4986a5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/TableSchema.java @@ -31,6 +31,7 @@ public class TableSchema { private List distributeKeys = new ArrayList<>(); private Map properties = new HashMap<>(); + public String getDatabase() { return database; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 6a390ea82..1aae6f2b1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -106,13 +106,14 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); boolean useNewSchemaChange = params.has("use-new-schema-change"); + boolean useLowerCase = params.has("use-lower-case"); Map sinkMap = getConfigMap(params, "sink-conf"); Map tableMap = getConfigMap(params, "table-conf"); Configuration sinkConfig = Configuration.fromMap(sinkMap); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange, useLowerCase); databaseSync.build(); if(StringUtils.isNullOrWhitespaceOnly(jobName)){ jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db")); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 8aef65d6a..e8bc50491 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -61,6 +61,7 @@ public abstract class DatabaseSync { public StreamExecutionEnvironment env; private boolean createTableOnly = false; private boolean newSchemaChange; + private boolean useLowerCase = false; protected String includingTables; protected String excludingTables; @@ -73,7 +74,7 @@ public abstract class DatabaseSync { public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, - Map tableConfig, boolean createTableOnly, boolean useNewSchemaChange) { + Map tableConfig, boolean createTableOnly, boolean useNewSchemaChange, boolean useLowerCase) { this.env = env; this.config = config; this.database = database; @@ -86,11 +87,12 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio this.sinkConfig = sinkConfig; this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig; //default enable light schema change - if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){ + if (!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) { this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true"); } this.createTableOnly = createTableOnly; this.newSchemaChange = useNewSchemaChange; + this.useLowerCase = useLowerCase; } public void build() throws Exception { @@ -107,18 +109,18 @@ public void build() throws Exception { List syncTables = new ArrayList<>(); List dorisTables = new ArrayList<>(); for (SourceSchema schema : schemaList) { - syncTables.add(schema.getTableName()); - String dorisTable = converter.convert(schema.getTableName()); + syncTables.add(this.useLowerCase ? schema.getTableName().toLowerCase() : schema.getTableName()); + String dorisTable = this.useLowerCase ? converter.convert(schema.getTableName()).toLowerCase() : converter.convert(schema.getTableName()); if (!dorisSystem.tableExists(database, dorisTable)) { TableSchema dorisSchema = schema.convertTableSchema(tableConfig); //set doris target database dorisSchema.setDatabase(database); dorisSchema.setTable(dorisTable); - dorisSystem.createTable(dorisSchema); + dorisSystem.createTable(dorisSchema, useLowerCase); } dorisTables.add(dorisTable); } - if(createTableOnly){ + if (createTableOnly) { System.out.println("Create table finished."); System.exit(0); } @@ -191,21 +193,21 @@ public DorisSink buildDorisSink(String table) { sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE).ifPresent(executionBuilder::setIgnoreUpdateBefore); - if(!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)){ + if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) { executionBuilder.disable2PC(); - } else if(sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()){ + } else if (sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) { //force open 2pc executionBuilder.enable2PC(); } //batch option - if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){ + if (sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)) { executionBuilder.enableBatchMode(); } sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes); - sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v-> executionBuilder.setBufferFlushIntervalMs(v.toMillis())); + sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis())); sinkConfig.getOptional(DorisConfigOptions.SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache); @@ -241,8 +243,8 @@ public static class TableNameConverter implements Serializable { private final String prefix; private final String suffix; - TableNameConverter(){ - this("",""); + TableNameConverter() { + this("", ""); } TableNameConverter(String prefix, String suffix) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java index 9168cb515..24d2f830d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceSchema.java @@ -85,15 +85,15 @@ public TableSchema convertTableSchema(Map tableProps) { return tableSchema; } - private List buildKeys(){ + private List buildKeys() { return buildDistributeKeys(); } - private List buildDistributeKeys(){ - if(!this.primaryKeys.isEmpty()){ + private List buildDistributeKeys() { + if (!this.primaryKeys.isEmpty()) { return primaryKeys; } - if(!this.fields.isEmpty()){ + if (!this.fields.isEmpty()) { Map.Entry firstField = this.fields.entrySet().iterator().next(); return Collections.singletonList(firstField.getKey()); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index 1a205b163..c34c40a72 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -26,15 +26,15 @@ public class CdcMysqlSyncDatabaseCase { - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); - Map flinkMap = new HashMap<>(); - flinkMap.put("execution.checkpointing.interval","10s"); - flinkMap.put("pipeline.operator-chaining","false"); - flinkMap.put("parallelism.default","1"); + Map flinkMap = new HashMap<>(); + flinkMap.put("execution.checkpointing.interval", "10s"); + flinkMap.put("pipeline.operator-chaining", "false"); + flinkMap.put("parallelism.default", "1"); Configuration configuration = Configuration.fromMap(flinkMap); @@ -43,32 +43,33 @@ public static void main(String[] args) throws Exception{ String database = "db1"; String tablePrefix = ""; String tableSuffix = ""; - Map mysqlConfig = new HashMap<>(); - mysqlConfig.put("database-name","db1"); - mysqlConfig.put("hostname","127.0.0.1"); - mysqlConfig.put("port","3306"); - mysqlConfig.put("username","root"); - mysqlConfig.put("password",""); + Map mysqlConfig = new HashMap<>(); + mysqlConfig.put("database-name", "db1"); + mysqlConfig.put("hostname", "127.0.0.1"); + mysqlConfig.put("port", "3306"); + mysqlConfig.put("username", "root"); + mysqlConfig.put("password", ""); Configuration config = Configuration.fromMap(mysqlConfig); - Map sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","10.20.30.1:8030"); + Map sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes", "10.20.30.1:8030"); // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); - sinkConfig.put("username","root"); - sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); + sinkConfig.put("username", "root"); + sinkConfig.put("password", ""); + sinkConfig.put("jdbc-url", "jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); - Map tableConfig = new HashMap<>(); + Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); String includingTables = "tbl1|tbl2|tbl3"; String excludingTables = ""; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean useLowerCase = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConf, tableConfig, false, useNewSchemaChange, useLowerCase); databaseSync.build(); env.execute(String.format("MySQL-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index 3a2a39efb..7ffed072f 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -26,7 +26,7 @@ public class CdcOraclelSyncDatabaseCase { - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); @@ -45,36 +45,37 @@ public static void main(String[] args) throws Exception{ String database = "db1"; String tablePrefix = ""; String tableSuffix = ""; - Map sourceConfig = new HashMap<>(); - sourceConfig.put("database-name","XE"); - sourceConfig.put("schema-name","ADMIN"); - sourceConfig.put("hostname","127.0.0.1"); - sourceConfig.put("port","1521"); - sourceConfig.put("username","admin"); - sourceConfig.put("password",""); + Map sourceConfig = new HashMap<>(); + sourceConfig.put("database-name", "XE"); + sourceConfig.put("schema-name", "ADMIN"); + sourceConfig.put("hostname", "127.0.0.1"); + sourceConfig.put("port", "1521"); + sourceConfig.put("username", "admin"); + sourceConfig.put("password", ""); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); - sourceConfig.put("debezium.log.mining.strategy","online_catalog"); - sourceConfig.put("debezium.log.mining.continuous.mine","true"); + sourceConfig.put("debezium.log.mining.strategy", "online_catalog"); + sourceConfig.put("debezium.log.mining.continuous.mine", "true"); Configuration config = Configuration.fromMap(sourceConfig); - Map sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","10.20.30.1:8030"); + Map sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes", "10.20.30.1:8030"); // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); - sinkConfig.put("username","root"); - sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); + sinkConfig.put("username", "root"); + sinkConfig.put("password", ""); + sinkConfig.put("jdbc-url", "jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); - Map tableConfig = new HashMap<>(); + Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); String includingTables = "test.*"; String excludingTables = ""; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean useLowerCase = false; DatabaseSync databaseSync = new OracleDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConf, tableConfig, false, useNewSchemaChange, useLowerCase); databaseSync.build(); env.execute(String.format("Oracle-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java index cf5e1d89a..013963a09 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcPostgresSyncDatabaseCase.java @@ -26,7 +26,7 @@ public class CdcPostgresSyncDatabaseCase { - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); @@ -45,39 +45,40 @@ public static void main(String[] args) throws Exception{ String database = "db2"; String tablePrefix = ""; String tableSuffix = ""; - Map sourceConfig = new HashMap<>(); - sourceConfig.put("database-name","postgres"); - sourceConfig.put("schema-name","public"); - sourceConfig.put("slot.name","test"); - sourceConfig.put("decoding.plugin.name","pgoutput"); - sourceConfig.put("hostname","127.0.0.1"); - sourceConfig.put("port","5432"); - sourceConfig.put("username","postgres"); - sourceConfig.put("password","123456"); + Map sourceConfig = new HashMap<>(); + sourceConfig.put("database-name", "postgres"); + sourceConfig.put("schema-name", "public"); + sourceConfig.put("slot.name", "test"); + sourceConfig.put("decoding.plugin.name", "pgoutput"); + sourceConfig.put("hostname", "127.0.0.1"); + sourceConfig.put("port", "5432"); + sourceConfig.put("username", "postgres"); + sourceConfig.put("password", "123456"); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); // sourceConfig.put("scan.incremental.snapshot.enabled","true"); // sourceConfig.put("debezium.include.schema.changes","false"); - + Configuration config = Configuration.fromMap(sourceConfig); - Map sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","10.20.30.1:8030"); + Map sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes", "10.20.30.1:8030"); // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); - sinkConfig.put("username","root"); - sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); + sinkConfig.put("username", "root"); + sinkConfig.put("password", ""); + sinkConfig.put("jdbc-url", "jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); - Map tableConfig = new HashMap<>(); + Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); String includingTables = "testcdc"; String excludingTables = ""; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean useLowerCase = false; DatabaseSync databaseSync = new PostgresDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConf, tableConfig, false, useNewSchemaChange, useLowerCase); databaseSync.build(); env.execute(String.format("Postgres-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java index 7251a7fda..759c8f733 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcSqlServerSyncDatabaseCase.java @@ -26,7 +26,7 @@ public class CdcSqlServerSyncDatabaseCase { - public static void main(String[] args) throws Exception{ + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); @@ -45,37 +45,38 @@ public static void main(String[] args) throws Exception{ String database = "db2"; String tablePrefix = ""; String tableSuffix = ""; - Map sourceConfig = new HashMap<>(); - sourceConfig.put("database-name","CDC_DB"); - sourceConfig.put("schema-name","dbo"); - sourceConfig.put("hostname","127.0.0.1"); - sourceConfig.put("port","1433"); - sourceConfig.put("username","sa"); - sourceConfig.put("password","123456"); + Map sourceConfig = new HashMap<>(); + sourceConfig.put("database-name", "CDC_DB"); + sourceConfig.put("schema-name", "dbo"); + sourceConfig.put("hostname", "127.0.0.1"); + sourceConfig.put("port", "1433"); + sourceConfig.put("username", "sa"); + sourceConfig.put("password", "123456"); // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); // sourceConfig.put("scan.incremental.snapshot.enabled","true"); // sourceConfig.put("debezium.include.schema.changes","false"); Configuration config = Configuration.fromMap(sourceConfig); - Map sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes","10.20.30.1:8030"); + Map sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes", "10.20.30.1:8030"); // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); - sinkConfig.put("username","root"); - sinkConfig.put("password",""); - sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030"); + sinkConfig.put("username", "root"); + sinkConfig.put("password", ""); + sinkConfig.put("jdbc-url", "jdbc:mysql://10.20.30.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); - Map tableConfig = new HashMap<>(); + Map tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); String includingTables = "products_test"; String excludingTables = ""; boolean ignoreDefaultValue = false; boolean useNewSchemaChange = false; + boolean useLowerCase = false; DatabaseSync databaseSync = new SqlServerDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConf, tableConfig, false, useNewSchemaChange, useLowerCase); databaseSync.build(); env.execute(String.format("Postgres-Doris Database Sync: %s", database));