Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes and improved support for Parquet TIMESTAMP #4801

Merged
merged 5 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.LocalDateTime;

/**
* Utility class to concentrate {@link ObjectCodec} lookups.
Expand Down Expand Up @@ -76,6 +77,7 @@ private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
dataType == Instant.class ||
dataType == LocalDate.class ||
dataType == LocalTime.class ||
dataType == LocalDateTime.class ||
dataType == String.class ||
// A BigDecimal column maps to a logical type of decimal, with
// appropriate precision and scale calculated from column data,
Expand Down
60 changes: 60 additions & 0 deletions engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,20 @@ public static long epochNanos(@Nullable final ZonedDateTime dateTime) {
return safeComputeNanos(dateTime.toEpochSecond(), dateTime.getNano());
}

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case my comments seem strong, this library is very exposed to users, so it needs to be ultra curated. Functions should only get added when there is a compelling reason.

  1. I am not a fan of having hard-coded methods for any specific timezone.
  2. The methods should accept a time zone as an input.
  3. If methods are added for LocalDateTime, LocalDateTime signatures should be added to all relevant methods.

* Returns nanoseconds from the Epoch for a {@link LocalDateTime} value in UTC timezone.
*
* @param localDateTime the local date time to compute the Epoch offset for
* @return nanoseconds since Epoch, or a NULL_LONG value if the local date time is null
*/
@ScriptApi
public static long epochNanosUTC(@Nullable final LocalDateTime localDateTime) {
if (localDateTime == null) {
return NULL_LONG;
}
return epochNanos(localDateTime.toInstant(ZoneOffset.UTC));
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns microseconds from the Epoch for an {@link Instant} value.
*
Expand Down Expand Up @@ -1399,6 +1413,52 @@ public static ZonedDateTime excelToZonedDateTime(final double excel, @Nullable f
return epochMillisToZonedDateTime(excelTimeToEpochMillis(excel, timeZone), timeZone);
}

/**
 * Converts nanoseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
 * <p>
 * Negative inputs (times before the Epoch) are supported.
 *
 * @param nanos nanoseconds since Epoch
 * @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input nanoseconds from the
 *         Epoch converted to a {@link LocalDateTime} in UTC timezone
 */
public static @Nullable LocalDateTime epochNanosToLocalDateTimeUTC(final long nanos) {
    if (nanos == QueryConstants.NULL_LONG) {
        return null;
    }
    // Use floor division/modulus rather than / and %: for negative inputs the truncating operators give a
    // negative remainder, which LocalDateTime.ofEpochSecond rejects (nano-of-second must be 0..999,999,999).
    final long epochSeconds = Math.floorDiv(nanos, SECOND);
    final int nanoOfSecond = (int) Math.floorMod(nanos, SECOND);
    return LocalDateTime.ofEpochSecond(epochSeconds, nanoOfSecond, ZoneOffset.UTC);
}

/**
 * Converts microseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
 * <p>
 * Negative inputs (times before the Epoch) are supported.
 *
 * @param micros microseconds since Epoch
 * @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input microseconds from the
 *         Epoch converted to a {@link LocalDateTime} in UTC timezone
 */
public static @Nullable LocalDateTime epochMicrosToLocalDateTimeUTC(final long micros) {
    if (micros == QueryConstants.NULL_LONG) {
        return null;
    }
    final long microsPerSecond = 1_000_000;
    // Use floor division/modulus rather than / and %: for negative inputs the truncating operators give a
    // negative remainder, which LocalDateTime.ofEpochSecond rejects (nano-of-second must be 0..999,999,999).
    final long epochSeconds = Math.floorDiv(micros, microsPerSecond);
    final long microsOfSecond = Math.floorMod(micros, microsPerSecond);
    return LocalDateTime.ofEpochSecond(epochSeconds, (int) (microsOfSecond * MICRO), ZoneOffset.UTC);
}

/**
 * Converts milliseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
 * <p>
 * Negative inputs (times before the Epoch) are supported.
 *
 * @param millis milliseconds since Epoch
 * @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input milliseconds from the
 *         Epoch converted to a {@link LocalDateTime} in UTC timezone
 */
public static @Nullable LocalDateTime epochMillisToLocalDateTimeUTC(final long millis) {
    if (millis == QueryConstants.NULL_LONG) {
        return null;
    }
    final long millisPerSecond = 1_000;
    // Use floor division/modulus rather than / and %: for negative inputs the truncating operators give a
    // negative remainder, which LocalDateTime.ofEpochSecond rejects (nano-of-second must be 0..999,999,999).
    final long epochSeconds = Math.floorDiv(millis, millisPerSecond);
    final long millisOfSecond = Math.floorMod(millis, millisPerSecond);
    return LocalDateTime.ofEpochSecond(epochSeconds, (int) (millisOfSecond * MILLI), ZoneOffset.UTC);
}
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

// endregion

// region Arithmetic
Expand Down
16 changes: 16 additions & 0 deletions engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,10 @@ public void testEpochNanos() {

TestCase.assertEquals(nanos, DateTimeUtils.epochNanos(dt3));
TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanos((ZonedDateTime) null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(nanos, DateTimeUtils.epochNanosUTC(ldt));
TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanosUTC(null));
}

public void testEpochMicros() {
Expand Down Expand Up @@ -1456,6 +1460,10 @@ public void testEpochNanosTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochNanosToZonedDateTime(nanos, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(nanos, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochNanosToLocalDateTimeUTC(nanos));
TestCase.assertNull(DateTimeUtils.epochNanosToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochMicrosTo() {
Expand All @@ -1471,6 +1479,10 @@ public void testEpochMicrosTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochMicrosToZonedDateTime(micros, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(micros, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochMicrosToLocalDateTimeUTC(micros));
TestCase.assertNull(DateTimeUtils.epochMicrosToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochMillisTo() {
Expand All @@ -1486,6 +1498,10 @@ public void testEpochMillisTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochMillisToZonedDateTime(millis, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(millis, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochMillisToLocalDateTimeUTC(millis));
TestCase.assertNull(DateTimeUtils.epochMillisToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochSecondsTo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,23 +236,13 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
}

if (schemaElement.isSetLogicalType()) {
LogicalType logicalType = schemaElement.logicalType;
if (logicalType.isSetTIMESTAMP()) {
TimestampType timestamp = logicalType.getTIMESTAMP();
if (!timestamp.isAdjustedToUTC) {
// TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
throw new ParquetFileReaderException(String.format(
"Only UTC timestamp is supported, found time column `%s` with isAdjustedToUTC=false",
schemaElement.getName()));
}
}
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(logicalType));
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(schemaElement.logicalType));
}

if (schemaElement.isSetConverted_type()) {
LogicalTypeAnnotation originalType =
getLogicalTypeAnnotation(schemaElement.converted_type, schemaElement);
LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
final LogicalTypeAnnotation originalType = getLogicalTypeAnnotation(
schemaElement.converted_type, schemaElement.logicalType, schemaElement);
final LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
&& getLogicalTypeAnnotation(schemaElement.logicalType) != null
? getLogicalTypeAnnotation(schemaElement.logicalType)
: null;
Expand Down Expand Up @@ -299,20 +289,20 @@ static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) throws P
case LIST:
return LogicalTypeAnnotation.listType();
case TIME:
TimeType time = type.getTIME();
final TimeType time = type.getTIME();
return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
case STRING:
return LogicalTypeAnnotation.stringType();
case DECIMAL:
DecimalType decimal = type.getDECIMAL();
final DecimalType decimal = type.getDECIMAL();
return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
case INTEGER:
IntType integer = type.getINTEGER();
final IntType integer = type.getINTEGER();
return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
case UNKNOWN:
return null;
case TIMESTAMP:
TimestampType timestamp = type.getTIMESTAMP();
final TimestampType timestamp = type.getTIMESTAMP();
return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
default:
throw new ParquetFileReaderException("Unknown logical type " + type);
Expand Down Expand Up @@ -354,9 +344,9 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu
return org.apache.parquet.schema.ColumnOrder.undefined();
}

private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type, SchemaElement schemaElement)
throws ParquetFileReaderException {
switch (type) {
private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedType convertedType,
final LogicalType logicalType, final SchemaElement schemaElement) throws ParquetFileReaderException {
switch (convertedType) {
case UTF8:
return LogicalTypeAnnotation.stringType();
case MAP:
Expand All @@ -368,23 +358,23 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
case ENUM:
return LogicalTypeAnnotation.enumType();
case DECIMAL:
int scale = schemaElement == null ? 0 : schemaElement.scale;
int precision = schemaElement == null ? 0 : schemaElement.precision;
final int scale = schemaElement == null ? 0 : schemaElement.scale;
final int precision = schemaElement == null ? 0 : schemaElement.precision;
return LogicalTypeAnnotation.decimalType(scale, precision);
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
// The isAdjustedToUTC parameter is ignored while reading the Parquet TIME type, so the value passed here does not matter
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
// Converted type doesn't have isAdjustedToUTC parameter, so use the information from logical type
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
case INT_8:
Expand All @@ -409,8 +399,21 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
return LogicalTypeAnnotation.bsonType();
default:
throw new ParquetFileReaderException(
"Can't convert converted type to logical type, unknown converted type " + type);
"Can't convert converted type to logical type, unknown converted type " + convertedType);
}
}

/**
 * Helper method to determine if a logical type is adjusted to UTC.
 *
 * @param logicalType the logical type to check; may be {@code null} when the schema element only carries a
 *        converted type and no logical type
 * @return true if the logical type is a timestamp adjusted to UTC, or if no logical type is set; false otherwise
 */
private static boolean isAdjustedToUTC(final LogicalType logicalType) {
    if (logicalType == null) {
        // The converted-type path can be reached without a logical type being set, so guard against a
        // NullPointerException here. With nothing stating otherwise, keep the earlier assumption that
        // converted-type timestamps are adjusted to UTC.
        return true;
    }
    if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) {
        return logicalType.getTIMESTAMP().isAdjustedToUTC;
    }
    return false;
}

public MessageType getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -350,15 +351,13 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot
@Override
public Optional<Class<?>> visit(
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
switch (timestampLogicalType.getUnit()) {
case MILLIS:
case MICROS:
case NANOS:
return Optional.of(Instant.class);
}
switch (timestampLogicalType.getUnit()) {
case MILLIS:
case MICROS:
case NANOS:
// TIMESTAMP fields if adjusted to UTC are read as Instants, else as LocalDatetimes.
return timestampLogicalType.isAdjustedToUTC() ? Optional.of(Instant.class)
: Optional.of(LocalDateTime.class);
}
errorString.setValue("TimestampLogicalType, isAdjustedToUTC=" + timestampLogicalType.isAdjustedToUTC()
+ ", unit=" + timestampLogicalType.getUnit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Supplier;

Expand All @@ -51,6 +52,7 @@ public class TypeInfos {
BigIntegerType.INSTANCE,
LocalDateType.INSTANCE,
LocalTimeType.INSTANCE,
LocalDateTimeType.INSTANCE,
};

private static final Map<Class<?>, TypeInfo> BY_CLASS;
Expand Down Expand Up @@ -376,11 +378,33 @@ public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repe
if (!isValidFor(dataType)) {
throw new IllegalArgumentException("Invalid data type " + dataType);
}
// Write instants as Parquet TIMESTAMP(isAdjustedToUTC = true, unit = NANOS)
return type(PrimitiveTypeName.INT64, required, repeating)
.as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS));
}
}

private enum LocalDateTimeType implements TypeInfo {
INSTANCE;

private static final Set<Class<?>> clazzes = Collections.singleton(LocalDateTime.class);

@Override
public Set<Class<?>> getTypes() {
return clazzes;
}

@Override
public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repeating, Class<?> dataType) {
if (!isValidFor(dataType)) {
throw new IllegalArgumentException("Invalid data type " + dataType);
}
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
// Write LocalDateTime as Parquet TIMESTAMP(isAdjustedToUTC = false, unit = NANOS)
return type(PrimitiveTypeName.INT64, required, repeating)
.as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.NANOS));
}
}

private enum LocalDateType implements TypeInfo {
INSTANCE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,13 +723,10 @@ private static class LogicalTypeVisitor<ATTR extends Any>
@Override
public Optional<ToPage<ATTR, ?>> visit(
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
return Optional
.of(ToInstantPage.create(componentType, timestampLogicalType.getUnit()));
return Optional.of(ToInstantPage.create(componentType, timestampLogicalType.getUnit()));
}
return Optional.empty();
return Optional.of(ToLocalDateTimePage.create(componentType, timestampLogicalType.getUnit()));
}

@Override
Expand Down
Loading
Loading