Spark 3.5: Display write metrics on SQL UI
manuzhang committed Oct 18, 2024
1 parent 9d58865 commit 99d4b65
Showing 6 changed files with 123 additions and 1 deletion.
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.MetricsReporters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
@@ -41,7 +42,7 @@
public class BaseTable implements Table, HasTableOperations, Serializable {
private final TableOperations ops;
private final String name;
private final MetricsReporter reporter;
private MetricsReporter reporter;

public BaseTable(TableOperations ops, String name) {
this(ops, name, LoggingMetricsReporter.instance());
@@ -54,6 +55,10 @@ public BaseTable(TableOperations ops, String name, MetricsReporter reporter) {
this.reporter = reporter;
}

public void combineMetricsReporter(MetricsReporter metricsReporter) {
this.reporter = MetricsReporters.combine(this.reporter, metricsReporter);
}

MetricsReporter reporter() {
return reporter;
}
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java
@@ -35,4 +35,11 @@ public ScanReport scanReport() {
"Metrics report is not a scan report");
return (ScanReport) metricsReport;
}

public CommitReport commitReport() {
Preconditions.checkArgument(
metricsReport == null || metricsReport instanceof CommitReport,
"Metrics report is not a commit report");
return (CommitReport) metricsReport;
}
}
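
Taken together, the two changes above let a caller capture the report produced by a table's next commit. The following is a minimal sketch (not part of this commit) of that flow, assuming an existing BaseTable and a DataFile to append; SparkWrite below does essentially the same thing:

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.InMemoryMetricsReporter;

class CommitReportExample {
  static Long totalDataFilesAfterAppend(BaseTable table, DataFile dataFile) {
    // Merge an in-memory reporter with the table's existing reporter so the
    // next commit's report is captured instead of only logged.
    InMemoryMetricsReporter reporter = new InMemoryMetricsReporter();
    table.combineMetricsReporter(reporter);

    table.newAppend().appendFile(dataFile).commit();

    // commitReport() is null when nothing was reported yet and throws when the
    // captured report is a scan report rather than a commit report.
    CommitReport report = reporter.commitReport();
    if (report == null || report.commitMetrics().totalDataFiles() == null) {
      return null;
    }
    return report.commitMetrics().totalDataFiles().value();
  }
}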
28 changes: 28 additions & 0 deletions spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -23,10 +23,12 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -51,18 +53,23 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.CounterResult;
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
import org.apache.iceberg.util.DataFileSet;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.executor.OutputMetrics;
import org.apache.spark.sql.MetricsUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.distributions.Distribution;
@@ -103,6 +110,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private final Map<String, String> writeProperties;

private boolean cleanupOnAbort = false;
private InMemoryMetricsReporter metricsReporter;

SparkWrite(
SparkSession spark,
@@ -130,6 +138,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
this.writeRequirements = writeRequirements;
this.outputSpecId = writeConf.outputSpecId();
this.writeProperties = writeConf.writeProperties();

if (this.table instanceof BaseTable) {
this.metricsReporter = new InMemoryMetricsReporter();
((BaseTable) this.table).combineMetricsReporter(metricsReporter);
}
}

@Override
@@ -231,6 +244,7 @@ private void commitOperation(SnapshotUpdate<?> operation, String description) {
try {
long start = System.currentTimeMillis();
operation.commit(); // abort is automatically called if this fails
postDriverMetrics();
long duration = System.currentTimeMillis() - start;
LOG.info("Committed in {} ms", duration);
} catch (Exception e) {
@@ -260,6 +274,20 @@ private List<DataFile> files(WriterCommitMessage[] messages) {
return files;
}

private void postDriverMetrics() {
if (metricsReporter != null) {
CommitReport commitReport = metricsReporter.commitReport();
if (commitReport != null) {
TotalDataFiles totalDataFiles = new TotalDataFiles();
CounterResult counterResult = commitReport.commitMetrics().totalDataFiles();
if (counterResult != null) {
MetricsUtils.postDriverMetrics(
sparkContext.sc(), Collections.singletonMap(totalDataFiles, counterResult.value()));
}
}
}
}

@Override
public String toString() {
return String.format("IcebergWrite(table=%s, format=%s)", table, format);
7 changes: 7 additions & 0 deletions spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -30,8 +30,10 @@
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
@@ -158,6 +160,11 @@ public StreamingWrite toStreaming() {
return asStreamingAppend();
}
}

@Override
public CustomMetric[] supportedCustomMetrics() {
return new CustomMetric[] {new TotalDataFiles()};
}
};
}

36 changes: 36 additions & 0 deletions spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalDataFiles.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.source.metrics;

import org.apache.spark.sql.connector.metric.CustomSumMetric;

public class TotalDataFiles extends CustomSumMetric {

static final String NAME = "totalDataFiles";

@Override
public String name() {
return NAME;
}

@Override
public String description() {
return "number of total data files";
}
}
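
TotalDataFiles extends Spark's CustomSumMetric, which supplies an aggregateTaskMetrics implementation that sums the collected values before the SQL UI renders them. A tiny sketch of that behavior (not part of this commit, assuming this commit's classes are on the classpath):

import org.apache.iceberg.spark.source.metrics.TotalDataFiles;

class MetricAggregationExample {
  public static void main(String[] args) {
    TotalDataFiles metric = new TotalDataFiles();
    // CustomSumMetric's aggregateTaskMetrics sums the collected values.
    System.out.println(metric.aggregateTaskMetrics(new long[] {2L, 3L, 5L})); // prints 10
  }
}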
39 changes: 39 additions & 0 deletions spark/v3.5/spark/src/main/scala/org/apache/spark/sql/MetricsUtils.scala
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.spark.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics
import scala.collection.JavaConverters

object MetricsUtils {

  def postDriverMetrics(
      sparkContext: SparkContext,
      metricValues: java.util.Map[CustomMetric, Long]): Unit = {
    // Only post when running on the driver inside an active SQL execution;
    // otherwise there is no SQL UI node to attach the metrics to.
    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    if (executionId != null) {
      val queryExecution = SQLExecution.getQueryExecution(executionId.toLong)
      if (queryExecution != null) {
        // Set the SQLMetric registered under each custom metric's name.
        val metrics = queryExecution.executedPlan.metrics
        val sqlMetrics = JavaConverters.mapAsScalaMap(metricValues).map { case (metric, value) =>
          val sqlMetric = metrics(metric.name)
          sqlMetric.set(value)
          sqlMetric
        }.toSeq
        SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, sqlMetrics)
      }
    }
  }
}
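
Because MetricsUtils lives in the org.apache.spark.sql package, it can reach Spark's internal SQLExecution and SQLMetrics APIs, and SparkWrite calls it from Java through the generated static forwarder. A hedged Java sketch of such a call (not part of this commit), assuming it runs on the driver during a SQL execution whose write node declared TotalDataFiles via supportedCustomMetrics():

import java.util.Collections;
import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.MetricsUtils;
import org.apache.spark.sql.connector.metric.CustomMetric;

class PostMetricsExample {
  static void post(SparkContext sc, long totalDataFiles) {
    // The metric name must match a SQLMetric on the executed plan, i.e. one
    // declared via supportedCustomMetrics(); otherwise nothing can be set.
    MetricsUtils.postDriverMetrics(
        sc, Collections.<CustomMetric, Long>singletonMap(new TotalDataFiles(), totalDataFiles));
  }
}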
