Spark 3.5: Use FileGenerationUtil in PlanningBenchmark (#11027)
aokolnychyi authored Aug 29, 2024
1 parent 6c79640 commit 9c344f9
Showing 1 changed file with 26 additions and 92 deletions.
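At a glance, the benchmark setup no longer writes real Parquet files and then clones their metadata with DataFiles/FileMetadata builders; it now fabricates DataFile and DeleteFile entries directly through FileGenerationUtil and commits them via a RowDelta. Below is a minimal sketch of that pattern, distilled from the diff that follows. The class and method names here are hypothetical, and it assumes an existing Iceberg table partitioned by an int column with the iceberg-core test fixtures (FileGenerationUtil, TestHelpers) on the classpath.

// Illustrative sketch only; names such as PlanningFixtures and addSyntheticFiles are hypothetical.
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileGenerationUtil;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

public class PlanningFixtures {

  private PlanningFixtures() {}

  // Commits one synthetic data file (with int column bounds for the given field)
  // and one synthetic position delete file into the given partition.
  public static void addSyntheticFiles(Table table, int partitionOrdinal, int sortKeyFieldId) {
    StructLike partition = TestHelpers.Row.of(partitionOrdinal);
    RowDelta rowDelta = table.newRowDelta();

    // Metadata-only data file; the column bounds let planning prune on a sort key predicate
    // without any real rows being written.
    ByteBuffer bound = Conversions.toByteBuffer(Types.IntegerType.get(), Integer.MIN_VALUE);
    Map<Integer, ByteBuffer> bounds = ImmutableMap.of(sortKeyFieldId, bound);
    DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition, bounds, bounds);
    rowDelta.addRows(dataFile);

    // Metadata-only position delete file for the same partition.
    DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
    rowDelta.addDeletes(deleteFile);

    rowDelta.commit();
  }
}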
@@ -20,49 +20,41 @@

import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.spark.sql.functions.lit;

import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.FileGenerationUtil;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SparkDistributedDataScan;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.types.StructType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -108,10 +100,8 @@ public class PlanningBenchmark {
Expressions.and(PARTITION_PREDICATE, SORT_KEY_PREDICATE);

private static final int NUM_PARTITIONS = 30;
private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 25;
private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
private static final int NUM_DELETE_FILES_PER_PARTITION = 50;
private static final int NUM_ROWS_PER_DATA_FILE = 500;

private final Configuration hadoopConf = new Configuration();
private SparkSession spark;
@@ -285,99 +275,43 @@ private void dropTable() {
sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
}

private DataFile loadAddedDataFile() {
table.refresh();

Iterable<DataFile> dataFiles = table.currentSnapshot().addedDataFiles(table.io());
return Iterables.getOnlyElement(dataFiles);
}

private DeleteFile loadAddedDeleteFile() {
table.refresh();

Iterable<DeleteFile> deleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
return Iterables.getOnlyElement(deleteFiles);
}

private void initDataAndDeletes() throws NoSuchTableException {
Schema schema = table.schema();
PartitionSpec spec = table.spec();
LocationProvider locations = table.locationProvider();

private void initDataAndDeletes() {
for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
Dataset<Row> inputDF =
randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
.drop(PARTITION_COLUMN)
.withColumn(PARTITION_COLUMN, lit(partitionOrdinal))
.drop(SORT_KEY_COLUMN)
.withColumn(SORT_KEY_COLUMN, lit(Integer.MIN_VALUE));

for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
appendAsFile(inputDF);
}
StructLike partition = TestHelpers.Row.of(partitionOrdinal);

DataFile dataFile = loadAddedDataFile();

sql(
"DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);

DeleteFile deleteFile = loadAddedDeleteFile();

AppendFiles append = table.newFastAppend();
RowDelta rowDelta = table.newRowDelta();

for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
String replicaFileName = UUID.randomUUID() + "-replica.parquet";
DataFile replicaDataFile =
DataFiles.builder(spec)
.copy(dataFile)
.withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
.build();
append.appendFile(replicaDataFile);
for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
DataFile dataFile = generateDataFile(partition, Integer.MIN_VALUE, Integer.MIN_VALUE);
rowDelta.addRows(dataFile);
}

append.commit();

RowDelta rowDelta = table.newRowDelta();
// add one data file that would match the sort key predicate
DataFile sortKeyDataFile = generateDataFile(partition, SORT_KEY_VALUE, SORT_KEY_VALUE);
rowDelta.addRows(sortKeyDataFile);

for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
String replicaFileName = UUID.randomUUID() + "-replica.parquet";
DeleteFile replicaDeleteFile =
FileMetadata.deleteFileBuilder(spec)
.copy(deleteFile)
.withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
.build();
rowDelta.addDeletes(replicaDeleteFile);
DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
rowDelta.addDeletes(deleteFile);
}

rowDelta.commit();

Dataset<Row> sortedInputDF =
randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
.drop(SORT_KEY_COLUMN)
.withColumn(SORT_KEY_COLUMN, lit(SORT_KEY_VALUE))
.drop(PARTITION_COLUMN)
.withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
appendAsFile(sortedInputDF);
}
}

private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
df.coalesce(1).writeTo(TABLE_NAME).append();
private DataFile generateDataFile(StructLike partition, int sortKeyMin, int sortKeyMax) {
int sortKeyFieldId = table.schema().findField(SORT_KEY_COLUMN).fieldId();
ByteBuffer lower = Conversions.toByteBuffer(Types.IntegerType.get(), sortKeyMin);
Map<Integer, ByteBuffer> lowerBounds = ImmutableMap.of(sortKeyFieldId, lower);
ByteBuffer upper = Conversions.toByteBuffer(Types.IntegerType.get(), sortKeyMax);
Map<Integer, ByteBuffer> upperBounds = ImmutableMap.of(sortKeyFieldId, upper);
return FileGenerationUtil.generateDataFile(table, partition, lowerBounds, upperBounds);
}

private String newWarehouseDir() {
return hadoopConf.get("hadoop.tmp.dir") + UUID.randomUUID();
}

private Dataset<Row> randomDataDF(Schema schema, int numRows) {
Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
StructType rowSparkType = SparkSchemaUtil.convert(schema);
return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
}

private List<ScanTask> planFilesWithoutColumnStats(BatchScan scan, Expression predicate) {
return planFiles(scan, predicate, false);
}