Skip to content

Commit

Permalink
[core] Refactor FormatWriter interface (apache#4036)
Browse files: browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Aug 22, 2024
1 parent 7275be7 commit 65f400e
Show file tree
Hide file tree
Showing 18 changed files with 40 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.paimon.data.InternalRow;

import java.io.Closeable;
import java.io.IOException;

/** The writer that writes records. */
public interface FormatWriter {
public interface FormatWriter extends Closeable {

/**
* Adds an element to the encoder. The encoder may temporarily buffer the element, or
Expand All @@ -38,29 +39,6 @@ public interface FormatWriter {
*/
void addElement(InternalRow element) throws IOException;

/**
* Flushes all intermediate buffered data to the output stream. It is expected that flushing
* often may reduce the efficiency of the encoding.
*
* @throws IOException Thrown if the encoder cannot be flushed, or if the output stream throws
* an exception.
*/
void flush() throws IOException;

/**
* Finishes the writing. This must flush all internal buffers, finish encoding, and write
* footers.
*
* <p>The writer is not expected to handle any more records via {@link #addElement(InternalRow)}
* after this method is called.
*
* <p><b>Important:</b> This method MUST NOT close the stream that the writer writes to. Closing
* the stream is expected to happen through the invoker of this method afterwards.
*
* @throws IOException Thrown if the finalization fails.
*/
void finish() throws IOException;

/**
* Check if the writer has reached the <code>targetSize</code>.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public void testSimpleTypes() throws IOException {
writer.addElement(GenericRow.of(1, 1L));
writer.addElement(GenericRow.of(2, 2L));
writer.addElement(GenericRow.of(3, null));
writer.flush();
writer.finish();
writer.close();
out.close();

RecordReader<InternalRow> reader =
Expand Down Expand Up @@ -120,8 +119,7 @@ public void testFullTypes() throws IOException {
PositionOutputStream out = fileIO.newOutputStream(file, false);
FormatWriter writer = format.createWriterFactory(rowType).create(out, "zstd");
writer.addElement(expected);
writer.flush();
writer.finish();
writer.close();
out.close();

RecordReader<InternalRow> reader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testExtract(String mode) throws Exception {
for (GenericRow row : data) {
writer.addElement(row);
}
writer.finish();
writer.close();

SimpleStatsCollector collector = new SimpleStatsCollector(rowType, stats);
for (GenericRow row : data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,11 @@ public void close() throws IOException {
}

try {
writer.flush();
writer.finish();

writer.close();
out.flush();
out.close();
} catch (IOException e) {
LOG.warn("Exception occurs when closing file " + path + ". Cleaning up.", e);
LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e);
abort();
throw e;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,10 @@ public String writeWithoutRolling(Iterator<T> records) {
Path path = pathFactory.newPath();
try {
try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
FormatWriter writer = writerFactory.create(out, compression);
try {
try (FormatWriter writer = writerFactory.create(out, compression)) {
while (records.hasNext()) {
writer.addElement(serializer.toRow(records.next()));
}
} finally {
writer.flush();
writer.finish();
}
}
return path.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testWriteRead(@TempDir java.nio.file.Path tempDir) throws IOExceptio
for (InternalRow row : expected) {
writer.addElement(row);
}
writer.finish();
writer.close();
out.close();

// read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -53,34 +54,30 @@ public FormatWriterFactory createWriterFactory(RowType type) {
.create(
PositionOutputStream,
CoreOptions.FILE_COMPRESSION.defaultValue());
InternalRowSerializer serializer = new InternalRowSerializer(type);
return new FormatWriter() {
@Override
public void addElement(InternalRow rowData) throws IOException {
wrapped.addElement(rowData);
wrapped.flush();
}

long totalSize = 0;

@Override
public void flush() throws IOException {
wrapped.flush();
public void addElement(InternalRow row) throws IOException {
wrapped.addElement(row);
totalSize += serializer.toBinaryRow(row).getSizeInBytes();
}

@Override
public void finish() throws IOException {
wrapped.finish();
public void close() throws IOException {
wrapped.close();
}

@Override
public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException {
return wrapped.reachTargetSize(suggestedCheck, targetSize);
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) {
return totalSize > targetSize;
}
};
};
}

@Override
public void validateDataFields(RowType rowType) {
return;
}
public void validateDataFields(RowType rowType) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,13 @@ public void testRewriteSuccess(boolean rewriteChangelog) throws Exception {

private KeyValueFileWriterFactory createWriterFactory(
Path path, RowType keyType, RowType valueType) {
String formatIdentifier = "avro";
return KeyValueFileWriterFactory.builder(
LocalFileIO.create(),
0,
keyType,
valueType,
new FlushingFileFormat(formatIdentifier),
Collections.singletonMap(formatIdentifier, createNonPartFactory(path)),
new FlushingFileFormat("avro"),
Collections.singletonMap("avro", createNonPartFactory(path)),
VALUE_128_MB.getBytes())
.build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,14 @@ private DataFileMeta newFile(int level, KeyValue... records) throws IOException

private KeyValueFileWriterFactory createWriterFactory() {
Path path = new Path(tempDir.toUri().toString());
String identifier = "avro";
Map<String, FileStorePathFactory> pathFactoryMap = new HashMap<>();
pathFactoryMap.put(identifier, createNonPartFactory(path));
pathFactoryMap.put("avro", createNonPartFactory(path));
return KeyValueFileWriterFactory.builder(
FileIOFinder.find(path),
0,
keyType,
rowType,
new FlushingFileFormat(identifier),
new FlushingFileFormat("avro"),
pathFactoryMap,
VALUE_128_MB.getBytes())
.build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.avro.file.DataFileWriter;

import java.io.Closeable;
import java.io.IOException;

/** A simple writer implementation that wraps an Avro {@link DataFileWriter}. */
public class AvroBulkWriter<T> {
public class AvroBulkWriter<T> implements Closeable {

/** The underlying Avro writer. */
private final DataFileWriter<T> dataFileWriter;
Expand All @@ -45,7 +46,8 @@ public void flush() throws IOException {
dataFileWriter.flush();
}

public void finish() throws IOException {
@Override
public void close() throws IOException {
dataFileWriter.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,8 @@ public void addElement(InternalRow element) throws IOException {
}

@Override
public void flush() throws IOException {
writer.flush();
}

@Override
public void finish() throws IOException {
writer.finish();
public void close() throws IOException {
writer.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,19 @@ public OrcBulkWriter(
public void addElement(InternalRow element) throws IOException {
vectorizer.vectorize(element, rowBatch);
if (rowBatch.size == rowBatch.getMaxSize()) {
writer.addRowBatch(rowBatch);
rowBatch.reset();
flush();
}
}

@Override
public void flush() throws IOException {
private void flush() throws IOException {
if (rowBatch.size != 0) {
writer.addRowBatch(rowBatch);
rowBatch.reset();
}
}

@Override
public void finish() throws IOException {
public void close() throws IOException {
flush();
writer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ public void addElement(InternalRow datum) throws IOException {
}

@Override
public void flush() {
// nothing we can do here
}

@Override
public void finish() throws IOException {
public void close() throws IOException {
parquetWriter.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testFormatWriteRead(
for (InternalRow row : expected) {
writer.addElement(row);
}
writer.finish();
writer.close();
out.close();

// read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ void testReadRowPosition() throws IOException {
for (int i = 0; i < 1000000; i++) {
writer.addElement(GenericRow.of(i));
}
writer.flush();
writer.finish();
writer.close();
}

try (RecordReader<InternalRow> reader =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void testWriteOrcWithZstd(@TempDir java.nio.file.Path tempDir) throws IOExceptio
UUID.randomUUID().toString() + random.nextInt()));
formatWriter.addElement(element);
}
formatWriter.finish();
formatWriter.close();
OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(new Configuration());
Reader reader =
OrcFile.createReader(new org.apache.hadoop.fs.Path(path.toString()), readerOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,7 @@ private VectorizedRecordIterator createVectorizedRecordIterator(
for (InternalRow row : rows) {
writer.addElement(row);
}
writer.flush();
writer.finish();
writer.close();

ParquetReaderFactory readerFactory =
new ParquetReaderFactory(new Options(), rowType, 1024, FilterCompat.NOOP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,7 @@ private Path createTempParquetFileByPaimon(
writer.addElement(row);
}

writer.flush();
writer.finish();
writer.close();
return path;
}

Expand Down

0 comments on commit 65f400e

Please sign in to comment.