Skip to content

Commit

Permalink
list-objects: filter remote pages; add local meta (major)
Browse files Browse the repository at this point in the history
* when listing remote buckets, each target will now return its own
  portion (of entries)
* there is still a single target performing the remote call - that hasn't
  changed
* remove cmn.MergeLso
* refactor

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 17, 2025
1 parent 7c1d6b0 commit cadb85b
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 195 deletions.
91 changes: 83 additions & 8 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2300,7 +2300,6 @@ func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRe
if lsmsg.PageSize == 0 {
lsmsg.PageSize = apc.MaxPageSizeAIS
}
pageSize := lsmsg.PageSize

actMsgExt = p.newAmsgActVal(apc.ActList, &lsmsg)
args = allocBcArgs()
Expand All @@ -2317,7 +2316,7 @@ func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRe
// Combine the results.
results = p.bcastGroup(args)
freeBcArgs(args)
lsoResList := make([]*cmn.LsoRes, 0, len(results))
lists := make([]*cmn.LsoRes, 0, len(results))
for _, res := range results {
if res.err != nil {
if res.details == "" || res.details == dfltDetail {
Expand All @@ -2329,12 +2328,14 @@ func (p *proxy) lsObjsA(bck *meta.Bck, lsmsg *apc.LsoMsg) (allEntries *cmn.LsoRe
}
lst := res.v.(*cmn.LsoRes)
if len(lst.Entries) > 0 {
lsoResList = append(lsoResList, lst)
lists = append(lists, lst)
}
}
freeBcastRes(results)

return cmn.ConcatLso(lsoResList, lsmsg, int(pageSize)), nil
page := concatLso(lists, lsmsg)
finLsoA(page, lsmsg)
return page, nil
}

func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap *smapX, tsi *meta.Snode, config *cmn.Config,
Expand Down Expand Up @@ -2400,8 +2401,10 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap

freeBcArgs(args)

// Combine the results.
resLists := make([]*cmn.LsoRes, 0, len(results))
var (
lists = make([]*cmn.LsoRes, 0, len(results))
nextToken string
)
for _, res := range results {
if res.err != nil {
if res.details == "" || res.details == dfltDetail {
Expand All @@ -2411,11 +2414,16 @@ func (p *proxy) lsObjsR(bck *meta.Bck, lsmsg *apc.LsoMsg, hdr http.Header, smap
freeBcastRes(results)
return nil, err
}
resLists = append(resLists, res.v.(*cmn.LsoRes))
lst := res.v.(*cmn.LsoRes)
debug.Assert(nextToken == "" || nextToken == lst.ContinuationToken)
nextToken = lst.ContinuationToken
lists = append(lists, lst)
}
freeBcastRes(results)

return cmn.MergeLso(resLists, lsmsg, 0), nil
page := concatLso(lists, lsmsg)
page.ContinuationToken = nextToken
return page, nil
}

// http-redirect(with-json-message)
Expand Down Expand Up @@ -3394,3 +3402,70 @@ func (p *proxy) notifyCandidate(npsi *meta.Snode, smap *smapX) {
req.Header.Set(apc.HdrCallerSmapVer, smap.vstr)
g.client.control.Do(req) //nolint:bodyclose // exiting
}

//
// list-objects helpers
//

// concatLso builds a single list-objects page out of the per-target results:
// the flags of all lists are OR-ed together, all entries are copied into one
// newly allocated slice, and that slice is sorted via cmn.SortLso.
// The source lists' entry slices are zeroed once copied.
// The returned result always carries the UUID from lsmsg.
func concatLso(lists []*cmn.LsoRes, lsmsg *apc.LsoMsg) (objs *cmn.LsoRes) {
	objs = &cmn.LsoRes{UUID: lsmsg.UUID}

	// first pass: accumulate flags and find out how much to allocate
	total := 0
	for i := range lists {
		objs.Flags |= lists[i].Flags
		total += len(lists[i].Entries)
	}
	if total == 0 {
		// covers both "no lists" and "only empty lists"
		return objs
	}

	// second pass: move the entries over, zeroing the sources
	all := make(cmn.LsoEntries, 0, total)
	for i := range lists {
		src := lists[i].Entries
		all = append(all, src...)
		clear(src)
	}
	objs.Entries = all

	// Corner case: objects with replicas sitting right on the page threshold.
	// Sorting must take status into account - otherwise, the wrong entry
	// (Status=moved) may end up in the response.
	cmn.SortLso(objs.Entries)
	return objs
}

// finLsoA finalizes (in place) a combined AIS-bucket listing page:
//   - when recursion is disabled (apc.LsNoRecursion) the result _may_ include
//     duplicated names of the virtual subdirectories - those are removed;
//   - when the page holds lsmsg.PageSize entries or more, it is truncated to
//     exactly that many and the continuation token is set to the name of the
//     last remaining entry.
//
// A non-positive page size is treated as "no limit": the page is returned
// untruncated, with the continuation token left as is.
func finLsoA(objs *cmn.LsoRes, lsmsg *apc.LsoMsg) {
	maxSize := int(lsmsg.PageSize)
	if lsmsg.IsFlagSet(apc.LsNoRecursion) {
		objs.Entries = dedupLso(objs.Entries, maxSize, false /*no-dirs*/)
	}
	if maxSize <= 0 || len(objs.Entries) < maxSize {
		return
	}
	// zero the tail _before_ re-slicing: once the length is cut down to maxSize,
	// `objs.Entries[maxSize:]` is empty and clearing it would be a no-op
	// (leaving the dropped entries referenced by the backing array)
	clear(objs.Entries[maxSize:])
	objs.Entries = objs.Entries[:maxSize]
	objs.ContinuationToken = objs.Entries[maxSize-1].Name
}

// dedupLso compacts entries in place, dropping every entry whose name equals
// the name of the most recently kept one (so equal names must be adjacent to
// be detected). If maxSize is positive, compaction stops as soon as maxSize
// entries have been kept. The unused tail of the slice is zeroed, and the
// compacted prefix is returned.
func dedupLso(entries cmn.LsoEntries, maxSize int, noDirs bool) []*cmn.LsoEnt {
	kept := 0
	for i := range entries {
		cur := entries[i]
		if kept > 0 && cur.Name == entries[kept-1].Name {
			continue
		}

		// backends are expected to filter out directories accordingly
		debug.Assert(!noDirs || !cur.IsDir())

		entries[kept] = cur
		kept++

		if maxSize > 0 && kept == maxSize {
			break
		}
	}
	clear(entries[kept:])
	return entries[:kept]
}
171 changes: 0 additions & 171 deletions cmn/objlist_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/NVIDIA/aistore/cmn/debug"
)

var nilEntry LsoEnt

////////////////
// LsoEntries //
////////////////
Expand All @@ -26,38 +24,6 @@ func (entries LsoEntries) cmp(i, j int) bool {
return eni.less(enj)
}

// appSorted adds `ne` to `entries` and returns the resulting slice.
// Scanning left to right, the new entry is placed in front of the first
// existing entry that:
//   - has the same IsDir() value and a name that is not smaller than ne.Name, or
//   - is a non-directory while `ne` is a directory (directories go first).
// If the entry found at that position has the same name, no insertion takes
// place: the one with the lower Status() value is kept in that slot.
// When no such position exists, `ne` is appended at the end.
// NOTE(review): presumably `entries` is already ordered this way (it is built
// exclusively by repeated appSorted calls in MergeLso) - confirm for other callers.
func appSorted(entries LsoEntries, ne *LsoEnt) LsoEntries {
for i, eni := range entries {
if eni.IsDir() != ne.IsDir() {
// mixed kinds: skip past existing directories; a new directory
// stops at the first non-directory
if eni.IsDir() {
continue
}
} else if ne.Name > eni.Name {
// same kind: keep scanning while the new name is greater
continue
}
// dedup: same name - keep whichever has the lower status
if ne.Name == eni.Name {
if ne.Status() < eni.Status() {
entries[i] = ne
}
return entries
}
// append or insert
if i == len(entries)-1 {
// inserting in front of the last element: append and swap the two
entries = append(entries, ne)
entries[i], entries[i+1] = entries[i+1], entries[i]
return entries
}
// grow by one (placeholder), then open a gap at position i
entries = append(entries, &nilEntry)
copy(entries[i+1:], entries[i:]) // shift right
entries[i] = ne
return entries
}

// empty slice, or nothing to insert in front of
entries = append(entries, ne)
return entries
}

////////////
// LsoEnt //
////////////
Expand Down Expand Up @@ -125,143 +91,6 @@ func (be *LsoEnt) CopyWithProps(propsSet cos.StrSet) (ne *LsoEnt) {

// SortLso sorts entries in place, using the entries' own `cmp` method
// as the less-function.
func SortLso(entries LsoEntries) {
	sort.Slice(entries, entries.cmp)
}

// dedupLso compacts entries in place, dropping every entry whose name equals
// the name of the most recently kept one (so equal names must be adjacent to
// be detected). If maxSize is positive, compaction stops as soon as maxSize
// entries have been kept. The unused tail of the slice is zeroed, and the
// compacted prefix is returned.
func dedupLso(entries LsoEntries, maxSize int, noDirs bool) []*LsoEnt {
	kept := 0
	for i := range entries {
		cur := entries[i]
		if kept > 0 && cur.Name == entries[kept-1].Name {
			continue
		}

		// backends are expected to filter out directories accordingly
		debug.Assert(!noDirs || !cur.IsDir())

		entries[kept] = cur
		kept++

		if maxSize > 0 && kept == maxSize {
			break
		}
	}
	clear(entries[kept:])
	return entries[:kept]
}

// MergeLso merges list-objects results received from targets. For the same
// object name (ie., the same object) the corresponding properties are merged.
//
// The first list (lists[0]) is reused as the result and is modified in place;
// the resulting continuation token is the (lexicographically) greatest token
// across all lists, and the resulting flags are the OR of all lists' flags.
// With a single list there is nothing to merge: the list gets sorted and
// deduplicated (stopping at maxSize entries when maxSize is positive).
// With multiple lists, the merged entries are ordered by appSorted and,
// if maxSize is greater than 0, truncated to maxSize.
// An empty `lists` yields an empty result.
func MergeLso(lists []*LsoRes, lsmsg *apc.LsoMsg, maxSize int) *LsoRes {
	noDirs := lsmsg.IsFlagSet(apc.LsNoDirs)
	if len(lists) == 0 {
		return &LsoRes{}
	}
	resList := lists[0]
	token := resList.ContinuationToken
	if len(lists) == 1 {
		SortLso(resList.Entries)
		resList.Entries = dedupLso(resList.Entries, maxSize, noDirs)
		resList.ContinuationToken = token
		return resList
	}

	tmp := make(map[string]*LsoEnt, len(resList.Entries)*len(lists))
	for _, l := range lists {
		resList.Flags |= l.Flags
		if token < l.ContinuationToken {
			token = l.ContinuationToken
		}
		for _, en := range l.Entries {
			// expecting backends to filter out
			debug.Assert(!(noDirs && en.IsDir()))

			// add new
			entry, exists := tmp[en.Name]
			if !exists {
				tmp[en.Name] = en
				continue
			}
			// merge existing w/ new props
			if !entry.IsPresent() && en.IsPresent() {
				en.Version = cos.Left(en.Version, entry.Version)
				tmp[en.Name] = en
			} else {
				entry.Location = cos.Left(entry.Location, en.Location)
				entry.Version = cos.Left(entry.Version, en.Version)
			}
		}
	}

	// cleanup: all entries are now referenced from `tmp`; reuse the first list's
	// backing array when it is large enough, otherwise allocate one that is.
	// (Growing by self-append would never terminate when the first list is empty,
	// since appending zero elements does not increase capacity.)
	clear(resList.Entries)
	if cap(resList.Entries) < len(tmp) {
		resList.Entries = make(LsoEntries, 0, len(tmp))
	} else {
		resList.Entries = resList.Entries[:0]
	}
	resList.ContinuationToken = token

	// sort (map iteration order is random; appSorted inserts at the right position)
	for _, entry := range tmp {
		resList.Entries = appSorted(resList.Entries, entry)
	}
	if maxSize > 0 && len(resList.Entries) > maxSize {
		clear(resList.Entries[maxSize:])
		resList.Entries = resList.Entries[:maxSize]
	}

	clear(tmp)
	return resList
}

// ConcatLso takes a slice of object lists and concatenates them into a newly
// allocated result that carries the UUID from lsmsg and the OR of all lists'
// flags. The source lists' entry slices are zeroed once copied.
//
// The combined entries are always sorted; when recursion is disabled
// (apc.LsNoRecursion) duplicated names are removed as well.
// If maxSize is greater than 0 and at least maxSize entries are available,
// the result is truncated to maxSize and its continuation token is set to
// the name of the last remaining entry. Zero or negative maxSize means
// returning all objects.
func ConcatLso(lists []*LsoRes, lsmsg *apc.LsoMsg, maxSize int) (objs *LsoRes) {
	objs = &LsoRes{
		UUID: lsmsg.UUID,
	}

	if len(lists) == 0 {
		return objs
	}

	entryCount := 0
	for _, l := range lists {
		objs.Flags |= l.Flags
		entryCount += len(l.Entries)
	}
	if entryCount == 0 {
		return objs
	}

	objs.Entries = make(LsoEntries, 0, entryCount)
	for _, l := range lists {
		objs.Entries = append(objs.Entries, l.Entries...)
		clear(l.Entries)
	}

	// For corner case: we have objects with replicas on page threshold
	// we have to sort taking status into account. Otherwise wrong
	// one(Status=moved) may get into the response
	SortLso(objs.Entries)

	// Remove duplicates
	// when recursion is disabled (i.e., lsmsg.IsFlagSet(apc.LsNoRecursion))
	// the (`cmn.LsoRes`) result _may_ include duplicated names of the virtual subdirectories
	// - that's why:
	if lsmsg.IsFlagSet(apc.LsNoRecursion) {
		objs.Entries = dedupLso(objs.Entries, maxSize, false /*no-dirs*/)
	}

	if maxSize > 0 && len(objs.Entries) >= maxSize {
		// zero the tail _before_ re-slicing: after truncation
		// `objs.Entries[maxSize:]` is empty and clearing it does nothing
		clear(objs.Entries[maxSize:])
		objs.Entries = objs.Entries[:maxSize]
		objs.ContinuationToken = objs.Entries[maxSize-1].Name
	}

	return objs
}

// TokenGreaterEQ reports whether the continuation token is greater than or
// equal to the object's name - in other words, whether the object has already
// been listed and must be skipped. The comparison is lexicographic
// (Go string ordering).
func TokenGreaterEQ(token, objName string) bool {
	return objName <= token
}
Expand Down
10 changes: 7 additions & 3 deletions xact/xs/lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,16 @@ func (r *LsoXact) nextPageR() (err error) {
// TODO -- FIXME: not counting/sizing (locally) present objects that are missing (deleted?) remotely
if r.walk.this {
nentries := allocLsoEntries()
page, err = npg.nextPageR(nentries, !r.walk.dontPopulate)
page, err = npg.nextPageR(nentries)
if !r.walk.wor && !r.IsAborted() {
if err == nil {
// bcast page
err = r.bcast(page)
} else {
}
if err == nil && !r.walk.dontPopulate {
err = npg.filterAddLmeta(page)
}
if err != nil {
r.sendTerm(r.msg.UUID, nil, err)
}
}
Expand All @@ -417,7 +421,7 @@ func (r *LsoXact) nextPageR() (err error) {
err = rsp.Err
default:
page = rsp.Lst
err = npg.populate(page)
err = npg.filterAddLmeta(page)
}
case <-r.stopCh.Listen():
err = ErrGone
Expand Down
Loading

0 comments on commit cadb85b

Please sign in to comment.