From 8c45ccfb9b542652a2da89f3d1080afa5d794b4f Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Tue, 21 Jan 2025 15:05:06 -0500 Subject: [PATCH] global rebalance: amend streams init/term * add mutex, rm atomic Signed-off-by: Alex Aizman --- ais/test/regression_test.go | 7 ++++++- transport/bundle/dmover.go | 39 ++++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/ais/test/regression_test.go b/ais/test/regression_test.go index 53440d13036..3817b91e3ed 100644 --- a/ais/test/regression_test.go +++ b/ais/test/regression_test.go @@ -411,8 +411,13 @@ func postRenameWaitAndCheck(t *testing.T, baseParams api.BaseParams, rtd regress tlog.Logf("not found: %s\n", name) } } - t.Fatalf("wrong number of objects in the bucket %s renamed as %s (before: %d. after: %d)", + err := fmt.Errorf("wrong number of objects in the bucket %s renamed as %s (before: %d. after: %d)", rtd.bck.String(), rtd.renamedBck.String(), numPuts, len(unique)) + if rtd.wait { + t.Fatal(err) + } else { + tlog.Logln("Warning: " + err.Error()) + } } } diff --git a/transport/bundle/dmover.go b/transport/bundle/dmover.go index 73cc1e4d49f..9530c0c1f57 100644 --- a/transport/bundle/dmover.go +++ b/transport/bundle/dmover.go @@ -8,6 +8,7 @@ package bundle import ( "errors" "io" + "sync" "time" "github.com/NVIDIA/aistore/api/apc" @@ -38,8 +39,8 @@ type ( multiplier int owt cmn.OWT stage struct { - regred atomic.Bool - regout atomic.Bool + regmtx sync.Mutex + regged atomic.Bool opened atomic.Bool laterx atomic.Bool } @@ -68,7 +69,6 @@ func NewDM(trname string, recvCB transport.RecvObj, owt cmn.OWT, extra Extra) *D dm.owt = owt dm.multiplier = extra.Multiplier dm.sizePDU, dm.maxHdrSize = extra.SizePDU, extra.MaxHdrSize - dm.stage.regout.Store(true) if extra.Compression == "" { extra.Compression = apc.CompressNever @@ -118,12 +118,15 @@ func (dm *DataMover) Renew(trname string, recvCB transport.RecvObj, owt cmn.OWT, // register user's receive-data (and, optionally, receive-ack) wrappers func (dm *DataMover) RegRecv() error { - if !dm.stage.regred.CAS(false, true) { - return errors.New(dm.String() + ": duplicated or early reg-recv") + dm.stage.regmtx.Lock() + defer dm.stage.regmtx.Unlock() + + if dm.stage.regged.Load() { + return errors.New("duplicated reg: " + dm.String()) } if err := transport.Handle(dm.data.trname, dm.wrapRecvData); err != nil { - nlog.Errorln(err, "[", dm.String(), "]") - dm.stage.regred.Store(false) + // (unlikely) + debug.AssertNoErr(err) return err } if dm.useACKs() { @@ -132,12 +135,11 @@ func (dm *DataMover) RegRecv() error { nlog.Errorln("FATAL:", err, "[ nested:", nerr, dm.String(), "]") debug.AssertNoErr(nerr) } - dm.stage.regred.Store(false) return err } } - dm.stage.regout.Store(false) + dm.stage.regged.Store(true) return nil } @@ -145,10 +147,15 @@ func (dm *DataMover) UnregRecv() { if dm == nil { return } - defer dm.stage.regout.Store(true) - if !dm.stage.regred.CAS(true, false) { - return // e.g., 2PC (begin => abort) sequence with no Open + dm.stage.regmtx.Lock() + defer dm.stage.regmtx.Unlock() + + if !dm.stage.regged.Load() { + nlog.WarningDepth(1, "duplicated unreg:", dm.String()) + return } + defer dm.stage.regged.Store(false) + if dm.xctn != nil { timeout := dm.config.Transport.QuiesceTime.D() if dm.xctn.IsAborted() { @@ -167,7 +174,7 @@ func (dm *DataMover) UnregRecv() { } func (dm *DataMover) IsFree() bool { - return !dm.stage.regred.Load() && dm.stage.regout.Load() + return !dm.stage.regged.Load() } func (dm *DataMover) Open() { @@ -207,11 +214,11 @@ func (dm *DataMover) String() string { switch { case dm.stage.opened.Load(): s = "open-" - case dm.stage.regred.Load(): - s = "reg-" // not open yet or closed but not unreg-ed yet + case dm.stage.regged.Load(): + s = "reg-" // reg-ed handlers, not open yet tho } if dm.data.streams == nil { - return "dm-nil-" + s + return "dm-" + s + "no-streams" } if dm.data.streams.UsePDU() { return "dm-pdu-" + s + dm.data.streams.Trname()