diff --git a/docs/src/reference/implementations-spark.asciidoc b/docs/src/reference/implementations-spark.asciidoc index 8a513b1b118..2ecfb313262 100644 --- a/docs/src/reference/implementations-spark.asciidoc +++ b/docs/src/reference/implementations-spark.asciidoc @@ -97,6 +97,9 @@ This can save a significant amount of time and space resources. If the `InputRDD `SparkGraphComputer` will partition the graph using a `org.apache.spark.HashPartitioner` with the number of partitions being either the number of existing partitions in the input (i.e. input splits) or the user specified number of `GraphComputer.workers()`. +If the provider/user finds that many small HDFS files are generated by the `OutputRDD`, the option `gremlin.spark.outputRepartition`
+can help to repartition the output according to the specified number of partitions. The option is disabled by default.
+ ===== Storage Levels The `SparkGraphComputer` uses `MEMORY_ONLY` to cache the input graph and the output graph by default. Users should be aware of the impact of diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java index eb2f94cd532..29f05ed8fea 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java @@ -74,6 +74,7 @@ private Constants() { public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache"; // don't cache the loadedGraphRDD (ignores graphStorageLevel) public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // don't delete the output if it is not empty + public static final String GREMLIN_SPARK_OUTPUT_REPARTITION = "gremlin.spark.outputRepartition"; // 
allows setting the number of partitions of the output RDD to reduce the number of small HDFS files public static final String SPARK_SERIALIZER = "spark.serializer"; public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator"; public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired"; diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java index 3d169e493db..e0468771513 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java @@ -48,7 +48,8 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD stream for output - graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2())) + JavaPairRDD javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD); + javaPairRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2())) .saveAsNewAPIHadoopFile(Constants.getGraphLocation(outputLocation), NullWritable.class, VertexWritable.class, @@ -62,7 +63,8 @@ public Iterator> writeMemoryRDD(final Configuration config final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); if (null != outputLocation) { // map back to a Hadoop stream for output - memoryRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))) + JavaPairRDD javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), memoryRDD); + javaPairRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))) .saveAsNewAPIHadoopFile(Constants.getMemoryLocation(outputLocation, memoryKey), 
ObjectWritable.class, ObjectWritable.class, @@ -75,4 +77,4 @@ public Iterator> writeMemoryRDD(final Configuration config } return Collections.emptyIterator(); } -} \ No newline at end of file +} diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java index 666534fc003..aa742d28724 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDD.java @@ -55,4 +55,17 @@ public interface OutputRDD { public default Iterator> writeMemoryRDD(final Configuration configuration, final String memoryKey, final JavaPairRDD memoryRDD) { return Collections.emptyIterator(); } + + /** + * Allows users to customize the number of RDD partitions to reduce the number of small HDFS files + */ + public default JavaPairRDD repartitionJavaPairRDD(final String repartitionString, JavaPairRDD graphRDD) { + JavaPairRDD javaPairRDD = graphRDD; + final int repartition = null == repartitionString ? 
-1 : Integer.parseInt(repartitionString); + if (repartition > 0) { + javaPairRDD = javaPairRDD.repartition(repartition); + } + return javaPairRDD; + } + } diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java index 5c763914985..058135ff0cd 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java @@ -51,15 +51,16 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD javaPairRDD = repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), graphRDD); if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true)) - graphRDD.mapValues(vertex -> { + javaPairRDD.mapValues(vertex -> { vertex.get().dropEdges(Direction.BOTH); return vertex; }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel) // call action to eager store rdd .count(); else - graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel) + javaPairRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel) // call action to eager store rdd .count(); Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD @@ -73,11 +74,12 @@ public Iterator> writeMemoryRDD(final Configuration config throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to"); final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey); 
Spark.removeRDD(memoryRDDName); - memoryRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"))) + final JavaPairRDD javaPairRDD = repartitionJavaPairRDD(configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION), memoryRDD); + javaPairRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"))) // call action to eager store rdd .count(); Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD - return IteratorUtils.map(memoryRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2())); + return IteratorUtils.map(javaPairRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2())); } @Override