diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java index 0c241cbec57..f60cad3f99e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java @@ -19,6 +19,7 @@ import java.math.BigInteger; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalTime; /** * Utility class to concentrate {@link ObjectCodec} lookups. @@ -74,6 +75,7 @@ private static boolean noCodecRequired(@NotNull final Class dataType) { return dataType == Boolean.class || dataType == Instant.class || dataType == LocalDate.class || + dataType == LocalTime.class || dataType == String.class || // A BigDecimal column maps to a logical type of decimal, with // appropriate precision and scale calculated from column data, diff --git a/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java b/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java index 5c428ef1a30..4c1a3846e68 100644 --- a/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java +++ b/engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java @@ -876,6 +876,45 @@ public static LocalTime toLocalTime(@Nullable final ZonedDateTime dateTime) { return dateTime.toLocalTime(); } + /** + * Converts the number of milliseconds from midnight to a {@link LocalTime} + * + * @param millis milliseconds from midnight + * @return the {@link LocalTime}, or {@code null} if any input is {@link QueryConstants#NULL_INT NULL_INT} + */ + public static @Nullable LocalTime millisOfDayToLocalTime(final int millis) { + if (millis == NULL_INT) { + return null; + } + return LocalTime.ofNanoOfDay(millis * MILLI); + } + + /** + * Converts the number of microseconds from midnight to a {@link LocalTime} + * + * @param micros microseconds from midnight + * @return the {@link LocalTime}, or {@code null} if any 
input is {@link QueryConstants#NULL_LONG NULL_LONG} + */ + public static @Nullable LocalTime microsOfDayToLocalTime(final long micros) { + if (micros == NULL_LONG) { + return null; + } + return LocalTime.ofNanoOfDay(micros * MICRO); + } + + /** + * Converts the number of nanoseconds from midnight to a {@link LocalTime} + * + * @param nanos nanoseconds from midnight + * @return the {@link LocalTime}, or {@code null} if any input is {@link QueryConstants#NULL_LONG NULL_LONG} + */ + public static @Nullable LocalTime nanosOfDayToLocalTime(final long nanos) { + if (nanos == NULL_LONG) { + return null; + } + return LocalTime.ofNanoOfDay(nanos); + } + /** * Converts an {@link Instant} to a {@link Date}. {@code instant} will be truncated to millisecond resolution. * @@ -2350,7 +2389,7 @@ public static int minuteOfHour(@Nullable final ZonedDateTime dateTime) { * * @param instant time * @param timeZone time zone - * @return {@link QueryConstants#NULL_INT} if either input is {@code null}; otherwise, number of nanoseconds that + * @return {@link QueryConstants#NULL_LONG} if either input is {@code null}; otherwise, number of nanoseconds that * have elapsed since the top of the day */ @ScriptApi @@ -2370,7 +2409,7 @@ public static long nanosOfDay(@Nullable final Instant instant, @Nullable final Z * upon if the daylight savings time adjustment is forwards or backwards. * * @param dateTime time - * @return {@link QueryConstants#NULL_INT} if either input is {@code null}; otherwise, number of nanoseconds that + * @return {@link QueryConstants#NULL_LONG} if either input is {@code null}; otherwise, number of nanoseconds that * have elapsed since the top of the day */ @ScriptApi @@ -2382,6 +2421,21 @@ public static long nanosOfDay(@Nullable final ZonedDateTime dateTime) { return epochNanos(dateTime) - epochNanos(atMidnight(dateTime)); } + /** + * Returns the number of nanoseconds that have elapsed since the top of the day. 
+ * + * @param localTime time + * @return {@link QueryConstants#NULL_LONG} if input is {@code null}; otherwise, number of nanoseconds that have + * elapsed since the top of the day + */ + public static long nanosOfDay(@Nullable final LocalTime localTime) { + if (localTime == null) { + return NULL_LONG; + } + + return localTime.toNanoOfDay(); + } + /** * Returns the number of milliseconds that have elapsed since the top of the day. *

diff --git a/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java b/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java index 012b24d249c..c8fbfc2d78d 100644 --- a/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java +++ b/engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java @@ -1350,6 +1350,19 @@ public void testToLocalTime() { TestCase.assertEquals(lt, DateTimeUtils.toLocalTime(dt3)); // noinspection ConstantConditions TestCase.assertNull(DateTimeUtils.toLocalTime(null)); + + final LocalTime someTimeInMillis = LocalTime.of(6, 33, 9, (int) (123 * DateTimeUtils.MILLI)); + TestCase.assertEquals(someTimeInMillis, + DateTimeUtils.millisOfDayToLocalTime((int) (someTimeInMillis.toNanoOfDay() / DateTimeUtils.MILLI))); + TestCase.assertNull(DateTimeUtils.millisOfDayToLocalTime(NULL_INT)); + + final LocalTime someTimeInMicros = LocalTime.of(6, 33, 9, (int) (123456 * DateTimeUtils.MICRO)); + TestCase.assertEquals(someTimeInMicros, + DateTimeUtils.microsOfDayToLocalTime(someTimeInMicros.toNanoOfDay() / DateTimeUtils.MICRO)); + TestCase.assertNull(DateTimeUtils.microsOfDayToLocalTime(NULL_LONG)); + + TestCase.assertEquals(lt, DateTimeUtils.nanosOfDayToLocalTime(lt.toNanoOfDay())); + TestCase.assertNull(DateTimeUtils.nanosOfDayToLocalTime(NULL_LONG)); } public void testToZonedDateTime() { @@ -2665,15 +2678,18 @@ public void testNanosOfMilli() { public void testNanosOfDay() { final Instant dt2 = DateTimeUtils.parseInstant("2023-01-02T11:23:45.123456789 JP"); final ZonedDateTime dt3 = dt2.atZone(TZ_JP); + final LocalTime lt = dt3.toLocalTime(); + final long expectedNanos = 123456789L + 1_000_000_000L * (45 + 23 * 60 + 11 * 60 * 60); - TestCase.assertEquals(123456789L + 1_000_000_000L * (45 + 23 * 60 + 11 * 60 * 60), - DateTimeUtils.nanosOfDay(dt2, TZ_JP)); + TestCase.assertEquals(expectedNanos, DateTimeUtils.nanosOfDay(dt2, TZ_JP)); TestCase.assertEquals(NULL_LONG, DateTimeUtils.nanosOfDay(dt2, null)); 
TestCase.assertEquals(NULL_LONG, DateTimeUtils.nanosOfDay(null, TZ_JP)); - TestCase.assertEquals(123456789L + 1_000_000_000L * (45 + 23 * 60 + 11 * 60 * 60), - DateTimeUtils.nanosOfDay(dt3)); - TestCase.assertEquals(NULL_LONG, DateTimeUtils.nanosOfDay(null)); + TestCase.assertEquals(expectedNanos, DateTimeUtils.nanosOfDay(dt3)); + TestCase.assertEquals(NULL_LONG, DateTimeUtils.nanosOfDay((ZonedDateTime) null)); + + TestCase.assertEquals(expectedNanos, DateTimeUtils.nanosOfDay(lt)); + TestCase.assertEquals(NULL_LONG, DateTimeUtils.nanosOfDay((LocalTime) null)); // Test daylight savings time @@ -2681,36 +2697,49 @@ public void testNanosOfDay() { final Instant dstI11 = DateTimeUtils.plus(dstMid1, DateTimeUtils.HOUR); final ZonedDateTime dstZdt11 = DateTimeUtils.toZonedDateTime(dstI11, ZoneId.of("America/Denver")); + final LocalTime dstLt11 = dstZdt11.toLocalTime(); TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstI11, ZoneId.of("America/Denver"))); TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstZdt11)); + TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstLt11)); + final Instant dstI12 = DateTimeUtils.plus(dstMid1, 2 * DateTimeUtils.HOUR); final ZonedDateTime dstZdt12 = DateTimeUtils.toZonedDateTime(dstI12, ZoneId.of("America/Denver")); + final LocalTime dstLt12 = dstZdt12.toLocalTime(); TestCase.assertEquals(2 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstI12, ZoneId.of("America/Denver"))); TestCase.assertEquals(2 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstZdt12)); + TestCase.assertEquals(3 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstLt12)); // Adjusted final Instant dstI13 = DateTimeUtils.plus(dstMid1, 3 * DateTimeUtils.HOUR); final ZonedDateTime dstZdt13 = DateTimeUtils.toZonedDateTime(dstI13, ZoneId.of("America/Denver")); + final LocalTime dstLt13 = dstZdt13.toLocalTime(); TestCase.assertEquals(3 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstI13, ZoneId.of("America/Denver"))); 
TestCase.assertEquals(3 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstZdt13)); + TestCase.assertEquals(4 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstLt13)); // Adjusted final Instant dstMid2 = DateTimeUtils.parseInstant("2023-11-05T00:00:00 America/Denver"); final Instant dstI21 = DateTimeUtils.plus(dstMid2, DateTimeUtils.HOUR); final ZonedDateTime dstZdt21 = DateTimeUtils.toZonedDateTime(dstI21, ZoneId.of("America/Denver")); + final LocalTime dstLt21 = dstZdt21.toLocalTime(); TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstI21, ZoneId.of("America/Denver"))); TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstZdt21)); + TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstLt21)); final Instant dstI22 = DateTimeUtils.plus(dstMid2, 2 * DateTimeUtils.HOUR); final ZonedDateTime dstZdt22 = DateTimeUtils.toZonedDateTime(dstI22, ZoneId.of("America/Denver")); + final LocalTime dstLt22 = dstZdt22.toLocalTime(); TestCase.assertEquals(2 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstI22, ZoneId.of("America/Denver"))); TestCase.assertEquals(2 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstZdt22)); + TestCase.assertEquals(DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstLt22)); // Adjusted final Instant dstI23 = DateTimeUtils.plus(dstMid2, 3 * DateTimeUtils.HOUR); final ZonedDateTime dstZdt23 = DateTimeUtils.toZonedDateTime(dstI23, ZoneId.of("America/Denver")); + final LocalTime dstLt23 = dstZdt23.toLocalTime(); TestCase.assertEquals(3 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstI23, ZoneId.of("America/Denver"))); TestCase.assertEquals(3 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstZdt23)); + TestCase.assertEquals(2 * DateTimeUtils.HOUR, DateTimeUtils.nanosOfDay(dstLt23)); // Adjusted } public void testMillisOfSecond() { diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java 
b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java index fee012b5e8e..92a517be525 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java @@ -26,6 +26,7 @@ import java.math.BigInteger; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalTime; import java.util.*; import java.util.function.BiFunction; import java.util.function.Supplier; @@ -343,10 +344,7 @@ public Optional> visit(final LogicalTypeAnnotation.DateLogicalTypeAnnot @Override public Optional> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - return Optional.of(int.class); - } - return Optional.of(long.class); + return Optional.of(LocalTime.class); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index 4a0d1477372..37a78a7a360 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -25,6 +25,7 @@ import java.math.BigInteger; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalTime; import java.util.*; import java.util.function.Supplier; @@ -48,7 +49,8 @@ public class TypeInfos { StringType.INSTANCE, InstantType.INSTANCE, BigIntegerType.INSTANCE, - LocalDateType.INSTANCE + LocalDateType.INSTANCE, + LocalTimeType.INSTANCE, }; private static final Map, TypeInfo> BY_CLASS; @@ -399,6 +401,28 @@ public PrimitiveBuilder getBuilder(boolean required, boolean repe } } + private enum LocalTimeType implements TypeInfo { + INSTANCE; + + private static final Set> clazzes = Collections.singleton(LocalTime.class); + + 
@Override + public Set> getTypes() { + return clazzes; + } + + @Override + public PrimitiveBuilder getBuilder(boolean required, boolean repeating, Class dataType) { + if (!isValidFor(dataType)) { + throw new IllegalArgumentException("Invalid data type " + dataType); + } + // Always write in (isAdjustedToUTC = true, unit = NANOS) format + return type(PrimitiveTypeName.INT64, required, repeating) + .as(LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS)); + } + } + + /** * We will encode BigIntegers as Decimal types. Parquet has no special type for BigIntegers, but we can maintain * external compatibility by encoding them as fixed length decimals of scale 1. Internally, we'll record that we diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java index 1ee56df908c..1adf7044ce7 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java @@ -706,8 +706,8 @@ private static class LogicalTypeVisitor private final ColumnChunkReader columnChunkReader; private final Class componentType; - LogicalTypeVisitor(@NotNull String name, @NotNull ColumnChunkReader columnChunkReader, - Class componentType) { + LogicalTypeVisitor(@NotNull final String name, @NotNull final ColumnChunkReader columnChunkReader, + final Class componentType) { this.name = name; this.columnChunkReader = columnChunkReader; this.componentType = componentType; @@ -715,14 +715,14 @@ private static class LogicalTypeVisitor @Override public Optional> visit( - LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { + final LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { return Optional .of(ToStringPage.create(componentType, 
columnChunkReader.getDictionarySupplier())); } @Override public Optional> visit( - LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { + final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { // TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted // to UTC if (timestampLogicalType.isAdjustedToUTC()) { @@ -733,8 +733,7 @@ private static class LogicalTypeVisitor } @Override - public Optional> visit( - LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { + public Optional> visit(final LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { if (intLogicalType.isSigned()) { switch (intLogicalType.getBitWidth()) { @@ -761,18 +760,14 @@ private static class LogicalTypeVisitor } @Override - public Optional> visit( - LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { + public Optional> visit(final LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { return Optional.of(ToDatePageFromInt.create(componentType)); } @Override - public Optional> visit( - LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { - if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { - return Optional.of(ToIntPage.create(componentType)); - } - return Optional.of(ToLongPage.create(componentType)); + public Optional> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { + return Optional + .of(ToTimePage.create(componentType, timeLogicalType.getUnit(), timeLogicalType.isAdjustedToUTC())); } @Override diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java new file mode 100644 index 00000000000..604f09a6794 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/pagestore/topage/ToTimePage.java @@ -0,0 +1,117 @@ 
+/** + * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.parquet.table.pagestore.topage; + +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.attributes.Any; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.QueryConstants; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.jetbrains.annotations.NotNull; + +import java.time.LocalTime; + +public class ToTimePage implements ToPage { + + @SuppressWarnings("rawtypes") + private static final ToPage MILLIS_INSTANCE = new ToTimePageFromMillis(); + @SuppressWarnings("rawtypes") + private static final ToPage MICROS_INSTANCE = new ToTimePageFromMicros(); + @SuppressWarnings("rawtypes") + private static final ToPage NANOS_INSTANCE = new ToTimePageFromNanos(); + + @SuppressWarnings("unchecked") + public static ToPage create( + @NotNull final Class nativeType, + @NotNull final LogicalTypeAnnotation.TimeUnit unit, + @SuppressWarnings("unused") final boolean isAdjustedToUTC) { + // isAdjustedToUTC parameter is ignored while reading from Parquet files + if (LocalTime.class.equals(nativeType)) { + switch (unit) { + case MILLIS: + return MILLIS_INSTANCE; + case MICROS: + return MICROS_INSTANCE; + case NANOS: + return NANOS_INSTANCE; + default: + throw new IllegalArgumentException("Unsupported unit=" + unit); + } + } + throw new IllegalArgumentException("The native type for a Time column is " + nativeType.getCanonicalName()); + } + + ToTimePage() {} + + @Override + @NotNull + public final Class getNativeType() { + return LocalTime.class; + } + + @Override + @NotNull + public final ChunkType getChunkType() { + return ChunkType.Object; + } + + private static final class ToTimePageFromMillis extends ToTimePage { + @Override + @NotNull + public Object nullValue() { + return QueryConstants.NULL_INT_BOXED; + } + + @Override + public LocalTime[] convertResult(@NotNull final Object result) { + final int[] from = (int[]) result; + final LocalTime[] to = 
new LocalTime[from.length]; + + for (int i = 0; i < from.length; ++i) { + to[i] = DateTimeUtils.millisOfDayToLocalTime(from[i]); + } + return to; + } + } + + private static class ToTimePageFromLong extends ToTimePage { + @Override + @NotNull + public final Object nullValue() { + return QueryConstants.NULL_LONG_BOXED; + } + + /** + * Convert a {@code long} value in the units of this page (can be micros or nanos) to a {@link LocalTime} + */ + interface ToLocalTimeFromUnits { + LocalTime apply(final long value); + } + + static LocalTime[] convertResultHelper(@NotNull final Object result, + final ToLocalTimeFromUnits toLocalTimeFromUnits) { + final long[] from = (long[]) result; + final LocalTime[] to = new LocalTime[from.length]; + + for (int i = 0; i < from.length; ++i) { + to[i] = toLocalTimeFromUnits.apply(from[i]); + } + return to; + } + } + + private static final class ToTimePageFromMicros extends ToTimePageFromLong { + @Override + public LocalTime[] convertResult(@NotNull final Object result) { + return convertResultHelper(result, DateTimeUtils::microsOfDayToLocalTime); + } + } + + private static final class ToTimePageFromNanos extends ToTimePageFromLong { + @Override + public LocalTime[] convertResult(@NotNull final Object result) { + return convertResultHelper(result, DateTimeUtils::nanosOfDayToLocalTime); + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/BooleanTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/BooleanTransfer.java index cef2c56a582..cf8ef4ad45b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/BooleanTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/BooleanTransfer.java @@ -12,7 +12,7 @@ import java.nio.ByteBuffer; -final class BooleanTransfer extends PrimitiveTransfer, ByteBuffer> { +final class BooleanTransfer extends FillingPrimitiveTransfer, ByteBuffer> { // We encode 
booleans as bytes here and bit pack them with 8 booleans per byte at the time of writing. // Therefore, max values per page are (targetPageSizeInBytes * 8). static BooleanTransfer create(@NotNull final ColumnSource columnSource, @NotNull final RowSet tableRowSet, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateArrayTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateArrayTransfer.java index 1bb1a48a41b..199f3d9714d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateArrayTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateArrayTransfer.java @@ -1,6 +1,11 @@ /** * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit InstantArrayTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ package io.deephaven.parquet.table.transfer; import io.deephaven.engine.rowset.RowSequence; @@ -12,7 +17,7 @@ import java.time.LocalDate; final class DateArrayTransfer extends PrimitiveArrayAndVectorTransfer { - + // We encode LocalDate as primitive ints DateArrayTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, final int targetPageSizeInBytes) { super(columnSource, tableRowSet, targetPageSizeInBytes / Integer.BYTES, targetPageSizeInBytes, @@ -32,7 +37,6 @@ void resizeBuffer(final int length) { @Override void copyToBuffer(@NotNull final EncodedData data) { for (final LocalDate t : data.encodedValues) { - // Store the number of days from the Unix epoch, 1 January 1970 buffer.put(DateTimeUtils.epochDaysAsInt(t)); } } diff --git 
a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateVectorTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateVectorTransfer.java index a582ad49e96..d57f3d90334 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateVectorTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DateVectorTransfer.java @@ -1,6 +1,11 @@ /** * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit InstantVectorTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ package io.deephaven.parquet.table.transfer; import io.deephaven.engine.primitive.iterator.CloseableIterator; @@ -14,7 +19,7 @@ import java.time.LocalDate; final class DateVectorTransfer extends PrimitiveVectorTransfer, IntBuffer> { - + // We encode LocalDate as primitive ints DateVectorTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, final int targetPageSizeInBytes) { super(columnSource, tableRowSet, targetPageSizeInBytes / Integer.BYTES, targetPageSizeInBytes, @@ -29,7 +34,6 @@ void resizeBuffer(final int length) { @Override void copyToBuffer(@NotNull final EncodedData> data) { try (final CloseableIterator dataIterator = data.encodedValues.iterator()) { - // Store the number of days from the Unix epoch, 1 January 1970 dataIterator.forEachRemaining((LocalDate t) -> buffer.put(DateTimeUtils.epochDaysAsInt(t))); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DoubleTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DoubleTransfer.java index 
1c1a84a0bcc..84ecbc8bf64 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DoubleTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/DoubleTransfer.java @@ -17,7 +17,7 @@ import java.nio.DoubleBuffer; -final class DoubleTransfer extends PrimitiveTransfer, DoubleBuffer> { +final class DoubleTransfer extends FillingPrimitiveTransfer, DoubleBuffer> { static DoubleTransfer create(@NotNull final ColumnSource columnSource, @NotNull final RowSet tableRowSet, final int targetPageSizeInBytes) { final int targetElementsPerPage = Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Double.BYTES)); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/PrimitiveTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FillingPrimitiveTransfer.java similarity index 72% rename from extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/PrimitiveTransfer.java rename to extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FillingPrimitiveTransfer.java index 2fbdd7e714f..12a195b5097 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/PrimitiveTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FillingPrimitiveTransfer.java @@ -14,22 +14,24 @@ import java.nio.Buffer; /** - * PrimitiveTransfer is a generic class that can be used to transfer primitive data types directly from a ColumnSource - * to a Buffer using {@link ColumnSource#fillChunk(ChunkSource.FillContext, WritableChunk, RowSequence)}. + * This is a generic class that can be used to transfer primitive data types directly from a {@link ColumnSource} to a + * {@link Buffer} using {@link ColumnSource#fillChunk(ChunkSource.FillContext, WritableChunk, RowSequence)}. 
This class + * can only be used if the {@link WritableChunk} and {@link Buffer} are backed by the same array. */ -abstract class PrimitiveTransfer, B extends Buffer> implements TransferObject { - private final C chunk; - private final B buffer; +abstract class FillingPrimitiveTransfer, BUFFER_TYPE extends Buffer> + implements TransferObject { + private final CHUNK_TYPE chunk; + private final BUFFER_TYPE buffer; private final ColumnSource columnSource; private final RowSequence.Iterator tableRowSetIt; private final ChunkSource.FillContext context; private final int targetElementsPerPage; - PrimitiveTransfer( + FillingPrimitiveTransfer( @NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, - @NotNull final C chunk, - @NotNull final B buffer, + @NotNull final CHUNK_TYPE chunk, + @NotNull final BUFFER_TYPE buffer, final int targetElementsPerPage) { this.columnSource = columnSource; this.tableRowSetIt = tableRowSet.getRowSequenceIterator(); @@ -60,7 +62,7 @@ public final boolean hasMoreDataToBuffer() { } @Override - public final B getBuffer() { + public final BUFFER_TYPE getBuffer() { return buffer; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FloatTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FloatTransfer.java index 0ec48b9d43d..00f088de577 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FloatTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/FloatTransfer.java @@ -17,7 +17,7 @@ import java.nio.FloatBuffer; -final class FloatTransfer extends PrimitiveTransfer, FloatBuffer> { +final class FloatTransfer extends FillingPrimitiveTransfer, FloatBuffer> { static FloatTransfer create(@NotNull final ColumnSource columnSource, @NotNull final RowSet tableRowSet, final int targetPageSizeInBytes) { final int targetElementsPerPage = Math.toIntExact(Math.min(tableRowSet.size(), 
targetPageSizeInBytes / Float.BYTES)); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/GettingPrimitiveTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/GettingPrimitiveTransfer.java new file mode 100644 index 00000000000..123cf07645e --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/GettingPrimitiveTransfer.java @@ -0,0 +1,80 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.ChunkBase; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ChunkSource; +import io.deephaven.engine.table.ColumnSource; +import org.jetbrains.annotations.NotNull; + +import java.nio.Buffer; + +/** + * This is a generic class used to transfer primitive data types from a {@link ColumnSource} to a {@link Buffer} using + * {@link ColumnSource#getChunk(ChunkSource.GetContext, RowSequence)} and then copying values into the buffer. 
+ */ +abstract class GettingPrimitiveTransfer, BUFFER_TYPE extends Buffer> + implements TransferObject { + protected CHUNK_TYPE chunk; + protected final BUFFER_TYPE buffer; + private final ColumnSource columnSource; + private final RowSequence.Iterator tableRowSetIt; + private final ChunkSource.GetContext context; + private final int targetElementsPerPage; + + GettingPrimitiveTransfer( + @NotNull final ColumnSource columnSource, + @NotNull final RowSequence tableRowSet, + final BUFFER_TYPE buffer, + final int targetElementsPerPage) { + this.columnSource = columnSource; + this.tableRowSetIt = tableRowSet.getRowSequenceIterator(); + this.buffer = buffer; + Assert.gtZero(targetElementsPerPage, "targetElementsPerPage"); + this.targetElementsPerPage = targetElementsPerPage; + context = columnSource.makeGetContext(targetElementsPerPage); + } + + @Override + public int transferOnePageToBuffer() { + if (!hasMoreDataToBuffer()) { + return 0; + } + buffer.clear(); + // Fetch one page worth of data from the column source + final RowSequence rs = tableRowSetIt.getNextRowSequenceWithLength((long) targetElementsPerPage); + // noinspection unchecked + chunk = (CHUNK_TYPE) columnSource.getChunk(context, rs); + copyAllFromChunkToBuffer(); + buffer.flip(); + int ret = chunk.size(); + chunk = null; + return ret; + } + + /** + * Helper method to copy all data from {@code this.chunk} to {@code this.buffer}. The buffer should be cleared + * before calling this method and is positioned for a {@link Buffer#flip()} after the call. 
+ */ + abstract void copyAllFromChunkToBuffer(); + + @Override + public final boolean hasMoreDataToBuffer() { + return tableRowSetIt.hasMore(); + } + + @Override + public final BUFFER_TYPE getBuffer() { + return buffer; + } + + @Override + public final void close() { + context.close(); + tableRowSetIt.close(); + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantArrayTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantArrayTransfer.java index 786e3ea5c01..fea86404d7b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantArrayTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantArrayTransfer.java @@ -12,7 +12,7 @@ import java.time.Instant; final class InstantArrayTransfer extends PrimitiveArrayAndVectorTransfer { - // We encode Instants as primitive longs + // We encode Instant as primitive longs InstantArrayTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, final int targetPageSizeInBytes) { super(columnSource, tableRowSet, targetPageSizeInBytes / Long.BYTES, targetPageSizeInBytes, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantVectorTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantVectorTransfer.java index 13f74b90bdc..88bc385153f 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantVectorTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/InstantVectorTransfer.java @@ -14,7 +14,7 @@ import java.time.Instant; final class InstantVectorTransfer extends PrimitiveVectorTransfer, LongBuffer> { - // We encode Instants as primitive longs + // We encode Instant as primitive longs InstantVectorTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence 
tableRowSet, final int targetPageSizeInBytes) { super(columnSource, tableRowSet, targetPageSizeInBytes / Long.BYTES, targetPageSizeInBytes, diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntCastablePrimitiveTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntCastablePrimitiveTransfer.java index 5ea7e13b970..023793dc798 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntCastablePrimitiveTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntCastablePrimitiveTransfer.java @@ -1,76 +1,21 @@ -/** - * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending - */ package io.deephaven.parquet.table.transfer; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ChunkBase; import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.rowset.RowSequence; -import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.ColumnSource; import org.jetbrains.annotations.NotNull; -import java.nio.Buffer; import java.nio.IntBuffer; /** - * A transfer object base class for primitive types that can be cast to int without loss of precision. + * A transfer object base class for primitive types that can be cast to {@code int} without loss of precision. + * Uses {@link IntBuffer} as the buffer type. 
*/ -abstract class IntCastablePrimitiveTransfer> implements TransferObject { - protected T chunk; - protected final IntBuffer buffer; - private final ColumnSource columnSource; - private final RowSequence.Iterator tableRowSetIt; - private final ChunkSource.GetContext context; - private final int targetElementsPerPage; - - IntCastablePrimitiveTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, - final int targetPageSizeInBytes) { - this.columnSource = columnSource; - this.tableRowSetIt = tableRowSet.getRowSequenceIterator(); - this.targetElementsPerPage = Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Integer.BYTES)); - Assert.gtZero(targetElementsPerPage, "targetElementsPerPage"); - this.buffer = IntBuffer.allocate(targetElementsPerPage); - context = columnSource.makeGetContext(targetElementsPerPage); - } - - @Override - public int transferOnePageToBuffer() { - if (!hasMoreDataToBuffer()) { - return 0; - } - buffer.clear(); - // Fetch one page worth of data from the column source - final RowSequence rs = tableRowSetIt.getNextRowSequenceWithLength((long) targetElementsPerPage); - // noinspection unchecked - chunk = (T) columnSource.getChunk(context, rs); - copyAllFromChunkToBuffer(); - buffer.flip(); - int ret = chunk.size(); - chunk = null; - return ret; - } - - /** - * Helper method to copy all data from {@code this.chunk} to {@code this.buffer}. The buffer should be cleared - * before calling this method and is positioned for a {@link Buffer#flip()} after the call. 
- */ - abstract void copyAllFromChunkToBuffer(); - - @Override - public final boolean hasMoreDataToBuffer() { - return tableRowSetIt.hasMore(); - } - - @Override - public final IntBuffer getBuffer() { - return buffer; - } - - @Override - public final void close() { - context.close(); - tableRowSetIt.close(); +abstract class IntCastablePrimitiveTransfer> extends GettingPrimitiveTransfer { + IntCastablePrimitiveTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, + IntBuffer.allocate(Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Integer.BYTES))), + Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Integer.BYTES))); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntTransfer.java index a1ab95f449e..87d8f3f817a 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntTransfer.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/IntTransfer.java @@ -12,7 +12,7 @@ import java.nio.IntBuffer; -final class IntTransfer extends PrimitiveTransfer, IntBuffer> { +final class IntTransfer extends FillingPrimitiveTransfer, IntBuffer> { static IntTransfer create(@NotNull final ColumnSource columnSource, @NotNull final RowSet tableRowSet, final int targetPageSizeInBytes) { final int targetElementsPerPage = Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Integer.BYTES)); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LongTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LongTransfer.java index e4250115b47..3fb5069d8a9 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LongTransfer.java +++ 
b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/LongTransfer.java @@ -17,7 +17,7 @@ import java.nio.LongBuffer; -final class LongTransfer extends PrimitiveTransfer, LongBuffer> { +final class LongTransfer extends FillingPrimitiveTransfer, LongBuffer> { static LongTransfer create(@NotNull final ColumnSource columnSource, @NotNull final RowSet tableRowSet, final int targetPageSizeInBytes) { final int targetElementsPerPage = Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Long.BYTES)); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeArrayTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeArrayTransfer.java new file mode 100644 index 00000000000..6664a68fe31 --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeArrayTransfer.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit InstantArrayTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.time.DateTimeUtils; +import org.jetbrains.annotations.NotNull; + +import java.nio.LongBuffer; +import java.time.LocalTime; + +final class TimeArrayTransfer extends PrimitiveArrayAndVectorTransfer { + // We encode LocalTime as primitive longs + TimeArrayTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, + final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, targetPageSizeInBytes / Long.BYTES, 
targetPageSizeInBytes, + LongBuffer.allocate(targetPageSizeInBytes / Long.BYTES), Long.BYTES); + } + + @Override + int getSize(final LocalTime @NotNull [] data) { + return data.length; + } + + @Override + void resizeBuffer(final int length) { + buffer = LongBuffer.allocate(length); + } + + @Override + void copyToBuffer(@NotNull final EncodedData data) { + for (final LocalTime t : data.encodedValues) { + buffer.put(DateTimeUtils.nanosOfDay(t)); + } + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java new file mode 100644 index 00000000000..9021af1511f --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeTransfer.java @@ -0,0 +1,30 @@ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.time.DateTimeUtils; +import org.jetbrains.annotations.NotNull; + +import java.nio.LongBuffer; +import java.time.LocalTime; + +final class TimeTransfer extends GettingPrimitiveTransfer, LongBuffer> { + + TimeTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, + final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, + LongBuffer.allocate(Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Long.BYTES))), + Math.toIntExact(Math.min(tableRowSet.size(), targetPageSizeInBytes / Long.BYTES))); + } + + @Override + void copyAllFromChunkToBuffer() { + final int chunkSize = chunk.size(); + for (int chunkIdx = 0; chunkIdx < chunkSize; ++chunkIdx) { + // Store the number of nanoseconds after midnight + buffer.put(DateTimeUtils.nanosOfDay(chunk.get(chunkIdx))); + } + } +} diff --git 
a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeVectorTransfer.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeVectorTransfer.java new file mode 100644 index 00000000000..d0936e27ddd --- /dev/null +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TimeVectorTransfer.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending + */ +/* + * --------------------------------------------------------------------------------------------------------------------- + * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit InstantVectorTransfer and regenerate + * --------------------------------------------------------------------------------------------------------------------- + */ +package io.deephaven.parquet.table.transfer; + +import io.deephaven.engine.primitive.iterator.CloseableIterator; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.vector.ObjectVector; +import org.jetbrains.annotations.NotNull; + +import java.nio.LongBuffer; +import java.time.LocalTime; + +final class TimeVectorTransfer extends PrimitiveVectorTransfer, LongBuffer> { + // We encode LocalTime as primitive longs + TimeVectorTransfer(@NotNull final ColumnSource columnSource, @NotNull final RowSequence tableRowSet, + final int targetPageSizeInBytes) { + super(columnSource, tableRowSet, targetPageSizeInBytes / Long.BYTES, targetPageSizeInBytes, + LongBuffer.allocate(targetPageSizeInBytes / Long.BYTES), Long.BYTES); + } + + @Override + void resizeBuffer(final int length) { + buffer = LongBuffer.allocate(length); + } + + @Override + void copyToBuffer(@NotNull final EncodedData> data) { + try (final CloseableIterator dataIterator = data.encodedValues.iterator()) { + dataIterator.forEachRemaining((LocalTime t) -> buffer.put(DateTimeUtils.nanosOfDay(t))); + 
} + } +} diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java index 850d8ee3ab1..756029c299d 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java @@ -20,15 +20,16 @@ import java.nio.IntBuffer; import java.time.Instant; import java.time.LocalDate; +import java.time.LocalTime; import java.util.Map; /** * Classes that implement this interface are responsible for converting data from individual DH columns into buffers * to be written out to the Parquet file. * - * @param The type of the buffer to be written out to the Parquet file + * @param The type of the buffer to be written out to the Parquet file */ -public interface TransferObject extends SafeCloseable { +public interface TransferObject extends SafeCloseable { static TransferObject create( @NotNull final RowSet tableRowSet, @NotNull final ParquetInstructions instructions, @@ -93,6 +94,9 @@ static TransferObject create( if (columnType == LocalDate.class) { return new DateTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (columnType == LocalTime.class) { + return new TimeTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); + } @Nullable final Class componentType = columnSource.getComponentType(); if (columnType.isArray()) { @@ -133,6 +137,9 @@ static TransferObject create( if (componentType == LocalDate.class) { return new DateArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (componentType == LocalTime.class) { + return new TimeArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); + } // TODO(deephaven-core#4612): Handle arrays of BigDecimal and if explicit codec provided } if 
(Vector.class.isAssignableFrom(columnType)) { @@ -173,6 +180,9 @@ static TransferObject create( if (componentType == LocalDate.class) { return new DateVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (componentType == LocalTime.class) { + return new TimeVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); + } // TODO(deephaven-core#4612): Handle vectors of BigDecimal and if explicit codec provided } @@ -220,7 +230,7 @@ static TransferObject create( * * @return the buffer */ - B getBuffer(); + BUFFER_TYPE getBuffer(); /** * Returns whether we encountered any null value while transferring page data to buffer. This method is only used diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 40f873f8a3b..752315c0d2e 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -48,6 +48,8 @@ import org.apache.commons.lang3.mutable.*; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -60,6 +62,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -85,7 +88,7 @@ public final class ParquetTableReadWriteTest { private static final String ROOT_FILENAME = ParquetTableReadWriteTest.class.getName() + "_root"; - public static final int LARGE_TABLE_SIZE = 2_000_000; + private static final int LARGE_TABLE_SIZE = 2_000_000; private static File 
rootFile; @@ -125,6 +128,7 @@ private static Table getTableFlat(int size, boolean includeSerializable, boolean "someKey = `` + (int)(i /100)", "someBiColumn = java.math.BigInteger.valueOf(ii)", "someDateColumn = i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)", + "someTimeColumn = i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)", "nullKey = i < -1?`123`:null", "nullIntColumn = (int)null", "nullLongColumn = (long)null", @@ -137,7 +141,8 @@ private static Table getTableFlat(int size, boolean includeSerializable, boolean "nullTime = (Instant)null", "nullBiColumn = (java.math.BigInteger)null", "nullString = (String)null", - "nullDateColumn = (java.time.LocalDate)null")); + "nullDateColumn = (java.time.LocalDate)null", + "nullTimeColumn = (java.time.LocalTime)null")); if (includeBigDecimal) { columns.add("bdColumn = java.math.BigDecimal.valueOf(ii).stripTrailingZeros()"); } @@ -434,7 +439,8 @@ private static void writeReadTableTest(final Table table, final File dest) { writeReadTableTest(table, dest, ParquetInstructions.EMPTY); } - private static void writeReadTableTest(final Table table, final File dest, ParquetInstructions writeInstructions) { + private static void writeReadTableTest(final Table table, final File dest, + final ParquetInstructions writeInstructions) { ParquetTools.writeTable(table, dest, writeInstructions); final Table fromDisk = ParquetTools.readTable(dest); TstUtils.assertTableEquals(table, fromDisk); @@ -503,6 +509,7 @@ public void testArrayColumns() { "someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}", "someBiColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}", "someDateColumn = new java.time.LocalDate[] {i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)}", + "someTimeColumn = new java.time.LocalTime[] {i % 10 == 0 ? 
null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)}", "nullStringArrayColumn = new String[] {(String)null}", "nullIntArrayColumn = new int[] {(int)null}", "nullLongArrayColumn = new long[] {(long)null}", @@ -514,7 +521,8 @@ public void testArrayColumns() { "nullCharArrayColumn = new char[] {(char)null}", "nullTimeArrayColumn = new Instant[] {(Instant)null}", "nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}", - "nullDateColumn = new java.time.LocalDate[] {(java.time.LocalDate)null}")); + "nullDateColumn = new java.time.LocalDate[] {(java.time.LocalDate)null}", + "nullTimeColumn = new java.time.LocalTime[] {(java.time.LocalTime)null}")); Table arrayTable = TableTools.emptyTable(10000).select(Selectable.from(columns)); final File dest = new File(rootFile + File.separator + "testArrayColumns.parquet"); @@ -1259,6 +1267,30 @@ public void readWriteStatisticsTest() { assertTableStatistics(groupedTableToSave, groupedTableDest); } + @Test + public void readWriteDateTimeTest() { + final int NUM_ROWS = 1000; + final Table table = TableTools.emptyTable(NUM_ROWS).view( + "someDateColumn = i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)", + "someTimeColumn = i % 10 == 0 ? 
null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)"); + final File dest = new File(rootFile, "readWriteDateTimeTest.parquet"); + writeReadTableTest(table, dest); + + // Verify that the types are correct in the schema + final ParquetMetadata metadata = new ParquetTableLocationKey(dest, 0, null).getMetadata(); + final ColumnChunkMetaData dateColMetadata = metadata.getBlocks().get(0).getColumns().get(0); + assertTrue(dateColMetadata.toString().contains("someDateColumn")); + assertEquals(PrimitiveType.PrimitiveTypeName.INT32, dateColMetadata.getPrimitiveType().getPrimitiveTypeName()); + assertEquals(LogicalTypeAnnotation.dateType(), dateColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); + + final ColumnChunkMetaData timeColMetadata = metadata.getBlocks().get(0).getColumns().get(1); + assertTrue(timeColMetadata.toString().contains("someTimeColumn")); + assertEquals(PrimitiveType.PrimitiveTypeName.INT64, timeColMetadata.getPrimitiveType().getPrimitiveTypeName()); + final boolean isAdjustedToUTC = true; + assertEquals(LogicalTypeAnnotation.timeType(isAdjustedToUTC, LogicalTypeAnnotation.TimeUnit.NANOS), + timeColMetadata.getPrimitiveType().getLogicalTypeAnnotation()); + } + /** * Test our manual verification techniques against a file generated by pyarrow. Here is the code to produce the file * when/if this file needs to be re-generated or changed. diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index 42d1e857e3b..c921a7e02a4 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -354,10 +354,12 @@ def test_dictionary_encoding(self): self.assertTrue((metadata.row_group(0).column(2).path_in_schema == 'someIntColumn') & ('RLE_DICTIONARY' not in str(metadata.row_group(0).column(2).encodings))) - def test_dates(self): - dh_table = empty_table(20).update(formulas=[ + def test_dates_and_time(self): + dh_table = empty_table(10000).update(formulas=[ "someDateColumn = i % 10 == 0 ? 
null : java.time.LocalDate.ofEpochDay(i)", "nullDateColumn = (java.time.LocalDate)null", + "someTimeColumn = i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)", + "nullTimeColumn = (java.time.LocalTime)null" ]) write(dh_table, "data_from_dh.parquet", compression_codec_name="SNAPPY") @@ -369,8 +371,10 @@ def test_dates(self): df_from_pandas = pandas.read_parquet("data_from_dh.parquet", use_nullable_dtypes=True) else: df_from_pandas = pandas.read_parquet("data_from_dh.parquet", dtype_backend="numpy_nullable") - self.assertTrue(df_from_disk[["nullDateColumn"]].isnull().values.all()) - self.assertTrue(df_from_pandas[["nullDateColumn"]].isnull().values.all()) + + # Test that all null columns are written as null + self.assertTrue(df_from_disk[["nullDateColumn", "nullTimeColumn"]].isnull().values.all()) + self.assertTrue(df_from_pandas[["nullDateColumn", "nullTimeColumn"]].isnull().values.all()) # Pandas and DH convert date to different types when converting to dataframe, so we need to convert the # dataframe to strings to compare the values @@ -382,10 +386,37 @@ def test_dates(self): df_from_pandas.to_parquet('data_from_pandas.parquet', compression='SNAPPY') from_disk_pandas = read('data_from_pandas.parquet') - # Compare only the date column since the null column is written as different logical types by pandas and + # Compare only the non-null columns because null columns are written as different logical types by pandas and # deephaven - self.assert_table_equals(dh_table.select("someDateColumn"), from_disk_pandas.select("someDateColumn")) + self.assert_table_equals(dh_table.select(["someDateColumn", "someTimeColumn"]), + from_disk_pandas.select(["someDateColumn", "someTimeColumn"])) + def test_time_with_different_units(self): + """ Test that we can write and read time columns with different units """ + dh_table = empty_table(20000).update(formulas=[ + "someTimeColumn = i % 10 == 0 ? 
null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)" + ]) + write(dh_table, "data_from_dh.parquet") + table = pyarrow.parquet.read_table('data_from_dh.parquet') + + def time_test_helper(pa_table, new_schema, dest): + # Write the provided pyarrow table type-casted to the new schema + pyarrow.parquet.write_table(pa_table.cast(new_schema), dest) + from_disk = read(dest) + df_from_disk = to_pandas(from_disk) + original_df = pa_table.to_pandas() + # Compare the dataframes as strings + self.assertTrue((df_from_disk.astype(str) == original_df.astype(str)).all().values.all()) + + # Test for nanoseconds, microseconds, and milliseconds + schema_nsec = table.schema.set(0, pyarrow.field('someTimeColumn', pyarrow.time64('ns'))) + time_test_helper(table, schema_nsec, "data_from_pq_nsec.parquet") + + schema_usec = table.schema.set(0, pyarrow.field('someTimeColumn', pyarrow.time64('us'))) + time_test_helper(table, schema_usec, "data_from_pq_usec.parquet") + + schema_msec = table.schema.set(0, pyarrow.field('someTimeColumn', pyarrow.time32('ms'))) + time_test_helper(table, schema_msec, "data_from_pq_msec.parquet") if __name__ == '__main__': unittest.main() diff --git a/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java b/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java index d21b3e553d3..1a90d784880 100644 --- a/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java +++ b/replication/static/src/main/java/io/deephaven/replicators/ReplicateParquetTransferObjects.java @@ -15,6 +15,19 @@ public class ReplicateParquetTransferObjects { private static final String PARQUET_INT_ARRAY_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "IntArrayTransfer.java"; private static final String PARQUET_INT_VECTOR_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "IntVectorTransfer.java"; + private static final String PARQUET_INSTANT_ARRAY_TRANSFER_PATH = + PARQUET_TRANSFER_DIR + 
"InstantArrayTransfer.java"; + private static final String PARQUET_INSTANT_VECTOR_TRANSFER_PATH = + PARQUET_TRANSFER_DIR + "InstantVectorTransfer.java"; + + private static final String PARQUET_DATE_ARRAY_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "DateArrayTransfer.java"; + private static final String PARQUET_DATE_VECTOR_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "DateVectorTransfer.java"; + + private static final String PARQUET_TIME_ARRAY_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "TimeArrayTransfer.java"; + private static final String PARQUET_TIME_VECTOR_TRANSFER_PATH = PARQUET_TRANSFER_DIR + "TimeVectorTransfer.java"; + + private static final String[] NO_EXCEPTIONS = new String[0]; + public static void main(String[] args) throws IOException { charToShortAndByte(PARQUET_CHAR_TRANSFER_PATH); charToShortAndByte(PARQUET_CHAR_ARRAY_TRANSFER_PATH); @@ -25,5 +38,27 @@ public static void main(String[] args) throws IOException { intToLongAndFloatingPoints(PARQUET_INT_ARRAY_TRANSFER_PATH, "int targetPageSizeInBytes", "int length", "int getSize"); intToLongAndFloatingPoints(PARQUET_INT_VECTOR_TRANSFER_PATH, "int targetPageSizeInBytes", "int length"); + + String[][] pairs = new String[][] { + {"InstantArrayTransfer", "DateArrayTransfer"}, + {"InstantVectorTransfer", "DateVectorTransfer"}, + {"DateTimeUtils.epochNanos", "DateTimeUtils.epochDaysAsInt"}, + {"Instant", "LocalDate"}, + {"LongBuffer", "IntBuffer"}, + {"Long", "Integer"}, + {"long", "int"}, + }; + replaceAll(PARQUET_INSTANT_ARRAY_TRANSFER_PATH, PARQUET_DATE_ARRAY_TRANSFER_PATH, null, NO_EXCEPTIONS, pairs); + replaceAll(PARQUET_INSTANT_VECTOR_TRANSFER_PATH, PARQUET_DATE_VECTOR_TRANSFER_PATH, null, NO_EXCEPTIONS, pairs); + + pairs = new String[][] { + {"InstantArrayTransfer", "TimeArrayTransfer"}, + {"InstantVectorTransfer", "TimeVectorTransfer"}, + {"DateTimeUtils.epochNanos", "DateTimeUtils.nanosOfDay"}, + {"Instant", "LocalTime"} + }; + replaceAll(PARQUET_INSTANT_ARRAY_TRANSFER_PATH, PARQUET_TIME_ARRAY_TRANSFER_PATH, null, 
NO_EXCEPTIONS, pairs); + replaceAll(PARQUET_INSTANT_VECTOR_TRANSFER_PATH, PARQUET_TIME_VECTOR_TRANSFER_PATH, null, NO_EXCEPTIONS, pairs); + // Additional differences can be generated by Spotless } } diff --git a/replication/util/src/main/java/io/deephaven/replication/ReplicatePrimitiveCode.java b/replication/util/src/main/java/io/deephaven/replication/ReplicatePrimitiveCode.java index 2cf19ae13ca..314ea9ff394 100644 --- a/replication/util/src/main/java/io/deephaven/replication/ReplicatePrimitiveCode.java +++ b/replication/util/src/main/java/io/deephaven/replication/ReplicatePrimitiveCode.java @@ -465,8 +465,31 @@ private static void floatToAllFloatingPoints(String sourceClassJavaPath, floatToDouble(sourceClassJavaPath, serialVersionUIDs, exemptions); } + private static String getResultClassPathFromSourceClassPath(String sourceClassJavaPath, + String[] exemptions, String[]... pairs) { + final String sourceClassName = className(sourceClassJavaPath); + final String resultClassName = replaceAllInternal(sourceClassName, null, exemptions, pairs); + final String resultClassJavaPath = basePath(sourceClassJavaPath) + '/' + resultClassName + ".java"; + return resultClassJavaPath; + } + public static String replaceAll(String sourceClassJavaPath, Map serialVersionUIDs, String[] exemptions, String[]... pairs) throws IOException { + final String resultClassJavaPath = + getResultClassPathFromSourceClassPath(sourceClassJavaPath, exemptions, pairs); + return replaceAll(sourceClassJavaPath, resultClassJavaPath, serialVersionUIDs, exemptions, pairs); + } + + public static String replaceAll(String sourceClassJavaPath, String resultClassJavaPath, + Map serialVersionUIDs, String[] exemptions, String[]... 
pairs) throws IOException { + if (resultClassJavaPath == null || resultClassJavaPath.isEmpty()) { + resultClassJavaPath = getResultClassPathFromSourceClassPath(sourceClassJavaPath, exemptions, pairs); + } + final String resultClassName = className(resultClassJavaPath); + final String resultClassPackageName = packageName(resultClassJavaPath); + final String fullResultClassName = resultClassPackageName + '.' + resultClassName; + final Long serialVersionUID = serialVersionUIDs == null ? null : serialVersionUIDs.get(fullResultClassName); + final InputStream inputStream = new FileInputStream(sourceClassJavaPath); int nextChar; final StringBuilder inputText = new StringBuilder(); @@ -475,17 +498,9 @@ public static String replaceAll(String sourceClassJavaPath, Map se } inputStream.close(); - final String sourceClassName = className(sourceClassJavaPath); - final String packageName = packageName(sourceClassJavaPath); - - final String resultClassName = replaceAllInternal(sourceClassName, null, exemptions, pairs); - final String fullResultClassName = packageName + '.' + resultClassName; - final Long serialVersionUID = serialVersionUIDs == null ? 
null : serialVersionUIDs.get(fullResultClassName); - final String resultClassJavaPath = basePath(sourceClassJavaPath) + '/' + resultClassName + ".java"; - final Map> noReplicateParts = findNoLocateRegions(resultClassJavaPath); - System.out.println("Generating java file " + resultClassJavaPath); String body = replaceAllInternal(inputText.toString(), serialVersionUID, exemptions, pairs); + final Map> noReplicateParts = findNoLocateRegions(resultClassJavaPath); if (!noReplicateParts.isEmpty()) { final StringReader sr = new StringReader(body); List lines = IOUtils.readLines(sr); @@ -510,7 +525,7 @@ public static String replaceAll(String sourceClassJavaPath, Map se out.println( " * ---------------------------------------------------------------------------------------------------------------------"); out.println(" * AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit " - + sourceClassName + " and regenerate"); + + className(sourceClassJavaPath) + " and regenerate"); out.println( " * ---------------------------------------------------------------------------------------------------------------------"); out.println(