From da819fadf8b603c8ba0df7163107437461b2f78b Mon Sep 17 00:00:00 2001
From: Fokko
Date: Wed, 20 Nov 2024 17:33:28 +0100
Subject: [PATCH] Ignore partition fields that are dropped from the current-schema

Fixes #4563
---
 .../org/apache/iceberg/PartitionSpec.java               | 10 ++--
 .../java/org/apache/iceberg/types/Type.java             |  3 +-
 .../java/org/apache/iceberg/types/Types.java            | 19 +++++++
 .../org/apache/iceberg/types/TestTypes.java             |  4 +-
 .../java/org/apache/iceberg/Partitioning.java           |  8 +--
 .../apache/iceberg/avro/AvroSchemaUtil.java             |  2 +
 .../org/apache/iceberg/avro/TypeToSchema.java           |  4 ++
 .../TestAlterTablePartitionFields.java                  | 54 +++++++++++++++----
 8 files changed, 84 insertions(+), 20 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 9b74893f1831..871d49459155 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -131,6 +131,12 @@ public StructType partitionType() {
     for (PartitionField field : fields) {
       Type sourceType = schema.findType(field.sourceId());
       Type resultType = field.transform().getResultType(sourceType);
+
+      // When the source field has been dropped we cannot determine the type
+      if (resultType == null) {
+        resultType = Types.UnknownType.get();
+      }
+
       structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
     }
 
@@ -632,9 +638,7 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
       // https://iceberg.apache.org/spec/#partition-transforms
       // We don't care about the source type since a VoidTransform is always compatible and skip the
       // checks
-      if (!transform.equals(Transforms.alwaysNull())) {
-        ValidationException.check(
-            sourceType != null, "Cannot find source column for partition field: %s", field);
+      if (sourceType != null && !transform.equals(Transforms.alwaysNull())) {
         ValidationException.check(
             sourceType.isPrimitiveType(),
             "Cannot partition by non-primitive source field: %s",

diff --git a/api/src/main/java/org/apache/iceberg/types/Type.java b/api/src/main/java/org/apache/iceberg/types/Type.java
index 30870535521f..f4c6f22134a5 100644
--- a/api/src/main/java/org/apache/iceberg/types/Type.java
+++ b/api/src/main/java/org/apache/iceberg/types/Type.java
@@ -46,7 +46,8 @@ enum TypeID {
     STRUCT(StructLike.class),
     LIST(List.class),
     MAP(Map.class),
-    VARIANT(Object.class);
+    VARIANT(Object.class),
+    UNKNOWN(Object.class);
 
     private final Class<?> javaClass;
 
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 3c03a3defb42..c610f498670d 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -55,6 +55,7 @@ private Types() {}
           .put(StringType.get().toString(), StringType.get())
           .put(UUIDType.get().toString(), UUIDType.get())
           .put(BinaryType.get().toString(), BinaryType.get())
+          .put(UnknownType.get().toString(), UnknownType.get())
           .buildOrThrow();
 
   private static final Pattern FIXED = Pattern.compile("fixed\\[\\s*(\\d+)\\s*\\]");
@@ -412,6 +413,24 @@ public String toString() {
     }
   }
 
+  public static class UnknownType extends PrimitiveType {
+    private static final UnknownType INSTANCE = new UnknownType();
+
+    public static UnknownType get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public TypeID typeId() {
+      return TypeID.UNKNOWN;
+    }
+
+    @Override
+    public String toString() {
+      return "unknown";
+    }
+  }
+
   public static class VariantType implements Type {
     private static final VariantType INSTANCE = new VariantType();
 
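
For orientation, a rough, illustrative sketch of what the new fallback buys (this is
not part of the patch; it assumes the Types.UnknownType introduced above, and the
field id 1000 and name "day_of_ts" are made up): once a partition field's source
column no longer resolves in the current schema, PartitionSpec.partitionType() keeps
the field and types it as "unknown" instead of failing.

    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    // Hedged sketch of the fallback applied in PartitionSpec.partitionType().
    public class UnknownPartitionTypeSketch {
      public static void main(String[] args) {
        // Simulates schema.findType(field.sourceId()) returning null after the column was dropped.
        Type sourceType = null;
        Type resultType = sourceType == null ? Types.UnknownType.get() : sourceType;

        // The partition struct keeps the field, typed as "unknown", instead of throwing.
        Types.StructType partitionType =
            Types.StructType.of(Types.NestedField.optional(1000, "day_of_ts", resultType));
        System.out.println(partitionType);
      }
    }

A dropped source column therefore behaves much like a void transform: the partition
field is still projectable, it just no longer carries a usable source type.
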
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypes.java b/api/src/test/java/org/apache/iceberg/types/TestTypes.java
index 226c53f1e9ce..f66bc2e93694 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypes.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypes.java
@@ -44,7 +44,7 @@ public void fromPrimitiveString() {
     assertThat(Types.fromPrimitiveString("Decimal(2,3)")).isEqualTo(Types.DecimalType.of(2, 3));
 
     assertThatExceptionOfType(IllegalArgumentException.class)
-        .isThrownBy(() -> Types.fromPrimitiveString("Unknown"))
-        .withMessageContaining("Unknown");
+        .isThrownBy(() -> Types.fromPrimitiveString("unknown-type"))
+        .withMessageContaining("unknown-type");
   }
 }

diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index 832e0b59fe50..c708d39f523e 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec> specs) {
   */
   public static StructType partitionType(Table table) {
     Collection<PartitionSpec> specs = table.specs().values();
-    return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
+    return buildPartitionProjectionType(
+        "table partition", specs, allActiveFieldIds(table.schema(), specs));
   }
 
   /**
@@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
         || t2.equals(Transforms.alwaysNull());
   }
 
-  // collects IDs of all partition field used across specs
-  private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
+  // collects IDs of all partition field used across specs that are in the current schema
+  private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
     return FluentIterable.from(specs)
         .transformAndConcat(PartitionSpec::fields)
+        .filter(field -> schema.findField(field.sourceId()) != null)
         .transform(PartitionField::fieldId)
         .toSet();
   }

diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
index 032d63105dfe..3e3f8d38e46c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -188,6 +188,8 @@ static Schema toOption(Schema schema) {
       Preconditions.checkArgument(
           isOptionSchema(schema), "Union schemas are not supported: %s", schema);
       return schema;
+    } else if (schema.getType() == Schema.Type.NULL) {
+      return schema;
     } else {
       return Schema.createUnion(NULL, schema);
     }

diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
index 05ce4e618662..2383d0a01455 100644
--- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
+++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
@@ -49,6 +49,7 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
   private static final Schema UUID_SCHEMA =
       LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
   private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES);
+  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
 
   static {
     TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
@@ -243,6 +244,9 @@ public Schema primitive(Type.PrimitiveType primitive) {
                     null,
                     TypeUtil.decimalRequiredBytes(decimal.precision())));
         break;
+      case UNKNOWN:
+        primitiveSchema = NULL_SCHEMA;
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
     }
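
The Avro side of the change is intentionally small: an UNKNOWN Iceberg type maps to
Avro's null type, and toOption() now treats a null schema as already optional. A
minimal sketch of that interaction follows (illustrative only; the class and variable
names are invented, and it assumes nothing beyond the plain Avro Schema API):

    import org.apache.avro.Schema;

    // Sketch: the Avro schema produced for an "unknown" Iceberg type is NULL, and
    // wrapping NULL in an optional union is a no-op, mirroring the new branch in
    // AvroSchemaUtil.toOption() (a union of null with null is not a valid Avro schema).
    public class UnknownToAvroSketch {
      public static void main(String[] args) {
        Schema unknownAsAvro = Schema.create(Schema.Type.NULL);

        Schema optional =
            unknownAsAvro.getType() == Schema.Type.NULL
                ? unknownAsAvro
                : Schema.createUnion(Schema.create(Schema.Type.NULL), unknownAsAvro);

        System.out.println(optional); // prints: "null"
      }
    }
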
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
index 38e5c942c9ff..b3fd08572b21 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java
@@ -20,6 +20,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
@@ -27,6 +30,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
@@ -530,26 +534,54 @@ public void testSparkTableAddDropPartitions() throws Exception {
   }
 
   @TestTemplate
-  public void testDropColumnOfOldPartitionFieldV1() {
-    // default table created in v1 format
+  public void testDropColumnOfOldPartitionField() {
     sql(
-        "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '1')",
-        tableName);
+        "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = %d)",
+        tableName, formatVersion);
 
     sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);
 
     sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
   }
 
-  @TestTemplate
-  public void testDropColumnOfOldPartitionFieldV2() {
+  private void runCreateAndDropPartitionField(
+      String column, String partitionType, List<Object[]> expected, String predicate) {
+    sql("DROP TABLE IF EXISTS %s", tableName);
     sql(
-        "CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '2')",
-        tableName);
-
-    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);
+        "CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)",
+        tableName, formatVersion);
+    sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as TIMESTAMP), 2100)", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, partitionType);
+    sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as TIMESTAMP), 2200)", tableName);
+    sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName);
+    sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as TIMESTAMP), 2300)", tableName);
+    sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);
+
+    assertEquals(
+        "Should return correct data",
+        expected,
+        sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, predicate));
+  }
 
-    sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
+  @TestTemplate
+  public void testDropPartitionAndUnderlyingField() {
+    String predicateLong = "col_ts >= '2024-04-01 19:25:00'";
+    List<Object[]> expectedLong =
+        Lists.newArrayList(
+            new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)},
+            new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)});
+    runCreateAndDropPartitionField("col_long", "col_long", expectedLong, predicateLong);
+    runCreateAndDropPartitionField(
+        "col_long", "truncate(2, col_long)", expectedLong, predicateLong);
+    runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expectedLong, predicateLong);
+
+    String predicateTs = "col_long >= 2200";
+    List<Object[]> expectedTs =
+        Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L});
+    runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, predicateTs);
+    runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, predicateTs);
+    runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, predicateTs);
+    runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, predicateTs);
   }
 
   private void assertPartitioningEquals(SparkTable table, int len, String transform) {