Skip to content

Commit

Permalink
enhance: improve bloomfilter performance
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofanluan <[email protected]>
  • Loading branch information
xiaofan-luan committed Feb 10, 2025
1 parent 1f14053 commit e5fc472
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 16 deletions.
4 changes: 2 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ queryNode:
enableSegmentPrune: false # use partition stats to prune data in search/query on shard delegator
queryStreamBatchSize: 4194304 # return min batch size of stream query
queryStreamMaxBatchSize: 134217728 # return max batch size of stream query
bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM
bloomFilterApplyParallelFactor: 2 # parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM
workerPooling:
size: 10 # the size for worker querynode client pool
ip: # TCP/IP address of queryNode. If not specified, use the first unicastable address
Expand Down Expand Up @@ -694,7 +694,7 @@ dataNode:
clusteringCompaction:
memoryBufferRatio: 0.3 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be flushed to storage.
workPoolSize: 8 # worker pool size for one clustering compaction job.
bloomFilterApplyParallelFactor: 4 # parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM
bloomFilterApplyParallelFactor: 2 # parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM
storage:
deltalog: json # deltalog format, options: [json, parquet]
ip: # TCP/IP address of dataNode. If not specified, use the first unicastable address
Expand Down
10 changes: 2 additions & 8 deletions internal/storage/pk_statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,9 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo

// check the bloom filter first: TestLocation only does some bitset computation, so it is the cheaper check
locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type())
ret := st.PkFilter.BatchTestLocations(locations, hits)

// todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment,
// hits array will be removed after we merge bf in segment
pks := lc.PKs()
for i := range ret {
if !hits[i] {
hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
}
for i, hit := range hits {
hits[i] = hit || st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
}

return hits
Expand Down
10 changes: 5 additions & 5 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ This helps Milvus-CDC synchronize incremental data`,
p.BloomFilterApplyBatchSize = ParamItem{
Key: "common.bloomFilterApplyBatchSize",
Version: "2.4.5",
DefaultValue: "1000",
DefaultValue: "10000",
Doc: "batch size when to apply pk to bloom filter",
Export: true,
}
Expand Down Expand Up @@ -3307,8 +3307,8 @@ user-task-polling:
Key: "queryNode.bloomFilterApplyParallelFactor",
FallbackKeys: []string{"queryNode.bloomFilterApplyBatchSize"},
Version: "2.4.5",
DefaultValue: "4",
Doc: "parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM",
DefaultValue: "2",
Doc: "parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM",
Export: true,
}
p.BloomFilterApplyParallelFactor.Init(base.mgr)
Expand Down Expand Up @@ -4792,8 +4792,8 @@ if this parameter <= 0, will set it as 10`,
Key: "dataNode.bloomFilterApplyParallelFactor",
FallbackKeys: []string{"datanode.bloomFilterApplyBatchSize"},
Version: "2.4.5",
DefaultValue: "4",
Doc: "parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM",
DefaultValue: "2",
Doc: "parallel factor when to apply pk to bloom filter, default to 2*CPU_CORE_NUM",
Export: true,
}
p.BloomFilterApplyParallelFactor.Init(base.mgr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func TestComponentParam(t *testing.T) {
params.Save("datanode.clusteringCompaction.workPoolSize", "2")
assert.Equal(t, int64(2), Params.ClusteringCompactionWorkerPoolSize.GetAsInt64())

assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())
assert.Equal(t, 2, Params.BloomFilterApplyParallelFactor.GetAsInt())
})

t.Run("test indexNodeConfig", func(t *testing.T) {
Expand Down

0 comments on commit e5fc472

Please sign in to comment.