Skip to content

Commit

Permalink
feat: implement resource filtering in app source and app filtering in…
Browse files Browse the repository at this point in the history
… appset (argoproj#58)

Signed-off-by: Alexander Matyushentsev <[email protected]>
  • Loading branch information
alexmt committed Dec 6, 2024
1 parent fc3f9f1 commit bbb7a7c
Show file tree
Hide file tree
Showing 15 changed files with 4,050 additions and 1,041 deletions.
10 changes: 10 additions & 0 deletions applicationset/controllers/applicationset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type ApplicationSetReconciler struct {
GlobalPreservedAnnotations []string
GlobalPreservedLabels []string
Metrics *metrics.ApplicationsetMetrics
Matcher *utils.AppsMatcher
}

// +kubebuilder:rbac:groups=argoproj.io,resources=applicationsets,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -141,6 +142,15 @@ func (r *ApplicationSetReconciler) Reconcile(ctx context.Context, req ctrl.Reque
_ = utils.CheckInvalidGenerators(&applicationSetInfo)
// desiredApplications is the main list of all expected Applications from all generators in this appset.
desiredApplications, applicationSetReason, err := template.GenerateApplications(logCtx, applicationSetInfo, r.Generators, r.Renderer, r.Client)
if err == nil {
if applicationSetInfo.Spec.Filter != nil {
if filtered, filterErr := r.Matcher.FilterApps(ctx, *applicationSetInfo.Spec.Filter, desiredApplications); filterErr == nil {
desiredApplications = filtered
} else {
err = filterErr
}
}
}
if err != nil {
_ = r.setApplicationSetStatusCondition(ctx,
&applicationSetInfo,
Expand Down
32 changes: 32 additions & 0 deletions applicationset/services/mocks/Repos.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions applicationset/services/repo_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Repos interface {

// GetDirectories returns a list of directories (not files) within the target repo
GetDirectories(ctx context.Context, repoURL string, revision string, noRevisionCache, verifyCommit bool) ([]string, error)

// GenerateManifest generates manifests from a git repository
GenerateManifest(ctx context.Context, q *apiclient.ManifestRequest) (*apiclient.ManifestResponse, error)
}

func NewArgoCDService(getRepository func(ctx context.Context, url, project string) (*v1alpha1.Repository, error), submoduleEnabled bool, repoClientset apiclient.Clientset, newFileGlobbingEnabled bool) (Repos, error) {
Expand Down Expand Up @@ -89,3 +92,13 @@ func (a *argoCDService) GetDirectories(ctx context.Context, repoURL string, revi
}
return dirResponse.GetPaths(), nil
}

func (a *argoCDService) GenerateManifest(ctx context.Context, q *apiclient.ManifestRequest) (*apiclient.ManifestResponse, error) {
closer, client, err := a.repoServerClientSet.NewRepoServerClient()
if err != nil {
return nil, fmt.Errorf("error initialising new repo server client: %w", err)
}
defer io.Close(closer)

return client.GenerateManifest(ctx, q)
}
192 changes: 192 additions & 0 deletions applicationset/utils/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package utils

import (
"context"
gosync "sync"
"time"

"github.com/argoproj/pkg/sync"
"github.com/golang-jwt/jwt/v4"
"google.golang.org/grpc"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo-cd/v2/applicationset/services"
"github.com/argoproj/argo-cd/v2/common"
appapi "github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/reposerver/apiclient"
servercache "github.com/argoproj/argo-cd/v2/server/cache"
cacheutil "github.com/argoproj/argo-cd/v2/util/cache"
"github.com/argoproj/argo-cd/v2/util/cache/appstate"
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/io"
"github.com/argoproj/argo-cd/v2/util/kube"
"github.com/argoproj/argo-cd/v2/util/rbac"
"github.com/argoproj/argo-cd/v2/util/settings"

appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"

"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/yaml"

alpha1 "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/server/application"
)

type repoClientSetAdapter struct {
apiclient.RepoServerServiceClient
repoService services.Repos
}

func (r *repoClientSetAdapter) GenerateManifest(ctx context.Context, q *apiclient.ManifestRequest, _ ...grpc.CallOption) (*apiclient.ManifestResponse, error) {
return r.repoService.GenerateManifest(ctx, q)
}

func (r *repoClientSetAdapter) NewRepoServerClient() (io.Closer, apiclient.RepoServerServiceClient, error) {
return io.NopCloser, r, nil
}

func NewAppsMatcher(repoService services.Repos, kubeClientSet kubernetes.Interface, db db.ArgoDB, namespace string, settingsManager *settings.SettingsManager, projInformer cache.SharedIndexInformer) *AppsMatcher {
return &AppsMatcher{
repoService: repoService,
kubeClientSet: kubeClientSet,
db: db,
namespace: namespace,
settingsManager: settingsManager,
projInformer: projInformer,
}
}

type AppsMatcher struct {
repoService services.Repos
kubeClientSet kubernetes.Interface
appClientSet appclientset.Interface
db db.ArgoDB
namespace string
settingsManager *settings.SettingsManager
projInformer cache.SharedIndexInformer
}

func exprFunc(fn func() (interface{}, error)) func() interface{} {
once := gosync.Once{}
return func() interface{} {
var res interface{}
var err error
once.Do(func() {
res, err = fn()
})
if err != nil {
panic(err)
}
return res
}
}

type informersAdapter struct {
cache.SharedIndexInformer
alpha1.ApplicationLister
app *v1alpha1.Application
}

func (i *informersAdapter) Get(_ string) (*v1alpha1.Application, error) {
return i.app, nil
}

func (i *informersAdapter) Applications(_ string) alpha1.ApplicationNamespaceLister {
return i
}

func (i *informersAdapter) AddEventHandler(_ cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
return nil, nil
}

func (f *AppsMatcher) getContext(ctx context.Context, app *v1alpha1.Application) interface{} {
enforcer := rbac.NewEnforcer(f.kubeClientSet, f.namespace, common.ArgoCDRBACConfigMapName, func(claims jwt.Claims, rvals ...interface{}) bool {
return true
})
enforcer.EnableEnforce(false)

d := time.Minute
adapter := &informersAdapter{app: app}
srv, _ := application.NewServer(f.namespace,
f.kubeClientSet,
f.appClientSet,
adapter,
adapter,
nil,
&repoClientSetAdapter{repoService: f.repoService},
servercache.NewCache(appstate.NewCache(cacheutil.NewCache(cacheutil.NewInMemoryCache(d)), d), d, d, d),
kube.NewKubectl(),
f.db,
enforcer,
sync.NewKeyLock(),
f.settingsManager,
f.projInformer,
[]string{app.Namespace}, nil)
getManagedResources := func() (interface{}, error) {
resp, err := srv.GetManifests(ctx, &appapi.ApplicationManifestQuery{Name: &app.Name, AppNamespace: &app.Namespace})
if err != nil {
return nil, err
}
var objs []interface{}
for _, item := range resp.Manifests {
obj := unstructured.Unstructured{}
if err := yaml.Unmarshal([]byte(item), &obj); err != nil {
return nil, err
}
objs = append(objs, obj.Object)
}
return objs, nil
}
return map[string]interface{}{
"app": app,
"appInfo": map[string]interface{}{
"GetManagedResources": exprFunc(getManagedResources),
},
}
}

// FilterApps filters applications based on the filter
func (f *AppsMatcher) FilterApps(ctx context.Context, filter v1alpha1.ApplicationSetFilter, apps []v1alpha1.Application) ([]v1alpha1.Application, error) {
var res []v1alpha1.Application
for _, app := range apps {
match, err := f.Check(ctx, filter, app)
if err != nil {
return nil, err
}
if match {
res = append(res, app)
}
}
return res, nil
}

// Check evaluates the filter against the application
func (f *AppsMatcher) Check(ctx context.Context, filter v1alpha1.ApplicationSetFilter, app v1alpha1.Application) (bool, error) {
var programs []*vm.Program
for _, item := range filter.Expressions {
program, err := expr.Compile(item)
if err != nil {
return false, err
}
programs = append(programs, program)
}

programCtx := f.getContext(ctx, &app)
for _, program := range programs {
out, err := expr.Run(program, programCtx)
if err != nil {
return false, err
}
switch condResult := out.(type) {
case bool:
return condResult, nil
default:
return false, nil
}
}
return false, nil
}
53 changes: 53 additions & 0 deletions assets/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit bbb7a7c

Please sign in to comment.