Add support for table partition rerouting (#36)
teddyphreak authored Jun 23, 2023
1 parent 52e02bf commit 7d4865b
Showing 8 changed files with 109 additions and 29 deletions.
2 changes: 0 additions & 2 deletions README.md
@@ -1,7 +1,5 @@
# nephelaiio.dataplane

[![Build Status](https://github.com/nephelaiio/helm-dataplane/workflows/molecule/badge.svg)](https://github.com/nephelaiio/helm-dataplane/actions)

A helm chart to deploy a CDC replication stack integrating the following components
* Strimzi Kafka Broker
* Zalando PostgreSQL Data Warehouse
4 changes: 2 additions & 2 deletions charts/dataplane/Chart.yaml
@@ -16,10 +16,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.11
version: 0.1.12

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: 0.1.11
appVersion: 0.1.12
54 changes: 52 additions & 2 deletions charts/dataplane/templates/strimzi/connector.yaml
@@ -3,11 +3,13 @@
{{- $registryPort := .Values.registry.service.port -}}

{{- range $db := .Values.cdc.postgres }}
{{- $dbConnectorName := lower (printf "%s-%s-%s" $fullName $db.servername $db.dbname) }}
{{- $dbSlotName := $dbConnectorName | replace "-" "_" }}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: {{ $fullName }}-{{ lower $db.servername }}-{{ lower $db.dbname }}
name: {{ $dbConnectorName }}
labels:
strimzi.io/cluster: {{ $fullName }}
spec:
@@ -21,7 +23,12 @@ spec:
database.server.name: "{{ $db.servername }}"
database.dbname: "{{ $db.dbname }}"
snapshot.new.tables: parallel
slot.name: "{{ $dbSlotName }}"
plugin.name: pgoutput
tasks.max: "1"
{{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }}
table.exclude.list: "{{- range $i, $p := $db.partitions -}}{{- if $i }},{{- end -}}{{ $p.source }}{{- end }}"
{{- end }}
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }}
value.converter: io.confluent.connect.avro.AvroConverter
@@ -33,7 +40,50 @@ spec:
transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex: (.*)
transforms.reroute.topic.replacement: cdc.$1

{{- if and (hasKey $db "partitions") (ge (len $db.partitions) 1) }}
{{- range $partition := $db.partitions }}
{{- $partitionSource := required "partition source is required" $partition.source }}
{{- $partitionSink := required "partition sink is required" $partition.sink }}
{{- $partitionConnectorName := lower (printf "%s-%s" $dbConnectorName $partitionSink) | replace "." "-" }}
{{- $partitionSlotName := $partitionConnectorName | replace "-" "_" }}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: {{ $partitionConnectorName }}
labels:
strimzi.io/cluster: {{ $fullName }}
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1
config:
database.hostname: {{ $db.hostname }}
database.port: "{{ $db.port | default 5432 }}"
database.user: "${env:POSTGRES_CDC_USER}"
database.password: "${env:POSTGRES_CDC_PASS}"
database.server.name: "{{ $db.servername }}"
database.dbname: "{{ $db.dbname }}"
snapshot.new.tables: parallel
slot.name: {{ $partitionSlotName }}
plugin.name: pgoutput
tasks.max: "1"
table.include.list: "{{ $partitionSource }}"
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }}
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }}
{{- if $db.signaling }}
signal.data.collection: debezium.signaling
{{- end }}
transforms: reroute
transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex: "^([^\\.]+)\\.([^\\.]+)\\..*"
transforms.reroute.topic.replacement: "cdc.$1.$2.{{ $partitionSink }}"
{{- end }}
{{- end }}
{{- end }}

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
@@ -60,5 +110,5 @@ spec:
transforms.unwrap.drop.tombstones: false
transforms.unwrap.delete.handling.mode: rewrite
transforms.rename.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.rename.regex: "^cdc\\.(.*)\\.(.*)\\.(.*)"
transforms.rename.regex: "^cdc\\.([^\\.]*)\\.([^\\.]*)\\.([^\\.]*)"
transforms.rename.replacement: "$1_$3"
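For orientation, here is a minimal sketch of the values shape this template consumes, assuming a single CDC source. The key names (servername, dbname, hostname, port, signaling, partitions.source, partitions.sink) come from the template above; the concrete host and server values are placeholders, not taken from this commit.

cdc:
  postgres:
    - servername: pagila            # placeholder; rendered into database.server.name
      dbname: pagila                # placeholder; rendered into database.dbname
      hostname: pagila-postgres     # placeholder; rendered into database.hostname
      port: 5432
      signaling: true               # adds signal.data.collection to the rendered config
      partitions:
        # Tables matching `source` are dropped from the main connector via
        # table.exclude.list and captured by a dedicated per-partition connector
        # whose topics are rerouted to cdc.<server>.<schema>.<sink>.
        - source: "public.payment.*"
          sink: "payment"

With an entry like this, the chart should render two KafkaConnector resources for the database: the existing one, now excluding the partition sources, and one per partitions entry whose ByLogicalTableRouter replacement collapses every matching partition topic into a single cdc.<server>.<schema>.<sink> topic.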
6 changes: 3 additions & 3 deletions charts/dataplane/values.yaml
@@ -7,7 +7,7 @@ util:
image:
repository: nephelaiio/dataplane-util
pullPolicy: IfNotPresent
tag: dataplane-0.1.11
tag: dataplane-0.1.12
resources: {}

cdc:
@@ -45,7 +45,7 @@ strimzi:
connect:
image:
repository: nephelaiio/dataplane-connect
tag: dataplane-0.1.11
tag: dataplane-0.1.12
replicas: 1
config:
group.id: connect-cluster
@@ -100,7 +100,7 @@ metabase:
image:
repository: nephelaiio/dataplane-util
pullPolicy: IfNotPresent
tag: dataplane-0.1.11
tag: dataplane-0.1.12
securityContext: {}

image:
55 changes: 43 additions & 12 deletions molecule/default/converge.yml
@@ -99,6 +99,49 @@
- (cluster_status | length) != (cluster_query | length)
- cluster_failed | length > 0

- name: query pagila service data
ansible.builtin.set_fact:
pagila_host: "{{ _db_svc_data.status.loadBalancer.ingress[0].ip }}"
vars:
_db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}"
_db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) | first }}"
service_query: "{{ query(
'kubernetes.core.k8s',
namespace=dataplane_pagila_namespace,
kind='Service',
kubeconfig=k8s_kubeconfig) }}"

- name: query pagila owner data
ansible.builtin.set_fact:
pagila_owner_user: "{{ _db_secret_data.data.username | b64decode }}"
pagila_owner_pass: "{{ _db_secret_data.data.password | b64decode }}"
vars:
_db_secret_name: "zalando-{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}"
_db_secret_data: "{{ secret_query | selectattr('metadata.name', 'equalto', _db_secret_name) | first }}"
secret_query: "{{ query(
'kubernetes.core.k8s',
namespace=dataplane_pagila_namespace,
kind='Secret',
kubeconfig=k8s_kubeconfig) }}"

- name: query pagila publications
community.postgresql.postgresql_query:
db: "{{ dataplane_pagila_db }}"
query: "SELECT * FROM pg_publication where pubname = 'dbz_publication'"
login_user: "{{ pagila_owner_user }}"
login_password: "{{ pagila_owner_pass }}"
login_host: "{{ pagila_host }}"
register: pagila_publication_query

- name: create pagila publication
community.postgresql.postgresql_query:
db: "{{ dataplane_pagila_db }}"
query: "CREATE PUBLICATION dbz_publication FOR ALL TABLES"
login_user: "{{ pagila_owner_user }}"
login_password: "{{ pagila_owner_pass }}"
login_host: "{{ pagila_host }}"
when: pagila_publication_query.query_all_results | flatten | length == 0

- name: deploy dataplane helm chart
kubernetes.core.helm:
state: present
@@ -144,18 +187,6 @@
kind='Secret',
kubeconfig=k8s_kubeconfig) }}"

- name: query pagila service data
ansible.builtin.set_fact:
pagila_host: "{{ _db_svc_data.status.loadBalancer.ingress[0].ip }}"
vars:
_db_svc_name: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}"
_db_svc_data: "{{ service_query | selectattr('metadata.name', 'equalto', _db_svc_name) | first }}"
service_query: "{{ query(
'kubernetes.core.k8s',
namespace=dataplane_pagila_namespace,
kind='Service',
kubeconfig=k8s_kubeconfig) }}"

- name: wait for database port open
ansible.builtin.wait_for:
host: "{{ pagila_host }}"
4 changes: 2 additions & 2 deletions molecule/default/molecule.yml
@@ -94,8 +94,8 @@ provisioner:
dbname: "{{ dataplane_pagila_db }}"
signaling: "{{ dataplane_pagila_signaling }}"
partitions:
- source: "payments.*"
sink: "payments"
- source: "public.payment.*"
sink: "payment"
strimzi:
connect:
image:
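Taken together with the connector template above, this test configuration should make the per-partition connector capture pagila's partitioned payment table and collapse its topics. A hedged illustration of the expected routing, assuming the server name is pagila and using partition names from the pagila sample schema rather than from this change:

# source table              raw Debezium topic                rerouted topic               warehouse table
# public.payment_p2022_01   pagila.public.payment_p2022_01    cdc.pagila.public.payment    pagila_payment
# public.payment_p2022_02   pagila.public.payment_p2022_02    cdc.pagila.public.payment    pagila_payment

This is also why the verify playbook below drops the individual payment_* partitions from the list of warehouse tables it expects to find.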
11 changes: 6 additions & 5 deletions molecule/default/verify.yml
@@ -61,7 +61,8 @@

- name: record source table data
ansible.builtin.set_fact:
pagila_tables: "{{ _table_names | difference(['payment']) | list }}"
pagila_all_tables: "{{ _table_names | list }}"
pagila_target_tables: "{{ _table_names | reject('regex', '^payment_.*') }}"
vars:
_table_names: "{{ pagila_table_query.query_result | map(attribute='table_name') }}"

@@ -72,7 +73,7 @@
login_user: "{{ pagila_user }}"
login_password: "{{ pagila_pass }}"
login_host: "{{ pagila_host }}"
loop: "{{ pagila_tables }}"
loop: "{{ pagila_all_tables }}"
register: pagila_data_query

- name: record pagila table data
@@ -117,7 +118,7 @@
login_password: "{{ warehouse_pass }}"
login_host: "{{ warehouse_host }}"
vars:
warehouse_tables_expected: "{{ pagila_tables | map('map_format', 'pagila_%s') | list }}"
warehouse_tables_expected: "{{ pagila_target_tables | map('map_format', 'pagila_%s') | list }}"
warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}"
register: warehouse_table_query
retries: 20
@@ -129,7 +130,7 @@
ansible.builtin.debug:
msg: "table diff=[{{ ', '.join(warehouse_tables_diff) }}]"
vars:
warehouse_tables_expected: "{{ pagila_tables | map('map_format', 'pagila_%s') | list }}"
warehouse_tables_expected: "{{ pagila_target_tables| map('map_format', 'pagila_%s') | list }}"
warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}"
warehouse_tables_diff: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}"

@@ -168,7 +169,7 @@
source_table_records: "{{ (pagila_data[source_table].query_result | list)[0].count }}"
loop_control:
label: "{{ source_table }}"
loop: "{{ (pagila_data.keys() | list) }}"
loop: "{{ (pagila_target_tables | list) }}"
when: source_table_records > warehouse_table_records

- name: verify metabase app database
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dataplane"
version = "0.1.11"
version = "0.1.12"
description = ""
authors = ["Teodoro Cook <[email protected]>"]
