Skip to content

Commit

Permalink
feat: add backoff/retry to all client operations
Browse files Browse the repository at this point in the history
injected retry with backoff and jitter for all client operations
extended ListModules server handler to include the latest version in the result
  • Loading branch information
Dylan Bourque committed Jul 29, 2024
1 parent 6bdd47d commit eb09bfe
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 42 deletions.
25 changes: 25 additions & 0 deletions internal/server/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ func (s *connectServer) ListModules(ctx context.Context, req *connect.Request[pe
mod := &perseusapi.Module{
Name: m.Name,
}
// include the latest version for each matched module
versionQ := store.ModuleVersionQuery{
ModuleFilter: m.Name,
LatestOnly: true,
IncludePrerelease: false,
}
vers, _, err := s.store.QueryModuleVersions(ctx, versionQ)
if err != nil {
log.Error(err, "unable to query for latest module version", "moduleFilter", m.Name, "latestOnly", true, "includePrerelease", false)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Unable to determine latest version for module %s: a database operation failed", m.Name))
}
// if no stable version exists, try to find a pre-release
if len(vers) == 0 {
versionQ.IncludePrerelease = true
vers, _, err = s.store.QueryModuleVersions(ctx, versionQ)
if err != nil {
log.Error(err, "unable to query for latest module version", "moduleFilter", m.Name, "latestOnly", true, "includePrerelease", true)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Unable to determine latest version for module %s: a database operation failed", m.Name))
}
}
// assign the latest version of the module, if found
if len(vers) > 0 {
mod.Versions = []string{"v" + vers[0].Version}
}

resp.Modules = append(resp.Modules, mod)
}
return connect.NewResponse(resp), nil
Expand Down
56 changes: 20 additions & 36 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ func lookupLatestModuleVersion(ctx context.Context, c perseusapiconnect.PerseusS
ModuleName: modulePath,
VersionOption: perseusapi.ModuleVersionOption_latest,
})
resp, err := c.ListModuleVersions(ctx, req)
resp, err := retryOp(func() (*connect.Response[perseusapi.ListModuleVersionsResponse], error) {
return c.ListModuleVersions(ctx, req)
})
if err != nil {
return "", err
}
Expand Down Expand Up @@ -391,7 +393,9 @@ func walkDependencies(ctx context.Context, client perseusapiconnect.PerseusServi
Direction: direction,
})
for done := false; !done; done = (req.Msg.PageToken != "") {
resp, err := client.QueryDependencies(ctx, req)
resp, err := retryOp(func() (*connect.Response[perseusapi.QueryDependenciesResponse], error) {
return client.QueryDependencies(ctx, req)
})
if err != nil {
return dependencyTreeNode{}, err
}
Expand Down Expand Up @@ -424,43 +428,21 @@ func listModules(ctx context.Context, ps perseusapiconnect.PerseusServiceClient,
})
for done := false; !done; {
status("retrieving modules")
resp, err := ps.ListModules(ctx, req)
resp, err := retryOp(func() (*connect.Response[perseusapi.ListModulesResponse], error) {
return ps.ListModules(ctx, req)
})
if err != nil {
return nil, fmt.Errorf("todo: %w", err)
return nil, fmt.Errorf("Unable to list modules matching the provided filter: %w", err)
}
for _, mod := range resp.Msg.Modules {
status(fmt.Sprintf("determining latest version for %s", mod.GetName()))
req2 := connect.NewRequest(&perseusapi.ListModuleVersionsRequest{
ModuleName: mod.GetName(),
VersionOption: perseusapi.ModuleVersionOption_latest,
})
resp2, err := ps.ListModuleVersions(ctx, req2)
switch {
case err != nil:
return nil, fmt.Errorf("Unable to determine the current version for %s: %w", mod.GetName(), err)

case len(resp2.Msg.Modules) == 0 || len(resp2.Msg.Modules[0].Versions) == 0:
status(fmt.Sprintf("no stable version found for %s, trying to find a pre-release", mod.GetName()))
req2.Msg.IncludePrerelease = true
resp2, err = ps.ListModuleVersions(ctx, req2)
switch {
case err != nil:
return nil, fmt.Errorf("Unable to determine the current version for %s: %w", mod.GetName(), err)

case len(resp2.Msg.Modules) == 0 || len(resp2.Msg.Modules[0].Versions) == 0:
return nil, fmt.Errorf("No versions found for %s", mod.GetName())

default:
// got it
}
default:
// got it
}

results = append(results, dependencyItem{
item := dependencyItem{
Path: mod.GetName(),
Version: resp2.Msg.Modules[0].Versions[0],
})
Version: "-", // show a dash if we don't have a version
}
if vers := mod.GetVersions(); len(vers) > 0 {
item.Version = vers[0]
}
results = append(results, item)
}
req.Msg.PageToken = resp.Msg.GetNextPageToken()
done = (req.Msg.PageToken != "")
Expand Down Expand Up @@ -493,7 +475,9 @@ func listModuleVersions(ctx context.Context, ps perseusapiconnect.PerseusService
apiRequest.Msg.VersionOption = perseusapi.ModuleVersionOption_latest
}
req.updateStatus(fmt.Sprintf("retreiving versions for modules matching %q", req.modulePattern))
resp, err := ps.ListModuleVersions(ctx, apiRequest)
resp, err := retryOp(func() (*connect.Response[perseusapi.ListModuleVersionsResponse], error) {
return ps.ListModuleVersions(ctx, apiRequest)
})
if err != nil {
return nil, fmt.Errorf("todo: %w", err)
}
Expand Down
48 changes: 48 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"crypto/rand"
"math/big"
"time"

"connectrpc.com/connect"
)

// backoffDelays lists, in order, how long retryOp() waits after each failed attempt.
//   - the five non-zero entries are the Fibonacci numbers 1, 2, 3, 5, 8 scaled by 100ms,
//     which gives semi-exponential growth
//   - the trailing 0 is a sentinel: it allows one final attempt after the fifth retry delay
//     without adding a pointless wait once every retry has been used up
var backoffDelays = []time.Duration{
	100 * time.Millisecond,
	200 * time.Millisecond,
	300 * time.Millisecond,
	500 * time.Millisecond,
	800 * time.Millisecond,
	0,
}

// retryOp performs the specified operation and returns its result. If the operation fails with
// connect.CodeUnavailable it is retried, waiting for the next entry of backoffDelays (plus up to
// 20% random jitter) between attempts, to provide resiliency for transient failures due to LB
// flakiness (especially within K8S). With the current backoffDelays table that is 1 initial
// attempt plus up to 5 retries.
//
// Any other error is returned immediately without retrying. When every attempt fails with
// connect.CodeUnavailable, the zero value of T and the error from the last attempt are returned.
func retryOp[T any](op func() (T, error)) (result T, err error) {
	var zero T
	for _, wait := range backoffDelays {
		result, err = op()
		if err == nil {
			return result, nil
		}
		if connect.CodeOf(err) != connect.CodeUnavailable {
			// not a transient failure, retrying won't help
			return zero, err
		}
		// a zero delay marks the final attempt, so there is nothing left to wait for
		if wait > 0 {
			time.Sleep(wait + retryJitter(wait))
		}
	}
	// if we get here, err is non-nil with code Unavailable but we have exhausted all retries
	return zero, err
}

// retryJitter returns a random duration in the range [0, 20% of wait) to add to a retry delay.
//
// It returns 0 (no jitter) instead of panicking when 20% of wait rounds down to nothing, because
// rand.Int panics for a non-positive upper bound, and when the random source reports an error,
// because jitter is only a nicety and must never abort a retry.
func retryJitter(wait time.Duration) time.Duration {
	maxJitter := int64(wait) / 5
	if maxJitter <= 0 {
		return 0
	}
	jitter, err := rand.Int(rand.Reader, big.NewInt(maxJitter))
	if err != nil {
		return 0
	}
	return time.Duration(jitter.Int64())
}
18 changes: 12 additions & 6 deletions update.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"path"
"time"

"connectrpc.com/connect"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -192,7 +193,10 @@ func getModuleInfoFromProxy(modulePath string) (moduleInfo, error) {
// applyUpdates calls the Perseus server to update the dependencies of the specified module
func applyUpdates(conf clientConfig, mod module.Version, deps []module.Version) (err error) {
// create the client and call the server
ctx := context.Background()
// . be sure we don't hang "forever". 5s is a bit over 2X the cumulative retry delays (1900 ms)
// so this shouldn't generate any premature aborts
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client := conf.getClient()
req := connect.NewRequest(&perseusapi.UpdateDependenciesRequest{
ModuleName: mod.Path,
Expand All @@ -205,15 +209,17 @@ func applyUpdates(conf clientConfig, mod module.Version, deps []module.Version)
Versions: []string{d.Version},
}
}
if _, err = client.UpdateDependencies(ctx, req); err != nil {
return err
}
return nil

_, err = retryOp(func() (struct{}, error) {
_, err := client.UpdateDependencies(ctx, req)
return struct{}{}, err
})
return err
}

// moduleInfo represents the relevant Go module metadata for this application.
//
// This struct does not contain a version because the Go module library (golang.org/x/mod/modfile)
// Values may not contain a version because the Go module library (golang.org/x/mod/modfile)
// does not return a version for the "main" module even if it is a library package.
type moduleInfo struct {
// the module name, ex: github.com/CrowdStrike/perseus
Expand Down

0 comments on commit eb09bfe

Please sign in to comment.