From e5fc4720386239492c5f12e76b2e5b059c36f5b3 Mon Sep 17 00:00:00 2001 From: xiaofanluan Date: Sun, 9 Feb 2025 22:48:28 +0800 Subject: [PATCH] enhance: improve bloomfilter performance Signed-off-by: xiaofanluan --- configs/milvus.yaml | 4 ++-- internal/storage/pk_statistics.go | 10 ++-------- pkg/util/paramtable/component_param.go | 10 +++++----- pkg/util/paramtable/component_param_test.go | 2 +- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index cc02fee17e4d6..677bb3c222bf8 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -482,7 +482,7 @@ queryNode: enableSegmentPrune: false # use partition stats to prune data in search/query on shard delegator queryStreamBatchSize: 4194304 # return min batch size of stream query queryStreamMaxBatchSize: 134217728 # return max batch size of stream query - bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM + bloomFilterApplyParallelFactor: 2 # parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM workerPooling: size: 10 # the size for worker querynode client pool ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address @@ -694,7 +694,7 @@ dataNode: clusteringCompaction: memoryBufferRatio: 0.3 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage. workPoolSize: 8 # worker pool size for one clustering compaction job. - bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM + bloomFilterApplyParallelFactor: 2 # parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM storage: deltalog: json # deltalog format, options: [json, parquet] ip: # TCP/IP address of dataNode. If not specified, use the first unicastable address diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index f984583e755db..445338e91b6c5 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -149,15 +149,9 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo // check bf first, TestLocation just do some bitset compute, cost is cheaper locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type()) - ret := st.PkFilter.BatchTestLocations(locations, hits) - - // todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment, - // hits array will be removed after we merge bf in segment pks := lc.PKs() - for i := range ret { - if !hits[i] { - hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) - } + for i, hit := range hits { + hits[i] = hit || st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) } return hits diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 5e07f792ee2cf..bb6d59dcbb66c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -859,7 +859,7 @@ This helps Milvus-CDC synchronize incremental data`, p.BloomFilterApplyBatchSize = ParamItem{ Key: "common.bloomFilterApplyBatchSize", Version: "2.4.5", - DefaultValue: "1000", + DefaultValue: "10000", Doc: "batch size when to apply pk to bloom filter", Export: true, } @@ -3307,8 +3307,8 @@ user-task-polling: Key: "queryNode.bloomFilterApplyParallelFactor", FallbackKeys: []string{"queryNode.bloomFilterApplyBatchSize"}, Version: "2.4.5", - DefaultValue: "4", - Doc: "parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM", + DefaultValue: "2", + Doc: "parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM", Export: true, } p.BloomFilterApplyParallelFactor.Init(base.mgr) @@ -4792,8 +4792,8 @@ if this parameter <= 0, will set it as 10`, Key: "dataNode.bloomFilterApplyParallelFactor", FallbackKeys: []string{"datanode.bloomFilterApplyBatchSize"}, Version: "2.4.5", - DefaultValue: "4", - Doc: "parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM", + DefaultValue: "2", + Doc: "parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM", Export: true, } p.BloomFilterApplyParallelFactor.Init(base.mgr) diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 4bc48f933fad3..9c8c514cf3d5e 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -600,7 +600,7 @@ func TestComponentParam(t *testing.T) { params.Save("datanode.clusteringCompaction.workPoolSize", "2") assert.Equal(t, int64(2), Params.ClusteringCompactionWorkerPoolSize.GetAsInt64()) - assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) + assert.Equal(t, 2, Params.BloomFilterApplyParallelFactor.GetAsInt()) }) t.Run("test indexNodeConfig", func(t *testing.T) {