diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala index 34ffecb45833..ed6953b81a32 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala @@ -749,7 +749,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite } } - test("test mergetree path based write with bucket table") { + // FIXME: very slow after https://github.com/apache/incubator-gluten/pull/6558 + ignore("test mergetree path based write with bucket table") { val dataPath = s"$basePath/lineitem_mergetree_bucket" clearDataPath(dataPath) @@ -759,8 +760,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite sourceDF.write .format("clickhouse") - .partitionBy("l_returnflag") - .option("clickhouse.orderByKey", "l_orderkey") + .partitionBy("l_shipdate") + .option("clickhouse.orderByKey", "l_orderkey,l_returnflag") .option("clickhouse.primaryKey", "l_orderkey") .option("clickhouse.numBuckets", "4") .option("clickhouse.bucketColumnNames", "l_partkey") @@ -807,13 +808,13 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite val buckets = ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption assert(buckets.isDefined) assertResult(4)(buckets.get.numBuckets) - assertResult("l_orderkey")( + assertResult("l_orderkey,l_returnflag")( buckets.get.sortColumnNames .mkString(",")) assertResult("l_partkey")( buckets.get.bucketColumnNames .mkString(",")) - assertResult("l_orderkey")( + assertResult("l_orderkey,l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption @@ -826,21 +827,20 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .get .mkString(",")) assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) - assertResult("l_returnflag")( + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .partitionColumns .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assertResult(12)(addFiles.size) + assertResult(10089)(addFiles.size) assertResult(600572)(addFiles.map(_.rows).sum) - assertResult(4)(addFiles.count(_.partitionValues("l_returnflag").equals("A"))) - assertResult(4)(addFiles.count(_.partitionValues("l_returnflag").equals("N"))) - assertResult(4)(addFiles.count(_.partitionValues("l_returnflag").equals("R"))) - assertResult(1)( - addFiles.count( - f => f.partitionValues("l_returnflag").equals("A") && f.bucketNum.equals("00000"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) + assertResult(1)(addFiles.count( + f => f.partitionValues("l_shipdate").equals("1995-01-21") && f.bucketNum.equals("00000"))) } // check part pruning effect of filter on bucket column val df = spark.sql(s""" @@ -855,7 +855,7 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite .flatMap(partition => partition.asInstanceOf[GlutenMergeTreePartition].partList) .map(_.name) .distinct - assertResult(12)(touchedParts.size) + assertResult(4)(touchedParts.size) // test upsert on partitioned & bucketed table upsertSourceTableAndCheck(dataPath) diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala index 3b7606daac6b..84218f26a07f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala @@ -801,37 +801,39 @@ class GlutenClickHouseMergeTreeWriteSuite } } - test("test mergetree write with bucket table") { + // FIXME: very slow after https://github.com/apache/incubator-gluten/pull/6558 + ignore("test mergetree write with bucket table") { spark.sql(s""" |DROP TABLE IF EXISTS lineitem_mergetree_bucket; |""".stripMargin) - spark.sql(s""" - |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket - |( - | l_orderkey bigint, - | l_partkey bigint, - | l_suppkey bigint, - | l_linenumber bigint, - | l_quantity double, - | l_extendedprice double, - | l_discount double, - | l_tax double, - | l_returnflag string, - | l_linestatus string, - | l_shipdate date, - | l_commitdate date, - | l_receiptdate date, - | l_shipinstruct string, - | l_shipmode string, - | l_comment string - |) - |USING clickhouse - |PARTITIONED BY (l_returnflag) - |CLUSTERED BY (l_partkey) - |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_orderkey)"} INTO 4 BUCKETS - |LOCATION '$basePath/lineitem_mergetree_bucket' - |""".stripMargin) + spark.sql( + s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_shipdate) + |CLUSTERED BY (l_partkey) + |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_orderkey, l_returnflag)"} INTO 4 BUCKETS + |LOCATION '$basePath/lineitem_mergetree_bucket' + |""".stripMargin) spark.sql(s""" | insert into table lineitem_mergetree_bucket @@ -879,7 +881,7 @@ class GlutenClickHouseMergeTreeWriteSuite if (sparkVersion.equals("3.2")) { assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty) } else { - assertResult("l_orderkey")( + assertResult("l_orderkey,l_returnflag")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .orderByKeyOption @@ -888,21 +890,20 @@ class GlutenClickHouseMergeTreeWriteSuite } assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty) assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size) - assertResult("l_returnflag")( + assertResult("l_shipdate")( ClickHouseTableV2 .getTable(fileIndex.deltaLog) .partitionColumns .head) val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts]) - assertResult(12)(addFiles.size) + assertResult(10089)(addFiles.size) assertResult(600572)(addFiles.map(_.rows).sum) - assertResult(4)(addFiles.count(_.partitionValues("l_returnflag").equals("A"))) - assertResult(4)(addFiles.count(_.partitionValues("l_returnflag").equals("N"))) - assertResult(4)(addFiles.count(_.partitionValues("l_returnflag").equals("R"))) - assertResult(1)( - addFiles.count( - f => f.partitionValues("l_returnflag").equals("A") && f.bucketNum.equals("00000"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1992-06-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1993-01-01"))) + assertResult(4)(addFiles.count(_.partitionValues("l_shipdate").equals("1995-01-21"))) + assertResult(1)(addFiles.count( + f => f.partitionValues("l_shipdate").equals("1995-01-21") && f.bucketNum.equals("00000"))) } // check part pruning effect of filter on bucket column val df = spark.sql(s""" @@ -917,7 +918,7 @@ class GlutenClickHouseMergeTreeWriteSuite .flatMap(partition => partition.asInstanceOf[GlutenMergeTreePartition].partList) .map(_.name) .distinct - assertResult(12)(touchedParts.size) + assertResult(4)(touchedParts.size) // test upsert on partitioned & bucketed table upsertSourceTableAndCheck("lineitem_mergetree_bucket") diff --git a/cpp-ch/local-engine/Common/ConcurrentMap.h b/cpp-ch/local-engine/Common/ConcurrentMap.h index 1719d9b255ea..c56926ff505c 100644 --- a/cpp-ch/local-engine/Common/ConcurrentMap.h +++ b/cpp-ch/local-engine/Common/ConcurrentMap.h @@ -27,13 +27,13 @@ class ConcurrentMap public: void insert(const K & key, const V & value) { - std::unique_lock lock{mutex}; + std::lock_guard lock{mutex}; map.insert({key, value}); } V get(const K & key) { - std::shared_lock lock{mutex}; + std::lock_guard lock{mutex}; auto it = map.find(key); if (it == map.end()) { @@ -44,30 +44,24 @@ class ConcurrentMap void erase(const K & key) { - std::unique_lock lock{mutex}; + std::lock_guard lock{mutex}; map.erase(key); } void clear() { - std::unique_lock lock{mutex}; + std::lock_guard lock{mutex}; map.clear(); } - bool contains(const K & key) - { - std::shared_lock lock{mutex}; - return map.contains(key); - } - size_t size() const { - std::shared_lock lock{mutex}; + std::lock_guard lock{mutex}; return map.size(); } private: std::unordered_map map; - mutable std::shared_mutex mutex; + mutable std::mutex mutex; }; } diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp index 2d5780a6e35b..68934adad367 100644 --- a/cpp-ch/local-engine/Common/QueryContext.cpp +++ b/cpp-ch/local-engine/Common/QueryContext.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -49,7 +48,8 @@ struct QueryContext ContextMutablePtr query_context; }; -ConcurrentMap> query_map; +std::unordered_map> query_map; +std::mutex query_map_mutex; int64_t QueryContextManager::initializeQuery() { @@ -72,8 +72,9 @@ int64_t QueryContextManager::initializeQuery() query_context->thread_group->memory_tracker.setSoftLimit(memory_limit); query_context->thread_group->memory_tracker.setHardLimit(memory_limit + config.extra_memory_hard_limit); + std::lock_guard lock_guard(query_map_mutex); int64_t id = reinterpret_cast(query_context->thread_group.get()); - query_map.insert(id, query_context); + query_map.emplace(id, query_context); return id; } @@ -83,8 +84,9 @@ DB::ContextMutablePtr QueryContextManager::currentQueryContext() { throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found."); } + std::lock_guard lock_guard(query_map_mutex); int64_t id = reinterpret_cast(CurrentThread::getGroup().get()); - return query_map.get(id)->query_context; + return query_map[id]->query_context; } void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) @@ -114,9 +116,10 @@ void QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters size_t QueryContextManager::currentPeakMemory(int64_t id) { + std::lock_guard lock_guard(query_map_mutex); if (!query_map.contains(id)) throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "context released {}", id); - return query_map.get(id)->thread_group->memory_tracker.getPeak(); + return query_map[id]->thread_group->memory_tracker.getPeak(); } void QueryContextManager::finalizeQuery(int64_t id) @@ -127,7 +130,8 @@ void QueryContextManager::finalizeQuery(int64_t id) } std::shared_ptr context; { - context = query_map.get(id); + std::lock_guard lock_guard(query_map_mutex); + context = query_map[id]; } auto query_context = context->thread_status->getQueryContext(); if (!query_context) @@ -148,6 +152,7 @@ void QueryContextManager::finalizeQuery(int64_t id) context->thread_status.reset(); query_context.reset(); { + std::lock_guard lock_guard(query_map_mutex); query_map.erase(id); } }