Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore partition fields that are dropped from the current-schema #11868

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions api/src/main/java/org/apache/iceberg/PartitionSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public StructType partitionType() {
for (PartitionField field : fields) {
Type sourceType = schema.findType(field.sourceId());
Type resultType = field.transform().getResultType(sourceType);

// When the source field has been dropped we cannot determine the type
if (resultType == null) {
resultType = Types.UnknownType.get();
}

structFields.add(Types.NestedField.optional(field.fieldId(), field.name(), resultType));
}

Expand Down Expand Up @@ -632,9 +638,7 @@ static void checkCompatibility(PartitionSpec spec, Schema schema) {
// https://iceberg.apache.org/spec/#partition-transforms
// We don't care about the source type since a VoidTransform is always compatible and skip the
// checks
if (!transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType != null, "Cannot find source column for partition field: %s", field);
if (sourceType != null && !transform.equals(Transforms.alwaysNull())) {
ValidationException.check(
sourceType.isPrimitiveType(),
"Cannot partition by non-primitive source field: %s",
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ enum TypeID {
STRUCT(StructLike.class),
LIST(List.class),
MAP(Map.class),
VARIANT(Object.class);
VARIANT(Object.class),
UNKNOWN(Object.class);

private final Class<?> javaClass;

Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private Types() {}
.put(StringType.get().toString(), StringType.get())
.put(UUIDType.get().toString(), UUIDType.get())
.put(BinaryType.get().toString(), BinaryType.get())
.put(UnknownType.get().toString(), UnknownType.get())
.buildOrThrow();

private static final Pattern FIXED = Pattern.compile("fixed\\[\\s*(\\d+)\\s*\\]");
Expand Down Expand Up @@ -412,6 +413,24 @@ public String toString() {
}
}

public static class UnknownType extends PrimitiveType {
private static final UnknownType INSTANCE = new UnknownType();

public static UnknownType get() {
return INSTANCE;
}

@Override
public TypeID typeId() {
return TypeID.UNKNOWN;
}

@Override
public String toString() {
return "unknown";
}
}

public static class VariantType implements Type {
private static final VariantType INSTANCE = new VariantType();

Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/org/apache/iceberg/types/TestTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void fromPrimitiveString() {
assertThat(Types.fromPrimitiveString("Decimal(2,3)")).isEqualTo(Types.DecimalType.of(2, 3));

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> Types.fromPrimitiveString("Unknown"))
.withMessageContaining("Unknown");
.isThrownBy(() -> Types.fromPrimitiveString("unknown-type"))
.withMessageContaining("unknown-type");
}
}
8 changes: 5 additions & 3 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec
*/
public static StructType partitionType(Table table) {
Collection<PartitionSpec> specs = table.specs().values();
return buildPartitionProjectionType("table partition", specs, allFieldIds(specs));
return buildPartitionProjectionType(
"table partition", specs, allActiveFieldIds(table.schema(), specs));
}

/**
Expand Down Expand Up @@ -346,10 +347,11 @@ private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?>
|| t2.equals(Transforms.alwaysNull());
}

// collects IDs of all partition field used across specs
private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
// collects IDs of all partition field used across specs that are in the current schema
private static Set<Integer> allActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
return FluentIterable.from(specs)
.transformAndConcat(PartitionSpec::fields)
.filter(field -> schema.findField(field.sourceId()) != null)
.transform(PartitionField::fieldId)
.toSet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ static Schema toOption(Schema schema) {
Preconditions.checkArgument(
isOptionSchema(schema), "Union schemas are not supported: %s", schema);
return schema;
} else if (schema.getType() == Schema.Type.NULL) {
return schema;
} else {
return Schema.createUnion(NULL, schema);
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor<Schema> {
private static final Schema UUID_SCHEMA =
LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES);
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

static {
TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false);
Expand Down Expand Up @@ -243,6 +244,9 @@ public Schema primitive(Type.PrimitiveType primitive) {
null,
TypeUtil.decimalRequiredBytes(decimal.precision())));
break;
case UNKNOWN:
primitiveSchema = NULL_SCHEMA;
break;
default:
throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.connector.catalog.CatalogManager;
Expand Down Expand Up @@ -530,26 +534,54 @@ public void testSparkTableAddDropPartitions() throws Exception {
}

@TestTemplate
public void testDropColumnOfOldPartitionFieldV1() {
// default table created in v1 format
public void testDropColumnOfOldPartitionField() {
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '1')",
tableName);
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = %d)",
tableName, formatVersion);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
}

@TestTemplate
public void testDropColumnOfOldPartitionFieldV2() {
private void runCreateAndDropPartitionField(
String column, String partitionType, List<Object[]> expected, String predicate) {
sql("DROP TABLE IF EXISTS %s", tableName);
sql(
"CREATE TABLE %s (id bigint NOT NULL, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts) TBLPROPERTIES('format-version' = '2')",
tableName);

sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)", tableName);
"CREATE TABLE %s (col_int INTEGER, col_ts TIMESTAMP_NTZ, col_long BIGINT) USING ICEBERG TBLPROPERTIES ('format-version' = %d)",
tableName, formatVersion);
sql("INSERT INTO %s VALUES (1000, CAST('2024-03-01 19:25:00' as TIMESTAMP), 2100)", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD %s AS col2_partition", tableName, partitionType);
sql("INSERT INTO %s VALUES (2000, CAST('2024-04-01 19:25:00' as TIMESTAMP), 2200)", tableName);
sql("ALTER TABLE %s DROP PARTITION FIELD col2_partition", tableName);
sql("INSERT INTO %s VALUES (3000, CAST('2024-05-01 19:25:00' as TIMESTAMP), 2300)", tableName);
sql("ALTER TABLE %s DROP COLUMN %s", tableName, column);

assertEquals(
"Should return correct data",
expected,
sql("SELECT * FROM %s WHERE %s ORDER BY col_int", tableName, predicate));
}

sql("ALTER TABLE %s DROP COLUMN day_of_ts", tableName);
@TestTemplate
public void testDropPartitionAndUnderlyingField() {
String predicateLong = "col_ts >= '2024-04-01 19:25:00'";
List<Object[]> expectedLong =
Lists.newArrayList(
new Object[] {2000, LocalDateTime.ofEpochSecond(1711999500, 0, ZoneOffset.UTC)},
new Object[] {3000, LocalDateTime.ofEpochSecond(1714591500, 0, ZoneOffset.UTC)});
runCreateAndDropPartitionField("col_long", "col_long", expectedLong, predicateLong);
runCreateAndDropPartitionField(
"col_long", "truncate(2, col_long)", expectedLong, predicateLong);
runCreateAndDropPartitionField("col_long", "bucket(16, col_long)", expectedLong, predicateLong);

String predicateTs = "col_long >= 2200";
List<Object[]> expectedTs =
Lists.newArrayList(new Object[] {2000, 2200L}, new Object[] {3000, 2300L});
runCreateAndDropPartitionField("col_ts", "col_ts", expectedTs, predicateTs);
runCreateAndDropPartitionField("col_ts", "year(col_ts)", expectedTs, predicateTs);
runCreateAndDropPartitionField("col_ts", "month(col_ts)", expectedTs, predicateTs);
runCreateAndDropPartitionField("col_ts", "day(col_ts)", expectedTs, predicateTs);
}

private void assertPartitioningEquals(SparkTable table, int len, String transform) {
Expand Down