Skip to content

Commit

Permalink
list-objects: enforce intra-cluster access, validate
Browse files Browse the repository at this point in the history
* in fact, all 'GET /bucket' calls, including summary
* with refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 9, 2023
1 parent 592981a commit 37101ed
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 55 deletions.
30 changes: 29 additions & 1 deletion ais/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package integration_test

import (
"errors"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -1092,7 +1093,7 @@ func TestMountpathDisableAll(t *testing.T) {
tools.WaitForResilvering(t, baseParams, target)

tlog.Logf("waiting for bucket %s to show up on all targets\n", m.bck)
err = tools.WaitForBucket(m.proxyURL, cmn.QueryBcks(m.bck), true /*exists*/)
err = checkBMDsFor(m.proxyURL, m.bck)
tassert.CheckFatal(t, err)

// Put and read random files
Expand All @@ -1103,6 +1104,33 @@ func TestMountpathDisableAll(t *testing.T) {
m.ensureNumMountpaths(target, origMountpaths)
}

// TODO: instead, need target query w/ access control
func checkBMDsFor(proxyURL string, bck cmn.Bck) error {
bp := tools.BaseAPIParams(proxyURL)
smap, err := api.GetClusterMap(bp)
if err != nil {
return err
}
to := time.Now().Add(10 * time.Second)
b := meta.CloneBck(&bck)
for _, s := range smap.Pmap {
for {
bmd, err := api.GetBMD(tools.BaseAPIParams(s.URL(cmn.NetPublic)))
if err != nil {
return err
}
if _, bucketExists := bmd.Get(b); bucketExists {
break
}
if time.Now().After(to) {
return errors.New("checkBMDsFor: timeout")
}
time.Sleep(time.Second)
}
}
return nil
}

func TestForwardCP(t *testing.T) {
m := ioContext{
t: t,
Expand Down
53 changes: 32 additions & 21 deletions ais/tgtbck.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func (t *target) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
if err != nil {
return
}
if err = t.isIntraCall(r.Header, false); err != nil {
t.writeErr(w, r, err)
return
}
msg, err := t.readAisMsg(w, r)
if err != nil {
return
Expand Down Expand Up @@ -85,8 +89,20 @@ func (t *target) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
return
}
}
begin := mono.NanoTime()
if ok := t.listObjects(w, r, bck, msg); !ok {
var (
begin = mono.NanoTime()
lsmsg *apc.LsoMsg
)
if err := cos.MorphMarshal(msg.Value, &lsmsg); err != nil {
t.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, t.si, msg.Action, msg.Value, err)
return
}
if !cos.IsValidUUID(lsmsg.UUID) {
debug.Assert(false, lsmsg.UUID)
t.writeErrf(w, r, "list-objects: invalid UUID %q", lsmsg.UUID)
return
}
if ok := t.listObjects(w, r, bck, lsmsg); !ok {
t.statsT.IncErr(stats.ListCount)
return
}
Expand Down Expand Up @@ -237,42 +253,37 @@ func (t *target) blist(qbck *cmn.QueryBcks, config *cmn.Config, bmd *bucketMD) (

// returns `cmn.LsoResult` containing object names and (requested) props
// control/scope - via `apc.LsoMsg`
func (t *target) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bck, actMsg *aisMsg) (ok bool) {
var msg *apc.LsoMsg
if err := cos.MorphMarshal(actMsg.Value, &msg); err != nil {
t.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, t.si, actMsg.Action, actMsg.Value, err)
return
}
if !bck.IsAIS() && !msg.IsFlagSet(apc.LsObjCached) {
func (t *target) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bck, lsmsg *apc.LsoMsg) (ok bool) {
if !bck.IsAIS() && !lsmsg.IsFlagSet(apc.LsObjCached) {
maxRemotePageSize := t.Backend(bck).MaxPageSize()
if msg.PageSize > maxRemotePageSize {
t.writeErrf(w, r, "page size %d exceeds the supported maximum (%d)", msg.PageSize, maxRemotePageSize)
if lsmsg.PageSize > maxRemotePageSize {
t.writeErrf(w, r, "page size %d exceeds the supported maximum (%d)", lsmsg.PageSize, maxRemotePageSize)
return false
}
if msg.PageSize == 0 {
msg.PageSize = maxRemotePageSize
if lsmsg.PageSize == 0 {
lsmsg.PageSize = maxRemotePageSize
}
}
debug.Assert(msg.PageSize > 0 && msg.PageSize < 100000 && cos.IsValidUUID(msg.UUID))
debug.Assert(lsmsg.PageSize > 0 && lsmsg.PageSize < 100000)

// (advanced) user-selected target to execute remote ls
if msg.SID != "" {
if lsmsg.SID != "" {
smap := t.owner.smap.get()
if smap.GetTarget(msg.SID) == nil {
err := &errNodeNotFound{"list-objects failure:", msg.SID, t.si, smap}
if smap.GetTarget(lsmsg.SID) == nil {
err := &errNodeNotFound{"list-objects failure:", lsmsg.SID, t.si, smap}
t.writeErr(w, r, err)
return
}
}

var (
xctn cluster.Xact
rns = xreg.RenewLso(t, bck, msg.UUID, msg)
rns = xreg.RenewLso(t, bck, lsmsg.UUID, lsmsg)
)
// check that xaction hasn't finished prior to this page read, restart if needed
if rns.Err == xs.ErrGone {
runtime.Gosched()
rns = xreg.RenewLso(t, bck, msg.UUID, msg)
rns = xreg.RenewLso(t, bck, lsmsg.UUID, lsmsg)
}
if rns.Err != nil {
t.writeErr(w, r, rns.Err)
Expand All @@ -286,12 +297,12 @@ func (t *target) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.B
xls := xctn.(*xs.LsoXact)

// NOTE: blocking next-page request
resp := xls.Do(msg)
resp := xls.Do(lsmsg)
if resp.Err != nil {
t.writeErr(w, r, resp.Err, resp.Status)
return false
}
debug.Assert(resp.Lst.UUID == msg.UUID)
debug.Assert(resp.Lst.UUID == lsmsg.UUID)

// TODO: `Flags` have limited usability, consider to remove
marked := xreg.GetRebMarked()
Expand Down
8 changes: 3 additions & 5 deletions api/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,9 @@ type (
)

// ListBuckets returns buckets for provided query, where
// - `fltPresence` is one of { apc.FltExists, apc.FltPresent, ... } - see api/apc/query.go
// - ListBuckets utiizes `cmn.QueryBcks` - control structure that's practically identical to `cmn.Bck`,
// except for the fact that some or all its fields can be empty (to facilitate the corresponding
// query).
//
// - `fltPresence` is one of { apc.FltExists, apc.FltPresent, ... } - see api/apc/query.go
// - ListBuckets utilizes `cmn.QueryBcks` - control structure that's practically identical to `cmn.Bck`,
// except for the fact that some or all its fields can be empty (to facilitate the corresponding query).
// See also: QueryBuckets, ListObjects
func ListBuckets(bp BaseParams, qbck cmn.QueryBcks, fltPresence int) (cmn.Bcks, error) {
q := make(url.Values, 4)
Expand Down
27 changes: 0 additions & 27 deletions tools/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,33 +464,6 @@ func BaseAPIParams(urls ...string) api.BaseParams {
return api.BaseParams{Client: gctx.Client, URL: u, Token: LoggedUserToken, UA: "tools/test"}
}

// waitForBucket waits until all targets ack having ais bucket created or deleted
func WaitForBucket(proxyURL string, query cmn.QueryBcks, exists bool) error {
bp := BaseAPIParams(proxyURL)
smap, err := api.GetClusterMap(bp)
if err != nil {
return err
}
to := time.Now().Add(bucketTimeout)
for _, s := range smap.Tmap {
for {
bp := BaseAPIParams(s.URL(cmn.NetPublic))
bucketExists, err := api.QueryBuckets(bp, query, apc.FltExists)
if err != nil {
return err
}
if bucketExists == exists {
break
}
if time.Now().After(to) {
return fmt.Errorf("wait for ais bucket timed out, target = %s", bp.URL)
}
time.Sleep(time.Second)
}
}
return nil
}

func EvictObjects(t *testing.T, proxyURL string, bck cmn.Bck, objList []string) {
bp := BaseAPIParams(proxyURL)
xid, err := api.EvictList(bp, bck, objList)
Expand Down
1 change: 0 additions & 1 deletion tools/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const (

const (
registerTimeout = time.Minute * 2
bucketTimeout = time.Minute
)

type (
Expand Down

0 comments on commit 37101ed

Please sign in to comment.