Commit: fix bug

liuneng1994 committed Jul 29, 2024
1 parent 4abc92a commit 173ebcb
Showing 8 changed files with 155 additions and 37 deletions.
@@ -50,7 +50,7 @@ singleStatement
;

statement
: CACHE META? DATA ASYN? SELECT selectedColumns=selectedColumnNames
: CACHE META? DATA ASYNC? SELECT selectedColumns=selectedColumnNames
FROM (path=STRING | table=qualifiedName) (AFTER filter=filterClause)?
(CACHEPROPERTIES cacheProps=propertyList)? #cacheData
| .*? #passThrough
@@ -113,15 +113,15 @@ quotedIdentifier
// Add keywords here so that people's queries don't break if they have a column name as one of
// these tokens
nonReserved
: CACHE | META | ASYN | DATA
: CACHE | META | ASYNC | DATA
| SELECT | FOR | AFTER | CACHEPROPERTIES
| TIMESTAMP | AS | OF | DATE_PARTITION
;

// Define how the keywords above should appear in a user's SQL statement.
CACHE: 'CACHE';
META: 'META';
ASYN: 'ASYN';
ASYNC: 'ASYNC';
DATA: 'DATA';
SELECT: 'SELECT';
COMMA: ',';
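A minimal usage sketch of the corrected ASYNC keyword, assuming a SparkSession (`spark`) with the Gluten ClickHouse SQL extension registered and the lineitem_mergetree_hdfs table created as in the test suite later in this commit; the table name and CACHEPROPERTIES value are taken from that suite:

// Sketch only: table name and storage_policy come from the tests in this commit.
val res = spark
  .sql("""CACHE DATA ASYNC
         |  SELECT * FROM lineitem_mergetree_hdfs
         |  AFTER l_shipdate AS OF '1995-01-10'
         |  CACHEPROPERTIES(storage_policy='__hdfs_main')""".stripMargin)
  .collect()
// The command returns a single row whose first column reports success.
assert(res(0).getBoolean(0))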
@@ -203,7 +203,7 @@ class GlutenClickhouseSqlAstBuilder extends GlutenClickhouseSqlBaseBaseVisitor[A
override def visitCacheData(ctx: GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef =
withOrigin(ctx) {
val onlyMetaCache = ctx.META != null
val asynExecute = ctx.ASYN != null
val asynExecute = ctx.ASYNC != null
val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null) {
if (ctx.filter.TIMESTAMP != null) {
(Some(string(ctx.filter.timestamp)), None, None)
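For orientation, the values the visitor extracts above presumably feed the executable cache command (GlutenCHCacheDataCommand, the case class touched next in this commit). A hedged sketch of the information carried; the case class name and exact shape are illustrative, not the project's actual definition:

// Hypothetical shape, for illustration only: mirrors the locals built in visitCacheData.
case class CacheDataSketch(
    onlyMetaCache: Boolean,          // META keyword present
    asyncExecute: Boolean,           // ASYNC keyword present (renamed from ASYN by this commit)
    tsFilter: Option[String],        // AFTER <timestamp> filter
    partitionColumn: Option[String], // AFTER <column> AS OF <value> filter
    partitionValue: Option[String])  // value for the partition filter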
@@ -175,7 +175,7 @@ case class GlutenCHCacheDataCommand(
onePart.table,
ClickhouseSnapshot.genSnapshotId(snapshot),
onePart.tablePath,
"",
pathToCache.toString,
snapshot.metadata.configuration.getOrElse("orderByKey", ""),
snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
@@ -20,12 +20,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

import java.io.File
import scala.concurrent.duration.DurationInt

// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
@@ -55,7 +55,6 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
"false")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.path", "/tmp/ch_path")
}

override protected def beforeEach(): Unit = {
@@ -70,7 +69,18 @@ class GlutenClickHouseMergeTreeCacheDataSSuite
FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
}

test("test mergetree table write") {
def countFiles(directory: File): Int = {
if (directory.exists && directory.isDirectory) {
val files = directory.listFiles
val count = files
.count(_.isFile) + files.filter(_.isDirectory).map(countFiles).sum
count
} else {
0
}
}
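The helper above is exercised by the tests that follow; a condensed sketch of the intended usage pattern (variable names are illustrative, HDFS_CACHE_PATH comes from the base suite):

// Sketch: snapshot the local cache directory, trigger caching, then require growth.
val cacheDir = new File(HDFS_CACHE_PATH)
val baseline = countFiles(cacheDir)
spark.sql("cache data select * from lineitem_mergetree_hdfs").collect()
assert(countFiles(cacheDir) > baseline)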

test("test cache mergetree data sync") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)
@@ -109,6 +119,8 @@
|""".stripMargin)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

val res = spark
.sql(s"""
@@ -121,11 +133,119 @@
assertResult(true)(res(0).getBoolean(0))
val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
assertResult(22)(metaPath.list().length)

assert(countFiles(dataPath) > initial_cache_files)
val first_cache_files = countFiles(dataPath)
val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
assertResult(31)(metaPath.list().length)
assert(countFiles(dataPath) > first_cache_files)

val sqlStr =
s"""
|SELECT
| l_returnflag,
| l_linestatus,
| sum(l_quantity) AS sum_qty,
| sum(l_extendedprice) AS sum_base_price,
| sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
| sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
| avg(l_quantity) AS avg_qty,
| avg(l_extendedprice) AS avg_price,
| avg(l_discount) AS avg_disc,
| count(*) AS count_order
|FROM
| lineitem_mergetree_hdfs
|WHERE
| l_shipdate >= date'1995-01-10'
|GROUP BY
| l_returnflag,
| l_linestatus
|ORDER BY
| l_returnflag,
| l_linestatus;
|
|""".stripMargin
runSql(sqlStr)(
df => {
val scanExec = collect(df.queryExecution.executedPlan) {
case f: FileSourceScanExecTransformer => f
}
assertResult(1)(scanExec.size)

val mergetreeScan = scanExec.head
assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))

val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
assertResult(7898)(addFiles.map(_.rows).sum)
})
spark.sql("drop table lineitem_mergetree_hdfs purge")
}

test("test cache mergetree data async") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)

spark.sql(s"""
|CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
|(
| l_orderkey bigint,
| l_partkey bigint,
| l_suppkey bigint,
| l_linenumber bigint,
| l_quantity double,
| l_extendedprice double,
| l_discount double,
| l_tax double,
| l_returnflag string,
| l_linestatus string,
| l_shipdate date,
| l_commitdate date,
| l_receiptdate date,
| l_shipinstruct string,
| l_shipmode string,
| l_comment string
|)
|USING clickhouse
|PARTITIONED BY (l_shipdate)
|LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
|TBLPROPERTIES (storage_policy='__hdfs_main',
| orderByKey='l_linenumber,l_orderkey')
|""".stripMargin)

spark.sql(s"""
| insert into table lineitem_mergetree_hdfs
| select * from lineitem a
| where a.l_shipdate between date'1995-01-01' and date'1995-01-31'
|""".stripMargin)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
val dataPath = new File(HDFS_CACHE_PATH)
val initial_cache_files = countFiles(dataPath)

val res = spark
.sql(s"""
|cache data async
| select * from lineitem_mergetree_hdfs
| after l_shipdate AS OF '1995-01-10'
| CACHEPROPERTIES(storage_policy='__hdfs_main',
| aaa='ccc')""".stripMargin)
.collect()
assertResult(true)(res(0).getBoolean(0))
val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
assertResult(true)(metaPath.exists() && metaPath.isDirectory)
eventually(timeout(60.seconds), interval(2.seconds)) {
assert(countFiles(dataPath) > initial_cache_files)
}

val first_cache_files = countFiles(dataPath)
val res1 = spark.sql(s"cache data async select * from lineitem_mergetree_hdfs").collect()
assertResult(true)(res1(0).getBoolean(0))
eventually(timeout(60.seconds), interval(2.seconds)) {
assertResult(31)(metaPath.list().length)
assert(countFiles(dataPath) > first_cache_files)
}

val sqlStr =
s"""
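Because the ASYNC form returns before the parts are materialised locally, the async test above polls with scalatest's eventually. A hedged helper sketch of that pattern, assuming the suite mixes in org.scalatest.concurrent.Eventually (implied by the eventually/timeout/interval calls above) and using the scala.concurrent.duration.DurationInt import added earlier; the helper name is illustrative and not part of the commit:

// Illustrative helper: wait until the cache directory has grown past a recorded
// baseline, checking every 2 seconds for up to 60 seconds, mirroring the async test.
def waitForCacheGrowth(dir: File, baseline: Int): Unit =
  eventually(timeout(60.seconds), interval(2.seconds)) {
    assert(countFiles(dir) > baseline)
  }

// e.g. waitForCacheGrowth(new File(HDFS_CACHE_PATH), initial_cache_files)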
@@ -49,7 +49,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu

val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
val HDFS_URL_ENDPOINT = s"hdfs://192.168.0.158:9000"
val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"

val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
@@ -82,7 +82,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
"/tmp/user_defined")
if (UTSystemParameters.testMergeTreeOnObjectStorage) {
conf
/* .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
.set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
.set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
@@ -120,7 +120,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
"main")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk",
"s3_cache") */
"s3_cache")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type",
"hdfs_gluten")
22 changes: 13 additions & 9 deletions cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -79,25 +79,23 @@ void CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
job_context.table.parts.clear();
job_context.table.parts.push_back(part);
job_context.table.snapshot_id = "";
auto job = [job_detail = job_context, context = this->context, columns = columns, latch = latch]()
auto job = [job_detail = job_context, context = this->context, read_columns = columns, latch = latch]()
{
try
{
SCOPE_EXIT({ if (latch) latch->count_down();});
auto storage = MergeTreeRelParser::parseStorage(job_detail.table, context, true);
auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
for (const auto & column : storage->getInMemoryMetadata().getColumns())
auto meta_columns = storage->getInMemoryMetadata().getColumns();
for (const auto & column : meta_columns)
{
if (columns.contains(column.name))
if (read_columns.contains(column.name))
names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
}

auto query_info = buildQueryInfo(names_and_types_list);

std::vector<DataPartPtr> selected_parts
= StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});

auto read_step = storage->reader.readFromParts(
selected_parts,
/* alter_conversions = */
@@ -111,9 +109,15 @@ void CacheManager::cachePart(const MergeTreeTable& table, const MergeTreePart& p
QueryPlan plan;
plan.addStep(std::move(read_step));
auto pipeline_builder = plan.buildQueryPipeline({}, {});
pipeline_builder->setSinks([&](const auto & header, auto ) {return std::make_shared<NullSink>(header);});
auto executor = pipeline_builder->execute();
executor->execute(1, true);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder.get()));
PullingPipelineExecutor executor(pipeline);
while (true)
{
Chunk chunk;
if (!executor.pull(chunk))
break;
}
LOG_INFO(getLogger("CacheManager"), "Load cache of table {}.{} part {} success.", job_detail.table.database, job_detail.table.table, job_detail.table.parts.front().name);
}
catch (std::exception& e)
{
20 changes: 7 additions & 13 deletions cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
@@ -64,21 +64,15 @@ StorageMergeTreeFactory::getStorage(const StorageID& id, const String & snapshot
std::lock_guard lock(storage_map_mutex);

merge_tree_table.parts.clear();
auto new_storage = creator();
if (storage_map->has(table_name) && !storage_map->get(table_name)->second.sameStructWith(merge_tree_table))
{
// freeStorage(id);
if (storage_map->has(table_name))
storage_map->remove(table_name);
{
std::lock_guard lock(datapart_mutex);
if (datapart_map->has(table_name))
datapart_map->remove(table_name);
}
freeStorage(id);
std::lock_guard lock_datapart(datapart_mutex);
if (datapart_map->has(table_name))
datapart_map->remove(table_name);
}

if (!storage_map->has(table_name))
storage_map->add(table_name, {new_storage, merge_tree_table});
storage_map->add(table_name, {creator(), merge_tree_table});
return storage_map->get(table_name)->first;
}

@@ -129,7 +123,7 @@ DataPartsVector StorageMergeTreeFactory::getDataPartsByNames(const StorageID & i
// will be inited in native init phase
std::unique_ptr<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> StorageMergeTreeFactory::storage_map = nullptr;
std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> StorageMergeTreeFactory::datapart_map = nullptr;
std::mutex StorageMergeTreeFactory::storage_map_mutex;
std::mutex StorageMergeTreeFactory::datapart_mutex;
std::recursive_mutex StorageMergeTreeFactory::storage_map_mutex;
std::recursive_mutex StorageMergeTreeFactory::datapart_mutex;

}
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
@@ -68,8 +68,8 @@ class StorageMergeTreeFactory
private:
static std::unique_ptr<Poco::LRUCache<std::string, std::pair<CustomStorageMergeTreePtr, MergeTreeTable>>> storage_map;
static std::unique_ptr<Poco::LRUCache<std::string, std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> datapart_map;
static std::mutex storage_map_mutex;
static std::mutex datapart_mutex;
static std::recursive_mutex storage_map_mutex;
static std::recursive_mutex datapart_mutex;
};

struct TempStorageFreer
