Spark: Support custom data location (#6)
This adds a new table property, write.folder-storage.path, that controls the location of new data files.
mccheah authored and rdblue committed Dec 12, 2018
1 parent 733de8c commit c9d0f03
Showing 5 changed files with 48 additions and 9 deletions.
core/src/main/java/com/netflix/iceberg/TableProperties.java (5 additions, 0 deletions)
@@ -64,6 +64,11 @@ public class TableProperties {
 
   public static final String OBJECT_STORE_PATH = "write.object-storage.path";
 
+  // This only applies to files written after this property is set. Files previously written aren't relocated to
+  // reflect this parameter.
+  // If not set, defaults to a "data" folder underneath the root path of the table.
+  public static final String WRITE_NEW_DATA_LOCATION = "write.folder-storage.path";
+
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
 }
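For context, a minimal usage sketch of the new property, using the HadoopTables API that the tests below exercise (imports as in the diffs; the table path and data directory are illustrative assumptions, not part of this commit):

// Load an existing table and route new data files to a custom directory.
Table table = new HadoopTables(new Configuration()).load("/tmp/warehouse/db/events");
table.updateProperties()
    .set(TableProperties.WRITE_NEW_DATA_LOCATION, "/tmp/custom-data-dir")
    .commit();
// Only files written after this commit land under /tmp/custom-data-dir;
// files written earlier are not relocated.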
@@ -30,6 +30,7 @@
 import com.netflix.iceberg.PartitionSpec;
 import com.netflix.iceberg.Schema;
 import com.netflix.iceberg.Table;
+import com.netflix.iceberg.TableProperties;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.hadoop.HadoopInputFile;
@@ -165,7 +166,9 @@ private int propertyAsInt(String property, int defaultValue) {
   }
 
   private String dataLocation() {
-    return new Path(new Path(table.location()), "data").toString();
+    return table.properties().getOrDefault(
+        TableProperties.WRITE_NEW_DATA_LOCATION,
+        new Path(new Path(table.location()), "data").toString());
   }
 
   @Override
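To make the new fallback in dataLocation() concrete, a hypothetical resolution for a table rooted at hdfs://warehouse/db/t (paths are illustrative):

// write.folder-storage.path unset:
//   dataLocation() -> "hdfs://warehouse/db/t/data"     (matches the old hard-coded behavior)
// write.folder-storage.path = "s3://bucket/custom":
//   dataLocation() -> "s3://bucket/custom"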
@@ -35,7 +35,7 @@ public abstract class AvroDataTest {
 
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
-  private static final StructType SUPPORTED_PRIMITIVES = StructType.of(
+  protected static final StructType SUPPORTED_PRIMITIVES = StructType.of(
       required(100, "id", LongType.get()),
       optional(101, "data", Types.StringType.get()),
       required(102, "b", Types.BooleanType.get()),
@@ -29,21 +29,25 @@
 import com.netflix.iceberg.spark.data.AvroDataTest;
 import com.netflix.iceberg.spark.data.RandomData;
 import com.netflix.iceberg.spark.data.SparkAvroReader;
+import com.netflix.iceberg.types.Types;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 
 import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
@@ -54,7 +58,7 @@
 public class TestDataFrameWrites extends AvroDataTest {
   private static final Configuration CONF = new Configuration();
 
-  private String format = null;
+  private final String format;
 
   @Parameterized.Parameters
   public static Object[][] parameters() {
@@ -88,23 +92,43 @@ public static void stopSpark() {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
+    File location = createTableFolder();
+    Table table = createTable(schema, location);
+    writeAndValidateWithLocations(table, location, new File(location, "data"));
+  }
+
+  @Test
+  public void testWriteWithCustomDataLocation() throws IOException {
+    File location = createTableFolder();
+    File tablePropertyDataLocation = temp.newFolder("test-table-property-data-dir");
+    Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields()), location);
+    table.updateProperties().set(
+        TableProperties.WRITE_NEW_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()).commit();
+    writeAndValidateWithLocations(table, location, tablePropertyDataLocation);
+  }
+
+  private File createTableFolder() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
     Assert.assertTrue("Mkdir should succeed", location.mkdirs());
+    return location;
+  }
+
+  private Table createTable(Schema schema, File location) {
     HadoopTables tables = new HadoopTables(CONF);
-    Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+    return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+  }
 
+  private void writeAndValidateWithLocations(Table table, File location, File expectedDataDir) throws IOException {
     Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
 
     table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit();
 
     List<Record> expected = RandomData.generateList(tableSchema, 100, 0L);
     Dataset<Row> df = createDataset(expected, tableSchema);
+    DataFrameWriter<?> writer = df.write().format("iceberg").mode("append");
 
-    df.write()
-        .format("iceberg")
-        .mode("append")
-        .save(location.toString());
+    writer.save(location.toString());
 
     table.refresh();
 
@@ -118,6 +142,14 @@ protected void writeAndValidate(Schema schema) throws IOException {
     for (int i = 0; i < expected.size(); i += 1) {
       assertEqualsSafe(tableSchema.asStruct(), expected.get(i), actual.get(i));
     }
+
+    table.currentSnapshot().addedFiles().forEach(dataFile ->
+        Assert.assertTrue(
+            String.format(
+                "File should have the parent directory %s, but has: %s.",
+                expectedDataDir.getAbsolutePath(),
+                dataFile.path()),
+            URI.create(dataFile.path().toString()).getPath().startsWith(expectedDataDir.getAbsolutePath())));
   }
 
   private Dataset<Row> createDataset(List<Record> records, Schema schema) throws IOException {
@@ -68,7 +68,6 @@ public static void stopSpark() {
   public void testBasicWrite() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
-    location.mkdirs();
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
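Putting the pieces together, a rough end-to-end sketch in the spirit of the new test: create a table, point write.folder-storage.path at a custom directory, append a DataFrame with Spark, and inspect where the snapshot's new files landed. The schema and df variables and all paths are illustrative assumptions; imports match those in the diffs above.

// Create an unpartitioned table and set a custom data location.
HadoopTables tables = new HadoopTables(new Configuration());
Table table = tables.create(schema, PartitionSpec.unpartitioned(), "/tmp/warehouse/test");
table.updateProperties()
    .set(TableProperties.WRITE_NEW_DATA_LOCATION, "/tmp/custom-data-dir")
    .commit();

// The Spark writer resolves its target directory via dataLocation().
df.write().format("iceberg").mode("append").save("/tmp/warehouse/test");

// Every file added by this append should sit under the custom directory.
table.refresh();
table.currentSnapshot().addedFiles()
    .forEach(dataFile -> System.out.println(dataFile.path()));  // expect /tmp/custom-data-dir/...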
