Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support cancelling restores #250

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions internal/helpers/helper_types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package helpers

import (
"context"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// LagoonEnvironmentVariable is used to define Lagoon environment variables.
type LagoonEnvironmentVariable struct {
Name string `json:"name"`
Expand Down Expand Up @@ -31,3 +39,27 @@ type LagoonAPIConfiguration struct {
SSHHost string
SSHPort string
}

func K8UPVersions(ctx context.Context, c client.Client) (bool, bool, error) {
k8upv1alpha1Exists := false
k8upv1Exists := false
crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{}
if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil {
if err := IgnoreNotFound(err); err != nil {
return k8upv1alpha1Exists, k8upv1Exists, err
}
}
if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" {
k8upv1alpha1Exists = true
}
crdv1 := &apiextensionsv1.CustomResourceDefinition{}
if err := c.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil {
if err := IgnoreNotFound(err); err != nil {
return k8upv1alpha1Exists, k8upv1Exists, err
}
}
if crdv1.ObjectMeta.Name == "restores.k8up.io" {
k8upv1Exists = true
}
return k8upv1alpha1Exists, k8upv1Exists, nil
}
46 changes: 45 additions & 1 deletion internal/messenger/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,25 @@ func (m *Messenger) Consumer(targetName string) { //error {
}
}
case "deploytarget:restic:backup:restore", "kubernetes:restic:backup:restore":
v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client)
if err != nil {
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
if !v1alpha1 && !v1 {
// k8up not installed
message.Ack(false) // ack to remove from queue
return
}
opLog.Info(
fmt.Sprintf(
"Received backup restoration for project %s, environment %s",
jobSpec.Project.Name,
jobSpec.Environment.Name,
),
)
err := m.ResticRestore(namespace, jobSpec)
err = m.ResticRestore(ctx, namespace, jobSpec, v1alpha1, v1, false)
if err != nil {
opLog.Error(err,
fmt.Sprintf(
Expand All @@ -426,6 +437,39 @@ func (m *Messenger) Consumer(targetName string) { //error {
message.Ack(false) // ack to remove from queue
return
}
case "deploytarget:restic:cancel:restore":
v1alpha1, v1, err := helpers.K8UPVersions(ctx, m.Client)
if err != nil {
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
if !v1alpha1 && !v1 {
// k8up not installed
message.Ack(false) // ack to remove from queue
return
}
// if this is a request to cancel a restore attempt
opLog.Info(
fmt.Sprintf(
"Received restore cancellation for project %s, environment %s",
jobSpec.Project.Name,
jobSpec.Environment.Name,
),
)
err = m.ResticRestore(ctx, namespace, jobSpec, v1alpha1, v1, true)
if err != nil {
opLog.Error(err,
fmt.Sprintf(
"Cancel restore for project %s, environment %s failed",
jobSpec.Project.Name,
jobSpec.Environment.Name,
),
)
//@TODO: send msg back to lagoon and update task to failed?
message.Ack(false) // ack to remove from queue
return
}
case "deploytarget:route:migrate", "kubernetes:route:migrate", "openshift:route:migrate":
opLog.Info(
fmt.Sprintf(
Expand Down
164 changes: 122 additions & 42 deletions internal/messenger/tasks_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ import (
"fmt"

"github.com/go-logr/logr"
"github.com/uselagoon/machinery/api/schema"
lagoonv1beta2 "github.com/uselagoon/remote-controller/api/lagoon/v1beta2"
"github.com/uselagoon/remote-controller/internal/helpers"
"k8s.io/apimachinery/pkg/types"

ctrl "sigs.k8s.io/controller-runtime"

k8upv1 "github.com/k8up-io/k8up/v2/api/v1"
k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/types"
)

type cancelRestore struct {
RestoreName string `json:"restoreName"`
BackupID string `json:"backupId"`
}

// ResticRestore handles creating the restic restore jobs.
func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
func (m *Messenger) ResticRestore(ctx context.Context, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec, v1alpha1, v1, cancel bool) error {
opLog := ctrl.Log.WithName("handlers").WithName("LagoonTasks")
vers, err := checkRestoreVersionFromCore(jobSpec.Misc.MiscResource)
if err != nil {
Expand All @@ -31,51 +37,41 @@ func (m *Messenger) ResticRestore(namespace string, jobSpec *lagoonv1beta2.Lagoo
return nil
}

// check if k8up crds exist in the cluster
k8upv1alpha1Exists := false
k8upv1Exists := false
crdv1alpha1 := &apiextensionsv1.CustomResourceDefinition{}
if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.backup.appuio.ch"}, crdv1alpha1); err != nil {
if err := helpers.IgnoreNotFound(err); err != nil {
return err
}
}
if crdv1alpha1.ObjectMeta.Name == "restores.backup.appuio.ch" {
k8upv1alpha1Exists = true
}
crdv1 := &apiextensionsv1.CustomResourceDefinition{}
if err = m.Client.Get(context.TODO(), types.NamespacedName{Name: "restores.k8up.io"}, crdv1); err != nil {
if err := helpers.IgnoreNotFound(err); err != nil {
return err
}
}
if crdv1.ObjectMeta.Name == "restores.k8up.io" {
k8upv1Exists = true
}
handlev1alpha1 := false
handlev1 := false
// check the version, if there is no version in the payload, assume it is k8up v2
if m.SupportK8upV2 {
if vers == "backup.appuio.ch/v1alpha1" {
if k8upv1alpha1Exists {
return m.createv1alpha1Restore(opLog, namespace, jobSpec)
if v1alpha1 {
handlev1alpha1 = true
}
} else {
if k8upv1Exists {
if err := m.createv1Restore(opLog, namespace, jobSpec); err != nil {
return err
}
if v1 {
handlev1 = true
} else {
if k8upv1alpha1Exists {
if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil {
return err
}
if v1alpha1 {
handlev1alpha1 = true
}
}
}
} else {
if k8upv1alpha1Exists {
if err := m.createv1alpha1Restore(opLog, namespace, jobSpec); err != nil {
return err
}
if v1alpha1 {
handlev1alpha1 = true
}
}

if handlev1alpha1 {
if cancel {
return m.cancelv1alpha1Restore(ctx, opLog, namespace, jobSpec)
} else {
return m.createv1alpha1Restore(ctx, opLog, namespace, jobSpec)
}
}
if handlev1 {
if cancel {
return m.cancelv1Restore(ctx, opLog, namespace, jobSpec)
} else {
return m.createv1Restore(ctx, opLog, namespace, jobSpec)
}
}
return nil
Expand All @@ -97,7 +93,7 @@ func checkRestoreVersionFromCore(resource []byte) (string, error) {
}

// createv1alpha1Restore will create a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1)
func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
func (m *Messenger) createv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1alpha1 := &k8upv1alpha1.Restore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1alpha1); err != nil {
opLog.Error(err,
Expand All @@ -109,7 +105,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j
return err
}
restorev1alpha1.SetNamespace(namespace)
if err := m.Client.Create(context.Background(), restorev1alpha1); err != nil {
if err := m.Client.Create(ctx, restorev1alpha1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to create restore %s with k8up v1alpha1 api.",
Expand All @@ -122,7 +118,7 @@ func (m *Messenger) createv1alpha1Restore(opLog logr.Logger, namespace string, j
}

// createv1Restore will create a restore task using the restores.k8up.io v1 api (k8up v2)
func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
func (m *Messenger) createv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1 := &k8upv1.Restore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, restorev1); err != nil {
opLog.Error(err,
Expand All @@ -134,7 +130,7 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec
return err
}
restorev1.SetNamespace(namespace)
if err := m.Client.Create(context.Background(), restorev1); err != nil {
if err := m.Client.Create(ctx, restorev1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to create restore %s with k8up v1 api.",
Expand All @@ -145,3 +141,87 @@ func (m *Messenger) createv1Restore(opLog logr.Logger, namespace string, jobSpec
}
return nil
}

// cancelv1alpha1Restore will attempt to cancel a restore task using the restores.backup.appuio.ch v1alpha1 api (k8up v1)
func (m *Messenger) cancelv1alpha1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1alpha1 := &k8upv1alpha1.Restore{}
cr := &cancelRestore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil {
return err
}
if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1alpha1); helpers.IgnoreNotFound(err) != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to get restore %s with k8up v1alpha1 api.",
cr.RestoreName,
),
)
return err
}
if restorev1alpha1.Name != "" {
if err := m.Client.Delete(ctx, restorev1alpha1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to delete restore %s with k8up v1alpha1 api.",
cr.RestoreName,
),
)
return err
}
}
// if no matching restore found, or the restore is deleted, send the cancellation message back to core
m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec)
return nil
}

// cancelv1Restore will attempt to cancel a restore task using the restores.k8up.io v1 api (k8up v2)
func (m *Messenger) cancelv1Restore(ctx context.Context, opLog logr.Logger, namespace string, jobSpec *lagoonv1beta2.LagoonTaskSpec) error {
restorev1 := &k8upv1.Restore{}
cr := &cancelRestore{}
if err := json.Unmarshal(jobSpec.Misc.MiscResource, &cr); err != nil {
return err
}
if err := m.Client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: cr.RestoreName}, restorev1); helpers.IgnoreNotFound(err) != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to get restore %s with k8up v1 api.",
cr.RestoreName,
),
)
return err
}
if restorev1.Name != "" {
if err := m.Client.Delete(ctx, restorev1); err != nil {
opLog.Error(err,
fmt.Sprintf(
"Unable to delete restore %s with k8up v1alpha1 api.",
cr.RestoreName,
),
)
return err
}
}
// if no matching restore found, or the restore is deleted, send the cancellation message back to core
m.pubRestoreCancel(opLog, namespace, cr.RestoreName, jobSpec)
return nil
}

func (m *Messenger) pubRestoreCancel(opLog logr.Logger, namespace, restorename string, jobSpec *lagoonv1beta2.LagoonTaskSpec) {
msg := schema.LagoonMessage{
Type: "restore:cancel",
Namespace: namespace,
Meta: &schema.LagoonLogMeta{
Environment: jobSpec.Environment.Name,
Project: jobSpec.Project.Name,
JobName: restorename,
},
}
msgBytes, err := json.Marshal(msg)
if err != nil {
opLog.Error(err, "Unable to encode message as JSON")
}
// publish the cancellation result back to lagoon
if err := m.Publish("lagoon-tasks:controller", msgBytes); err != nil {
opLog.Error(err, "Unable to publish message.")
}
}
30 changes: 30 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,36 @@ var _ = Describe("controller", Ordered, func() {
Expect(strings.TrimSpace(string(result))).To(Equal(string(testResult)))
}

By("validating that restore cancellations are working")
By("creating a restore cancellation task via rabbitmq")
cmd = exec.Command(
"curl",
"-s",
"-u",
"guest:guest",
"-H",
"'Accept: application/json'",
"-H",
"'Content-Type:application/json'",
"-X",
"POST",
"-d",
"@test/e2e/testdata/cancel-restore.json",
"http://172.17.0.1:15672/api/exchanges/%2f/lagoon-tasks/publish",
)
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

time.Sleep(10 * time.Second)

By("validating that the restore is deleted")
cmd = exec.Command("kubectl", "get",
"restores.k8up.io", "restore-bf072a0-uqxqo4",
"-n", "nginx-example-main",
)
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).To(HaveOccurred())

By("validating that the harbor robot credentials get rotated successfully")
cmd = exec.Command("kubectl", "get",
"pods", "-l", "control-plane=controller-manager",
Expand Down
17 changes: 17 additions & 0 deletions test/e2e/testdata/cancel-restore.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{"properties":{"delivery_mode":2},"routing_key":"ci-local-controller-kubernetes:misc",
"payload":"{
\"misc\":{
\"miscResource\":\"eyJyZXN0b3JlTmFtZSI6InJlc3RvcmUtYmYwNzJhMC11cXhxbzQiLCJiYWNrdXBJZCI6ImJmMDcyYTA5ZTE3NzI2ZGE1NGFkYzc5OTM2ZWM4NzQ1NTIxOTkzNTk5ZDQxMjExZGZjOTQ2NmRmZDViYzMyYTUifQ==\"
},
\"key\":\"deploytarget:restic:cancel:restore\",
\"environment\":{
\"name\":\"main\",
\"openshiftProjectName\":\"nginx-example-main\"
},
\"project\":{
\"name\":\"nginx-example\"
},
\"advancedTask\":{}
}",
"payload_encoding":"string"
}
Loading