Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TINKERPOP-3133] Allow customize the output partition #3026

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ private Constants() {
public static final String GREMLIN_SPARK_SKIP_PARTITIONER = "gremlin.spark.skipPartitioner"; // don't partition the loadedGraphRDD
public static final String GREMLIN_SPARK_SKIP_GRAPH_CACHE = "gremlin.spark.skipGraphCache"; // don't cache the loadedGraphRDD (ignores graphStorageLevel)
public static final String GREMLIN_SPARK_DONT_DELETE_NON_EMPTY_OUTPUT = "gremlin.spark.dontDeleteNonEmptyOutput"; // don't delete the output if it is not empty
public static final String GREMLIN_SPARK_OUTPUT_REPARTITION = "gremlin.spark.outputRepartition"; // allow set the repartition number of the outputRDD to reduce HDFS small files
public static final String SPARK_SERIALIZER = "spark.serializer";
public static final String SPARK_KRYO_REGISTRATOR = "spark.kryo.registrator";
public static final String SPARK_KRYO_REGISTRATION_REQUIRED = "spark.kryo.registrationRequired";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<O
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a <nullwritable,vertexwritable> stream for output
graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
JavaPairRDD<Object, VertexWritable> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration, graphRDD);
javaPairRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
.saveAsNewAPIHadoopFile(Constants.getGraphLocation(outputLocation),
NullWritable.class,
VertexWritable.class,
Expand All @@ -62,7 +63,8 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a Hadoop stream for output
memoryRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(hadoopConfiguration, memoryRDD);
javaPairRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
.saveAsNewAPIHadoopFile(Constants.getMemoryLocation(outputLocation, memoryKey),
ObjectWritable.class,
ObjectWritable.class,
Expand All @@ -75,4 +77,17 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
}
return Collections.emptyIterator();
}
}

/**
* Allow users to customize the RDD partitions to reduce HDFS small files
*/
private static <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final org.apache.hadoop.conf.Configuration hadoopConfiguration, JavaPairRDD<K, V> graphRDD) {
JavaPairRDD<K, V> javaPairRDD = graphRDD;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does PersistedOutputRDD also need to be aware of this configuration?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PersistedOutputRDD wants to persist the RDD, which is a little different from writing RDD to HDFS and does not generate small files. But in order to keep consistent, I also apply the change.
Here is the summary from ChatGPT:

In summary, persisting an RDD in Spark does not directly lead to the generation of small files in HDFS. The generation of small files in HDFS is more commonly associated with writing RDD data to HDFS using methods like saveAsTextFile, which can happen independently of persisting the RDD in memory or disk.

final String repartitionString = hadoopConfiguration.get(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
final int repartition = null == repartitionString ? -1 : Integer.parseInt(repartitionString);
if (repartition > 0) {
javaPairRDD = javaPairRDD.repartition(repartition);
}
return javaPairRDD;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<O
SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
// determine which storage level to persist the RDD as with MEMORY_ONLY being the default cache()
final StorageLevel storageLevel = StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY"));
final JavaPairRDD<Object, VertexWritable> javaPairRDD = repartitionJavaPairRDD(configuration, graphRDD);
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
javaPairRDD.mapValues(vertex -> {
vertex.get().dropEdges(Direction.BOTH);
return vertex;
}).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
// call action to eager store rdd
.count();
else
graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
javaPairRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).persist(storageLevel)
// call action to eager store rdd
.count();
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
Expand All @@ -73,15 +74,29 @@ public <K, V> Iterator<KeyValue<K, V>> writeMemoryRDD(final Configuration config
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
Spark.removeRDD(memoryRDDName);
memoryRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")))
final JavaPairRDD<K, V> javaPairRDD = repartitionJavaPairRDD(configuration, memoryRDD);
javaPairRDD.setName(memoryRDDName).persist(StorageLevel.fromString(configuration.getString(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, "MEMORY_ONLY")))
// call action to eager store rdd
.count();
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
return IteratorUtils.map(memoryRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
return IteratorUtils.map(javaPairRDD.collect().iterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
}

@Override
public boolean supportsResultGraphPersistCombination(final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
return persist.equals(GraphComputer.Persist.NOTHING) || resultGraph.equals(GraphComputer.ResultGraph.NEW);
}

/**
* Allow users to customize the RDD partitions to reduce HDFS small files
*/
private static <K, V> JavaPairRDD<K, V> repartitionJavaPairRDD(final Configuration configuration, JavaPairRDD<K, V> graphRDD) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: you can pull this common code into the shared interface OutputRDD as a default method.

JavaPairRDD<K, V> javaPairRDD = graphRDD;
final String repartitionString = configuration.getString(Constants.GREMLIN_SPARK_OUTPUT_REPARTITION);
final int repartition = null == repartitionString ? -1 : Integer.parseInt(repartitionString);
if (repartition > 0) {
javaPairRDD = javaPairRDD.repartition(repartition);
}
return javaPairRDD;
}
}
Loading