# Ingest Kafka data into Databend

## Installation
```shell
go install github.com/databendcloud/bend-ingest-kafka@latest
```
Or download the binary from the release page.
## JSON transform mode

The JSON transform mode is the default mode. It transforms the Kafka JSON data into the columns of a Databend table. You can enable it by setting `--is-json-transform` to `true`.
For example, given Kafka data like:

```json
{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
```
you should create a table with a matching schema:

```sql
CREATE TABLE test_ingest (
  i64 Int64,
  u64 UInt64,
  f64 Float64,
  s String,
  s2 String,
  a16 Array(Int16),
  a8 Array(UInt8),
  d Date,
  t DateTime
);
```
Then run:

```shell
bend-ingest-kafka \
  --kafka-bootstrap-servers="127.0.0.1:9092,127.0.0.2:9092" \
  --kafka-topic="Your Topic" \
  --kafka-consumer-group="Consumer Group" \
  --databend-dsn="http://root:[email protected]:8000" \
  --databend-table="db1.tbl" \
  --data-format="json" \
  --batch-size=100000 \
  --batch-max-interval=300
```
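Once the ingester is running, you can check that rows are arriving with a simple query against the target table from the example above:

```sql
-- Count the rows ingested so far.
SELECT COUNT(*) FROM db1.tbl;
```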
Alternatively, you can configure the ingester through the config file `config/conf.json`:
```json
{
  "kafkaBootstrapServers": "localhost:9092",
  "kafkaTopic": "ingest_test",
  "KafkaConsumerGroup": "test",
  "mockData": "",
  "isJsonTransform": true,
  "databendDSN": "https://user:password@host:port",
  "databendTable": "default.kfk_test",
  "batchSize": 1,
  "batchMaxInterval": 5,
  "dataFormat": "json",
  "workers": 1,
  "copyPurge": false,
  "copyForce": false
}
```
and execute the command:

```shell
./bend-ingest-kafka
```
## Raw mode

The raw mode ingests the raw Kafka data into a Databend table. You can enable it by setting `isJsonTransform` to `false`.
In this mode, the tool creates a table with the name given by `databendTable`, whose columns are `(uuid, koffset, kpartition, raw_data, record_metadata, add_time)`, and ingests the raw data into this table.
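For illustration, the auto-created raw-mode table has roughly the following shape (the tool creates it for you; the column types shown here are assumptions, not the tool's exact DDL):

```sql
-- Sketch only: bend-ingest-kafka creates this table itself in raw mode.
CREATE TABLE default.kfk_test (
  uuid            VARCHAR,   -- per-record identifier (type assumed)
  koffset         BIGINT,    -- Kafka offset
  kpartition      INT,       -- Kafka partition
  raw_data        VARIANT,   -- the raw Kafka message (type assumed)
  record_metadata VARIANT,   -- Kafka record metadata, see below (type assumed)
  add_time        TIMESTAMP  -- when the record was added into Databend
);
```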
The `record_metadata` is the metadata of the Kafka record, which contains the `topic`, `partition`, `offset`, `create_time`, and `key`; the `add_time` is the time when the record is added into Databend.
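For illustration, a `record_metadata` value would look roughly like this (the field names come from the list above; the values and exact formatting are made up):

```json
{
  "topic": "ingest_test",
  "partition": 0,
  "offset": 42,
  "create_time": "2016-04-04 11:30:00",
  "key": ""
}
```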
If the Kafka JSON data is:

```json
{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
```
run the command `./bend-ingest-kafka` with `config/conf.json`; the table `default.kfk_test` will be created and the data will be ingested into it.
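You can then query the ingested rows back out; a minimal sketch, assuming `raw_data` and `record_metadata` are stored as JSON-compatible values:

```sql
-- Pull one field out of the raw payload and one out of the metadata.
SELECT raw_data['i64'] AS i64, record_metadata['topic'] AS topic
FROM default.kfk_test
LIMIT 10;
```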
## Parameter references

| Parameter | Description | Default | Example |
|---|---|---|---|
| kafkaBootstrapServers | Kafka bootstrap servers | "127.0.0.1:64103" | "127.0.0.1:9092,127.0.0.2:9092" |
| kafkaTopic | Kafka topic | "test" | "test" |
| KafkaConsumerGroup | Kafka consumer group | "kafka-bend-ingest" | "test" |
| isSASL | whether SASL authentication is enabled | false | true |
| saslUser | SASL user | "" | "user" |
| saslPassword | SASL password | "" | "password" |
| mockData | mock data for testing | "" | "" |
| isJsonTransform | whether to run in JSON transform mode | true | true |
| databendDSN | Databend DSN | none (required) | "http://localhost:8000" |
| databendTable | Databend table | none (required) | "db1.tbl" |
| batchSize | batch size | 1000 | 1000 |
| batchMaxInterval | max interval between batches (seconds) | 30 | 30 |
| dataFormat | data format | json | "json" |
| workers | number of worker threads | 1 | 1 |
| copyPurge | purge staged files after COPY | false | false |
| copyForce | force COPY to re-load files | false | false |
| DisableVariantCheck | disable the variant check | false | false |
| MinBytes | minimum bytes to fetch from Kafka | 1024 | 1024 |
| MaxBytes | maximum bytes to fetch from Kafka | 1048576 | 1048576 |
| MaxWait | max wait time for the Kafka consumer | 10 | 10 |
| useReplaceMode | use replace mode to overwrite existing rows | false | false |
| userStage | user external stage name | ~ | ~ |
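For example, to connect to a SASL-protected broker, combine the SASL parameters above in `config/conf.json`; a sketch with placeholder values:

```json
{
  "kafkaBootstrapServers": "broker1:9093",
  "kafkaTopic": "ingest_test",
  "KafkaConsumerGroup": "test",
  "isSASL": true,
  "saslUser": "user",
  "saslPassword": "password",
  "isJsonTransform": false,
  "databendDSN": "http://root:[email protected]:8000",
  "databendTable": "default.kfk_test",
  "dataFormat": "json",
  "batchSize": 1000,
  "batchMaxInterval": 30
}
```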
NOTE:
- `useReplaceMode` replaces data already in the table: if a record already exists, the new data replaces the old data. However, `useReplaceMode` is only supported when `isJsonTransform` is `false`, because it needs to add the `koffset` and `kpartition` fields to the target table.
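Conceptually, replace mode de-duplicates on the Kafka coordinates. It behaves roughly like a Databend `REPLACE INTO` keyed on `(koffset, kpartition)`; a sketch, not the tool's literal SQL:

```sql
-- Sketch: a new record replaces any existing row with the same (koffset, kpartition).
REPLACE INTO default.kfk_test ON (koffset, kpartition)
VALUES ('uuid-1', 42, 0, parse_json('{"i64": 10}'), parse_json('{"topic": "ingest_test"}'), now());
```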