Skip to content

Commit

Permalink
fix: handle sharded stage events (#1470)
Browse files Browse the repository at this point in the history
Signed-off-by: Soumya Ghosh Dastidar <[email protected]>
  • Loading branch information
gdsoumya authored Feb 10, 2024
1 parent a3eea08 commit 3d49794
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 12 deletions.
16 changes: 16 additions & 0 deletions internal/controller/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"sigs.k8s.io/controller-runtime/pkg/predicate"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
Expand Down Expand Up @@ -39,3 +40,18 @@ func GetShardPredicate(shard string) (predicate.Predicate, error) {
)
return pred, errors.Wrap(err, "error creating shard selector predicate")
}

func GetShardRequirement(shard string) (*labels.Requirement, error) {
req, err := labels.NewRequirement(kargoapi.ShardLabelKey, selection.Equals, []string{shard})
if err != nil {
return nil, errors.Wrap(err, "error creating shard label selector")
}
if shard == "" {
req, err = labels.NewRequirement(kargoapi.ShardLabelKey, selection.DoesNotExist, nil)
if err != nil {
return nil, errors.Wrap(err, "error creating default label selector")
}
}

return req, nil
}
37 changes: 29 additions & 8 deletions internal/controller/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -203,6 +204,8 @@ type reconciler struct {
clearVerificationsFn func(context.Context, *kargoapi.Stage) error

clearApprovalsFn func(context.Context, *kargoapi.Stage) error

shardRequirement *labels.Requirement
}

// SetupReconcilerWithManager initializes a reconciler for Stage resources and
Expand Down Expand Up @@ -273,6 +276,11 @@ func SetupReconcilerWithManager(
return errors.Wrap(err, "error creating shard predicate")
}

shardRequirement, err := controller.GetShardRequirement(cfg.ShardName)
if err != nil {
return errors.Wrap(err, "error creating shard selector")
}
shardSelector := labels.NewSelector().Add(*shardRequirement)
var argocdClient, rolloutsClient client.Client
if argocdMgr != nil {
argocdClient = argocdMgr.GetClient()
Expand Down Expand Up @@ -308,6 +316,7 @@ func SetupReconcilerWithManager(
argocdClient,
rolloutsClient,
cfg,
shardRequirement,
),
)
if err != nil {
Expand Down Expand Up @@ -337,7 +346,8 @@ func SetupReconcilerWithManager(
// Watch Freight that has been marked as verified in a Stage and enqueue
// downstream Stages
verifiedFreightHandler := &verifiedFreightEventHandler{
kargoClient: kargoMgr.GetClient(),
kargoClient: kargoMgr.GetClient(),
shardSelector: shardSelector,
}
if err := c.Watch(
source.Kind(
Expand All @@ -363,7 +373,8 @@ func SetupReconcilerWithManager(
}

createdFreightEventHandler := &createdFreightEventHandler{
kargoClient: kargoMgr.GetClient(),
kargoClient: kargoMgr.GetClient(),
shardSelector: shardSelector,
}
if err := c.Watch(
source.Kind(
Expand All @@ -379,7 +390,8 @@ func SetupReconcilerWithManager(
// care about this watch anyway.
if argocdMgr != nil {
updatedArgoCDAppHandler := &updatedArgoCDAppHandler{
kargoClient: kargoMgr.GetClient(),
kargoClient: kargoMgr.GetClient(),
shardSelector: shardSelector,
}
if err := c.Watch(
source.Kind(
Expand All @@ -396,7 +408,8 @@ func SetupReconcilerWithManager(
// won't care about this watch anyway.
if rolloutsMgr != nil {
phaseChangedAnalysisRunHandler := &phaseChangedAnalysisRunHandler{
kargoClient: kargoMgr.GetClient(),
kargoClient: kargoMgr.GetClient(),
shardSelector: shardSelector,
}
if err := c.Watch(
source.Kind(
Expand All @@ -417,12 +430,14 @@ func newReconciler(
argocdClient client.Client,
rolloutsClient client.Client,
cfg ReconcilerConfig,
shardRequirement *labels.Requirement,
) *reconciler {
r := &reconciler{
kargoClient: kargoClient,
argocdClient: argocdClient,
rolloutsClient: rolloutsClient,
cfg: cfg,
kargoClient: kargoClient,
argocdClient: argocdClient,
rolloutsClient: rolloutsClient,
cfg: cfg,
shardRequirement: shardRequirement,
}
// The following default behaviors are overridable for testing purposes:
// Loop guard:
Expand Down Expand Up @@ -494,6 +509,12 @@ func (r *reconciler) Reconcile(
result.RequeueAfter = 0 // Do not requeue
return result, nil
}

if ok := r.shardRequirement.Matches(labels.Set(stage.Labels)); !ok {
// Ignore if stage does not belong to given shard
result.RequeueAfter = 0
return result, nil
}
logger.Debug("found Stage")

var newStatus kargoapi.StageStatus
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/stages/stages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/fake"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/controller"
rollouts "github.com/akuity/kargo/internal/controller/rollouts/api/v1alpha1"
)

Expand All @@ -22,11 +23,14 @@ func TestNewReconciler(t *testing.T) {
RolloutsControllerInstanceID: "fake-instance-id",
}
kubeClient := fake.NewClientBuilder().Build()
requirement, err := controller.GetShardRequirement(testCfg.ShardName)
require.NoError(t, err)
r := newReconciler(
kubeClient,
kubeClient,
kubeClient,
testCfg,
requirement,
)
require.Equal(t, testCfg, r.cfg)
require.NotNil(t, r.kargoClient)
Expand Down
17 changes: 13 additions & 4 deletions internal/controller/stages/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
Expand All @@ -23,7 +24,8 @@ import (
// Stages when Freight is marked as verified in a Stage, so that those Stages
// can reconcile and possibly create a Promotion if auto-promotion is enabled.
type verifiedFreightEventHandler struct {
kargoClient client.Client
kargoClient client.Client
shardSelector labels.Selector
}

// Create implements EventHandler.
Expand Down Expand Up @@ -87,6 +89,7 @@ func (v *verifiedFreightEventHandler) Update(
kubeclient.StagesByUpstreamStagesIndexField,
newlyVerifiedStage,
),
LabelSelector: v.shardSelector,
},
); err != nil {
logger.Errorf(
Expand Down Expand Up @@ -213,7 +216,8 @@ func getNewlyApprovedStages(old, new *kargoapi.Freight) []string {
// those Stages can reconcile and possibly create a Promotion if auto-promotion
// is enabled.
type createdFreightEventHandler struct {
kargoClient client.Client
kargoClient client.Client
shardSelector labels.Selector
}

// Create implements EventHandler.
Expand Down Expand Up @@ -244,6 +248,7 @@ func (c *createdFreightEventHandler) Create(
kubeclient.StagesByWarehouseIndexField,
warehouse,
),
LabelSelector: c.shardSelector,
},
); err != nil {
logger.Errorf(
Expand Down Expand Up @@ -300,7 +305,8 @@ func (c *createdFreightEventHandler) Update(
// with an Argo CD Application whenever that Application's health or sync status
// changes, so that those Stages can reconcile.
type updatedArgoCDAppHandler struct {
kargoClient client.Client
kargoClient client.Client
shardSelector labels.Selector
}

// Create implements EventHandler.
Expand Down Expand Up @@ -351,6 +357,7 @@ func (u *updatedArgoCDAppHandler) Update(
e.ObjectNew.GetName(),
),
),
LabelSelector: u.shardSelector,
},
); err != nil {
logger.Errorf(
Expand Down Expand Up @@ -410,7 +417,8 @@ func appHealthOrSyncStatusChanged(ctx context.Context, e event.UpdateEvent) bool
// associated with an Argo Rollouts AnalysisRun whenever that AnalysisRun's
// phase changes.
type phaseChangedAnalysisRunHandler struct {
kargoClient client.Client
kargoClient client.Client
shardSelector labels.Selector
}

// Create implements EventHandler.
Expand Down Expand Up @@ -462,6 +470,7 @@ func (p *phaseChangedAnalysisRunHandler) Update(
e.ObjectNew.GetName(),
),
),
LabelSelector: p.shardSelector,
},
); err != nil {
logger.Errorf(
Expand Down

0 comments on commit 3d49794

Please sign in to comment.