To deploy Materialize using MySQL as its upstream database, you'll need to configure it with Debezium.
[Debezium](https://debezium.io) itself provides change data capture (CDC) for legacy databases like MySQL and Postgres. Ultimately, CDC lets MySQL publish its changes to Kafka, which in turn can be consumed by Materialize.
Check out `mtrlz-setup`'s `debezium.sh`.
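Debezium writes the change events for each captured MySQL table to its own Kafka topic, named `<database.server.name>.<database>.<table>`. The connector configuration later in this guide sets `database.server.name` to `tpch`, which is why the topics used below look like `tpch.tpch.lineitem`. The following is a minimal sketch of that naming rule. `debezium_topic` is a hypothetical helper and is not part of Debezium or Confluent.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Debezium or Confluent): derive the Kafka
# topic that the Debezium MySQL connector uses for a table, following the
# <database.server.name>.<database>.<table> convention.
debezium_topic() {
  local server_name="$1" database="$2" table="$3"
  printf '%s.%s.%s\n' "$server_name" "$database" "$table"
}

# With database.server.name = tpch and the tpch database used in this guide:
debezium_topic tpch tpch lineitem   # prints tpch.tpch.lineitem
debezium_topic tpch tpch customer   # prints tpch.tpch.customer
```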
- Install MySQL through your favorite package manager.

  ```shell
  brew install mysql
  ```
- Adjust the MySQL configuration file to use UTC by adding the following line to `my.cnf`, under the `[mysqld]` group:

  ```ini
  default-time-zone = '+00:00'
  ```

  With Homebrew, the file lives at `/usr/local/etc/my.cnf`.

- Restart MySQL so that it can pick up the time zone change.

  ```shell
  brew services restart mysql
  ```
- Create a `tpch` database.

  ```shell
  mysql -uroot
  ```

  ```sql
  CREATE DATABASE tpch;
  ```
- Create a MySQL user that Debezium will connect as.

  ```sql
  CREATE USER debezium;
  ALTER USER debezium IDENTIFIED WITH mysql_native_password BY 'debezium';
  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
  GRANT ALL PRIVILEGES ON tpch.* TO 'debezium'@'%';
  ```

  The `IDENTIFIED WITH mysql_native_password ...` clause sets the account's password to `debezium` using MySQL's older native password protocol, which is all that Debezium supports. This is probably only necessary on MySQL 8.x and later. If you get an error saying `mysql_native_password` isn't recognized, try dropping the `WITH mysql_native_password` part of the statement.

- Load TPC-H data using `benesch`'s fork of `tpch-dbgen`.

  ```shell
  git clone https://github.com/benesch/tpch-dbgen.git
  cd tpch-dbgen
  make
  ./dbgen
  mysql -uroot --local-infile < ddl.sql
  ```
- Download the latest 1.0 release of the Debezium MySQL connector and place the `debezium-connector-mysql` directory in `/usr/local/opt/confluent-platform/share/java/`. If this directory doesn't exist, put the contents of the `tar` archive into the `share/java` directory under your Confluent install directory. In other words, replace `/usr/local/opt/confluent-platform` in the path above with the path where you installed Confluent.

  ```shell
  curl -O https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.0.0.Beta3/debezium-connector-mysql-1.0.0.Beta3-plugin.tar.gz
  tar -zxvf debezium-connector-mysql-1.0.0.Beta3-plugin.tar.gz
  rm debezium-connector-mysql-1.0.0.Beta3-plugin.tar.gz
  if [ -d /usr/local/opt/confluent-platform/share/java/ ]; then
    mv debezium-connector-mysql /usr/local/opt/confluent-platform/share/java
  else
    echo "Move debezium-connector-mysql into the share/java directory under your Confluent install directory"
  fi
  ```
Note that Debezium versions before 1.0 mishandled dates in a way that can cause incorrect results when loading data into Materialize.
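You can script the time zone and connector installation steps above. The following is a minimal sketch that assumes the Homebrew/Confluent paths used in this guide. `ensure_utc_time_zone` and `install_debezium_plugin` are hypothetical helpers and are not part of MySQL or Confluent. The first one places the setting under the `[mysqld]` group, where MySQL reads server options. The second one refuses to overwrite an existing connector directory.

```shell
#!/usr/bin/env bash
# Hypothetical helpers sketching the MySQL time zone and Debezium connector
# install steps. Defaults follow the Homebrew/Confluent paths in this guide.

UTC_LINE="default-time-zone = '+00:00'"

# Add default-time-zone = '+00:00' to the [mysqld] group of a my.cnf file.
# Leaves the file untouched if it already sets default-time-zone anywhere.
ensure_utc_time_zone() {
  local cnf="${1:-/usr/local/etc/my.cnf}" tmp
  if grep -q '^[[:space:]]*default-time-zone' "$cnf" 2>/dev/null; then
    echo "$cnf already sets default-time-zone; leaving it alone" >&2
    return 0
  fi
  if grep -q '^[[:space:]]*\[mysqld\]' "$cnf" 2>/dev/null; then
    # Insert the setting directly after the first [mysqld] header, then copy
    # the result back so the file keeps its original permissions.
    tmp=$(mktemp) || return 1
    awk -v line="$UTC_LINE" '
      { print }
      !done && /^[ \t]*\[mysqld\]/ { print line; done = 1 }
    ' "$cnf" > "$tmp" && cat "$tmp" > "$cnf"
    rm -f "$tmp"
  else
    # No [mysqld] group yet: make sure the file ends with a newline, then
    # append the group together with the setting.
    if [ -s "$cnf" ] && [ -n "$(tail -c 1 "$cnf")" ]; then
      echo >> "$cnf"
    fi
    printf '[mysqld]\n%s\n' "$UTC_LINE" >> "$cnf"
  fi
}

# Move the extracted connector directory into <confluent_home>/share/java.
install_debezium_plugin() {
  local src="${1:-debezium-connector-mysql}"
  local dest="${2:-/usr/local/opt/confluent-platform}/share/java"
  if [ ! -d "$src" ]; then
    echo "no such directory: $src" >&2
    return 1
  fi
  if [ ! -d "$dest" ]; then
    echo "Move $src into the share/java directory under your Confluent install directory" >&2
    return 1
  fi
  if [ -e "$dest/$(basename "$src")" ]; then
    echo "$dest/$(basename "$src") already exists; remove it first to upgrade" >&2
    return 1
  fi
  mv "$src" "$dest/"
}
```

For example, run `ensure_utc_time_zone /usr/local/etc/my.cnf` followed by `brew services restart mysql`. After extracting the connector archive, run `install_debezium_plugin debezium-connector-mysql`, and pass your Confluent install directory as a second argument if it isn't `/usr/local/opt/confluent-platform`.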
To test that your local MySQL/Debezium setup actually works with Materialize, we'll have Materialize ingest the TPC-H `lineitem` table.

!NOTE! If you run into problems, check out the Troubleshooting section at the bottom. If you don't see a solution to your problem there, add it there once you solve it.
- Start (or restart) Kafka Connect.

  ```shell
  confluent local services connect stop && confluent local services connect start
  ```
- Connect MySQL and Kafka via Debezium.

  ```shell
  curl -H 'Content-Type: application/json' localhost:8083/connectors --data '{
    "name": "tpch-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "debezium",
      "database.server.name": "tpch",
      "database.server.id": "184054",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "tpch-history"
    }
  }'
  ```

  If you get an error that the Debezium MySQL connector doesn't exist, check out the "Failed to find any class that implements Connector and which name matches io.debezium.connector.mysql.MySqlConnector" section below.

- Watch the Connect log file and look for messages from Debezium.

  ```shell
  confluent local log connect
  ```
- In a new shell (#2), once Debezium finishes its first snapshot, read the actual data out of Kafka.

  ```shell
  kafka-avro-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic tpch.tpch.customer
  ```
- In a new shell (#3), launch a `materialized` server.

  ```shell
  cd <path/to/materialized> && cargo run --bin materialized
  ```
- In a new shell (#4), connect to `materialized` and create a source to import the `lineitem` table.

  ```shell
  cd <path/to/materialized>
  source doc/developer/assets/demo/utils.sh
  mtrlz-shell
  ```

  ```sql
  CREATE SOURCE lineitem
  FROM KAFKA BROKER 'localhost:9092' TOPIC 'tpch.tpch.lineitem'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://localhost:8081'
  ENVELOPE DEBEZIUM;

  CREATE MATERIALIZED VIEW count AS SELECT COUNT(*) FROM lineitem;

  SELECT * FROM count;
  SELECT * FROM count;
  -- ...
  ```

  Once `SELECT * FROM count` returns `6001215`, all of the rows from `lineitem` have been imported.

Naturally, if this works at all, it indicates that Debezium is dutifully getting data out of MySQL. At this point, you can tear down the setup that is loading `lineitem` into Materialize.
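You can poll the `SELECT * FROM count` check from a shell instead of rerunning it by hand. The following is a minimal sketch. `wait_for_count` is a hypothetical helper that runs any command printing a single count. It succeeds once the command prints the expected value and gives up when the value stops changing, which is the symptom the watermark workaround in the troubleshooting section addresses. The `psql` invocation in the comment is an assumption about how to reach `materialized`.

```shell
#!/usr/bin/env bash
# Hypothetical helper: repeatedly run a command that prints a single row count.
# Returns 0 once the count equals <expected>, 1 if the count stays the same for
# <max_stalls> consecutive polls, and 2 if the command itself fails.
#
# Usage: wait_for_count <expected> <max_stalls> <interval_seconds> <command...>
wait_for_count() {
  local expected="$1" max_stalls="$2" interval="$3"
  shift 3
  local previous="" current stalls=0
  while :; do
    current=$("$@") || return 2
    if [ "$current" = "$expected" ]; then
      echo "done: $current rows"
      return 0
    fi
    if [ "$current" = "$previous" ]; then
      stalls=$((stalls + 1))
      if [ "$stalls" -ge "$max_stalls" ]; then
        echo "stalled at $current rows (expected $expected)" >&2
        return 1
      fi
    else
      stalls=0
    fi
    previous=$current
    sleep "$interval"
  done
}

# Example (assumption: materialized accepts psql connections on localhost:6875;
# adjust host, port, and database to match how mtrlz-shell connects):
#   wait_for_count 6001215 12 5 \
#     psql -h localhost -p 6875 -tAc 'SELECT * FROM count' materialize
```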
`Failed to find any class that implements Connector and which name matches io.debezium.connector.mysql.MySqlConnector`
If you encounter this error, the `confluent-platform` set of tools that you installed cannot find the `debezium-connector-mysql` directory.
To start investigating, find out where `confluent-platform` is looking for plugins by checking the Connect log:

```shell
confluent local log connect
```
There's a chance it'll be `~/Downloads/` or some other such nonsense.
To fix this particular version of the issue, remove any copies of the `confluent` or `confluent-platform` binaries (e.g. downloaded `tar` files) from the offending location, and then restart Kafka Connect:

```shell
confluent local services connect stop && confluent local services connect start
```
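Kafka Connect looks for connectors in the directories listed in its `plugin.path` worker setting, which is a comma-separated list. The worker should print this setting among the configuration values it logs at startup. The following is a minimal sketch that takes that value and reports which entries actually contain `debezium-connector-mysql`. `find_debezium_plugin` is a hypothetical helper. It accepts the value with or without the surrounding brackets the log may show.

```shell
#!/usr/bin/env bash
# Hypothetical helper: check each entry of a Kafka Connect plugin.path value
# for the debezium-connector-mysql directory. Accepts either the raw
# comma-separated setting ("/a, /b") or the bracketed form ("[/a, /b]").
# Returns 0 if at least one entry contains the connector, 1 otherwise.
find_debezium_plugin() {
  local plugin_path="$1" dir found=1
  # Strip optional surrounding brackets.
  plugin_path=${plugin_path#\[}
  plugin_path=${plugin_path%\]}
  while IFS= read -r dir; do
    # Trim leading and trailing whitespace around each entry.
    dir=$(printf '%s' "$dir" | sed 's/^[[:space:]]*//; s/[[:space:]]*$//')
    [ -n "$dir" ] || continue
    if [ -d "$dir/debezium-connector-mysql" ]; then
      echo "found: $dir/debezium-connector-mysql"
      found=0
    else
      echo "missing: $dir"
    fi
  done <<EOF
$(printf '%s\n' "$plugin_path" | tr ',' '\n')
EOF
  return "$found"
}
```

For example, `find_debezium_plugin '/usr/local/opt/confluent-platform/share/java'` prints `missing: ...` if the connector isn't in that directory.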
If `SELECT * FROM count` stops growing at a value less than `6001215`, try writing explicit watermarks to indicate that a topic is finished. For example, to add a watermark to the `tpch.tpch.lineitem` topic:
```shell
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic tpch.tpch.lineitem \
  --property value.schema="$(curl -s localhost:8081/subjects/tpch.tpch.lineitem-value/versions/1 | jq -r .schema)" \
  <<<'{"before":null,"after":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mysql"},"name":"tpch","server_id":0,"ts_sec":0,"gtid":null,"file":"binlog.000004","pos":951896181,"row":0,"snapshot":{"boolean":true},"thread":null,"db":{"string":"tpch"},"table":{"string":"lineitem"},"query":null},"op":"c","ts_ms":{"long":1560886948093}}'
```
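To write watermarks for topics other than `lineitem`, you can parameterize the command above. The following is a minimal sketch. `watermark_record` and `send_watermark` are hypothetical helpers. The record reuses the example's field values verbatim (version string, binlog file and position, timestamp) and substitutes only the server, database, and table names. `send_watermark` assumes the same local Kafka (`localhost:9092`) and schema registry (`localhost:8081`) as above, with the value schema registered as version 1 of the `<topic>-value` subject. If your Debezium version registers a different `source` schema, the record's fields may need adjusting to match it.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for writing the watermark record above to other
# Debezium topics. Every field value except the server, database and table
# names is copied verbatim from the lineitem example.

# Print the watermark record for <server> <database> <table>.
watermark_record() {
  local server="$1" db="$2" table="$3"
  printf '{"before":null,"after":null,"source":{"version":{"string":"0.9.5.Final"},"connector":{"string":"mysql"},"name":"%s","server_id":0,"ts_sec":0,"gtid":null,"file":"binlog.000004","pos":951896181,"row":0,"snapshot":{"boolean":true},"thread":null,"db":{"string":"%s"},"table":{"string":"%s"},"query":null},"op":"c","ts_ms":{"long":1560886948093}}\n' \
    "$server" "$db" "$table"
}

# Send the watermark to <server>.<database>.<table>, fetching the value schema
# from version 1 of the <topic>-value subject, as in the example above.
send_watermark() {
  local server="$1" db="$2" table="$3"
  local topic="$server.$db.$table" schema
  schema=$(curl -s "localhost:8081/subjects/$topic-value/versions/1" | jq -r .schema)
  if [ -z "$schema" ] || [ "$schema" = "null" ]; then
    echo "could not fetch the value schema for $topic" >&2
    return 1
  fi
  watermark_record "$server" "$db" "$table" |
    kafka-avro-console-producer --broker-list localhost:9092 --topic "$topic" \
      --property value.schema="$schema"
}
```

For example, `for t in orders customer; do send_watermark tpch tpch "$t"; done` would mark those two topics as finished.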