Skip to content

Python Table API使用

Hequn Cheng edited this page Feb 22, 2020 · 24 revisions

在本章节 您将了解以下内容

  • Python Table API相关的概念及实现架构
  • 核心算子及使用
  • Python UDF编写
  • Job提交及运行
  • Python Shell使用

Slides

文档

下面的文档可能会为您的培训提供帮助:

Source 表

Rides

我们将使用 Rides source 表来做所有的练习。这张表包含了 2013 年初纽约市的出租车乘坐信息。每次出租车乘坐对应 2 条事件消息记录,一条 开始打车事件 和一条 结束打车事件

Rides 表的 schema 如下:

rideId: BIGINT       // 一次打车的唯一 id (注意,Rides 一次打车对应两条消息记录)
taxiId: BIGINT       // 出租车的唯一 id
isStart: BOOLEAN     // flag 标识上车 (true) 或者下车 (false) 事件
lon: FLOAT           // 上车或下车地点的精度
lat: FLOAT           // 上车或下车地点的纬度
rideTime: TIMESTAMP  // 上车或下车时的时间戳
psgCnt: INT          // 打车人数

练习

1-流程串通,从kafka读并写到kafka

启动命令

cd sql-training/pyflink
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/from_kafka_to_kafka.py

查看输出

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TempResults

输出的内容大致如下:

{"rideId":3321,"taxiId":2013003189,"isStart":true,"lon":-73.99606,"lat":40.725132,"psgCnt":2,"rideTime":"2013-01-01T00:11:47Z"}
{"rideId":744,"taxiId":2013000742,"isStart":false,"lon":-73.97362,"lat":40.791283,"psgCnt":1,"rideTime":"2013-01-01T00:11:48Z"}
{"rideId":3322,"taxiId":2013003190,"isStart":true,"lon":-73.98382,"lat":40.74381,"psgCnt":1,"rideTime":"2013-01-01T00:11:48Z"}
{"rideId":3323,"taxiId":2013003191,"isStart":true,"lon":-74.00485,"lat":40.72102,"psgCnt":4,"rideTime":"2013-01-01T00:11:48Z"}

停止job

请访问http://localhost:8081/#/overview ,并选择运行的job,然后点Cancle.

2-找到一个特定的 Ride

修改pyflink/examples/find_ride_id.py,找出 ride 123 的上下车事件记录。(主体程序已写好,只需要替换脚本里的???即可)

执行命令:

docker-compose exec jobmanager ./bin/flink run -py /opt/examples/find_ride_id.py

输出的内容大致如下:

{"rideId":123,"taxiId":2013000123,"isStart":true,"lon":-73.98036,"lat":40.730495,"psgCnt":6,"rideTime":"2013-01-01T00:01:00Z"}
{"rideId":123,"taxiId":2013000123,"isStart":false,"lon":-73.98785,"lat":40.740902,"psgCnt":6,"rideTime":"2013-01-01T00:07:00Z"}
点击查看答案
    st_env\
        .from_path("source")\
        .where("rideId=123")\
        .insert_into("sink")

该查询使用 ride id 过滤数据。


3-将Upsert表写入Elasticsearch

修改pyflink/examples/area_cnts.py文件,查询每个地区的行程数并将结果写入Sink_AreaCnts表。

Sink_AreaCnts是一张upsert表,对应Elasticsearch中的一个索引。 由于是upsert表,Sink_AreaCnts可以接收新数据或者根据key更新原有的数据。

查询结果的schema必须与目标表的schema一致。以下是Sink_AreaCnts表的schema。

Flink SQL> DESCRIBE Sink_AreaCnts;
root
 |-- areaId: INT
 |-- cnt: BIGINT

您可以通过http://localhost:9200访问Elasticsearch的REST API。 以下链接可用于查看Sink_AreaCnts表所对应的索引area-cnts

注意: Elasticsearch会在第一条数据写入Sink_AreaCnts时自动创建area-cnts索引,所以在初始状态下并没有索引数据。

在任务执行过程中,您可以通过刷新页面http://localhost:9200/area-cnts/_stats看到document的计数器(_all.primaries.docs.count_all.primaries.docs.deleted)在不断增长。

{
  "_shards": {
    "total": 10,
    "successful": 5,
    "failed": 0
  },
  "_all": {
    "primaries": {
      "docs": {
        "count": 15233,
        "deleted": 78447
      },
      "store": {
        "size_in_bytes": 3576466
      },
      ...

注意: 目前INSERT INTO语句无法通过SQL CLI停止。请通过Flink Web UI(http://localhost:8081)查看并停止任务。

点击查看答案
    # register udf
    st_env.register_java_function("toAreaId", "com.ververica.sql_training.udfs.ToAreaId")

    # query
    st_env.from_path("source")\
        .add_columns("toAreaId(lon, lat) as areaId")\
        .group_by("areaId")\
        .select("areaId, count(1)")\
        .insert_into("sink")


def register_cnt_sink(st_env):
    st_env.connect(
        Elasticsearch()
            .version("6")
            .host("elasticsearch", 9200, "http")
            .index("area-cnts")
            .document_type('areacnt')
            .key_delimiter("$")) \
        .with_schema(
            Schema()
                .field("areaId", DataTypes.INT())
                .field("cnt", DataTypes.BIGINT())) \
        .with_format(
           Json()
               .derive_schema()) \
        .in_upsert_mode() \
        .register_table_sink("sink")

4-计算行程时长

修改pyflink/examples/ride_duration.py文件,计算行程时长。在这个练习中,我们想计算每一次出租车行程的耗时,既行程开始事件和结束事件的时间差,单位为分钟。这意味着我们需要基于行程的 rideId 来连接开始事件和结束事件。

这里,我们只须关注发生在纽约市,且行程开始和结束事件发生在最近2小时纽约市的行程。

点击查看提示
  • 过滤 Rides 表来分开行程开始事件和行程结束事件。
  • 使用时间窗口连接来关联行程的开始和结束时间。在JOIN时,只关联那些结束事件(当然它的开始事件已经到达)在2小时内到达的事件。
  • 请使用提供的 timeDiff 函数来返回两个时间戳的时间间隔(单位为毫秒)。


输出数据类似如下:
rideId        durationMin
 52693                 13
 46868                 24
 53226                 12
 53629                 11
 55651                  7
 43220                 31
 53082                 12
 54716                  9
 55125                  9
 57211                  4
 44795                 28
 53563                 12
点击查看答案
    # register java udf (toAreaId)
    st_env.register_java_function("isInNYC", "com.ververica.sql_training.udfs.IsInNYC")
    st_env.register_java_function("timeDiff", "com.ververica.sql_training.udfs.TimeDiff")

    # query
    source_table = st_env.from_path("Rides")

    left_table = source_table\
        .where("isStart.isTrue && isInNYC(lon, lat)")\
        .select("rideId as startRideId, rideTime as startRideTime")

    right_table = source_table\
        .where("isStart.isFalse && isInNYC(lon, lat)")\
        .select("rideId as endRideId, rideTime as endRideTime")

    left_table.join(right_table,
                    "startRideId == endRideId && endRideTime.between(startRideTime, startRideTime + 2.hours) ")\
        .select("startRideId as rideId, timeDiff(startRideTime, endRideTime)/60000 as durationMin")\
        .insert_into("TempResults")

    # execute
    st_env.execute("ride_duration")

这个查询在两个子查询之间做了一个时间窗口JOIN。第一个子查询 left_table 返回纽约市已经开始的行程记录,第二个子查询 right_table 返回纽约市的行程结束事件。

这两个表通过 rideId 字段来做JOIN操作。另外,WHERE 条件指定了开始事件和结束事件的时间限制,这确保了只会JOIN那些结束事件发生在开始事件2小时内的两次出租车行程记录。

结果中的 durationMin 列指的是行程时长,单位为分钟。


5-Python udf编写

继续修改pyflink/examples/ride_duration.py文件,将java isInNYC函数替换成python udf。java对应的udf如下

/**
 * Table API / SQL Scalar UDF to check if a coordinate is in NYC.
 */
public class IsInNYC extends ScalarFunction {

	public boolean eval(float lon, float lat) {
		return isInNYC(lon, lat);
	}
}

// geo boundaries of the area of NYC
private static double LON_EAST = -73.7;
private static double LON_WEST = -74.05;
private static double LAT_NORTH = 41.0;
private static double LAT_SOUTH = 40.5;

public static boolean isInNYC(float lon, float lat) {

  return !(lon > LON_EAST || lon < LON_WEST) &&
      !(lat > LAT_NORTH || lat < LAT_SOUTH);
}
点击查看答案
    @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], result_type=DataTypes.BOOLEAN())
    def is_in_nyc(lon, lat):
        LON_EAST = -73.7
        LON_WEST = -74.05
        LAT_NORTH = 41.0
        LAT_SOUTH = 40.5
        return not (lon > LON_EAST or lon < LON_WEST) and not (lat > LAT_NORTH or lat < LAT_SOUTH)

    # register java udf (toAreaId)
    st_env.register_function("isInNYC", is_in_nyc)