
usage help #39

Closed
eyaltrabelsi opened this issue Jul 6, 2018 · 19 comments

Comments

@eyaltrabelsi

First of all, I think this package can be super beneficial, so kudos.
Can you guide me through installing and using it, either with:

  • an installation guide including prerequisites (or even better, a Docker image), or
  • a tutorial? I saw there is a small example in a Jupyter notebook.

Thanks for the hard work.

@rdblue
Contributor

rdblue commented Jul 10, 2018

Sure! I haven't published a release yet, but you can build it fairly easily. Just download Gradle 4.4 and build using gradle build.

That will build the project and should produce the iceberg-runtime Jar in runtime/build/libs/. Just drop that Jar into your Spark 2.3.0 jars directory and you should be able to read and write Iceberg tables.

For an example of creating an Iceberg table, you can look at the example. The API is fairly easy, but I can answer questions if you have any.
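Roughly, the create API looks like this (a minimal sketch, not the linked example; it assumes an Iceberg Schema and PartitionSpec named schema and spec have already been built):

import com.netflix.iceberg.hadoop.HadoopTables

// HadoopTables manages tables by filesystem path using the Hadoop configuration
val tables = new HadoopTables(spark.sparkContext.hadoopConfiguration)
tables.create(schema, spec, "hdfs:/path/to/table") // writes the initial table metadata
val table = tables.load("hdfs:/path/to/table")     // load it back to read or append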

@eyaltrabelsi
Author

@rdblue first, thanks for the help. I managed to run the jar on Spark, but I still have some questions (I don't use HDFS, I use S3, and the example uses HDFS). This is what I run:

import com.netflix.iceberg.hadoop.HadoopTables
import com.netflix.iceberg.spark.SparkSchemaUtil

val input_table_path = "s3:////" // This path holds partitioned Parquet files
val iceberg_table_path = "s3:///iceberg/" // This path exists
spark.read.parquet(input_table_path).createOrReplaceTempView("<table_name>")
spark.sql("create table default.<table_name> as select * from <table_name>")

{
  val conf = spark.sparkContext.hadoopConfiguration
  val tables = new HadoopTables(conf)
  val schema = SparkSchemaUtil.schemaForTable(spark, "<table_name>")
  val spec = SparkSchemaUtil.specForTable(spark, "<table_name>")
  tables.create(schema, spec, iceberg_table_path) // java.lang.NullPointerException
  tables.load(iceberg_table_path).schema
}

  • Was I right to create the table this way? If not, how do you create a Spark table with a database?

    spark.read.parquet(input_table_path).createOrReplaceTempView("<table_name>")
    spark.sql("create table default.<table_name> as select * from <table_name>")

  • I get a NullPointerException when running tables.create, at com.netflix.iceberg.TableMetadata.newTableMetadata(TableMetadata.java:47). Do you know why? :)

@rdblue
Contributor

rdblue commented Jul 11, 2018

You can use HadoopTables with S3 paths, but because S3 doesn't implement atomic rename, the commit is not guaranteed to be atomic. You might have concurrent commits clobber one another. That's why we use a metastore implementation to swap pointers to metadata files.

Looks like your table create failed with NPE? What was the exception message and stack trace? If that fails, then you can't read with Spark.

Also, this conversion is for Hive tables. You can alter it to get it working for a directory of Parquet files, but it works out of the box for partitioned Hive tables.

@eyaltrabelsi
Author

@rdblue
You might have concurrent commits clobber one another. That's why we use a metastore implementation to swap pointers to metadata files.

By metastore, do you mean Metacat / the Hive metastore?

Also, this conversion is for Hive tables.

When I used Spark temp views, it looked for a database, which is not supported. Am I missing anything?

You can alter it to get it working for a directory of Parquet files, but it works out of the box for partitioned Hive tables.

If the S3 files are partitioned, that's enough, right?

Looks like your table create failed with NPE? What was the exception message and stack trace? If that fails, then you can't read with Spark.

java.lang.NullPointerException
at com.netflix.iceberg.TableMetadata.newTableMetadata(TableMetadata.java:47)
at com.netflix.iceberg.hadoop.HadoopTables.create(HadoopTables.java:78)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-22420:11)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-22420:61)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$$iw$$iw$$iw$$iw.<init>(command-22420:63)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$$iw$$iw$$iw.<init>(command-22420:65)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$$iw$$iw.<init>(command-22420:67)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$$iw.<init>(command-22420:69)
at line8d1d755474794127b4d6ab30ec657cbb29.$read.<init>(command-22420:71)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$.<init>(command-22420:75)
at line8d1d755474794127b4d6ab30ec657cbb29.$read$.<clinit>(command-22420)
at line8d1d755474794127b4d6ab30ec657cbb29.$eval$.$print$lzycompute(:7)
at line8d1d755474794127b4d6ab30ec657cbb29.$eval$.$print(:6)
at line8d1d755474794127b4d6ab30ec657cbb29.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:186)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:500)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:456)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:249)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:229)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:188)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:183)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:43)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:221)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:43)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:229)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:601)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:601)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:596)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:486)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:554)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:391)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:348)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
at java.lang.Thread.run(Thread.java:748)

@rdblue
Contributor

rdblue commented Jul 13, 2018

Looks like your partition spec is null. Your source table isn't really a Hive table stored in a Hive metastore. That's what the example is trying to use, so you'll have to modify the example.

For now, let's focus on getting the table created. You probably have a schema for your table's dataframe. You can use the conversion helpers to get an Iceberg schema to pass when creating a table:

val icebergSchema = SparkSchemaUtil.convert(df.schema)

See convert.

Then, you can create a partition spec using the spec builder and identity partitions:

val spec = PartitionSpec.builderFor(icebergSchema)
    .identity("part-column-1-name")
    .identity("part-column-2-name")
    .build()

Then, pass that schema and spec in to create the table.
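That is, something like this sketch (the path is just a placeholder, and tables is the HadoopTables instance from your earlier snippet):

tables.create(icebergSchema, spec, "s3://bucket/path/to/iceberg-table")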

@eyaltrabelsi
Author

@rdblue Amazing, this actually worked for creating an Iceberg table from a Spark dataframe:

import com.netflix.iceberg.hadoop.HadoopTables
import com.netflix.iceberg.spark.SparkSchemaUtil
import com.netflix.iceberg.PartitionSpec

val tables = new HadoopTables(spark.sparkContext.hadoopConfiguration)
val icebergSchema = SparkSchemaUtil.convert(df.schema)
val icebergSpec = PartitionSpec.builderFor(icebergSchema).identity(partitionKey1).identity(partitionKey2).build()
tables.create(icebergSchema, icebergSpec, icebergeTablePath)
val table = tables.load(icebergeTablePath) // for validation


@rdblue Regarding the next part, going through the partitions and appending them (I created a partitioned Hive table for the append part), it fails on the following (only when the action occurs):

import com.netflix.iceberg.hadoop.HadoopTables
import com.netflix.iceberg.spark.SparkTableUtil
import org.apache.hadoop.conf.Configuration

partitions.repartition(100).flatMap { row =>
  // list the partition and read Parquet footers to get metrics
  SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
}.show()

this is the stack trace (truncated):

java.lang.UnsupportedOperationException: empty.max
at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:229)
at scala.collection.AbstractTraversable.max(Traversable.scala:104)
at com.netflix.iceberg.spark.SparkTableUtil$.com$netflix$iceberg$spark$SparkTableUtil$$mapToArray(SparkTableUtil.scala:170)
at com.netflix.iceberg.spark.SparkTableUtil$$anonfun$listParquetPartition$2.apply(SparkTableUtil.scala:254)
at com.netflix.iceberg.spark.SparkTableUtil$$anonfun$listParquetPartition$2.apply(SparkTableUtil.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.netflix.iceberg.spark.SparkTableUtil$.listParquetPartition(SparkTableUtil.scala:244)
at com.netflix.iceberg.spark.SparkTableUtil$.listPartition(SparkTableUtil.scala:82)
at lineea53def8802847be8d1d2a416b1855af134.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-25309:2)
at lineea53def8802847be8d1d2a416b1855af134.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-25309:1)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:49)


@rdblue I don't mind updating the example file. Do you think it would be beneficial? If so, let me know and I will create a PR.

@rdblue
Contributor

rdblue commented Jul 14, 2018

I just pushed a fix for this issue. I think you were getting an empty map that the code assumed was non-empty. Try it again?
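For reference, the "empty.max" in your trace is just Scala's max on an empty collection:

// What the error means: calling max on an empty Scala collection throws.
Seq.empty[Long].max // java.lang.UnsupportedOperationException: empty.max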

As for the example, it would be great to create one for non-Hive tables. Otherwise, I think maybe we should just document the tools available to convert schemas to Iceberg and to create partition specs.

@eyaltrabelsi
Author

@rdblue It works for the mapToArray function, but the error still occurs in bytesMapToArray:

java.lang.UnsupportedOperationException: empty.max
at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:229)
at scala.collection.AbstractTraversable.max(Traversable.scala:104)
at com.netflix.iceberg.spark.SparkTableUtil$.com$netflix$iceberg$spark$SparkTableUtil$$bytesMapToArray(SparkTableUtil.scala:136)
at com.netflix.iceberg.spark.SparkTableUtil$$anonfun$listParquetPartition$2.apply(SparkTableUtil.scala:255)
at com.netflix.iceberg.spark.SparkTableUtil$$anonfun$listParquetPartition$2.apply(SparkTableUtil.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at com.netflix.iceberg.spark.SparkTableUtil$.listParquetPartition(SparkTableUtil.scala:244)
at com.netflix.iceberg.spark.SparkTableUtil$.listPartition(SparkTableUtil.scala:82)
at line409272cb8f5845de952c15a95b47206034.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-25302:5)
at line409272cb8f5845de952c15a95b47206034.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(command-25302:3)

@rdblue
Contributor

rdblue commented Jul 16, 2018

Ah, same problem. I've pushed a fix for that method, too.

@etrabelsi

@rdblue I have built the fixed version, and the part that creates the extended partition map with the stats succeeds.
When committing the new append I get an NPE on the snapshot update. This is the stack trace:

java.lang.NullPointerException
at com.netflix.iceberg.SnapshotUpdate.commit(SnapshotUpdate.java:106)
at line6adee5bd0df34a56b6106cd06d41006534.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(command-25683:21)
at line6adee5bd0df34a56b6106cd06d41006534.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(command-25683:7)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:189)
at org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5.apply(objects.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:838)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:838)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

@rdblue
Contributor

rdblue commented Jul 17, 2018

It doesn't look like any files are getting added. Can you make sure the dataframe of SparkDataFiles has rows?
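Something like this quick check (a sketch reusing your listPartition call) would show whether the listing produces any files at all:

import com.netflix.iceberg.spark.SparkTableUtil

// Collect the listed files before committing the append; if the count is zero,
// the snapshot update has nothing to add.
val files = partitions.repartition(100).flatMap { row =>
  SparkTableUtil.listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
}
println(s"data files found: ${files.count()}")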

@etrabelsi

@rdblue How can I scan the S3 files using the table/manifests?

@rdblue
Contributor

rdblue commented Jul 17, 2018

Once you've added files to the table, you can scan it using Spark like this:

val df = spark.read.format("iceberg").load("s3://path/to/table").filter(...)
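For example (event_date is just a hypothetical partition column here; use one of your own), Iceberg plans the scan from the table's manifests rather than by listing the S3 paths:

import spark.implicits._

// Read the Iceberg table by its root path and filter on a partition column.
val df = spark.read.format("iceberg").load("s3://path/to/table")
df.filter($"event_date" === "2018-07-01").show()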

@etrabelsi

@rdblue I get an NPE on getStatistics in the Reader class. Any idea why?

java.lang.NullPointerException
at com.netflix.iceberg.spark.source.Reader.getStatistics(Reader.java:201)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.computeStats(DataSourceV2Relation.scala:34)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.default(SizeInBytesOnlyStatsPlanVisitor.scala:55)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.default(SizeInBytesOnlyStatsPlanVisitor.scala:27)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:47)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.SizeInBytesOnlyStatsPlanVisitor$.visit(SizeInBytesOnlyStatsPlanVisitor.scala:27)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.org$apache$spark$sql$catalyst$plans$logical$statsEstimation$BasicStatsPlanVisitor$$fallback(BasicStatsPlanVisitor.scala:28)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.default(BasicStatsPlanVisitor.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.default(BasicStatsPlanVisitor.scala:25)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:47)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
at org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
at scala.Option.getOrElse(Option.scala:121)

@rdblue
Contributor

rdblue commented Jul 18, 2018

@eyaltrabelsi, the code was still referencing a field directly instead of using the lazy accessor. Should be fixed now.

@etrabelsi

@rdblue It's still the same issue/line, thanks.

@eyaltrabelsi
Author

@rdblue It still fails on the same access.

@rdblue
Contributor

rdblue commented Jul 29, 2018

Did you rebuild with the latest master?

@eyaltrabelsi
Author

@rdblue Yeah, it worked, thanks.
I have created a PR extending the example a bit.
