Skip to content

Commit

Permalink
Core: periodically prune orphan Vault Clients
Browse files Browse the repository at this point in the history
This change adds a new periodic function that will prune any Vault
client that has no referring objects.
  • Loading branch information
benashz committed Jun 12, 2024
1 parent 3377c25 commit 1817961
Show file tree
Hide file tree
Showing 2 changed files with 329 additions and 0 deletions.
113 changes: 113 additions & 0 deletions internal/vault/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
ClientCallbackOnCacheRemoval
)

// defaultPruneOrphanAge is the default age at which orphaned clients are
// eligible for pruning.
var defaultPruneOrphanAge = 1 * time.Minute

func (o ClientCallbackOn) String() string {
switch o {
case ClientCallbackOnLifetimeWatcherDone:
Expand Down Expand Up @@ -927,6 +931,7 @@ func (m *cachingClientFactory) startOrphanClientPruner(ctx context.Context) {
ctx_, cancel := context.WithCancel(ctx)
m.orphanPrunerCancel = cancel
// TODO: make period a command line option
ticker := time.NewTicker(30 * time.Minute)
go func() {
defer func() {
close(m.orphanPrunerClientCh)
Expand Down Expand Up @@ -957,11 +962,78 @@ func (m *cachingClientFactory) startOrphanClientPruner(ctx context.Context) {
m.cache.Remove(cacheKey)
}
}
case <-ticker.C:
// catch-all for the pruner
if count, err := m.pruneOrphanClients(ctx); err != nil {
logger.Error(err, "Prune orphan Vault Clients", "trigger", "tick")
} else {
logger.Info("Prune orphan Vault Clients", "count", count, "trigger", "tick")
}
}
}
}()
}

// pruneOrphanClients will remove all clients from the cache that are not
// associated with any of the following custom resources:
// secretsv1beta1.VaultStaticSecret, secretsv1beta1.VaultPKISecret,
// secretsv1beta1.VaultDynamicSecret.
//
// The function will return the number of clients pruned. No clients will be
// pruned if an error occurs when getting the custom resources.
func (m *cachingClientFactory) pruneOrphanClients(ctx context.Context) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()

logger := m.logger.WithName("pruneOrphanClients")

currentClientCacheKeys, err := GetGlobalVaultCacheKeys(ctx, m.ctrlClient)
if err != nil {
return 0, err
}

var toPrune []ClientCacheKey
for _, c := range m.cache.Values() {
key, err := c.GetCacheKey()
if err != nil {
continue
}

if _, ok := currentClientCacheKeys[key]; !ok {
if key == m.clientCacheKeyEncrypt {
continue
}
stat := c.Stat()
if stat == nil {
continue
}
// prune clients that have not been created in the last 5 minutes, this gives
// time for any referring resource to update their
// .status.vaultClientMeta.cacheKey
if stat.Age() >= defaultPruneOrphanAge {
toPrune = append(toPrune, key)
}
}
}

// TODO: ensure that this does not block forever...
var count int
wg := sync.WaitGroup{}
wg.Add(len(toPrune))
for _, key := range toPrune {
count++
go func() {
defer wg.Done()
m.cache.Remove(key)
}()
}
wg.Wait()

logger.V(consts.LogLevelDebug).Info(
"Pruned orphaned clients", "count", count, "pruned", toPrune)
return count, nil
}

// NewCachingClientFactory returns a CachingClientFactory with ClientCache initialized.
// The ClientCache's onEvictCallback is registered with the factory's onClientEvict(),
// to ensure any evictions are handled by the factory (this is very important).
Expand Down Expand Up @@ -1103,6 +1175,47 @@ type nullEventRecorder struct {

func (n *nullEventRecorder) Event(_ runtime.Object, _, _, _ string) {}

// GetGlobalVaultCacheKeys returns the current set of vault.ClientCacheKey(s) that are in
// use.
func GetGlobalVaultCacheKeys(ctx context.Context, client ctrlclient.Client) (map[ClientCacheKey]int, error) {
currentClientCacheKeys := map[ClientCacheKey]int{}
addCurrentClientCacheKeys := func(meta secretsv1beta1.VaultClientMeta) {
if meta.CacheKey != "" {
key := ClientCacheKey(meta.CacheKey)
currentClientCacheKeys[key] = currentClientCacheKeys[key] + 1
}
}

var vssList secretsv1beta1.VaultStaticSecretList
err := client.List(ctx, &vssList)
if err != nil {
return nil, err
}

for _, o := range vssList.Items {
addCurrentClientCacheKeys(o.Status.VaultClientMeta)
}
var vpsList secretsv1beta1.VaultPKISecretList
err = client.List(ctx, &vpsList)
if err != nil {
return nil, err
}
for _, o := range vpsList.Items {
addCurrentClientCacheKeys(o.Status.VaultClientMeta)
}

var vdsList secretsv1beta1.VaultDynamicSecretList
err = client.List(ctx, &vdsList)
if err != nil {
return nil, err
}
for _, o := range vdsList.Items {
addCurrentClientCacheKeys(o.Status.VaultClientMeta)
}

return currentClientCacheKeys, nil
}

// getVaultClientMeta returns the VaultClientMeta for the provided Object. It
// supports these types: VaultStaticSecret, VaultPKISecret, VaultDynamicSecret.
//
Expand Down
216 changes: 216 additions & 0 deletions internal/vault/client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ package vault

import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

secretsv1beta1 "github.com/hashicorp/vault-secrets-operator/api/v1beta1"
Expand Down Expand Up @@ -308,6 +313,217 @@ func Test_cachingClientFactory_callClientCallbacks(t *testing.T) {
}
}

func Test_cachingClientFactory_pruneOrphanClients(t *testing.T) {
t.Parallel()

ctx := context.Background()

type keyTest struct {
key ClientCacheKey
creationTimeOffset time.Duration
}

newCache := func(size int, keyTests ...keyTest) *clientCache {
lruCache, err := lru.NewWithEvict[ClientCacheKey, Client](size, nil)
require.NoError(t, err)
c := &clientCache{
cache: lruCache,
}
for _, k := range keyTests {
c.cache.Add(k.key, &stubClient{
cacheKey: k.key,
clientStat: &clientStat{
creationTimestamp: time.Now().Add(k.creationTimeOffset),
},
})
}
return c
}

clientBuilder := newClientBuilder()
schemeLessBuilder := fake.NewClientBuilder()
tests := []struct {
name string
cache ClientCache
c ctrlclient.Client
want int
createFunc func(t *testing.T, c ctrlclient.Client) error
wantClientCacheKeys []ClientCacheKey
wantErr assert.ErrorAssertionFunc
}{
{
name: "empty-cache",
c: clientBuilder.Build(),
cache: newCache(1),
wantErr: assert.NoError,
},
{
name: "no-referring-objects-purge",
c: clientBuilder.Build(),
cache: newCache(1,
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
want: 1,
wantErr: assert.NoError,
},
{
name: "prune-some",
c: clientBuilder.Build(),
createFunc: func(t *testing.T, c ctrlclient.Client) error {
t.Helper()
var errs error
for _, o := range []ctrlclient.Object{
&secretsv1beta1.VaultStaticSecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vss-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultStaticSecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
} {
errs = errors.Join(errs, c.Create(ctx, o))
}
return errs
},
cache: newCache(2,
keyTest{
key: "kubernetes-123455",
creationTimeOffset: -defaultPruneOrphanAge,
},
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123456"),
},
want: 1,
wantErr: assert.NoError,
},
{
name: "none",
c: clientBuilder.Build(),
cache: newCache(1,
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
createFunc: func(t *testing.T, c ctrlclient.Client) error {
t.Helper()
var errs error
for _, o := range []ctrlclient.Object{
&secretsv1beta1.VaultStaticSecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vss-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultStaticSecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
&secretsv1beta1.VaultPKISecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vps-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultPKISecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
&secretsv1beta1.VaultDynamicSecret{
ObjectMeta: metav1.ObjectMeta{
Name: "vds-1",
Namespace: "default",
},
Status: secretsv1beta1.VaultDynamicSecretStatus{
VaultClientMeta: secretsv1beta1.VaultClientMeta{
CacheKey: "kubernetes-123456",
},
},
},
} {
errs = errors.Join(errs, c.Create(ctx, o))
}
return errs
},
wantErr: assert.NoError,
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123456"),
},
want: 0,
},
{
name: "no-prune-recent",
c: clientBuilder.Build(),
cache: newCache(2,
keyTest{
key: "kubernetes-123455",
creationTimeOffset: -(defaultPruneOrphanAge - time.Second*1),
},
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -(defaultPruneOrphanAge - time.Second*1),
},
),
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123455"),
ClientCacheKey("kubernetes-123456"),
},
want: 0,
wantErr: assert.NoError,
},
{
name: "vss-scheme-not-set",
c: schemeLessBuilder.Build(),
cache: newCache(1,
keyTest{
key: "kubernetes-123456",
creationTimeOffset: -defaultPruneOrphanAge,
},
),
want: 0,
wantClientCacheKeys: []ClientCacheKey{
ClientCacheKey("kubernetes-123456"),
},
wantErr: func(t assert.TestingT, err error, msgAndArgs ...interface{}) bool {
return assert.ErrorContains(t, err, "no kind is registered for the type v1beta1.VaultStaticSecret")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &cachingClientFactory{
cache: tt.cache,
ctrlClient: tt.c,
}

if tt.createFunc != nil {
require.NoError(t, tt.createFunc(t, tt.c))
}

got, err := m.pruneOrphanClients(ctx)
if !tt.wantErr(t, err, fmt.Sprintf("pruneOrphanClients(%v)", ctx)) {
return
}
assert.Equalf(t, tt.want, got, "pruneOrphanClients(%v)", ctx)
assert.ElementsMatchf(t, tt.wantClientCacheKeys, m.cache.Keys(), "pruneOrphanClients(%v)", ctx)
})
}
}

// newClientBuilder returns a new fake.ClientBuilder with the necessary schemes.
// copied from helpers
func newClientBuilder() *fake.ClientBuilder {
Expand Down

0 comments on commit 1817961

Please sign in to comment.