[GLUTEN-4835][CORE] Match metric names with Spark (#4834)
Chungmin Lee authored Mar 11, 2024
1 parent b5b9498 commit cfa9afa
Showing 32 changed files with 165 additions and 141 deletions.
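The change is mechanical: the keys under which Gluten's ClickHouse backend registers its SQLMetrics are renamed from "inputRows"/"outputRows" to Spark's conventional "numInputRows"/"numOutputRows", and every updater that reads those keys back is adjusted to match. Below is a minimal sketch of the convention (the method names and the example map are illustrative, not the actual Gluten code); it relies only on Spark's public SQLMetrics helpers.

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Register metrics under Spark's conventional key names so the SQL UI
// reports them the same way it does for vanilla Spark operators.
def exampleTransformerMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
  "numInputRows" -> SQLMetrics.createMetric(sc, "number of input rows"),
  "numOutputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),
  "outputBytes" -> SQLMetrics.createSizeMetric(sc, "number of output bytes")
)

// The matching updater must look the metrics up by exactly the same keys;
// a stale key fails at runtime with a NoSuchElementException on the Map.
def exampleUpdate(metrics: Map[String, SQLMetric], rowsIn: Long, rowsOut: Long): Unit = {
  metrics("numInputRows") += rowsIn
  metrics("numOutputRows") += rowsOut
}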
@@ -44,8 +44,8 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"iterReadTime" -> SQLMetrics.createTimingMetric(
sparkContext,
"time of reading from iterator"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
sparkContext,
"filling right join side time")
@@ -59,12 +59,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -79,12 +79,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genHiveTableScanTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -107,12 +107,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genFileSourceScanTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"),
@@ -133,10 +133,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -149,10 +149,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -166,10 +166,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -187,10 +187,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -233,10 +233,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genWindowTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -263,10 +263,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -279,10 +279,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -319,10 +319,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {

override def genHashJoinTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -358,10 +358,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
}
override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
- "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
- "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
@@ -23,18 +23,18 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric])
extends MetricsUpdater {

val scanTime: SQLMetric = metrics("scanTime")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
- val inputRows: SQLMetric = metrics("inputRows")
+ val inputRows: SQLMetric = metrics("numInputRows")
val inputBytes: SQLMetric = metrics("inputBytes")
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
- // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value)
+ // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value)
}

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
@@ -33,9 +33,9 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
ExpandMetricsUpdater.INCLUDING_PROCESSORS,
ExpandMetricsUpdater.CH_PLAN_NODE_NAME
@@ -27,18 +27,18 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
extends MetricsUpdater {

val scanTime: SQLMetric = metrics("scanTime")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
- val inputRows: SQLMetric = metrics("inputRows")
+ val inputRows: SQLMetric = metrics("numInputRows")
val inputBytes: SQLMetric = metrics("inputBytes")
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
- // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value)
+ // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value)
}

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
@@ -33,9 +33,9 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
FilterMetricsUpdater.INCLUDING_PROCESSORS,
FilterMetricsUpdater.INCLUDING_PROCESSORS
@@ -33,9 +33,9 @@ class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metric
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
GenerateMetricsUpdater.INCLUDING_PROCESSORS,
GenerateMetricsUpdater.CH_PLAN_NODE_NAME
@@ -44,9 +44,9 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric])
MetricsUtil.updateExtraTimeMetric(
aggMetricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
HashAggregateMetricsUpdater.INCLUDING_PROCESSORS,
HashAggregateMetricsUpdater.CH_PLAN_NODE_NAME
@@ -83,9 +83,9 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric])
metrics("extraTime") += (processor.time / 1000L).toLong
}
if (HashJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)) {
- metrics("outputRows") += processor.outputRows
+ metrics("numOutputRows") += processor.outputRows
metrics("outputBytes") += processor.outputBytes
- metrics("inputRows") += processor.inputRows
+ metrics("numInputRows") += processor.inputRows
metrics("inputBytes") += processor.inputBytes
}
})
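The updater hunks all follow the same shape: per-processor counters reported by the native engine are folded into the Spark SQLMetrics by key, so the rename has to be applied on the lookup side as well as at registration. A rough, self-contained sketch of that accumulation pattern (ProcessorStats and planNodeNames are illustrative stand-ins, not Gluten's actual IOperatorMetrics types):

import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative stand-in for the per-processor counters the native engine reports.
final case class ProcessorStats(name: String, inputRows: Long, outputRows: Long)

def foldIntoSparkMetrics(
    metrics: Map[String, SQLMetric],
    processors: Seq[ProcessorStats],
    planNodeNames: Set[String]): Unit =
  processors.foreach { p =>
    // Only processors belonging to this plan node contribute to its metrics,
    // and they must be accumulated under the renamed keys.
    if (planNodeNames.exists(p.name.startsWith(_))) {
      metrics("numInputRows") += p.inputRows
      metrics("numOutputRows") += p.outputRows
    }
  }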
@@ -23,10 +23,10 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
extends MetricsUpdater {

val scanTime: SQLMetric = metrics("scanTime")
- val outputRows: SQLMetric = metrics("outputRows")
+ val outputRows: SQLMetric = metrics("numOutputRows")
val outputVectors: SQLMetric = metrics("outputVectors")
val outputBytes: SQLMetric = metrics("outputBytes")
- val inputRows: SQLMetric = metrics("inputRows")
+ val inputRows: SQLMetric = metrics("numInputRows")
val inputBytes: SQLMetric = metrics("inputBytes")
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
@@ -33,8 +33,8 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends
InputIteratorMetricsUpdater.CH_PLAN_NODE_NAME
.exists(processor.name.startsWith(_))
) {
- metrics("inputRows") += processor.inputRows
- metrics("outputRows") += processor.outputRows
+ metrics("numInputRows") += processor.inputRows
+ metrics("numOutputRows") += processor.outputRows
}
if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) {
metrics("fillingRightJoinSideTime") += (processor.time / 1000L).toLong
@@ -33,9 +33,9 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics
MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
- metrics("outputRows"),
+ metrics("numOutputRows"),
metrics("outputBytes"),
- metrics("inputRows"),
+ metrics("numInputRows"),
metrics("inputBytes"),
ProjectMetricsUpdater.INCLUDING_PROCESSORS,
ProjectMetricsUpdater.CH_PLAN_NODE_NAME
(The remaining changed files of the 32 are not shown in this view.)
