Skip to content

Commit

Permalink
global rebalance: amend streams init/term
Browse files Browse the repository at this point in the history
* add mutex, rm atomic

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Jan 22, 2025
1 parent 866a508 commit 8c45ccf
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
7 changes: 6 additions & 1 deletion ais/test/regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,13 @@ func postRenameWaitAndCheck(t *testing.T, baseParams api.BaseParams, rtd regress
tlog.Logf("not found: %s\n", name)
}
}
t.Fatalf("wrong number of objects in the bucket %s renamed as %s (before: %d. after: %d)",
err := fmt.Errorf("wrong number of objects in the bucket %s renamed as %s (before: %d. after: %d)",
rtd.bck.String(), rtd.renamedBck.String(), numPuts, len(unique))
if rtd.wait {
t.Fatal(err)
} else {
tlog.Logln("Warning: " + err.Error())
}
}
}

Expand Down
39 changes: 23 additions & 16 deletions transport/bundle/dmover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package bundle
import (
"errors"
"io"
"sync"
"time"

"github.com/NVIDIA/aistore/api/apc"
Expand Down Expand Up @@ -38,8 +39,8 @@ type (
multiplier int
owt cmn.OWT
stage struct {
regred atomic.Bool
regout atomic.Bool
regmtx sync.Mutex
regged atomic.Bool
opened atomic.Bool
laterx atomic.Bool
}
Expand Down Expand Up @@ -68,7 +69,6 @@ func NewDM(trname string, recvCB transport.RecvObj, owt cmn.OWT, extra Extra) *D
dm.owt = owt
dm.multiplier = extra.Multiplier
dm.sizePDU, dm.maxHdrSize = extra.SizePDU, extra.MaxHdrSize
dm.stage.regout.Store(true)

if extra.Compression == "" {
extra.Compression = apc.CompressNever
Expand Down Expand Up @@ -118,12 +118,15 @@ func (dm *DataMover) Renew(trname string, recvCB transport.RecvObj, owt cmn.OWT,

// register user's receive-data (and, optionally, receive-ack) wrappers
func (dm *DataMover) RegRecv() error {
if !dm.stage.regred.CAS(false, true) {
return errors.New(dm.String() + ": duplicated or early reg-recv")
dm.stage.regmtx.Lock()
defer dm.stage.regmtx.Unlock()

if dm.stage.regged.Load() {
return errors.New("duplicated reg: " + dm.String())
}
if err := transport.Handle(dm.data.trname, dm.wrapRecvData); err != nil {
nlog.Errorln(err, "[", dm.String(), "]")
dm.stage.regred.Store(false)
// (unlikely)
debug.AssertNoErr(err)
return err
}
if dm.useACKs() {
Expand All @@ -132,23 +135,27 @@ func (dm *DataMover) RegRecv() error {
nlog.Errorln("FATAL:", err, "[ nested:", nerr, dm.String(), "]")
debug.AssertNoErr(nerr)
}
dm.stage.regred.Store(false)
return err
}
}

dm.stage.regout.Store(false)
dm.stage.regged.Store(true)
return nil
}

func (dm *DataMover) UnregRecv() {
if dm == nil {
return
}
defer dm.stage.regout.Store(true)
if !dm.stage.regred.CAS(true, false) {
return // e.g., 2PC (begin => abort) sequence with no Open
dm.stage.regmtx.Lock()
defer dm.stage.regmtx.Unlock()

if !dm.stage.regged.Load() {
nlog.WarningDepth(1, "duplicated unreg:", dm.String())
return
}
defer dm.stage.regged.Store(false)

if dm.xctn != nil {
timeout := dm.config.Transport.QuiesceTime.D()
if dm.xctn.IsAborted() {
Expand All @@ -167,7 +174,7 @@ func (dm *DataMover) UnregRecv() {
}

func (dm *DataMover) IsFree() bool {
return !dm.stage.regred.Load() && dm.stage.regout.Load()
return !dm.stage.regged.Load()
}

func (dm *DataMover) Open() {
Expand Down Expand Up @@ -207,11 +214,11 @@ func (dm *DataMover) String() string {
switch {
case dm.stage.opened.Load():
s = "open-"
case dm.stage.regred.Load():
s = "reg-" // not open yet or closed but not unreg-ed yet
case dm.stage.regged.Load():
s = "reg-" // reg-ed handlers, not open yet tho
}
if dm.data.streams == nil {
return "dm-nil-" + s
return "dm-" + s + "no-streams"
}
if dm.data.streams.UsePDU() {
return "dm-pdu-" + s + dm.data.streams.Trname()
Expand Down

0 comments on commit 8c45ccf

Please sign in to comment.