Skip to content

Commit

Permalink
support skipping image pull when deploy (#635)
Browse files Browse the repository at this point in the history
* allow int for memory

* support skipping image pull when deploy

* add UT for ignore image pull
  • Loading branch information
yuyang0 authored Mar 7, 2024
1 parent 00f6439 commit 54bdcc9
Show file tree
Hide file tree
Showing 8 changed files with 684 additions and 480 deletions.
9 changes: 6 additions & 3 deletions cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
seq int) (indices []int, err error) {

logger := log.WithFunc("calcium.doDeployWorkloadsOnNode").WithField("node", nodename).WithField("ident", opts.ProcessIdent).WithField("deploy", deploy).WithField("seq", seq)
node, err := c.doGetAndPrepareNode(ctx, nodename, opts.Image)
node, err := c.doGetAndPrepareNode(ctx, nodename, opts.Image, opts.IgnorePull)
if err != nil {
for i := 0; i < deploy; i++ {
logger.Error(ctx, err)
Expand Down Expand Up @@ -280,13 +280,16 @@ func (c *Calcium) doDeployWorkloadsOnNode(ctx context.Context,
return indices, err
}

func (c *Calcium) doGetAndPrepareNode(ctx context.Context, nodename, image string) (*types.Node, error) {
func (c *Calcium) doGetAndPrepareNode(ctx context.Context, nodename, image string, ignorePull bool) (*types.Node, error) {
node, err := c.store.GetNode(ctx, nodename)
if err != nil {
return nil, err
}
if !ignorePull {
err = pullImage(ctx, node, image)
}

return node, pullImage(ctx, node, image)
return node, err
}

// transaction: workload metadata consistency
Expand Down
189 changes: 188 additions & 1 deletion cluster/calcium/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,194 @@ func TestCreateWorkloadTxn(t *testing.T) {
engine.AssertExpectations(t)
}

func newCreateWorkloadCluster(_ *testing.T) (*Calcium, []*types.Node) {
func TestCreateWorkloadIngorePullTxn(t *testing.T) {
c, nodes := newCreateWorkloadCluster(t)
ctx := context.Background()
opts := &types.DeployOptions{
Name: "zc:name",
Count: 2,
DeployStrategy: strategy.Auto,
Podname: "p1",
Resources: resourcetypes.Resources{},
Image: "zc:test",
Entrypoint: &types.Entrypoint{
Name: "good-entrypoint",
},
NodeFilter: &types.NodeFilter{},
IgnorePull: true,
}

store := c.store.(*storemocks.Store)
rmgr := c.rmgr.(*resourcemocks.Manager)
mwal := &walmocks.WAL{}
c.wal = mwal
var walCommitted bool
commit := wal.Commit(func() error {
walCommitted = true
return nil
})
mwal.On("Log", mock.Anything, mock.Anything).Return(commit, nil)
node1, node2 := nodes[0], nodes[1]

// doAllocResource fails: GetNodesDeployCapacity
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
nil, 0, types.ErrMockError,
).Once()
ch, err := c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt := 0
for m := range ch {
cnt++
assert.Error(t, m.Error, "key is empty")
}
assert.EqualValues(t, 1, cnt)
assert.True(t, walCommitted)
walCommitted = false
rmgr.On("GetNodesDeployCapacity", mock.Anything, mock.Anything, mock.Anything).Return(
map[string]*plugintypes.NodeDeployCapacity{
node1.Name: {
Capacity: 10,
Usage: 0.5,
Rate: 0.05,
Weight: 100,
},
node2.Name: {
Capacity: 10,
Usage: 0.5,
Rate: 0.05,
Weight: 100,
},
}, 20, nil,
)

// doAllocResource fails: GetDeployStatus
store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "GetDeployStatus")).Once()
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt = 0
for m := range ch {
cnt++
assert.ErrorIs(t, m.Error, context.DeadlineExceeded)
assert.Error(t, m.Error, "GetDeployStatus")
}
assert.EqualValues(t, 1, cnt)
assert.True(t, walCommitted)
walCommitted = false
store.On("GetDeployStatus", mock.Anything, mock.Anything, mock.Anything).Return(map[string]int{}, nil)

// doAllocResource fails: Alloc
rmgr.On("Alloc", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
nil, nil, types.ErrMockError,
).Once()
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt = 0
for m := range ch {
cnt++
assert.Error(t, m.Error, "DeadlineExceeded")
}
assert.EqualValues(t, 1, cnt)
assert.True(t, walCommitted)
walCommitted = false
rmgr.On("Alloc", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
[]resourcetypes.Resources{{}, {}},
[]resourcetypes.Resources{
{node1.Name: {}},
{node2.Name: {}},
},
nil,
)
rmgr.On("RollbackAlloc", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("GetNode",
mock.AnythingOfType("*context.timerCtx"),
mock.AnythingOfType("string"),
).Return(
func(_ context.Context, name string) (node *types.Node) {
node = node1
if name == "n2" {
node = node2
}
return
}, nil)
engine := node1.Engine.(*enginemocks.API)
// engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImageLocalDigest")).Twice()
// engine.On("ImagePull", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "ImagePull")).Twice()
store.On("DeleteProcessing", mock.Anything, mock.Anything, mock.Anything).Return(nil)
// ch, err = c.CreateWorkload(ctx, opts)
// assert.Nil(t, err)
// cnt = 0
// for m := range ch {
// cnt++
// assert.Error(t, m.Error, "ImagePull")
// }
// assert.EqualValues(t, 2, cnt)
// assert.True(t, walCommitted)

// doDeployOneWorkload fails: VirtualizationCreate
// engine.On("ImageLocalDigests", mock.Anything, mock.Anything).Return([]string{""}, nil)
// engine.On("ImageRemoteDigest", mock.Anything, mock.Anything).Return("", nil)
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(nil, errors.Wrap(context.DeadlineExceeded, "VirtualizationCreate")).Twice()
engine.On("VirtualizationRemove", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("ListNodeWorkloads", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrMockError)
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt = 0
for m := range ch {
cnt++
assert.Error(t, m.Error)
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
assert.Error(t, m.Error, "VirtualizationCreate")
}
assert.EqualValues(t, 2, cnt)
assert.True(t, walCommitted)

// doCreateAndStartWorkload fails: AddWorkload
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload")).Twice()
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt = 0
for m := range ch {
cnt++
assert.Error(t, m.Error)
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
assert.Error(t, m.Error, "AddWorkload")
}
assert.EqualValues(t, 2, cnt)
assert.True(t, walCommitted)

// doCreateAndStartWorkload fails: first time AddWorkload failed
engine.On("VirtualizationCreate", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationCreated{ID: "c1"}, nil)
engine.On("VirtualizationStart", mock.Anything, mock.Anything).Return(nil)
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(errors.Wrap(context.DeadlineExceeded, "AddWorkload2")).Once()
store.On("AddWorkload", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
walCommitted = false
ch, err = c.CreateWorkload(ctx, opts)
assert.Nil(t, err)
cnt = 0
errCnt := 0
for m := range ch {
cnt++
if m.Error != nil {
assert.Error(t, m.Error)
assert.True(t, errors.Is(m.Error, context.DeadlineExceeded))
assert.Error(t, m.Error, "AddWorkload2")
errCnt++
}
}
assert.EqualValues(t, 2, cnt)
assert.EqualValues(t, 1, errCnt)
assert.True(t, walCommitted)
store.AssertExpectations(t)
engine.AssertExpectations(t)
}

func newCreateWorkloadCluster(t *testing.T) (*Calcium, []*types.Node) {
c := NewTestCluster()

engine := &enginemocks.API{}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *Calcium) doReplaceWorkload(
return nil, removeMessage, types.ErrWorkloadIgnored
}
// prepare node
node, err := c.doGetAndPrepareNode(ctx, workload.Nodename, opts.Image)
node, err := c.doGetAndPrepareNode(ctx, workload.Nodename, opts.Image, opts.IgnorePull)
if err != nil {
return nil, removeMessage, err
}
Expand Down
5 changes: 3 additions & 2 deletions resource/plugins/cpumem/types/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ func (n *NodeResourceRequest) Parse(config coretypes.Config, rawParams resourcet
n.CPUMap[cpuID] = int(pieces)
}
}

if n.Memory, err = coreutils.ParseRAMInHuman(rawParams.String("memory")); err != nil {
if mem := rawParams.Int64("memory"); mem > 0 {
n.Memory = mem
} else if n.Memory, err = coreutils.ParseRAMInHuman(rawParams.String("memory")); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 54bdcc9

Please sign in to comment.