Skip to content

Commit

Permalink
[GLUTEN-6531] Minor polish for metrics code
Browse files Browse the repository at this point in the history
  • Loading branch information
xumingming committed Jul 20, 2024
1 parent 8a2ac2a commit 09c2e24
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
relMap: JMap[JLong, JList[JLong]],
joinParamsMap: JMap[JLong, JoinParams],
aggParamsMap: JMap[JLong, AggregationParams]): IMetrics => Unit = {
MetricsUtil.updateNativeMetrics(child, relMap, joinParamsMap, aggParamsMap)
MetricsUtil.genMetricsUpdatingFunction(child, relMap, joinParamsMap, aggParamsMap)
}

override def genInputIteratorTransformerMetrics(
Expand Down Expand Up @@ -349,9 +349,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"spilledBytes" -> SQLMetrics.createSizeMetric(
sparkContext,
"total bytes written for spilling"),
"spilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "bytes written for spilling"),
"spilledRows" -> SQLMetrics.createMetric(sparkContext, "total rows written for spilling"),
"spilledPartitions" -> SQLMetrics.createMetric(sparkContext, "total spilled partitions"),
"spilledFiles" -> SQLMetrics.createMetric(sparkContext, "total spilled files")
Expand Down Expand Up @@ -438,7 +436,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of hash build memory allocations"),
"hashBuildSpilledBytes" -> SQLMetrics.createSizeMetric(
sparkContext,
"total bytes written for spilling of hash build"),
"bytes written for spilling of hash build"),
"hashBuildSpilledRows" -> SQLMetrics.createMetric(
sparkContext,
"total rows written for spilling of hash build"),
Expand Down Expand Up @@ -472,7 +470,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of hash probe memory allocations"),
"hashProbeSpilledBytes" -> SQLMetrics.createSizeMetric(
sparkContext,
"total bytes written for spilling of hash probe"),
"bytes written for spilling of hash probe"),
"hashProbeSpilledRows" -> SQLMetrics.createMetric(
sparkContext,
"total rows written for spilling of hash probe"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
object MetricsUtil extends Logging {

/**
* Update metrics fetched from certain iterator to transformers.
* Generate the function which updates metrics fetched from certain iterator to transformers.
*
* @param child
* the child spark plan
Expand All @@ -39,7 +39,7 @@ object MetricsUtil extends Logging {
* @param aggParamsMap
* the map between operator index and aggregation parameters
*/
def updateNativeMetrics(
def genMetricsUpdatingFunction(
child: SparkPlan,
relMap: JMap[JLong, JList[JLong]],
joinParamsMap: JMap[JLong, JoinParams],
Expand All @@ -66,7 +66,7 @@ object MetricsUtil extends Logging {

val mut: MetricsUpdaterTree = treeifyMetricsUpdaters(child)

updateTransformerMetrics(
genMetricsUpdatingFunction(
mut,
relMap,
JLong.valueOf(relMap.size() - 1),
Expand Down Expand Up @@ -269,24 +269,25 @@ object MetricsUtil extends Logging {
}

/**
* A recursive function updating the metrics of one transformer and its child.
* Get a function which would update the metrics of transformers.
*
* @param mut
* @param mutNode
* the metrics updater tree built from the original plan
* @param relMap
* the map between operator index and its rels
* @param operatorIdx
* the index of operator
* @param metrics
* the metrics fetched from native
* @param metricsIdx
* the index of metrics
* @param joinParamsMap
* the map between operator index and join parameters
* @param aggParamsMap
* the map between operator index and aggregation parameters
*
* @return
* A recursive function updating the metrics of operator(transformer) and its children.
*/
def updateTransformerMetrics(
def genMetricsUpdatingFunction(
mutNode: MetricsUpdaterTree,
relMap: JMap[JLong, JList[JLong]],
operatorIdx: JLong,
Expand Down

0 comments on commit 09c2e24

Please sign in to comment.