Skip to content

Latest commit

 

History

History
112 lines (64 loc) · 7.62 KB

38.md

File metadata and controls

112 lines (64 loc) · 7.62 KB

HDFS Connector

This connector provides a Sink that writes partitioned files to any filesystem supported by Hadoop FileSystem. To use this connector, add the following dependency to your project: 该连接器提供了一个Sink,可将分区文件写入Hadoop FileSystem支持的任何文件系统。要使用此连接器,请将以下依赖项添加到您的项目中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.11</artifactId>
  <version>1.7.1</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution. 请注意,流连接器当前不是二进制分发的一部分。有关如何将程序与库一起打包以执行群集的信息,请参见此处。

Bucketing File Sink 桶式文件接收器

The bucketing behaviour as well as the writing can be configured but we will get to that later. This is how you can create a bucketing sink which by default, sinks to rolling files that are split by time: 可以配置存储桶的行为以及编写,但是稍后我们将进行介绍。这是您可以创建存储区接收器的方式,默认情况下,该存储区接收到按时间拆分的滚动文件:

DataStream<String> input = ...;

input.addSink(new BucketingSink<String>("/base/path"));
val input: DataStream[String] = ...

input.addSink(new BucketingSink[String]("/base/path"))

The only required parameter is the base path where the buckets will be stored. The sink can be further configured by specifying a custom bucketer, writer and batch size. 唯一需要的参数是存储桶的基本路径。可以通过指定自定义存储区,写入器和批处理大小来进一步配置接收器。

By default the bucketing sink will split by the current system time when elements arrive and will use the datetime pattern "yyyy-MM-dd--HH" to name the buckets. This pattern is passed to DateTimeFormatter with the current system time and JVM’s default timezone to form a bucket path. Users can also specify a timezone for the bucketer to format bucket path. A new bucket will be created whenever a new date is encountered. For example, if you have a pattern that contains minutes as the finest granularity you will get a new bucket every minute. Each bucket is itself a directory that contains several part files: each parallel instance of the sink will create its own part file and when part files get too big the sink will also create a new part file next to the others. When a bucket becomes inactive, the open part file will be flushed and closed. A bucket is regarded as inactive when it hasn’t been written to recently. By default, the sink checks for inactive buckets every minute, and closes any buckets which haven’t been written to for over a minute. This behaviour can be configured with setInactiveBucketCheckInterval() and setInactiveBucketThreshold() on a BucketingSink. 默认情况下,当元素到达时"yyyy-MM-dd--HH",存储区接收器将按当前系统时间划分,并将使用datetime模式命名存储区。该模式传递给DateTimeFormatter与当前系统时间和JVM的默认时区一起形成存储桶路径。用户还可以为存储区指定时区以格式化存储区路径。每当遇到新日期时,都会创建一个新存储桶。例如,如果您有一个包含分钟作为最细粒度的模式,那么您将每分钟获得一个新的存储桶。每个存储桶本身就是一个包含几个零件文件的目录:每个并行的接收器实例将创建自己的零件文件,并且当零件文件太大时,接收器还将在其他文件旁边创建一个新的零件文件。当存储桶变为非活动状态时,打开的零件文件将被刷新并关闭。如果最近未将其写入存储桶,则该存储桶被视为非活动桶。默认情况下,接收器每分钟检查一次不活动的存储桶,并关闭超过一分钟未写入的所有存储桶。setInactiveBucketCheckInterval()并setInactiveBucketThreshold()在BucketingSink。

You can also specify a custom bucketer by using setBucketer() on a BucketingSink. If desired, the bucketer can use a property of the element or tuple to determine the bucket directory. 您还可以setBucketer()在上使用来指定自定义存储区BucketingSink。如果需要,存储区管理程序可以使用元素或元组的属性来确定存储区目录。

The default writer is StringWriter. This will call toString() on the incoming elements and write them to part files, separated by newline. To specify a custom writer use setWriter() on a BucketingSink. If you want to write Hadoop SequenceFiles you can use the provided SequenceFileWriter which can also be configured to use compression. 默认编写器为StringWriter。这将调用toString()传入的元素,并将其写入用换行符分隔的零件文件中。要指定自定义编写器,请setWriter()在上使用BucketingSink。如果要编写Hadoop SequenceFiles,则可以使用提供的SequenceFileWriter,也可以配置为使用压缩。

There are two configuration options that specify when a part file should be closed and a new one started: 有两个配置选项,指定何时应关闭零件文件以及何时启动一个新的零件文件:

  • By setting a batch size (The default part file size is 384 MB) 通过设置批处理大小(默认零件文件大小为384 MB)
  • By setting a batch roll over time interval (The default roll over interval is Long.MAX_VALUE) 通过设置批次过渡时间间隔(默认过渡时间间隔为Long.MAX_VALUE)

A new part file is started when either of these two conditions is satisfied. 当满足这两个条件之一时,将启动一个新的零件文件。 Example:

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink);
val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins 
input.addSink(sink)

This will create a sink that writes to bucket files that follow this schema: 这将创建一个接收器,该接收器写入遵循此架构的存储桶文件:

/base/path/{date-time}/part-{parallel-task}-{count}

Where date-time is the string that we get from the date/time format, parallel-task is the index of the parallel sink instance and count is the running number of part files that were created because of the batch size or batch roll over interval. date-time从日期/时间格式中获取的字符串在哪里,parallel-task是并行接收器实例的索引,并且count是由于批次大小或批次过渡间隔而创建的零件文件的运行数量。

For in-depth information, please refer to the JavaDoc for BucketingSink. 有关详细信息,请参阅JavaDoc for BucketingSink。