diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 1939d0e0367d6..bb3cb5acce37c 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.commands.GlutenCacheBase._
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.{BooleanType, StringType}
 
@@ -49,8 +50,7 @@ case class GlutenCHCacheDataCommand(
     partitionColumn: Option[String],
     partitionValue: Option[String],
     tablePropertyOverrides: Map[String, String]
-) extends LeafRunnableCommand
-  with GlutenCacheBase {
+) extends LeafRunnableCommand {
 
   override def output: Seq[Attribute] = Seq(
     AttributeReference("result", BooleanType, nullable = false)(),
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
index 2907febf8028e..c4e9f51bce63e 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -29,10 +29,10 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 
-trait GlutenCacheBase {
+object GlutenCacheBase {
   def ALL_EXECUTORS: String = "allExecutors"
 
-  protected def toExecutorId(executorId: String): String =
+  def toExecutorId(executorId: String): String =
     executorId.split("_").last
 
   protected def waitRpcResults
@@ -46,7 +46,7 @@ trait GlutenCacheBase {
     resultList
   }
 
-  protected def checkExecutorId(executorId: String): Unit = {
+  def checkExecutorId(executorId: String): Unit = {
     if (!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
       throw new GlutenException(
         s"executor $executorId not found," +
@@ -87,7 +87,7 @@ trait GlutenCacheBase {
     (status, messages.mkString(";"))
   }
 
-  protected def collectJobTriggerResult(
+  def collectJobTriggerResult(
       jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, ArrayBuffer[String]) = {
     var status = true
     val messages = ArrayBuffer[String]()
@@ -101,7 +101,7 @@ trait GlutenCacheBase {
     (status, messages)
   }
 
-  protected def getResult(
+  def getResult(
       futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
       async: Boolean): Seq[Row] = {
     val resultList = waitRpcResults(futureList)
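Note on the refactor above: once `GlutenCacheBase` becomes an `object`, its helpers are consumed through a static import rather than a mix-in, which is also why the shared members drop `protected`. A minimal, self-contained Scala sketch of the pattern (the names here are illustrative stand-ins, not Gluten's API):

```scala
// Before: helpers are defined on a trait and every command mixes it in.
trait CacheHelpers {
  protected def toExecutorId(executorId: String): String =
    executorId.split("_").last
}

case class OldCommand(executorId: String) extends CacheHelpers {
  def run(): String = toExecutorId(executorId)
}

// After: helpers live on an object and call sites import them, so the
// command no longer widens its inheritance chain.
object CacheHelpers {
  def toExecutorId(executorId: String): String =
    executorId.split("_").last
}

import CacheHelpers._

case class NewCommand(executorId: String) {
  def run(): String = toExecutorId(executorId)
}

object Demo extends App {
  println(OldCommand("app-123_7").run()) // 7
  println(NewCommand("app-123_7").run()) // 7
}
```

Moving shared helpers out of the inheritance chain keeps the commands' supertypes minimal and makes the helpers callable from code that is not a command at all, which the `GlutenCacheFilesCommand` change below relies on.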
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
index e535097ed9fcf..0a08df7cebade 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenFilesCacheLoa
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.commands.GlutenCacheBase._
 import org.apache.spark.sql.types.{BooleanType, StringType}
 
 import org.apache.hadoop.conf.Configuration
@@ -42,8 +43,7 @@ case class GlutenCacheFilesCommand(
     selectedColumn: Option[Seq[String]],
     filePath: String,
     propertyOverrides: Map[String, String]
-) extends LeafRunnableCommand
-  with GlutenCacheBase {
+) extends LeafRunnableCommand {
 
   override def output: Seq[Attribute] = Seq(
     AttributeReference("result", BooleanType, nullable = false)(),
@@ -135,8 +135,7 @@ case class GlutenCacheFilesCommand(
     executorIdsToLocalFiles.foreach {
       case (executorId, fileNode) =>
         checkExecutorId(executorId)
-        val executor = GlutenDriverEndpoint.executorDataMap.get(
-          GlutenCacheFilesCommand.toExecutorId(executorId))
+        val executor = GlutenDriverEndpoint.executorDataMap.get(toExecutorId(executorId))
         futureList.append(
           (
             executorId,
@@ -187,10 +186,3 @@ case class GlutenCacheFilesCommand(
       }
     }
   }
 }
-
-object GlutenCacheFilesCommand {
-  val ALL_EXECUTORS = "allExecutors"
-
-  private def toExecutorId(executorId: String): String =
-    executorId.split("_").last
-}
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp
index e5f5dd5dccdb8..ff9c151159a6d 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -77,14 +77,19 @@ int64_t QueryContextManager::initializeQuery()
 
 DB::ContextMutablePtr QueryContextManager::currentQueryContext()
 {
-    if (!CurrentThread::getGroup())
-    {
-        throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
-    }
+    auto thread_group = currentThreadGroup();
     int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
     return query_map.get(id)->query_context;
 }
 
+std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
+{
+    if (auto thread_group = CurrentThread::getGroup())
+        return thread_group;
+
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
+}
+
 void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters)
 {
     if (!CurrentThread::getGroup())
diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h
index 0fbf4977321f1..4770327d1715c 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -30,6 +30,7 @@ class QueryContextManager
     }
     int64_t initializeQuery();
     DB::ContextMutablePtr currentQueryContext();
+    static std::shared_ptr<DB::ThreadGroup> currentThreadGroup();
     void logCurrentPerformanceCounters(ProfileEvents::Counters& counters);
     size_t currentPeakMemory(int64_t id);
     void finalizeQuery(int64_t id);
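The `currentThreadGroup()` helper extracted above centralizes the "attached thread group or throw" check that `currentQueryContext()` previously inlined, and returning the pointer lets callers hold the group instead of re-checking. A self-contained sketch of the same guard pattern, with hypothetical stand-ins for ClickHouse's `DB::ThreadGroup` / `CurrentThread::getGroup()` (only the function name matches the patch):

```cpp
#include <memory>
#include <stdexcept>

// Simplified stand-in for DB::ThreadGroup: per-query state shared by
// all threads working on one query (performance counters, etc.).
struct ThreadGroup
{
};

// Stand-in for CurrentThread::getGroup(): a thread may or may not be
// attached to a group.
thread_local std::shared_ptr<ThreadGroup> current_group;

std::shared_ptr<ThreadGroup> currentThreadGroup()
{
    // Hand back the group when the thread is attached; otherwise fail
    // loudly instead of letting callers dereference a null pointer.
    if (auto group = current_group)
        return group;
    throw std::runtime_error("Thread group not found.");
}

int main()
{
    current_group = std::make_shared<ThreadGroup>();
    auto group = currentThreadGroup(); // non-null here by construction
    (void)group;
    return 0;
}
```

Declaring the helper `static` on `QueryContextManager` means callers such as `writeCacheHits` below can reach it without a manager instance.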
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 039d978bf77de..e138642607c4e 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 #include "RelMetric.h"
+
 #include <Common/CurrentThread.h>
 #include <Common/ProfileEvents.h>
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/writer.h>
+#include <Common/QueryContext.h>
 
 using namespace rapidjson;
 
@@ -47,7 +49,8 @@ namespace local_engine
 
 static void writeCacheHits(Writer<StringBuffer> & writer)
 {
-    auto & counters = DB::CurrentThread::getProfileEvents();
+    const auto thread_group = QueryContextManager::currentThreadGroup();
+    auto & counters = thread_group->performance_counters;
     auto read_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
     auto miss_cache_hits = counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
     auto read_cache_bytes = counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load();
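Reading `thread_group->performance_counters` instead of `DB::CurrentThread::getProfileEvents()` means the cache-hit metrics come from the counters shared by the whole thread group, so the thread that serializes metrics does not have to be the thread that performed the cached reads. A simplified, self-contained analogue of that distinction (plain atomics, not ClickHouse's ProfileEvents API):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>
#include <thread>

// Counters owned by a shared group rather than by thread-local state,
// so any thread holding the group sees the same totals.
struct GroupCounters
{
    std::atomic<uint64_t> cache_hits{0};
    std::atomic<uint64_t> cache_misses{0};
};

int main()
{
    auto counters = std::make_shared<GroupCounters>();

    // A worker thread accumulates into the shared group's counters.
    std::thread worker(
        [counters]
        {
            counters->cache_hits += 3;
            counters->cache_misses += 1;
        });
    worker.join();

    // The reporting thread reads the same totals, which is what reading
    // thread_group->performance_counters achieves in the patch; a purely
    // thread-local view could report zeros here because the reads
    // happened on another thread.
    std::cout << "hits=" << counters->cache_hits
              << " misses=" << counters->cache_misses << '\n';
    return 0;
}
```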