Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race condition in cloud endpoint controller #575

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 60 additions & 9 deletions internal/controller/ngrok/cloudendpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -63,6 +64,9 @@
Log logr.Logger
Recorder record.EventRecorder
NgrokClientset ngrokapi.Clientset

// Holds a map of cloud endpoints and their IDs to avoid race conditions from relying on the cloud endpoint ID from the k8s resource status
existingCloudEndpoints map[types.NamespacedName]string
}

// Define custom error types to catch and handle requeuing logic for
Expand All @@ -77,15 +81,26 @@
return errors.New("NgrokClientset is required")
}

r.existingCloudEndpoints = make(map[types.NamespacedName]string)

Check warning on line 84 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L84

Added line #L84 was not covered by tests
r.controller = &controller.BaseController[*ngrokv1alpha1.CloudEndpoint]{
Kube: r.Client,
Log: r.Log,
Recorder: r.Recorder,

StatusID: func(clep *ngrokv1alpha1.CloudEndpoint) string { return clep.Status.ID },
Create: r.create,
Update: r.update,
Delete: r.delete,
StatusID: func(clep *ngrokv1alpha1.CloudEndpoint) string {
// We keep an in-memory map of the existing cloud endpoints to tell if we should create/update the cloud endpoint definition on the ngrok server side
// Relying on the resource's status.ID being set is error-prone because it can result in race conditions where the resource is updated
// after it was created but before its status.ID is initialized
if _, exists := r.existingCloudEndpoints[types.NamespacedName{
Name: clep.Name,
Namespace: clep.Namespace,
}]; exists {
return fmt.Sprintf("%s.%s", clep.Name, clep.Namespace) // Returning a non-empty string lets the controller know to call r.update because this resource exists already
}
return "" // Returning an empty string lets the controller know to call r.create because this is a new resource

Check warning on line 99 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L89-L99

Added lines #L89 - L99 were not covered by tests
},
Create: r.create,
Update: r.update,
Delete: r.delete,
ErrResult: func(op controller.BaseControllerOp, cr *ngrokv1alpha1.CloudEndpoint, err error) (ctrl.Result, error) {
if errors.Is(err, ErrDomainCreating) {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
Expand Down Expand Up @@ -166,6 +181,10 @@
if err != nil {
return err
}
r.existingCloudEndpoints[types.NamespacedName{
Name: clep.Name,
Namespace: clep.Namespace,
}] = ngrokClep.ID

Check warning on line 187 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L184-L187

Added lines #L184 - L187 were not covered by tests

return r.updateStatus(ctx, clep, ngrokClep, domain)
}
Expand All @@ -183,8 +202,18 @@
return err
}

// If the endpoint exists in our in-memory map, grab the assigned ID rather than waiting on the ID to be assigned to the reconciled endpoint's status
clepKey := types.NamespacedName{
Name: clep.Name,
Namespace: clep.Namespace,
}
existingID, exists := r.existingCloudEndpoints[clepKey]
if exists {
clep.Status.ID = existingID
}

Check warning on line 213 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L206-L213

Added lines #L206 - L213 were not covered by tests

updateParams := &ngrok.EndpointUpdate{
ID: clep.Status.ID,
ID: existingID,

Check warning on line 216 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L216

Added line #L216 was not covered by tests
Url: &clep.Spec.URL,
Description: &clep.Spec.Description,
Metadata: &clep.Spec.Metadata,
Expand All @@ -195,24 +224,46 @@
ngrokClep, err := r.NgrokClientset.Endpoints().Update(ctx, updateParams)
if ngrok.IsNotFound(err) {
// Couldn't find endpoint by ID to update, so blank it out and create a new one
r.Recorder.Event(clep, v1.EventTypeWarning, "EndpointNotFound", fmt.Sprintf("Failed to update endpoint %s by ID because it was not found. Creating a new one", clep.Status.ID))
r.Recorder.Event(clep, v1.EventTypeWarning, "EndpointNotFound", fmt.Sprintf("Failed to update endpoint %q by ID because it was not found. Creating a new one", clep.Status.ID))

Check warning on line 227 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L227

Added line #L227 was not covered by tests
clep.Status.ID = ""
delete(r.existingCloudEndpoints, clepKey)

Check warning on line 229 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L229

Added line #L229 was not covered by tests
_ = r.Client.Status().Update(ctx, clep)
return r.create(ctx, clep)
}
if err != nil {
return err
}

r.existingCloudEndpoints[clepKey] = ngrokClep.ID

Check warning on line 236 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L236

Added line #L236 was not covered by tests
return r.updateStatus(ctx, clep, ngrokClep, domain)
}

// Simply attempt to delete it. The base controller handles not found errors
func (r *CloudEndpointReconciler) delete(ctx context.Context, clep *ngrokv1alpha1.CloudEndpoint) error {
return r.NgrokClientset.Endpoints().Delete(ctx, clep.Status.ID)
// If the endpoint exists in our in-memory map, grab the assigned ID rather than waiting on the ID to be assigned to the reconciled endpoint's status
clepKey := types.NamespacedName{
Name: clep.Name,
Namespace: clep.Namespace,
}
existingID, exists := r.existingCloudEndpoints[clepKey]
if exists {
clep.Status.ID = existingID
}
if err := r.NgrokClientset.Endpoints().Delete(ctx, clep.Status.ID); err != nil {
return err
}
delete(r.existingCloudEndpoints, clepKey)
return nil

Check warning on line 255 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L242-L255

Added lines #L242 - L255 were not covered by tests
}

func (r *CloudEndpointReconciler) updateStatus(ctx context.Context, clep *ngrokv1alpha1.CloudEndpoint, ngrokClep *ngrok.Endpoint, domain *ingressv1alpha1.Domain) error {
// Only update if there's a meaningful change to reduce reconciliation loops
if clep.Status.ID == ngrokClep.ID && clep.Status.Domain == &domain.Status {
r.Log.Info("No status difference detected for cloud endpoint, skipping status update",
"cloud endpoint", fmt.Sprintf("%s.%s", clep.Name, clep.Namespace),
)
return nil
}

Check warning on line 265 in internal/controller/ngrok/cloudendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/ngrok/cloudendpoint_controller.go#L259-L265

Added lines #L259 - L265 were not covered by tests

clep.Status.ID = ngrokClep.ID
if domain != nil {
clep.Status.Domain = &domain.Status
Expand Down
Loading