Extending example #46

Open · wants to merge 1 commit into base: master
93 changes: 79 additions & 14 deletions examples/Convert table to Iceberg.ipynb
@@ -1,13 +1,22 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
" - Spark 2.3.x\n",
" - Gradle >= 4.4"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup\n",
"\n",
"1. Create a Spark session\n",
"2. Add the iceberg-runtime Jar"
" 1. Create a Spark session\n",
" 1. Add the iceberg-runtime Jar\n",
" 1. Create variable for iceberg path and hive tables"
]
},
{
@@ -73,11 +82,21 @@
"%AddJar file:///home/user/iceberg-runtime-0.1.3.jar"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"val icebergTablePath = \"hdfs:/tmp/tables/job_metrics_tmp\"\n",
"val hiveTable = \"default.job_metrics\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Drop and create a table in HDFS\n",
"# Drop and create a table in HDFS using hive tables\n",
"\n",
"[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)"
]
@@ -138,24 +157,61 @@
"import com.netflix.iceberg.hadoop.HadoopTables\n",
"import com.netflix.iceberg.spark.SparkSchemaUtil\n",
"\n",
"val path = \"hdfs:/tmp/tables/job_metrics_tmp\"\n",
"{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
"\n",
" // remove the temp table if it already exists\n",
" val conf = spark.sparkContext.hadoopConfiguration\n",
" val fs = new Path(icebergTablePath).getFileSystem(conf)\n",
" fs.delete(new Path(icebergTablePath), true /* recursive */ )\n",
"\n",
" // create the temp table using Spark utils to create a schema and partition spec\n",
" val tables = new HadoopTables(conf)\n",
" val icebergSchema = SparkSchemaUtil.schemaForTable(spark, hiveTable)\n",
" val spec = SparkSchemaUtil.specForTable(spark, hiveTable)\n",
"\n",
" tables.create(schema, spec, icebergTablePath)\n",
"\n",
" // show the schema\n",
" tables.load(icebergTablePath).schema\n",
"}\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Drop and create a table in HDFS using spark dataframe\n",
"\n",
"[Spark Schema Helpers](https://netflix.github.io/iceberg/current/javadoc/index.html?com/netflix/iceberg/spark/SparkSchemaUtil.html)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import org.apache.hadoop.fs.Path\n",
"import com.netflix.iceberg.hadoop.HadoopTables\n",
"import com.netflix.iceberg.spark.SparkSchemaUtil\n",
"import com.netflix.iceberg.PartitionSpec\n",
"\n",
"{ // use a block to avoid values (conf, etc.) getting caught in closures\n",
"\n",
" // remove the temp table if it already exists\n",
" val conf = spark.sparkContext.hadoopConfiguration\n",
" val fs = new Path(path).getFileSystem(conf)\n",
" fs.delete(new Path(path), true /* recursive */ )\n",
" val fs = new Path(icebergTablePath).getFileSystem(conf)\n",
" fs.delete(new Path(icebergTablePath), true /* recursive */ )\n",
"\n",
" // create the temp table using Spark utils to create a schema and partition spec\n",
" val tables = new HadoopTables(conf)\n",
" val schema = SparkSchemaUtil.schemaForTable(spark, \"default.job_metrics\")\n",
" val spec = SparkSchemaUtil.specForTable(spark, \"default.job_metrics\")\n",
" val icebergSchema = SparkSchemaUtil.convert(sparkDF.schema)\n",
" val spec = PartitionSpec.builderFor(icebergSchema).identity(partitionKey).build()\n",
"\n",
" tables.create(schema, spec, path)\n",
" tables.create(schema, spec, icebergTablePath)\n",
"\n",
" // show the schema\n",
" tables.load(path).schema\n",
" tables.load(icebergTablePath).schema\n",
"}\n"
]
},
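The DataFrame-based cell above refers to sparkDF and partitionKey, which are never defined in this diff. A minimal sketch of the kind of definitions it assumes (the values below are illustrative, not part of the PR):

// hypothetical inputs for the DataFrame-based create
val sparkDF = spark.table(hiveTable)   // any DataFrame whose schema should become the Iceberg schema
val partitionKey = "dateint"           // column to partition by; replace with the real partition column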
@@ -232,7 +288,7 @@
"import com.netflix.iceberg.spark.SparkTableUtil\n",
"\n",
"// get a data frame with the table's partitions\n",
"val partitions = SparkTableUtil.partitionDF(spark, \"default.job_metrics\")\n",
"val partitions = SparkTableUtil.partitionDF(spark, hiveTable)\n",
" .filter($\"format\".contains(\"parquet\") || $\"format\".contains(\"avro\"))\n",
"\n",
"display(partitions.limit(10))"
@@ -285,7 +341,7 @@
"\n",
" // open the table and append the files from this partition\n",
" val tables = new HadoopTables(new Configuration())\n",
" val table = tables.load(\"hdfs:/tmp/tables/job_metrics_tmp\")\n",
" val table = tables.load(icebergTablePath)\n",
"\n",
" // fast appends will create a manifest for the new files\n",
" val append = table.newFastAppend\n",
@@ -395,12 +451,21 @@
"table.currentSnapshot"
]
},
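For context, a fast append started with table.newFastAppend is typically finished by registering each DataFile and committing, which produces the snapshot shown by table.currentSnapshot. A minimal sketch, assuming dataFiles is a Seq[DataFile] collected from the partition listing (the variable name is an assumption, not from this PR):

// sketch only: add the collected data files and commit the fast append
dataFiles.foreach(f => append.appendFile(f))
append.commit()   // writes a manifest for the new files and creates a new snapshot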
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Loading iceberg table using spark\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"source": [
"val df = spark.read.format(\"iceberg\").load(icebergTablePath)"
]
}
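Once loaded, the DataFrame can be checked with ordinary Spark calls; a short sketch (not part of the diff above):

df.printSchema            // confirm the schema matches the Iceberg table
display(df.limit(10))     // or df.show(10) if display() is not available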
],
"metadata": {