TableSource
提供对存储在外部系统(数据库、键值存储、消息队列)或文件中的数据的访问。TableSource 在 TableEnvironment 中注册后可以通过Table API或SQL查询访问它。
TableSink
发送 Table到外部存储系统,如数据库、键值存储、消息队列或文件系统(以不同的编码方式,如 CSV、Parquet 或 ORC)。
TableFactory
允许将与外部系统的连接声明与实际实现分离。表工厂从规范化的、基于字符串的属性创建表源(sources)和表接收器(sinks)的已配置实例。可以使用描述符(Descriptor)
以编程方式生成属性,也可以通过SQL Client的YAML配置文件生成属性。
查看常见概念和API页面,了解如何注册TableSource以及如何通过 TableSink 发出 Table的详细信息。有关如何使用工厂的示例,请参阅内置源,接收器和格式页面。
TableSource
是一个通用接口,允许 Table API 和 SQL 查询访问存储在外部系统中的数据。它提供了表的模式以及映射到具有表模式的行的记录。根据是否在流式或批量查询中使用 TableSource
,记录将作为 DataSet
或 DataStream
生成。
如果在流查询中使用 TableSource
,它必须实现 StreamTableSource
接口;如果在批处理查询中使用它,它必须实现 BatchTableSource
接口。TableSource
还可以实现这两个接口,并可用于流查询和批处理查询。
StreamTableSource
和 BatchTableSource
扩展了定义了以下方法的基本接口 TableSource
:
TableSource<T> {
public TableSchema getTableSchema();
public TypeInformation<T> getReturnType();
public String explainSource();
}
TableSource[T] {
def getTableSchema: TableSchema
def getReturnType: TypeInformation[T]
def explainSource: String
}
-
getTableSchema()
: 返回表的模式,即表中字段的名称和类型。字段类型是使用 Flink 的类型信息(TypeInformation)
(参见Table API types和SQL types)定义的。 -
getReturnType()
: 返回DataStream
(StreamTableSource
) 或DataSet
(BatchTableSource
) 的物理类型和由TableSource
生成的记录。 -
explainSource()
: 返回描述TableSource
的字符串。此方法是可选的,仅用于显示目的。
TableSource
接口将逻辑表模式与返回的 DataStream
或 DataSet
的物理类型分开。因此,表模式(getTableSchema()
)的所有字段必须映射到物理返回类型(getReturnType()
)的对应类型的字段。默认情况下,这个映射是基于字段名完成的。例如,定义具有两个字段[name: String, size: Integer]
的表模式的 TableSource
需要一个 TypeInformation
,其中至少有两个字段名为 name
和 size
,类型为 String
和 Integer
。 这可能是一个 PojoTypeInfo
或 RowTypeInfo
,它有两个名为 name
和 size
的字段匹配类型。
然而,有些类型,如 Tuple 或 CaseClass 类型,确实支持自定义字段名。如果 TableSource
返回具有固定字段名的类型的 DataStream
或 DataSet
,它可以实现 DefinedFieldMapping
接口,将字段名从表模式映射到物理返回类型的字段名。
BatchTableSource
接口扩展了 TableSource
接口,并定义了一个额外的方法:
BatchTableSource<T> implements TableSource<T> {
public DataSet<T> getDataSet(ExecutionEnvironment execEnv);
}
BatchTableSource[T] extends TableSource[T] {
def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
getDataSet(execEnv)
: 返回包含表数据的DataSet
。DataSet
的类型必须与TableSource.getReturnType()
方法定义的返回类型相同。DataSet
可以通过使用 DataSet API 的常规数据源创建。通常,BatchTableSource
是通过包装InputFormat
或 batch connector来实现的。
StreamTableSource
接口扩展了 TableSource
接口,并定义了一个额外的方法:
StreamTableSource<T> implements TableSource<T> {
public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
}
StreamTableSource[T] extends TableSource[T] {
def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
getDataStream(execEnv)
: 返回带有表数据的DataStream
。DataStream
的类型必须与TableSource.getReturnType()
方法定义的返回类型相同。DataStream
可以通过使用 DataStream API 的常规数据源(data source)创建。通常,StreamTableSource
是通过包装SourceFunction
或流连接器(stream connector)来实现的。
流式表API和SQL查询的基于时间的操作(如窗口聚合或连接)需要显式指定时间属性。
TableSource
在其表模式中将 time 属性定义为类型为 Types.SQL_TIMESTAMP
的字段。与模式中的所有常规字段相比,时间属性不能与表源返回类型中的物理字段匹配。相反,TableSource
通过实现某个接口来定义时间属性。
处理时间属性通常用于流查询。处理时间属性返回访问它的操作符的当前壁钟时间(wall-clock time)。TableSource
通过实现 DefinedProctimeAttribute
接口来定义处理时间属性。接口如下:
DefinedProctimeAttribute {
public String getProctimeAttribute();
}
DefinedProctimeAttribute {
def getProctimeAttribute: String
}
getProctimeAttribute()
: 返回处理时间属性的名称。必须在表模式中定义指定的属性类型Types.SQL_TIMESTAMP
,并且可以在基于时间的操作中使用。DefinedProctimeAttribute
表源可以通过返回null
来定义没有处理时间属性。
注意 StreamTableSource
和 BatchTableSource
都可以实现 DefinedProctimeAttribute
并定义处理时间属性。对于 BatchTableSource
,处理时间字段在表扫描期间使用当前时间戳初始化。
Rowtime属性是 TIMESTAMP
类型的属性,在流和批处理查询中以统一的方式处理。
通过指定,可以将类型为 SQL_TIMESTAMP
的表模式字段声明为 rowtime 属性
- 字段名,
TimestampExtractor
计算属性的实际值(通常来自一个或多个其他字段),以及- 指定如何为 rowtim e属性生成水印的
WatermarkStrategy
。
TableSource
通过实现 DefinedRowtimeAttributes
接口定义了一个 rowtime 属性。接口如下:
DefinedRowtimeAttribute {
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
}
DefinedRowtimeAttributes {
def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}
getRowtimeAttributeDescriptors()
: 返回一个RowtimeAttributeDescriptor
列表。RowtimeAttributeDescriptor
描述了一个具有以下属性的 rowtime 属性:attributeName
: 表模式中 rowtime 属性的名称。必须使用类型Types.SQL_TIMESTAMP
定义字段。timestampExtractor
: 时间戳提取器(timestamp extractor)从具有返回类型的记录中提取时间戳。例如,它可以将长字段转换为时间戳,或者解析字符串编码的时间戳。Flink 附带一组内置的TimestampExtractor
实现,用于常见的用例。还可以提供自定义实现。watermarkStrategy
: 水印策略(watermark strategy)定义了如何为 rowtime 属性生成水印。Flink 为常见用例提供了一组内置的WatermarkStrategy
实现。还可以提供自定义实现。
注意,虽然 getRowtimeAttributeDescriptors()
方法返回一个描述符列表,但目前只支持一个 rowtime 属性。我们计划在将来删除这个限制,并支持具有多个 rowtime 属性的表。
请注意,StreamTableSource
和 BatchTableSource
都可以实现 DefinedRowtimeAttributes
并定义 rowtime 属性。在这两种情况下,行时间字段都是使用 TimestampExtractor
提取的。因此,实现 StreamTableSource
和 BatchTableSource
并定义 rowtime 属性的 TableSource
为流处理和批处理查询提供了完全相同的数据。
Flink 为常用用例提供了 TimestampExtractor
实现。
以下 TimestampExtractor
实现目前可用:
ExistingField(fieldName)
: 从现有的LONG
,SQL_TIMESTAMP
或格式化的STRING
字段中提取 rowtime 属性的值。这样一个字符串的一个例子是 ‘2018-05-28 12:34:56.000’。StreamRecordTimestamp()
: 从DataStream
StreamRecord
的时间戳中提取 rowtime 属性的值。注意,此TimestampExtractor
不适用于批处理表源。
自定义的 TimestampExtractor
可以通过实现相应的接口来定义。
Flink 为常用用例提供了水印策略(WatermarkStrategy)
实现。
目前提供以下 WatermarkStrategy
实现:
AscendingTimestamps
: 用于提升时间戳的水印策略。带有无序时间戳的记录将被视为迟到。BoundedOutOfOrderTimestamps(delay)
: 时间戳的水印策略,其最多在指定的延迟之外是无序的。PreserveWatermarks()
: 应该从底层的DataStream
中保留指示水印的策略。
可以通过实现相应的接口来定义自定义的 WatermarkStrategy
。
TableSource
通过实现 ProjectableTableSource
接口支持投影下推。该接口定义了一个单一的方法:
ProjectableTableSource<T> {
public TableSource<T> projectFields(int[] fields);
}
ProjectableTableSource[T] {
def projectFields(fields: Array[Int]): TableSource[T]
}
projectFields(fields)
: 返回具有调整后的物理返回类型的TableSource
的 副本(copy)。fields
参数提供了必须由TableSource
提供的字段的索引。索引与物理返回类型的TypeInformation
有关,而 不是 与逻辑表模式相关。复制的TableSource
必须调整其返回类型和返回的DataStream
或DataSet
。复制的TableSource
的TableSchema
不能更改,即它必须与原来的TableSource
相同。如果TableSource
实现了DefinedFieldMapping
接口,则必须将字段映射调整为新的返回类型。
ProjectableTableSource
为项目平面字段添加了支持。如果 TableSource
定义了一个带有嵌套模式的表,它可以实现 NestedFieldsProjectableTableSource
来将投影扩展到嵌套字段。NestedFieldsProjectableTableSource
的定义如下:
NestedFieldsProjectableTableSource<T> {
public TableSource<T> projectNestedFields(int[] fields, String[][] nestedFields);
}
NestedFieldsProjectableTableSource[T] {
def projectNestedFields(fields: Array[Int], nestedFields: Array[Array[String]]): TableSource[T]
}
projectNestedField(fields, nestedFields)
: 返回TableSource
的具有调整后的物理返回类型的一个 副本(copy)。可以删除或重新排序物理返回类型的字段,但不能更改它们的类型。该方法的契约(contract)本质上与ProjectableTableSource.projectFields()
方法相同。此外,nestedFields
参数包含fields
列表中的每个字段索引,这是查询访问的所有嵌套字段的路径列表。所有其他嵌套字段都不需要在TableSource
生成的记录中读取、解析和设置。最重要的是 不能更改投影字段的类型,但是可以将未使用的字段设置为null或默认值。
FilterableTableSource
接口增加了对 TableSource
下推过滤器的支持。扩展这个接口的 TableSource
能够过滤记录,这样返回的 DataStream
或 DataSet
返回的记录就会更少。
接口如下:
FilterableTableSource<T> {
public TableSource<T> applyPredicate(List<Expression> predicates);
public boolean isFilterPushedDown();
}
FilterableTableSource[T] {
def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
def isFilterPushedDown: Boolean
}
applyPredicate(predicates)
: 返回带有添加谓词的TableSource
的 副本(copy)。predicates
参数是一个可变的连接谓词列表,它们“提供(offered)”给TableSource
。TableSource
接受通过从列表中删除谓词来计算谓词的值。列表中剩下的谓词将由后续过滤器操作符计算。isFilterPushedDown()
: 如果之前调用了applyPredicate()
方法,则返回 true。因此,对于从applyPredicate()
调用返回的所有TableSource
实例,isFilterPushedDown()
必须返回 true。
TableSink
指定如何将 Table
发送到外部系统或位置。该接口是通用的,因此它可以支持不同的存储位置和格式。对于批处理表和流表,有不同的表接收器(table sinks)。
通用接口如下:
TableSink<T> {
public TypeInformation<T> getOutputType();
public String[] getFieldNames();
public TypeInformation[] getFieldTypes();
public TableSink<T> configure(String[] fieldNames, TypeInformation[] fieldTypes);
}
TableSink[T] {
def getOutputType: TypeInformation<T>
def getFieldNames: Array[String]
def getFieldTypes: Array[TypeInformation]
def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation]): TableSink[T]
}
调用 TableSink#configure
方法将表的模式(字段名和类型)传递给 TableSink
。该方法必须返回 TableSink 的一个新实例,该实例被配置为发出所提供的表模式。
定义外部 TableSink
以发出批处理表。
接口如下:
BatchTableSink<T> implements TableSink<T> {
public void emitDataSet(DataSet<T> dataSet);
}
BatchTableSink[T] extends TableSink[T] {
def emitDataSet(dataSet: DataSet[T]): Unit
}
定义一个外部 TableSink
来发出只包含插入更改的流表。
接口如下:
AppendStreamTableSink<T> implements TableSink<T> {
public void emitDataStream(DataStream<T> dataStream);
}
AppendStreamTableSink[T] extends TableSink[T] {
def emitDataStream(dataStream: DataStream<T>): Unit
}
如果表也被 update 或 delete 修改,则会抛出一个 TableException
。
定义一个外部 TableSink
,用于发出具有插入、更新和删除更改的流表。
接口如下:
RetractStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
RetractStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
def getRecordType: TypeInformation[T]
def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
该表将被转换为一个累加和收回消息流,这些消息被编码为 Java Tuple2
。第一个字段是一个布尔标志,用于指示消息类型(true
表示插入,false
表示删除)。第二个字段保存所请求类型 T
的记录。
定义一个外部 TableSink
,用于发出具有插入、更新和删除更改的流表。
接口如下:
UpsertStreamTableSink<T> implements TableSink<Tuple2<Boolean, T>> {
public void setKeyFields(String[] keys);
public void setIsAppendOnly(boolean isAppendOnly);
public TypeInformation<T> getRecordType();
public void emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);
}
UpsertStreamTableSink[T] extends TableSink[Tuple2[Boolean, T]] {
def setKeyFields(keys: Array[String]): Unit
def setIsAppendOnly(isAppendOnly: Boolean): Unit
def getRecordType: TypeInformation[T]
def emitDataStream(dataStream: DataStream[Tuple2[Boolean, T]]): Unit
}
该表必须具有唯一的 key 字段(原子或复合)或仅附加。如果表没有唯一 key 并且不是仅附加,则抛出 TableException
。表的唯一键由 UpsertStreamTableSink#setKeyFields()
方法配置。
该表将被转换为 upsert 和 delete 消息流,这些消息被编码为 Java Tuple2
。第一个字段是一个布尔标志,用于指示消息类型。第二个字段保存所请求类型 T
的记录。
具有 true 布尔字段的消息是已配置 key 的 upsert 消息。带有错误标志的消息是已配置 key 的删除消息。如果表是仅附加的,则所有消息都将具有 true 标志,并且必须解释为插入。
TableFactory
允许从基于字符串的属性创建不同的表相关实例。调用所有可用工厂以匹配给定的属性集和相应的工厂类。
工厂利用 Java 的服务提供者接口(Service Provider Interfaces, SPI)进行发现。这意味着每个依赖项和 JAR 文件都应在 META_INF/services
资源目录中包含一个文件 org.apache.flink.table.factories.TableFactory
,该文件列出了它提供的所有可用表工厂。
每个表工厂都需要实现以下接口:
package org.apache.flink.table.factories;
interface TableFactory {
Map<String, String> requiredContext();
List<String> supportedProperties();
}
package org.apache.flink.table.factories
trait TableFactory {
def requiredContext(): util.Map[String, String]
def supportedProperties(): util.List[String]
}
requiredContext()
: 指定已为此工厂实现的上下文。如果满足指定的属性和值集,框架保证仅匹配此工厂。典型属性可能是connector.type
,format.type
或update-mode
。诸如connector.property-version
和format.property-version
之类的属性 keys 保留用于将来的向后兼容性情况。supportedProperties
: 此工厂可以处理的属性 keys 列表。此方法将用于验证。如果传递了该工厂无法处理的属性,则会抛出异常。该列表不得包含上下文指定的 keys。
为了创建特定实例,工厂类可以实现 org.apache.flink.table.factories
中提供的一个或多个接口:
BatchTableSourceFactory
: 创建批处理表源。BatchTableSinkFactory
: 创建批处理表接收器。StreamTableSoureFactory
: 创建流表源。StreamTableSinkFactory
: 创建流表接收器。DeserializationSchemaFactory
: 创建反序列化架构格式。SerializationSchemaFactory
: 创建序列化架构格式。
工厂的发现经历了多个阶段:
- 发现所有可用的工厂。
- 按工厂类过滤(例如,
StreamTableSourceFactory
)。 - 通过匹配上下文过滤。
- 按支持的属性过滤。
- 验证一个工厂是否匹配,否则抛出
AmbiguousTableFactoryException
或NoMatchingTableFactoryException
。
以下示例显示如何为参数化提供附加 connector.debug
属性标志的自定义流式源。
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {
@Override
public Map<String, String> requiredContext() {
Map<String, String> context = new HashMap<>();
context.put("update-mode", "append");
context.put("connector.type", "my-system");
return context;
}
@Override
public List<String> supportedProperties() {
List<String> list = new ArrayList<>();
list.add("connector.debug");
return list;
}
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));
# additional validation of the passed properties can also happen here
return new MySystemAppendTableSource(isDebug);
}
}
import java.util
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row
class MySystemTableSourceFactory extends StreamTableSourceFactory[Row] {
override def requiredContext(): util.Map[String, String] = {
val context = new util.HashMap[String, String]()
context.put("update-mode", "append")
context.put("connector.type", "my-system")
context
}
override def supportedProperties(): util.List[String] = {
val properties = new util.ArrayList[String]()
properties.add("connector.debug")
properties
}
override def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[Row] = {
val isDebug = java.lang.Boolean.valueOf(properties.get("connector.debug"))
# additional validation of the passed properties can also happen here
new MySystemAppendTableSource(isDebug)
}
}
在 SQL 客户端环境文件中,先前显示的工厂可以声明为:
tables:
- name: MySystemTable
type: source
update-mode: append
connector:
type: my-system
debug: true
YAML 文件被转换为扁平的(flattened)字符串属性,并使用描述与外部系统的连接的属性调用表工厂:
update-mode=append
connector.type=my-system
connector.debug=true
注意,属性如 tables.#.name
或 tables.#.type
是 SQL 客户端的特定类型,不传递给任何工厂。type
属性根据执行环境决定是否需要发现 BatchTableSourceFactory
/StreamTableSourceFactory
(用于source
)、BatchTableSinkFactory
/StreamTableSinkFactory
(用于 sink
)或两者(用于 both
)。
对于具有解释性 Scaladoc/Javadoc 的类型安全的编程方法,Table 和 SQL API 在 org.apache.flink.table.descriptors
中提供了转换为基于字符串的属性的描述符。有关源(sources),接收器(sinks)和格式(formats)的信息,请参阅内置描述符(built-in descriptors)作为参考。
在我们的示例中,MySystem
的连接器可以扩展 ConnectorDescriptor
,如下所示:
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import java.util.HashMap;
import java.util.Map;
/**
* Connector to MySystem with debug mode.
*/
public class MySystemConnector extends ConnectorDescriptor {
public final boolean isDebug;
public MySystemConnector(boolean isDebug) {
super("my-system", 1, false);
this.isDebug = isDebug;
}
@Override
protected Map<String, String> toConnectorProperties() {
Map<String, String> properties = new HashMap<>();
properties.put("connector.debug", Boolean.toString(isDebug));
return properties;
}
}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import java.util.HashMap
import java.util.Map
/**
* Connector to MySystem with debug mode.
*/
class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system", 1, false) {
override protected def toConnectorProperties(): Map[String, String] = {
val properties = new HashMap[String, String]
properties.put("connector.debug", isDebug.toString)
properties
}
}
然后可以在 API 中使用描述符,如下所示:
StreamTableEnvironment tableEnv = // ...
tableEnv
.connect(new MySystemConnector(true))
.inAppendMode()
.registerTableSource("MySystemTable");
val tableEnv: StreamTableEnvironment = // ...
tableEnv
.connect(new MySystemConnector(isDebug = true))
.inAppendMode()
.registerTableSource("MySystemTable")