Skip to content

Commit

Permalink
Merge pull request dokku#7403 from Tashows/fix-ims-k3s-run
Browse files Browse the repository at this point in the history
Properly resolve imagePullSecrets from app and deploymentID if relevant property does not exist
  • Loading branch information
josegonzalez authored Dec 11, 2024
2 parents 6d537f5 + ed0ba70 commit 5fcbd5b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 27 deletions.
83 changes: 65 additions & 18 deletions plugins/scheduler-k3s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ func getKubeContext() string {
return common.PropertyGetDefault("scheduler-k3s", "--global", "kube-context", DefaultKubeContext)
}

type NotFoundError struct {
Message string
}

func (e *NotFoundError) Error() string {
return e.Message
}

type EmptyResultsError struct {
Message string
}

func (e *EmptyResultsError) Error() string {
return e.Message
}

// KubernetesClient is a wrapper around the Kubernetes client
type KubernetesClient struct {
// Client is the Kubernetes client
Expand Down Expand Up @@ -141,7 +157,7 @@ func (k KubernetesClient) AnnotateNode(ctx context.Context, input AnnotateNodeIn
}

if node == nil {
return errors.New("node is nil")
return &NotFoundError{"node is nil"}
}

keyPath := fmt.Sprintf("/metadata/annotations/%s", jsonpointer.Escape(input.Key))
Expand Down Expand Up @@ -206,7 +222,7 @@ func (k KubernetesClient) CreateJob(ctx context.Context, input CreateJobInput) (
}

if job == nil {
return batchv1.Job{}, errors.New("job is nil")
return batchv1.Job{}, &NotFoundError{"job is nil"}
}

return *job, err
Expand Down Expand Up @@ -237,7 +253,7 @@ func (k KubernetesClient) CreateNamespace(ctx context.Context, input CreateNames
}

if namespace == nil {
return corev1.Namespace{}, errors.New("namespace is nil")
return corev1.Namespace{}, &NotFoundError{"namespace is nil"}
}

return *namespace, err
Expand Down Expand Up @@ -418,7 +434,7 @@ func (k KubernetesClient) GetNode(ctx context.Context, input GetNodeInput) (Node
}

if node == nil {
return Node{}, errors.New("node is nil")
return Node{}, &NotFoundError{"node is nil"}
}

return kubernetesNodeToNode(*node), err
Expand All @@ -441,12 +457,35 @@ func (k KubernetesClient) GetPod(ctx context.Context, input GetPodInput) (corev1
}

if pod == nil {
return corev1.Pod{}, errors.New("pod is nil")
return corev1.Pod{}, &NotFoundError{"pod is nil"}
}

return *pod, err
}

// GetSecretInput contains all the information needed to get a Kubernetes secret
type GetSecretInput struct {
// Name is the Kubernetes secret name
Name string

// Namespace is the Kubernetes namespace
Namespace string
}

// GetSecret gets a Kubernetes secret
func (k KubernetesClient) GetSecret(ctx context.Context, input GetSecretInput) (corev1.Secret, error) {
secret, err := k.Client.CoreV1().Secrets(input.Namespace).Get(ctx, input.Name, metav1.GetOptions{})
if err != nil {
return corev1.Secret{}, err
}

if secret == nil {
return corev1.Secret{}, &NotFoundError{"secret is nil"}
}

return *secret, err
}

// LabelNodeInput contains all the information needed to label a Kubernetes node
type LabelNodeInput struct {
// Name is the Kubernetes node name
Expand All @@ -465,7 +504,7 @@ func (k KubernetesClient) LabelNode(ctx context.Context, input LabelNodeInput) e
}

if node == nil {
return errors.New("node is nil")
return &NotFoundError{"node is nil"}
}

keyPath := fmt.Sprintf("/metadata/labels/%s", jsonpointer.Escape(input.Key))
Expand Down Expand Up @@ -502,6 +541,10 @@ func (k KubernetesClient) ListClusterTriggerAuthentications(ctx context.Context,
return []kedav1alpha1.ClusterTriggerAuthentication{}, err
}

if response == nil || len(response.Items) == 0 {
return []kedav1alpha1.ClusterTriggerAuthentication{}, &EmptyResultsError{"cluster trigger authentications is nil"}
}

triggerAuthentications := []kedav1alpha1.ClusterTriggerAuthentication{}
for _, triggerAuthentication := range response.Items {
var ta kedav1alpha1.ClusterTriggerAuthentication
Expand Down Expand Up @@ -537,8 +580,8 @@ func (k KubernetesClient) ListCronJobs(ctx context.Context, input ListCronJobsIn
return []batchv1.CronJob{}, err
}

if cronJobs == nil {
return []batchv1.CronJob{}, errors.New("cron jobs is nil")
if cronJobs == nil || len(cronJobs.Items) == 0 {
return []batchv1.CronJob{}, &EmptyResultsError{"cron jobs is nil"}
}

return cronJobs.Items, err
Expand All @@ -561,8 +604,8 @@ func (k KubernetesClient) ListDeployments(ctx context.Context, input ListDeploym
return []appsv1.Deployment{}, err
}

if deployments == nil {
return []appsv1.Deployment{}, errors.New("deployments is nil")
if deployments == nil || len(deployments.Items) == 0 {
return []appsv1.Deployment{}, &EmptyResultsError{"deployments list is nil"}
}

return deployments.Items, nil
Expand All @@ -585,8 +628,8 @@ func (k KubernetesClient) ListIngresses(ctx context.Context, input ListIngresses
return []networkingv1.Ingress{}, err
}

if ingresses == nil {
return []networkingv1.Ingress{}, errors.New("ingresses is nil")
if ingresses == nil || len(ingresses.Items) == 0 {
return []networkingv1.Ingress{}, &EmptyResultsError{"ingresses is nil"}
}

return ingresses.Items, nil
Expand All @@ -598,8 +641,8 @@ func (k KubernetesClient) ListNamespaces(ctx context.Context) ([]corev1.Namespac
if err != nil {
return []corev1.Namespace{}, err
}
if namespaces == nil {
return []corev1.Namespace{}, errors.New("namespaces is nil")
if namespaces == nil || len(namespaces.Items) == 0 {
return []corev1.Namespace{}, &EmptyResultsError{"namespaces list is nil"}
}

return namespaces.Items, nil
Expand All @@ -623,8 +666,8 @@ func (k KubernetesClient) ListNodes(ctx context.Context, input ListNodesInput) (
return []corev1.Node{}, err
}

if nodeList == nil {
return []corev1.Node{}, errors.New("pod list is nil")
if nodeList == nil || len(nodeList.Items) == 0 {
return []corev1.Node{}, &EmptyResultsError{"pod list is nil"}
}

return nodeList.Items, err
Expand All @@ -647,8 +690,8 @@ func (k KubernetesClient) ListPods(ctx context.Context, input ListPodsInput) ([]
return []corev1.Pod{}, err
}

if podList == nil {
return []corev1.Pod{}, errors.New("pod list is nil")
if podList == nil || len(podList.Items) == 0 {
return []corev1.Pod{}, &EmptyResultsError{"pod list is nil"}
}

return podList.Items, err
Expand Down Expand Up @@ -678,6 +721,10 @@ func (k KubernetesClient) ListTriggerAuthentications(ctx context.Context, input
return []kedav1alpha1.TriggerAuthentication{}, err
}

if response == nil || len(response.Items) == 0 {
return []kedav1alpha1.TriggerAuthentication{}, &EmptyResultsError{"trigger authentications is nil"}
}

triggerAuthentications := []kedav1alpha1.TriggerAuthentication{}
for _, triggerAuthentication := range response.Items {
var ta kedav1alpha1.TriggerAuthentication
Expand Down
34 changes: 25 additions & 9 deletions plugins/scheduler-k3s/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,32 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
}

attachToPod := os.Getenv("DOKKU_DETACH_CONTAINER") != "1"

clientset, err := NewKubernetesClient()
if err != nil {
return fmt.Errorf("Error creating kubernetes client: %w", err)
}

if err := clientset.Ping(); err != nil {
return fmt.Errorf("kubernetes api not available: %w", err)
}

imagePullSecrets := getComputedImagePullSecrets(appName)
if imagePullSecrets == "" {
imagePullSecrets = fmt.Sprintf("ims-%s.%d", appName, deploymentID)
_, err := clientset.GetSecret(context.Background(), GetSecretInput{
Name: imagePullSecrets,
Namespace: namespace,
})

if err != nil {
if _, ok := err.(*NotFoundError); !ok {
return fmt.Errorf("Error getting image pull secret: %w", err)
}
imagePullSecrets = ""
}
}

workingDir := common.GetWorkingDir(appName, image)
job, err := templateKubernetesJob(Job{
AppName: appName,
Expand All @@ -1132,15 +1157,6 @@ func TriggerSchedulerRun(scheduler string, appName string, envCount int, args []
color.NoColor = false
}

clientset, err := NewKubernetesClient()
if err != nil {
return fmt.Errorf("Error creating kubernetes client: %w", err)
}

if err := clientset.Ping(); err != nil {
return fmt.Errorf("kubernetes api not available: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP,
Expand Down

0 comments on commit 5fcbd5b

Please sign in to comment.