diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml
index ea57737d9..52f509fc2 100644
--- a/.github/workflows/goreleaser.yml
+++ b/.github/workflows/goreleaser.yml
@@ -18,9 +18,6 @@ jobs:
         run: |
           echo "::set-env name=VERSION::$(git describe --tags $(git rev-list --tags --max-count=1))"
 
-      - name: Unshallow
-        run: git fetch --prune --unshallow
-
       - name: Set up Go
         uses: actions/setup-go@v2
         with:
diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 286549bcb..53ffabb81 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -233,37 +233,42 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
 }
 
 func (c *Calcium) updateResource(ctx context.Context, node *types.Node, container *types.Container, newResource *enginetypes.VirtualizationResource) error {
-	updateResourceErr := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource)
-	if updateResourceErr == nil {
-		oldVolumeSize := container.Volumes.TotalSize()
-		container.CPU = newResource.CPU
-		container.Quota = newResource.Quota
-		container.Memory = newResource.Memory
-		container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
-		container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
-		container.Storage += container.Volumes.TotalSize() - oldVolumeSize
-	} else {
-		log.Errorf("[updateResource] When Realloc container, VirtualizationUpdateResource %s failed %v", container.ID, updateResourceErr)
-	}
-	// node usage must be updated whether the engine call succeeded or failed:
-	// on success, the node holds the new resources;
-	// on failure, it still holds the old ones
-	node.CPU.Sub(container.CPU)
-	node.SetCPUUsed(container.Quota, types.IncrUsage)
-	node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
-	node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
-	node.StorageCap -= container.Storage
-	node.MemCap -= container.Memory
-	if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
-		node.DecrNUMANodeMemory(nodeID, container.Memory)
-	}
-	// update container metadata
-	// since we don't roll back VirtualizationUpdateResource, the client can't interrupt
-	if err := c.store.UpdateContainer(context.Background(), container); err != nil {
-		log.Errorf("[updateResource] Realloc finish but update container %s failed %v", container.ID, err)
+	var updateErr error
+	if err := utils.Txn(
+		ctx,
+		func(ctx context.Context) error {
+			if updateErr = node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); updateErr == nil {
+				oldVolumeSize := container.Volumes.TotalSize()
+				container.CPU = newResource.CPU
+				container.Quota = newResource.Quota
+				container.Memory = newResource.Memory
+				container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
+				container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
+				container.Storage += container.Volumes.TotalSize() - oldVolumeSize
+			}
+			return nil
+		},
+		func(ctx context.Context) error {
+			// node usage must be updated whether the engine call succeeded or failed:
+			// on success, the node holds the new resources;
+			// on failure, it still holds the old ones
+			node.CPU.Sub(container.CPU)
+			node.SetCPUUsed(container.Quota, types.IncrUsage)
+			node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
+			node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
+			node.StorageCap -= container.Storage
+			node.MemCap -= container.Memory
+			if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
+				node.DecrNUMANodeMemory(nodeID, container.Memory)
+			}
+			return c.store.UpdateContainer(ctx, container)
+		},
+		nil,
+		c.config.GlobalTimeout,
+	); err != nil {
 		return err
 	}
-	return updateResourceErr
+	return updateErr
 }
 
 func (c *Calcium) reallocVolume(node *types.Node, containers []*types.Container, vbs types.VolumeBindings) (plans map[*types.Container]types.VolumePlan, err error) {
diff --git a/cluster/calcium/service.go b/cluster/calcium/service.go
index 134fca7ea..ec3526c61 100644
--- a/cluster/calcium/service.go
+++ b/cluster/calcium/service.go
@@ -17,14 +17,14 @@ type serviceWatcher struct {
 	subs sync.Map
 }
 
-func (w *serviceWatcher) Start(s store.Store, pushInterval time.Duration) {
+func (w *serviceWatcher) Start(ctx context.Context, s store.Store, pushInterval time.Duration) {
 	w.once.Do(func() {
-		w.start(s, pushInterval)
+		w.start(ctx, s, pushInterval)
 	})
 }
 
-func (w *serviceWatcher) start(s store.Store, pushInterval time.Duration) {
-	ch, err := s.ServiceStatusStream(context.Background())
+func (w *serviceWatcher) start(ctx context.Context, s store.Store, pushInterval time.Duration) {
+	ch, err := s.ServiceStatusStream(ctx)
 	if err != nil {
 		log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
 		return
@@ -84,7 +84,7 @@ func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
 // WatchServiceStatus returns chan of available service address
 func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
 	ch := make(chan types.ServiceStatus)
-	c.watcher.Start(c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
+	c.watcher.Start(ctx, c.store, c.config.GRPCConfig.ServiceDiscoveryPushInterval)
 	id := c.watcher.Subscribe(ch)
 	go func() {
 		<-ctx.Done()
diff --git a/core.go b/core.go
index 03e8d27ae..740810c06 100644
--- a/core.go
+++ b/core.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"net/http"
@@ -46,7 +45,7 @@ func setupLog(l string) error {
 	return nil
 }
 
-func serve() {
+func serve(c *cli.Context) error {
 	config, err := utils.LoadConfig(configPath)
 	if err != nil {
 		log.Fatalf("[main] %v", err)
@@ -70,7 +69,8 @@ func serve() {
 	vibranium := rpc.New(cluster, config, rpcch)
 	s, err := net.Listen("tcp", config.Bind)
 	if err != nil {
-		log.Fatalf("[main] %v", err)
+		log.Errorf("[main] %v", err)
+		return err
 	}
 
 	opts := []grpc.ServerOption{
@@ -90,9 +90,10 @@ func serve() {
 	pb.RegisterCoreRPCServer(grpcServer, vibranium)
 	go func() {
 		if err := grpcServer.Serve(s); err != nil {
-			log.Fatalf("[main] start grpc failed %v", err)
+			log.Errorf("[main] start grpc failed %v", err)
 		}
 	}()
+
 	if config.Profile != "" {
 		http.Handle("/metrics", metrics.Client.ResourceMiddleware(cluster)(promhttp.Handler()))
 		go func() {
@@ -102,10 +103,10 @@ func serve() {
 	}()
 	}
 
-	unregisterService, err := cluster.RegisterService(context.Background())
+	unregisterService, err := cluster.RegisterService(c.Context)
 	if err != nil {
 		log.Errorf("[main] failed to register service: %v", err)
-		return
+		return err
 	}
 	log.Info("[main] Cluster started successfully.")
 
@@ -122,6 +123,7 @@ func serve() {
 	log.Info("[main] Check if cluster still have running tasks.")
 	vibranium.Wait()
 	log.Info("[main] cluster gracefully stopped.")
+	return nil
 }
 
 func main() {
@@ -147,10 +149,6 @@ func main() {
 			Destination: &embeddedStorage,
 		},
 	}
-	app.Action = func(c *cli.Context) error {
-		serve()
-		return nil
-	}
-
+	app.Action = serve
 	_ = app.Run(os.Args)
 }
diff --git a/utils/transaction.go b/utils/transaction.go
index a54e46710..4457ab7a9 100644
--- a/utils/transaction.go
+++ b/utils/transaction.go
@@ -40,9 +40,7 @@ func Txn(ctx context.Context, cond contextFunc, then contextFunc, rollback conte
 			thenCtx, thenCancel = context.WithTimeout(context.Background(), ttl)
 			defer thenCancel()
 		}
-		if then != nil {
-			tnxErr = then(thenCtx)
-		}
+		tnxErr = then(thenCtx)
 	}
 
 	return tnxErr
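
The realloc.go and utils/transaction.go hunks above revolve around the same cond/then/rollback helper. Below is a minimal sketch of that pattern; it is illustrative only (the real utils.Txn differs internally), but it shows the two properties the diff relies on: `then` runs under a fresh timeout context so bookkeeping survives caller cancellation, and, with the nil check removed, `then` is now mandatory. Note how updateResource exploits this: its cond returns nil even when VirtualizationUpdateResource fails (stashing the error in updateErr), so the node-accounting `then` step always runs.

package main

import (
	"context"
	"fmt"
	"time"
)

type contextFunc func(context.Context) error

// txnSketch is an illustrative stand-in for utils.Txn, not its real
// implementation. cond runs under the caller's context; then always runs
// afterwards under a fresh timeout context, so cleanup and bookkeeping
// happen even if the caller's context was already cancelled.
func txnSketch(ctx context.Context, cond, then, rollback contextFunc, ttl time.Duration) error {
	if err := cond(ctx); err != nil {
		if rollback != nil {
			rbCtx, cancel := context.WithTimeout(context.Background(), ttl)
			defer cancel()
			_ = rollback(rbCtx) // best-effort rollback
		}
		return err
	}
	thenCtx, cancel := context.WithTimeout(context.Background(), ttl)
	defer cancel()
	return then(thenCtx) // then is mandatory now: the nil check is gone
}

func main() {
	err := txnSketch(
		context.Background(),
		func(ctx context.Context) error { return nil },                               // cond: e.g. the engine call
		func(ctx context.Context) error { fmt.Println("bookkeeping"); return nil },   // then: e.g. node accounting
		nil, // rollback: updateResource passes nil here as well
		time.Second,
	)
	fmt.Println("txn err:", err)
}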
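
The service.go hunks thread the subscriber's context into the once-guarded watcher instead of hardcoding context.Background(). A rough sketch of that shape, with an invented streamFunc standing in for store.Store's ServiceStatusStream:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// streamFunc is a made-up stand-in for ServiceStatusStream.
type streamFunc func(context.Context) (<-chan string, error)

// watcherSketch mirrors the serviceWatcher shape: Start is guarded by
// sync.Once, so only the first caller actually opens the stream.
type watcherSketch struct {
	once sync.Once
}

func (w *watcherSketch) Start(ctx context.Context, stream streamFunc) {
	w.once.Do(func() {
		ch, err := stream(ctx) // was context.Background() before this diff
		if err != nil {
			fmt.Printf("failed to start watch: %v\n", err)
			return
		}
		go func() {
			for msg := range ch {
				fmt.Println(msg) // the real watcher fans out to subscribers here
			}
		}()
	})
}

func main() {
	w := &watcherSketch{}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	w.Start(ctx, func(context.Context) (<-chan string, error) {
		ch := make(chan string, 1)
		ch <- "service up"
		close(ch)
		return ch, nil
	})
	w.Start(ctx, nil)                 // no-op: sync.Once already fired
	time.Sleep(50 * time.Millisecond) // crude wait for the demo goroutine
}

One consequence of the sync.Once guard worth noting: the shared stream is now bound to whichever WatchServiceStatus caller arrives first, so if that first caller's context is cancelled, the stream backing every later subscriber ends with it.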
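
Finally, the core.go hunks make serve the urfave/cli action itself: fatal exits become returned errors (so deferred cleanup and the framework's error path run), and cli/v2's c.Context replaces context.Background() for service registration. A stripped-down sketch of the same wiring, with an invented bind address:

package main

import (
	"fmt"
	"net"
	"os"

	"github.com/urfave/cli/v2"
)

// serve matches cli.ActionFunc, so it can be assigned to app.Action
// directly instead of being wrapped in a closure.
func serve(c *cli.Context) error {
	ln, err := net.Listen("tcp", "127.0.0.1:0") // invented address for the demo
	if err != nil {
		fmt.Printf("[main] %v\n", err)
		return err // was log.Fatalf, which would have skipped defers
	}
	defer ln.Close()

	// c.Context is the context cli/v2 attaches to this run; the diff uses
	// it in place of context.Background() for cluster.RegisterService.
	select {
	case <-c.Context.Done():
		return c.Context.Err()
	default:
	}
	fmt.Println("[main] listening on", ln.Addr())
	return nil
}

func main() {
	app := &cli.App{Name: "core", Action: serve}
	_ = app.Run(os.Args)
}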