Introduction to Flink SQL
In this section you will learn about:
- The design goals of SQL on Flink
- How Flink SQL unifies stream and batch processing
- The target use cases of SQL on Flink
- Flink's interactive SQL CLI client
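The following documentation may be helpful for the training:
- Streaming Concepts - An introduction to streaming in Flink SQL, e.g., configuring time attributes and handling updating results.
- Flink SQL - A general introduction to Flink SQL.
- Built-In Functions - An overview of Flink's built-in functions.
- Flink SQL Client - An introduction to the Flink SQL Client and its configuration.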
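The SQL CLI client reads a configuration file that describes the environment. This includes the available source and sink tables as well as UDFs (user-defined functions). You do not need to add or remove tables or functions, but you will learn how to create and drop views.
Next, we show how to use the CLI to list the tables and functions of the training environment.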
You can list all existing tables with the SHOW TABLES command. It lists all source and sink tables as well as views.
Flink SQL> SHOW TABLES;
Rides
Fares
DriverChanges
Sink_TenMinPsgCnts
Sink_AreaCnts
You can look up the schema of a table with the DESCRIBE command, as shown below:
Flink SQL> DESCRIBE Rides;
root
|-- rideId: BIGINT
|-- taxiId: BIGINT
|-- isStart: BOOLEAN
|-- lon: FLOAT
|-- lat: FLOAT
|-- rideTime: TIMESTAMP(3) *ROWTIME*
|-- psgCnt: INT
Source tables can be used in the FROM clause of a SQL query. They cannot be used as the target table of an INSERT INTO clause.
All source tables are external tables backed by Apache Kafka topics. The Kafka messages are JSON-encoded. Every message read from a topic is appended to the corresponding source table.
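The append-only behavior can be pictured with a minimal Python sketch. It makes two hypothetical assumptions: each JSON message uses the Rides column names as keys, and rideTime is encoded as a "YYYY-MM-DD HH:MM:SS" string. The actual message layout is not shown here, the sample message is made up, and decode_ride is an illustrative name only.

```python
import json
from datetime import datetime

# Hypothetical message: keys are assumed to match the Rides column names and
# rideTime is assumed to be encoded as "YYYY-MM-DD HH:MM:SS".
SAMPLE_MESSAGE = (
    '{"rideId": 123, "taxiId": 2013000123, "isStart": true, '
    '"lon": -73.99078, "lat": 40.76088, '
    '"rideTime": "2013-01-01 00:01:00", "psgCnt": 1}'
)

def decode_ride(message: str) -> dict:
    """Decode one JSON-encoded Kafka message into a typed Rides row."""
    raw = json.loads(message)
    return {
        "rideId": int(raw["rideId"]),
        "taxiId": int(raw["taxiId"]),
        "isStart": bool(raw["isStart"]),
        "lon": float(raw["lon"]),
        "lat": float(raw["lat"]),
        "rideTime": datetime.strptime(raw["rideTime"], "%Y-%m-%d %H:%M:%S"),
        "psgCnt": int(raw["psgCnt"]),
    }

# The source table only grows: each decoded message is appended, never updated.
rides_table = []
rides_table.append(decode_ride(SAMPLE_MESSAGE))
print(rides_table)
```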
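Note: Source tables are not fully populated when the training environment starts. Instead, their records are continuously ingested into the Kafka topics. Ingestion begins when the training environment is started (docker-compose up -d) and runs at 10 times the original speed. For example, ingesting 10 minutes' worth of records takes about 1 minute (based on the record timestamps).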
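Because ingestion runs at about 10 times the original speed, converting between wall-clock time and event time is simple arithmetic. The small sketch below shows this; the function names are hypothetical helpers, and the factor of 10 is only approximate.

```python
from datetime import timedelta

SPEEDUP = 10  # ingestion runs at roughly 10x the original event-time speed

def wall_clock_time(event_time_span: timedelta, speedup: int = SPEEDUP) -> timedelta:
    """Approximate wall-clock time needed to ingest a span of event time."""
    return event_time_span / speedup

def event_time_covered(wall_clock: timedelta, speedup: int = SPEEDUP) -> timedelta:
    """Approximate span of event time ingested after some wall-clock time."""
    return wall_clock * speedup

print(wall_clock_time(timedelta(minutes=10)))    # 0:01:00
print(event_time_covered(timedelta(minutes=6)))  # 1:00:00
```

For instance, once the environment has been up for 6 minutes, roughly one hour of ride events (by rideTime) should be available.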
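We will use the Rides source table for all exercises. The table contains information about taxi rides in New York City in early 2013. Each ride is represented by two event records: a start event and an end event.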
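Since every ride appears as two records, a ride's duration is only known once its end event has arrived. The following minimal Python sketch pairs the two events by rideId. Ride 123 uses the times from the expected output of the first exercise below. Ride 456 is a hypothetical ride whose end event has not been ingested yet.

```python
from datetime import datetime

# Event rows as (rideId, isStart, rideTime); ride 456 is hypothetical.
events = [
    (123, True, datetime(2013, 1, 1, 0, 1, 0)),
    (456, True, datetime(2013, 1, 1, 0, 3, 0)),
    (123, False, datetime(2013, 1, 1, 0, 7, 0)),
]

def ride_durations(rows):
    """Pair each ride's start and end event; return durations in seconds."""
    starts = {}     # rides whose start event has been seen but not their end
    durations = {}
    for ride_id, is_start, ride_time in rows:
        if is_start:
            starts[ride_id] = ride_time
        elif ride_id in starts:
            # The end event closes the ride opened by its start event;
            # end events without a preceding start event are skipped.
            durations[ride_id] = (ride_time - starts.pop(ride_id)).total_seconds()
    return durations

print(ride_durations(events))  # {123: 360.0}
```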
The schema of the Rides table is as follows:
rideId: BIGINT // the unique id of a ride (note, Rides contains two records per ride)
taxiId: BIGINT // the unique id of the taxi
isStart: BOOLEAN // flag for pick-up (true) or drop-off (false) event
lon: FLOAT // the longitude of the pick-up or drop-off location
lat: FLOAT // the latitude of the pick-up or drop-off location
rideTime: TIMESTAMP // the time of the pick-up or drop-off event
psgCnt: INT // the number of passengers on the ride
The Fares source table contains information about the fares paid for the taxi rides. The table contains one record for each taxi ride.
The schema of the Fares table is as follows:
rideId: BIGINT // the unique id of the ride
payTime: TIMESTAMP // the time when the payment was made (same as timestamp of ride end event in Rides table)
payMethod: VARCHAR // the method of payment (CSH, CRD, DIS, NOC, UNK)
tip: FLOAT // the amount of paid tip
toll: FLOAT // the amount of paid toll
fare: FLOAT // the amount of paid fare
The DriverChanges table contains one record for each event in which a taxi is driven by a different driver than before, i.e., when the driver of a taxi changes. This might happen when a driver starts a new shift.
The schema of the DriverChanges table is as follows:
taxiId: BIGINT // the unique id of the taxi
driverId: BIGINT // the unique id of the driver who starts using the taxi
usageStartTime: TIMESTAMP // the time when the driver starts using the taxi
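The DriverChanges records support an "as of" lookup: which driver was using a given taxi at time ts? The Python sketch below answers that by picking the most recent change whose usageStartTime is at or before ts. This mirrors what the Drivers temporal table function, described later, returns per taxi. The rows are hypothetical. Treating a change at exactly ts as already in effect is an assumption.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical DriverChanges rows: (taxiId, driverId, usageStartTime)
changes = [
    (1, 100, datetime(2013, 1, 1, 0, 0)),
    (2, 200, datetime(2013, 1, 1, 6, 0)),
    (1, 101, datetime(2013, 1, 1, 8, 0)),  # taxi 1 changes driver, e.g. a new shift
]

def build_history(rows):
    """Group driver changes by taxi, each list sorted by usageStartTime."""
    history = {}
    for taxi_id, driver_id, start in sorted(rows, key=lambda r: r[2]):
        history.setdefault(taxi_id, []).append((start, driver_id))
    return history

def driver_at(history, taxi_id, ts):
    """Return the driver whose use of the taxi started most recently at or before ts."""
    entries = history.get(taxi_id, [])
    starts = [start for start, _ in entries]
    idx = bisect_right(starts, ts)  # number of changes with start <= ts
    return entries[idx - 1][1] if idx > 0 else None

history = build_history(changes)
print(driver_at(history, 1, datetime(2013, 1, 1, 7, 59)))  # 100
print(driver_at(history, 1, datetime(2013, 1, 1, 8, 0)))   # 101
print(driver_at(history, 2, datetime(2013, 1, 1, 5, 0)))   # None
```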
Sink tables can be used as the target of an INSERT INTO clause in a SQL query. They cannot be used in the FROM clause of a SQL query.
The Sink_TenMinPsgCnts table represents an external append sink that writes to the Apache Kafka topic TenMinPsgCnts.
It consists of a start time, end time, and count:
Flink SQL> DESCRIBE Sink_TenMinPsgCnts;
root
|-- cntStart: TIMESTAMP(3)
|-- cntEnd: TIMESTAMP(3)
|-- cnt: BIGINT
You can monitor the Kafka topic TenMinPsgCnts of the Sink_TenMinPsgCnts table by running the following command in the folder that contains the docker-compose.yml file:
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
The Sink_AreaCnts table represents an external upsert sink that writes to the Elasticsearch index area-cnts.
It consists of an area identifier and count:
Flink SQL> DESCRIBE Sink_AreaCnts;
root
|-- areaId: INT
|-- cnt: BIGINT
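The difference between the two sink types can be sketched in Python. An append sink keeps every emitted row, while an upsert sink keeps one row per key and overwrites it on updates. This is a conceptual sketch, not Flink's sink implementation. It assumes that areaId acts as the key of Sink_AreaCnts, and the stream of (areaId, cnt) results is hypothetical.

```python
# Hypothetical stream of (areaId, cnt) results emitted by a continuous query;
# a later row for the same areaId carries an updated count.
results = [(47792, 1), (44301, 1), (47792, 2), (47792, 3)]

def append_sink(rows):
    """Append semantics: every emitted row is stored as a new record."""
    log = []
    for row in rows:
        log.append(row)
    return log

def upsert_sink(rows):
    """Upsert semantics: a row replaces any earlier row with the same key."""
    table = {}
    for area_id, cnt in rows:
        table[area_id] = cnt  # insert a new key or update the existing one
    return table

print(len(append_sink(results)))  # 4
print(upsert_sink(results))       # {47792: 3, 44301: 1}
```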
You can check the state of the sink index by accessing Elasticsearch's REST API:
- Show available indices: http://localhost:9200/_cat/indices?v
- Show details of the area-cnts index: http://localhost:9200/area-cnts
- Show stats of the area-cnts index: http://localhost:9200/area-cnts/_stats
- Peek into the area-cnts index: http://localhost:9200/area-cnts/_search
Note: The area-cnts index is automatically created when the first query writes to it.
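The same endpoints can also be queried from a script. The minimal sketch below uses only Python's standard library. es_url and es_get are hypothetical helper names, and the base address assumes the Elasticsearch port 9200 used in the URLs above.

```python
import json
import urllib.request

ES_BASE = "http://localhost:9200"  # Elasticsearch address used in the URLs above

def es_url(path: str) -> str:
    """Build a REST URL for the local Elasticsearch node."""
    return f"{ES_BASE}/{path.lstrip('/')}"

def es_get(path: str, timeout: float = 5.0):
    """GET an endpoint; return parsed JSON, or plain text for non-JSON replies."""
    with urllib.request.urlopen(es_url(path), timeout=timeout) as resp:
        body = resp.read().decode("utf-8")
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return body  # e.g. the tabular output of _cat/indices?v
```

For example, es_get("area-cnts/_search") returns the parsed search response once a query has written to the index. Before that, Elasticsearch answers with an error status, and urlopen raises urllib.error.HTTPError.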
To explore the data of the Rides table, execute a simple query:
SELECT * FROM Rides;
The CLI client will enter the result visualization mode and display the results.
rideId taxiId isStart lon lat rideTime psgCnt
150156 2013003948 false -73.98211 40.74796 2013-01-01 06:49:26.0 2
150538 2013003570 false -74.004684 40.72859 2013-01-01 06:49:26.0 1
151066 2013005078 true -73.97712 40.752007 2013-01-01 06:49:26.0 1
147794 2013010015 false -73.87098 40.774143 2013-01-01 06:49:27.0 1
148680 2013003578 false -73.96466 40.680794 2013-01-01 06:49:27.0 1
151067 2013002010 true -73.992256 40.750004 2013-01-01 06:49:27.0 2
You can leave the result visualization mode by pressing q. The mode provides further functionality, shown at the bottom of the screen, such as paging through the results or increasing and decreasing the refresh rate.
The training environment provides a set of user-defined functions for the training data.
To list all user-defined functions, use the SHOW FUNCTIONS statement:
Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
Drivers
The functions are defined as follows:
- timeDiff(t1: TIMESTAMP, t2: TIMESTAMP): BIGINT: Converts two timestamps into UTC, subtracts t1 - t2, and returns the time difference in milliseconds.
- isInNYC(lon: FLOAT, lat: FLOAT): BOOLEAN: Checks if a location is within the NYC area.
- toAreaId(lon: FLOAT, lat: FLOAT): INT: Maps a location (longitude, latitude) to an area id that represents a cell of approximately 100x100 meters.
- toCoords(areaId: INT): [lon: FLOAT, lat: FLOAT]: The reverse of toAreaId; computes the longitude and latitude of the center of an area cell.
- Drivers(ts: TIMESTAMP): Table(taxiId: BIGINT, driverId: BIGINT, usageStartTime: TIMESTAMP): A special table-valued function, a so-called Temporal Table Function. For a timestamp ts, it returns the driver that most recently used each taxi. This function will be discussed in the context of joins.
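To make the semantics of timeDiff concrete, here is a Python sketch of the same computation. It is not the UDF's actual code. In particular, treating timestamps without time zone information as UTC is an assumption. Applied to the start and end times of ride 123 from the first exercise, it yields the ride duration in milliseconds.

```python
from datetime import datetime, timezone

def time_diff(t1: datetime, t2: datetime) -> int:
    """Sketch of timeDiff: convert both timestamps to UTC, return t1 - t2 in ms."""
    # Assumption: naive timestamps (no tzinfo) are interpreted as UTC.
    u1 = t1.astimezone(timezone.utc) if t1.tzinfo else t1.replace(tzinfo=timezone.utc)
    u2 = t2.astimezone(timezone.utc) if t2.tzinfo else t2.replace(tzinfo=timezone.utc)
    delta = u1 - u2
    # timedelta stores days/seconds/microseconds; combine them into milliseconds.
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000

start = datetime(2013, 1, 1, 0, 1, 0)  # start event of ride 123
end = datetime(2013, 1, 1, 0, 7, 0)    # end event of ride 123
print(time_diff(end, start))  # 360000
```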
Views allow you to define virtual tables from SQL queries. The view definition is parsed and syntactically validated immediately. However, the view is only executed when a SELECT or INSERT INTO statement that references it is executed.
Views can be created within a CLI session using the CREATE VIEW statement:
CREATE VIEW RideStarts AS SELECT * FROM Rides WHERE isStart;
Views created within a CLI session can be removed again using the DROP VIEW statement:
DROP VIEW RideStarts;
Write a query that outputs the start and end events of ride 123.
The output should look similar to:
isStart rideTime
true 2013-01-01 00:01:00.0
false 2013-01-01 00:07:00.0
Solution:
SELECT isStart, rideTime FROM Rides WHERE rideId=123;
The query filters by the ride id.
The task of this exercise is to cleanse the table of ride events by removing events that do not start or end in New York City.
The output should look similar to:
rideId taxiId isStart lon lat rideTime psgCnt
1 2013000001 true -73.99078 40.76088 2013-01-01 00:00:00.0 1
2 2013000002 true -73.978325 40.77809 2013-01-01 00:00:00.0 5
3 2013000003 true -73.98962 40.72999 2013-01-01 00:00:00.0 1
4 2013000004 true -73.981575 40.76763 2013-01-01 00:00:00.0 2
5 2013000005 true -74.00053 40.737343 2013-01-01 00:00:00.0 4
6 2013000006 true -73.866135 40.77109 2013-01-01 00:00:00.0 6
7 2013000007 true -74.00693 40.740765 2013-01-01 00:00:00.0 6
8 2013000008 true -73.955925 40.781887 2013-01-01 00:00:00.0 3
9 2013000009 true -73.99988 40.743343 2013-01-01 00:00:00.0 1
10 2013000010 true -73.989845 40.75804 2013-01-01 00:00:00.0 3
11 2013000011 true -73.870834 40.77377 2013-01-01 00:00:00.0 1
Solution:
SELECT * FROM Rides WHERE isInNYC(lon, lat);
The query filters by using the UDF isInNYC.
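This page does not specify how isInNYC decides membership. A common approach is a bounding-box test, sketched below in Python. The box coordinates are hypothetical values chosen to enclose NYC and may differ from those used by the training UDF.

```python
# Hypothetical bounding box around New York City (assumed values, not taken
# from the training environment's UDF).
LON_WEST, LON_EAST = -74.05, -73.70
LAT_SOUTH, LAT_NORTH = 40.50, 41.00

def is_in_nyc(lon: float, lat: float) -> bool:
    """Return True if the location lies inside the assumed NYC bounding box."""
    return LON_WEST <= lon <= LON_EAST and LAT_SOUTH <= lat <= LAT_NORTH

rides = [
    (1, -73.99078, 40.76088),  # first row of the expected output above
    (999, -0.1276, 51.5072),   # hypothetical location far outside NYC
]
# Equivalent of: SELECT * FROM Rides WHERE isInNYC(lon, lat)
print([ride_id for ride_id, lon, lat in rides if is_in_nyc(lon, lat)])  # [1]
```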
Create a view nyc_view for all ride events that happened in New York City.
The output of SELECT * FROM nyc_view should be equal to the output of the previous exercise.
Solution:
CREATE VIEW nyc_view AS SELECT * FROM Rides WHERE isInNYC(lon, lat);
The statement creates a view from the query of the previous exercise.
Create a view nyc_area_view that enriches all ride events with their area id.
The output of SELECT * FROM nyc_area_view should look similar to:
rideId taxiId isStart areaId rideTime psgCnt
1 2013000001 true 47792 2013-01-01 00:00:00.0 1
2 2013000002 true 44301 2013-01-01 00:00:00.0 5
3 2013000003 true 54043 2013-01-01 00:00:00.0 1
4 2013000004 true 46298 2013-01-01 00:00:00.0 2
5 2013000005 true 52535 2013-01-01 00:00:00.0 4
6 2013000006 true 45881 2013-01-01 00:00:00.0 6
7 2013000007 true 51780 2013-01-01 00:00:00.0 6
8 2013000008 true 43567 2013-01-01 00:00:00.0 3
9 2013000009 true 51285 2013-01-01 00:00:00.0 1
10 2013000010 true 48292 2013-01-01 00:00:00.0 3
Solution:
CREATE VIEW nyc_area_view AS
SELECT rideId, taxiId, isStart, toAreaId(lon, lat) AS areaId, rideTime, psgCnt
FROM nyc_view;
The statement creates a view using the view from the previous exercise. The UDF converts the coordinates to area ids.
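The mapping performed by toAreaId and its reverse, toCoords, can be sketched as a regular grid over an assumed bounding box, with cells numbered row by row. Every constant below is an assumption, including the grid origin, the cell sizes of roughly 100 m, and the row width. As a result, the ids it produces will not match those of the training UDF shown above. What the sketch does show is that toCoords(toAreaId(lon, lat)) lands on the center of the cell containing the original point.

```python
import math
from typing import Tuple

# All constants are assumptions for illustration, not the training UDF's values.
LON_WEST, LON_EAST = -74.05, -73.70  # assumed west/east edges of the grid
LAT_NORTH = 41.0                     # assumed north edge of the grid
DELTA_LON = 0.0012                   # ~100 m of longitude near 40.7 deg N
DELTA_LAT = 0.0009                   # ~100 m of latitude
CELLS_PER_ROW = math.ceil((LON_EAST - LON_WEST) / DELTA_LON)

def to_area_id(lon: float, lat: float) -> int:
    """Map a location to the id of the grid cell containing it (row-major)."""
    col = math.floor((lon - LON_WEST) / DELTA_LON)
    row = math.floor((LAT_NORTH - lat) / DELTA_LAT)
    return row * CELLS_PER_ROW + col

def to_coords(area_id: int) -> Tuple[float, float]:
    """Reverse of to_area_id: return (lon, lat) of the cell's center."""
    row, col = divmod(area_id, CELLS_PER_ROW)
    lon = LON_WEST + (col + 0.5) * DELTA_LON
    lat = LAT_NORTH - (row + 0.5) * DELTA_LAT
    return lon, lat

area = to_area_id(-73.99078, 40.76088)  # first ride from the output above
center_lon, center_lat = to_coords(area)
print(area, round(center_lon, 5), round(center_lat, 5))
```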
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.