Skip to content

Commit

Permalink
Merge pull request #477 from briandowns/issue-207
Browse files Browse the repository at this point in the history
Refactor Code to Use K8S Retries
  • Loading branch information
briandowns authored Nov 4, 2020
2 parents db31b3d + ca2f78b commit 0dcc727
Show file tree
Hide file tree
Showing 5 changed files with 539 additions and 76 deletions.
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,18 @@ package-images: build-images ## Package docker images for airgap environment
./scripts/package-images

.PHONY: package-bundle
package-bundle: build ## Package the tarball bundle
package-bundle: build ## Package the tarball bundle
./scripts/package-bundle

.PHONY: test
test:
test: unit-tests integration-tests

.PHONY: unit-tests
unit-tests:
./scripts/unit-tests

.PHONY: integration-tests
integration-tests:
./scripts/test

./.dapper:
Expand Down
178 changes: 106 additions & 72 deletions pkg/rke2/psp.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,6 @@ func updateNamespaceRef(ctx context.Context, cs *kubernetes.Clientset, ns *v1.Na
return nil
}

type deployFn func(context.Context, *kubernetes.Clientset, interface{}) error

// newClient create a new Kubernetes client from configuration.
func newClient(kubeConfigPath string, k8sWrapTransport transport.WrapperFunc) (*kubernetes.Clientset, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
Expand All @@ -389,116 +387,152 @@ func decodeYamlResource(data interface{}, yaml string) error {
return decoder.Decode(data)
}

func retryTo(ctx context.Context, runFunc deployFn, cs *kubernetes.Clientset, resource interface{}, retries, wait int) error {
var err error
if retries <= 0 {
retries = defaultRetries
}
if wait <= 0 {
wait = defaultWaitSeconds
}
for i := 0; i < retries; i++ {
if err = runFunc(ctx, cs, resource); err != nil {
time.Sleep(time.Second * time.Duration(wait))
continue
}
return nil
}
return err
}

func deployPodSecurityPolicyFromYaml(ctx context.Context, cs *kubernetes.Clientset, pspYaml string) error {
func deployPodSecurityPolicyFromYaml(ctx context.Context, cs kubernetes.Interface, pspYaml string) error {
var psp v1beta1.PodSecurityPolicy
if err := decodeYamlResource(&psp, pspYaml); err != nil {
return err
}
return retryTo(ctx, deployPodSecurityPolicy, cs, psp, defaultRetries, defaultWaitSeconds)
}

func deployPodSecurityPolicy(ctx context.Context, cs *kubernetes.Clientset, p interface{}) error {
psp, ok := p.(v1beta1.PodSecurityPolicy)
if !ok {
return fmt.Errorf("invalid type provided: %T, expected: PodSecurityPolicy", p)
}
if _, err := cs.PolicyV1beta1().PodSecurityPolicies().Create(ctx, &psp, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
// try to create the given PSP. If it already exists, we fall through to
// attempting to update the existing PSP.
if err := retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return !apierrors.IsAlreadyExists(err)
}, func() error {
_, err := cs.PolicyV1beta1().PodSecurityPolicies().Create(ctx, &psp, metav1.CreateOptions{})
return err
}
if _, err := cs.PolicyV1beta1().PodSecurityPolicies().Update(ctx, &psp, metav1.UpdateOptions{}); err != nil {
},
); err != nil && apierrors.IsAlreadyExists(err) {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
retrievedPSP, err := cs.PolicyV1beta1().PodSecurityPolicies().Get(ctx, psp.Name, metav1.GetOptions{})
if err != nil {
return err
}
if retrievedPSP.Annotations == nil {
retrievedPSP.Annotations = make(map[string]string, len(psp.Annotations))
}
for k, v := range psp.Annotations {
retrievedPSP.Annotations[k] = v
}
retrievedPSP.Spec = psp.Spec
_, err = cs.PolicyV1beta1().PodSecurityPolicies().Update(ctx, retrievedPSP, metav1.UpdateOptions{})
return err
}
})
} else if err != nil {
return err
}
return nil
}

func deployClusterRoleBindingFromYaml(ctx context.Context, cs *kubernetes.Clientset, clusterRoleBindingYaml string) error {
func deployClusterRoleBindingFromYaml(ctx context.Context, cs kubernetes.Interface, clusterRoleBindingYaml string) error {
var clusterRoleBinding rbacv1.ClusterRoleBinding
if err := decodeYamlResource(&clusterRoleBinding, clusterRoleBindingYaml); err != nil {
return err
}
return retryTo(ctx, deployClusterRoleBinding, cs, clusterRoleBinding, defaultRetries, defaultWaitSeconds)
}

func deployClusterRoleBinding(ctx context.Context, cs *kubernetes.Clientset, crb interface{}) error {
clusterRoleBinding, ok := crb.(rbacv1.ClusterRoleBinding)
if !ok {
return fmt.Errorf("invalid type provided: %T, expected: ClusterRoleBinding", crb)
}
if _, err := cs.RbacV1().ClusterRoleBindings().Create(ctx, &clusterRoleBinding, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
// try to create the given cluster role binding. If it already exists, we
// fall through to attempting to update the existing cluster role binding.
if err := retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return !apierrors.IsAlreadyExists(err)
}, func() error {
_, err := cs.RbacV1().ClusterRoleBindings().Create(ctx, &clusterRoleBinding, metav1.CreateOptions{})
return err
}
if _, err := cs.RbacV1().ClusterRoleBindings().Update(ctx, &clusterRoleBinding, metav1.UpdateOptions{}); err != nil {
},
); err != nil && apierrors.IsAlreadyExists(err) {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
retrievedCRB, err := cs.RbacV1().ClusterRoleBindings().Get(ctx, clusterRoleBinding.Name, metav1.GetOptions{})
if err != nil {
return err
}
if retrievedCRB.Annotations == nil {
retrievedCRB.Annotations = make(map[string]string, len(clusterRoleBinding.Annotations))
}
for k, v := range clusterRoleBinding.Annotations {
retrievedCRB.Annotations[k] = v
}
retrievedCRB.Subjects = clusterRoleBinding.Subjects
retrievedCRB.RoleRef = clusterRoleBinding.RoleRef
_, err = cs.RbacV1().ClusterRoleBindings().Update(ctx, retrievedCRB, metav1.UpdateOptions{})
return err
}
})
} else if err != nil {
return err
}
return nil
}

func deployClusterRoleFromYaml(ctx context.Context, cs *kubernetes.Clientset, clusterRoleYaml string) error {
func deployClusterRoleFromYaml(ctx context.Context, cs kubernetes.Interface, clusterRoleYaml string) error {
var clusterRole rbacv1.ClusterRole
if err := decodeYamlResource(&clusterRole, clusterRoleYaml); err != nil {
return err
}
return retryTo(ctx, deployClusterRole, cs, clusterRole, defaultRetries, defaultWaitSeconds)
}

func deployClusterRole(ctx context.Context, cs *kubernetes.Clientset, cr interface{}) error {
clusterRole, ok := cr.(rbacv1.ClusterRole)
if !ok {
return fmt.Errorf("invalid type provided: %T, expected: ClusterRole", cr)
}
if _, err := cs.RbacV1().ClusterRoles().Create(ctx, &clusterRole, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
// try to create the given cluster role. If it already exists, we
// fall through to attempting to update the existing cluster role.
if err := retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return !apierrors.IsAlreadyExists(err)
}, func() error {
_, err := cs.RbacV1().ClusterRoles().Create(ctx, &clusterRole, metav1.CreateOptions{})
return err
}
if _, err := cs.RbacV1().ClusterRoles().Update(ctx, &clusterRole, metav1.UpdateOptions{}); err != nil {
},
); err != nil && apierrors.IsAlreadyExists(err) {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
retrievedCR, err := cs.RbacV1().ClusterRoles().Get(ctx, clusterRole.Name, metav1.GetOptions{})
if err != nil {
return err
}
if retrievedCR.Annotations == nil {
retrievedCR.Annotations = make(map[string]string, len(clusterRole.Annotations))
}
for k, v := range clusterRole.Annotations {
retrievedCR.Annotations[k] = v
}
retrievedCR.Rules = clusterRole.Rules
_, err = cs.RbacV1().ClusterRoles().Update(ctx, retrievedCR, metav1.UpdateOptions{})
return err
}
})
} else if err != nil {
return err
}
return nil
}

func deployRoleBindingFromYaml(ctx context.Context, cs *kubernetes.Clientset, roleBindingYaml string) error {
func deployRoleBindingFromYaml(ctx context.Context, cs kubernetes.Interface, roleBindingYaml string) error {
var roleBinding rbacv1.RoleBinding
if err := decodeYamlResource(&roleBinding, roleBindingYaml); err != nil {
return err
}
return retryTo(ctx, deployRoleBinding, cs, roleBinding, defaultRetries, defaultWaitSeconds)
}

func deployRoleBinding(ctx context.Context, cs *kubernetes.Clientset, rb interface{}) error {
roleBinding, ok := rb.(rbacv1.RoleBinding)
if !ok {
return fmt.Errorf("invalid type provided: %T, expected: RoleBinding", rb)
}
if _, err := cs.RbacV1().RoleBindings(roleBinding.Namespace).Create(ctx, &roleBinding, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
// try to create the given role binding. If it already exists, we fall through to
// attempting to update the existing role binding.
if err := retry.OnError(retry.DefaultBackoff,
func(err error) bool {
return !apierrors.IsAlreadyExists(err)
}, func() error {
_, err := cs.RbacV1().RoleBindings(roleBinding.Namespace).Create(ctx, &roleBinding, metav1.CreateOptions{})
return err
}
if _, err := cs.RbacV1().RoleBindings(roleBinding.Namespace).Update(ctx, &roleBinding, metav1.UpdateOptions{}); err != nil {
},
); err != nil && apierrors.IsAlreadyExists(err) {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
retrievedR, err := cs.RbacV1().RoleBindings(roleBinding.Namespace).Get(ctx, roleBinding.Name, metav1.GetOptions{})
if err != nil {
return err
}
if retrievedR.Annotations == nil {
retrievedR.Annotations = make(map[string]string, len(roleBinding.Annotations))
}
for k, v := range roleBinding.Annotations {
retrievedR.Annotations[k] = v
}
retrievedR.Subjects = roleBinding.Subjects
retrievedR.RoleRef = roleBinding.RoleRef
_, err = cs.RbacV1().RoleBindings(roleBinding.Namespace).Update(ctx, retrievedR, metav1.UpdateOptions{})
return err
}
})
} else if err != nil {
return err
}
return nil
}
3 changes: 1 addition & 2 deletions pkg/rke2/psp_templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ subjects:
name: system:authenticated
`

const systemUnrestrictedPSPTemplate = `
apiVersion: policy/v1beta1
const systemUnrestrictedPSPTemplate = `apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
name: %s
Expand Down
Loading

0 comments on commit 0dcc727

Please sign in to comment.