Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config for deciding whether to use Iceberg Time type #11174

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";

private static final String CONVERT_CONNECT_TIME_TO_ICEBERG_INTEGER_TYPE = "iceberg.convert.connect-time-to.iceberg-time-type";
private static final boolean CONVERT_CONNECT_TIME_TO_ICEBERG_INTEGER_TYPE_DEFAULT = false;

public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts

Expand Down Expand Up @@ -210,6 +213,13 @@ private static ConfigDef newConfigDef() {
null,
Importance.MEDIUM,
"If specified, Hadoop config files in this directory will be loaded");
configDef.define(
CONVERT_CONNECT_TIME_TO_ICEBERG_INTEGER_TYPE,
ConfigDef.Type.BOOLEAN,
CONVERT_CONNECT_TIME_TO_ICEBERG_INTEGER_TYPE_DEFAULT,
Importance.HIGH,
"specify whether to convert to iceberg time type as spark does not have the support to read"
+ "iceberg time type");
return configDef;
}

Expand Down Expand Up @@ -322,6 +332,10 @@ public String tablesDefaultPartitionBy() {
return getString(TABLES_DEFAULT_PARTITION_BY);
}

public boolean shouldConvertConnectTimeToIcebergIntegerType() {
return getBoolean(CONVERT_CONNECT_TIME_TO_ICEBERG_INTEGER_TYPE);
}

public TableSinkConfig tableConfig(String tableName) {
return tableConfigMap.computeIfAbsent(
tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ protected int convertInt(Object value) {
return ((Number) value).intValue();
} else if (value instanceof String) {
return Integer.parseInt((String) value);
} else if (!config.shouldConvertConnectTimeToIcebergIntegerType() && value instanceof Date) {
return Long.valueOf(((Date) value).getTime()).intValue();
}
throw new IllegalArgumentException("Cannot convert to int: " + value.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ Type toIcebergType(Schema valueSchema) {
case INT32:
if (Date.LOGICAL_NAME.equals(valueSchema.name())) {
return DateType.get();
} else if (Time.LOGICAL_NAME.equals(valueSchema.name())) {
} else if (!config.shouldConvertConnectTimeToIcebergIntegerType() && Time.LOGICAL_NAME.equals(valueSchema.name())) {
return TimeType.get();
}
return IntegerType.get();
Expand Down Expand Up @@ -307,7 +307,7 @@ Type inferIcebergType(Object value) {
} else if (value instanceof LocalDate) {
return DateType.get();
} else if (value instanceof LocalTime) {
return TimeType.get();
return config.shouldConvertConnectTimeToIcebergIntegerType() ? IntegerType.get() : TimeType.get();
} else if (value instanceof java.util.Date || value instanceof OffsetDateTime) {
return TimestampType.withZone();
} else if (value instanceof LocalDateTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public static void beforeAll() {
public void before() {
this.config = mock(IcebergSinkConfig.class);
when(config.jsonConverter()).thenReturn(JSON_CONVERTER);
when(config.shouldConvertConnectTimeToIcebergIntegerType()).thenReturn(false);
}

@Test
Expand Down Expand Up @@ -780,7 +781,7 @@ private void assertTypesAddedFromStruct(Function<String, Type> fn) {
assertThat(fn.apply("i")).isInstanceOf(IntegerType.class);
assertThat(fn.apply("l")).isInstanceOf(LongType.class);
assertThat(fn.apply("d")).isInstanceOf(DateType.class);
assertThat(fn.apply("t")).isInstanceOf(TimeType.class);
assertThat(fn.apply("t")).isInstanceOf(config.shouldConvertConnectTimeToIcebergIntegerType() ? IntegerType.class : TimeType.class);
assertThat(fn.apply("ts")).isInstanceOf(TimestampType.class);
assertThat(fn.apply("tsz")).isInstanceOf(TimestampType.class);
assertThat(fn.apply("fl")).isInstanceOf(FloatType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testToIcebergType(boolean forceOptional) {
assertThat(SchemaUtils.toIcebergType(Schema.STRING_SCHEMA, config))
.isInstanceOf(StringType.class);
assertThat(SchemaUtils.toIcebergType(Date.SCHEMA, config)).isInstanceOf(DateType.class);
assertThat(SchemaUtils.toIcebergType(Time.SCHEMA, config)).isInstanceOf(TimeType.class);
assertThat(SchemaUtils.toIcebergType(Time.SCHEMA, config)).isInstanceOf(config.shouldConvertConnectTimeToIcebergIntegerType() ? IntegerType.class : TimeType.class);

Type timestampType = SchemaUtils.toIcebergType(Timestamp.SCHEMA, config);
assertThat(timestampType).isInstanceOf(TimestampType.class);
Expand Down Expand Up @@ -281,7 +281,7 @@ public void testInferIcebergType() {
assertThat(SchemaUtils.inferIcebergType("foobar", config)).isInstanceOf(StringType.class);
assertThat(SchemaUtils.inferIcebergType(true, config)).isInstanceOf(BooleanType.class);
assertThat(SchemaUtils.inferIcebergType(LocalDate.now(), config)).isInstanceOf(DateType.class);
assertThat(SchemaUtils.inferIcebergType(LocalTime.now(), config)).isInstanceOf(TimeType.class);
assertThat(SchemaUtils.inferIcebergType(LocalTime.now(), config)).isInstanceOf(config.shouldConvertConnectTimeToIcebergIntegerType() ? IntegerType.class : TimeType.class);

Type timestampType = SchemaUtils.inferIcebergType(new java.util.Date(), config);
assertThat(timestampType).isInstanceOf(TimestampType.class);
Expand Down