diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index f4b2be343cb3..d8bbd84081e1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2154,6 +2154,10 @@ public boolean isCompactionLogBlockMetricsOn() { return metricsConfig.isCompactionLogBlockMetricsOn(); } + public boolean isPartitionLevelMetricsOn() { + return getBoolean(HoodieMetricsConfig.PARTITION_LEVEL_METRICS_ENABLE); + } + public boolean isExecutorMetricsEnabled() { return metricsConfig.isExecutorMetricsEnabled(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index d30f3491a223..8e8c21776ce2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -59,6 +59,25 @@ public class HoodieMetrics { public static final String DURATION_STR = "duration"; public static final String SOURCE_READ_AND_INDEX_ACTION = "source_read_and_index"; + /* --------------- Partition Level Metrics --------------- */ + public static final String LOG_RECORDS_SCAN_TIME = "logRecordsScanTime"; + public static final String CREATE_OR_UPSERT_TIME = "createTimeOrUpsertTime"; + // Operation time means the time taken to perform the operation on the partition, equals to the sum of createTime/updateTime and logRecordsScanTime + public static final String OPERATION_TIME = "operationTime"; + public static final String WRITE_RECORDS_NUM = "writeRecordsNum"; + public static final String UPDATE_RECORDS_NUM = "updateRecordsNum"; + public static final String INSERT_RECORDS_NUM = "insertRecordsNum"; + public static final String DELETE_RECORDS_NUM = "deleteRecordsNum"; + public static final String BYTES_WRITTEN = "bytesWritten"; + // The ratio of spilled log records to total log records + public static final String LOG_RECORDS_SPILL_PERCENT = "logRecordsSpillPercent"; + public static final String MAX_MEMORY_FOR_COMPACTION = "maxMemoryForCompaction"; + + + public static final String PARTITION_LEVEL_PREFIX = "partition="; + public static final String UN_PARTITIONED_TABLE_METRIC_NAME = "UN_PARTITIONED_METRICS"; + /* --------------- Partition Level Metrics --------------- */ + private Metrics metrics; // Some timers public String rollbackTimerName = null; @@ -261,6 +280,22 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, Hoo metrics.registerGauge(getMetricsName(actionType, TOTAL_CORRUPTED_LOG_BLOCKS_STR), totalCorruptedLogBlocks); metrics.registerGauge(getMetricsName(actionType, TOTAL_ROLLBACK_LOG_BLOCKS_STR), totalRollbackLogBlocks); } + if (config.isPartitionLevelMetricsOn()) { + metadata.getPartitionToWriteStats().values().stream().flatMap(stats -> stats.stream()).forEach(stat -> { + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), LOG_RECORDS_SCAN_TIME), stat.getRuntimeStats().getTotalScanTime()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), CREATE_OR_UPSERT_TIME), + stat.getRuntimeStats().getTotalCreateTime() + stat.getRuntimeStats().getTotalUpsertTime()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), OPERATION_TIME), + stat.getRuntimeStats().getTotalScanTime() + stat.getRuntimeStats().getTotalCreateTime() + stat.getRuntimeStats().getTotalUpsertTime()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), WRITE_RECORDS_NUM), stat.getNumWrites()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), UPDATE_RECORDS_NUM), stat.getNumUpdateWrites()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), INSERT_RECORDS_NUM), stat.getNumInserts()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), DELETE_RECORDS_NUM), stat.getNumDeletes()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), BYTES_WRITTEN), stat.getTotalWriteBytes()); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), LOG_RECORDS_SPILL_PERCENT), (long) (stat.getRuntimeStats().getLogRecordsSpillRatio() * 100)); + metrics.registerHistogram(getPartitionMetricsName(actionType, stat.getPartitionPath(), MAX_MEMORY_FOR_COMPACTION), stat.getRuntimeStats().getMaxMemoryForCompaction()); + }); + } } } @@ -334,6 +369,11 @@ public String getMetricsName(String action, String metric) { } } + public String getPartitionMetricsName(String action, String partition, String metric) { + partition = StringUtils.isNullOrEmpty(partition) ? UN_PARTITIONED_TABLE_METRIC_NAME : partition; + return config == null ? null : String.format("%s.%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, PARTITION_LEVEL_PREFIX + partition, metric); + } + public void updateClusteringFileCreationMetrics(long durationInMs) { reportMetrics("replacecommit", "fileCreationTime", durationInMs); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 9defec99c38e..7abb96a294d5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -256,8 +256,10 @@ public List compact(HoodieCompactionHandler compactionHandler, if (stat.getRuntimeStats() != null) { runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime()); runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime()); + runtimeStats.setLogRecordsSpillRatio(scanner.getSpillRatio()); + runtimeStats.setMaxMemoryForCompaction(maxMemoryPerCompaction); } - stat.setRuntimeStats(runtimeStats); + s.getStat().setRuntimeStats(runtimeStats); }).collect(toList()); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java index c156997441b8..ac307a7a6c53 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieConsoleMetrics.java @@ -50,6 +50,7 @@ public void start() { when(writeConfig.isMetricsOn()).thenReturn(true); when(metricsConfig.getMetricsReporterType()).thenReturn(MetricsReporterType.CONSOLE); when(metricsConfig.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); + when(metricsConfig.getMetricReporterMetricsNamePrefix()).thenReturn("hoodie"); hoodieMetrics = new HoodieMetrics(writeConfig, HoodieTestUtils.getDefaultStorage()); metrics = hoodieMetrics.getMetrics(); } @@ -64,4 +65,12 @@ public void testRegisterGauge() { metrics.registerGauge("metric1", 123L); assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); } + + @Test + public void testRegisterHistogram() { + for (int i = 0; i < 10; i++) { + metrics.registerHistogram("histogram1", i); + } + assertEquals(10, metrics.getRegistry().getHistograms().get("histogram1").getCount()); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 3c98a510317d..bf73ad9ae4a9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -423,6 +423,16 @@ public static class RuntimeStats implements Serializable { */ private long totalCreateTime; + /** + * Ratio of log records spilled to total log records. + */ + private double logRecordsSpillRatio; + + /** + * Max memory used for compaction. + */ + private long maxMemoryForCompaction; + public long getTotalScanTime() { return totalScanTime; } @@ -446,5 +456,21 @@ public long getTotalCreateTime() { public void setTotalCreateTime(long totalCreateTime) { this.totalCreateTime = totalCreateTime; } + + public double getLogRecordsSpillRatio() { + return logRecordsSpillRatio; + } + + public void setLogRecordsSpillRatio(double logRecordsSpillRatio) { + this.logRecordsSpillRatio = logRecordsSpillRatio; + } + + public long getMaxMemoryForCompaction() { + return maxMemoryForCompaction; + } + + public void setMaxMemoryForCompaction(long maxMemoryForCompaction) { + this.maxMemoryForCompaction = maxMemoryForCompaction; + } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 9106f6b0de5f..3876179d9b21 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -89,6 +89,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader private final long maxMemorySizeInBytes; // Stores the total time taken to perform reading and merging of log blocks private long totalTimeTakenToReadAndMergeBlocks; + // ratio of records to be spilled to disk + private double spillRatio; @SuppressWarnings("unchecked") private HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List logFilePaths, Schema readerSchema, @@ -205,6 +207,7 @@ private void performScan() { this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer(); this.numMergedRecordsInLog = records.size(); + this.spillRatio = records.getInDiskRecordsNumRatio(); if (LOG.isInfoEnabled()) { LOG.info("Number of log files scanned => {}", logFilePaths.size()); @@ -213,6 +216,7 @@ private void performScan() { LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize()); LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries()); LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes()); + LOG.info("Spill ratio => " + this.spillRatio); } } @@ -306,6 +310,10 @@ public long getTotalTimeTakenToReadAndMergeBlocks() { return totalTimeTakenToReadAndMergeBlocks; } + public double getSpillRatio() { + return spillRatio; + } + @Override public void close() { if (records != null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index 3d5fd1d57542..2829905be23c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -155,6 +155,10 @@ public int getInMemoryMapNumEntries() { return inMemoryMap.size(); } + public double getInDiskRecordsNumRatio() { + return (double) getDiskBasedMap().size() / (inMemoryMap.size() + getDiskBasedMap().size()); + } + /** * Approximate memory footprint of the in-memory map. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java index 6ad389c05d7f..d11907536f8a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java @@ -120,6 +120,12 @@ public class HoodieMetricsConfig extends HoodieConfig { .sinceVersion("0.14.0") .withDocumentation("Turn on/off metrics reporting for log blocks with compaction commit. off by default."); + public static final ConfigProperty PARTITION_LEVEL_METRICS_ENABLE = ConfigProperty + .key(METRIC_PREFIX + ".partition.level.metrics.enable") + .defaultValue(false) + .sinceVersion("0.15.0") + .withDocumentation("Enable partition level metrics. off by default."); + /** * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead */ @@ -403,6 +409,11 @@ public Builder withLockingMetrics(boolean enable) { return this; } + public Builder withPartitionLevelMetrics(boolean enable) { + hoodieMetricsConfig.setValue(PARTITION_LEVEL_METRICS_ENABLE, String.valueOf(enable)); + return this; + } + public HoodieMetricsConfig build() { hoodieMetricsConfig.setDefaults(HoodieMetricsConfig.class.getName()); diff --git a/hudi-common/src/main/java/org/apache/hudi/metrics/HoodieHistogram.java b/hudi-common/src/main/java/org/apache/hudi/metrics/HoodieHistogram.java new file mode 100644 index 000000000000..dc4c6a793b7d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metrics/HoodieHistogram.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Reservoir; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.UniformSnapshot; + +import java.util.ArrayList; +import java.util.List; + +public class HoodieHistogram extends Histogram { + + public HoodieHistogram() { + super(new HoodieUnlimitedReservoir()); + } + + /** + * UNSAFE: This class will cache all measurements in memory and may cause OOM if the number of measurements is too large. + */ + private static class HoodieUnlimitedReservoir implements Reservoir { + + private final List measurements = new ArrayList<>(); + + @Override + public int size() { + return measurements.size(); + } + + @Override + public void update(long value) { + measurements.add(value); + } + + @Override + public Snapshot getSnapshot() { + return new UniformSnapshot(measurements); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java index 441c0b5b002f..2c3f02db850d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -175,6 +175,15 @@ public Option> registerGauge(String metricName) { return registerGauge(metricName, 0); } + public void registerHistograms(Map metricsMap, Option prefix) { + String metricPrefix = prefix.isPresent() ? prefix.get() + "." : ""; + metricsMap.forEach((k, v) -> registerHistogram(metricPrefix + k, v)); + } + + public void registerHistogram(String metricName, long value) { + registry.histogram(metricName, () -> new HoodieHistogram()).update(value); + } + public MetricRegistry getRegistry() { return registry; } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala index 31948c3298da..e6e1b857636a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala @@ -43,6 +43,9 @@ class TestCompactionTable extends HoodieSparkSqlTestBase { // disable automatic inline compaction spark.sql("set hoodie.compact.inline=false") spark.sql("set hoodie.compact.schedule.inline=false") + spark.sql("set hoodie.metrics.on=true") + spark.sql("set hoodie.metrics.reporter.type=CONSOLE") + spark.sql("set hoodie.metrics.partition.level.metrics.enable=true") spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")