-
Notifications
You must be signed in to change notification settings - Fork 6
Flink SQL 介绍
在本章节 您将了解以下内容
- SQL on Flink 的设计目标
- Flink SQL 如何统一 stream/batch 处理
- SQL on Flink 的目标应用场景
- Flink 交互式 SQL CLI 客户端
下面的文档可能会为您的培训提供帮助:
- Streaming Concepts - Flink SQL 关于 streaming 的介绍,例如 time attributes 的配置以及更新结果的处理。
- Flink SQL - Flink SQL 整体介绍。
- Built-In Functions - 介绍 Flink 内置函数。
- Flink SQL Client - 介绍 Flink SQL Client 及其配置。
SQL CLI 客户端通过一个配置文件来描述环境配置,包括当前存在的 source 和 sink 表以及 UDF(用户自定义函数)。您不需要添加或删除表或函数。 您将要学习如何创建和删除 views(视图)。
接下来,我们演示如何通过 CLI 来展示练习环境里的表和函数。
您可以通过 SHOW TABLES
命令列出当前所有已经存在的表。这个命令会列举所有的 source 和 sink 表以及 view(视图)。
Flink SQL> SHOW TABLES;
Rides
Fares
DriverChanges
Sink_TenMinPsgCnts
Sink_AreaCnts
如下所示,您可以使用 DESCRIBE
命令查阅一张表的 schema:
Flink SQL> DESCRIBE Rides;
root
|-- rideId: BIGINT
|-- taxiId: BIGINT
|-- isStart: BOOLEAN
|-- lon: FLOAT
|-- lat: FLOAT
|-- rideTime: TIMESTAMP(3) *ROWTIME*
|-- psgCnt: INT
Source 表 可以 被用于一个 SQL 查询的 FROM
子句。它们 不可以 用作 INSERT INTO
子句的目标表。
所有的 source tables 都是以 Apache Kafka topics 的形式存储的外部表。 Kafka 的消息为 JSON 编码。从 topic 里读取的每条消息被追加到对应的 source 表。
注意:当训练环境启动后,Source 表不会立即完整地生成。它们的消息记录会持续的导入 Kafka topic 中。当训练环境启动后消息导入便开始了 (docker-compose up -d
) 并且以 10 倍快的速度运行,例如,导入 10 分钟的消息记录大概需要 1 分钟时间 (基于消息记录的时间戳)。
我们将使用 Rides
source 表来做所有的练习。这张表包含了 2013 年初纽约市的出租车乘坐信息。每次出租车乘坐对应 2 条事件消息记录,一条 开始打车事件 和一条 结束打车事件。
Rides
表的 schema 如下:
rideId: BIGINT // 一次打车的唯一 id (注意,Rides 一次打车对应两条消息记录)
taxiId: BIGINT // 出租车的唯一 id
isStart: BOOLEAN // flag 标识上车 (true) 或者下车 (false) 事件
lon: FLOAT // 上车或下车地点的精度
lat: FLOAT // 上车或下车地点的纬度
rideTime: TIMESTAMP // 上车或下车时的时间戳
psgCnt: INT // 打车人数
Fares
source 表包含出租车的车费信息。每次打车对应 1 条费用信息。
Fares
表的 schema 如下:
rideId: BIGINT // 打车事件的唯一 id
payTime: TIMESTAMP // 付费发生时间戳 (和 Rides 表中打车结束时间戳相等)
payMethod: VARCHAR // 付费方式 (CSH, CRD, DIS, NOC, UNK)
tip: FLOAT // 已付小费金额
toll: FLOAT // 已付通行费金额
fare: FLOAT // 已付车费金额
DriverChanges
包含司机更换的消息,一次更换对应 1 条记录。司机更换时表中会注入一条新记录。
DriverChanges
表的 schema 如下:
taxiId: BIGINT // 出租车的唯一 id
driverId: BIGINT // 新司机的唯一 id
usageStartTime: TIMESTAMP // 新司机使用出租车的时间戳
Sink 表 可以 被用作 INSERT INTO
子句的目标表。他们 不可以 被用于 FROM
子句。
Sink_TenMinPsgCnts
表代表一个外部的 append sink 写到 Apache Kafka topic TenMinPsgCnts
。
它包含字段:start time,end time,and count:
Flink SQL> DESCRIBE Sink_TenMinPsgCnts;
root
|-- cntStart: TIMESTAMP(3)
|-- cntEnd: TIMESTAMP(3)
|-- cnt: BIGINT
通过在包含 docker-compose.yml
的目录下运行如下命令,您可以监听 TenMinsPsgCnts
表的 Kafka topic:
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
Sink_AreaCnts
表代表一个外部的 upsert sink 写到 Elasticsearch 索引 area-cnts
。
它包含字段:area identifier 和 count:
Flink SQL> DESCRIBE Sink_AreaCnts;
root
|-- areaId: INT
|-- cnt: BIGINT
您可以通过 Elasticsearch 的 REST API 访问 sink 表 index 的状态:
- 展示已存在的索引:http://localhost:9200/_cat/indices?v
- 展示索引
area-cnts
的详细信息:http://localhost:9200/area-cnt - 展示索引
area-cnts
的统计信息:http://localhost:9200/area-cnt/_stats - 检索
area-cnts
索引的内容:http://localhost:9200/area-cnt/_search
注意:area-cnts
索引在 sink 写数据时会自动创建。
为了探索 Rides
表的数据,执行一个简单查询:
SELECT * FROM Rides;
CLI 将会进入结果展示模式来显示结果。
rideId taxiId isStart lon lat rideTime psgCnt
150156 2013003948 false -73.98211 40.74796 2013-01-01 06:49:26.0 2
150538 2013003570 false -74.004684 40.72859 2013-01-01 06:49:26.0 1
151066 2013005078 true -73.97712 40.752007 2013-01-01 06:49:26.0 1
147794 2013010015 false -73.87098 40.774143 2013-01-01 06:49:27.0 1
148680 2013003578 false -73.96466 40.680794 2013-01-01 06:49:27.0 1
151067 2013002010 true -73.992256 40.750004 2013-01-01 06:49:27.0 2
敲击 q
键退出结果展示模式。结果展示模式还提供了其他功能,例如翻页或者增/减更新频率,如结尾处展示所示。
训练环境提供了一套用户自定义函数来处理训练数据。
使用 SHOW FUNCTIONS
命令列举所有的用户自定义函数:
Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
Drivers
函数的定义如下:
-
timeDiff(t1: TIMESTAMP, t2: TIMESTAMP): BIGINT
: 将 timestamps 转成 UTC,做减法t1 - t2
返回毫秒单位的时间 diff。 -
isInNYC(lon: FLOAT, lat: FLOAT): BOOLEAN
: Checks 是否位置在 NYC area。 -
toAreaId(lon: FLOAT, lat: FLOAT): INT
: 映射一个位置 (longitude, latitude) 到 area id 来代表一个 100x100 米的方格。 -
toCoords(areaId: INT): [lon: FLOAT, lat: FLOAT]
:getAreaId
的逆方法,用来将 area cell 转成对应的坐标(经纬度)信息。 -
Drivers(ts: TIMESTAMP): Table(taxiId: BIGINT, driverId: BIGINT, usageStartTime: TIMESTAMP)
是一个特殊的 table-valued 函数,即所谓的 Temporal Table Function 它返回一个时间戳 timestampts
描述一辆出租车最近一次被使用的时间。这个函数将会在 join 的主题培训中详细介绍。
Views 用来通过 SQL 查询定义虚拟表。视图的定义会被立即解析并做语法/语义校验。然而 View 的执行只在引用它的 SELECT
或者 INSERT INTO
语句被执行的时候才会发生。
Views 可以在 CLI session 中使用 CREATE VIEW
语句创建:
CREATE VIEW RideStarts AS SELECT * FROM Rides WHERE isStart;
使用 DROP VIEW
语句删除已创建的视图。
DROP VIEW RideStarts;
写一个查询找出 ride 123
的上下车事件记录。
输出的内容大致如下:
isStart rideTime
true 2013-01-01 00:01:00.0
false 2013-01-01 00:07:00.0
点击查看答案
SELECT isStart, rideTime FROM Rides WHERE rideId=123;
该查询使用 ride id 过滤数据。
这个练习的任务是清洗 ride 表的 events:删除不再 New York City 上车或下车的事件记录。
输出的内容大致如下:
rideId taxiId isStart lon lat rideTime psgCnt
1 2013000001 true -73.99078 40.76088 2013-01-01 00:00:00.0 1
2 2013000002 true -73.978325 40.77809 2013-01-01 00:00:00.0 5
3 2013000003 true -73.98962 40.72999 2013-01-01 00:00:00.0 1
4 2013000004 true -73.981575 40.76763 2013-01-01 00:00:00.0 2
5 2013000005 true -74.00053 40.737343 2013-01-01 00:00:00.0 4
6 2013000006 true -73.866135 40.77109 2013-01-01 00:00:00.0 6
7 2013000007 true -74.00693 40.740765 2013-01-01 00:00:00.0 6
8 2013000008 true -73.955925 40.781887 2013-01-01 00:00:00.0 3
9 2013000009 true -73.99988 40.743343 2013-01-01 00:00:00.0 1
10 2013000010 true -73.989845 40.75804 2013-01-01 00:00:00.0 3
11 2013000011 true -73.870834 40.77377 2013-01-01 00:00:00.0 1
点击查看答案
SELECT * FROM Rides WHERE isInNYC(lon, lat);
该查询使用 UDF isInNYC
来过滤数据。
创建一个视图 nyc_view
描述在 New York City 发生的事件。
SELECT * FROM nyc_view
的输出应该和上一个练习的输出一致。
点击查看答案
CREATE VIEW nyc_view AS SELECT * FROM Rides WHERE isInNYC(lon, lat);
这个语句使用上一个练习的 query 创建了一个视图
创建一个视图 nyc_area_view
为所有的消息补充 area id。
SELECT * FROM nyc_area_view
的输出大致如下:
rideId taxiId isStart areaId rideTime psgCnt
1 2013000001 true 47792 2013-01-01 00:00:00.0 1
2 2013000002 true 44301 2013-01-01 00:00:00.0 5
3 2013000003 true 54043 2013-01-01 00:00:00.0 1
4 2013000004 true 46298 2013-01-01 00:00:00.0 2
5 2013000005 true 52535 2013-01-01 00:00:00.0 4
6 2013000006 true 45881 2013-01-01 00:00:00.0 6
7 2013000007 true 51780 2013-01-01 00:00:00.0 6
8 2013000008 true 43567 2013-01-01 00:00:00.0 3
9 2013000009 true 51285 2013-01-01 00:00:00.0 1
10 2013000010 true 48292 2013-01-01 00:00:00.0 3
点击查看答案
CREATE VIEW nyc_area_view AS
SELECT rideId, taxiId, isStart, toAreaId(lon, lat) AS areaId, rideTime, psgCnt
FROM nyc_view;
这个语句使用上一个练习的语句创建一个视图。这个 UDF 将经纬度信息转成 area ids。
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.