diff --git a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
index ff3515b11d2af..63a44c451c6c8 100644
--- a/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
+++ b/gluten-core/src/main/resources/substrait/proto/substrait/algebra.proto
@@ -158,11 +158,11 @@ message ReadRel {
     message DeleteFile {
       FileContent fileContent = 1;
       string filePath = 2;
-      uint64 fileSize = 5;
-      uint64 recordCount = 6;
+      uint64 fileSize = 3;
+      uint64 recordCount = 4;
       oneof file_format {
-        ParquetReadOptions parquet = 7;
-        OrcReadOptions orc = 8;
+        ParquetReadOptions parquet = 5;
+        OrcReadOptions orc = 6;
       }
     }
     oneof file_format {
diff --git a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
index 0eee5f629c95f..c70525f406cbd 100644
--- a/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/io/glutenproject/execution/VeloxIcebergSuite.scala
@@ -58,7 +58,49 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     }
   }

-  test("iceberg read mor table") {
+  test("iceberg partitioned table") {
+    withTable("p_str_tb", "p_int_tb") {
+      // Partition key of string type.
+      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          """
+            |create table p_str_tb(id int, name string, p string) using iceberg partitioned by (p);
+            |""".stripMargin)
+        spark.sql(
+          """
+            |insert into table p_str_tb values(1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare("""
+                           |select * from p_str_tb;
+                           |""".stripMargin) {
+        checkOperatorMatch[IcebergScanTransformer]
+      }
+
+      // Partition key of integer type.
+      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          """
+            |create table p_int_tb(id int, name string, p int) using iceberg partitioned by (p);
+            |""".stripMargin)
+        spark.sql(
+          """
+            |insert into table p_int_tb values(1, 'a1', 1), (2, 'a2', 1), (3, 'a3', 2);
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare("""
+                           |select * from p_int_tb;
+                           |""".stripMargin) {
+        checkOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  test("iceberg read mor table - delete and update") {
     withTable("iceberg_mor_tb") {
       withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
         spark.sql("""
@@ -75,17 +117,32 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
                     |)
                     |partitioned by (p);
                     |""".stripMargin)

+        // Insert some test rows.
         spark.sql("""
                     |insert into table iceberg_mor_tb
-                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+                    |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
                     |""".stripMargin)

+        // Delete row.
         spark.sql(
           """
             |delete from iceberg_mor_tb where name = 'a1';
             |""".stripMargin
         )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+        // Delete row again.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where id = 6;
+            |""".stripMargin
+        )
       }
       runQueryAndCompare("""
                           |select * from iceberg_mor_tb;
@@ -95,42 +152,70 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
     }
   }

-  test("iceberg partitioned table") {
-    withTable("p_str_tb", "p_int_tb") {
-      // Partition key of string type.
+ test("iceberg read mor table - merge into") { + withTable("iceberg_mor_tb", "merge_into_source_tb") { withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { - // Gluten does not support write iceberg table. - spark.sql( - """ - |create table p_str_tb(id int, name string, p string) using iceberg partitioned by (p); - |""".stripMargin) + spark.sql(""" + |create table iceberg_mor_tb ( + | id int, + | name string, + | p string + |) using iceberg + |tblproperties ( + | 'format-version' = '2', + | 'write.delete.mode' = 'merge-on-read', + | 'write.update.mode' = 'merge-on-read', + | 'write.merge.mode' = 'merge-on-read' + |) + |partitioned by (p); + |""".stripMargin) + spark.sql(""" + |create table merge_into_source_tb ( + | id int, + | name string, + | p string + |) using iceberg; + |""".stripMargin) + + // Insert some test rows. + spark.sql(""" + |insert into table iceberg_mor_tb + |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'); + |""".stripMargin) + spark.sql(""" + |insert into table merge_into_source_tb + |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'), + | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2'); + |""".stripMargin) + + // Delete row. spark.sql( """ - |insert into table p_str_tb values(1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'); + |delete from iceberg_mor_tb where name = 'a1'; |""".stripMargin ) - } - runQueryAndCompare(""" - |select * from p_str_tb; - |""".stripMargin) { - checkOperatorMatch[IcebergScanTransformer] - } - - // Partition key of integer type. - withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") { - // Gluten does not support write iceberg table. + // Update row. spark.sql( """ - |create table p_int_tb(id int, name string, p int) using iceberg partitioned by (p); - |""".stripMargin) + |update iceberg_mor_tb set name = 'new_a2' where id = 'a2'; + |""".stripMargin + ) + + // Merge into. spark.sql( """ - |insert into table p_int_tb values(1, 'a1', 1), (2, 'a2', 1), (3, 'a3', 2); + |merge into iceberg_mor_tb t + |using (select * from merge_into_source_tb) s + |on t.id = s.id + |when matched then + | update set t.name = s.name, t.p = s.p + |when not matched then + | insert (id, name, p) values (s.id, s.name, s.p); |""".stripMargin ) } runQueryAndCompare(""" - |select * from p_int_tb; + |select * from iceberg_mor_tb; |""".stripMargin) { checkOperatorMatch[IcebergScanTransformer] }