diff --git a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4 b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
index a4ff5112366e7..ac4f66a4fabae 100644
--- a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
+++ b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenClickhouseSqlBase.g4
@@ -50,7 +50,7 @@ singleStatement
     ;
 
 statement
-    : CACHE META? DATA ASYN? SELECT selectedColumns=selectedColumnNames
+    : CACHE META? DATA ASYNC? SELECT selectedColumns=selectedColumnNames
       FROM (path=STRING | table=qualifiedName) (AFTER filter=filterClause)?
       (CACHEPROPERTIES cacheProps=propertyList)?                                 #cacheData
     | .*?                                                                        #passThrough
@@ -113,7 +113,7 @@ quotedIdentifier
 // Add keywords here so that people's queries don't break if they have a column name as one of
 // these tokens
 nonReserved
-    : CACHE | META | ASYN | DATA
+    : CACHE | META | ASYNC | DATA
     | SELECT | FOR | AFTER | CACHEPROPERTIES
     | TIMESTAMP | AS | OF | DATE_PARTITION
     ;
@@ -121,7 +121,7 @@ nonReserved
 // Define how the keywords above should appear in a user's SQL statement.
 CACHE: 'CACHE';
 META: 'META';
-ASYN: 'ASYN';
+ASYNC: 'ASYNC';
 DATA: 'DATA';
 SELECT: 'SELECT';
 COMMA: ',';
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
index d4184890ddd3d..e8efc7291266b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParser.scala
@@ -203,7 +203,7 @@ class GlutenClickhouseSqlAstBuilder extends GlutenClickhouseSqlBaseBaseVisitor[A
   override def visitCacheData(ctx: GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef =
     withOrigin(ctx) {
       val onlyMetaCache = ctx.META != null
-      val asynExecute = ctx.ASYN != null
+      val asynExecute = ctx.ASYNC != null
      val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null) {
        if (ctx.filter.TIMESTAMP != null) {
          (Some(string(ctx.filter.timestamp)), None, None)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index a3661ebbc81b7..06a5779024c8b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -175,7 +175,7 @@ case class GlutenCHCacheDataCommand(
           onePart.table,
           ClickhouseSnapshot.genSnapshotId(snapshot),
           onePart.tablePath,
-          "",
+          pathToCache.toString,
           snapshot.metadata.configuration.getOrElse("orderByKey", ""),
           snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
           snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
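For reference, a minimal sketch of the statement shape the renamed `ASYNC` token enables, modelled on the test suite changes below. The table name `lineitem_mergetree_hdfs`, the `'1995-01-10'` timestamp, and the `__hdfs_main` storage policy are taken from those tests; the snippet assumes a SparkSession already running with the Gluten ClickHouse backend.

```scala
// Sketch only: assumes `spark` is a SparkSession configured with the Gluten
// ClickHouse backend and that the MergeTree table lineitem_mergetree_hdfs exists.
val result = spark
  .sql("""
         |CACHE DATA ASYNC
         |  SELECT * FROM lineitem_mergetree_hdfs
         |  AFTER l_shipdate AS OF '1995-01-10'
         |  CACHEPROPERTIES(storage_policy='__hdfs_main')
         |""".stripMargin)
  .collect()
// The tests below treat `true` in the first column of the result as a
// successfully submitted cache job.
assert(result(0).getBoolean(0))
```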
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
index 59abe4c01fe92..1a714ed88ab09 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeCacheDataSSuite.scala
@@ -20,12 +20,12 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 
 import java.io.File
+import scala.concurrent.duration.DurationInt
 
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
@@ -55,7 +55,6 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
         "false")
-      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/tmp/ch_path")
   }
 
   override protected def beforeEach(): Unit = {
@@ -70,7 +69,18 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
     FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
   }
 
-  test("test mergetree table write") {
+  def countFiles(directory: File): Int = {
+    if (directory.exists && directory.isDirectory) {
+      val files = directory.listFiles
+      val count = files
+        .count(_.isFile) + files.filter(_.isDirectory).map(countFiles).sum
+      count
+    } else {
+      0
+    }
+  }
+
+  test("test cache mergetree data sync") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
                  |""".stripMargin)
@@ -109,6 +119,8 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
                  |""".stripMargin)
     FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
     FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+    val dataPath = new File(HDFS_CACHE_PATH)
+    val initial_cache_files = countFiles(dataPath)
 
     val res = spark
       .sql(s"""
@@ -121,11 +133,119 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
     assertResult(true)(res(0).getBoolean(0))
     val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
-    assertResult(22)(metaPath.list().length)
-
+    assert(countFiles(dataPath) > initial_cache_files)
+    val first_cache_files = countFiles(dataPath)
     val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
     assertResult(true)(res1(0).getBoolean(0))
     assertResult(31)(metaPath.list().length)
+    assert(countFiles(dataPath) > first_cache_files)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate >= date'1995-01-10'
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runSql(sqlStr)(
+      df => {
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assertResult(1)(scanExec.size)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assertResult(7898)(addFiles.map(_.rows).sum)
+      })
lineitem_mergetree_hdfs purge") + } + + test("test cache mergetree data async") { + spark.sql(s""" + |DROP TABLE IF EXISTS lineitem_mergetree_hdfs; + |""".stripMargin) + + spark.sql(s""" + |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs + |( + | l_orderkey bigint, + | l_partkey bigint, + | l_suppkey bigint, + | l_linenumber bigint, + | l_quantity double, + | l_extendedprice double, + | l_discount double, + | l_tax double, + | l_returnflag string, + | l_linestatus string, + | l_shipdate date, + | l_commitdate date, + | l_receiptdate date, + | l_shipinstruct string, + | l_shipmode string, + | l_comment string + |) + |USING clickhouse + |PARTITIONED BY (l_shipdate) + |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs' + |TBLPROPERTIES (storage_policy='__hdfs_main', + | orderByKey='l_linenumber,l_orderkey') + |""".stripMargin) + + spark.sql(s""" + | insert into table lineitem_mergetree_hdfs + | select * from lineitem a + | where a.l_shipdate between date'1995-01-01' and date'1995-01-31' + |""".stripMargin) + FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) + FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) + val dataPath = new File(HDFS_CACHE_PATH) + val initial_cache_files = countFiles(dataPath) + + val res = spark + .sql(s""" + |cache data async + | select * from lineitem_mergetree_hdfs + | after l_shipdate AS OF '1995-01-10' + | CACHEPROPERTIES(storage_policy='__hdfs_main', + | aaa='ccc')""".stripMargin) + .collect() + assertResult(true)(res(0).getBoolean(0)) + val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + assertResult(true)(metaPath.exists() && metaPath.isDirectory) + eventually(timeout(60.seconds), interval(2.seconds)) { + assert(countFiles(dataPath) > initial_cache_files) + } + + val first_cache_files = countFiles(dataPath) + val res1 = spark.sql(s"cache data async select * from lineitem_mergetree_hdfs").collect() + assertResult(true)(res1(0).getBoolean(0)) + eventually(timeout(60.seconds), interval(2.seconds)) { + assertResult(31)(metaPath.list().length) + assert(countFiles(dataPath) > first_cache_files) + } val sqlStr = s""" diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index d66f386de7888..9412326ae342c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -49,7 +49,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/" val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/" - val HDFS_URL_ENDPOINT = s"hdfs://192.168.0.158:9000" + val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020" val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion" val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4" @@ -82,7 +82,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu "/tmp/user_defined") if (UTSystemParameters.testMergeTreeOnObjectStorage) { conf - /* .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) + .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT) @@ -120,7 +120,7 
@@ -120,7 +120,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
           "main")
         .set(
           "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk",
-          "s3_cache") */
+          "s3_cache")
         .set(
           "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type",
           "hdfs_gluten")
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index 9943a2bfa8512..2ff8cafd73ce7 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -79,7 +79,7 @@ void CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
     job_context.table.parts.clear();
     job_context.table.parts.push_back(part);
     job_context.table.snapshot_id = "";
-    auto job = [job_detail = job_context, context = this->context, columns = columns, latch = latch]()
+    auto job = [job_detail = job_context, context = this->context, read_columns = columns, latch = latch]()
     {
         try
         {
@@ -87,17 +87,15 @@ void CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
             auto storage = MergeTreeRelParser::parseStorage(job_detail.table, context, true);
             auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
             NamesAndTypesList names_and_types_list;
-            for (const auto & column : storage->getInMemoryMetadata().getColumns())
+            auto meta_columns = storage->getInMemoryMetadata().getColumns();
+            for (const auto & column : meta_columns)
             {
-                if (columns.contains(column.name))
+                if (read_columns.contains(column.name))
                     names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
             }
-
             auto query_info = buildQueryInfo(names_and_types_list);
-
             std::vector<DataPartPtr> selected_parts
                 = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});
-
             auto read_step = storage->reader.readFromParts(
                 selected_parts,
                 /* alter_conversions = */
@@ -111,9 +109,15 @@ void CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
             QueryPlan plan;
             plan.addStep(std::move(read_step));
             auto pipeline_builder = plan.buildQueryPipeline({}, {});
-            pipeline_builder->setSinks([&](const auto & header, auto ) {return std::make_shared(header);});
-            auto executor = pipeline_builder->execute();
-            executor->execute(1, true);
+            auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder.get()));
+            PullingPipelineExecutor executor(pipeline);
+            while (true)
+            {
+                Chunk chunk;
+                if (!executor.pull(chunk))
+                    break;
+            }
+            LOG_INFO(getLogger("CacheManager"), "Load cache of table {}.{} part {} success.", job_detail.table.database, job_detail.table.table, job_detail.table.parts.front().name);
         }
         catch (std::exception& e)
         {
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
index b638d9bc27eaf..5d631c958f506 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
@@ -64,21 +64,15 @@ StorageMergeTreeFactory::getStorage(const StorageID& id, const String & snapshot
     std::lock_guard lock(storage_map_mutex);
 
     merge_tree_table.parts.clear();
-    auto new_storage = creator();
     if (storage_map->has(table_name) && !storage_map->get(table_name)->second.sameStructWith(merge_tree_table))
     {
-        // freeStorage(id);
-        if (storage_map->has(table_name))
-            storage_map->remove(table_name);
-        {
-            std::lock_guard lock(datapart_mutex);
-            if (datapart_map->has(table_name))
-                datapart_map->remove(table_name);
-        }
+        freeStorage(id);
+        std::lock_guard lock_datapart(datapart_mutex);
+        if (datapart_map->has(table_name))
+            datapart_map->remove(table_name);
     }
-    if (!storage_map->has(table_name))
-        storage_map->add(table_name, {new_storage, merge_tree_table});
+    storage_map->add(table_name, {creator(), merge_tree_table});
 
     return storage_map->get(table_name)->first;
 }
 
@@ -129,7 +123,7 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
 
 // will be inited in native init phase
 std::unique_ptr>> StorageMergeTreeFactory::storage_map = nullptr;
 std::unique_ptr>>> StorageMergeTreeFactory::datapart_map = nullptr;
-std::mutex StorageMergeTreeFactory::storage_map_mutex;
-std::mutex StorageMergeTreeFactory::datapart_mutex;
+std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex;
+std::recursive_mutex StorageMergeTreeFactory::datapart_mutex;
 }
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
index fca5dea3ed4f4..cb1bd5fb60bde 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
@@ -68,8 +68,8 @@ class StorageMergeTreeFactory
 private:
     static std::unique_ptr>> storage_map;
     static std::unique_ptr>>> datapart_map;
-    static std::mutex storage_map_mutex;
-    static std::mutex datapart_mutex;
+    static std::recursive_mutex storage_map_mutex;
+    static std::recursive_mutex datapart_mutex;
 };
 
 struct TempStorageFreer