Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: periodically prune orphan Vault Clients #813

Draft
wants to merge 1 commit into
base: VAULT-27944/core-prune-orphaned-vault-clients
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions internal/vault/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
ClientCallbackOnCacheRemoval
)

// defaultPruneOrphanAge is the minimum age a cached client must have reached
// before the orphan pruner will consider removing it.
var defaultPruneOrphanAge = time.Minute

func (o ClientCallbackOn) String() string {
switch o {
case ClientCallbackOnLifetimeWatcherDone:
Expand Down Expand Up @@ -927,6 +931,7 @@ func (m *cachingClientFactory) startOrphanClientPruner(ctx context.Context) {
ctx_, cancel := context.WithCancel(ctx)
m.orphanPrunerCancel = cancel
// TODO: make period a command line option
ticker := time.NewTicker(30 * time.Minute)
go func() {
defer func() {
close(m.orphanPrunerClientCh)
Expand Down Expand Up @@ -957,11 +962,78 @@ func (m *cachingClientFactory) startOrphanClientPruner(ctx context.Context) {
m.cache.Remove(cacheKey)
}
}
case <-ticker.C:
// catch-all for the pruner
if count, err := m.pruneOrphanClients(ctx); err != nil {
logger.Error(err, "Prune orphan Vault Clients", "trigger", "tick")
} else {
logger.Info("Prune orphan Vault Clients", "count", count, "trigger", "tick")
}
}
}
}()
}

// pruneOrphanClients removes all clients from the cache that are not
// associated with any of the following custom resources:
// secretsv1beta1.VaultStaticSecret, secretsv1beta1.VaultPKISecret,
// secretsv1beta1.VaultDynamicSecret.
//
// A cached client is selected for pruning only when all of these hold:
//   - its cache key is not reported by GetGlobalVaultCacheKeys,
//   - its cache key is not m.clientCacheKeyEncrypt,
//   - its Stat() is non-nil and reports an age of at least
//     defaultPruneOrphanAge.
//
// The factory mutex is held for the whole call, including the listing of the
// custom resources.
//
// It returns the number of clients selected for pruning. No clients are pruned
// if an error occurs when getting the custom resources; that error is returned
// unchanged.
func (m *cachingClientFactory) pruneOrphanClients(ctx context.Context) (int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	logger := m.logger.WithName("pruneOrphanClients")

	currentClientCacheKeys, err := GetGlobalVaultCacheKeys(ctx, m.ctrlClient)
	if err != nil {
		return 0, err
	}

	var toPrune []ClientCacheKey
	for _, c := range m.cache.Values() {
		key, err := c.GetCacheKey()
		if err != nil {
			// Without a cache key the client cannot be matched against the
			// referring resources, so leave it in place.
			logger.V(consts.LogLevelDebug).Info(
				"Skipping client, unable to get its cache key", "error", err)
			continue
		}

		if _, ok := currentClientCacheKeys[key]; ok {
			// still referenced by at least one custom resource
			continue
		}
		if key == m.clientCacheKeyEncrypt {
			// this client is never treated as an orphan
			continue
		}

		stat := c.Stat()
		if stat == nil {
			// no age information available, so err on the side of keeping it
			continue
		}

		// Only prune clients that are at least defaultPruneOrphanAge old, this
		// gives time for any referring resource to update their
		// .status.vaultClientMeta.cacheKey
		if stat.Age() >= defaultPruneOrphanAge {
			toPrune = append(toPrune, key)
		}
	}

	// TODO: ensure that this does not block forever...
	var wg sync.WaitGroup
	wg.Add(len(toPrune))
	for _, key := range toPrune {
		// Pass key as an argument so that every goroutine removes its own key,
		// regardless of the loop variable semantics of the Go version in use.
		go func(key ClientCacheKey) {
			defer wg.Done()
			m.cache.Remove(key)
		}(key)
	}
	wg.Wait()

	count := len(toPrune)
	logger.V(consts.LogLevelDebug).Info(
		"Pruned orphaned clients", "count", count, "pruned", toPrune)
	return count, nil
}

// NewCachingClientFactory returns a CachingClientFactory with ClientCache initialized.
// The ClientCache's onEvictCallback is registered with the factory's onClientEvict(),
// to ensure any evictions are handled by the factory (this is very important).
Expand Down Expand Up @@ -1103,6 +1175,47 @@ type nullEventRecorder struct {

// Event is a no-op: all of its arguments are ignored and nothing is recorded.
func (n *nullEventRecorder) Event(_ runtime.Object, _, _, _ string) {}

// GetGlobalVaultCacheKeys lists every VaultStaticSecret, VaultPKISecret and
// VaultDynamicSecret visible to client and collects the non-empty
// .status.vaultClientMeta.cacheKey values found on them.
//
// The returned map is keyed by vault.ClientCacheKey; each value is the number
// of listed resources that carry that key. If any of the list calls fails, its
// error is returned as-is together with a nil map.
func GetGlobalVaultCacheKeys(ctx context.Context, client ctrlclient.Client) (map[ClientCacheKey]int, error) {
	inUse := map[ClientCacheKey]int{}
	// record counts one reference for meta's cache key, ignoring empty keys.
	record := func(meta secretsv1beta1.VaultClientMeta) {
		if meta.CacheKey == "" {
			return
		}
		inUse[ClientCacheKey(meta.CacheKey)]++
	}

	var staticSecrets secretsv1beta1.VaultStaticSecretList
	if err := client.List(ctx, &staticSecrets); err != nil {
		return nil, err
	}
	for i := range staticSecrets.Items {
		record(staticSecrets.Items[i].Status.VaultClientMeta)
	}

	var pkiSecrets secretsv1beta1.VaultPKISecretList
	if err := client.List(ctx, &pkiSecrets); err != nil {
		return nil, err
	}
	for i := range pkiSecrets.Items {
		record(pkiSecrets.Items[i].Status.VaultClientMeta)
	}

	var dynamicSecrets secretsv1beta1.VaultDynamicSecretList
	if err := client.List(ctx, &dynamicSecrets); err != nil {
		return nil, err
	}
	for i := range dynamicSecrets.Items {
		record(dynamicSecrets.Items[i].Status.VaultClientMeta)
	}

	return inUse, nil
}

// getVaultClientMeta returns the VaultClientMeta for the provided Object. It
// supports these types: VaultStaticSecret, VaultPKISecret, VaultDynamicSecret.
//
Expand Down
216 changes: 216 additions & 0 deletions internal/vault/client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ package vault

import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

secretsv1beta1 "github.com/hashicorp/vault-secrets-operator/api/v1beta1"
Expand Down Expand Up @@ -308,6 +313,217 @@ func Test_cachingClientFactory_callClientCallbacks(t *testing.T) {
}
}

func Test_cachingClientFactory_pruneOrphanClients(t *testing.T) {
t.Parallel()

ctx := context.Background()

type keyTest struct {
key ClientCacheKey
creationTimeOffset time.Duration
}

newCache := func(size int, keyTests ...keyTest) *clientCache {
lruCache, err := lru.NewWithEvict[ClientCacheKey, Client](size, nil)
require.NoError(t, err)
c := &clientCache{
cache: lruCache,
}
for _, k := range keyTests {
c.cache.Add(k.key, &stubClient{
cacheKey: k.key,
clientStat: &clientStat{
creationTimestamp: time.Now().Add(k.creationTimeOffset),
},
})
}
return c
}

clientBuilder := newClientBuilder()
schemeLessBuilder := fake.NewClientBuilder()
tests := []struct {
name string
cache ClientCache
c ctrlclient.Client
want int
createFunc func(t *testing.T, c ctrlclient.Client) error
wantClientCacheKeys []ClientCacheKey
wantErr assert.ErrorAssertionFunc
}{
{
name: "empty-cache",
c: clientBuilder.Build(),
cache: newCache(1),
wantErr: assert.NoError,
},
{
name: "no-referring-objects-purge",
c: clientBuilder.Build(),
cache: newCache(1,
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
want: 1,
wantErr: assert.NoError,
},
{
name: "prune-some",
c: clientBuilder.Build(),
createFunc: func(t *testing.T, c ctrlclient.Client) error {
t.Helper()
var errs error
for _, o := range []ctrlclient.Object{
&secretsv1beta1.VaultStaticSecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vss-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultStaticSecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
} {
errs = errors.Join(errs, c.Create(ctx, o))
}
return errs
},
cache: newCache(2,
keyTest{
key: "kubernetes-123455",
creationTimeOffset: -defaultPruneOrphanAge,
},
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123456"),
},
want: 1,
wantErr: assert.NoError,
},
{
name: "none",
c: clientBuilder.Build(),
cache: newCache(1,
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
createFunc: func(t *testing.T, c ctrlclient.Client) error {
t.Helper()
var errs error
for _, o := range []ctrlclient.Object{
&secretsv1beta1.VaultStaticSecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vss-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultStaticSecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
&secretsv1beta1.VaultPKISecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vps-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultPKISecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
&secretsv1beta1.VaultDynamicSecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vds-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultDynamicSecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
} {
errs = errors.Join(errs, c.Create(ctx, o))
}
return errs
},
wantErr: assert.NoError,
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123456"),
},
want: 0,
},
{
name: "no-prune-recent",
c: clientBuilder.Build(),
cache: newCache(2,
keyTest{
key: "kubernetes-123455",
creationTimeOffset: -(defaultPruneOrphanAge - time.Second*1),
},
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -(defaultPruneOrphanAge - time.Second*1),
},
),
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123455"),
ClientCacheKey("kubernetes-123456"),
},
want: 0,
wantErr: assert.NoError,
},
{
name: "vss-scheme-not-set",
c: schemeLessBuilder.Build(),
cache: newCache(1,
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
want: 0,
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123456"),
},
wantErr: func(t assert.TestingT, err error, msgAndArgs ...interface{}) bool {
return assert.ErrorContains(t, err, "no kind is registered for the type v1beta1.VaultStaticSecret")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &cachingClientFactory{
cache: tt.cache,
ctrlClient: tt.c,
}

if tt.createFunc != nil {
require.NoError(t, tt.createFunc(t, tt.c))
}

got, err := m.pruneOrphanClients(ctx)
if !tt.wantErr(t, err, fmt.Sprintf("pruneOrphanClients(%v)", ctx)) {
return
}
assert.Equalf(t, tt.want, got, "pruneOrphanClients(%v)", ctx)
assert.ElementsMatchf(t, tt.wantClientCacheKeys, m.cache.Keys(), "pruneOrphanClients(%v)", ctx)
})
}
}

// newClientBuilder returns a new fake.ClientBuilder with the necessary schemes.
// copied from helpers
func newClientBuilder() *fake.ClientBuilder {
Expand Down