---
title: "Build your own Data Lakehouse in minutes with Flink SQL"
title: "Build your own Data Lakehouse with Flink SQL"
date: 2024-06-02
layout: article
image: "lakehouse.jpg"
categories:
- flink
- kubernetes
tags:
- Lakehouse
- Flink
- Iceberg
- Kubernetes
---

**This post is still a work in progress**

## Data Lakehouse

### What is a Data Lakehouse?
I’m sure most of you are familiar with Git, and you know it is today the de-facto standard for version control.

Now let’s get started:

```bash
git clone TODO && cd flink-lakehouse-demo
kubectl create ns flink-lakehouse && kubectl config set-context --current --namespace=flink-lakehouse
```

## Nessie deployment

### Setup

```bash
helm repo add nessie-helm https://charts.projectnessie.org
helm repo update
```

### Deployment

```bash
helm install nessie -n flink-lakehouse helm/nessie
```
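
If you want to sanity-check the deployment before moving on, something like this should do (the `nessie` deployment/service names and port follow the chart's defaults, so adjust them if your release differs):

```bash
# Wait for the Nessie deployment to become ready
kubectl rollout status deployment/nessie -n flink-lakehouse

# Port-forward the Nessie API and list the existing references
# (a fresh install should show the default "main" branch)
kubectl port-forward svc/nessie 19120:19120 -n flink-lakehouse &
curl -s http://localhost:19120/api/v1/trees
```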


## Flink deployment

As I said earlier, Flink is designed to be extended, and this is done by making the required JARs available on its classpath.

Steps:

<details>
<summary>JARs</summary>

```bash
mkdir jars
```

> Download the following JARs into the `jars/` directory:
> - [iceberg-flink-runtime-1.17-1.5.2.jar](https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.5.2/) (Iceberg + Flink)
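
For example, fetching the Iceberg runtime listed above into `jars/` could look like this (the full artifact URL is inferred from the standard Maven repository layout, so double-check it before running):

```bash
# Download the Flink Iceberg runtime into the jars/ directory created above
wget -P jars/ \
  https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.5.2/iceberg-flink-runtime-1.17-1.5.2.jar
```
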
</details>

<details>
<summary>flink-hadoop-entrypoint.sh</summary>

This makes sure our Hadoop classpath is properly defined before Flink starts: `flink-hadoop-entrypoint.sh`

```bash
#!/usr/bin/env bash

# Note: HADOOP_HOME is set in the Docker image
export HADOOP_CLASSPATH="$(hadoop classpath)"

# Delegate to the stock Flink entrypoint, forwarding the container arguments
exec /docker-entrypoint.sh "$@"
```
</details>

<details>
<summary>Dockerfile</summary>

```docker
# Scala is needed for iceberg runtime
RUN mkdir /opt/flink/plugins/gs-fs-hadoop && cp /opt/flink/opt/flink-gs-fs-hadoop-*.jar /opt/flink/plugins/gs-fs-hadoop/
# Copy the downloaded JARs (e.g. the Flink Iceberg runtime) into Flink's lib directory
COPY ./jars/ /opt/flink/lib/
# Replace entrypoint to properly set Hadoop classpath
COPY ./entrypoint/flink-hadoop-entrypoint.sh /flink-hadoop-entrypoint.sh
ENTRYPOINT ["/flink-hadoop-entrypoint.sh"]
```
</details>
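
With the JARs, the entrypoint script, and the Dockerfile in place, building and publishing the image is the usual routine. A minimal sketch, assuming a `${YOUR_REGISTRY}` placeholder and a `1.17` tag matching the Iceberg runtime above:

```bash
# Make the wrapper entrypoint executable before it is baked into the image
chmod +x entrypoint/flink-hadoop-entrypoint.sh

# Build and push the custom Flink image (replace the registry and tag with your own)
docker build -t ${YOUR_REGISTRY}/flink-lakehouse:1.17 .
docker push ${YOUR_REGISTRY}/flink-lakehouse:1.17
```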

### Deploy our Flink cluster

<details>
<summary> gcp-credentials.yaml </summary>
```yaml
data:
type: "iceberg"
warehouse: "gs://${YOUR_GCS_BUCKET}/"
catalog-impl: "org.apache.iceberg.nessie.NessieCatalog"
uri: "http://nessie:19120/api/v1"
uri: "http://nessie.nessie-ns:19120/api/v1"
io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO"
```
</details>
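
These properties describe the Nessie-backed Iceberg catalog that Flink will use. If you prefer to register it from the SQL client instead of the configuration file, a roughly equivalent statement would be the following sketch (the `nessie` catalog name matches the queries later in this post; the bucket is still a placeholder):

```sql
CREATE CATALOG nessie WITH (
  'type' = 'iceberg',
  'catalog-impl' = 'org.apache.iceberg.nessie.NessieCatalog',
  'uri' = 'http://nessie:19120/api/v1',
  'warehouse' = 'gs://${YOUR_GCS_BUCKET}/',
  'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO'
);
```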
```sql
CREATE TABLE TaxiTrips (
);
```

> The `'source.monitor-interval' = '1m'` setting tells Flink to periodically check the specified source location (in this case, the GCS bucket) for new or modified files; with `'1m'`, that check runs every minute.
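
For context, that option lives in the `WITH` clause of the table that points at the raw files. A minimal, hypothetical filesystem source using it (table name, column, and path below are illustrative, not the post's exact definition) looks like:

```sql
CREATE TABLE RawTrips (
  vendor_id STRING  -- illustrative column; declare your real schema here
) WITH (
  'connector' = 'filesystem',
  'path' = 'gs://${YOUR_GCS_BUCKET}/raw/',
  'format' = 'parquet',
  'source.monitor-interval' = '1m'  -- re-scan the path for new files every minute
);
```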

**4. Validate it works**

```sql
SELECT * FROM NycDataset;
```

```sql
SELECT * FROM nessie.nycdataset.TaxiTrips LIMIT 10;
SELECT count(*) FROM nessie.nycdataset.TaxiTrips;
```
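
Outside of SQL you can also watch the Lakehouse take shape directly in the bucket, since Iceberg writes its data and metadata files under the warehouse path. The exact layout depends on the catalog settings, so treat the path below as an assumption:

```bash
# List what Iceberg has written under the warehouse (directory layout assumed)
gsutil ls -r "gs://${YOUR_GCS_BUCKET}/nycdataset/"
```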

> The fact that our source table is unbounded effectively creates a pipeline that keeps your Iceberg table up to date with the latest data from your raw files, enabling near real-time data availability in your Data Lakehouse. It's a powerful feature that bridges the gap between batch and stream processing, allowing for fresher data in your analytics and downstream applications.
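
To make that concrete, the continuous ingestion boils down to a single streaming `INSERT`. A minimal sketch, assuming `NycDataset` is the filesystem source over the raw files and `nessie.nycdataset.TaxiTrips` the Iceberg table, as the queries above suggest:

```sql
-- Runs as an unbounded Flink job: every new raw file picked up by the source
-- is appended to the Iceberg table through the Nessie catalog.
INSERT INTO nessie.nycdataset.TaxiTrips
SELECT * FROM NycDataset;
```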

## Conclusion

Building your own Data Lakehouse with Flink SQL, Iceberg, and Nessie is not only possible but also relatively straightforward. This modern data architecture combines the best of data lakes and data warehouses, offering a flexible, scalable, and cost-effective solution for managing and analyzing large volumes of diverse data.

By leveraging Apache Flink's powerful stream processing capabilities, Apache Iceberg's high-performance table format, and Nessie's version control features, we've created a robust foundation for a Data Lakehouse. This setup allows for efficient data ingestion, processing, and analysis while maintaining data consistency and enabling advanced features like schema evolution and time travel.

Our step-by-step guide demonstrated how to deploy these components on Kubernetes, integrate with cloud storage (GCS in this case), and perform basic data operations. We showed how to query raw Parquet files and transform them into an Iceberg table, showcasing the seamless transition from traditional data formats to a Lakehouse structure.

This approach opens up numerous possibilities for organizations looking to modernize their data infrastructure. It enables real-time analytics, supports machine learning workflows, and provides a unified platform for various data types and workloads. As data volumes continue to grow and analytical needs become more complex, the Data Lakehouse architecture implemented here offers a scalable and adaptable solution.

By mastering these tools and concepts, data engineers and analysts can build powerful, flexible data platforms that meet the evolving needs of modern data-driven organizations. The journey to building your own Data Lakehouse may seem daunting at first, but with the right tools and approach, it's an achievable and rewarding endeavor that can transform how your organization manages and leverages its data assets.

## Coming Next

- Proto Confluent format for Flink
- Flink BigQuery source connector using Storage Read API
  - BQ users can read up to 300 TiB of data per month at no charge.
