Skip to content

Commit

Permalink
extend 'copy bucket' to sync remote
Browse files Browse the repository at this point in the history
* part two

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Dec 13, 2023
1 parent 6be0b2d commit bec1dd5
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 35 deletions.
1 change: 1 addition & 0 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,7 @@ func (t *target) objMv(lom *cluster.LOM, msg *apc.ActMsg) (err error) {
coi.objnameTo = msg.Name /* new object name */
coi.buf = buf
coi.t = t
coi.config = cmn.GCO.Get()
coi.owt = cmn.OwtMigrate
coi.finalize = true
}
Expand Down
11 changes: 6 additions & 5 deletions ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,19 @@ func (t *target) HeadObjT2T(lom *cluster.LOM, si *meta.Snode) bool {
// the AIS cluster (by performing a cold GET if need be).
// - if the dst is cloud, we perform a regular PUT logic thus also making sure that the new
// replica gets created in the cloud bucket of _this_ AIS cluster.
func (t *target) CopyObject(lom *cluster.LOM, dm cluster.DataMover, dp cluster.DP, xact cluster.Xact,
bckTo *meta.Bck, objnameTo string, buf []byte, dryRun bool) (size int64, err error) {
func (t *target) CopyObject(lom *cluster.LOM, dm cluster.DataMover, dp cluster.DP, xact cluster.Xact, config *cmn.Config,
bckTo *meta.Bck, objnameTo string, buf []byte, dryRun, syncRemote bool) (size int64, err error) {
coi := allocCOI()
{
coi.dm = dm
coi.dp = dp
coi.xact = xact
coi.config = config
coi.bckTo = bckTo
coi.objnameTo = objnameTo
coi.buf = buf
coi.dryRun = dryRun
coi.syncRemote = syncRemote
// defaults
coi.t = t
coi.owt = cmn.OwtMigrate
Expand All @@ -146,9 +148,6 @@ func (t *target) CopyObject(lom *cluster.LOM, dm cluster.DataMover, dp cluster.D
coi.objnameTo = lom.ObjName
}

// TODO -- FIXME: must be provided by the caller
coi.copyRemote = lom.Bck().Equal(coi.bckTo, true /*same ID*/, true /*same backend*/) && lom.ObjName == coi.objnameTo

switch {
case dp != nil: // 1. w/ transformation
size, err = coi.copyReader(lom)
Expand Down Expand Up @@ -308,6 +307,7 @@ func (t *target) _promLocal(params *cluster.PromoteParams, lom *cluster.LOM) (fi
{
poi.atime = time.Now().UnixNano()
poi.t = t
poi.config = params.Config
poi.lom = lom
poi.workFQN = workFQN
poi.owt = cmn.OwtPromote
Expand Down Expand Up @@ -335,6 +335,7 @@ func (t *target) _promRemote(params *cluster.PromoteParams, lom *cluster.LOM, ts
coi.bckTo = lom.Bck()
coi.owt = cmn.OwtPromote
coi.xact = params.Xact
coi.config = params.Config
}
size, err := coi.sendRemote(lom, lom.ObjName, tsi)
freeCOI(coi)
Expand Down
17 changes: 9 additions & 8 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ type (
dp cluster.DP
xact cluster.Xact
t *target
config *cmn.Config
bckTo *meta.Bck
objnameTo string
buf []byte
owt cmn.OWT
finalize bool // copies and EC (as in poi.finalize())
dryRun bool
copyRemote bool // as is: gs://abc => gs://abc
syncRemote bool // as is: gs://abc => gs://abc
}
sendArgs struct {
reader cos.ReadOpenCloser
Expand Down Expand Up @@ -1265,8 +1266,9 @@ func (a *apndOI) flush() (int, error) {
}

params := cluster.PromoteParams{
Bck: a.lom.Bck(),
Cksum: partialCksum,
Bck: a.lom.Bck(),
Cksum: partialCksum,
Config: a.config,
PromoteArgs: cluster.PromoteArgs{
SrcFQN: a.hdl.workFQN,
ObjName: a.lom.ObjName,
Expand Down Expand Up @@ -1450,16 +1452,16 @@ func (coi *copyOI) putReader(lom, dst *cluster.LOM) (size int64, err error) {
{
poi.t = coi.t
poi.lom = dst
poi.config = cmn.GCO.Get() // TODO -- FIXME: called must provide
poi.config = coi.config
poi.r = reader
poi.workFQN = fs.CSM.Gen(dst, fs.WorkfileType, "copy-dp")
poi.atime = lom.Atime().UnixNano()
poi.xctn = coi.xact
// TODO -- FIXME: checksum
}
switch {
case coi.copyRemote:
poi.owt = cmn.OwtCopyRemote
case coi.syncRemote:
poi.owt = cmn.OwtSyncRemote
case coi.dm != nil:
poi.owt = coi.dm.OWT()
default:
Expand Down Expand Up @@ -1613,8 +1615,7 @@ func (coi *copyOI) put(sargs *sendArgs) error {
Header: hdr,
BodyR: sargs.reader,
}
config := cmn.GCO.Get()
req, _, cancel, err := reqArgs.ReqWithTimeout(config.Timeout.SendFile.D())
req, _, cancel, err := reqArgs.ReqWithTimeout(coi.config.Timeout.SendFile.D())
if err != nil {
cos.Close(sargs.reader)
return fmt.Errorf("unexpected failure to create request, err: %w", err)
Expand Down
11 changes: 6 additions & 5 deletions ais/tgts3.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ func (t *target) putCopyMpt(w http.ResponseWriter, r *http.Request, config *cmn.
t.putMptPart(w, r, items, q, bck)
}
case r.Header.Get(cos.S3HdrObjSrc) == "":
t.putObjS3(w, r, items, bck)
t.putObjS3(w, r, bck, config, items)
default:
t.copyObjS3(w, r, items)
t.copyObjS3(w, r, config, items)
}
}

// Copy object (maybe from another bucket)
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, items []string) {
func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, config *cmn.Config, items []string) {
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
return
Expand Down Expand Up @@ -139,6 +139,7 @@ func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, items []strin
coi := allocCOI()
{
coi.t = t
coi.config = config
coi.bckTo = bckDst
coi.objnameTo = s3.ObjName(items)
coi.owt = cmn.OwtMigrate
Expand Down Expand Up @@ -172,7 +173,7 @@ func (t *target) copyObjS3(w http.ResponseWriter, r *http.Request, items []strin
sgl.Free()
}

func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, items []string, bck *meta.Bck) {
func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, bck *meta.Bck, config *cmn.Config, items []string) {
if len(items) < 2 {
s3.WriteErr(w, r, errS3Obj, 0)
return
Expand Down Expand Up @@ -206,7 +207,7 @@ func (t *target) putObjS3(w http.ResponseWriter, r *http.Request, items []string
poi.atime = started.UnixNano()
poi.t = t
poi.lom = lom
poi.config = cmn.GCO.Get()
poi.config = config
poi.skipVC = cmn.Rom.Features().IsSet(feat.SkipVC) || cos.IsParseBool(dpq.skipVC) // apc.QparamSkipVC
poi.restful = true
}
Expand Down
4 changes: 3 additions & 1 deletion ais/tgttxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ func prmScan(dirFQN string, prmMsg *cluster.PromoteArgs) (fqns []string, totalN
// synchronously wo/ xaction
func (t *target) prmNumFiles(c *txnServerCtx, txnPrm *txnPromote, confirmedFshare bool) error {
smap := t.owner.smap.Get()
config := cmn.GCO.Get()
for _, fqn := range txnPrm.fqns {
objName, err := xs.PrmObjName(fqn, txnPrm.dirFQN, txnPrm.msg.ObjName)
if err != nil {
Expand All @@ -1036,7 +1037,8 @@ func (t *target) prmNumFiles(c *txnServerCtx, txnPrm *txnPromote, confirmedFshar
}
}
params := cluster.PromoteParams{
Bck: c.bck,
Bck: c.bck,
Config: config,
PromoteArgs: cluster.PromoteArgs{
SrcFQN: fqn,
ObjName: objName,
Expand Down
4 changes: 2 additions & 2 deletions cluster/mock/target_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (*TargetMock) BMDVersionFixup(*http.Request, ...cmn.Bck)
func (*TargetMock) FSHC(error, string) {}
func (*TargetMock) OOS(*fs.CapStatus) fs.CapStatus { return fs.CapStatus{} }

func (*TargetMock) CopyObject(*cluster.LOM, cluster.DataMover, cluster.DP, cluster.Xact, *meta.Bck, string, []byte,
bool) (int64, error) {
func (*TargetMock) CopyObject(*cluster.LOM, cluster.DataMover, cluster.DP, cluster.Xact, *cmn.Config, *meta.Bck, string, []byte,
bool, bool) (int64, error) {
return 0, nil
}

Expand Down
13 changes: 7 additions & 6 deletions cluster/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type (
Promote(params *PromoteParams) (errCode int, err error)
HeadObjT2T(lom *LOM, si *meta.Snode) bool

CopyObject(lom *LOM, dm DataMover, dp DP, xact Xact, bckTo *meta.Bck, objnameTo string, buf []byte,
dryRun bool) (int64, error)
CopyObject(lom *LOM, dm DataMover, dp DP, xact Xact, config *cmn.Config, bckTo *meta.Bck, objnameTo string, buf []byte,
dryRun, syncRemote bool) (int64, error)
}

TargetExt interface {
Expand Down Expand Up @@ -126,9 +126,10 @@ type (
SrcIsNotFshare bool `json:"notshr,omitempty"` // the source is not a file share equally accessible by all targets
}
PromoteParams struct {
Bck *meta.Bck // destination bucket
Cksum *cos.Cksum // checksum to validate
Xact Xact // responsible xaction
PromoteArgs // all of the above
Bck *meta.Bck // destination bucket
Cksum *cos.Cksum // checksum to validate
Config *cmn.Config // during xaction
Xact Xact // responsible xaction
PromoteArgs // all of the above
}
)
2 changes: 1 addition & 1 deletion cmn/owt.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
OwtGetLock // lock(exclusive); read from remote; ...
OwtGet // GET (with upgrading read-lock in the local-write path)
OwtGetPrefetchLock // (used for maximum parallelism when prefetching)
OwtCopyRemote // as in: gs://abc => gs://abc
OwtSyncRemote // as in: gs://abc => gs://abc
)

func (owt *OWT) FromS(s string) {
Expand Down
13 changes: 9 additions & 4 deletions mirror/tcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ type (
dm *bundle.DataMover
rxlast atomic.Int64 // finishing
xact.BckJog
wg sync.WaitGroup // starting up
refc atomic.Int32 // finishing
wg sync.WaitGroup // starting up
refc atomic.Int32 // finishing
syncRemote bool // when BckFrom = BckTo
}
)

Expand Down Expand Up @@ -73,6 +74,10 @@ func (p *tcbFactory) Start() error {
debug.AssertNoErr(err)
p.xctn = newTCB(p, slab, config)

// sync same-name remote
p.xctn.syncRemote = p.kind != apc.ActETLBck &&
p.args.BckFrom.Equal(p.args.BckTo, true /*same BID*/, true /*same backend*/)

// refcount OpcTxnDone; this target must ve active (ref: ignoreMaintenance)
smap := p.T.Sowner().Get()
if err := p.xctn.InMaintOrDecomm(smap, p.T.Snode()); err != nil {
Expand Down Expand Up @@ -120,7 +125,7 @@ func (p *tcbFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR,
err = cmn.NewErrXactUsePrev(prevEntry.Get().String())
return
}
bckEq := prev.args.BckFrom.Equal(p.args.BckFrom, true /*same BID*/, true /* same backend */)
bckEq := prev.args.BckFrom.Equal(p.args.BckFrom, true /*same BID*/, true /*same backend*/)
debug.Assert(bckEq)
debug.Assert(prev.phase == apc.ActBegin && p.phase == apc.ActCommit)
prev.args.Phase = apc.ActCommit // transition
Expand Down Expand Up @@ -226,7 +231,7 @@ func (r *XactTCB) copyObject(lom *cluster.LOM, buf []byte) (err error) {
if r.BckJog.Config.FastV(5, cos.SmoduleMirror) {
nlog.Infof("%s: %s => %s", r.Base.Name(), lom.Cname(), args.BckTo.Cname(toName))
}
_, err = r.p.T.CopyObject(lom, r.dm, args.DP, r, args.BckTo, toName, buf, args.Msg.DryRun)
_, err = r.p.T.CopyObject(lom, r.dm, args.DP, r, r.Config, args.BckTo, toName, buf, args.Msg.DryRun, r.syncRemote)
if err != nil {
if cos.IsErrOOS(err) {
// TODO: call r.Abort() instead
Expand Down
5 changes: 3 additions & 2 deletions xact/xs/dpromote.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func (r *XactDirPromote) walk(fqn string, de fs.DirEntry) error {
}
}
params := cluster.PromoteParams{
Bck: bck,
Xact: r,
Bck: bck,
Xact: r,
Config: r.Config,
PromoteArgs: cluster.PromoteArgs{
SrcFQN: fqn,
ObjName: objName,
Expand Down
7 changes: 6 additions & 1 deletion xact/xs/tcobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (
args *xreg.TCObjsArgs
workCh chan *cmn.TCObjsMsg
streamingX
syncRemote bool
}
tcowi struct {
r *XactTCObjs
Expand Down Expand Up @@ -86,6 +87,9 @@ func (p *tcoFactory) Start() (err error) {
// unlike apc.ActCopyObjects (where we know the size)
// apc.ActETLObjects (transform) generates arbitrary sizes where we use PDU-based transport
sizePDU = memsys.DefaultBufSize
} else {
// sync same-name remote
r.syncRemote = p.args.BckFrom.Equal(p.args.BckTo, true /*same BID*/, true /*same backend*/)
}
if err = p.newDM(p.Args.UUID /*trname*/, r.recv, r.config, sizePDU); err != nil {
return
Expand Down Expand Up @@ -285,7 +289,8 @@ func (wi *tcowi) do(lom *cluster.LOM, lrit *lriterator) {
// under ETL, the returned sizes of transformed objects are unknown (cos.ContentLengthUnknown)
// until after the transformation; here we are disregarding the size anyway as the stats
// are done elsewhere
_, err := lrit.t.CopyObject(lom, wi.r.p.dm, wi.r.args.DP, wi.r, wi.r.args.BckTo, objNameTo, buf, wi.msg.DryRun)
_, err := lrit.t.CopyObject(lom, wi.r.p.dm, wi.r.args.DP, wi.r, wi.r.config, wi.r.args.BckTo, objNameTo, buf,
wi.msg.DryRun, wi.r.syncRemote)
slab.Free(buf)

if err != nil {
Expand Down

0 comments on commit bec1dd5

Please sign in to comment.