Skip to content

Commit

Permalink
feat: the rest of the workload orchestration logic (#199)
Browse files Browse the repository at this point in the history
* Feat: code complete with integration test

* bump ci go version

* fix build

* address comments

* add one comment

Co-authored-by: Ryan Zhang <[email protected]>
  • Loading branch information
ryanzhang-oss and Ryan Zhang authored Aug 11, 2022
1 parent 9df1982 commit 87129c9
Show file tree
Hide file tree
Showing 42 changed files with 3,106 additions and 149 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ on:
- release-*
paths-ignore: [docs/**, "**.md", "**.mdx", "**.png", "**.jpg"]

env:
GO_VERSION: '1.18'

jobs:
detect-noop:
Expand Down Expand Up @@ -49,7 +51,7 @@ jobs:
## Repository upload token - get it from codecov.io. Required only for private repositories
token: ${{ secrets.CODECOV_TOKEN }}
## Comma-separated list of files to upload
files: ./coverage.xml
files: ./it-coverage.xml;./ut-coverage.xml

e2e-tests:
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ hack/tools/bin/

# cover profile
coverage.xml
it-coverage.xml
ut-coverage.xml

# editor and IDE paraphernalia
.idea
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ load-member-docker-image:
## --------------------------------------

.PHONY: test
test: manifests generate fmt vet local-unit-test## Run tests.
test: manifests generate fmt vet local-unit-test integration-test## Run tests.

.PHONY: local-unit-test
local-unit-test: $(ENVTEST) ## Run tests.
CGO_ENABLED=1 KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./pkg/... -race -coverprofile=coverage.xml -covermode=atomic -v
CGO_ENABLED=1 KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./pkg/... -race -coverprofile=ut-coverage.xml -covermode=atomic -v

.PHONY: integration-test
integration-test: $(ENVTEST) ## Run tests.
CGO_ENABLED=1 KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/integration/... -race -v
CGO_ENABLED=1 KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./test/integration/... -coverpkg=./... -race -coverprofile=it-coverage.xml -v

## e2e tests
install-hub-agent-helm:
Expand Down
2 changes: 2 additions & 0 deletions cmd/hub-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,12 @@ func SetupCustomControllers(ctx context.Context, mgr ctrl.Manager, config *rest.
// Set up a custom controller to reconcile cluster resource placement
klog.Info("Setting up clusterResourcePlacement controller")
crpc := &clusterresourceplacement.Reconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(clusterResourcePlacementName),
RestMapper: mgr.GetRESTMapper(),
InformerManager: dynamicInformerManager,
DisabledResourceConfig: disabledResourceConfig,
WorkPendingGracePeriod: opts.WorkPendingGracePeriod,
}

ratelimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts)
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ require (
github.com/crossplane/crossplane-runtime v0.16.0
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/openkruise/kruise v1.2.0
github.com/spf13/cobra v1.3.0
github.com/stretchr/testify v1.7.0
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
k8s.io/apimachinery v0.23.5
k8s.io/apiserver v0.23.0
Expand Down Expand Up @@ -70,7 +72,7 @@ require (
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6
github.com/google/go-cmp v0.5.8
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
Expand Down Expand Up @@ -99,15 +101,14 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.23.5
k8s.io/apiextensions-apiserver v0.23.5 // indirect
k8s.io/apiextensions-apiserver v0.23.5
k8s.io/component-base v0.23.5
k8s.io/klog/v2 v2.70.0
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
Expand Down Expand Up @@ -443,6 +444,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/openkruise/kruise v1.2.0 h1:EPhaFPe1LyBT2Z7ny5nhKymm6LewNV7nAfS/dDh/1BU=
github.com/openkruise/kruise v1.2.0/go.mod h1:R0Nr5GmyxPMncBYvRIJXmFeji9j3AS1iGX35srpxOb4=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -624,6 +627,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA=
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -901,7 +906,6 @@ golang.org/x/tools v0.1.6-0.20210820212750-d4cc65f0b2ff/go.mod h1:YD9qOF0M9xpSpd
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
Expand Down
83 changes: 61 additions & 22 deletions pkg/controllers/clusterresourceplacement/cluster_selector.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package clusterresourceplacement

import (
"fmt"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"

fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
Expand All @@ -14,51 +21,83 @@ import (

// selectClusters selected the resources according to the placement resourceSelectors and
// update the results in its status
func (r *Reconciler) selectClusters(placement *fleetv1alpha1.ClusterResourcePlacement) ([]string, error) {
func (r *Reconciler) selectClusters(placement *fleetv1alpha1.ClusterResourcePlacement) (clusterNames []string, err error) {
defer func() {
if err == nil {
// Set the status
placement.Status.TargetClusters = clusterNames
}
}()
// no policy set
if placement.Spec.Policy == nil {
clusterNames, err = r.listClusters(labels.Everything())
if err != nil {
return nil, err
}
klog.V(4).InfoS("we select all the available clusters in the fleet without a policy",
"placement", placement.Name, "clusters", clusterNames)
return clusterNames, nil
}
// a fix list of clusters set
if len(placement.Spec.Policy.ClusterNames) != 0 {
klog.V(4).InfoS("use the cluster names provided as the list of cluster we select", "placement", placement.Name)
klog.V(4).InfoS("use the cluster names provided as the list of cluster we select",
"placement", placement.Name, "clusters", placement.Spec.Policy.ClusterNames)
return placement.Spec.Policy.ClusterNames, nil
}

// no Affinity or ClusterAffinity set
if placement.Spec.Policy.Affinity == nil || placement.Spec.Policy.Affinity.ClusterAffinity == nil {
clusterNames, err = r.listClusters(labels.Everything())
if err != nil {
return nil, err
}
klog.V(4).InfoS("we select all the available clusters in the fleet without a cluster affinity",
"placement", placement.Name, "clusters", clusterNames)
return clusterNames, nil
}

selectedClusters := make(map[string]bool)
for _, clusterSelector := range placement.Spec.Policy.Affinity.ClusterAffinity.ClusterSelectorTerms {
clusterNames, err := r.listClusters(&clusterSelector.LabelSelector)
selector, err := metav1.LabelSelectorAsSelector(&clusterSelector.LabelSelector)
if err != nil {
return nil, errors.Wrap(err, "cannot convert the label clusterSelector to a clusterSelector")
}
clusterNames, err := r.listClusters(selector)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("selector = %v", clusterSelector.LabelSelector))
}
for _, clusterName := range clusterNames {
selectedClusters[clusterName] = true
}
}
clusterNames := make([]string, 0)

for cluster := range selectedClusters {
klog.V(5).InfoS("matched a cluster", "cluster", cluster, "placement", placement.Name)
klog.V(4).InfoS("matched a cluster", "cluster", cluster, "placement", placement.Name)
clusterNames = append(clusterNames, cluster)
}
return clusterNames, nil
}

// listClusters retrieves the clusters according to its label selector, this will hit the informer cache.
func (r *Reconciler) listClusters(labelSelector *metav1.LabelSelector) ([]string, error) {
if !r.memberClusterInformerSynced && !r.InformerManager.IsInformerSynced(utils.MemberClusterGVR) {
return nil, fmt.Errorf("informer cache for memberCluster is not synced yet")
}
r.memberClusterInformerSynced = true

clusterSelector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil, errors.Wrap(err, "cannot convert the label clusterSelector to a clusterSelector")
}
clusterNames := make([]string, 0)
objs, err := r.InformerManager.Lister(utils.MemberClusterGVR).List(clusterSelector)
func (r *Reconciler) listClusters(labelSelector labels.Selector) ([]string, error) {
objs, err := r.InformerManager.Lister(utils.MemberClusterGVR).List(labelSelector)
if err != nil {
return nil, errors.Wrap(err, "failed to list the clusters according to obj label selector")
}
for _, obj := range objs {
clusterObj, err := meta.Accessor(obj)

clusterNames := make([]string, len(objs))
for i, obj := range objs {
uObj := obj.DeepCopyObject().(*unstructured.Unstructured)
var clusterObj fleetv1alpha1.MemberCluster
err = runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &clusterObj)
if err != nil {
return nil, errors.Wrap(err, "cannot get the name of a cluster object")
return nil, errors.Wrap(err, "cannot decode the member cluster object")
}
// only schedule the resource to a joined cluster
// TODO: check the health/condition of the cluster too
if clusterObj.Spec.State == fleetv1alpha1.ClusterStateJoin {
clusterNames[i] = clusterObj.GetName()
}
clusterNames = append(clusterNames, clusterObj.GetName())
}
return clusterNames, nil
}
Loading

0 comments on commit 87129c9

Please sign in to comment.