Skip to content

Commit

Permalink
[Breaking] Deleted deprecated parquet read/write APIs (#5510)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Jun 6, 2024
1 parent fc24a1f commit a8f121c
Show file tree
Hide file tree
Showing 22 changed files with 394 additions and 832 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.table.impl.util.TableBuilder;
Expand Down Expand Up @@ -48,7 +49,8 @@ public void logOutput() throws IOException {
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + PARQUET_FILE_EXTENSION);

final Table output = outputBuilder.build();
ParquetTools.writeTable(output, outputPath.toFile(), RESULT_DEF);
ParquetTools.writeTable(output, outputPath.toString(),
ParquetInstructions.EMPTY.withTableDefinition(RESULT_DEF));
}

public void reset() {
Expand Down Expand Up @@ -76,7 +78,7 @@ public long resultSize() {
}

public static Table readBin(File location) {
return ParquetTools.readTable(location);
return ParquetTools.readTable(location.getPath());
}

public String getResultHash() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,12 +1090,12 @@ public void testPartitionedTableSort() throws IOException {
final ParquetInstructions instructions = ParquetInstructions.builder().useDictionary("I", true).build();
Table a = emptyTable(200).update("I = `` + (50 + (ii % 100))", "K = ii");
Table b = emptyTable(200).update("I = `` + (ii % 100)", "K = ii");
ParquetTools.writeTable(a, new java.io.File(tmpDir + "/Partition=p0/data.parquet"), instructions);
ParquetTools.writeTable(b, new java.io.File(tmpDir + "/Partition=p1/data.parquet"), instructions);
ParquetTools.writeTable(a, tmpDir + "/Partition=p0/data.parquet", instructions);
ParquetTools.writeTable(b, tmpDir + "/Partition=p1/data.parquet", instructions);
a = a.updateView("Partition = `p0`").moveColumnsUp("Partition");
b = b.updateView("Partition = `p1`").moveColumnsUp("Partition");

final Table fromDisk = ParquetTools.readTable(tmpDir);
final Table fromDisk = ParquetTools.readTable(tmpDir.getPath());

// Assert non-partitioned table sorts.
final Table diskOuterSort = fromDisk.sort("I");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3930,21 +3930,21 @@ public void testMultiPartitionSymbolTableBy() throws IOException {


ParquetTools.writeTable(t1, new File(testRootFile,
"Date=2021-07-20" + File.separator + "Num=100" + File.separator + "file1.parquet"));
"Date=2021-07-20" + File.separator + "Num=100" + File.separator + "file1.parquet").getPath());
ParquetTools.writeTable(t2, new File(testRootFile,
"Date=2021-07-20" + File.separator + "Num=200" + File.separator + "file2.parquet"));
"Date=2021-07-20" + File.separator + "Num=200" + File.separator + "file2.parquet").getPath());
ParquetTools.writeTable(t3, new File(testRootFile,
"Date=2021-07-21" + File.separator + "Num=300" + File.separator + "file3.parquet"));
"Date=2021-07-21" + File.separator + "Num=300" + File.separator + "file3.parquet").getPath());
ParquetTools.writeTable(t4, new File(testRootFile,
"Date=2021-07-21" + File.separator + "Num=400" + File.separator + "file4.parquet"));
"Date=2021-07-21" + File.separator + "Num=400" + File.separator + "file4.parquet").getPath());

final Table merged = TableTools.merge(
t1.updateView("Date=`2021-07-20`", "Num=100"),
t2.updateView("Date=`2021-07-20`", "Num=200"),
t3.updateView("Date=`2021-07-21`", "Num=300"),
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readPartitionedTableInferSchema(
final Table loaded = ParquetTools.readTable(
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY);

Expand Down Expand Up @@ -4008,8 +4008,7 @@ private Table makeDiskTable(File directory) throws IOException {

final File outputFile = new File(directory, "disk_table" + PARQUET_FILE_EXTENSION);

ParquetTools.writeTable(result, outputFile, result.getDefinition());

return ParquetTools.readTable(outputFile);
ParquetTools.writeTable(result, outputFile.getPath());
return ParquetTools.readTable(outputFile.getPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.engine.testutil.generator.*;
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.test.types.OutOfBandTest;
import io.deephaven.time.DateTimeUtils;
Expand Down Expand Up @@ -1706,8 +1707,9 @@ private Table makeLeftDiskTable(File leftLocation) {
final String[] leftSyms = new String[] {"Apple", "Banana", "Cantaloupe", "DragonFruit",
"Apple", "Cantaloupe", "Banana", "Banana", "Cantaloupe"};
final Table leftTable = newTable(stringCol("Symbol", leftSyms)).update("LeftSentinel=i");
ParquetTools.writeTable(leftTable, leftLocation, leftDefinition);
return ParquetTools.readTable(leftLocation);
ParquetTools.writeTable(leftTable, leftLocation.getPath(),
ParquetInstructions.EMPTY.withTableDefinition(leftDefinition));
return ParquetTools.readTable(leftLocation.getPath());
}

@NotNull
Expand All @@ -1717,7 +1719,8 @@ private Table makeRightDiskTable(File rightLocation) {
ColumnDefinition.ofInt("RightSentinel"));
final String[] rightSyms = new String[] {"Elderberry", "Apple", "Banana", "Cantaloupe"};
final Table rightTable = newTable(stringCol("Symbol", rightSyms)).update("RightSentinel=100+i");
ParquetTools.writeTable(rightTable, rightLocation, rightDefinition);
return ParquetTools.readTable(rightLocation);
ParquetTools.writeTable(rightTable, rightLocation.getPath(),
ParquetInstructions.EMPTY.withTableDefinition(rightDefinition));
return ParquetTools.readTable(rightLocation.getPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.deephaven.engine.table.vectors.ColumnVectors;
import io.deephaven.engine.testutil.*;
import io.deephaven.engine.testutil.generator.*;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.engine.context.QueryScope;
import io.deephaven.parquet.table.ParquetTools;
Expand Down Expand Up @@ -877,8 +878,8 @@ private void diskBackedTestHarness(Consumer<Table> testFunction) throws IOExcept
testDirectory.mkdirs();
final File dest = new File(testDirectory, "Table.parquet");
try {
ParquetTools.writeTable(source, dest, definition);
final Table table = ParquetTools.readTable(dest);
ParquetTools.writeTable(source, dest.getPath(), ParquetInstructions.EMPTY.withTableDefinition(definition));
final Table table = ParquetTools.readTable(dest.getPath());
testFunction.accept(table);
table.close();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.deephaven.engine.updategraph.UpdateGraphLock;
import io.deephaven.engine.util.TableTools;
import io.deephaven.io.log.LogEntry;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.test.types.OutOfBandTest;
import io.deephaven.time.DateTimeUtils;
Expand Down Expand Up @@ -3306,8 +3307,8 @@ private void diskBackedTestHarness(Consumer<Table> testFunction) throws IOExcept
testDirectory.mkdirs();
final File dest = new File(testDirectory, "Table.parquet");
try {
ParquetTools.writeTable(source, dest, definition);
final Table table = ParquetTools.readTable(dest);
ParquetTools.writeTable(source, dest.getPath(), ParquetInstructions.EMPTY.withTableDefinition(definition));
final Table table = ParquetTools.readTable(dest.getPath());
testFunction.accept(table);
table.close();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.util.SafeCloseable;
import io.deephaven.vector.Vector;
import io.deephaven.util.type.ArrayTypeUtils;
Expand Down Expand Up @@ -71,9 +72,9 @@ public Table createTestTable() {
final File dest = new File(tableDirectory, "Table.parquet");
ParquetTools.writeTable(
newTable(stringCol("USym", symbol), doubleCol("Bid", bid), doubleCol("BidSize", bidSize)),
dest,
tableDefinition);
return ParquetTools.readTable(dest);
dest.getPath(),
ParquetInstructions.EMPTY.withTableDefinition(tableDefinition));
return ParquetTools.readTable(dest.getPath());
}

Table doAggregatedQuery() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void doColumnsTest() throws IOException {
final File dir = Files.createTempDirectory(Paths.get(""), "CODEC_TEST").toFile();
final File dest = new File(dir, "Test.parquet");
try {
ParquetTools.writeTable(table, dest, table.getDefinition(), writeInstructions);
ParquetTools.writeTable(table, dest.getPath(), writeInstructions);
final MutableObject<ParquetInstructions> instructionsOut = new MutableObject<>();
final Table result =
ParquetTools.readParquetSchemaAndTable(dest, ParquetInstructions.EMPTY, instructionsOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public void doColumnsTest() throws IOException {
final File dir = Files.createTempDirectory(Paths.get(""), "CODEC_TEST").toFile();
final File dest = new File(dir, "Table.parquet");
try {
ParquetTools.writeTable(table, dest, table.getDefinition(), writeInstructions);
final Table result = ParquetTools.readTable(dest);
ParquetTools.writeTable(table, dest.getPath(), writeInstructions);
final Table result = ParquetTools.readTable(dest.getPath());
TableTools.show(result);
TestCase.assertEquals(TABLE_DEFINITION, result.getDefinition());
TstUtils.assertTableEquals(table, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ public void testPreserveDeferredGrouping() throws IOException {
DataIndexer.getOrCreateDataIndex(x, "Sym");

System.out.println(x.getDefinition());
ParquetTools.writeTable(x, dest);
ParquetTools.writeTable(x, dest.getPath());

final Table readBack = ParquetTools.readTable(dest);
final Table readBack = ParquetTools.readTable(dest.getPath());
TableTools.showWithRowSet(readBack);

assertTrue(DataIndexer.hasDataIndex(readBack, "Sym"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,45 +106,49 @@ private void doTest(final boolean missingIndexes) {

final String tableName = "TestTable";

// @formatter:off
ParquetTools.writeTable(
partitions[0],
new File(dataDirectory,
"IP" + File.separator + "0000" + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME),
partitionedDataDefinition);
+ PARQUET_FILE_NAME).getPath(),
ParquetInstructions.EMPTY.withTableDefinition(partitionedDataDefinition));
ParquetTools.writeTable(
partitions[1],
new File(dataDirectory,
"IP" + File.separator + "0001" + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME),
partitionedDataDefinition);
+ PARQUET_FILE_NAME).getPath(),
ParquetInstructions.EMPTY.withTableDefinition(partitionedDataDefinition));
ParquetTools.writeTable(
partitions[2],
new File(dataDirectory,
"IP" + File.separator + "0002" + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME),
missingIndexes ? partitionedMissingDataDefinition : partitionedDataDefinition);
+ PARQUET_FILE_NAME).getPath(),
ParquetInstructions.EMPTY.withTableDefinition(
missingIndexes ? partitionedMissingDataDefinition : partitionedDataDefinition));
ParquetTools.writeTable(
partitions[3],
new File(dataDirectory,
"IP" + File.separator + "0003" + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME),
missingIndexes ? partitionedMissingDataDefinition : partitionedDataDefinition);
+ PARQUET_FILE_NAME).getPath(),
ParquetInstructions.EMPTY.withTableDefinition(
missingIndexes ? partitionedMissingDataDefinition : partitionedDataDefinition));
ParquetTools.writeTables(
Arrays.copyOfRange(partitions, 4, partitions.length),
partitionedDataDefinition,
IntStream.range(4, 260)
.mapToObj(pcv -> new File(dataDirectory,
"IP" + File.separator + String.format("%04d", pcv) + File.separator + tableName
+ File.separator + PARQUET_FILE_NAME))
.toArray(File[]::new));
.map(File::getPath).toArray(String[]::new),
ParquetInstructions.EMPTY.withTableDefinition(partitionedDataDefinition));
// TODO (deephaven/deephaven-core/issues/321): Re-add this part of the test when the parquet bug is fixed
ParquetTools.writeTable(
TableTools.emptyTable(0).updateView("Sym=NULL_CHAR", "Other=NULL_LONG"),
new File(dataDirectory,
"IP" + File.separator + "XXXX" + File.separator + tableName + File.separator
+ PARQUET_FILE_NAME),
partitionedDataDefinition);
+ PARQUET_FILE_NAME).getPath(),
ParquetInstructions.EMPTY.withTableDefinition(partitionedDataDefinition));
// @formatter:on

if (missingIndexes) {
// Put Sym back on for the partitions that dropped it.
Expand All @@ -154,11 +158,10 @@ private void doTest(final boolean missingIndexes) {
// Column ordering was changed by groupBy()/ungroup() above, restore it here.
final Table expected = TableTools.merge(partitions).view("Part", "Sym", "Other");

final Table actual = ParquetTools.readPartitionedTable(
final Table actual = ParquetTools.readTable(
DeephavenNestedPartitionLayout.forParquet(dataDirectory, tableName, "Part", ipn -> ipn.equals("IP"),
ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY,
partitionedDataDefinition).coalesce();
ParquetInstructions.EMPTY.withTableDefinition(partitionedDataDefinition)).coalesce();

TstUtils.assertTableEquals(expected, actual);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,10 @@ public void setUp() throws Exception {
"Bl_R = booleanAsByte(Bl)",
"DT_R = epochNanos(DT)");

actual = ParquetTools.readPartitionedTable(
actual = ParquetTools.readTable(
DeephavenNestedPartitionLayout.forParquet(dataDirectory, tableName, "PC", null,
ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY,
partitionedDataDefinition).updateView(
ParquetInstructions.EMPTY.withTableDefinition(partitionedDataDefinition)).updateView(
List.of(
new ReinterpretedColumn<>("Bl", Boolean.class, "Bl_R", byte.class),
new ReinterpretedColumn<>("DT", Instant.class, "DT_R", long.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public void testUngroup() {
assertEquals(String.class, ungroupedTable.getDefinition().getColumn("C").getDataType());

File dest = new File(dataDirectory, "testUngroup.parquet");
ParquetTools.writeTable(groupedTable, dest);
final Table actual = ParquetTools.readTable(dest);
ParquetTools.writeTable(groupedTable, dest.getPath());
final Table actual = ParquetTools.readTable(dest.getPath());

assertTrue(ObjectVector.class.isAssignableFrom(actual.getDefinition().getColumn("C").getDataType()));
assertEquals(String.class, actual.getDefinition().getColumn("C").getComponentType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public Table writeTable(@NotNull final Blackhole bh) {
final ParquetInstructions instructions = ParquetInstructions.builder()
.setCompressionCodecName(compressionCodec)
.build();
ParquetTools.writeTable(table, rootPath.resolve("table.parquet").toFile(), instructions);
ParquetTools.writeTable(table, rootPath.resolve("table.parquet").toString(), instructions);
return table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ public boolean equals(Object obj) {
}

private void compressionCodecTestHelper(final ParquetInstructions codec) {
File dest = new File(rootFile + File.separator + "Table1.parquet");
final File dest = new File(rootFile + File.separator + "Table1.parquet");
final Table table1 = getTableFlat(10000, false);
ParquetTools.writeTable(table1, dest, codec);
ParquetTools.writeTable(table1, dest.getPath(), codec);
assertTrue(dest.length() > 0L);
final Table table2 = ParquetTools.readTable(dest);
final Table table2 = ParquetTools.readTable(dest.getPath());
TstUtils.assertTableEquals(table1, table2);
}

Expand Down
Loading

0 comments on commit a8f121c

Please sign in to comment.