Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加运行参数use-lower-case 使用小写字母 #214

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
schema.setDistributeKeys(primaryKeys);
schema.setProperties(getCreateTableProps(options));

dorisSystem.createTable(schema);
dorisSystem.createTable(schema, false);
}

public List<String> getCreateDorisKeys(org.apache.flink.table.api.TableSchema schema){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
@Public
public class DorisSystem {

private boolean useLowerCase;
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
private JdbcConnectionProvider jdbcConnectionProvider;
private static final List<String> builtinDatabases = Arrays.asList("information_schema");
Expand Down Expand Up @@ -77,7 +79,7 @@ public boolean dropDatabase(String database) {
return true;
}

public boolean tableExists(String database, String table){
public boolean tableExists(String database, String table) {
return databaseExists(database)
&& listTables(database).contains(table);
}
Expand All @@ -97,16 +99,17 @@ public void dropTable(String tableName) {
execute(String.format("DROP TABLE IF EXISTS %s", tableName));
}

public void createTable(TableSchema schema) {
String ddl = buildCreateTableDDL(schema);
public void createTable(TableSchema schema, boolean useLowerCase) {
this.useLowerCase = useLowerCase;
String ddl = buildCreateTableDDL(schema, useLowerCase);
LOG.info("Create table with ddl:{}", ddl);
execute(ddl);
}

public void execute(String sql) {
try (Statement statement = jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
statement.execute(sql);
} catch (Exception e){
} catch (Exception e) {
throw new DorisSystemException(String.format("SQL query could not be executed: %s", sql), e);
}
}
Expand Down Expand Up @@ -140,18 +143,25 @@ public List<String> extractColumnValuesBySQL(
}
}

public String buildCreateTableDDL(TableSchema schema) {
public String buildCreateTableDDL(TableSchema schema, boolean useLowerCase) {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
sb.append(identifier(schema.getDatabase()))
.append(".")
.append(identifier(schema.getTable()))
.append("(");

// 待验证, 生成Doris DDL语句时,排除不支持的字段名, 该规则尚未匹配 源数据对应字段名剔除
// Map<String, FieldSchema> fields = new Map<String, FieldSchema>;
// schema.getFields().forEach((key, val) -> {
// if (key.matches("^[a-zA-Z][a-zA-Z0-9-_]*$")) {
// fields.put(key, val);
// }
// });
Map<String, FieldSchema> fields = schema.getFields();
List<String> keys = schema.getKeys();
//append keys
for(String key : keys){
if(!fields.containsKey(key)){
for (String key : keys) {
if (!fields.containsKey(key)) {
throw new CreateTableException("key " + key + " not found in column list");
}
FieldSchema field = fields.get(key);
Expand All @@ -160,25 +170,25 @@ public String buildCreateTableDDL(TableSchema schema) {

//append values
for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
if(keys.contains(entry.getKey())){
if (keys.contains(entry.getKey())) {
continue;
}
FieldSchema field = entry.getValue();
buildColumn(sb, field, false);

}
sb = sb.deleteCharAt(sb.length() -1);
sb = sb.deleteCharAt(sb.length() - 1);
sb.append(" ) ");
//append uniq model
if(DataModel.UNIQUE.equals(schema.getModel())){
if (DataModel.UNIQUE.equals(schema.getModel())) {
sb.append(schema.getModel().name())
.append(" KEY(")
.append(String.join(",", identifier(schema.getKeys())))
.append(")");
}

//append table comment
if(!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())){
if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
sb.append(" COMMENT '")
.append(quoteComment(schema.getTableComment()))
.append("' ");
Expand Down Expand Up @@ -210,9 +220,9 @@ public String buildCreateTableDDL(TableSchema schema) {
return sb.toString();
}

private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
String fieldType = field.getTypeString();
if(isKey && DorisType.STRING.equals(fieldType)){
if (isKey && DorisType.STRING.equals(fieldType)) {
fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
}
sql.append(identifier(field.getName()))
Expand All @@ -223,21 +233,20 @@ private void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey){
.append("',");
}

private String quoteComment(String comment){
if(comment == null){
private String quoteComment(String comment) {
if (comment == null) {
return "";
} else {
return comment.replaceAll("'","\\\\'");
return comment.replaceAll("'", "\\\\'");
}
}

private List<String> identifier(List<String> name) {
List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
return result;
return name.stream().map(m -> identifier(m)).collect(Collectors.toList());
}

private String identifier(String name) {
return "`" + name + "`";
return "`" + (this.useLowerCase ? name.toLowerCase() : name) + "`";
}

private String quoteProperties(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class TableSchema {
private List<String> distributeKeys = new ArrayList<>();
private Map<String, String> properties = new HashMap<>();


public String getDatabase() {
return database;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ private static void syncDatabase(MultipleParameterTool params, DatabaseSync data
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean useNewSchemaChange = params.has("use-new-schema-change");
boolean useLowerCase = params.has("use-lower-case");

Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
Configuration sinkConfig = Configuration.fromMap(sinkMap);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange);
databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange, useLowerCase);
databaseSync.build();
if(StringUtils.isNullOrWhitespaceOnly(jobName)){
jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public abstract class DatabaseSync {
public StreamExecutionEnvironment env;
private boolean createTableOnly = false;
private boolean newSchemaChange;
private boolean useLowerCase = false;
protected String includingTables;
protected String excludingTables;

Expand All @@ -73,7 +74,7 @@ public abstract class DatabaseSync {
public void create(StreamExecutionEnvironment env, String database, Configuration config,
String tablePrefix, String tableSuffix, String includingTables,
String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig,
Map<String, String> tableConfig, boolean createTableOnly, boolean useNewSchemaChange) {
Map<String, String> tableConfig, boolean createTableOnly, boolean useNewSchemaChange, boolean useLowerCase) {
this.env = env;
this.config = config;
this.database = database;
Expand All @@ -86,11 +87,12 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
//default enable light schema change
if(!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)){
if (!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
}
this.createTableOnly = createTableOnly;
this.newSchemaChange = useNewSchemaChange;
this.useLowerCase = useLowerCase;
}

public void build() throws Exception {
Expand All @@ -107,18 +109,18 @@ public void build() throws Exception {
List<String> syncTables = new ArrayList<>();
List<String> dorisTables = new ArrayList<>();
for (SourceSchema schema : schemaList) {
syncTables.add(schema.getTableName());
String dorisTable = converter.convert(schema.getTableName());
syncTables.add(this.useLowerCase ? schema.getTableName().toLowerCase() : schema.getTableName());
String dorisTable = this.useLowerCase ? converter.convert(schema.getTableName()).toLowerCase() : converter.convert(schema.getTableName());
if (!dorisSystem.tableExists(database, dorisTable)) {
TableSchema dorisSchema = schema.convertTableSchema(tableConfig);
//set doris target database
dorisSchema.setDatabase(database);
dorisSchema.setTable(dorisTable);
dorisSystem.createTable(dorisSchema);
dorisSystem.createTable(dorisSchema, useLowerCase);
}
dorisTables.add(dorisTable);
}
if(createTableOnly){
if (createTableOnly) {
System.out.println("Create table finished.");
System.exit(0);
}
Expand Down Expand Up @@ -191,21 +193,21 @@ public DorisSink<String> buildDorisSink(String table) {
sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE).ifPresent(executionBuilder::setIgnoreUpdateBefore);


if(!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)){
if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
executionBuilder.disable2PC();
} else if(sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()){
} else if (sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) {
//force open 2pc
executionBuilder.enable2PC();
}

//batch option
if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){
if (sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)) {
executionBuilder.enableBatchMode();
}
sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v-> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));

sinkConfig.getOptional(DorisConfigOptions.SINK_USE_CACHE).ifPresent(executionBuilder::setUseCache);

Expand Down Expand Up @@ -241,8 +243,8 @@ public static class TableNameConverter implements Serializable {
private final String prefix;
private final String suffix;

TableNameConverter(){
this("","");
TableNameConverter() {
this("", "");
}

TableNameConverter(String prefix, String suffix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ public TableSchema convertTableSchema(Map<String, String> tableProps) {
return tableSchema;
}

private List<String> buildKeys(){
private List<String> buildKeys() {
return buildDistributeKeys();
}

private List<String> buildDistributeKeys(){
if(!this.primaryKeys.isEmpty()){
private List<String> buildDistributeKeys() {
if (!this.primaryKeys.isEmpty()) {
return primaryKeys;
}
if(!this.fields.isEmpty()){
if (!this.fields.isEmpty()) {
Map.Entry<String, FieldSchema> firstField = this.fields.entrySet().iterator().next();
return Collections.singletonList(firstField.getKey());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@

public class CdcMysqlSyncDatabaseCase {

public static void main(String[] args) throws Exception{
public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);

Map<String,String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval","10s");
flinkMap.put("pipeline.operator-chaining","false");
flinkMap.put("parallelism.default","1");
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
flinkMap.put("parallelism.default", "1");


Configuration configuration = Configuration.fromMap(flinkMap);
Expand All @@ -43,32 +43,33 @@ public static void main(String[] args) throws Exception{
String database = "db1";
String tablePrefix = "";
String tableSuffix = "";
Map<String,String> mysqlConfig = new HashMap<>();
mysqlConfig.put("database-name","db1");
mysqlConfig.put("hostname","127.0.0.1");
mysqlConfig.put("port","3306");
mysqlConfig.put("username","root");
mysqlConfig.put("password","");
Map<String, String> mysqlConfig = new HashMap<>();
mysqlConfig.put("database-name", "db1");
mysqlConfig.put("hostname", "127.0.0.1");
mysqlConfig.put("port", "3306");
mysqlConfig.put("username", "root");
mysqlConfig.put("password", "");
Configuration config = Configuration.fromMap(mysqlConfig);

Map<String,String> sinkConfig = new HashMap<>();
sinkConfig.put("fenodes","10.20.30.1:8030");
Map<String, String> sinkConfig = new HashMap<>();
sinkConfig.put("fenodes", "10.20.30.1:8030");
// sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
sinkConfig.put("username","root");
sinkConfig.put("password","");
sinkConfig.put("jdbc-url","jdbc:mysql://10.20.30.1:9030");
sinkConfig.put("username", "root");
sinkConfig.put("password", "");
sinkConfig.put("jdbc-url", "jdbc:mysql://10.20.30.1:9030");
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String,String> tableConfig = new HashMap<>();
Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");

String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";
boolean ignoreDefaultValue = false;
boolean useNewSchemaChange = false;
boolean useLowerCase = false;
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange);
databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConf, tableConfig, false, useNewSchemaChange, useLowerCase);
databaseSync.build();
env.execute(String.format("MySQL-Doris Database Sync: %s", database));

Expand Down
Loading