Skip to content

Commit

Permalink
*: migrate to SEARCH with strict equality comparator
Browse files Browse the repository at this point in the history
Close #3670

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Nov 22, 2024
1 parent 5f94654 commit 6f3189f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
21 changes: 15 additions & 6 deletions cli/util/upload_bin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (
// except the most recent one are guaranteed to be completed and don't contain gaps.
uploadBatchSize = 10000
// Number of objects to search in a batch. If it is larger than uploadBatchSize,
// it may lead to many duplicate uploads.
searchBatchSize = uploadBatchSize
// it may lead to many duplicate uploads. We need to search with EQ filter to
// avoid partially-completed SEARCH responses.
searchBatchSize = 1
// Size of object ID.
oidSize = sha256.Size
)
Expand Down Expand Up @@ -214,8 +215,12 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID

prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
if endIndex == startIndex+1 {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
}

Check warning on line 223 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L218-L223

Added lines #L218 - L223 were not covered by tests
prm.SetFilters(filters)
var (
objectIDs []oid.ID
Expand Down Expand Up @@ -527,8 +532,12 @@ func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, accoun
if len(additionalFilters) != 0 {
filters = additionalFilters[0]
}
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
if end == start+1 {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchStringEqual)
} else {
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
}

Check warning on line 540 in cli/util/upload_bin.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_bin.go#L532-L540

Added lines #L532 - L540 were not covered by tests
prm.SetFilters(filters)

var objIDs []oid.ID
Expand Down
3 changes: 1 addition & 2 deletions docs/neofs-blockstorage.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ parameter.
Depending on the mode, the service either:
- Searches for index files by index file attribute and reads block OIDs from index
file object-by-object.
- Searches batches of blocks directly by block attribute (the batch size is
configured via `OIDBatchSize` parameter).
- Searches blocks one by one directly by block attribute.

Once the OIDs are retrieved, they are immediately redirected to the
block downloading routines for further processing. The channel that
Expand Down
11 changes: 8 additions & 3 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error {
// fetchOIDsBySearch fetches block OIDs from NeoFS by searching through the Block objects.
func (bfs *Service) fetchOIDsBySearch() error {
startIndex := bfs.chain.BlockHeight()
batchSize := uint32(bfs.cfg.OIDBatchSize)
//We need to search with EQ filter to avoid partially-completed SEARCH responses.
batchSize := uint32(1)

Check warning on line 346 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L345-L346

Added lines #L345 - L346 were not covered by tests

for {
select {
Expand All @@ -351,8 +352,12 @@ func (bfs *Service) fetchOIDsBySearch() error {
default:
prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
if startIndex == startIndex+batchSize-1 {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
} else {
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
filters.AddFilter(bfs.cfg.BlockAttribute, fmt.Sprintf("%d", startIndex+batchSize-1), object.MatchNumLE)
}

Check warning on line 360 in pkg/services/blockfetcher/blockfetcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/blockfetcher/blockfetcher.go#L355-L360

Added lines #L355 - L360 were not covered by tests
prm.SetFilters(filters)
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
blockOids, err := bfs.objectSearch(ctx, prm)
Expand Down

0 comments on commit 6f3189f

Please sign in to comment.