Skip to content

Flink SQL 介绍

Danny Chan edited this page Nov 21, 2019 · 7 revisions

在本章节 您将了解以下内容

  • SQL on Flink 的设计目标
  • Flink SQL 如何统一 stream/batch 处理
  • SQL on Flink 的目标应用场景
  • Flink 交互式 SQL CLI 客户端

Slides

文档

下面的文档可能会为您的培训提供帮助:

训练环境介绍

SQL CLI 客户端通过一个配置文件来描述环境配置,包括当前存在的 source 和 sink 表以及 UDF(用户自定义函数)。您不需要添加或删除表或函数。 您将要学习如何创建和删除 views(视图)。

接下来,我们演示了了如何通过 CLI 来展示训练环境里的表和函数。

已提供的表

您可以通过 SHOW TABLES 命令列出当前所有已经存在的表。这个命令会列举所有的 source 和 sink 表以及 view(视图)。

Flink SQL> SHOW TABLES;
Rides
Fares
DriverChanges
Sink_TenMinPsgCnts
Sink_AreaCnts

如下所示,您可以使用 DESCRIBE 命令查阅一张表的 schema:

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: BIGINT
 |-- taxiId: BIGINT
 |-- isStart: BOOLEAN
 |-- lon: FLOAT
 |-- lat: FLOAT
 |-- rideTime: TIMESTAMP(3) *ROWTIME*
 |-- psgCnt: INT

Source 表

Source 表 可以 可以被用于一个 SQL 查询的 FROM 子句。它们 不可以 用作 INSERT INTO 子句的目标表。

所有的 source tables 都是以 Apache Kafka topics 的形式存储的外部表。 Kafka 的消息为 JSON 编码。从 topic 里读取的每条消息被追加到对应的 source 表。

注意:当训练环境启动后,Source 表不会立即完整地生成。它们的消息记录会持续的导入 Kafka topic 中。当训练环境启动后消息导入便开始了 (docker-compose up -d) 并且以 10 倍快的速度运行,例如,导入 10 分钟的消息记录大概需要 1 分钟时间 (基于消息记录的时间戳)。

Rides

我们将使用 Rides source 表来做所有的练习。这张表包含了 2013 年初纽约市的出租车乘坐信息。每次出租车乘坐对应 2 条事件消息记录,一条 开始打车事件 和一条 结束打车事件

Rides 表的 schema 如下:

rideId: BIGINT       // 一次打车的唯一 id (注意,Rides 一次打车对应两条消息记录)
taxiId: BIGINT       // 出租车的唯一 id
isStart: BOOLEAN     // flag 标识上车 (true) 或者下车 (false) 事件
lon: FLOAT           // 上车或下车地点的精度
lat: FLOAT           // 上车或下车地点的维度
rideTime: TIMESTAMP  // 上车或下车时的时间戳
psgCnt: INT          // 打车人数

Fares

Fares source 表包含出租车的车费信息。每次打车对应 1 条费用信息。

Fares 表的 schema 如下:

rideId: BIGINT      // 打车的唯一 id
payTime: TIMESTAMP  // 付费发生时间戳 (和 Rides 表中打车结束时间戳相等)
payMethod: VARCHAR  // 付费方式 (CSH, CRD, DIS, NOC, UNK)
tip: FLOAT          // 已付小费金额
toll: FLOAT         // 已付通行费金额
fare: FLOAT         // 已付车费金额

DriverChanges

DriverChanges 包含司机更换的消息,一次更换对应 1 条记录。司机更换时表中会注入一条新记录。

DriverChanges 表的 schema 如下:

taxiId: BIGINT             // 出租车的唯一 id
driverId: BIGINT           // 新司机的唯一 id
usageStartTime: TIMESTAMP  // 新司机使用出租车的时间戳

Sink 表

Sink 表 可以 被用作 INSERT INTO 子句的目标表。他们 不可以 被用于 FROM 子句。

Sink_TenMinPsgCnts

Sink_TenMinPsgCnts 表代表一个外部的 append sink 写到 Apache Kafka topic TenMinPsgCnts

它包含字段:start time,end time,and count:

Flink SQL> DESCRIBE Sink_TenMinPsgCnts;
root
 |-- cntStart: TIMESTAMP(3)
 |-- cntEnd: TIMESTAMP(3)
 |-- cnt: BIGINT

通过在包含 docker-compose.yml 的目录下运行如下命令,您可以监听 TenMinsPsgCnts 表的 Kafka topic:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

Sink_AreaCnts

Sink_AreaCnts 表代表一个外部的 upsert sink 写到 Elasticsearch 索引 area-cnts

它包含字段:area identifier 和 count:

Flink SQL> DESCRIBE Sink_AreaCnts;
root
 |-- areaId: INT
 |-- cnt: BIGINT

您可以通过 Elasticsearch 的 REST API 访问 sink 表 index 的状态:

注意area-cnts 索引在 sink 写数据时会自动创建。

探索训练数据

为了探索 Rides 表的数据,执行一个简单查询:

SELECT * FROM Rides;

CLI 将会进入结果展示模式来显示结果。

rideId        taxiId   isStart           lon           lat                  rideTime   psgCnt
150156    2013003948     false     -73.98211      40.74796     2013-01-01 06:49:26.0        2
150538    2013003570     false    -74.004684      40.72859     2013-01-01 06:49:26.0        1
151066    2013005078      true     -73.97712     40.752007     2013-01-01 06:49:26.0        1
147794    2013010015     false     -73.87098     40.774143     2013-01-01 06:49:27.0        1
148680    2013003578     false     -73.96466     40.680794     2013-01-01 06:49:27.0        1
151067    2013002010      true    -73.992256     40.750004     2013-01-01 06:49:27.0        2

敲击 q 键退出结果展示模式。结果展示模式还提供了其他功能,例如翻页或者增/减更新频率,如结尾处展示所示。

已提供的 User-Defined Functions(用户自定义函数)

训练环境提供了一套用户自定义函数来处理训练数据。

使用 SHOW FUNCTIONS 命令列举所有的用户自定义函数:

Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
Drivers

函数的定义如下:

  • timeDiff(t1: TIMESTAMP, t2: TIMESTAMP): BIGINT: 将 timestamps 转成 UTC,做减法 t1 - t2 返回毫秒单位的时间 diff。

  • isInNYC(lon: FLOAT, lat: FLOAT): BOOLEAN: Checks 是否位置在 NYC area。

  • toAreaId(lon: FLOAT, lat: FLOAT): INT: 映射一个位置 (longitude, latitude) 到 area id 来代表一个方格 100x100 米的方格。

  • toCoords(areaId: INT): [lon: FLOAT, lat: FLOAT]: getAreaId 的逆方法用来将 area cell 转成对应的坐标(经纬度)信息。

  • Drivers(ts: TIMESTAMP): Table(taxiId: BIGINT, driverId: BIGINT, usageStartTime: TIMESTAMP) 是一个特殊的 table-valued 函数,即所谓的 Temporal Table Function 它返回一个时间戳 timestamp ts 描述一辆出租车最新被使用的时间。这个函数将会在 join 的主题培训中详细介绍。

Views(视图)

Views 用来通过 SQL 查询定义虚拟表。视图的定义会被立即解析并做语法/语法校验。然而 View 只在引用它的 SELECT 或者 INSERT INTO 语句被执行的时候才会跟着一起执行。

Views 可以再 CLI session 中使用 CREATE VIEW 语句创建:

CREATE VIEW RideStarts AS SELECT * FROM Rides WHERE isStart;

使用 DROP VIEW 语句删除已创建的视图。

DROP VIEW RideStarts;

练习

找到一个特定的 Ride

写一个查询找出 ride 123 的上下车事件记录。

输出的内容大致如下:

isStart                  rideTime
   true     2013-01-01 00:01:00.0
  false     2013-01-01 00:07:00.0
点击查看答案
SELECT isStart, rideTime FROM Rides WHERE rideId=123;

该查询使用 ride id 过滤数据。


清洗 Rides 表

这个练习的任务是清洗 ride 表的 events:删除不再 New York City 上车或下车的事件记录。

输出的内容大致如下:

rideId           taxiId          isStart              lon              lat                  rideTime    psgCnt
     1       2013000001             true        -73.99078         40.76088     2013-01-01 00:00:00.0         1
     2       2013000002             true       -73.978325         40.77809     2013-01-01 00:00:00.0         5
     3       2013000003             true        -73.98962         40.72999     2013-01-01 00:00:00.0         1
     4       2013000004             true       -73.981575         40.76763     2013-01-01 00:00:00.0         2
     5       2013000005             true        -74.00053        40.737343     2013-01-01 00:00:00.0         4
     6       2013000006             true       -73.866135         40.77109     2013-01-01 00:00:00.0         6
     7       2013000007             true        -74.00693        40.740765     2013-01-01 00:00:00.0         6
     8       2013000008             true       -73.955925        40.781887     2013-01-01 00:00:00.0         3
     9       2013000009             true        -73.99988        40.743343     2013-01-01 00:00:00.0         1
    10       2013000010             true       -73.989845         40.75804     2013-01-01 00:00:00.0         3
    11       2013000011             true       -73.870834         40.77377     2013-01-01 00:00:00.0         1
点击查看答案
SELECT * FROM Rides WHERE isInNYC(lon, lat);

该查询使用 UDF isInNYC 来过滤数据。


NYC Rides View

创建一个视图 nyc_view 描述在 New York City 发生的事件。

SELECT * FROM nyc_view 的输出应该和上一个练习的输出一致。

点击查看答案
CREATE VIEW nyc_view AS SELECT * FROM Rides WHERE isInNYC(lon, lat);

这个语句使用上一个练习的 query 创建了一个视图


NYC Areas Rides View

创建一个视图 nyc_area_view 为所有的消息补充 area id。

SELECT * FROM nyc_area_view 的输出大致如下:

rideId            taxiId           isStart            areaId                  rideTime      psgCnt
     1        2013000001              true             47792     2013-01-01 00:00:00.0           1
     2        2013000002              true             44301     2013-01-01 00:00:00.0           5
     3        2013000003              true             54043     2013-01-01 00:00:00.0           1
     4        2013000004              true             46298     2013-01-01 00:00:00.0           2
     5        2013000005              true             52535     2013-01-01 00:00:00.0           4
     6        2013000006              true             45881     2013-01-01 00:00:00.0           6
     7        2013000007              true             51780     2013-01-01 00:00:00.0           6
     8        2013000008              true             43567     2013-01-01 00:00:00.0           3
     9        2013000009              true             51285     2013-01-01 00:00:00.0           1
    10        2013000010              true             48292     2013-01-01 00:00:00.0           3
点击查看答案
CREATE VIEW nyc_area_view AS
SELECT rideId, taxiId, isStart, toAreaId(lon, lat) AS areaId, rideTime, psgCnt
FROM nyc_view;

这个语句使用上一个练习的语句创建一个视图。这个 UDF 将经纬度信息转成 area ids。