diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java index bc60759bd038..3fd760c67c4a 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java @@ -436,4 +436,18 @@ public void testNetChangesWithComputeUpdates() { .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Not support net changes with update images"); } + + @TestTemplate + public void testUpdateWithInComparableType() { + sql( + "CREATE TABLE %s (id INT NOT NULL, data MAP, age INT) USING iceberg", + tableName); + + assertThatThrownBy( + () -> + sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Identifier field is required as table contains unorderable columns: [data]"); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java index b4594d91c0ef..ae77b69133f3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -37,6 +38,7 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.OrderUtils; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter; @@ -146,10 +148,21 @@ public InternalRow[] call(InternalRow args) { Dataset df = loadRows(changelogTableIdent, options(input)); boolean netChanges = input.asBoolean(NET_CHANGES, false); + String[] identifierColumns = identifierColumns(input, tableIdent); + Set unorderableColumnNames = + Arrays.stream(df.schema().fields()) + .filter(field -> !OrderUtils.isOrderable(field.dataType())) + .map(StructField::name) + .collect(Collectors.toSet()); + + Preconditions.checkArgument( + identifierColumns.length > 0 || unorderableColumnNames.isEmpty(), + "Identifier field is required as table contains unorderable columns: %s", + unorderableColumnNames); if (shouldComputeUpdateImages(input)) { Preconditions.checkArgument(!netChanges, "Not support net changes with update images"); - df = computeUpdateImages(identifierColumns(input, tableIdent), df); + df = computeUpdateImages(identifierColumns, df); } else { df = removeCarryoverRows(df, netChanges); }