diff --git a/README.md b/README.md index ad12bb6e..68315895 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ Refer to [test docs](./dt-tests/README.md) for details. - Image size -| ape_dts:2.0.2 | debezium/connect:2.7 | +| ape_dts:2.0.3 | debezium/connect:2.7 | | :-------- | :-------- | | 86.4 MB | 1.38 GB | diff --git a/README_ZH.md b/README_ZH.md index 56ee1322..49d629a8 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -95,7 +95,7 @@ - 镜像对比 -| ape_dts:2.0.2 | debezium/connect:2.7 | +| ape_dts:2.0.3 | debezium/connect:2.7 | | :-------- | :-------- | | 86.4 MB | 1.38 GB | diff --git a/docs/en/tutorial/mongo_to_mongo.md b/docs/en/tutorial/mongo_to_mongo.md index 88ca6267..48037305 100644 --- a/docs/en/tutorial/mongo_to_mongo.md +++ b/docs/en/tutorial/mongo_to_mongo.md @@ -3,6 +3,8 @@ # Prerequisites - [prerequisites](./prerequisites.md) +- This article is for quick start, refer to [templates](/docs/templates/mongo_to_mongo.md) and [common configs](/docs/en/config.md) for more details. + # Prepare Mongo instances ## Source diff --git a/docs/en/tutorial/mysql_to_http_server_consumer.md b/docs/en/tutorial/mysql_to_http_server_consumer.md index 90e8fe2b..b7bc66ed 100644 --- a/docs/en/tutorial/mysql_to_http_server_consumer.md +++ b/docs/en/tutorial/mysql_to_http_server_consumer.md @@ -1,10 +1,11 @@ # Start as HTTP server and extract MySQL data -Refer to [Start ape_dts as an HTTP server to provide data to consumers](/docs/en/consumer/http_consumer.md) for details. +Refer to [Start ape_dts as HTTP server](/docs/en/consumer/http_consumer.md) to provide data to consumers for details. # Prerequisites - [prerequisites](./prerequisites.md) -- python3 + +- This article is for quick start, refer to [templates](/docs/templates/rdb_to_http_server.md) and [common configs](/docs/en/config.md) for more details. 
# Prepare MySQL instance Refer to [mysql to mysql](./mysql_to_mysql.md) diff --git a/docs/en/tutorial/mysql_to_kafka_consumer.md b/docs/en/tutorial/mysql_to_kafka_consumer.md index f9c3c07a..de8c176b 100644 --- a/docs/en/tutorial/mysql_to_kafka_consumer.md +++ b/docs/en/tutorial/mysql_to_kafka_consumer.md @@ -1,10 +1,11 @@ # Send MySQL data to Kafka -Refer to [Send data to Kafka for consumers](/docs/en/consumer/kafka_consumer.md) +Refer to [Send data to Kafka](/docs/en/consumer/kafka_consumer.md) for consumers. # Prerequisites - [prerequisites](./prerequisites.md) -- python3 + +- This article is for quick start, refer to [templates](/docs/templates/rdb_to_kafka.md) and [common configs](/docs/en/config.md) for more details. # Prepare MySQL instance Refer to [mysql to mysql](./mysql_to_mysql.md) diff --git a/docs/en/tutorial/mysql_to_mysql.md b/docs/en/tutorial/mysql_to_mysql.md index 93ea2ea0..32b0c59b 100644 --- a/docs/en/tutorial/mysql_to_mysql.md +++ b/docs/en/tutorial/mysql_to_mysql.md @@ -3,6 +3,8 @@ # Prerequisites - [prerequisites](./prerequisites.md) +- This article is for quick start, refer to [templates](/docs/templates/mysql_to_mysql.md) and [common configs](/docs/en/config.md) for more details. 
+ # Prepare MySQL instances ## Source @@ -440,4 +442,65 @@ SELECT * FROM test_db.tb_1; | 5 | 5 | | 6 | 6 | +----+---------+ +``` + +# CDC task with ddl capture + +## Start task +``` +cat <<EOL > /tmp/ape_dts/task_config.ini +[extractor] +db_type=mysql +extract_type=cdc +server_id=2000 +url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled + +[filter] +do_dbs=test_db +do_events=insert,update,delete +do_ddls=create_database,drop_database,alter_database,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table + +[sinker] +db_type=mysql +sink_type=write +batch_size=200 +url=mysql://root:123456@127.0.0.1:3308?ssl-mode=disabled + +[parallelizer] +parallel_type=rdb_merge +parallel_size=8 + +[pipeline] +buffer_size=16000 +checkpoint_interval_secs=1 +EOL +``` + +``` +docker run --rm --network host \ +-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \ +"$APE_DTS_IMAGE" /task_config.ini +``` + +## Do ddls in source +``` +mysql -h127.0.0.1 -uroot -p123456 -P3307 + +CREATE TABLE test_db.tb_2(id int, value int, primary key(id)); +INSERT INTO test_db.tb_2 VALUES(1,1); +``` + +## Check results +``` +mysql -h127.0.0.1 -uroot -p123456 -P3308 + +SELECT * FROM test_db.tb_2; +``` + +``` ++----+-------+ +| id | value | ++----+-------+ +| 1 | 1 | ++----+-------+ ``` \ No newline at end of file diff --git a/docs/en/tutorial/mysql_to_starrocks.md b/docs/en/tutorial/mysql_to_starrocks.md index bf868d78..d525bf68 100644 --- a/docs/en/tutorial/mysql_to_starrocks.md +++ b/docs/en/tutorial/mysql_to_starrocks.md @@ -3,6 +3,8 @@ # Prerequisites - [prerequisites](./prerequisites.md) +- This article is for quick start, refer to [templates](/docs/templates/mysql_to_starrocks.md) and [common configs](/docs/en/config.md) for more details. 
+ # Prepare MySQL instance Refer to [mysql to mysql](./mysql_to_mysql.md) diff --git a/docs/en/tutorial/pg_to_http_server_consumer.md b/docs/en/tutorial/pg_to_http_server_consumer.md index 1a65649a..dc4a1c14 100644 --- a/docs/en/tutorial/pg_to_http_server_consumer.md +++ b/docs/en/tutorial/pg_to_http_server_consumer.md @@ -1,10 +1,11 @@ # Start as HTTP server and extract Postgres data -Refer to [Start ape_dts as an HTTP server to provide data to consumers](/docs/en/consumer/http_consumer.md) for details. +Refer to [Start ape_dts as HTTP server](/docs/en/consumer/http_consumer.md) to provide data to consumers for details. # Prerequisites - [prerequisites](./prerequisites.md) -- python3 + +- This article is for quick start, refer to [templates](/docs/templates/rdb_to_http_server.md) and [common configs](/docs/en/config.md) for more details. # Prepare Postgres instances Refer to [pg to pg](./pg_to_pg.md) diff --git a/docs/en/tutorial/pg_to_kafka_consumer.md b/docs/en/tutorial/pg_to_kafka_consumer.md index 3a18c074..bc1ac015 100644 --- a/docs/en/tutorial/pg_to_kafka_consumer.md +++ b/docs/en/tutorial/pg_to_kafka_consumer.md @@ -1,10 +1,11 @@ # Send Postgres data to Kafka -Refer to [Send data to Kafka for consumers](/docs/en/consumer/kafka_consumer.md) +Refer to [Send data to Kafka](/docs/en/consumer/kafka_consumer.md) for consumers. # Prerequisites - [prerequisites](./prerequisites.md) -- python3 + +- This article is for quick start, refer to [templates](/docs/templates/rdb_to_kafka.md) and [common configs](/docs/en/config.md) for more details. 
# Prepare Postgres instance Refer to [pg to pg](./pg_to_pg.md) diff --git a/docs/en/tutorial/pg_to_pg.md b/docs/en/tutorial/pg_to_pg.md index e76b5453..d86c19a4 100644 --- a/docs/en/tutorial/pg_to_pg.md +++ b/docs/en/tutorial/pg_to_pg.md @@ -3,6 +3,8 @@ # Prerequisites - [prerequisites](./prerequisites.md) +- This article is for quick start, refer to [templates](/docs/templates/pg_to_pg.md) and [common configs](/docs/en/config.md) for more details. + # Prepare Postgres instances ## Source @@ -252,14 +254,12 @@ SELECT * FROM test_db.tb_1 ORDER BY id; ``` ``` -+----+-------+ -| id | value | -+----+-------+ -| 1 | 1 | -| 2 | 2 | -| 3 | 3 | -| 4 | 4 | -+----+-------+ + id | value +----+------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 ``` # Review data @@ -303,9 +303,17 @@ docker run --rm --network host \ ## Check results - /tmp/ape_dts/review_data_task_log/check/miss.log and /tmp/ape_dts/review_data_task_log/check/diff.log should be empty -# Cdc task +# CDC task + +## Drop slot if exists +``` +SELECT pg_drop_replication_slot('ape_test') FROM pg_replication_slots WHERE slot_name = 'ape_test'; +``` ## Start task + +- this will create slot if not exists. 
+ ``` cat < /tmp/ape_dts/task_config.ini [extractor] @@ -357,12 +365,163 @@ SELECT * FROM test_db.tb_1 ORDER BY id; ``` ``` -+----+---------+ -| id | value | -+----+---------+ -| 2 | 2000000 | -| 3 | 3 | -| 4 | 4 | -| 5 | 5 | -+----+---------+ + id | value +----+--------- + 2 | 2000000 + 3 | 3 + 4 | 4 + 5 | 5 +``` + +# CDC task with ddl capture + +## Enable ddl capture in source + +- Create a meta table to store ddl info +``` +CREATE TABLE public.ape_dts_ddl_command +( + ddl_text text COLLATE pg_catalog."default", + id bigserial primary key, + event text COLLATE pg_catalog."default", + tag text COLLATE pg_catalog."default", + username character varying COLLATE pg_catalog."default", + database character varying COLLATE pg_catalog."default", + schema character varying COLLATE pg_catalog."default", + object_type character varying COLLATE pg_catalog."default", + object_name character varying COLLATE pg_catalog."default", + client_address character varying COLLATE pg_catalog."default", + client_port integer, + event_time timestamp with time zone, + txid_current character varying(128) COLLATE pg_catalog."default", + message text COLLATE pg_catalog."default" +); +``` + +- Create a function to capture ddl and record it into ddl meta table +``` +CREATE FUNCTION public.ape_dts_capture_ddl() + RETURNS event_trigger + LANGUAGE 'plpgsql' + COST 100 + VOLATILE NOT LEAKPROOF SECURITY DEFINER +AS $BODY$ + declare ddl_text text; + declare max_rows int := 10000; + declare current_rows int; + declare pg_version_95 int := 90500; + declare pg_version_10 int := 100000; + declare current_version int; + declare object_id varchar; + declare alter_table varchar; + declare record_object record; + declare message text; + declare pub RECORD; +begin + + select current_query() into ddl_text; + + if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL; + show server_version_num into current_version; + if current_version >= pg_version_95 then + for record_object in 
(select * from pg_event_trigger_ddl_commands()) loop + if record_object.command_tag = 'CREATE TABLE' then + object_id := record_object.object_identity; + end if; + end loop; + else + select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id; + end if; + if object_id = '' or object_id is null then + message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query(); + end if; + if current_version >= pg_version_10 then + for pub in (select * from pg_publication where pubname like 'ape_dts_%') loop + raise notice 'pubname=%',pub.pubname; + BEGIN + execute 'alter publication ' || pub.pubname || ' add table ' || object_id; + EXCEPTION WHEN OTHERS THEN + END; + end loop; + end if; + end if; + + insert into public.ape_dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message) + values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message); + + select count(id) into current_rows from public.ape_dts_ddl_command; + if current_rows > max_rows then + delete from public.ape_dts_ddl_command where id in (select min(id) from public.ape_dts_ddl_command); + end if; +end +$BODY$; +``` + +- Alter the function owner to your account +``` +ALTER FUNCTION public.ape_dts_capture_ddl() OWNER TO postgres; +``` + +- Create an event trigger on ddl_command_end and execute the capture function +``` +CREATE EVENT TRIGGER ape_dts_intercept_ddl ON ddl_command_end +EXECUTE PROCEDURE public.ape_dts_capture_ddl(); +``` + +## Start task +``` +cat < /tmp/ape_dts/task_config.ini +[extractor] +db_type=pg +extract_type=cdc +url=postgres://postgres:postgres@127.0.0.1:5433/postgres?options[statement_timeout]=10s +slot_name=ape_test 
+ddl_meta_tb=public.ape_dts_ddl_command + +[filter] +do_dbs=test_db +do_events=insert,update,delete +do_ddls=create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table + +[sinker] +db_type=pg +sink_type=write +batch_size=200 +url=postgres://postgres:postgres@127.0.0.1:5434/postgres?options[statement_timeout]=10s + +[parallelizer] +parallel_type=rdb_merge +parallel_size=8 + +[pipeline] +buffer_size=16000 +checkpoint_interval_secs=1 +EOL +``` + +``` +docker run --rm --network host \ +-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \ +"$APE_DTS_IMAGE" /task_config.ini +``` + +## Do ddls in source +``` +psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W + +CREATE TABLE test_db.tb_2(id int, value int, primary key(id)); +INSERT INTO test_db.tb_2 VALUES(1,1); +``` + +## Check results +``` +psql -h 127.0.0.1 -U postgres -d postgres -p 5434 -W + +SELECT * FROM test_db.tb_2 ORDER BY id; +``` + +``` + id | value +----+------- + 1 | 1 ``` \ No newline at end of file diff --git a/docs/en/tutorial/prerequisites.md b/docs/en/tutorial/prerequisites.md index 25779353..b6c50df4 100644 --- a/docs/en/tutorial/prerequisites.md +++ b/docs/en/tutorial/prerequisites.md @@ -3,7 +3,7 @@ - Set images ``` -export APE_DTS_IMAGE="apecloud-registry.cn-zhangjiakou.cr.aliyuncs.com/apecloud/ape-dts:2.0.2" +export APE_DTS_IMAGE="apecloud-registry.cn-zhangjiakou.cr.aliyuncs.com/apecloud/ape-dts:2.0.3" export MYSQL_IMAGE="mysql:5.7.40" export POSTGRES_IMAGE="postgis/postgis:15-3.4" export REDIS_IMAGE="redis:7.0" diff --git a/docs/en/tutorial/redis_to_redis.md b/docs/en/tutorial/redis_to_redis.md index 7b410195..27a49a96 100644 --- a/docs/en/tutorial/redis_to_redis.md +++ b/docs/en/tutorial/redis_to_redis.md @@ -3,6 +3,8 @@ # Prerequisites - [prerequisites](./prerequisites.md) +- This article is for quick start, refer to [templates](/docs/templates/redis_to_redis.md) and [common configs](/docs/en/config.md) for more details. 
+ # Prepare Redis instances ## Source diff --git a/docs/templates/mysql_to_mysql.md b/docs/templates/mysql_to_mysql.md index 6a617e7e..96136c61 100644 --- a/docs/templates/mysql_to_mysql.md +++ b/docs/templates/mysql_to_mysql.md @@ -192,6 +192,21 @@ log4rs_file=./log4rs.yaml | gtid_enabled | use Gtid_Set to pull binlog | true | false | | gtid_set | the starting Gtid_Set to pull binlog from | 6d3960f6-4b36-11ef-8614-0242ac110002:1-10 | empty, which means from the latest Executed_Gtid_Set | +# CDC with ddl capture + +- The differences with CDC task config: + +``` +[filter] +do_ddls=create_database,drop_database,alter_database,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table +``` + +- [filter] + +| Config | Description | Example | Default | +| :-------- | :-------- | :-------- | :-------- | +| do_ddls | the ddl types to capture and sync to target, it should be one or more among "create_database, drop_database, alter_database, create_table, alter_table, drop_table, create_index, drop_index, truncate_table, rename_table" | create_table,alter_table,drop_table | empty, which means ignore all ddls | + # Struct check ``` [extractor] diff --git a/docs/templates/pg_to_pg.md b/docs/templates/pg_to_pg.md index 7bc80c04..9f0f5cc6 100644 --- a/docs/templates/pg_to_pg.md +++ b/docs/templates/pg_to_pg.md @@ -131,6 +131,32 @@ log4rs_file=./log4rs.yaml - refer to [create slot and get starting lsn](/docs/en/tutorial/snapshot_and_cdc_without_data_loss.md) +# CDC with ddl capture + +- Refer to [tutorial](/docs/en/tutorial/pg_to_pg.md) for how to enable ddl capture in source Postgres. 
+ +- The differences with CDC task config: + +``` +[extractor] +ddl_meta_tb=public.ape_dts_ddl_command + +[filter] +do_ddls=create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table +``` + +- [extractor] + +| Config | Description | Example | Default | +| :-------- | :-------- | :-------- | :-------- | +| ddl_meta_tb | the meta table you created to store the captured ddl info | - | - | + +- [filter] + +| Config | Description | Example | Default | +| :-------- | :-------- | :-------- | :-------- | +| do_ddls | the ddl types to capture and sync to target, it should be one or more among "create_schema, drop_schema, alter_schema, create_table, alter_table, drop_table, create_index, drop_index, truncate_table, rename_table" | create_table,alter_table,drop_table | empty, which means ignore all ddls | + # Struct check ``` [extractor] diff --git a/dt-common/src/config/extractor_config.rs b/dt-common/src/config/extractor_config.rs index dffd76a3..98ea29a4 100644 --- a/dt-common/src/config/extractor_config.rs +++ b/dt-common/src/config/extractor_config.rs @@ -60,7 +60,7 @@ pub enum ExtractorConfig { keepalive_interval_secs: u64, heartbeat_interval_secs: u64, heartbeat_tb: String, - ddl_command_tb: String, + ddl_meta_tb: String, start_time_utc: String, end_time_utc: String, }, diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index df117949..b8f4243d 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -199,7 +199,7 @@ impl TaskConfig { keepalive_interval_secs, heartbeat_interval_secs, heartbeat_tb, - ddl_command_tb: loader.get_optional(EXTRACTOR, "ddl_command_tb"), + ddl_meta_tb: loader.get_optional(EXTRACTOR, "ddl_meta_tb"), start_time_utc: loader.get_optional(EXTRACTOR, "start_time_utc"), end_time_utc: loader.get_optional(EXTRACTOR, "end_time_utc"), }, diff --git a/dt-common/src/meta/ddl_meta/ddl_parser.rs 
b/dt-common/src/meta/ddl_meta/ddl_parser.rs index 3b060188..cc7673d3 100644 --- a/dt-common/src/meta/ddl_meta/ddl_parser.rs +++ b/dt-common/src/meta/ddl_meta/ddl_parser.rs @@ -23,11 +23,11 @@ use super::{ ddl_data::DdlData, ddl_statement::{ AlterSchemaStatement, DropMultiTableStatement, DropSchemaStatement, - MysqlAlterRenameTableStatement, MysqlAlterTableStatement, MysqlCreateIndexStatement, + MysqlAlterTableRenameStatement, MysqlAlterTableStatement, MysqlCreateIndexStatement, MysqlCreateTableStatement, MysqlDropIndexStatement, MysqlTruncateTableStatement, - PgAlterRenameTableStatement, PgAlterTableStatement, PgCreateIndexStatement, - PgCreateTableStatement, PgDropMultiIndexStatement, PgTruncateTableStatement, - RenameMultiTableStatement, + PgAlterTableRenameStatement, PgAlterTableSetSchemaStatement, PgAlterTableStatement, + PgCreateIndexStatement, PgCreateTableStatement, PgDropMultiIndexStatement, + PgTruncateTableStatement, RenameMultiTableStatement, }, ddl_type::DdlType, keywords::keyword_a_to_c, @@ -402,7 +402,7 @@ impl DdlParser { let (db, tb) = parse_table(table); if let Some((new_db, new_tb)) = rename_to { - let statement = MysqlAlterRenameTableStatement { + let statement = MysqlAlterTableRenameStatement { db, tb, new_db, @@ -411,7 +411,7 @@ impl DdlParser { }; let ddl = DdlData { ddl_type: DdlType::AlterTable, - statement: DdlStatement::MysqlAlterRenameTable(statement), + statement: DdlStatement::MysqlAlterTableRename(statement), ..Default::default() }; return Ok((remaining_input, ddl)); @@ -444,23 +444,38 @@ impl DdlParser { Ok((remaining_input, parse_table(new_table))) }; - let (remaining_input, (_, _, _, _, if_exists, only, table, _, rename_to, _)) = - tuple(( - tag_no_case("alter"), + let set_schema = |i: &'a [u8]| -> IResult<&'a [u8], String> { + let (remaining_input, (_, _, _, _, new_schema, _)) = tuple(( + tag_no_case("set"), multispace1, - tag_no_case("table"), + tag_no_case("schema"), multispace1, - opt(if_exists), - 
opt(tuple((tag_no_case("only"), multispace1))), - |i| self.schema_table(i), - multispace1, - opt(rename_to), + |i| self.sql_identifier(i), multispace0, ))(i)?; + Ok((remaining_input, to_string(new_schema))) + }; + + let ( + remaining_input, + (_, _, _, _, if_exists, only, table, _, rename_to_res, set_schema_res, _), + ) = tuple(( + tag_no_case("alter"), + multispace1, + tag_no_case("table"), + multispace1, + opt(if_exists), + opt(tuple((tag_no_case("only"), multispace1))), + |i| self.schema_table(i), + multispace1, + opt(rename_to), + opt(set_schema), + multispace0, + ))(i)?; let (schema, tb) = parse_table(table); - if let Some((new_schema, new_tb)) = rename_to { - let statement = PgAlterRenameTableStatement { + if let Some((new_schema, new_tb)) = rename_to_res { + let statement = PgAlterTableRenameStatement { schema, tb, new_schema, @@ -471,7 +486,25 @@ impl DdlParser { }; let ddl = DdlData { ddl_type: DdlType::AlterTable, - statement: DdlStatement::PgAlterRenameTable(statement), + statement: DdlStatement::PgAlterTableRename(statement), + ..Default::default() + }; + return Ok((remaining_input, ddl)); + } + + if let Some(new_schema) = set_schema_res { + let statement = PgAlterTableSetSchemaStatement { + schema, + tb: tb.clone(), + new_schema, + new_tb: tb, + if_exists: if_exists.is_some(), + is_only: only.is_some(), + unparsed: to_string(remaining_input), + }; + let ddl = DdlData { + ddl_type: DdlType::AlterTable, + statement: DdlStatement::PgAlterTableSetSchema(statement), ..Default::default() }; return Ok((remaining_input, ddl)); @@ -1688,15 +1721,19 @@ mod test_pg { let sqls = vec![ "ALTER TABLE tb_1 RENAME TO tb_2", "alter table tb_1 rename to tb_2", - r#"ALTER TABLE IF EXISTS ONLY "schema_1".tb_1 RENAME TO "schema_2".tb_2"#, - r#"alter table "schema_1".tb_1 rename to "schema_2".tb_2"#, + r#"ALTER TABLE IF EXISTS ONLY "schema_1".tb_1 RENAME TO tb_2"#, + r#"alter table "schema_1".tb_1 rename to tb_2"#, + r#"ALTER TABLE IF EXISTS ONLY "schema_1".tb_1 SET SCHEMA 
tb_2"#, + r#"alter table "schema_1".tb_1 set schema tb_2"#, ]; let expect_sqls = vec![ r#"ALTER TABLE "tb_1" RENAME TO "tb_2""#, r#"ALTER TABLE "tb_1" RENAME TO "tb_2""#, - r#"ALTER TABLE IF EXISTS ONLY "schema_1"."tb_1" RENAME TO "schema_2"."tb_2""#, - r#"ALTER TABLE "schema_1"."tb_1" RENAME TO "schema_2"."tb_2""#, + r#"ALTER TABLE IF EXISTS ONLY "schema_1"."tb_1" RENAME TO "tb_2""#, + r#"ALTER TABLE "schema_1"."tb_1" RENAME TO "tb_2""#, + r#"ALTER TABLE IF EXISTS ONLY "schema_1"."tb_1" SET SCHEMA "tb_2""#, + r#"ALTER TABLE "schema_1"."tb_1" SET SCHEMA "tb_2""#, ]; let parser = DdlParser::new(DbType::Pg); diff --git a/dt-common/src/meta/ddl_meta/ddl_statement.rs b/dt-common/src/meta/ddl_meta/ddl_statement.rs index ea260bdf..b9cb53fa 100644 --- a/dt-common/src/meta/ddl_meta/ddl_statement.rs +++ b/dt-common/src/meta/ddl_meta/ddl_statement.rs @@ -14,14 +14,15 @@ pub enum DdlStatement { MysqlCreateTable(MysqlCreateTableStatement), MysqlAlterTable(MysqlAlterTableStatement), - MysqlAlterRenameTable(MysqlAlterRenameTableStatement), + MysqlAlterTableRename(MysqlAlterTableRenameStatement), MysqlTruncateTable(MysqlTruncateTableStatement), MysqlCreateIndex(MysqlCreateIndexStatement), MysqlDropIndex(MysqlDropIndexStatement), PgCreateTable(PgCreateTableStatement), PgAlterTable(PgAlterTableStatement), - PgAlterRenameTable(PgAlterRenameTableStatement), + PgAlterTableRename(PgAlterTableRenameStatement), + PgAlterTableSetSchema(PgAlterTableSetSchemaStatement), PgTruncateTable(PgTruncateTableStatement), PgCreateIndex(PgCreateIndexStatement), @@ -108,8 +109,9 @@ impl DdlStatement { DdlStatement::DropTable(s) => (s.schema.clone(), s.tb.clone()), DdlStatement::RenameTable(s) => (s.schema.clone(), s.tb.clone()), - DdlStatement::MysqlAlterRenameTable(s) => (s.db.clone(), s.tb.clone()), - DdlStatement::PgAlterRenameTable(s) => (s.schema.clone(), s.tb.clone()), + DdlStatement::MysqlAlterTableRename(s) => (s.db.clone(), s.tb.clone()), + DdlStatement::PgAlterTableRename(s) => 
(s.schema.clone(), s.tb.clone()), + DdlStatement::PgAlterTableSetSchema(s) => (s.schema.clone(), s.tb.clone()), DdlStatement::PgDropIndex(_) | DdlStatement::PgDropMultiIndex(_) @@ -122,8 +124,8 @@ impl DdlStatement { pub fn get_rename_to_schema_tb(&self) -> (String, String) { match self { DdlStatement::RenameTable(s) => (s.new_schema.clone(), s.new_tb.clone()), - DdlStatement::MysqlAlterRenameTable(s) => (s.new_db.clone(), s.new_tb.clone()), - DdlStatement::PgAlterRenameTable(s) => (s.new_schema.clone(), s.new_tb.clone()), + DdlStatement::MysqlAlterTableRename(s) => (s.new_db.clone(), s.new_tb.clone()), + DdlStatement::PgAlterTableRename(s) => (s.new_schema.clone(), s.new_tb.clone()), _ => (String::new(), String::new()), } } @@ -136,7 +138,7 @@ impl DdlStatement { dst_new_tb: String, ) { match self { - DdlStatement::MysqlAlterRenameTable(s) => { + DdlStatement::MysqlAlterTableRename(s) => { if !s.db.is_empty() { s.db = dst_schema; } @@ -147,7 +149,7 @@ impl DdlStatement { s.new_tb = dst_new_tb; } - DdlStatement::PgAlterRenameTable(s) => { + DdlStatement::PgAlterTableRename(s) => { if !s.schema.is_empty() { s.schema = dst_schema; } @@ -260,8 +262,9 @@ impl DdlStatement { // not supported DdlStatement::RenameTable(_) - | DdlStatement::MysqlAlterRenameTable(_) - | DdlStatement::PgAlterRenameTable(_) + | DdlStatement::MysqlAlterTableRename(_) + | DdlStatement::PgAlterTableRename(_) + | DdlStatement::PgAlterTableSetSchema(_) | DdlStatement::PgDropIndex(_) | DdlStatement::PgDropMultiIndex(_) | DdlStatement::DropMultiTable(_) @@ -352,7 +355,7 @@ pub struct MysqlAlterTableStatement { } #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] -pub struct MysqlAlterRenameTableStatement { +pub struct MysqlAlterTableRenameStatement { pub db: String, pub tb: String, pub new_db: String, @@ -370,7 +373,18 @@ pub struct PgAlterTableStatement { } #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] -pub struct PgAlterRenameTableStatement { +pub 
struct PgAlterTableRenameStatement { + pub schema: String, + pub tb: String, + pub new_schema: String, + pub new_tb: String, + pub if_exists: bool, + pub is_only: bool, + pub unparsed: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] +pub struct PgAlterTableSetSchemaStatement { pub schema: String, pub tb: String, pub new_schema: String, @@ -561,7 +575,7 @@ impl DdlStatement { append_unparsed(sql, &s.unparsed) } - DdlStatement::MysqlAlterRenameTable(s) => { + DdlStatement::MysqlAlterTableRename(s) => { let mut sql = "ALTER TABLE".to_string(); sql = append_tb(&sql, &s.db, &s.tb, db_type); sql = format!("{} RENAME TO", sql); @@ -581,7 +595,7 @@ impl DdlStatement { append_unparsed(sql, &s.unparsed) } - DdlStatement::PgAlterRenameTable(s) => { + DdlStatement::PgAlterTableRename(s) => { let mut sql = "ALTER TABLE".to_string(); if s.if_exists { sql = format!("{} IF EXISTS", sql); @@ -595,6 +609,20 @@ impl DdlStatement { append_unparsed(sql, &s.unparsed) } + DdlStatement::PgAlterTableSetSchema(s) => { + let mut sql = "ALTER TABLE".to_string(); + if s.if_exists { + sql = format!("{} IF EXISTS", sql); + } + if s.is_only { + sql = format!("{} ONLY", sql); + } + sql = append_tb(&sql, &s.schema, &s.tb, db_type); + sql = format!("{} SET SCHEMA", sql); + sql = append_identifier(&sql, &s.new_schema, true, db_type); + append_unparsed(sql, &s.unparsed) + } + DdlStatement::RenameMultiTable(s) => s.to_sql(db_type), DdlStatement::RenameTable(s) => { diff --git a/dt-common/src/meta/pg/pg_meta_manager.rs b/dt-common/src/meta/pg/pg_meta_manager.rs index 0219f59a..9cc650af 100644 --- a/dt-common/src/meta/pg/pg_meta_manager.rs +++ b/dt-common/src/meta/pg/pg_meta_manager.rs @@ -43,8 +43,7 @@ impl PgMetaManager { .type_registry .oid_to_type .get(&oid) - .with_context(|| format!("no type found for oid: [{}]", oid)) - .unwrap() + .with_context(|| format!("no type found for oid: [{}]", oid))? 
.clone()) } @@ -59,8 +58,7 @@ impl PgMetaManager { Ok(self .oid_to_tb_meta .get(&oid) - .with_context(|| format!("no tb_meta found for oid: [{}]", oid)) - .unwrap() + .with_context(|| format!("no tb_meta found for oid: [{}]", oid))? .clone()) } @@ -147,7 +145,7 @@ impl PgMetaManager { schema, tb ); let mut rows = sqlx::query(&sql).fetch(conn_pool); - while let Some(row) = rows.try_next().await.unwrap() { + while let Some(row) = rows.try_next().await? { let col: String = row.try_get("column_name")?; cols.push(col); } @@ -163,7 +161,7 @@ impl PgMetaManager { ); let mut rows = sqlx::query(&sql).fetch(conn_pool); - while let Some(row) = rows.try_next().await.unwrap() { + while let Some(row) = rows.try_next().await? { let col: String = row.try_get("col_name")?; if !cols.contains(&col) { continue; @@ -208,7 +206,7 @@ impl PgMetaManager { let mut key_map: HashMap> = HashMap::new(); let mut rows = sqlx::query(&sql).fetch(conn_pool); - while let Some(row) = rows.try_next().await.unwrap() { + while let Some(row) = rows.try_next().await? { let mut col_name: String = row.try_get("col_name")?; col_name = col_name.to_lowercase(); @@ -233,7 +231,7 @@ impl PgMetaManager { async fn get_oid(conn_pool: &Pool, schema: &str, tb: &str) -> anyhow::Result { let sql = format!(r#"SELECT '"{}"."{}"'::regclass::oid;"#, schema, tb); let mut rows = sqlx::query(&sql).fetch(conn_pool); - if let Some(row) = rows.try_next().await.unwrap() { + if let Some(row) = rows.try_next().await? { let oid: i32 = row.try_get_unchecked("oid")?; return Ok(oid); } @@ -279,7 +277,7 @@ impl PgMetaManager { ); let mut rows = sqlx::query(&sql).fetch(conn_pool); - while let Some(row) = rows.try_next().await.unwrap() { + while let Some(row) = rows.try_next().await? 
{ let my_schema: String = row.try_get("schema_name")?; let my_tb: String = row.try_get("table_name")?; let my_col: String = row.try_get("column_name")?; diff --git a/dt-common/src/rdb_filter.rs b/dt-common/src/rdb_filter.rs index 6df0b510..12334610 100644 --- a/dt-common/src/rdb_filter.rs +++ b/dt-common/src/rdb_filter.rs @@ -125,6 +125,10 @@ impl RdbFilter { self.ignore_tbs.insert((schema.into(), tb.into())); } + pub fn add_do_tb(&mut self, schema: &str, tb: &str) { + self.do_tbs.insert((schema.into(), tb.into())); + } + fn match_all(set: &HashSet) -> bool { set.len() == 1 && set.contains("*") } diff --git a/dt-connector/src/extractor/base_extractor.rs b/dt-connector/src/extractor/base_extractor.rs index 6badb326..92964d42 100644 --- a/dt-connector/src/extractor/base_extractor.rs +++ b/dt-connector/src/extractor/base_extractor.rs @@ -153,7 +153,11 @@ impl BaseExtractor { ); if heartbeat_interval_secs == 0 || heartbeat_tb.is_empty() { - log_warn!("heartbeat disabled, heartbeat_tb is empty"); + log_warn!( + "heartbeat disabled, heartbeat_tb: {}, heartbeat_interval_secs: {}", + heartbeat_tb, + heartbeat_interval_secs + ); return vec![]; } diff --git a/dt-connector/src/extractor/pg/pg_cdc_extractor.rs b/dt-connector/src/extractor/pg/pg_cdc_extractor.rs index 6e35ccca..d4a94b11 100644 --- a/dt-connector/src/extractor/pg/pg_cdc_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_cdc_extractor.rs @@ -27,7 +27,10 @@ use sqlx::{postgres::PgArguments, query::Query, Pool, Postgres}; use tokio_postgres::replication::LogicalReplicationStream; use dt_common::{ - config::config_enums::DbType, error::Error, log_error, log_info, rdb_filter::RdbFilter, + config::{config_enums::DbType, config_token_parser::ConfigTokenParser}, + error::Error, + log_error, log_info, + rdb_filter::RdbFilter, utils::time_util::TimeUtil, }; @@ -64,7 +67,7 @@ pub struct PgCdcExtractor { pub keepalive_interval_secs: u64, pub heartbeat_interval_secs: u64, pub heartbeat_tb: String, - pub ddl_command_tb: 
String, + pub ddl_meta_tb: String, pub syncer: Arc>, pub resumer: CdcResumer, } @@ -80,12 +83,13 @@ impl Extractor for PgCdcExtractor { }; log_info!( - "PgCdcExtractor starts, slot_name: {}, start_lsn: {}, keepalive_interval_secs: {}, heartbeat_interval_secs: {}, heartbeat_tb: {}", + "PgCdcExtractor starts, slot_name: {}, start_lsn: {}, keepalive_interval_secs: {}, heartbeat_interval_secs: {}, heartbeat_tb: {}, ddl_meta_tb: {}", self.slot_name, self.start_lsn, self.keepalive_interval_secs, self.heartbeat_interval_secs, - self.heartbeat_tb + self.heartbeat_tb, + self.ddl_meta_tb, ); self.extract_internal().await?; self.base_extractor.wait_task_finish().await @@ -109,6 +113,12 @@ impl PgCdcExtractor { let (stream, actual_start_lsn) = cdc_client.connect().await?; tokio::pin!(stream); + // setup ddl capture + let ddl_meta = ConfigTokenParser::parse_config(&self.ddl_meta_tb, &DbType::Pg, &['.'])?; + if ddl_meta.len() == 2 { + self.filter.add_do_tb(&ddl_meta[0], &ddl_meta[1]); + } + // start heartbeat self.start_heartbeat(self.base_extractor.shut_down.clone())?; @@ -176,7 +186,7 @@ impl PgCdcExtractor { Insert(insert) => { if self.base_extractor.time_filter.started { - self.decode_insert(&insert, &position).await?; + self.decode_insert(&insert, &position, &ddl_meta).await?; } } @@ -294,6 +304,7 @@ impl PgCdcExtractor { &mut self, event: &InsertBody, position: &Position, + ddl_meta: &[String], ) -> anyhow::Result<()> { let tb_meta = self .meta_manager @@ -311,7 +322,7 @@ impl PgCdcExtractor { Some(col_values), ); - if row_data.tb == self.ddl_command_tb { + if ddl_meta.len() == 2 && row_data.schema == ddl_meta[0] && row_data.tb == ddl_meta[1] { return self.decode_ddl(&row_data, position).await; } diff --git a/dt-connector/src/rdb_router.rs b/dt-connector/src/rdb_router.rs index bb1cf5bf..51761104 100644 --- a/dt-connector/src/rdb_router.rs +++ b/dt-connector/src/rdb_router.rs @@ -159,8 +159,8 @@ impl RdbRouter { pub fn route_ddl(&self, mut ddl_data: DdlData) -> DdlData { 
match &mut ddl_data.statement { - DdlStatement::MysqlAlterRenameTable(_) - | DdlStatement::PgAlterRenameTable(_) + DdlStatement::MysqlAlterTableRename(_) + | DdlStatement::PgAlterTableRename(_) | DdlStatement::RenameTable(_) => { let (src_schema, src_tb) = ddl_data.get_schema_tb(); let (src_new_schema, src_new_tb) = ddl_data.get_rename_to_schema_tb(); diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs index 001d7cb5..0bf00ab9 100644 --- a/dt-task/src/extractor_util.rs +++ b/dt-task/src/extractor_util.rs @@ -234,7 +234,7 @@ impl ExtractorUtil { keepalive_interval_secs, heartbeat_interval_secs, heartbeat_tb, - ddl_command_tb, + ddl_meta_tb, start_time_utc, end_time_utc, } => { @@ -254,7 +254,7 @@ impl ExtractorUtil { keepalive_interval_secs, heartbeat_interval_secs, heartbeat_tb, - ddl_command_tb, + ddl_meta_tb, resumer: cdc_resumer, base_extractor, }; diff --git a/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/dst_prepare.sql index 05d4aa55..2fcb2e90 100644 --- a/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/dst_prepare.sql @@ -8,6 +8,7 @@ CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 sm CREATE TABLE test_db_1.big_decimal_table ( f_0 tinyint, f_1 decimal(47,25) DEFAULT NULL, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -19,4 +20,5 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_8 year DEFAULT NULL, PRIMARY KEY(f_0)); -CREATE TABLE test_db_1.composite_pk(f_0 int NOT NULL, f_1 float NOT NULL, PRIMARY KEY (f_0, f_1)); \ No newline at end of file +CREATE TABLE test_db_1.composite_pk(f_0 int NOT NULL, f_1 float NOT NULL, PRIMARY KEY (f_0, f_1)); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/src_prepare.sql 
b/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/src_prepare.sql index 05d4aa55..2fcb2e90 100644 --- a/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/cdc/basic_test/src_prepare.sql @@ -8,6 +8,7 @@ CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 sm CREATE TABLE test_db_1.big_decimal_table ( f_0 tinyint, f_1 decimal(47,25) DEFAULT NULL, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -19,4 +20,5 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_8 year DEFAULT NULL, PRIMARY KEY(f_0)); -CREATE TABLE test_db_1.composite_pk(f_0 int NOT NULL, f_1 float NOT NULL, PRIMARY KEY (f_0, f_1)); \ No newline at end of file +CREATE TABLE test_db_1.composite_pk(f_0 int NOT NULL, f_1 float NOT NULL, PRIMARY KEY (f_0, f_1)); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/dst_prepare.sql b/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/dst_prepare.sql index 1807a409..9d990cb1 100644 --- a/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/dst_prepare.sql @@ -1,5 +1,6 @@ DROP TABLE IF EXISTS sync_db_test_types.test_types; +``` CREATE TABLE sync_db_test_types.`test_types` ( `c_pk` bigint unsigned NOT NULL AUTO_INCREMENT, `c_type` varchar(255) DEFAULT NULL, @@ -44,4 +45,5 @@ CREATE TABLE sync_db_test_types.`test_types` ( `c_set` set('value1','value2') DEFAULT NULL, `c_json` json DEFAULT NULL, PRIMARY KEY (`c_pk`) -) AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci \ No newline at end of file +) AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/src_prepare.sql b/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/src_prepare.sql 
index 0962eb71..ebbba9c4 100644 --- a/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/cdc/foxlake_types_test/src_prepare.sql @@ -2,6 +2,7 @@ DROP DATABASE IF EXISTS sync_db_test_types; CREATE DATABASE sync_db_test_types; +``` CREATE TABLE sync_db_test_types.`test_types` ( `c_pk` bigint unsigned NOT NULL AUTO_INCREMENT, `c_type` varchar(255) DEFAULT NULL, @@ -46,4 +47,5 @@ CREATE TABLE sync_db_test_types.`test_types` ( `c_set` set('value1','value2') DEFAULT NULL, `c_json` json DEFAULT NULL, PRIMARY KEY (`c_pk`) -) ENGINE=InnoDB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci \ No newline at end of file +) ENGINE=InnoDB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/dst_prepare.sql index 1cb8e9ce..bf139748 100644 --- a/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/dst_prepare.sql @@ -8,6 +8,7 @@ CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 sm CREATE TABLE test_db_1.big_decimal_table ( f_0 tinyint, f_1 decimal(47,25) DEFAULT NULL, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -17,4 +18,5 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_6 timestamp(6) NULL DEFAULT NULL, f_7 date DEFAULT NULL, f_8 year DEFAULT NULL, - PRIMARY KEY(f_0)); \ No newline at end of file + PRIMARY KEY(f_0)); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/src_prepare.sql index 1cb8e9ce..bf139748 100644 --- a/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/src_prepare.sql +++ 
b/dt-tests/tests/mysql_to_foxlake/snapshot/basic_test/src_prepare.sql @@ -8,6 +8,7 @@ CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 sm CREATE TABLE test_db_1.big_decimal_table ( f_0 tinyint, f_1 decimal(47,25) DEFAULT NULL, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -17,4 +18,5 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_6 timestamp(6) NULL DEFAULT NULL, f_7 date DEFAULT NULL, f_8 year DEFAULT NULL, - PRIMARY KEY(f_0)); \ No newline at end of file + PRIMARY KEY(f_0)); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/dst_prepare.sql b/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/dst_prepare.sql index 2230feff..22faa305 100644 --- a/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/dst_prepare.sql @@ -1,5 +1,6 @@ DROP TABLE IF EXISTS sync_db_test_types.test_types; +``` CREATE TABLE sync_db_test_types.`test_types` ( `c_pk` bigint unsigned NOT NULL AUTO_INCREMENT, `c_type` varchar(255) DEFAULT NULL, @@ -44,4 +45,5 @@ CREATE TABLE sync_db_test_types.`test_types` ( `c_set` set('value1','value2') DEFAULT NULL, `c_json` json DEFAULT NULL, PRIMARY KEY (`c_pk`) -) AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; \ No newline at end of file +) AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/src_prepare.sql b/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/src_prepare.sql index 0962eb71..ebbba9c4 100644 --- a/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_foxlake/snapshot/foxlake_types_test/src_prepare.sql @@ -2,6 +2,7 @@ DROP DATABASE IF EXISTS 
sync_db_test_types; CREATE DATABASE sync_db_test_types; +``` CREATE TABLE sync_db_test_types.`test_types` ( `c_pk` bigint unsigned NOT NULL AUTO_INCREMENT, `c_type` varchar(255) DEFAULT NULL, @@ -46,4 +47,5 @@ CREATE TABLE sync_db_test_types.`test_types` ( `c_set` set('value1','value2') DEFAULT NULL, `c_json` json DEFAULT NULL, PRIMARY KEY (`c_pk`) -) ENGINE=InnoDB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci \ No newline at end of file +) ENGINE=InnoDB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/cdc/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/cdc/basic_test/dst_prepare.sql index 4e985717..1dd9df61 100644 --- a/dt-tests/tests/mysql_to_mysql/cdc/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/cdc/basic_test/dst_prepare.sql @@ -16,6 +16,7 @@ CREATE TABLE test_db_1.col_has_special_character_table (`p:k` tinyint, `col"1` t CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 smallint, f_3 smallint unsigned, f_4 mediumint, f_5 mediumint unsigned, f_6 int, f_7 int unsigned, f_8 bigint, f_9 bigint unsigned, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -26,10 +27,13 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_7 date DEFAULT NULL, f_8 year DEFAULT NULL, PRIMARY KEY(f_0)); +``` +``` CREATE TABLE test_db_1.set_table( f_0 tinyint, f_1 SET('a','b','c','d','e'), PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.ignore_cols_1 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE test_db_1.ignore_cols_2 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file diff 
--git a/dt-tests/tests/mysql_to_mysql/cdc/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/cdc/basic_test/src_prepare.sql index 4e985717..1dd9df61 100644 --- a/dt-tests/tests/mysql_to_mysql/cdc/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/cdc/basic_test/src_prepare.sql @@ -16,6 +16,7 @@ CREATE TABLE test_db_1.col_has_special_character_table (`p:k` tinyint, `col"1` t CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 smallint, f_3 smallint unsigned, f_4 mediumint, f_5 mediumint unsigned, f_6 int, f_7 int unsigned, f_8 bigint, f_9 bigint unsigned, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -26,10 +27,13 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_7 date DEFAULT NULL, f_8 year DEFAULT NULL, PRIMARY KEY(f_0)); +``` +``` CREATE TABLE test_db_1.set_table( f_0 tinyint, f_1 SET('a','b','c','d','e'), PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.ignore_cols_1 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE test_db_1.ignore_cols_2 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/cdc/ddl_meta_center_test/task_config.ini b/dt-tests/tests/mysql_to_mysql/cdc/ddl_meta_center_test/task_config.ini index 833c5d9e..7faea605 100644 --- a/dt-tests/tests/mysql_to_mysql/cdc/ddl_meta_center_test/task_config.ini +++ b/dt-tests/tests/mysql_to_mysql/cdc/ddl_meta_center_test/task_config.ini @@ -13,7 +13,7 @@ ignore_dbs= do_dbs= do_tbs=*.* ignore_tbs= -do_events=insert,update,delete,ddl +do_events=insert,update,delete 
do_ddls=create_database,drop_database,alter_database,create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,truncate_table,rename_table [sinker] diff --git a/dt-tests/tests/mysql_to_mysql/cdc/ddl_route_test/task_config.ini b/dt-tests/tests/mysql_to_mysql/cdc/ddl_route_test/task_config.ini index 661f3c44..435cfd2c 100644 --- a/dt-tests/tests/mysql_to_mysql/cdc/ddl_route_test/task_config.ini +++ b/dt-tests/tests/mysql_to_mysql/cdc/ddl_route_test/task_config.ini @@ -13,7 +13,7 @@ ignore_dbs= do_dbs= do_tbs=*.* ignore_tbs= -do_events=insert,update,delete,ddl +do_events=insert,update,delete do_ddls=create_database,drop_database,alter_database,create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table [sinker] diff --git a/dt-tests/tests/mysql_to_mysql/cdc/ddl_test/task_config.ini b/dt-tests/tests/mysql_to_mysql/cdc/ddl_test/task_config.ini index 9c305d44..e66e0477 100644 --- a/dt-tests/tests/mysql_to_mysql/cdc/ddl_test/task_config.ini +++ b/dt-tests/tests/mysql_to_mysql/cdc/ddl_test/task_config.ini @@ -13,7 +13,7 @@ ignore_dbs= do_dbs= do_tbs=*.* ignore_tbs= -do_events=insert,update,delete,ddl +do_events=insert,update,delete do_ddls=create_database,drop_database,alter_database,create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table [sinker] diff --git a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql index 5ab5f91b..f93d9038 100644 --- a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/dst_prepare.sql @@ -3,13 +3,16 @@ drop database if exists struct_check_test_1; create database struct_check_test_1; -- simple test +``` CREATE TABLE struct_check_test_1.match_table_test( id integer, text varchar(10) comment 'col comment test', 
primary key (id) ) comment 'table comment test'; +``` -- full column type +``` CREATE TABLE struct_check_test_1.match_full_column_type ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL, @@ -37,8 +40,10 @@ CREATE TABLE struct_check_test_1.match_full_column_type ( enum_col ENUM('value1', 'value2', 'value3'), set_col SET('option1', 'option2', 'option3') ); +``` -- full index type +``` CREATE TABLE struct_check_test_1.match_full_index_type(id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, f_1 int, f_2 char(128), @@ -50,6 +55,7 @@ CREATE TABLE struct_check_test_1.match_full_index_type(id INT UNSIGNED AUTO_INCR f_8 TEXT, f_9 POINT NOT NULL ); +``` -- unique key with multiple columns CREATE UNIQUE INDEX idx_unique_1 ON struct_check_test_1.match_full_index_type(f_1, f_2, f_3); @@ -74,6 +80,7 @@ CREATE SPATIAL INDEX idx_spatial_1 ON struct_check_test_1.match_full_index_type( -- CREATE TABLE struct_check_test_1.not_match_miss(id integer, text varchar(10) ,primary key (id)); -- not match: column +``` CREATE TABLE struct_check_test_1.not_match_column ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, char_col CHAR(10), @@ -99,8 +106,10 @@ CREATE TABLE struct_check_test_1.not_match_column ( enum_col ENUM('value1', 'value2', 'value3'), set_col SET('option1', 'option2', 'option3') ); +``` -- not match: index +``` CREATE TABLE struct_check_test_1.not_match_index ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, unique_col VARCHAR(255) NOT NULL, @@ -112,6 +121,7 @@ CREATE TABLE struct_check_test_1.not_match_index ( composite_index_col2 VARCHAR(255), composite_index_col3 VARCHAR(255) ); +``` CREATE INDEX i4_diff_order ON struct_check_test_1.not_match_index (composite_index_col3, composite_index_col2, composite_index_col1); CREATE INDEX i5_diff_name_dst ON struct_check_test_1.not_match_index (index_col); diff --git a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql index 
cc0bc2e1..a830a67c 100644 --- a/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/check/basic_struct_test/src_prepare.sql @@ -3,13 +3,16 @@ drop database if exists struct_check_test_1; create database struct_check_test_1; -- simple test +``` CREATE TABLE struct_check_test_1.match_table_test( id integer, text varchar(10) comment 'col comment test', primary key (id) ) comment 'table comment test'; +``` -- full column type +``` CREATE TABLE struct_check_test_1.match_full_column_type ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL, @@ -37,8 +40,10 @@ CREATE TABLE struct_check_test_1.match_full_column_type ( enum_col ENUM('value1', 'value2', 'value3'), set_col SET('option1', 'option2', 'option3') ); +``` -- full index type +``` CREATE TABLE struct_check_test_1.match_full_index_type(id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, f_1 int, f_2 char(128), @@ -50,6 +55,7 @@ CREATE TABLE struct_check_test_1.match_full_index_type(id INT UNSIGNED AUTO_INCR f_8 TEXT, f_9 POINT NOT NULL ); +``` -- unique key with multiple columns CREATE UNIQUE INDEX idx_unique_1 ON struct_check_test_1.match_full_index_type(f_1, f_2, f_3); @@ -71,13 +77,16 @@ CREATE FULLTEXT INDEX idx_full_text_2 ON struct_check_test_1.match_full_index_ty CREATE SPATIAL INDEX idx_spatial_1 ON struct_check_test_1.match_full_index_type(f_9); -- not match: table miss +``` CREATE TABLE struct_check_test_1.not_match_miss( id integer, text varchar(10), primary key (id) ); +``` -- not match: column +``` CREATE TABLE struct_check_test_1.not_match_column ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL, @@ -105,8 +114,10 @@ CREATE TABLE struct_check_test_1.not_match_column ( enum_col ENUM('value1', 'value2', 'value3'), set_col SET('option1', 'option2', 'option3') ); +``` -- not match: index +``` CREATE TABLE struct_check_test_1.not_match_index ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, unique_col 
VARCHAR(255) NOT NULL, @@ -118,6 +129,7 @@ CREATE TABLE struct_check_test_1.not_match_index ( composite_index_col2 VARCHAR(255), composite_index_col3 VARCHAR(255) ); +``` CREATE INDEX i4_diff_order ON struct_check_test_1.not_match_index (composite_index_col2, composite_index_col1 , composite_index_col3); CREATE INDEX i5_diff_name_src ON struct_check_test_1.not_match_index (index_col); diff --git a/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/dst_prepare.sql index 4e985717..1dd9df61 100644 --- a/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/dst_prepare.sql @@ -16,6 +16,7 @@ CREATE TABLE test_db_1.col_has_special_character_table (`p:k` tinyint, `col"1` t CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 smallint, f_3 smallint unsigned, f_4 mediumint, f_5 mediumint unsigned, f_6 int, f_7 int unsigned, f_8 bigint, f_9 bigint unsigned, PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -26,10 +27,13 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_7 date DEFAULT NULL, f_8 year DEFAULT NULL, PRIMARY KEY(f_0)); +``` +``` CREATE TABLE test_db_1.set_table( f_0 tinyint, f_1 SET('a','b','c','d','e'), PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.ignore_cols_1 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE test_db_1.ignore_cols_2 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/src_prepare.sql index 4e985717..86786dca 100644 --- 
a/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/snapshot/basic_test/src_prepare.sql @@ -16,6 +16,7 @@ CREATE TABLE test_db_1.col_has_special_character_table (`p:k` tinyint, `col"1` t CREATE TABLE test_db_1.numeric_table ( f_0 tinyint, f_1 tinyint unsigned, f_2 smallint, f_3 smallint unsigned, f_4 mediumint, f_5 mediumint unsigned, f_6 int, f_7 int unsigned, f_8 bigint, f_9 bigint unsigned, PRIMARY KEY(f_0)); +```sql CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_1 datetime DEFAULT NULL, f_2 datetime(6) DEFAULT NULL, @@ -26,10 +27,13 @@ CREATE TABLE test_db_1.date_time_table( f_0 tinyint, f_7 date DEFAULT NULL, f_8 year DEFAULT NULL, PRIMARY KEY(f_0)); +``` +``` CREATE TABLE test_db_1.set_table( f_0 tinyint, f_1 SET('a','b','c','d','e'), PRIMARY KEY(f_0)); +``` CREATE TABLE test_db_1.ignore_cols_1 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE test_db_1.ignore_cols_2 ( f_0 tinyint, f_1 smallint DEFAULT NULL, f_2 smallint DEFAULT NULL, f_3 smallint DEFAULT NULL, PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/basic_test/src_prepare.sql index 4864e72a..716ae2e3 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/basic_test/src_prepare.sql @@ -3,7 +3,7 @@ drop database if exists struct_it_mysql2mysql_1; create database if not exists struct_it_mysql2mysql_1; -- full column type - +``` CREATE TABLE struct_it_mysql2mysql_1.full_column_type (id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL COMMENT 'varchar_col_comment', char_col CHAR(10) COMMENT 'char_col_comment', @@ -38,9 +38,10 @@ CREATE TABLE struct_it_mysql2mysql_1.full_column_type (id INT 
UNSIGNED AUTO_INCR set_col SET('option1', 'option2', 'option3') COMMENT 'set_col_comment', json_col JSON DEFAULT NULL COMMENT 'json_col_comment' ); +``` -- full index type - +``` CREATE TABLE struct_it_mysql2mysql_1.full_index_type( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, f_1 int, @@ -53,6 +54,7 @@ CREATE TABLE struct_it_mysql2mysql_1.full_index_type( f_8 TEXT, f_9 POINT NOT NULL ); +``` -- unique key with multiple columns CREATE UNIQUE INDEX idx_unique_1 ON struct_it_mysql2mysql_1.full_index_type(f_1, f_2, f_3); @@ -74,6 +76,7 @@ CREATE FULLTEXT INDEX idx_full_text_2 ON struct_it_mysql2mysql_1.full_index_type CREATE SPATIAL INDEX idx_spatial_1 ON struct_it_mysql2mysql_1.full_index_type(f_9); -- full constraint +``` CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( id INT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(50) NOT NULL UNIQUE, @@ -85,4 +88,5 @@ CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, CONSTRAINT chk_age CHECK (age >= 18), CONSTRAINT chk_email CHECK (email LIKE '%@%.%') -); \ No newline at end of file +); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/charset_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/charset_test/src_prepare.sql index 5c5f995d..206399e8 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/charset_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/charset_test/src_prepare.sql @@ -3,8 +3,10 @@ DROP DATABASE IF EXISTS struct_it_mysql2mysql_1; CREATE DATABASE struct_it_mysql2mysql_1 DEFAULT CHARACTER SET utf32 DEFAULT COLLATE utf32_polish_ci; -- simple test +``` CREATE TABLE struct_it_mysql2mysql_1.table_test( col1 varchar(10), col2 varchar(10) CHARACTER SET latin1 DEFAULT '', col3 varchar(10) CHARACTER SET latin1 COLLATE latin1_spanish_ci DEFAULT 'bbb' -) DEFAULT CHARSET = utf16 COLLATE = utf16_unicode_ci; \ No newline at end of file +) DEFAULT CHARSET = utf16 COLLATE = 
utf16_unicode_ci; +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/filter_test_1/src_to_dst/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/filter_test_1/src_to_dst/src_prepare.sql index d58f2879..99710bf0 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/filter_test_1/src_to_dst/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/filter_test_1/src_to_dst/src_prepare.sql @@ -3,6 +3,7 @@ drop database if exists struct_it_mysql2mysql_1; create database if not exists struct_it_mysql2mysql_1; -- full index type +``` CREATE TABLE struct_it_mysql2mysql_1.full_index_type ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, unique_col VARCHAR(255) NOT NULL, @@ -14,6 +15,7 @@ CREATE TABLE struct_it_mysql2mysql_1.full_index_type ( composite_index_col2 VARCHAR(255), composite_index_col3 VARCHAR(255) ); +``` CREATE UNIQUE INDEX unique_index ON struct_it_mysql2mysql_1.full_index_type (unique_col); @@ -24,6 +26,7 @@ CREATE INDEX simple_index ON struct_it_mysql2mysql_1.full_index_type (simple_ind CREATE INDEX composite_index ON struct_it_mysql2mysql_1.full_index_type (composite_index_col1, composite_index_col2, composite_index_col3); -- full constraint +``` CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( id INT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(50) NOT NULL UNIQUE, @@ -36,21 +39,26 @@ CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( CONSTRAINT chk_age CHECK (age >= 18), CONSTRAINT chk_email CHECK (email LIKE '%@%.%') ); +``` -- foreign constraints +``` CREATE TABLE struct_it_mysql2mysql_1.foreign_key_parent ( pk int, parent_col_1 int UNIQUE, parent_col_2 int UNIQUE, PRIMARY KEY(pk) ); +``` +``` CREATE TABLE struct_it_mysql2mysql_1.foreign_key_child ( pk int, child_col_1 int UNIQUE, child_col_2 int UNIQUE, PRIMARY KEY(pk) ); +``` ALTER TABLE struct_it_mysql2mysql_1.foreign_key_child ADD CONSTRAINT fk_test_1 FOREIGN KEY (child_col_1) REFERENCES struct_it_mysql2mysql_1.foreign_key_parent (parent_col_1); ALTER TABLE 
struct_it_mysql2mysql_1.foreign_key_child ADD CONSTRAINT fk_test_2 FOREIGN KEY (child_col_2) REFERENCES struct_it_mysql2mysql_1.foreign_key_parent (parent_col_2); \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/dst_prepare.sql index 4350d33e..7628105c 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/dst_prepare.sql @@ -3,6 +3,7 @@ drop database if exists struct_it_mysql2mysql_1; create database if not exists struct_it_mysql2mysql_1; -- create table with only primary and unique indexes +``` CREATE TABLE struct_it_mysql2mysql_1.full_index_type ( `id` int unsigned NOT NULL AUTO_INCREMENT, `unique_col` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL, @@ -15,10 +16,12 @@ CREATE TABLE struct_it_mysql2mysql_1.full_index_type ( `composite_index_col3` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb3 +``` CREATE UNIQUE INDEX unique_index ON struct_it_mysql2mysql_1.full_index_type (unique_col); -- create table without constraints +``` CREATE TABLE struct_it_mysql2mysql_1.`constraint_table` ( `id` int NOT NULL AUTO_INCREMENT, `username` varchar(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL, @@ -31,18 +34,23 @@ CREATE TABLE struct_it_mysql2mysql_1.`constraint_table` ( PRIMARY KEY (`id`), UNIQUE KEY `username` (`username`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb3; +``` -- foreign constraints +``` CREATE TABLE struct_it_mysql2mysql_1.foreign_key_parent ( pk int, parent_col_1 int UNIQUE, parent_col_2 int UNIQUE, PRIMARY KEY(pk) ); +``` +``` CREATE TABLE struct_it_mysql2mysql_1.foreign_key_child ( pk int, child_col_1 int UNIQUE, child_col_2 int UNIQUE, PRIMARY KEY(pk) -); \ No newline at end of file +); +``` \ No newline at end of file diff --git 
a/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/src_prepare.sql index d58f2879..99710bf0 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/filter_test_2/src_prepare.sql @@ -3,6 +3,7 @@ drop database if exists struct_it_mysql2mysql_1; create database if not exists struct_it_mysql2mysql_1; -- full index type +``` CREATE TABLE struct_it_mysql2mysql_1.full_index_type ( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, unique_col VARCHAR(255) NOT NULL, @@ -14,6 +15,7 @@ CREATE TABLE struct_it_mysql2mysql_1.full_index_type ( composite_index_col2 VARCHAR(255), composite_index_col3 VARCHAR(255) ); +``` CREATE UNIQUE INDEX unique_index ON struct_it_mysql2mysql_1.full_index_type (unique_col); @@ -24,6 +26,7 @@ CREATE INDEX simple_index ON struct_it_mysql2mysql_1.full_index_type (simple_ind CREATE INDEX composite_index ON struct_it_mysql2mysql_1.full_index_type (composite_index_col1, composite_index_col2, composite_index_col3); -- full constraint +``` CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( id INT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(50) NOT NULL UNIQUE, @@ -36,21 +39,26 @@ CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( CONSTRAINT chk_age CHECK (age >= 18), CONSTRAINT chk_email CHECK (email LIKE '%@%.%') ); +``` -- foreign constraints +``` CREATE TABLE struct_it_mysql2mysql_1.foreign_key_parent ( pk int, parent_col_1 int UNIQUE, parent_col_2 int UNIQUE, PRIMARY KEY(pk) ); +``` +``` CREATE TABLE struct_it_mysql2mysql_1.foreign_key_child ( pk int, child_col_1 int UNIQUE, child_col_2 int UNIQUE, PRIMARY KEY(pk) ); +``` ALTER TABLE struct_it_mysql2mysql_1.foreign_key_child ADD CONSTRAINT fk_test_1 FOREIGN KEY (child_col_1) REFERENCES struct_it_mysql2mysql_1.foreign_key_parent (parent_col_1); ALTER TABLE struct_it_mysql2mysql_1.foreign_key_child ADD CONSTRAINT fk_test_2 FOREIGN KEY (child_col_2) REFERENCES 
struct_it_mysql2mysql_1.foreign_key_parent (parent_col_2); \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/route_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/route_test/src_prepare.sql index 4864e72a..69ad9393 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/route_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/route_test/src_prepare.sql @@ -3,7 +3,7 @@ drop database if exists struct_it_mysql2mysql_1; create database if not exists struct_it_mysql2mysql_1; -- full column type - +``` CREATE TABLE struct_it_mysql2mysql_1.full_column_type (id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL COMMENT 'varchar_col_comment', char_col CHAR(10) COMMENT 'char_col_comment', @@ -38,9 +38,11 @@ CREATE TABLE struct_it_mysql2mysql_1.full_column_type (id INT UNSIGNED AUTO_INCR set_col SET('option1', 'option2', 'option3') COMMENT 'set_col_comment', json_col JSON DEFAULT NULL COMMENT 'json_col_comment' ); +``` -- full index type +``` CREATE TABLE struct_it_mysql2mysql_1.full_index_type( id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY, f_1 int, @@ -53,6 +55,7 @@ CREATE TABLE struct_it_mysql2mysql_1.full_index_type( f_8 TEXT, f_9 POINT NOT NULL ); +``` -- unique key with multiple columns CREATE UNIQUE INDEX idx_unique_1 ON struct_it_mysql2mysql_1.full_index_type(f_1, f_2, f_3); @@ -74,6 +77,7 @@ CREATE FULLTEXT INDEX idx_full_text_2 ON struct_it_mysql2mysql_1.full_index_type CREATE SPATIAL INDEX idx_spatial_1 ON struct_it_mysql2mysql_1.full_index_type(f_9); -- full constraint +``` CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( id INT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(50) NOT NULL UNIQUE, @@ -85,4 +89,5 @@ CREATE TABLE struct_it_mysql2mysql_1.constraint_table ( updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, CONSTRAINT chk_age CHECK (age >= 18), CONSTRAINT chk_email CHECK (email LIKE '%@%.%') -); \ No newline at end of file +); +``` \ No newline at end of 
file diff --git a/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/dst_prepare.sql index 359767f9..052add60 100644 --- a/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/dst_prepare.sql @@ -3,6 +3,7 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; -- BINARY, VARBINARY were NOT supported +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 TINYINT, f_1 SMALLINT DEFAULT NULL, @@ -26,3 +27,4 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 VARCHAR(100), f_27 VARCHAR(100), f_28 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0) PROPERTIES ("replication_num" = "1"); +``` diff --git a/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/src_prepare.sql index e62cb105..413a0b09 100644 --- a/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/cdc/2_5_4/basic_test/src_prepare.sql @@ -2,6 +2,7 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 tinyint, f_1 smallint DEFAULT NULL, @@ -25,4 +26,5 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 enum('x-small','small','medium','large','x-large') DEFAULT NULL, f_27 set('a','b','c','d','e') DEFAULT NULL, f_28 json DEFAULT NULL, - PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file + PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/dst_prepare.sql index aca72c95..11e1505a 100644 --- a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/dst_prepare.sql @@ -2,6 +2,7 @@ DROP 
DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 TINYINT, f_1 SMALLINT DEFAULT NULL, @@ -30,4 +31,5 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_25 VARBINARY(255) DEFAULT NULL, f_26 VARCHAR(100), f_27 VARCHAR(100), - f_28 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); \ No newline at end of file + f_28 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/src_prepare.sql index f219dd43..56469dcb 100644 --- a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/basic_test/src_prepare.sql @@ -5,6 +5,7 @@ CREATE DATABASE test_db_1; -- TODO, support columns: -- f_8 bit(64) DEFAULT NULL, +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 tinyint, f_1 smallint DEFAULT NULL, @@ -34,4 +35,5 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 enum('x-small','small','medium','large','x-large') DEFAULT NULL, f_27 set('a','b','c','d','e') DEFAULT NULL, f_28 json DEFAULT NULL, - PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file + PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_test/dst_prepare.sql index 3ca54059..72d01018 100644 --- a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_test/dst_prepare.sql @@ -2,6 +2,8 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.json_test ( f_0 TINYINT, f_1 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); +``` diff --git 
a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_to_string_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_to_string_test/dst_prepare.sql index 4c38a8c7..175f9d98 100644 --- a/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_to_string_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/cdc/3_2_11/json_to_string_test/dst_prepare.sql @@ -2,6 +2,8 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.json_test ( f_0 TINYINT, f_1 STRING) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); +``` diff --git a/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/dst_prepare.sql index 359767f9..052add60 100644 --- a/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/dst_prepare.sql @@ -3,6 +3,7 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; -- BINARY, VARBINARY were NOT supported +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 TINYINT, f_1 SMALLINT DEFAULT NULL, @@ -26,3 +27,4 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 VARCHAR(100), f_27 VARCHAR(100), f_28 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0) PROPERTIES ("replication_num" = "1"); +``` diff --git a/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/src_prepare.sql index e62cb105..413a0b09 100644 --- a/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/snapshot/2_5_4/basic_test/src_prepare.sql @@ -2,6 +2,7 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 tinyint, f_1 smallint DEFAULT NULL, @@ -25,4 +26,5 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 enum('x-small','small','medium','large','x-large') DEFAULT NULL, f_27 
set('a','b','c','d','e') DEFAULT NULL, f_28 json DEFAULT NULL, - PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file + PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/dst_prepare.sql index d198afc7..d98f026b 100644 --- a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/dst_prepare.sql @@ -2,6 +2,7 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 TINYINT, f_1 SMALLINT DEFAULT NULL, @@ -31,3 +32,4 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 VARCHAR(100), f_27 VARCHAR(100), f_28 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); +``` diff --git a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/src_prepare.sql b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/src_prepare.sql index f219dd43..56469dcb 100644 --- a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/src_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/basic_test/src_prepare.sql @@ -5,6 +5,7 @@ CREATE DATABASE test_db_1; -- TODO, support columns: -- f_8 bit(64) DEFAULT NULL, +``` CREATE TABLE test_db_1.one_pk_no_uk ( f_0 tinyint, f_1 smallint DEFAULT NULL, @@ -34,4 +35,5 @@ CREATE TABLE test_db_1.one_pk_no_uk ( f_26 enum('x-small','small','medium','large','x-large') DEFAULT NULL, f_27 set('a','b','c','d','e') DEFAULT NULL, f_28 json DEFAULT NULL, - PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; \ No newline at end of file + PRIMARY KEY (f_0) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_test/dst_prepare.sql 
b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_test/dst_prepare.sql index 3ca54059..72d01018 100644 --- a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_test/dst_prepare.sql @@ -2,6 +2,8 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; +``` CREATE TABLE test_db_1.json_test ( f_0 TINYINT, f_1 JSON) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); +``` diff --git a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_to_string_test/dst_prepare.sql b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_to_string_test/dst_prepare.sql index 876822fa..d113cbbc 100644 --- a/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_to_string_test/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_starrocks/snapshot/3_2_11/json_to_string_test/dst_prepare.sql @@ -3,6 +3,8 @@ DROP DATABASE IF EXISTS test_db_1; CREATE DATABASE test_db_1; -- STRING == varchar(65533) +``` CREATE TABLE test_db_1.json_test ( f_0 TINYINT, f_1 STRING) ENGINE=OLAP PRIMARY KEY(f_0) DISTRIBUTED BY HASH(f_0); +``` diff --git a/dt-tests/tests/pg_to_pg/cdc/ddl_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/cdc/ddl_test/dst_prepare.sql index 5f0573e6..925ebcbe 100644 --- a/dt-tests/tests/pg_to_pg/cdc/ddl_test/dst_prepare.sql +++ b/dt-tests/tests/pg_to_pg/cdc/ddl_test/dst_prepare.sql @@ -11,6 +11,10 @@ CREATE TABLE test_db_1.tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) CREATE TABLE test_db_1.drop_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; +CREATE TABLE test_db_1.rename_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ); + +CREATE TABLE test_db_1.rename_tb_2 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ); + CREATE TABLE test_db_1.truncate_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; -- INSERT INTO test_db_1.truncate_tb_1 VALUES (1, 1); diff --git a/dt-tests/tests/pg_to_pg/cdc/ddl_test/filtered_tbs.txt 
b/dt-tests/tests/pg_to_pg/cdc/ddl_test/filtered_tbs.txt index 2f654dac..df0a0bb3 100644 --- a/dt-tests/tests/pg_to_pg/cdc/ddl_test/filtered_tbs.txt +++ b/dt-tests/tests/pg_to_pg/cdc/ddl_test/filtered_tbs.txt @@ -1 +1,4 @@ -test_db_1.drop_tb_1 \ No newline at end of file +test_db_1.drop_tb_1 +test_db_1.rename_tb_1 +test_db_1.rename_tb_2 +public.ape_dts_ddl_command \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_prepare.sql index 5f0573e6..68732757 100644 --- a/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_prepare.sql +++ b/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_prepare.sql @@ -1,3 +1,95 @@ +DROP TRIGGER IF EXISTS ape_dts_intercept_ddl ON ddl_command_end; + +DROP FUNCTION IF EXISTS public.ape_dts_capture_ddl() CASCADE; + +DROP TABLE IF EXISTS public.ape_dts_ddl_command; + +``` +CREATE TABLE public.ape_dts_ddl_command +( + ddl_text text COLLATE pg_catalog."default", + id bigserial primary key, + event text COLLATE pg_catalog."default", + tag text COLLATE pg_catalog."default", + username character varying COLLATE pg_catalog."default", + database character varying COLLATE pg_catalog."default", + schema character varying COLLATE pg_catalog."default", + object_type character varying COLLATE pg_catalog."default", + object_name character varying COLLATE pg_catalog."default", + client_address character varying COLLATE pg_catalog."default", + client_port integer, + event_time timestamp with time zone, + txid_current character varying(128) COLLATE pg_catalog."default", + message text COLLATE pg_catalog."default" +); +``` + +``` +CREATE FUNCTION public.ape_dts_capture_ddl() + RETURNS event_trigger + LANGUAGE 'plpgsql' + COST 100 + VOLATILE NOT LEAKPROOF SECURITY DEFINER +AS $BODY$ + declare ddl_text text; + declare max_rows int := 10000; + declare current_rows int; + declare pg_version_95 int := 90500; + declare pg_version_10 int := 100000; + declare current_version int; + declare object_id 
varchar; + declare alter_table varchar; + declare record_object record; + declare message text; + declare pub RECORD; +begin + + select current_query() into ddl_text; + + if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL; + show server_version_num into current_version; + if current_version >= pg_version_95 then + for record_object in (select * from pg_event_trigger_ddl_commands()) loop + if record_object.command_tag = 'CREATE TABLE' then + object_id := record_object.object_identity; + end if; + end loop; + else + select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id; + end if; + if object_id = '' or object_id is null then + message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query(); + end if; + if current_version >= pg_version_10 then + for pub in (select * from pg_publication where pubname like 'ape_dts_%') loop + raise notice 'pubname=%',pub.pubname; + BEGIN + execute 'alter publication ' || pub.pubname || ' add table ' || object_id; + EXCEPTION WHEN OTHERS THEN + END; + end loop; + end if; + end if; + + insert into public.ape_dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message) + values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message); + + select count(id) into current_rows from public.ape_dts_ddl_command; + if current_rows > max_rows then + delete from public.ape_dts_ddl_command where id in (select min(id) from public.ape_dts_ddl_command); + end if; +end +$BODY$; +``` + +ALTER FUNCTION public.ape_dts_capture_ddl() OWNER TO postgres; + +``` +CREATE EVENT TRIGGER ape_dts_intercept_ddl ON ddl_command_end +EXECUTE PROCEDURE 
public.ape_dts_capture_ddl(); +``` + +-- create test schemas and tables DROP SCHEMA IF EXISTS test_db_1 CASCADE; DROP SCHEMA IF EXISTS test_db_2 CASCADE; DROP SCHEMA IF EXISTS test_db_3 CASCADE; @@ -9,12 +101,16 @@ CREATE SCHEMA test_db_3; CREATE TABLE test_db_1.tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; +CREATE TABLE test_db_1.rename_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ); + +CREATE TABLE test_db_1.rename_tb_2 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ); + CREATE TABLE test_db_1.drop_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; CREATE TABLE test_db_1.truncate_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; --- INSERT INTO test_db_1.truncate_tb_1 VALUES (1, 1); +INSERT INTO test_db_1.truncate_tb_1 VALUES (1, 1); CREATE TABLE test_db_1.truncate_tb_2 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; --- INSERT INTO test_db_1.truncate_tb_2 VALUES (1, 1); +INSERT INTO test_db_1.truncate_tb_2 VALUES (1, 1); CREATE TABLE test_db_2.truncate_tb_1 ( f_0 int, f_1 int DEFAULT NULL, PRIMARY KEY (f_0) ) ; diff --git a/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_test.sql b/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_test.sql index 53213731..aff8e7d6 100644 --- a/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_test.sql +++ b/dt-tests/tests/pg_to_pg/cdc/ddl_test/src_test.sql @@ -17,16 +17,21 @@ TRUNCATE test_db_1.truncate_tb_1; TRUNCATE TABLE test_db_1.truncate_tb_2; -- rename table --- ALTER TABLE test_db_1.tb_1 RENAME test_db_1.tb_2; --- RENAME TABLE test_db_1.tb_2 TO test_db_1.tb_3; +ALTER TABLE test_db_1.rename_tb_1 RENAME TO dst_rename_tb_1; + +INSERT INTO test_db_1.dst_rename_tb_1 VALUES(1, 1); + +ALTER TABLE test_db_1.rename_tb_2 SET SCHEMA test_db_2; + +INSERT INTO test_db_2.rename_tb_2 VALUES(1, 1); -- drop table DROP TABLE test_db_1.drop_tb_1; --- drop database +-- drop schema DROP SCHEMA test_db_3 CASCADE; --- create database +-- create schema CREATE SCHEMA test_db_4; -- create table @@ -37,12 +42,9 @@ 
INSERT INTO test_db_2.tb_1 VALUES (1,1,1); -- add index ALTER TABLE test_db_2.tb_1 ADD CONSTRAINT idx_f_1 UNIQUE (f_1); --- NOT supported ddl CREATE INDEX idx_f_2 ON test_db_2.tb_1 (f_2); --- RENAME TABLE products TO products_old, products_new TO products; - --- create database with special character +-- create schema with special character CREATE SCHEMA "中文database!@$%^&*()_+"; -- create table with chinese character diff --git a/dt-tests/tests/pg_to_pg/cdc/ddl_test/task_config.ini b/dt-tests/tests/pg_to_pg/cdc/ddl_test/task_config.ini index 71e41d8c..64862402 100644 --- a/dt-tests/tests/pg_to_pg/cdc/ddl_test/task_config.ini +++ b/dt-tests/tests/pg_to_pg/cdc/ddl_test/task_config.ini @@ -2,23 +2,18 @@ db_type=pg extract_type=cdc url={pg_extractor_url} -heartbeat_interval_secs=10 start_lsn= slot_name=ape_test recreate_slot_if_exists=true -ddl_command_table=ape_dts_ddl_command +ddl_meta_tb=public.ape_dts_ddl_command pub_name=ape_dts_publication_for_all_tables heartbeat_interval_secs=1 heartbeat_tb=heartbeat_db.ape_dts_heartbeat [filter] do_dbs=test_db_1,test_db_2,test_db_3,test_db_4,"中文database!@$%^&*()_+" -do_events=insert,update,delete,ddl -ignore_dbs= -ignore_tbs= -do_tbs=public.ape_dts_ddl_command -;truncate_table, rename_table not supported -do_ddls=create_database,drop_database,alter_database,create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index +do_events=insert,update,delete +do_ddls=create_database,drop_database,alter_database,create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,truncate_table,rename_table [sinker] db_type=pg diff --git a/dt-tests/tests/pg_to_pg/struct/basic_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/basic_test/src_prepare.sql index 08ae7642..3852a718 100644 --- a/dt-tests/tests/pg_to_pg/struct/basic_test/src_prepare.sql +++ b/dt-tests/tests/pg_to_pg/struct/basic_test/src_prepare.sql @@ -3,6 +3,7 @@ drop schema if exists struct_it_pg2pg_1 CASCADE; create schema 
struct_it_pg2pg_1; -- all basic column types: +``` CREATE TABLE struct_it_pg2pg_1.full_column_type ( id SERIAL PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL, diff --git a/dt-tests/tests/pg_to_pg/struct/route_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/route_test/src_prepare.sql index 08ae7642..3223b92e 100644 --- a/dt-tests/tests/pg_to_pg/struct/route_test/src_prepare.sql +++ b/dt-tests/tests/pg_to_pg/struct/route_test/src_prepare.sql @@ -3,6 +3,7 @@ drop schema if exists struct_it_pg2pg_1 CASCADE; create schema struct_it_pg2pg_1; -- all basic column types: +``` CREATE TABLE struct_it_pg2pg_1.full_column_type ( id SERIAL PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL, @@ -33,11 +34,13 @@ CREATE TABLE struct_it_pg2pg_1.full_column_type ( polygon_col POLYGON, circle_col CIRCLE ); +``` -- array column types: -- CREATE TABLE struct_it_pg2pg_1.array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], json_array json[], jsonb_array jsonb[], oid_array OID[], PRIMARY KEY(pk)); -- postgres 12, without: CITEXT[] +``` CREATE TABLE struct_it_pg2pg_1.array_table ( pk SERIAL, int_array INT[], @@ -63,8 +66,10 @@ CREATE TABLE struct_it_pg2pg_1.array_table ( oid_array OID[], PRIMARY KEY(pk) ); +``` -- all check types(without fk and exclude): +``` CREATE TABLE struct_it_pg2pg_1.full_constraint_type ( id SERIAL PRIMARY KEY, varchar_col VARCHAR(255) NOT NULL, @@ -72,8 +77,10 @@ CREATE TABLE struct_it_pg2pg_1.full_constraint_type ( not_null_col VARCHAR(255) NOT NULL, check_col VARCHAR(255) CHECK (char_length(check_col) > 3) ); +``` -- all index types: 
+``` CREATE TABLE struct_it_pg2pg_1.full_index_type ( id SERIAL PRIMARY KEY, unique_col VARCHAR(255) NOT NULL, @@ -85,6 +92,7 @@ CREATE TABLE struct_it_pg2pg_1.full_index_type ( composite_index_col2 VARCHAR(255), composite_index_col3 VARCHAR(255) ); +``` CREATE UNIQUE INDEX unique_index ON struct_it_pg2pg_1.full_index_type (unique_col); @@ -96,10 +104,12 @@ CREATE INDEX spatial_index ON struct_it_pg2pg_1.full_index_type USING gist(spati CREATE INDEX simple_index ON struct_it_pg2pg_1.full_index_type (simple_index_col); +``` CREATE INDEX composite_index ON struct_it_pg2pg_1.full_index_type ( composite_index_col1, composite_index_col2, composite_index_col3 ); +``` -- table comments: COMMENT ON TABLE struct_it_pg2pg_1.full_column_type IS 'Comment on full_column_type.'; @@ -132,11 +142,13 @@ ALTER SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_3 OWNED BY struct_it_pg2pg_ CREATE SEQUENCE struct_it_pg2pg_1.sequence_test_3_seq_2; CREATE SEQUENCE struct_it_pg2pg_1."sequence_test_3_seq.\d@_3"; +``` CREATE TABLE struct_it_pg2pg_1.sequence_test_3 ( seq_1 SERIAL, seq_2 BIGINT DEFAULT nextval('struct_it_pg2pg_1.sequence_test_3_seq_2'), seq_3 SMALLINT DEFAULT nextval('struct_it_pg2pg_1."sequence_test_3_seq.\d@_3"') ); +``` -- case 4: create independent sequences and never used by any tables -- we should not migrate them diff --git a/dt-tests/tests/test_runner/base_test_runner.rs b/dt-tests/tests/test_runner/base_test_runner.rs index d4e22540..dd5ec61a 100644 --- a/dt-tests/tests/test_runner/base_test_runner.rs +++ b/dt-tests/tests/test_runner/base_test_runner.rs @@ -1,7 +1,4 @@ -use dt_common::{ - config::{config_enums::DbType, task_config::TaskConfig}, - utils::time_util::TimeUtil, -}; +use dt_common::{config::task_config::TaskConfig, utils::time_util::TimeUtil}; use dt_connector::data_marker::DataMarker; use dt_task::task_runner::TaskRunner; use std::{ @@ -45,8 +42,6 @@ impl BaseTestRunner { // update extractor / sinker urls from .env 
TestConfigUtil::update_task_config_from_env(&dst_task_config_file, &dst_task_config_file); - let config = TaskConfig::new(&dst_task_config_file).unwrap(); - let ( src_test_sqls, dst_test_sqls, @@ -55,11 +50,7 @@ impl BaseTestRunner { src_clean_sqls, dst_clean_sqls, meta_center_prepare_sqls, - ) = Self::load_sqls( - &test_dir, - &config.extractor_basic.db_type, - &config.sinker_basic.db_type, - ); + ) = Self::load_sqls(&test_dir); Ok(Self { task_config_file: dst_task_config_file, @@ -141,8 +132,6 @@ impl BaseTestRunner { fn load_sqls( test_dir: &str, - src_db_type: &DbType, - dst_db_type: &DbType, ) -> ( Vec, Vec, @@ -152,74 +141,70 @@ impl BaseTestRunner { Vec, Vec, ) { - let load = |sql_file: &str, db_type: &DbType| -> Vec { + let load = |sql_file: &str| -> Vec { let full_sql_path = format!("{}/{}", test_dir, sql_file); if !Self::check_path_exists(&full_sql_path) { return Vec::new(); } - - match db_type { - DbType::Mysql | DbType::Pg | DbType::Foxlake | DbType::StarRocks => { - Self::load_rdb_sqls(&full_sql_path) - } - _ => Self::load_non_rdb_sqls(&full_sql_path), - } + Self::load_sql_file(&full_sql_path) }; ( - load("src_test.sql", src_db_type), - load("dst_test.sql", dst_db_type), - load("src_prepare.sql", src_db_type), - load("dst_prepare.sql", dst_db_type), - load("src_clean.sql", src_db_type), - load("dst_clean.sql", dst_db_type), - load("meta_center_prepare.sql", src_db_type), + load("src_test.sql"), + load("dst_test.sql"), + load("src_prepare.sql"), + load("dst_prepare.sql"), + load("src_clean.sql"), + load("dst_clean.sql"), + load("meta_center_prepare.sql"), ) } - fn load_non_rdb_sqls(sql_file: &str) -> Vec { + fn load_sql_file(sql_file: &str) -> Vec { + /* sqls content E.g. : + -- this is comment + + select * from db_1.tb_1; -- a sql in single line + + ``` + -- a sql in multiple lines + select * + from + db_1.tb_1; + ``` + */ let mut sqls = Vec::new(); - let mut lines = Self::load_file(&sql_file); - for line in lines.drain(..) 
{ + let mut in_block = false; + let mut multi_line_sql = String::new(); + + for line in Self::load_file(&sql_file).iter() { let line = line.trim(); if line.is_empty() || line.starts_with("--") { continue; } - sqls.push(line.to_string()); - } - sqls - } - fn load_rdb_sqls(sql_file: &str) -> Vec { - let mut sqls = Vec::new(); - let sql_start_keywords = vec![ - "create ", - "drop ", - "alter ", - "insert ", - "update ", - "delete ", - "comment ", - "truncate ", - "select ", - ]; - let mut lines = Self::load_file(&sql_file); - for line in lines.drain(..) { - let check_line = line.trim().to_lowercase(); - if check_line.is_empty() || check_line.starts_with("--") { + if line.starts_with("```") { + if in_block { + // block end + in_block = false; + if !multi_line_sql.is_empty() { + // pop the last '\n' + multi_line_sql.pop(); + sqls.push(multi_line_sql.clone()); + } + } else { + // block start + in_block = true; + multi_line_sql.clear(); + } continue; } - if sql_start_keywords - .iter() - .any(|keyword| -> bool { check_line.starts_with(keyword) }) - { - sqls.push(line); + if in_block { + multi_line_sql.push_str(&line); + multi_line_sql.push('\n'); } else { - if let Some(last_sql) = sqls.last_mut() { - last_sql.push_str("\n"); - last_sql.push_str(&line); - } + sqls.push(line.into()); } } sqls