Skip to content

Commit

Permalink
Add more test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Mar 12, 2024
1 parent d52d863 commit 032bf3e
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ message ReadRel {
// Describes an iceberg delete file (position or equality deletes) attached to
// a ReadRel split.
// NOTE(review): the scraped diff lost its -/+ markers, leaving both the old
// field numbers (5/6/7/8) and the new ones (3/4/5/6) in this hunk. The
// post-commit numbering is reconstructed below. Renumbering proto fields
// breaks wire compatibility — acceptable only while this message is still
// unreleased; confirm no persisted plans use the old numbers.
message DeleteFile {
  FileContent fileContent = 1;
  string filePath = 2;
  uint64 fileSize = 3;
  uint64 recordCount = 4;
  // Format-specific read options for the delete file itself.
  oneof file_format {
    ParquetReadOptions parquet = 5;
    OrcReadOptions orc = 6;
  }
}
oneof file_format {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,49 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
}

test("iceberg read mor table") {
// Verifies that scans over iceberg tables partitioned by a string key and by
// an integer key are offloaded to IcebergScanTransformer and produce the same
// results as vanilla Spark.
test("iceberg partitioned table") {
  withTable("p_str_tb", "p_int_tb") {
    // All DDL/DML runs with Gluten disabled: Gluten cannot write iceberg
    // tables yet, so the fixtures are created by vanilla Spark.
    def sqlWithGlutenDisabled(statements: String*): Unit = {
      withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
        statements.foreach(spark.sql)
      }
    }

    // Partition key of string type.
    sqlWithGlutenDisabled(
      """
        |create table p_str_tb(id int, name string, p string) using iceberg partitioned by (p);
        |""".stripMargin,
      """
        |insert into table p_str_tb values(1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
        |""".stripMargin
    )
    runQueryAndCompare("""
                         |select * from p_str_tb;
                         |""".stripMargin) {
      checkOperatorMatch[IcebergScanTransformer]
    }

    // Partition key of integer type.
    sqlWithGlutenDisabled(
      """
        |create table p_int_tb(id int, name string, p int) using iceberg partitioned by (p);
        |""".stripMargin,
      """
        |insert into table p_int_tb values(1, 'a1', 1), (2, 'a2', 1), (3, 'a3', 2);
        |""".stripMargin
    )
    runQueryAndCompare("""
                         |select * from p_int_tb;
                         |""".stripMargin) {
      checkOperatorMatch[IcebergScanTransformer]
    }
  }
}

// NOTE(review): this span is a collapsed GitHub diff hunk. The "Expand All
// @@ ... @@" lines hide unchanged context, and removed/added lines are
// rendered without their -/+ markers, so the text is not compilable as-is.
// Tests reading a merge-on-read iceberg table after delete/update operations.
test("iceberg read mor table - delete and update") {
withTable("iceberg_mor_tb") {
// Writes run with Gluten disabled: Gluten cannot write iceberg tables.
withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
spark.sql("""
Expand All @@ -75,17 +117,32 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
|)
|partitioned by (p);
|""".stripMargin)

// Insert some test rows.
// NOTE(review): inside the next literal, the two adjacent "values" lines
// appear to be the pre-change (removed) and post-change (added) versions
// of the same line left interleaved by the stripped diff markers.
spark.sql("""
|insert into table iceberg_mor_tb
|values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
|values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
|       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
|""".stripMargin)

// Delete row.
spark.sql(
"""
|delete from iceberg_mor_tb where name = 'a1';
|""".stripMargin
)
// Update row.
// NOTE(review): `id` is an int column but is compared to the string 'a2';
// Spark casts 'a2' to int, yielding NULL, so this UPDATE matches no rows
// and the update code path is not actually exercised. Likely intended
// `where id = 2` (or `where name = 'a2'`) — confirm with the author.
spark.sql(
"""
|update iceberg_mor_tb set name = 'new_a2' where id = 'a2';
|""".stripMargin
)
// Delete row again.
spark.sql(
"""
|delete from iceberg_mor_tb where id = 6;
|""".stripMargin
)
}
runQueryAndCompare("""
|select * from iceberg_mor_tb;
Expand All @@ -95,42 +152,70 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
}
}

test("iceberg partitioned table") {
withTable("p_str_tb", "p_int_tb") {
// Partition key of string type.
// NOTE(review): this hunk is heavily garbled by the scrape — the new
// "merge into" test is interleaved, without -/+ markers, with removed lines
// of the former "iceberg partitioned table" test (which this commit moved
// earlier in the file), and the hunk is truncated by the trailing
// "Expand Down" marker. Comments below mark the apparent removals.
test("iceberg read mor table - merge into") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
// Gluten does not support write iceberg table.
// NOTE(review): the next spark.sql (create table p_str_tb) appears to be
// a removed line pair from the relocated old test, not part of this test.
spark.sql(
"""
|create table p_str_tb(id int, name string, p string) using iceberg partitioned by (p);
|""".stripMargin)
// Target table: merge-on-read iceberg table, partitioned by p.
spark.sql("""
|create table iceberg_mor_tb (
|  id int,
|  name string,
|  p string
|) using iceberg
|tblproperties (
|  'format-version' = '2',
|  'write.delete.mode' = 'merge-on-read',
|  'write.update.mode' = 'merge-on-read',
|  'write.merge.mode' = 'merge-on-read'
|)
|partitioned by (p);
|""".stripMargin)
// Source table for the MERGE INTO statement.
spark.sql("""
|create table merge_into_source_tb (
|  id int,
|  name string,
|  p string
|) using iceberg;
|""".stripMargin)

// Insert some test rows.
spark.sql("""
|insert into table iceberg_mor_tb
|values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
|""".stripMargin)
spark.sql("""
|insert into table merge_into_source_tb
|values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
|       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
|""".stripMargin)

// Delete row.
// NOTE(review): inside the next literal, the "insert into table p_str_tb"
// line appears to be a removed line of the old test; the intended
// statement here is the delete.
spark.sql(
"""
|insert into table p_str_tb values(1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
|delete from iceberg_mor_tb where name = 'a1';
|""".stripMargin
)
}
// NOTE(review): this runQueryAndCompare over p_str_tb, the following
// withSQLConf block, and the p_int_tb lines below all appear to be
// removed lines of the relocated old test.
runQueryAndCompare("""
|select * from p_str_tb;
|""".stripMargin) {
checkOperatorMatch[IcebergScanTransformer]
}

// Partition key of integer type.
withSQLConf(GlutenConfig.GLUTEN_ENABLE_KEY -> "false") {
// Gluten does not support write iceberg table.
// Update row.
// NOTE(review): same bug as the delete-and-update test — `id` is an int
// column compared to the string 'a2'; the cast yields NULL so the UPDATE
// matches no rows. Likely intended `where id = 2`; confirm.
spark.sql(
"""
|create table p_int_tb(id int, name string, p int) using iceberg partitioned by (p);
|""".stripMargin)
|update iceberg_mor_tb set name = 'new_a2' where id = 'a2';
|""".stripMargin
)

// Merge into.
// NOTE(review): the "insert into table p_int_tb" line inside the next
// literal appears to be a removed line of the old test.
spark.sql(
"""
|insert into table p_int_tb values(1, 'a1', 1), (2, 'a2', 1), (3, 'a3', 2);
|merge into iceberg_mor_tb t
|using (select * from merge_into_source_tb) s
|on t.id = s.id
|when matched then
|  update set t.name = s.name, t.p = s.p
|when not matched then
|  insert (id, name, p) values (s.id, s.name, s.p);
|""".stripMargin
)
}
// NOTE(review): "select * from p_int_tb" appears to be the removed old
// line; the added line queries iceberg_mor_tb. The hunk is cut off below.
runQueryAndCompare("""
|select * from p_int_tb;
|select * from iceberg_mor_tb;
|""".stripMargin) {
checkOperatorMatch[IcebergScanTransformer]
}
Expand Down

0 comments on commit 032bf3e

Please sign in to comment.