Skip to content

Commit

Permalink
Add support for table inclusion (#40)
Browse files Browse the repository at this point in the history
* Add support for table inclusion

* Update connect image components
  • Loading branch information
teddyphreak authored Jun 28, 2023
1 parent edb7b41 commit be979eb
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 21 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ wait:
strimzi: strimzi-topics

strimzi-topics:
make --no-print-directory kubectl exec -- -it pod/$(DATAPLANE_CHART)-strimzi-kafka-0 -n $(DATAPLANE_NS) -- "/opt/kafka/bin/kafka-topics.sh --bootstrap-server $(DATAPLANE_CHART)-strimzi-kafka-bootstrap:9092 --list"
make --no-print-directory kubectl exec -- \
-it pod/$(DATAPLANE_CHART)-strimzi-kafka-0 \
-n $(DATAPLANE_NS) -- \
"/opt/kafka/bin/kafka-topics.sh --bootstrap-server $(DATAPLANE_CHART)-strimzi-kafka-bootstrap:9092 --list"

strimzi-connectors:
@make --no-print-directory kubectl exec -- -it svc/$(DATAPLANE_CHART)-connect-api -n $(DATAPLANE_NS) -- \
Expand Down
4 changes: 2 additions & 2 deletions charts/dataplane/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.13
version: 0.1.14

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: 0.1.13
appVersion: 0.1.14
20 changes: 16 additions & 4 deletions charts/dataplane/templates/strimzi/connector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
{{- fail "table partitions and includes are mutually exclusive" }}
{{- end }}


{{- if hasKey $db "exclude" }}
{{- if ge (len $db.exclude) 1 }}
{{- range $item := $db.exclude }}
Expand All @@ -26,6 +25,14 @@
{{- end }}
{{- end }}

{{- if hasKey $db "include" }}
{{- if ge (len $db.include) 1 }}
{{- range $item := $db.include }}
{{- $include = append $include $item }}
{{- end }}
{{- end }}
{{- end }}

{{- if hasKey $db "partitions" }}
{{- if ge (len $db.partitions) 1 }}
{{- range $item := $db.partitions }}
Expand All @@ -49,22 +56,26 @@ spec:
database.port: "{{ $db.port | default 5432 }}"
database.user: "${env:POSTGRES_CDC_USER}"
database.password: "${env:POSTGRES_CDC_PASS}"
database.server.name: "{{ $dbId }}"
database.dbname: "{{ $db.dbname }}"
database.server.name: "{{ $dbId }}"
snapshot.new.tables: parallel
slot.name: "{{ $dbSlotName }}"
plugin.name: pgoutput
tasks.max: "1"
{{- if ge (len $exclude) 1 }}
table.exclude.list: "{{- range $i, $p := $exclude -}}{{- if $i }},{{- end -}}{{ $p }}{{- end }}"
{{- end }}
{{- if ge (len $include) 1 }}
table.include.list: "{{- range $i, $p := $include -}}{{- if $i }},{{- end -}}{{ $p }}{{- end }}"
{{- end }}
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }}
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://{{ $registryName }}:{{ $registryPort }}
{{- if $db.signaling }}
signal.data.collection: debezium.signaling
{{- end }}
topic.prefix: "{{ $dbId }}"
transforms: reroute
transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex: (.*)
Expand Down Expand Up @@ -92,8 +103,8 @@ spec:
database.port: "{{ $db.port | default 5432 }}"
database.user: "${env:POSTGRES_CDC_USER}"
database.password: "${env:POSTGRES_CDC_PASS}"
database.server.name: "{{ $dbId }}"
database.dbname: "{{ $db.dbname }}"
database.server.name: "{{ $dbId }}-{{ $partition.sink }}"
snapshot.new.tables: parallel
slot.name: {{ $partitionSlotName }}
plugin.name: pgoutput
Expand All @@ -106,10 +117,11 @@ spec:
{{- if $db.signaling }}
signal.data.collection: debezium.signaling
{{- end }}
topic.prefix: "{{ $dbId }}"
transforms: reroute
transforms.reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.reroute.topic.regex: "^([^\\.]+)\\.([^\\.]+)\\..*"
transforms.reroute.topic.replacement: "cdc.$1.$2.{{ $partitionSink }}"
transforms.reroute.topic.replacement: "cdc.{{ $dbId }}.$2.{{ $partitionSink }}"
{{- end }}
{{- end }}
{{- end }}
Expand Down
6 changes: 3 additions & 3 deletions charts/dataplane/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ util:
image:
repository: nephelaiio/dataplane-util
pullPolicy: IfNotPresent
tag: dataplane-0.1.13
tag: dataplane-0.1.14
resources: {}

cdc:
Expand Down Expand Up @@ -45,7 +45,7 @@ strimzi:
connect:
image:
repository: nephelaiio/dataplane-connect
tag: dataplane-0.1.13
tag: dataplane-0.1.14
replicas: 1
config:
group.id: connect-cluster
Expand Down Expand Up @@ -100,7 +100,7 @@ metabase:
image:
repository: nephelaiio/dataplane-util
pullPolicy: IfNotPresent
tag: dataplane-0.1.13
tag: dataplane-0.1.14
securityContext: {}

image:
Expand Down
6 changes: 3 additions & 3 deletions connect/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM confluentinc/cp-kafka-connect:7.2.2 as cp
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.9.6
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.2.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.2.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.4.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.3

FROM quay.io/strimzi/kafka:latest-kafka-3.2.3
ENV KAFKA_CONNECT_PLUGIN_PATH=/home/kafka/connect-plugins
Expand Down
7 changes: 7 additions & 0 deletions molecule/default/molecule.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ provisioner:
partitions:
- source: "public.payment.*"
sink: "payment"
- hostname: "{{ dataplane_pagila_team }}-{{ dataplane_pagila_db }}"
port: 5432
id: include
dbname: "{{ dataplane_pagila_db }}"
signaling: "{{ dataplane_pagila_signaling }}"
include:
- "public.staff"
strimzi:
connect:
image:
Expand Down
43 changes: 36 additions & 7 deletions molecule/default/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@

- name: verify sink database table names
block:
- name: verify warehouse tables
- name: verify warehouse pagila tables
community.postgresql.postgresql_query:
db: warehouse
query: |
Expand All @@ -119,23 +119,38 @@
login_host: "{{ warehouse_host }}"
vars:
warehouse_tables_expected: "{{ pagila_target_tables | map('map_format', 'pagila_public_%s') | list }}"
warehouse_tables_pagila: "{{ warehouse_tables_found | select('match', '^pagila_.*') }}"
warehouse_tables_include: "{{ warehouse_tables_found | select('match', '^include_.*') }}"
warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}"
register: warehouse_table_query
retries: 20
delay: 30
until:
- warehouse_tables_expected | difference(warehouse_tables_found) | length == 0
- warehouse_tables_found | difference(warehouse_tables_expected) | length == 0
- warehouse_tables_expected | difference(warehouse_tables_pagila) | length == 0
- warehouse_tables_pagila | difference(warehouse_tables_expected) | length == 0
- warehouse_tables_include == ['include_public_staff']

rescue:
- name: debug table mismatches
- name: debug pagila table mismatches
ansible.builtin.debug:
msg: "table diff=[{{ ', '.join(warehouse_tables_diff) }}]"
vars:
warehouse_tables_expected: "{{ pagila_target_tables| map('map_format', 'pagila_public_%s') | list }}"
warehouse_tables_pagila: "{{ warehouse_tables_found | select('match', '^pagila_.*') }}"
warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}"
warehouse_tables_diff_expected: "{{ warehouse_tables_expected | difference(warehouse_tables_found) }}"
warehouse_tables_diff_found: "{{ warehouse_tables_found | difference(warehouse_tables_expected) }}"
warehouse_tables_diff_expected: "{{ warehouse_tables_expected | difference(warehouse_tables_pagila) }}"
warehouse_tables_diff_found: "{{ warehouse_tables_pagila | difference(warehouse_tables_expected) }}"
warehouse_tables_diff: "{{ warehouse_tables_diff_expected + warehouse_tables_diff_found }}"

- name: debug include table mismatches
ansible.builtin.debug:
msg: "table diff=[{{ ', '.join(warehouse_tables_diff) }}]"
vars:
warehouse_tables_expected: ['include_public_staff']
warehouse_tables_include: "{{ warehouse_tables_found | select('match', '^pagila_.*') }}"
warehouse_tables_found: "{{ warehouse_table_query.query_result | map(attribute='table_name') | list }}"
warehouse_tables_diff_expected: "{{ warehouse_tables_expected | difference(warehouse_tables_include) }}"
warehouse_tables_diff_found: "{{ warehouse_tables_include | difference(warehouse_tables_expected) }}"
warehouse_tables_diff: "{{ warehouse_tables_diff_expected + warehouse_tables_diff_found }}"

- name: fail verification
Expand All @@ -161,7 +176,7 @@
ansible.builtin.set_fact:
warehouse_data: "{{ warehouse_data_query.results | list_to_dict('item') }}"

- name: verify warehouse table data
- name: verify warehouse pagila table data
ansible.builtin.fail:
msg: >-
warehouse data for table {{ source_table }} is incomplete,
Expand All @@ -176,6 +191,20 @@
loop: "{{ (pagila_target_tables | list) }}"
when: source_table_records > warehouse_table_records

- name: verify warehouse include table data
ansible.builtin.fail:
msg: >-
warehouse data for table {{ source_table }} is incomplete,
{{ warehouse_table_records }}/{{ source_table_records }} found"
vars:
warehouse_table_records: "{{ (warehouse_data[warehouse_table].query_result | list)[0].count }}"
warehouse_table: "include_public_{{ item }}"
source_table: "{{ item }}"
source_table_records: "{{ (pagila_data[source_table].query_result | list)[0].count }}"
loop:
- "staff"
when: source_table_records > warehouse_table_records

- name: verify metabase app database
when: lookup('ansible.builtin.env', 'METABASE_VERIFY', default='true') | bool
block:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dataplane"
version = "0.1.13"
version = "0.1.14"
description = ""
authors = ["Teodoro Cook <[email protected]>"]

Expand Down

0 comments on commit be979eb

Please sign in to comment.