From 312a6488d83be062ef35d1fee497ca2204803bcc Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Tue, 10 Dec 2024 12:46:24 -0500 Subject: [PATCH] global rebalance vs out-of-band updates (major) Signed-off-by: Alex Aizman --- reb/recv.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/reb/recv.go b/reb/recv.go index 999d95bcc57..c709a4aff7b 100644 --- a/reb/recv.go +++ b/reb/recv.go @@ -170,11 +170,55 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker nlog.Errorf("%s g[%d]: early receive from %s %s (stage %s)", core.T, reb.RebID(), meta.Tname(tsid), lom, stages[stage]) } + // + // when destination exists + // + if lom.Load(false, false) == nil { + if lom.CheckEq(&hdr.ObjAttrs) == nil { + // no-op: optimize-out duplicated write + cos.DrainReader(objReader) + return reb.regACK(smap, hdr, tsid) + } + if lom.Bck().IsRemote() { + oa, ecode, err := core.T.HeadCold(lom, nil) + if err == nil { + // receive latest version from tsid + if oa.CheckEq(&hdr.ObjAttrs) == nil { + goto rx // go ahead to overwrite this lom + } + } + if cos.IsNotExist(err, ecode) && lom.VersionConf().Sync { + // try to delete in place (TODO: compare with lom.CheckRemoteMD; unify) + locked := lom.TryLock(true) + errDel := lom.RemoveObj(true) + if locked { + lom.Unlock(true) + } + if errDel != nil { + nlog.Errorf("%s g[%d]: failed to sync-delete %s: %v", core.T, reb.RebID(), lom, errDel) + } else if cmn.Rom.FastV(5, cos.SmoduleReb) { + nlog.Infof("%s g[%d]: sync-deleted %s", core.T, reb.RebID(), lom) + } + } + } + + // cannot choose between the source and the destination + if cmn.Rom.FastV(5, cos.SmoduleReb) { + nlog.Warningf("%s g[%d]: recv ambiguity (%s, %s) vs (%s) - dropping, discarding", core.T, reb.RebID(), + lom, lom.ObjAttrs().String(), hdr.ObjAttrs.String()) + } + cos.DrainReader(objReader) + return reb.regACK(smap, hdr, tsid) + } + +rx: lom.CopyAttrs(&hdr.ObjAttrs, true /*skip-checksum*/) // see "PUT is a no-op" + xreb := reb.xctn() if xreb.IsAborted() { return nil } + params := core.AllocPutParams() { params.WorkTag = fs.WorkfilePut @@ -194,6 +238,10 @@ func (reb *Reb) recvObjRegular(hdr *transport.ObjHdr, smap *meta.Smap, unpacker xreb.InObjsAdd(1, hdr.ObjAttrs.Size) // ACK + return reb.regACK(smap, hdr, tsid) +} + +func (reb *Reb) regACK(smap *meta.Smap, hdr *transport.ObjHdr, tsid string) error { tsi := smap.GetTarget(tsid) if tsi == nil { err := fmt.Errorf("g[%d]: %s is not in the %s", reb.RebID(), meta.Tname(tsid), smap)