Introduction to Flink SQL
In this session you will learn about
- The design goals of SQL on Flink
- Flink SQL's approach to unified stream/batch processing
- Targeted use cases for SQL on Flink
- Flink's interactive SQL CLI client
The following documentation pages might be useful during the training:
- Streaming Concepts - Streaming-specific documentation for Flink SQL such as configuration of time attributes and handling of updating results.
- Flink SQL - Documentation of SQL coverage.
- Built-In Functions - Documentation of built-in functions.
- Flink SQL Client - Documentation for the SQL Client and its configuration.
The SQL CLI client is configured with an environment configuration file that specifies existing (source and sink) tables and user-defined functions. You will not need to add or remove tables or functions. However, you will learn how to create and drop views.
In the following, we show how to use the CLI client and present the tables and functions of the training environment.
You can list all available tables using the `SHOW TABLES` command. It lists table sources and sinks as well as views.

```
Flink SQL> SHOW TABLES;
Rides
Fares
DriverChanges
Sink_TenMinPsgCnts
Sink_AreaCnts
```
You can check the schema of a table using the `DESCRIBE` command as shown in the following:

```
Flink SQL> DESCRIBE Rides;
root
 |-- rideId: BIGINT
 |-- taxiId: BIGINT
 |-- isStart: BOOLEAN
 |-- lon: FLOAT
 |-- lat: FLOAT
 |-- rideTime: TIMESTAMP(3) *ROWTIME*
 |-- psgCnt: INT
```
Source tables can be used in the `FROM` clause of a SQL query. They cannot be used as the target of an `INSERT INTO` clause in a SQL query.
All source tables are external tables stored in separate Apache Kafka topics. The Kafka records are encoded in JSON. Each record that is read from a topic is appended to the corresponding source table.
Note: The source tables are not fully populated right away when the training environment is started. Their records are continuously ingested into their Kafka topics. The ingestion starts when the training environment is started (`docker-compose up -d`) and runs at 10x fast-forward speed, i.e., it takes 1 minute to ingest 10 minutes of data (based on the timestamps of the records).
We will use the `Rides` source table for all of the exercises. It contains information about taxi rides that took place in New York City at the beginning of 2013. Each ride is represented by two event records: a start ride event and an end ride event.
The schema of the `Rides` table is as follows:

```
rideId: BIGINT       // the unique id of a ride (note, Rides contains two records per ride)
taxiId: BIGINT       // the unique id of the taxi
isStart: BOOLEAN     // flag for pick-up (true) or drop-off (false) event
lon: FLOAT           // the longitude of the pick-up or drop-off location
lat: FLOAT           // the latitude of the pick-up or drop-off location
rideTime: TIMESTAMP  // the time of the pick-up or drop-off event
psgCnt: INT          // the number of passengers on the ride
```
The `Fares` source table contains information about the fares paid for the taxi rides. The table contains one record for each taxi ride.
The schema of the `Fares` table is as follows:

```
rideId: BIGINT     // the unique id of the ride
payTime: TIMESTAMP // the time when the payment was made (same as the timestamp of the ride end event in the Rides table)
payMethod: VARCHAR // the method of payment (CSH, CRD, DIS, NOC, UNK)
tip: FLOAT         // the amount of paid tip
toll: FLOAT        // the amount of paid toll
fare: FLOAT        // the amount of paid fare
```
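As a quick illustration (not one of the exercises), a simple aggregation over `Fares` could break down the average tip by payment method; the result is continuously updated as new fare records arrive:

```sql
-- Sketch: average tip per payment method over all fares seen so far.
SELECT payMethod, AVG(tip) AS avgTip
FROM Fares
GROUP BY payMethod;
```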
The `DriverChanges` table contains one record for each event in which a taxi is taken over by a different driver than before, i.e., when the driver of a taxi changes. This might happen when a driver starts a new shift.
The schema of the `DriverChanges` table is as follows:

```
taxiId: BIGINT            // the unique id of the taxi
driverId: BIGINT          // the unique id of the driver who starts using the taxi
usageStartTime: TIMESTAMP // the time when the driver starts using the taxi
```
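For illustration (again, not one of the exercises), a query along the following lines would count how often each taxi changed drivers:

```sql
-- Sketch: number of driver changes observed per taxi.
SELECT taxiId, COUNT(*) AS numChanges
FROM DriverChanges
GROUP BY taxiId;
```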
Sink tables can be used as the target of an `INSERT INTO` clause in a SQL query. They cannot be used in the `FROM` clause of a SQL query.
The `Sink_TenMinPsgCnts` table represents an external append sink that writes to the Apache Kafka topic `TenMinPsgCnts`.
It consists of a start time, an end time, and a count:
```
Flink SQL> DESCRIBE Sink_TenMinPsgCnts;
root
 |-- cntStart: TIMESTAMP(3)
 |-- cntEnd: TIMESTAMP(3)
 |-- cnt: BIGINT
```
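To sketch how such an append sink could be used, a query along the following lines would count passengers per 10-minute window and write the result to the sink. This is only an illustration, using the `TUMBLE` group-window syntax on the `rideTime` rowtime attribute of `Rides`:

```sql
-- Sketch: count passengers per 10-minute tumbling event-time window
-- and write the results to the append sink.
INSERT INTO Sink_TenMinPsgCnts
SELECT
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
```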
You can monitor the `TenMinPsgCnts` Kafka topic by running the following command in the folder that contains the `docker-compose.yml` file:

```bash
docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
```
The `Sink_AreaCnts` table represents an external upsert sink that writes to the Elasticsearch index `area-cnts`.
It consists of an area identifier and a count:
```
Flink SQL> DESCRIBE Sink_AreaCnts;
root
 |-- areaId: INT
 |-- cnt: BIGINT
```
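As a sketch of how an upsert sink could be used, a grouped aggregation might maintain the number of ride starts per area, using the `toAreaId` function introduced below (counting only start events is an assumption made for this illustration):

```sql
-- Sketch: continuously maintain the number of ride starts per area;
-- the upsert sink updates the count for an areaId whenever it changes.
INSERT INTO Sink_AreaCnts
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
```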
You can check the state of the sink index by accessing Elasticsearch's REST API:
- Show available indices: http://localhost:9200/_cat/indices?v
- Show details of the `area-cnts` index: http://localhost:9200/area-cnts
- Show stats of the `area-cnts` index: http://localhost:9200/area-cnts/_stats
- Peek into the `area-cnts` index: http://localhost:9200/area-cnts/_search
Note: The `area-cnts` index is automatically created when the first query writes to it.
In order to explore the data of the `Rides` table, execute a simple query:

```sql
SELECT * FROM Rides;
```
The CLI client will enter the result visualization mode and display the results.

```
rideId  taxiId      isStart  lon         lat        rideTime               psgCnt
150156  2013003948  false    -73.98211   40.74796   2013-01-01 06:49:26.0  2
150538  2013003570  false    -74.004684  40.72859   2013-01-01 06:49:26.0  1
151066  2013005078  true     -73.97712   40.752007  2013-01-01 06:49:26.0  1
147794  2013010015  false    -73.87098   40.774143  2013-01-01 06:49:27.0  1
148680  2013003578  false    -73.96466   40.680794  2013-01-01 06:49:27.0  1
151067  2013002010  true     -73.992256  40.750004  2013-01-01 06:49:27.0  2
```
You can leave the result visualization mode by pressing `q`. The mode provides more functionality, such as skipping through pages or increasing and decreasing the update rate, as shown at the bottom of the screen.
The training environment provides a set of user-defined functions for the training data.
In order to list all user-defined functions, use the `SHOW FUNCTIONS` statement:

```
Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
Drivers
```
The functions are defined as follows:
- `timeDiff(t1: TIMESTAMP, t2: TIMESTAMP): BIGINT`: Converts two timestamps into UTC, subtracts `t1 - t2`, and returns the time difference in milliseconds.
- `isInNYC(lon: FLOAT, lat: FLOAT): BOOLEAN`: Checks if a location is within the NYC area.
- `toAreaId(lon: FLOAT, lat: FLOAT): INT`: Maps a location (longitude, latitude) to an area id that represents a cell of approximately 100x100 meters size.
- `toCoords(areaId: INT): [lon: FLOAT, lat: FLOAT]`: Reverse method of `toAreaId` that computes the longitude and latitude of the center of an area cell.
- `Drivers(ts: TIMESTAMP): Table(taxiId: BIGINT, driverId: BIGINT, usageStartTime: TIMESTAMP)`: A special table-valued function, a so-called temporal table function, that returns, for a timestamp `ts`, the driver who most recently used each taxi. This function will be discussed in the context of joins; see the sketch after this list.
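For reference, a temporal table function is applied with a `LATERAL TABLE` join. A minimal sketch of the invocation pattern (the join semantics themselves are covered later in the training):

```sql
-- Sketch: enrich each ride event with the driver who was using the
-- taxi at the time of the event, via the Drivers temporal table function.
SELECT r.rideId, r.taxiId, d.driverId
FROM Rides AS r,
     LATERAL TABLE (Drivers(r.rideTime)) AS d
WHERE r.taxiId = d.taxiId;
```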
Views allow you to define virtual tables from SQL queries. The view definition is parsed and syntactically validated immediately. However, the view's query is only executed when the view is referenced by a `SELECT` or `INSERT INTO` statement.
Views can be created within a CLI session using the `CREATE VIEW` statement:

```sql
CREATE VIEW RideStarts AS SELECT * FROM Rides WHERE isStart;
```
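Once created, a view can be queried like any other table:

```sql
SELECT * FROM RideStarts;
```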
Views created within a CLI session can be removed again using the `DROP VIEW` statement:

```sql
DROP VIEW RideStarts;
```
Write a query that outputs the start and end event of ride `123`.
The output should look similar to:

```
isStart  rideTime
true     2013-01-01 00:01:00.0
false    2013-01-01 00:07:00.0
```
Click to see the solution.
```sql
SELECT isStart, rideTime FROM Rides WHERE rideId = 123;
```
The query filters by the ride id.
The task of this exercise is to cleanse the table of ride events by removing all events whose pick-up or drop-off location is not in New York City.
The output should look similar to:

```
rideId  taxiId      isStart  lon         lat        rideTime               psgCnt
1       2013000001  true     -73.99078   40.76088   2013-01-01 00:00:00.0  1
2       2013000002  true     -73.978325  40.77809   2013-01-01 00:00:00.0  5
3       2013000003  true     -73.98962   40.72999   2013-01-01 00:00:00.0  1
4       2013000004  true     -73.981575  40.76763   2013-01-01 00:00:00.0  2
5       2013000005  true     -74.00053   40.737343  2013-01-01 00:00:00.0  4
6       2013000006  true     -73.866135  40.77109   2013-01-01 00:00:00.0  6
7       2013000007  true     -74.00693   40.740765  2013-01-01 00:00:00.0  6
8       2013000008  true     -73.955925  40.781887  2013-01-01 00:00:00.0  3
9       2013000009  true     -73.99988   40.743343  2013-01-01 00:00:00.0  1
10      2013000010  true     -73.989845  40.75804   2013-01-01 00:00:00.0  3
11      2013000011  true     -73.870834  40.77377   2013-01-01 00:00:00.0  1
```
Click to see the solution.
```sql
SELECT * FROM Rides WHERE isInNYC(lon, lat);
```

The query filters by using the UDF `isInNYC`.
Create a view `nyc_view` for all ride events that happened in New York City.
The output of `SELECT * FROM nyc_view` should be equal to the output of the previous exercise.
Click to see the solution.
```sql
CREATE VIEW nyc_view AS SELECT * FROM Rides WHERE isInNYC(lon, lat);
```
The statement creates a view from the query of the previous exercise.
Create a view `nyc_area_view` that enriches all ride events with their area id.
The output of `SELECT * FROM nyc_area_view` should look similar to:
```
rideId  taxiId      isStart  areaId  rideTime               psgCnt
1       2013000001  true     47792   2013-01-01 00:00:00.0  1
2       2013000002  true     44301   2013-01-01 00:00:00.0  5
3       2013000003  true     54043   2013-01-01 00:00:00.0  1
4       2013000004  true     46298   2013-01-01 00:00:00.0  2
5       2013000005  true     52535   2013-01-01 00:00:00.0  4
6       2013000006  true     45881   2013-01-01 00:00:00.0  6
7       2013000007  true     51780   2013-01-01 00:00:00.0  6
8       2013000008  true     43567   2013-01-01 00:00:00.0  3
9       2013000009  true     51285   2013-01-01 00:00:00.0  1
10      2013000010  true     48292   2013-01-01 00:00:00.0  3
```
Click to see the solution.
```sql
CREATE VIEW nyc_area_view AS
SELECT rideId, taxiId, isStart, toAreaId(lon, lat) AS areaId, rideTime, psgCnt
FROM nyc_view;
```

The statement creates a view on top of the view from the previous exercise. The UDF `toAreaId` converts the coordinates to area ids.
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.