-
Notifications
You must be signed in to change notification settings - Fork 6
使用 SQL 查询动态表
本课程将讨论如何编写 SQL 查询来处理流数据。
特定的,你讲学到:
- Flink 的动态表的概念
- 流-表的互转
- SQL 中 Event-Time 与 Processing-Time 的处理
- 支持的 window 操作
- 理解 query 状态的管理
- 算子以及 Append-only 和 Update 流
这些练习将会教会你如何在数据流上编写流式以及物化查询。
下面的练习基于事件发生的时间来处理数据。查询会消费一个 append-only 的流,并产生一个 append-only 的流。更新和删除不会发生。
为了持续地监测城市的出租车交通状况,统计每个区域每5分钟到达与离开的行程数。
我们只关心开始与结束都发生在纽约市的事件,以及至少有5个到达或离开的行程的区域。
点击获取提示。
- 使用提供的
toAreaId
来转换坐标(经度、维度) 到区域 id。
输出应该如下所示:
area isStart t cnt
49282 true 2013-01-01 00:05:00.0 6
45881 true 2013-01-01 00:05:00.0 8
51781 true 2013-01-01 00:05:00.0 8
49551 true 2013-01-01 00:05:00.0 7
48540 true 2013-01-01 00:10:00.0 6
51795 true 2013-01-01 00:10:00.0 6
47550 true 2013-01-01 00:10:00.0 6
54285 true 2013-01-01 00:10:00.0 8
51781 true 2013-01-01 00:10:00.0 17
45548 true 2013-01-01 00:10:00.0 14
其中t
列代表每5分钟窗口的结束时间。
点击查看答案。
SELECT
toAreaId(lon, lat) AS area,
isStart,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS t,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
toAreaId(lon, lat),
isStart,
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
该查询首先过滤了那些未发生在纽约市的事件。它使用了一个 5 分钟的 tumbling window,作用在 rideTime
这个 time attribute 之上。结果根据 area id,isStart
标记,以及 tumbling window 来分组。对于每个分组,我们返回了 area id,isStart
标记,以及窗口的结束边界,还有聚合的统计数。我们只返回统计数等于或大于5的结果。
下面的练习会在处理数据的同时持续地更新结果。查询生成了更新以及删除。
对于本练习,你需要计算每个乘车人数对应的行程数,也就是,乘车人数为1的行程有多少趟,乘车人数为2的行程有多少趟,乘车人数为3的行程有多少趟......
我们只关系发生在纽约市的行程数。
点击查看提示。
- 每个行程都由两个事件代表。过滤掉结束的事件,以得到准确的计数。
输出如下所示:
psgCnt cnt
4 6051
2 31063
3 10812
6 5408
5 11031
1 100367
点击查看结果.
SELECT
psgCnt,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) AND isStart
GROUP BY
psgCnt;
在本练习中我们想统计每一个区域在一天的每个小时段中到达与离开的行程总数,也就是,每个区域在所有天发生在0点到1点,1点到2点,2点到3点....的总行程数。因此我们不想要每天的一个数目,而是所有天的每个小时段。
我们仅关心开始或结束于纽约的事件。为了限制结果的大小,仅返回超过60个事件的区域。
点击查看提示。
- 该查询与之前的 窗口的行程数 练习很像,不过主要区别在 GROUP BY 语句块,因为它会更新每天之前的统计数。
- 使用内置的
HOUR
函数来抽取事件戳的小时。 - 使用提供的
toAreaId
来转换坐标到 area id。
输出如下所示:
area isStart hourOfDay cnt
49551 false 0 85
49789 true 0 64
48806 true 0 75
50044 false 0 62
52543 true 0 67
49792 false 1 77
48559 true 1 114
48808 true 1 100
点击查看答案。
SELECT
toAreaId(lon, lat) AS area,
isStart,
HOUR(rideTime) AS hourOfDay,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
toAreaId(lon, lat),
isStart,
HOUR(rideTime)
HAVING COUNT(*) > 60;
该查询是一个典型的 SELECT FROM GROUP BY
查询。在过滤了纽约车辆后,将结果按照 area id,isStart
标记,以及小时段(使用HOUR(timestamp)
计算而来)来分组。通过使用 HAVING
子句,来只返回超过60的统计结果。
下面的练习在处理流式操作的数据时会持续地更新结果。查询会生成更新和删除。
在该练习中,你需要计算纽约市中每个区域每小时离开的平均乘客数。
点击查看提示。
- 每个行程都由两个事件代表。过滤掉结束的事件,以得到准确的计数。
- 使用提供的
toAreaId
将坐标转换成 area id - 分两步计算:首先计算每个区域每小时离开的乘客数;然后基于第一个的结果计算每个区域平均每小时离开乘客数。
输出如下所示:
area avgPsgLeaving
46568 1.7083333333333333333333~
47559 0.7916666666666666666666~
36313 0.0833333333333333333333~
49106 0.0416666666666666666666~
48792 0.5
55325 0.0833333333333333333333~
37570 0.375
点击查看答案。
SELECT
area,
SUM(psgSum)/24.0 AS avgPsgLeaving
FROM
(SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime ,INTERVAL '1' HOUR) AS t,
SUM(psgCnt) AS psgSum
FROM
Rides
WHERE
isStart AND isInNYC(lon, lat)
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '1' HOUR))
GROUP BY
area;
该查询定义了一个子查询,使用了一个小时级的滚动窗口计算每小时离开区域的乘客数。该子查询产生了一个流式的结果并且按照区域进行分组,再计算量每个区域的平均每小时离开乘客数。
在本练习中,我们想返回那些在过去10分钟内乘出租车离开超过10人的区域。返回出发区域、时间戳、以及过去10分钟离开该区域的乘客数(如果多于10人的话)。
我们只关心发生在纽约的行车记录。
点击查看提示。
- 首先,过滤出只发生在纽约市的出发事件。
- 因为该查询需要返回一行包含一个会更新的 count 值,一旦由新的出发事件到达该 count 值就会更新,使用一个
OVER
window 来计算每个区域的持续统计的 count。
输出如下所示:
areaId rideTime peopleCnt
45881 2013-01-01 00:00:56.0 12
45881 2013-01-01 00:01:02.0 14
53283 2013-01-01 00:02:00.0 11
8252892 2013-01-01 00:02:33.0 11
8252892 2013-01-01 00:02:41.0 12
8252892 2013-01-01 00:02:55.0 13
8252892 2013-01-01 00:03:00.0 18
8252892 2013-01-01 00:03:00.0 18
41819 2013-01-01 00:03:00.0 12
45631 2013-01-01 00:03:00.0 13
45881 2013-01-01 00:03:07.0 18
点击查看答案。
SELECT
areaId,
rideTime,
peopleCnt
FROM
(
SELECT
areaId,
rideTime,
SUM(psgCnt) OVER w AS peopleCnt
FROM
(SELECT toAreaId(lon, lat) AS areaId, rideTime, psgCnt FROM Rides WHERE isStart)
WINDOW w AS (
PARTITION BY areaId
ORDER BY rideTime RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW
)
)
WHERE peopleCnt > 10;
该子查询返回了所有的出发事件,并将坐标转换成了 area id。
第二个子查询定义了一个 OVER WINDOW,按照时间(rideTime
)排序,并按照 area id 来做分区,并计算出一个过去10分钟离开该区域的持续滚动的乘客总数值。
最后过滤出该乘客总数值大于10的结果。
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.