通过建表语句中的 PERIOD FOR SYSTEM_TIME
将表标识为维表,其中PRIMARY KEY(keyInfo)
中的keyInfo,表示用来和源表进行关联的字段,
维表JOIN的条件必须与keyInfo
字段一致。
CREATE TABLE tableName(
colName cloType,
...
PRIMARY KEY(keyInfo),
PERIOD FOR SYSTEM_TIME
)WITH(
type ='cassandra',
address ='ip:port[,ip:port]',
userNae='dbUserName',
password='dbPwd',
tableName='tableName',
database='database',
cache ='LRU',
cacheSize ='10000',
cacheTTLMs ='60000',
parallelism ='1',
partitionedJoin='false'
);
cassandra-3.x
参数名称 | 含义 |
---|---|
tableName | 注册到flink的表名称(可选填;不填默认和cassandra对应的表名称相同) |
colName | 列名称 |
colType | 列类型 |
PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息 |
PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开 |
参数名称 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
type | 表明 输出表类型 cassandra | 是 | |
address | 连接cassandra数据库 jdbcUrl | 是 | |
userName | cassandra连接用户名 | 否 | |
password | cassandra连接密码 | 否 | |
tableName | cassandra表名称 | 是 | |
database | cassandra表名称 | 是 | |
cache | 维表缓存策略(NONE/LRU) | 否 | NONE |
partitionedJoin | 是否在維表join之前先根据 設定的key 做一次keyby操作(可以減少维表的数据缓存量) | 否 | false |
maxRequestsPerConnection | 每个连接允许的并发请求数 | 否 | 1 |
coreConnectionsPerHost | 每台主机连接的核心数 | 否 | 8 |
maxConnectionsPerHost | Cassandra集群里的每个机器都最多连接数 | 否 | 32768 |
maxQueueSize | Cassandra队列大小 | 否 | 100000 |
readTimeoutMillis | Cassandra读超时 | 否 | 60000 |
connectTimeoutMillis | Cassandra连接超时 | 否 | 60000 |
poolTimeoutMillis | Cassandra线程池超时 | 否 | 60000 |
缓存策略
- NONE:不做内存缓存。每条流数据触发一次维表查询操作。
- ALL: 任务启动时,一次性加载所有数据到内存,并进行缓存。适用于维表数据量较小的情况。
- LRU: 任务执行时,根据维表关联条件使用异步算子加载维表数据,并进行缓存。
CREATE TABLE sideTable(
id bigint,
school varchar,
home varchar,
PRIMARY KEY(id),
PERIOD FOR SYSTEM_TIME
)WITH(
type='mysql',
url='jdbc:mysql://172.16.8.109:3306/tiezhu',
userName='dtstack',
password='abc123',
tableName='stressTest',
cache='ALL',
parallelism='1'
);
CREATE TABLE sideTable(
id bigint,
message varchar,
PRIMARY KEY(id),
PERIOD FOR SYSTEM_TIME
)WITH(
type ='cassandra',
address ='192.168.80.106:9042, 192.168.80.107:9042',
database ='tiezhu',
tableName ='stu',
userName='cassandra',
password='cassandra',
cache ='LRU',
parallelism ='1',
partitionedJoin='false'
);
CREATE TABLE MyTable(
id bigint,
name varchar,
address varchar
)WITH(
type = 'kafka10',
bootstrapServers = '172.16.101.224:9092',
zookeeperQuorm = '172.16.100.188:2181/kafka',
offsetReset = 'latest',
topic = 'tiezhu_test_in2',
timezone = 'Asia/Shanghai',
topicIsPattern = 'false',
parallelism = '1'
);
CREATE TABLE sideTable(
id bigint,
message varchar,
PRIMARY KEY(id),
PERIOD FOR SYSTEM_TIME
)WITH(
type ='cassandra',
address ='192.168.80.106:9042, 192.168.80.107:9042',
database ='tiezhu',
tableName ='stu',
userName='cassandra',
password='cassandra',
cache ='LRU',
parallelism ='1',
partitionedJoin='false'
);
CREATE TABLE MyResult(
id bigint,
name varchar,
address varchar,
message varchar
)WITH(
type ='cassandra',
address ='192.168.80.106:9042,192.168.80.107:9042',
userName='cassandra',
password='cassandra',
database ='tiezhu',
tableName ='stu_out',
parallelism ='1'
);
insert
into
MyResult
select
t1.id AS id,
t1.name AS name,
t1.address AS address,
t2.message AS message
from
(
select
id,
name,
address
from
MyTable
) t1
join sideTable t2
on t1.id = t2.id;