Merge pull request #264 from apecloud/main
enable ddl in Postgres CDC task
qianyiwen2019 authored Nov 12, 2024
2 parents 2844aa1 + 9d10ed4 commit 9d01fd6
Showing 67 changed files with 755 additions and 189 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -94,7 +94,7 @@ Refer to [test docs](./dt-tests/README.md) for details.

- Image size

| ape_dts:2.0.2 | debezium/connect:2.7 |
| ape_dts:2.0.3 | debezium/connect:2.7 |
| :-------- | :-------- |
| 86.4 MB | 1.38 GB |

2 changes: 1 addition & 1 deletion README_ZH.md
@@ -95,7 +95,7 @@

- 镜像对比

| ape_dts:2.0.2 | debezium/connect:2.7 |
| ape_dts:2.0.3 | debezium/connect:2.7 |
| :-------- | :-------- |
| 86.4 MB | 1.38 GB |

2 changes: 2 additions & 0 deletions docs/en/tutorial/mongo_to_mongo.md
@@ -3,6 +3,8 @@
# Prerequisites
- [prerequisites](./prerequisites.md)

- This article is a quick start; refer to [templates](/docs/templates/mongo_to_mongo.md) and [common configs](/docs/en/config.md) for more details.

# Prepare Mongo instances

## Source
5 changes: 3 additions & 2 deletions docs/en/tutorial/mysql_to_http_server_consumer.md
@@ -1,10 +1,11 @@
# Start as HTTP server and extract MySQL data

Refer to [Start ape_dts as an HTTP server to provide data to consumers](/docs/en/consumer/http_consumer.md) for details.
Refer to [Start ape_dts as HTTP server](/docs/en/consumer/http_consumer.md) for details on providing data to consumers.

# Prerequisites
- [prerequisites](./prerequisites.md)
- python3

- This article is a quick start; refer to [templates](/docs/templates/rdb_to_http_server.md) and [common configs](/docs/en/config.md) for more details.

# Prepare MySQL instance
Refer to [mysql to mysql](./mysql_to_mysql.md)
5 changes: 3 additions & 2 deletions docs/en/tutorial/mysql_to_kafka_consumer.md
@@ -1,10 +1,11 @@
# Send MySQL data to Kafka

Refer to [Send data to Kafka for consumers](/docs/en/consumer/kafka_consumer.md)
Refer to [Send data to Kafka](/docs/en/consumer/kafka_consumer.md) for details on delivering data to consumers.

# Prerequisites
- [prerequisites](./prerequisites.md)
- python3

- This article is a quick start; refer to [templates](/docs/templates/rdb_to_kafka.md) and [common configs](/docs/en/config.md) for more details.

# Prepare MySQL instance
Refer to [mysql to mysql](./mysql_to_mysql.md)
63 changes: 63 additions & 0 deletions docs/en/tutorial/mysql_to_mysql.md
@@ -3,6 +3,8 @@
# Prerequisites
- [prerequisites](./prerequisites.md)

- This article is a quick start; refer to [templates](/docs/templates/mysql_to_mysql.md) and [common configs](/docs/en/config.md) for more details.

# Prepare MySQL instances

## Source
@@ -440,4 +442,65 @@ SELECT * FROM test_db.tb_1;
| 5 | 5 |
| 6 | 6 |
+----+---------+
```

# CDC task with DDL capture

## Start task
```
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:[email protected]:3307?ssl-mode=disabled
[filter]
do_dbs=test_db
do_events=insert,update,delete
do_ddls=create_database,drop_database,alter_database,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table
[sinker]
db_type=mysql
sink_type=write
batch_size=200
url=mysql://root:[email protected]:3308?ssl-mode=disabled
[parallelizer]
parallel_type=rdb_merge
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
```

```
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
```

## Do DDLs in source
```
mysql -h127.0.0.1 -uroot -p123456 -P3307
CREATE TABLE test_db.tb_2(id int, value int, primary key(id));
INSERT INTO test_db.tb_2 VALUES(1,1);
```

## Check results
```
mysql -h127.0.0.1 -uroot -p123456 -P3308
SELECT * FROM test_db.tb_2;
```

```
+----+-------+
| id | value |
+----+-------+
| 1 | 1 |
+----+-------+
```
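
Since alter_table is also listed in do_ddls, later schema changes should be replicated as well. Below is a minimal optional check (not part of the original walkthrough; the column name note is only illustrative): run the ALTER in the source (port 3307), then describe the table in the target (port 3308).

```
-- source (3307): a schema change the CDC task should capture
ALTER TABLE test_db.tb_2 ADD COLUMN note varchar(32);

-- target (3308): the new column should appear shortly after
DESC test_db.tb_2;
```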
2 changes: 2 additions & 0 deletions docs/en/tutorial/mysql_to_starrocks.md
@@ -3,6 +3,8 @@
# Prerequisites
- [prerequisites](./prerequisites.md)

- This article is a quick start; refer to [templates](/docs/templates/mysql_to_starrocks.md) and [common configs](/docs/en/config.md) for more details.

# Prepare MySQL instance
Refer to [mysql to mysql](./mysql_to_mysql.md)

5 changes: 3 additions & 2 deletions docs/en/tutorial/pg_to_http_server_consumer.md
@@ -1,10 +1,11 @@
# Start as HTTP server and extract Postgres data

Refer to [Start ape_dts as an HTTP server to provide data to consumers](/docs/en/consumer/http_consumer.md) for details.
Refer to [Start ape_dts as HTTP server](/docs/en/consumer/http_consumer.md) for details on providing data to consumers.

# Prerequisites
- [prerequisites](./prerequisites.md)
- python3

- This article is a quick start; refer to [templates](/docs/templates/rdb_to_http_server.md) and [common configs](/docs/en/config.md) for more details.

# Prepare Postgres instances
Refer to [pg to pg](./pg_to_pg.md)
5 changes: 3 additions & 2 deletions docs/en/tutorial/pg_to_kafka_consumer.md
@@ -1,10 +1,11 @@
# Send Postgres data to Kafka

Refer to [Send data to Kafka for consumers](/docs/en/consumer/kafka_consumer.md)
Refer to [Send data to Kafka](/docs/en/consumer/kafka_consumer.md) for details on delivering data to consumers.

# Prerequisites
- [prerequisites](./prerequisites.md)
- python3

- This article is a quick start; refer to [templates](/docs/templates/rdb_to_kafka.md) and [common configs](/docs/en/config.md) for more details.

# Prepare Postgres instance
Refer to [pg to pg](./pg_to_pg.md)
193 changes: 176 additions & 17 deletions docs/en/tutorial/pg_to_pg.md
@@ -3,6 +3,8 @@
# Prerequisites
- [prerequisites](./prerequisites.md)

- This article is a quick start; refer to [templates](/docs/templates/pg_to_pg.md) and [common configs](/docs/en/config.md) for more details.

# Prepare Postgres instances

## Source
@@ -252,14 +254,12 @@ SELECT * FROM test_db.tb_1 ORDER BY id;
```

```
+----+-------+
| id | value |
+----+-------+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
+----+-------+
id | value
----+-------
1 | 1
2 | 2
3 | 3
4 | 4
```

# Review data
@@ -303,9 +303,17 @@ docker run --rm --network host \
## Check results
- /tmp/ape_dts/review_data_task_log/check/miss.log and /tmp/ape_dts/review_data_task_log/check/diff.log should be empty

# Cdc task
# CDC task

## Drop slot if exists
```
SELECT pg_drop_replication_slot('ape_test') FROM pg_replication_slots WHERE slot_name = 'ape_test';
```
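
To see whether the slot currently exists (and whether it is active) before dropping or re-creating it, a standard catalog query can be used; this is an optional check, not part of the original steps:

```
SELECT slot_name, plugin, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name = 'ape_test';
```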

## Start task

- This will create the slot if it does not exist.

```
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
@@ -357,12 +365,163 @@ SELECT * FROM test_db.tb_1 ORDER BY id;
```

```
+----+---------+
| id | value |
+----+---------+
| 2 | 2000000 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
+----+---------+
id | value
----+---------
2 | 2000000
3 | 3
4 | 4
5 | 5
```

# CDC task with DDL capture

## Enable DDL capture in source

- Create a meta table to store DDL info
```
CREATE TABLE public.ape_dts_ddl_command
(
ddl_text text COLLATE pg_catalog."default",
id bigserial primary key,
event text COLLATE pg_catalog."default",
tag text COLLATE pg_catalog."default",
username character varying COLLATE pg_catalog."default",
database character varying COLLATE pg_catalog."default",
schema character varying COLLATE pg_catalog."default",
object_type character varying COLLATE pg_catalog."default",
object_name character varying COLLATE pg_catalog."default",
client_address character varying COLLATE pg_catalog."default",
client_port integer,
event_time timestamp with time zone,
txid_current character varying(128) COLLATE pg_catalog."default",
message text COLLATE pg_catalog."default"
);
```

- Create a function to capture DDL and record it in the DDL meta table
```
CREATE FUNCTION public.ape_dts_capture_ddl()
RETURNS event_trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF SECURITY DEFINER
AS $BODY$
declare ddl_text text;
declare max_rows int := 10000;
declare current_rows int;
declare pg_version_95 int := 90500;
declare pg_version_10 int := 100000;
declare current_version int;
declare object_id varchar;
declare alter_table varchar;
declare record_object record;
declare message text;
declare pub RECORD;
begin
select current_query() into ddl_text;
if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL;
show server_version_num into current_version;
if current_version >= pg_version_95 then
for record_object in (select * from pg_event_trigger_ddl_commands()) loop
if record_object.command_tag = 'CREATE TABLE' then
object_id := record_object.object_identity;
end if;
end loop;
else
select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id;
end if;
if object_id = '' or object_id is null then
message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query();
end if;
if current_version >= pg_version_10 then
for pub in (select * from pg_publication where pubname like 'ape_dts_%') loop
raise notice 'pubname=%',pub.pubname;
BEGIN
execute 'alter publication ' || pub.pubname || ' add table ' || object_id;
EXCEPTION WHEN OTHERS THEN
END;
end loop;
end if;
end if;
insert into public.ape_dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message);
select count(id) into current_rows from public.ape_dts_ddl_command;
if current_rows > max_rows then
delete from public.ape_dts_ddl_command where id in (select min(id) from public.ape_dts_ddl_command);
end if;
end
$BODY$;
```

- Alter the function owner to your account
```
ALTER FUNCTION public.ape_dts_capture_ddl() OWNER TO postgres;
```

- Create an event trigger on ddl_command_end that executes the capture function
```
CREATE EVENT TRIGGER ape_dts_intercept_ddl ON ddl_command_end
EXECUTE PROCEDURE public.ape_dts_capture_ddl();
```
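
With the table, function, and trigger in place, every DDL statement run on the source should now be recorded in public.ape_dts_ddl_command. A quick optional sanity check (not part of the original steps; the table name tb_trigger_check is only illustrative):

```
-- run any DDL, then inspect the newest entry in the meta table
CREATE TABLE test_db.tb_trigger_check(id int primary key);

SELECT tag, ddl_text, event_time
FROM public.ape_dts_ddl_command
ORDER BY id DESC LIMIT 1;

DROP TABLE test_db.tb_trigger_check;
```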

## Start task
```
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=pg
extract_type=cdc
url=postgres://postgres:[email protected]:5433/postgres?options[statement_timeout]=10s
slot_name=ape_test
ddl_meta_tb=public.ape_dts_ddl_command
[filter]
do_dbs=test_db
do_events=insert,update,delete
do_ddls=create_schema,drop_schema,alter_schema,create_table,alter_table,drop_table,create_index,drop_index,truncate_table,rename_table
[sinker]
db_type=pg
sink_type=write
batch_size=200
url=postgres://postgres:[email protected]:5434/postgres?options[statement_timeout]=10s
[parallelizer]
parallel_type=rdb_merge
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
```

```
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
```

## Do DDLs in source
```
psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W
CREATE TABLE test_db.tb_2(id int, value int, primary key(id));
INSERT INTO test_db.tb_2 VALUES(1,1);
```

## Check results
```
psql -h 127.0.0.1 -U postgres -d postgres -p 5434 -W
SELECT * FROM test_db.tb_2 ORDER BY id;
```

```
id | value
----+-------
1 | 1
```
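
Since the capture function also adds newly created tables to any publication whose name starts with ape_dts_, it can be worth confirming on the source that tb_2 was registered with the task's publication. This assumes the CDC task created such a publication (as the trigger logic implies); the query itself is a standard catalog view:

```
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname LIKE 'ape_dts_%';
```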
2 changes: 1 addition & 1 deletion docs/en/tutorial/prerequisites.md
@@ -3,7 +3,7 @@

- Set images
```
export APE_DTS_IMAGE="apecloud-registry.cn-zhangjiakou.cr.aliyuncs.com/apecloud/ape-dts:2.0.2"
export APE_DTS_IMAGE="apecloud-registry.cn-zhangjiakou.cr.aliyuncs.com/apecloud/ape-dts:2.0.3"
export MYSQL_IMAGE="mysql:5.7.40"
export POSTGRES_IMAGE="postgis/postgis:15-3.4"
export REDIS_IMAGE="redis:7.0"