Skip to content

Commit

Permalink
Add fixed-width column support
Browse files Browse the repository at this point in the history
  • Loading branch information
kosak committed Nov 4, 2024
1 parent 80a5fa2 commit 8bd5ae1
Show file tree
Hide file tree
Showing 5 changed files with 608 additions and 7 deletions.
53 changes: 53 additions & 0 deletions src/main/java/io/deephaven/csv/CsvSpecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,35 @@ public interface Builder {
*/
Builder headerValidator(Predicate<String> headerValidator);

/**
* True if the input is organized into fixed width columns rather than delimited by a delimiter.
* The corresponding getter in this file returns false when this is not set.
*
* @param hasFixedWidthColumns true to read the input as fixed-width columns; false to read it as delimited text.
*/
Builder hasFixedWidthColumns(boolean hasFixedWidthColumns);

/**
* When {@link #hasFixedWidthColumns} is set, the library either determines the column widths from the header
* row (provided {@link #hasHeaderRow} is set), or the column widths can be specified explicitly by the caller.
* If the caller wants to specify them explicitly, they can use this method. If there is no header row and no
* widths are specified, reading fails because the widths cannot be determined. Widths are measured in the unit
* selected by {@link #useUtf32CountingConvention}. When not set, the list of widths is empty.
* @param fixedColumnWidths The caller-specified widths of the columns.
*/
Builder fixedColumnWidths(Iterable<Integer> fixedColumnWidths);

/**
* This setting controls what units fixed width columns are measured in.
* When true, fixed width columns are measured in Unicode code points.
* When false, fixed width columns are measured in UTF-16 units (aka Java chars).
* The difference arises when encountering characters outside the Unicode Basic Multilingual Plane.
* For example, the Unicode code point 💔 (U+1F494) is one Unicode code point, but takes
* two Java chars to represent. Along these lines, the string 💔💔💔 would fit in a column of width 3
* when useUtf32CountingConvention is true, but would require a column width of at least 6 when
* useUtf32CountingConvention is false.
*
* The default setting of true is arguably more natural for users (the number of characters they see
* matches the visual width of the column). But some programs may want the value of false because they
* are counting Java chars.
*
* @param useUtf32CountingConvention true to count Unicode code points; false to count UTF-16 units.
*/
Builder useUtf32CountingConvention(boolean useUtf32CountingConvention);

/**
* Number of data rows to skip before processing data. This is useful when you want to parse data in chunks.
* Typically used together with {@link Builder#numRows}. Defaults to 0.
Expand Down Expand Up @@ -340,6 +369,30 @@ public Predicate<String> headerValidator() {
return c -> true;
}

/**
* Whether the input is organized into fixed width columns rather than delimited by a delimiter.
* This implementation supplies the value {@code false}.
* See {@link Builder#hasFixedWidthColumns}.
*/
@Default
public boolean hasFixedWidthColumns() {
return false;
}

/**
* The caller-specified fixed column widths. This implementation supplies an empty list, which the
* fixed-width header logic treats as "no widths specified".
* See {@link Builder#fixedColumnWidths}.
*/
@Default
public List<Integer> fixedColumnWidths() {
return Collections.emptyList();
}

/**
* Whether fixed width columns are measured in Unicode code points (true) or UTF-16 units (false).
* This implementation supplies the value {@code true}.
* See {@link Builder#useUtf32CountingConvention}.
*/
@Default
public boolean useUtf32CountingConvention() {
return true;
}

/**
* See {@link Builder#skipRows}.
*/
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/io/deephaven/csv/reading/CsvReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import io.deephaven.csv.parsers.Parser;
import io.deephaven.csv.reading.cells.CellGrabber;
import io.deephaven.csv.reading.cells.DelimitedCellGrabber;
import io.deephaven.csv.reading.cells.FixedCellGrabber;
import io.deephaven.csv.reading.headers.DelimitedHeaderFinder;
import io.deephaven.csv.reading.headers.FixedHeaderFinder;
import io.deephaven.csv.sinks.Sink;
import io.deephaven.csv.sinks.SinkFactory;
import io.deephaven.csv.util.*;
Expand Down Expand Up @@ -63,7 +65,9 @@ private CsvReader() {}
*/
public static Result read(final CsvSpecs specs, final InputStream stream, final SinkFactory sinkFactory)
        throws CsvReaderException {
    // Fixed-width input and delimited input are parsed by separate code paths; pick one based on the specs.
    if (specs.hasFixedWidthColumns()) {
        return fixedReadLogic(specs, stream, sinkFactory);
    }
    return delimitedReadLogic(specs, stream, sinkFactory);
}

private static Result delimitedReadLogic(
Expand Down Expand Up @@ -97,6 +101,16 @@ private static Result delimitedReadLogic(
return commonReadLogic(specs, grabber, firstDataRow, numInputCols, numOutputCols, headersToUse, sinkFactory);
}

/**
 * Reads fixed-width input: determines the headers and column widths, builds a cell grabber that slices
 * each line by those widths, and then delegates to the shared read logic.
 */
private static Result fixedReadLogic(
        final CsvSpecs specs, final InputStream stream, final SinkFactory sinkFactory) throws CsvReaderException {
    // The line grabber hands back one whole line at a time; the header finder consumes the header line(s)
    // from it and reports the column widths through the holder.
    final CellGrabber wholeLineGrabber = FixedCellGrabber.makeLineGrabber(stream);
    final MutableObject<int[]> widthsHolder = new MutableObject<>();
    final String[] headersToUse = FixedHeaderFinder.determineHeadersToUse(specs, wholeLineGrabber, widthsHolder);
    final int columnCount = headersToUse.length;

    // Wrap the same line grabber so the remaining lines are split into fixed-width cells.
    final int[] widths = widthsHolder.getValue();
    final boolean trimSpaces = specs.ignoreSurroundingSpaces();
    final boolean utf32Mode = specs.useUtf32CountingConvention();
    final CellGrabber cellGrabber = new FixedCellGrabber(wholeLineGrabber, widths, trimSpaces, utf32Mode);

    // There is no pre-read first data row in the fixed-width path, and input and output column counts agree.
    return commonReadLogic(specs, cellGrabber, null, columnCount, columnCount, headersToUse, sinkFactory);
}

private static Result commonReadLogic(final CsvSpecs specs, CellGrabber grabber, byte[][] optionalFirstDataRow,
int numInputCols, int numOutputCols,
Expand Down
114 changes: 114 additions & 0 deletions src/main/java/io/deephaven/csv/reading/cells/FixedCellGrabber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.deephaven.csv.reading.cells;

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.reading.ReaderUtil;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;
import io.deephaven.csv.util.MutableInt;

import java.io.InputStream;

/**
 * This class uses an underlying CellGrabber (see {@link #makeLineGrabber}) to grab whole lines at a time from
 * the input stream, and then it breaks them into fixed-sized cells to return to the caller.
 */
public class FixedCellGrabber implements CellGrabber {
    /**
     * Makes a degenerate CellGrabber that has no delimiters or quotes and therefore returns whole lines.
     * This is a somewhat quick-and-dirty way to reuse the buffering and newline logic in DelimitedCellGrabber
     * without rewriting it.
     *
     * @param stream The underlying stream.
     * @return The "line grabber"
     */
    public static CellGrabber makeLineGrabber(InputStream stream) {
        // 0xFF never occurs in well-formed UTF-8, so passing it for both byte-valued arguments means
        // neither of them can ever match a byte of the input.
        final byte illegalUtf8 = (byte) 0xff;
        return new DelimitedCellGrabber(stream, illegalUtf8, illegalUtf8, true, false);
    }

    /** Source of whole lines. */
    private final CellGrabber lineGrabber;
    /** Column widths, measured in code points or UTF-16 units depending on {@link #utf32CountingMode}. */
    private final int[] columnWidths;
    /** Whether each returned cell is trimmed via {@code ReaderUtil.trimWhitespace}. */
    private final boolean ignoreSurroundingSpaces;
    /** True to count column widths in Unicode code points; false to count in UTF-16 units. */
    private final boolean utf32CountingMode;
    /** The not-yet-consumed remainder of the current line. */
    private final ByteSlice rowText;
    /** True when the current line is finished and the next call must fetch a new line. */
    private boolean needsUnderlyingRefresh;
    /** Index of the next column to return from the current line. */
    private int colIndex;
    /** Scratch holder for the line grabber's "last in row" result, which is not used here. */
    private final MutableBoolean dummy1;
    /** Scratch holder for the per-character unit count. */
    private final MutableInt dummy2;

    /**
     * Constructor.
     *
     * @param lineGrabber A CellGrabber that returns one whole line per call (see {@link #makeLineGrabber}).
     * @param columnWidths The column widths. The final column's width is ignored: it receives all remaining text.
     * @param ignoreSurroundingSpaces Whether to trim whitespace from each returned cell.
     * @param utf32CountingMode True to measure widths in Unicode code points; false to measure in UTF-16 units.
     */
    public FixedCellGrabber(final CellGrabber lineGrabber, final int[] columnWidths, boolean ignoreSurroundingSpaces,
            boolean utf32CountingMode) {
        this.lineGrabber = lineGrabber;
        this.columnWidths = columnWidths;
        this.ignoreSurroundingSpaces = ignoreSurroundingSpaces;
        this.utf32CountingMode = utf32CountingMode;
        this.rowText = new ByteSlice();
        this.needsUnderlyingRefresh = true;
        this.colIndex = 0;
        this.dummy1 = new MutableBoolean();
        this.dummy2 = new MutableInt();
    }

    /**
     * Returns the next fixed-width cell.
     *
     * @param dest Receives the cell text. At end of input it is set to an empty slice.
     * @param lastInRow Set to true when this cell is the last one of its row: either it is the final column, or
     *        the line ran out of text early. Not modified when end of input is reached.
     * @param endOfInput Set to true when the underlying input is exhausted, false otherwise.
     * @throws CsvReaderException if the underlying line grabber fails.
     */
    @Override
    public void grabNext(ByteSlice dest, MutableBoolean lastInRow, MutableBoolean endOfInput)
            throws CsvReaderException {
        if (needsUnderlyingRefresh) {
            // Underlying row used up, and all columns provided. Ask underlying CellGrabber for the next line.
            lineGrabber.grabNext(rowText, dummy1, endOfInput);

            if (endOfInput.booleanValue()) {
                // Set dest to the empty string, and leave 'endOfInput' set to true.
                dest.reset(rowText.data(), rowText.end(), rowText.end());
                return;
            }

            needsUnderlyingRefresh = false;
            colIndex = 0;
        }

        // There is data to return. Count off N characters. The final column gets all remaining characters.
        final boolean lastCol = colIndex == columnWidths.length - 1;
        final int numCharsToTake = lastCol ? Integer.MAX_VALUE : columnWidths[colIndex];
        takeNCharactersInCharset(rowText, dest, numCharsToTake, utf32CountingMode, dummy2);
        ++colIndex;

        // A short line ends the row early, signalled by an empty cell once the line text is used up.
        // An empty cell alone is not enough: a cell can be empty while text remains, either because the
        // column width is zero or because (UTF-16 counting) the next character needs two units and the
        // column has room for only one. Ending the row there would silently drop the rest of the line.
        final boolean rowExhausted = dest.size() == 0 && rowText.size() == 0;
        needsUnderlyingRefresh = lastCol || rowExhausted;
        lastInRow.setValue(needsUnderlyingRefresh);
        endOfInput.setValue(false);

        if (ignoreSurroundingSpaces) {
            ReaderUtil.trimWhitespace(dest);
        }
    }

    /**
     * Moves up to {@code numCharsToTake} units' worth of characters from the front of {@code src} into
     * {@code dest}. On return, {@code dest} covers the taken prefix and {@code src} covers what is left.
     *
     * @param src The remaining row text; advanced past the taken characters.
     * @param dest Receives the taken characters.
     * @param numCharsToTake Maximum number of units (code points or UTF-16 units) to take.
     * @param utf32CountingMode True to count in Unicode code points; false to count in UTF-16 units.
     * @param tempInt Scratch holder for the per-character unit count.
     */
    private static void takeNCharactersInCharset(ByteSlice src, ByteSlice dest, int numCharsToTake,
            boolean utf32CountingMode, MutableInt tempInt) {
        final byte[] data = src.data();
        final int cellBegin = src.begin();
        int current = cellBegin;
        while (numCharsToTake > 0) {
            if (current == src.end()) {
                break;
            }
            final int utf8Length = ReaderUtil.getUtf8LengthAndCharLength(data[current], src.end() - current,
                    utf32CountingMode, tempInt);
            if (numCharsToTake < tempInt.intValue()) {
                // There is not enough space left in the field to store this character.
                // This can happen if CsvSpecs is set for the UTF16 counting convention,
                // there is one unit left in the field, and we encounter a character outside
                // the Basic Multilingual Plane, which would require two units.
                break;
            }
            numCharsToTake -= tempInt.intValue();
            current += utf8Length;
            if (current > src.end()) {
                throw new RuntimeException("Data error: partial UTF-8 sequence found in input");
            }
        }
        dest.reset(src.data(), cellBegin, current);
        src.reset(src.data(), current, src.end());
    }

    /** Returns the physical row number reported by the underlying line grabber. */
    @Override
    public int physicalRowNum() {
        return lineGrabber.physicalRowNum();
    }
}
175 changes: 175 additions & 0 deletions src/main/java/io/deephaven/csv/reading/headers/FixedHeaderFinder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package io.deephaven.csv.reading.headers;

import io.deephaven.csv.CsvSpecs;
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.reading.ReaderUtil;
import io.deephaven.csv.reading.cells.CellGrabber;
import io.deephaven.csv.tokenization.Tokenizer;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.MutableBoolean;
import io.deephaven.csv.util.MutableInt;
import io.deephaven.csv.util.MutableObject;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Determines the column headers and column widths to use when reading fixed-width input.
 */
public class FixedHeaderFinder {
    /**
     * Determine which headers to use. The result comes from either the first row of the file or the user-specified
     * overrides.
     *
     * @param specs The reader configuration.
     * @param lineGrabber A CellGrabber that returns one whole line per call. Header (and skipped) lines are
     *        consumed from it.
     * @param columnWidthsResult Receives the column widths to use, either caller-specified or inferred from the
     *        header row. Units are Unicode code points or UTF-16 units, per
     *        {@code specs.useUtf32CountingConvention()}.
     * @return The headers to use, one per column.
     * @throws CsvReaderException if a header row is expected but the input ends first; if there is no header row
     *         and no column widths were specified; or if the caller-supplied header count does not match.
     * @throws IllegalArgumentException if widths are inferred from a header row that starts with the delimiter.
     */
    public static String[] determineHeadersToUse(
            final CsvSpecs specs,
            final CellGrabber lineGrabber,
            MutableObject<int[]> columnWidthsResult)
            throws CsvReaderException {
        String[] headersToUse;
        // Get user-specified column widths, if any. If not, this will be an array of length 0.
        // UNITS: code points or UTF-16 units, per useUtf32CountingConvention.
        int[] columnWidthsToUse = specs.fixedColumnWidths().stream().mapToInt(Integer::intValue).toArray();
        if (specs.hasHeaderRow()) {
            long skipCount = specs.skipHeaderRows();
            final ByteSlice headerRow = new ByteSlice();
            MutableBoolean lastInRow = new MutableBoolean();
            MutableBoolean endOfInput = new MutableBoolean();
            // Read skipHeaderRows lines and discard them; the line read after that is the header row.
            while (true) {
                lineGrabber.grabNext(headerRow, lastInRow, endOfInput);
                if (endOfInput.booleanValue()) {
                    throw new CsvReaderException(
                            "Can't proceed because hasHeaderRow is set but input file is empty or shorter than skipHeaderRows");
                }
                if (skipCount == 0) {
                    break;
                }
                --skipCount;
            }
            // In fixed-width mode the configured delimiter acts as the padding character between columns.
            final byte paddingByte = (byte) specs.delimiter();
            if (columnWidthsToUse.length == 0) {
                columnWidthsToUse = inferColumnWidths(headerRow, paddingByte, specs.useUtf32CountingConvention());
            }

            headersToUse =
                    extractHeaders(headerRow, columnWidthsToUse, paddingByte, specs.useUtf32CountingConvention());
        } else {
            if (columnWidthsToUse.length == 0) {
                throw new CsvReaderException(
                        "Can't proceed because hasHeaderRow is false but fixedColumnWidths is unspecified");
            }
            headersToUse = ReaderUtil.makeSyntheticHeaders(columnWidthsToUse.length);
        }

        // Whether or not the input had headers, maybe override with client-specified headers.
        if (specs.headers().size() != 0) {
            if (specs.headers().size() != headersToUse.length) {
                final String message = String.format("Library determined %d headers; caller overrode with %d headers",
                        headersToUse.length, specs.headers().size());
                throw new CsvReaderException(message);
            }
            headersToUse = specs.headers().toArray(new String[0]);
        }

        // Apply column specific overrides.
        for (Map.Entry<Integer, String> entry : specs.headerForIndex().entrySet()) {
            headersToUse[entry.getKey()] = entry.getValue();
        }

        columnWidthsResult.setValue(columnWidthsToUse);
        return headersToUse;
    }

    /**
     * Infers column widths from the header row. A column starts at a non-delimiter character that is preceded
     * by a delimiter (or that is at the start of the line), and extends up to the start of the next column.
     *
     * @param row The header row.
     * @param delimiterAsByte The padding/delimiter byte that separates header names.
     * @param useUtf32CountingConvention True to count in Unicode code points; false to count in UTF-16 units.
     * @return The inferred widths, in the unit selected by {@code useUtf32CountingConvention}.
     * @throws IllegalArgumentException if the row starts with the delimiter.
     */
    private static int[] inferColumnWidths(ByteSlice row, byte delimiterAsByte, boolean useUtf32CountingConvention) {
        final List<Integer> columnWidths = new ArrayList<>();
        final MutableInt charCountResult = new MutableInt();
        boolean prevCharIsDelimiter = false;
        final byte[] data = row.data();
        int numChars = 0;
        int currentIndex = row.begin();
        while (true) {
            if (currentIndex == row.end()) {
                // End of row closes the final column.
                columnWidths.add(numChars);
                return columnWidths.stream().mapToInt(Integer::intValue).toArray();
            }
            // If this character is not a delimiter, but the previous one was, then this is the start of a new column.
            byte ch = data[currentIndex];
            boolean thisCharIsDelimiter = ch == delimiterAsByte;
            if (currentIndex == row.begin() && thisCharIsDelimiter) {
                throw new IllegalArgumentException(
                        String.format("Header row cannot start with the delimiter character '%c'",
                                (char) delimiterAsByte));
            }
            if (!thisCharIsDelimiter && prevCharIsDelimiter) {
                columnWidths.add(numChars);
                numChars = 0;
            }
            prevCharIsDelimiter = thisCharIsDelimiter;
            final int utf8Length = ReaderUtil.getUtf8LengthAndCharLength(ch, row.end() - currentIndex,
                    useUtf32CountingConvention, charCountResult);
            currentIndex += utf8Length;
            numChars += charCountResult.intValue();
        }
    }

    /**
     * Splits the header row into one header per column and trims the padding from each.
     *
     * @param row The header row.
     * @param columnWidths The column widths, in the unit selected by {@code utf32CountingMode}.
     * @param paddingByte The padding byte to trim from each header.
     * @param utf32CountingMode True to count in Unicode code points; false to count in UTF-16 units.
     * @return The header names, one per column. Columns beyond the end of the row yield the empty string.
     */
    private static String[] extractHeaders(ByteSlice row, int[] columnWidths, byte paddingByte,
            boolean utf32CountingMode) {
        final int numCols = columnWidths.length;
        if (numCols == 0) {
            return new String[0];
        }
        final int[] byteWidths = new int[numCols];
        final ByteSlice tempSlice = new ByteSlice();
        final int excessBytes = charWidthsToByteWidths(row, columnWidths, utf32CountingMode, byteWidths);
        // Our policy is that the last column gets any excess bytes that are in the row.
        byteWidths[numCols - 1] += excessBytes;
        final String[] result = new String[numCols];

        int beginByte = row.begin();
        for (int colNum = 0; colNum != numCols; ++colNum) {
            final int proposedEndByte = beginByte + byteWidths[colNum];
            final int actualEndByte = Math.min(proposedEndByte, row.end());
            tempSlice.reset(row.data(), beginByte, actualEndByte);
            tempSlice.trimPadding(paddingByte);
            result[colNum] = tempSlice.toString();
            beginByte = actualEndByte;
        }
        return result;
    }

    /**
     * Converts column widths expressed in characters into widths expressed in bytes of {@code row}.
     *
     * <p>
     * The scan never goes past the end of the row: if the row is shorter than the requested widths, the
     * affected columns get whatever bytes remain (possibly zero). A character that does not fit in the units
     * left in a column (possible under UTF-16 counting, where a character outside the Basic Multilingual Plane
     * needs two units) is left for the next column rather than being split or stepping over the boundary.
     *
     * @param row The row whose bytes are being measured.
     * @param charWidths The column widths, in the unit selected by {@code utf32CountingMode}.
     * @param utf32CountingMode True to count in Unicode code points; false to count in UTF-16 units.
     * @param byteWidths Receives the byte width of each column. Must have the same length as {@code charWidths}.
     * @return The number of bytes at the end of the row not claimed by any column (never negative).
     * @throws IllegalArgumentException if the two arrays differ in length.
     */
    private static int charWidthsToByteWidths(ByteSlice row, int[] charWidths, boolean utf32CountingMode,
            int[] byteWidths) {
        final int numCols = charWidths.length;
        if (byteWidths.length != numCols) {
            throw new IllegalArgumentException(
                    String.format("Expected charWidths.length (%d) == byteWidths.length (%d)",
                            charWidths.length, byteWidths.length));
        }
        final MutableInt charCountResult = new MutableInt();
        final byte[] data = row.data();
        final int rowEnd = row.end();
        int current = row.begin();
        for (int colIndex = 0; colIndex != numCols; ++colIndex) {
            final int start = current;
            int unitsLeft = charWidths[colIndex];
            while (unitsLeft > 0 && current < rowEnd) {
                final int utf8Length = ReaderUtil.getUtf8LengthAndCharLength(data[current], rowEnd - current,
                        utf32CountingMode, charCountResult);
                if (charCountResult.intValue() > unitsLeft) {
                    // Not enough room left in this column for the character; it belongs to the next column.
                    break;
                }
                unitsLeft -= charCountResult.intValue();
                // Defensive clamp so a truncated trailing sequence can never push us past the row.
                current = Math.min(current + utf8Length, rowEnd);
            }
            byteWidths[colIndex] = current - start;
        }
        // Excess bytes not claimed by any column
        return rowEnd - current;
    }
}
Loading

0 comments on commit 8bd5ae1

Please sign in to comment.