Skip to content

Commit

Permalink
Merge pull request #186 from galal-hussein/worker_parallel
Browse files Browse the repository at this point in the history
Add concurrency to worker deployments
  • Loading branch information
Alena Prokharchyk authored Jan 2, 2018
2 parents 00ffa8e + 828f6e4 commit 952436a
Showing 1 changed file with 41 additions and 30 deletions.
71 changes: 41 additions & 30 deletions services/workerplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,32 @@ import (
"github.com/rancher/rke/hosts"
"github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

func RunWorkerPlane(controlHosts, workerHosts []*hosts.Host, workerServices v3.RKEConfigServices, nginxProxyImage, sidekickImage string, healthcheckDialerFactory hosts.DialerFactory) error {
logrus.Infof("[%s] Building up Worker Plane..", WorkerRole)
var errgrp errgroup.Group

// Deploy worker components on control hosts
for _, host := range controlHosts {
// run sidekick
if err := runSidekick(host, sidekickImage); err != nil {
return err
}
// run kubelet
// only one master for now
if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
return err
}
if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil {
return err
}
controlHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(controlHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
// Deploy worker components on worker hosts
for _, host := range workerHosts {
// run nginx proxy
if !host.IsControl {
if err := runNginxProxy(host, controlHosts, nginxProxyImage); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(host, sidekickImage); err != nil {
return err
}
// run kubelet
if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
return err
}
// run kubeproxy
if err := runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory); err != nil {
return err
}
workerHost := host
errgrp.Go(func() error {
return doDeployWorkerPlane(workerHost, workerServices, nginxProxyImage, sidekickImage, healthcheckDialerFactory, controlHosts)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
logrus.Infof("[%s] Successfully started Worker Plane..", WorkerRole)
return nil
Expand Down Expand Up @@ -72,3 +61,25 @@ func RemoveWorkerPlane(workerHosts []*hosts.Host, force bool) error {

return nil
}

func doDeployWorkerPlane(host *hosts.Host,
workerServices v3.RKEConfigServices,
nginxProxyImage, sidekickImage string,
healthcheckDialerFactory hosts.DialerFactory,
controlHosts []*hosts.Host) error {
// run nginx proxy
if !host.IsControl {
if err := runNginxProxy(host, controlHosts, nginxProxyImage); err != nil {
return err
}
}
// run sidekick
if err := runSidekick(host, sidekickImage); err != nil {
return err
}
// run kubelet
if err := runKubelet(host, workerServices.Kubelet, healthcheckDialerFactory); err != nil {
return err
}
return runKubeproxy(host, workerServices.Kubeproxy, healthcheckDialerFactory)
}

0 comments on commit 952436a

Please sign in to comment.