Skip to content

Commit

Permalink
Flink: Support log store
Browse files Browse the repository at this point in the history
  • Loading branch information
a49a committed Dec 6, 2022
1 parent 8e79259 commit b71ab95
Show file tree
Hide file tree
Showing 33 changed files with 1,727 additions and 59 deletions.
35 changes: 32 additions & 3 deletions core/src/main/java/org/apache/iceberg/io/WriteResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,30 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.CharSequenceSet;

public class WriteResult implements Serializable {
private DataFile[] dataFiles;
private DeleteFile[] deleteFiles;
private CharSequence[] referencedDataFiles;

private Map<Integer, Long> logStorePartitionOffsets;

private WriteResult(
List<DataFile> dataFiles, List<DeleteFile> deleteFiles, CharSequenceSet referencedDataFiles) {
List<DataFile> dataFiles,
List<DeleteFile> deleteFiles,
CharSequenceSet referencedDataFiles,
Map<Integer, Long> logStorePartitionOffsets) {
this.dataFiles = dataFiles.toArray(new DataFile[0]);
this.deleteFiles = deleteFiles.toArray(new DeleteFile[0]);
this.referencedDataFiles = referencedDataFiles.toArray(new CharSequence[0]);
this.logStorePartitionOffsets = logStorePartitionOffsets;
}

public DataFile[] dataFiles() {
Expand All @@ -51,6 +59,14 @@ public CharSequence[] referencedDataFiles() {
return referencedDataFiles;
}

public Map<Integer, Long> logStorePartitionOffsets() {
return logStorePartitionOffsets;
}

public void setLogStorePartitionOffsets(Map<Integer, Long> logStorePartitionOffsets) {
this.logStorePartitionOffsets = logStorePartitionOffsets;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -59,18 +75,20 @@ public static class Builder {
private final List<DataFile> dataFiles;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
private Map<Integer, Long> logStorePartitionOffsets;

private Builder() {
this.dataFiles = Lists.newArrayList();
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
this.logStorePartitionOffsets = Maps.newHashMap();
}

public Builder add(WriteResult result) {
addDataFiles(result.dataFiles);
addDeleteFiles(result.deleteFiles);
addReferencedDataFiles(result.referencedDataFiles);

addOffsets(result.logStorePartitionOffsets);
return this;
}

Expand Down Expand Up @@ -109,8 +127,19 @@ public Builder addReferencedDataFiles(Iterable<CharSequence> files) {
return this;
}

public Builder addOffsets(Map<Integer, Long> newLogStorePartitionOffsets) {
for (Map.Entry<Integer, Long> entry : newLogStorePartitionOffsets.entrySet()) {
Long oldOffset = this.logStorePartitionOffsets.get(entry.getKey());
Long newOffset = entry.getValue();
if (oldOffset == null || oldOffset < newOffset) {
this.logStorePartitionOffsets.put(entry.getKey(), newOffset);
}
}
return this;
}

public WriteResult build() {
return new WriteResult(dataFiles, deleteFiles, referencedDataFiles);
return new WriteResult(dataFiles, deleteFiles, referencedDataFiles, logStorePartitionOffsets);
}
}
}
79 changes: 79 additions & 0 deletions flink-example/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

String flinkVersion = '1.16.0'
String flinkMajorVersion = '1.16'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

project(":iceberg-flink-example") {
apply plugin: 'com.github.johnrengelman.shadow'

tasks.jar.dependsOn tasks.shadowJar

dependencies {
implementation project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}")
implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
implementation 'org.apache.flink:flink-runtime:1.16.0'
implementation 'org.apache.flink:flink-table-runtime:1.16.0'
// implementation 'org.apache.flink:flink-table-planner-loader:1.16.0'
implementation "org.apache.flink:flink-sql-connector-hive-2.3.9_2.12:1.16.0"
implementation 'org.apache.flink:flink-json:1.16.0'
implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}"
implementation "org.apache.flink:flink-connector-base:${flinkVersion}"
implementation "org.apache.flink:flink-connector-files:${flinkVersion}"
implementation "org.apache.flink:flink-clients:1.16.0"
implementation "org.apache.hadoop:hadoop-client"
implementation 'org.apache.flink:flink-runtime-web:1.16.0'
implementation 'org.apache.flink:flink-sql-gateway-api:1.16.0'
implementation 'org.apache.flink:flink-table-planner_2.12:1.16.0'
implementation 'org.apache.flink:flink-csv:1.16.0'
}

shadowJar {
configurations = [project.configurations.runtimeClasspath]

zip64 true

// include the LICENSE and NOTICE files for the shaded Jar
from(projectDir) {
include 'LICENSE'
include 'NOTICE'
}

// Relocate dependencies to avoid conflicts
relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
relocate 'com.google', 'org.apache.iceberg.shaded.com.google'
relocate 'com.fasterxml', 'org.apache.iceberg.shaded.com.fasterxml'
relocate 'com.github.benmanes', 'org.apache.iceberg.shaded.com.github.benmanes'
relocate 'org.checkerframework', 'org.apache.iceberg.shaded.org.checkerframework'
relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
relocate 'org.apache.httpcomponents.client5', 'org.apache.iceberg.shaded.org.apache.httpcomponents.client5'

classifier null
}

jar {
enabled = false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LogStoreExample {

private LogStoreExample() {}

public static void main(String[] args) throws Exception {

Configuration configuration = new Configuration();
configuration.setString("table.exec.iceberg.use-flip27-source", "true");
configuration.setString("execution.checkpointing.interval", "60s");
configuration.setString("state.checkpoint-storage", "jobmanager");
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
TableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql(
"CREATE CATALOG hive_catalog WITH (\n"
+ " 'type'='iceberg',\n"
+ " 'uri'='thrift://dev-node2:9083',\n"
+ " 'warehouse'='hdfs://ns1/dtInsight/hive/warehouse'\n"
+ ")");

tEnv.executeSql("USE CATALOG hive_catalog");

tEnv.executeSql("USE iceberg_w");

// tEnv.executeSql("CREATE TABLE default_catalog.default_database.f (\n" +
// " id BIGINT,\n" +
// " name STRING\n" +
// ") WITH (\n" +
// " 'connector' = 'filesystem',\n" +
// " 'path' = 'file:///Users/ada/tmp/log-store',\n" +
// " 'format' = 'csv'\n" +
// ")");

// tEnv.executeSql("CREATE TABLE flip27_log_2 (\n" +
// " id BIGINT,\n" +
// " name STRING\n" +
// ") WITH (\n" +
// " 'format-version' = '2'," +
// " 'log-store' = 'kafka'," +
// " 'kafka.bootstrap.servers'='172.16.100.109:9092'," +
// " 'kafka.topic'='flip27_log_2'\n" +
// ")");

// tEnv.executeSql("ALTER TABLE flip27_log SET ('format-version'='2')");
// tEnv.executeSql("ALTER TABLE flip27_log SET ('kafka.bootstrap.servers'='172.16.100.109:9092')");
// tEnv.executeSql("ALTER TABLE flip27_log SET ('kafka.topic'='flip27_log')");

// tEnv.executeSql("INSERT INTO log_store_v2 VALUES (3, 'bar')");
// tEnv.executeSql(
// "SELECT * FROM log_store_v2 /*+ OPTIONS('streaming'='true',
// 'monitor-interval'='1s')*/")
// .print();

// tEnv.executeSql(
// "INSERT INTO default_catalog.default_database.f SELECT * FROM log_store_v2 /*+
// OPTIONS('streaming'='true', 'monitor-interval'='1s', 'log-store'='none') */")
// ;

// tEnv.executeSql("INSERT INTO flip27_log_2 VALUES(2, 'bar')");

tEnv.executeSql(
"SELECT * FROM flip27_log_2 /*+ OPTIONS('log-store'='kafka') */")
.print();
}
}
10 changes: 10 additions & 0 deletions flink/v1.16/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

implementation "org.apache.flink:flink-connector-kafka:${flinkVersion}"

// for dropwizard histogram metrics implementation
compileOnly "org.apache.flink:flink-metrics-dropwizard:${flinkVersion}"
compileOnly "org.apache.flink:flink-streaming-java:${flinkVersion}"
Expand Down Expand Up @@ -80,6 +82,14 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration: 'testArtifacts')

testImplementation("org.apache.kafka:kafka_${scalaVersion}:2.8.1") {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
exclude group: 'com.fasterxml.jackson.module'
exclude group: 'com.fasterxml.jackson.datatype'
}

// By default, hive-exec is a fat/uber jar and it exports a guava library
// that's really old. We use the core classifier to be able to override our guava
// version. Luckily, hive-exec seems to work okay so far with this version of guava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,16 @@ private FlinkConfigOptions() {}
SplitAssignerType.SIMPLE
+ ": simple assigner that doesn't provide any guarantee on order or locality."))
.build());

public static final ConfigOption<String> LOG_KEY_FORMAT =
ConfigOptions.key("log.key.format")
.stringType()
.defaultValue("json")
.withDescription("Specify the key message format of log system with primary key.");

public static final ConfigOption<String> LOG_FORMAT =
ConfigOptions.key("log.format")
.stringType()
.defaultValue("json")
.withDescription("Specify the message format of log system.");
}
Loading

0 comments on commit b71ab95

Please sign in to comment.