From 94edd95483e6297ce055ab286f30f32104ae06a0 Mon Sep 17 00:00:00 2001
From: Eyal Trabelsi
Date: Wed, 1 Aug 2018 10:55:55 +0300
Subject: [PATCH] - improve(example): minimal requirements, creating table using spark dataframe, loading iceberg table using spark

---
 examples/Convert table to Iceberg.ipynb | 93 +++++++++++++++++++++----
 1 file changed, 79 insertions(+), 14 deletions(-)

diff --git a/examples/Convert table to Iceberg.ipynb b/examples/Convert table to Iceberg.ipynb
index 867510cb5..882e949b3 100644
--- a/examples/Convert table to Iceberg.ipynb
+++ b/examples/Convert table to Iceberg.ipynb
@@ -1,13 +1,22 @@
 {
  "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Prerequisites\n",
+    " - Spark 2.3.x\n",
+    " - Gradle >= 4.4"
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
     "## Setup\n",
-    "\n",
-    "1. Create a Spark session\n",
-    "2. Add the iceberg-runtime Jar"
+    " 1. Create a Spark session\n",
+    " 1. Add the iceberg-runtime Jar\n",
+    " 1. Create variables for the Iceberg table path and the Hive table name"
    ]
   },
   {
@@ -73,11 +82,21 @@
     "%AddJar file:///home/user/iceberg-runtime-0.1.3.jar"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "val icebergTablePath = \"hdfs:/tmp/tables/job_metrics_tmp\"\n",
+    "val hiveTable = \"default.job_metrics\""
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "# Drop and create a table in HDFS\n",
+    "# Drop and create a table in HDFS using a Hive table\n",
     "\n",
     "[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)"
    ]
@@ -138,24 +157,61 @@
     "import com.netflix.iceberg.hadoop.HadoopTables\n",
     "import com.netflix.iceberg.spark.SparkSchemaUtil\n",
     "\n",
-    "val path = \"hdfs:/tmp/tables/job_metrics_tmp\"\n",
+    "{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
+    "\n",
+    " // remove the temp table if it already exists\n",
+    " val conf = spark.sparkContext.hadoopConfiguration\n",
+    " val fs = new Path(icebergTablePath).getFileSystem(conf)\n",
+    " fs.delete(new Path(icebergTablePath), true /* recursive */ )\n",
+    "\n",
+    " // create the temp table using Spark utils to create a schema and partition spec\n",
+    " val tables = new HadoopTables(conf)\n",
+    " val icebergSchema = SparkSchemaUtil.schemaForTable(spark, hiveTable)\n",
+    " val spec = SparkSchemaUtil.specForTable(spark, hiveTable)\n",
+    "\n",
+    " tables.create(icebergSchema, spec, icebergTablePath)\n",
+    "\n",
+    " // show the schema\n",
+    " tables.load(icebergTablePath).schema\n",
+    "}\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Drop and create a table in HDFS using a Spark DataFrame\n",
+    "\n",
+    "[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import org.apache.hadoop.fs.Path\n",
+    "import com.netflix.iceberg.hadoop.HadoopTables\n",
+    "import com.netflix.iceberg.spark.SparkSchemaUtil\n",
+    "import com.netflix.iceberg.PartitionSpec\n",
     "\n",
     "{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
     "\n",
     " // remove the temp table if it already exists\n",
     " val conf = spark.sparkContext.hadoopConfiguration\n",
-    " val fs = new Path(path).getFileSystem(conf)\n",
-    " fs.delete(new Path(path), true /* recursive */ )\n",
+    " val fs = new Path(icebergTablePath).getFileSystem(conf)\n",
+    " fs.delete(new Path(icebergTablePath), true /* recursive */ )\n",
     "\n",
     " // create the temp table using Spark utils to create a schema and partition spec\n",
     " val tables = new HadoopTables(conf)\n",
-    " val schema = SparkSchemaUtil.schemaForTable(spark, \"default.job_metrics\")\n",
-    " val spec = SparkSchemaUtil.specForTable(spark, \"default.job_metrics\")\n",
+    " val icebergSchema = SparkSchemaUtil.convert(sparkDF.schema) // sparkDF: an existing DataFrame\n",
+    " val spec = PartitionSpec.builderFor(icebergSchema).identity(partitionKey).build() // partitionKey: partition column name\n",
     "\n",
-    " tables.create(schema, spec, path)\n",
+    " tables.create(icebergSchema, spec, icebergTablePath)\n",
     "\n",
     " // show the schema\n",
-    " tables.load(path).schema\n",
+    " tables.load(icebergTablePath).schema\n",
     "}\n"
    ]
   },
@@ -232,7 +288,7 @@
     "import com.netflix.iceberg.spark.SparkTableUtil\n",
     "\n",
     "// get a data frame with the table's partitions\n",
-    "val partitions = SparkTableUtil.partitionDF(spark, \"default.job_metrics\")\n",
+    "val partitions = SparkTableUtil.partitionDF(spark, hiveTable)\n",
     " .filter($\"format\".contains(\"parquet\") || $\"format\".contains(\"avro\"))\n",
     "\n",
     "display(partitions.limit(10))"
@@ -285,7 +341,7 @@
     "\n",
     " // open the table and append the files from this partition\n",
     " val tables = new HadoopTables(new Configuration())\n",
-    " val table = tables.load(\"hdfs:/tmp/tables/job_metrics_tmp\")\n",
+    " val table = tables.load(icebergTablePath)\n",
     "\n",
     " // fast appends will create a manifest for the new files\n",
     " val append = table.newFastAppend\n",
@@ -395,12 +451,21 @@
     "table.currentSnapshot"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Load the Iceberg table using Spark\n"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "val df = spark.read.format(\"iceberg\").load(icebergTablePath)"
+   ]
   }
  ],
  "metadata": {