-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix backup post hook issue: hook executes before the backup is finished Signed-off-by: Wenkai Yin(尹文开) <[email protected]>
- Loading branch information
Showing
23 changed files
with
1,510 additions
and
1,343 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
filename: "{{.InterfaceName | snakecase}}.go" | ||
outpkg: mocks | ||
packages: | ||
github.com/vmware-tanzu/velero/pkg/podvolume: | ||
interfaces: | ||
Backupper: | ||
config: | ||
dir: pkg/podvolume/mocks | ||
github.com/vmware-tanzu/velero/pkg/podexec: | ||
interfaces: | ||
PodCommandExecutor: | ||
config: | ||
dir: pkg/podexec/mocks | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix backup post hook issue |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
package hook | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/sirupsen/logrus" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
|
||
"github.com/vmware-tanzu/velero/pkg/podexec" | ||
"github.com/vmware-tanzu/velero/pkg/podvolume" | ||
) | ||
|
||
// Handler handles all the hooks of one backup or restore. | ||
// | ||
// The pod's backup post hooks cannot execute until the PVBs are processed, Handler leverages the podvolume.Backupper to check this, | ||
// and the podvolume.Backupper is per backup/restore, so one instance of Handler can only handle the hooks for one backup or restore. | ||
// | ||
// Handler only handles the hooks of pods for now, but it can be extended | ||
// to other resources or even the hook defined for backup/restore if needed | ||
type Handler interface { | ||
// HandleResourceHooks handles a group of same type hooks for a specific resource, e.g. handles all backup pre hooks for one pod. | ||
// Because whether to execute the hook may depend on the execution result of previous hooks (e.g. hooks will not execute | ||
// if the previous hook is failed and marked as not continue), this function accepts a hook list as a group to handle. | ||
// | ||
// This function blocks until the hook completed, use "AsyncHandleResourceHooks()" instead if you want to handle the hooks asynchronously. | ||
// | ||
// The execution results are returned and also tracked inside the handler, calling the "WaitAllResourceHooksCompleted()" returns the results. | ||
// | ||
// This function only handles the hooks of pod for now, but it can be extended to other resources easily | ||
HandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) []*ResourceHookResult | ||
// AsyncHandleResourceHooks is the asynchronous version of "HandleResourceHooks()". | ||
// | ||
// Call "WaitAllHooksCompleted()" to wait all hooks completed and get the results. | ||
AsyncHandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) | ||
// WaitAllResourceHooksCompleted waits resource hooks completed and returns the execution results | ||
WaitAllResourceHooksCompleted(ctx context.Context, log logrus.FieldLogger) *ResourceHookResults | ||
} | ||
|
||
// make sure "handler" implements "Handler" interface | ||
var _ Handler = &handler{} | ||
|
||
func NewHandler(podVolumeBackupper podvolume.Backupper, podCommandExecutor podexec.PodCommandExecutor) Handler { | ||
return &handler{ | ||
WaitGroup: &sync.WaitGroup{}, | ||
results: &ResourceHookResults{ | ||
RWMutex: &sync.RWMutex{}, | ||
Results: []*ResourceHookResult{}, | ||
}, | ||
podVolumeBackupper: podVolumeBackupper, | ||
podCommandExecutor: podCommandExecutor, | ||
} | ||
} | ||
|
||
type handler struct { | ||
*sync.WaitGroup | ||
results *ResourceHookResults | ||
podVolumeBackupper podvolume.Backupper | ||
podCommandExecutor podexec.PodCommandExecutor | ||
} | ||
|
||
func (h *handler) HandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) []*ResourceHookResult { | ||
if len(hooks) == 0 { | ||
return nil | ||
} | ||
|
||
var results []*ResourceHookResult | ||
// make sure the results are tracked inside the handler | ||
defer func() { | ||
for _, result := range results { | ||
h.results.AddResult(result) | ||
} | ||
}() | ||
|
||
markHooksFailed := func(hooks []*ResourceHook, err error) []*ResourceHookResult { | ||
now := time.Now() | ||
for _, hook := range hooks { | ||
results = append(results, &ResourceHookResult{ | ||
Hook: hook, | ||
Status: StatusFailed, | ||
StartTime: now, | ||
EndTime: now, | ||
Error: err, | ||
}) | ||
} | ||
return results | ||
} | ||
|
||
resourceHookHandler, err := h.getResourceHookHandler(hooks[0].Type) | ||
if err != nil { | ||
return markHooksFailed(hooks, errors.Wrapf(err, "failed to get the resource hook handler for type %q", hooks[0].Type)) | ||
} | ||
|
||
if err = resourceHookHandler.WaitUntilReadyToExec(ctx, log, resource); err != nil { | ||
return markHooksFailed(hooks, errors.Wrap(err, "failed to wait ready to execute hook")) | ||
} | ||
|
||
skip := false | ||
for _, hook := range hooks { | ||
now := time.Now() | ||
result := &ResourceHookResult{ | ||
Hook: hook, | ||
StartTime: now, | ||
} | ||
|
||
// skip the execution of the following hooks if the execution of last hook failed | ||
// and is marked as not continue | ||
if skip { | ||
result.Status = StatusSkipped | ||
result.EndTime = now | ||
results = append(results, result) | ||
continue | ||
} | ||
|
||
// execution failed | ||
if err = resourceHookHandler.Exec(ctx, log, resource, hook); err != nil { | ||
result.Status = StatusFailed | ||
result.EndTime = time.Now() | ||
result.Error = err | ||
results = append(results, result) | ||
if !hook.ContinueOnError { | ||
skip = true | ||
} | ||
continue | ||
} | ||
|
||
// execution completed | ||
result.Status = StatusCompleted | ||
result.EndTime = time.Now() | ||
results = append(results, result) | ||
} | ||
|
||
return results | ||
} | ||
|
||
func (h *handler) AsyncHandleResourceHooks(ctx context.Context, log logrus.FieldLogger, resource *unstructured.Unstructured, hooks []*ResourceHook) { | ||
n := len(hooks) | ||
h.WaitGroup.Add(n) | ||
go func() { | ||
defer func() { | ||
for i := 0; i < n; i++ { | ||
h.Done() | ||
} | ||
}() | ||
|
||
results := h.HandleResourceHooks(ctx, log, resource, hooks) | ||
_ = results | ||
}() | ||
} | ||
|
||
func (h *handler) WaitAllResourceHooksCompleted(ctx context.Context, log logrus.FieldLogger) *ResourceHookResults { | ||
h.Wait() | ||
return h.results | ||
} | ||
|
||
func (h *handler) getResourceHookHandler(hookType string) (ResourceHookHandler, error) { | ||
switch hookType { | ||
case TypePodBackupPreHook: | ||
return NewPodBackupPreHookHandler(h.podCommandExecutor), nil | ||
case TypePodBackupPostHook: | ||
return NewPodBackupPostHookHandler(h.podVolumeBackupper, h.podCommandExecutor), nil | ||
default: | ||
return nil, errors.Errorf("unknown hook type %q", hookType) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
package hook | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
|
||
"github.com/sirupsen/logrus" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/require" | ||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||
|
||
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" | ||
mock_podexec "github.com/vmware-tanzu/velero/pkg/podexec/mocks" | ||
mock_podvolume "github.com/vmware-tanzu/velero/pkg/podvolume/mocks" | ||
) | ||
|
||
const ( | ||
pod = `{ | ||
"apiVersion": "v1", | ||
"kind": "Pod", | ||
"metadata": { | ||
"name": "nginx", | ||
"namespace": "nginx", | ||
"labels": { | ||
"app": "nginx" | ||
} | ||
}, | ||
"spec": { | ||
"containers": [ | ||
{ | ||
"name": "nginx", | ||
"image": "nginx:1.14.2", | ||
"ports": [ | ||
{ | ||
"containerPort": 80 | ||
} | ||
] | ||
} | ||
] | ||
} | ||
} | ||
` | ||
) | ||
|
||
func TestHandleResourceHooks(t *testing.T) { | ||
podvolumeBackupper := mock_podvolume.NewMockBackupper(t) | ||
podCMDExecutor := mock_podexec.NewMockPodCommandExecutor(t) | ||
handler := NewHandler(podvolumeBackupper, podCMDExecutor) | ||
ctx := context.Background() | ||
log := logrus.New() | ||
res := &unstructured.Unstructured{} | ||
err := res.UnmarshalJSON([]byte(pod)) | ||
require.NoError(t, err) | ||
var hooks []*ResourceHook | ||
|
||
// empty hooks list | ||
results := handler.HandleResourceHooks(ctx, log, res, hooks) | ||
require.Empty(t, results) | ||
|
||
// unknown hooks | ||
hooks = []*ResourceHook{ | ||
{ | ||
Name: "hook01", | ||
Type: "unknown", | ||
}, | ||
{ | ||
Name: "hook02", | ||
Type: "unknown", | ||
}, | ||
} | ||
results = handler.HandleResourceHooks(ctx, log, res, hooks) | ||
require.Len(t, results, 2) | ||
assert.Equal(t, StatusFailed, results[0].Status) | ||
assert.Equal(t, StatusFailed, results[1].Status) | ||
|
||
// skip other hooks if the former one failed and marked as not continue | ||
podCMDExecutor.On("ExecutePodCommand", mock.Anything, mock.Anything, mock.Anything, | ||
mock.Anything, mock.Anything, mock.Anything).Return(errors.New("failed to exec command")) | ||
hooks = []*ResourceHook{ | ||
{ | ||
Name: "hook01", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: true, | ||
}, | ||
{ | ||
Name: "hook02", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: false, | ||
}, | ||
{ | ||
Name: "hook03", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: false, | ||
}, | ||
} | ||
results = handler.HandleResourceHooks(ctx, log, res, hooks) | ||
require.Len(t, results, 3) | ||
assert.Equal(t, StatusFailed, results[0].Status) | ||
assert.Equal(t, StatusFailed, results[1].Status) | ||
assert.Equal(t, StatusSkipped, results[2].Status) | ||
|
||
// all completed | ||
podCMDExecutor.On("ExecutePodCommand").Unset() | ||
podCMDExecutor.On("ExecutePodCommand", mock.Anything, mock.Anything, mock.Anything, | ||
mock.Anything, mock.Anything, mock.Anything).Return(nil) | ||
hooks = []*ResourceHook{ | ||
{ | ||
Name: "hook01", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: true, | ||
}, | ||
{ | ||
Name: "hook02", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: false, | ||
}, | ||
} | ||
results = handler.HandleResourceHooks(ctx, log, res, hooks) | ||
require.Len(t, results, 2) | ||
assert.Equal(t, StatusCompleted, results[0].Status) | ||
assert.Equal(t, StatusCompleted, results[1].Status) | ||
} | ||
|
||
func TestAsyncHandleResourceHooksAndWaitAllResourceHooksCompleted(t *testing.T) { | ||
podvolumeBackupper := mock_podvolume.NewMockBackupper(t) | ||
podCMDExecutor := mock_podexec.NewMockPodCommandExecutor(t) | ||
handler := NewHandler(podvolumeBackupper, podCMDExecutor) | ||
ctx := context.Background() | ||
log := logrus.New() | ||
res := &unstructured.Unstructured{} | ||
err := res.UnmarshalJSON([]byte(pod)) | ||
require.NoError(t, err) | ||
|
||
podCMDExecutor.On("ExecutePodCommand", mock.Anything, mock.Anything, mock.Anything, | ||
mock.Anything, mock.Anything, mock.Anything).Return(nil) | ||
hooks := []*ResourceHook{ | ||
{ | ||
Name: "hook01", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: true, | ||
}, | ||
{ | ||
Name: "hook02", | ||
Type: TypePodBackupPreHook, | ||
Spec: &velerov1.ExecHook{}, | ||
Resource: res, | ||
ContinueOnError: false, | ||
}, | ||
} | ||
handler.AsyncHandleResourceHooks(ctx, log, res, hooks) | ||
results := handler.WaitAllResourceHooksCompleted(ctx, log) | ||
require.NotNil(t, results) | ||
require.Equal(t, 2, results.Total) | ||
require.Equal(t, 2, results.Completed) | ||
assert.Equal(t, StatusCompleted, results.Results[0].Status) | ||
assert.Equal(t, StatusCompleted, results.Results[1].Status) | ||
} | ||
|
||
func Test_getResourceHookHandler(t *testing.T) { | ||
handler := &handler{} | ||
|
||
// pod backup pre hook | ||
resourceHookHandler, err := handler.getResourceHookHandler(TypePodBackupPreHook) | ||
require.NoError(t, err) | ||
assert.NotNil(t, resourceHookHandler) | ||
|
||
// pod backup post hook | ||
resourceHookHandler, err = handler.getResourceHookHandler(TypePodBackupPostHook) | ||
require.NoError(t, err) | ||
assert.NotNil(t, resourceHookHandler) | ||
|
||
// unknown hook | ||
_, err = handler.getResourceHookHandler("unknown") | ||
require.Error(t, err) | ||
} |
Oops, something went wrong.