Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry with backoff to all client operations #186

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile.pg-dev
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
ARG POSTGRES_VERSION=14
ARG POSTGRES_VERSION=16
FROM postgres:${POSTGRES_VERSION}

ARG SEMVER_VERSION=0.31.2
ARG SEMVER_VERSION=0.40.0
ARG POSTGRES_VERSION

WORKDIR /tmp
Expand Down
4 changes: 4 additions & 0 deletions client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"os"
"strconv"
"strings"

"connectrpc.com/connect"
"github.com/bufbuild/httplb"
Expand Down Expand Up @@ -86,6 +87,9 @@ func (conf *clientConfig) getClient() (client perseusapiconnect.PerseusServiceCl
MinVersion: tls.VersionTLS13,
}
opts = append(opts, httplb.WithTLSConfig(&tlsc, 0))
} else if strings.HasPrefix(conf.serverAddr, "http:") {
// switch to H2C if TLS is disabled since we're using gRPC over Connect
conf.serverAddr = "h2c" + conf.serverAddr[4:]
}

// we include WithGRPC() so that the CLI can hit an existing gRPC-based server instance
Expand Down
7 changes: 3 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
version: '3.8'
services:
db:
build:
build:
context: .
dockerfile: Dockerfile.pg-dev
args:
SEMVER_VERSION: 0.31.2
SEMVER_VERSION: 0.40.0
restart: always
environment:
- POSTGRES_DB=perseus
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
ports:
- '5432:5432'
volumes:
volumes:
- db:/var/lib/postgresql/data
volumes:
db:
Expand Down
25 changes: 25 additions & 0 deletions internal/server/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,31 @@ func (s *connectServer) ListModules(ctx context.Context, req *connect.Request[pe
mod := &perseusapi.Module{
Name: m.Name,
}
// include the latest version for each matched module
versionQ := store.ModuleVersionQuery{
ModuleFilter: m.Name,
LatestOnly: true,
IncludePrerelease: false,
}
vers, _, err := s.store.QueryModuleVersions(ctx, versionQ)
if err != nil {
log.Error(err, "unable to query for latest module version", "moduleFilter", m.Name, "latestOnly", true, "includePrerelease", false)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Unable to determine latest version for module %s: a database operation failed", m.Name))
}
// if no stable version exists, try to find a pre-release
if len(vers) == 0 {
versionQ.IncludePrerelease = true
vers, _, err = s.store.QueryModuleVersions(ctx, versionQ)
if err != nil {
log.Error(err, "unable to query for latest module version", "moduleFilter", m.Name, "latestOnly", true, "includePrerelease", true)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Unable to determine latest version for module %s: a database operation failed", m.Name))
}
}
// assign the latest version of the module, if found
if len(vers) > 0 {
mod.Versions = []string{"v" + vers[0].Version}
}

resp.Modules = append(resp.Modules, mod)
}
return connect.NewResponse(resp), nil
Expand Down
56 changes: 20 additions & 36 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ func lookupLatestModuleVersion(ctx context.Context, c perseusapiconnect.PerseusS
ModuleName: modulePath,
VersionOption: perseusapi.ModuleVersionOption_latest,
})
resp, err := c.ListModuleVersions(ctx, req)
resp, err := retryOp(func() (*connect.Response[perseusapi.ListModuleVersionsResponse], error) {
return c.ListModuleVersions(ctx, req)
})
if err != nil {
return "", err
}
Expand Down Expand Up @@ -391,7 +393,9 @@ func walkDependencies(ctx context.Context, client perseusapiconnect.PerseusServi
Direction: direction,
})
for done := false; !done; done = (req.Msg.PageToken != "") {
resp, err := client.QueryDependencies(ctx, req)
resp, err := retryOp(func() (*connect.Response[perseusapi.QueryDependenciesResponse], error) {
return client.QueryDependencies(ctx, req)
})
if err != nil {
return dependencyTreeNode{}, err
}
Expand Down Expand Up @@ -424,43 +428,21 @@ func listModules(ctx context.Context, ps perseusapiconnect.PerseusServiceClient,
})
for done := false; !done; {
status("retrieving modules")
resp, err := ps.ListModules(ctx, req)
resp, err := retryOp(func() (*connect.Response[perseusapi.ListModulesResponse], error) {
return ps.ListModules(ctx, req)
})
if err != nil {
return nil, fmt.Errorf("todo: %w", err)
return nil, fmt.Errorf("Unable to list modules matching the provided filter: %w", err)
}
for _, mod := range resp.Msg.Modules {
status(fmt.Sprintf("determining latest version for %s", mod.GetName()))
req2 := connect.NewRequest(&perseusapi.ListModuleVersionsRequest{
ModuleName: mod.GetName(),
VersionOption: perseusapi.ModuleVersionOption_latest,
})
resp2, err := ps.ListModuleVersions(ctx, req2)
switch {
case err != nil:
return nil, fmt.Errorf("Unable to determine the current version for %s: %w", mod.GetName(), err)

case len(resp2.Msg.Modules) == 0 || len(resp2.Msg.Modules[0].Versions) == 0:
status(fmt.Sprintf("no stable version found for %s, trying to find a pre-release", mod.GetName()))
req2.Msg.IncludePrerelease = true
resp2, err = ps.ListModuleVersions(ctx, req2)
switch {
case err != nil:
return nil, fmt.Errorf("Unable to determine the current version for %s: %w", mod.GetName(), err)

case len(resp2.Msg.Modules) == 0 || len(resp2.Msg.Modules[0].Versions) == 0:
return nil, fmt.Errorf("No versions found for %s", mod.GetName())

default:
// got it
}
default:
// got it
}

results = append(results, dependencyItem{
item := dependencyItem{
Path: mod.GetName(),
Version: resp2.Msg.Modules[0].Versions[0],
})
Version: "-", // show a dash if we don't have a version
}
if vers := mod.GetVersions(); len(vers) > 0 {
item.Version = vers[0]
}
results = append(results, item)
}
req.Msg.PageToken = resp.Msg.GetNextPageToken()
done = (req.Msg.PageToken != "")
Expand Down Expand Up @@ -493,7 +475,9 @@ func listModuleVersions(ctx context.Context, ps perseusapiconnect.PerseusService
apiRequest.Msg.VersionOption = perseusapi.ModuleVersionOption_latest
}
req.updateStatus(fmt.Sprintf("retreiving versions for modules matching %q", req.modulePattern))
resp, err := ps.ListModuleVersions(ctx, apiRequest)
resp, err := retryOp(func() (*connect.Response[perseusapi.ListModuleVersionsResponse], error) {
return ps.ListModuleVersions(ctx, apiRequest)
})
if err != nil {
return nil, fmt.Errorf("todo: %w", err)
}
Expand Down
48 changes: 48 additions & 0 deletions retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"crypto/rand"
"math/big"
"time"

"connectrpc.com/connect"
)

// backoffDelays lists the pause taken after each failed attempt in retryOp().
//   - the multipliers 1, 2, 3, 5, 8 are consecutive Fibonacci numbers, giving semi-exponential growth
//   - the trailing 0 is a sentinel: it allows one final attempt without a wait once all 5 retries
//     have been used up
var backoffDelays = func() []time.Duration {
	const unit = 100 * time.Millisecond
	return []time.Duration{
		1 * unit,
		2 * unit,
		3 * unit,
		5 * unit,
		8 * unit,
		0,
	}
}()

// retryOp performs the specified operation, retrying up to 5 times when the returned error carries
// the connect.CodeUnavailable code, to provide resiliency for transient failures due to LB flakiness
// (especially within K8S).
//
// op is invoked once per entry in backoffDelays until it either succeeds or fails with a code other
// than Unavailable; both of those outcomes are returned immediately. After an Unavailable failure the
// function sleeps for the matching backoffDelays entry plus up to 20% random jitter before trying
// again. If every attempt fails with Unavailable, the zero value of T and the last error are returned.
func retryOp[T any](op func() (T, error)) (result T, err error) {
	var zero T
	for _, wait := range backoffDelays {
		result, err = op()
		if err == nil {
			return result, nil
		}
		if connect.CodeOf(err) != connect.CodeUnavailable {
			// not a transient failure, so retrying won't help
			return zero, err
		}
		if wait <= 0 {
			// sentinel entry: no delay is configured for this attempt
			continue
		}
		// inject up to 20% jitter
		//   - rand.Int panics when its upper bound is <= 0, so skip jitter for very small delays
		//   - jitter is best-effort: if the random source fails, fall back to the plain delay rather
		//     than dereferencing a nil result
		if maxJitter := int64(wait) / 5; maxJitter > 0 {
			if jitter, jerr := rand.Int(rand.Reader, big.NewInt(maxJitter)); jerr == nil {
				wait += time.Duration(jitter.Int64())
			}
		}
		time.Sleep(wait)
	}
	// if we get here, err is from an Unavailable response but we have exhausted all retries
	return zero, err
}
18 changes: 12 additions & 6 deletions update.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"os"
"path"
"time"

"connectrpc.com/connect"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -192,7 +193,10 @@ func getModuleInfoFromProxy(modulePath string) (moduleInfo, error) {
// applyUpdates calls the Perseus server to update the dependencies of the specified module
func applyUpdates(conf clientConfig, mod module.Version, deps []module.Version) (err error) {
// create the client and call the server
ctx := context.Background()
// . be sure we don't hang "forever". 5s is a bit over 2X the cumulative retry delays (1900 ms),
// so this shouldn't generate any premature aborts
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client := conf.getClient()
req := connect.NewRequest(&perseusapi.UpdateDependenciesRequest{
ModuleName: mod.Path,
Expand All @@ -205,15 +209,17 @@ func applyUpdates(conf clientConfig, mod module.Version, deps []module.Version)
Versions: []string{d.Version},
}
}
if _, err = client.UpdateDependencies(ctx, req); err != nil {
return err
}
return nil

_, err = retryOp(func() (struct{}, error) {
_, err := client.UpdateDependencies(ctx, req)
return struct{}{}, err
})
return err
}

// moduleInfo represents the relevant Go module metadata for this application.
//
// This struct does not contain a version because the Go module library (golang.org/x/mod/modfile)
// Values may not contain a version because the Go module library (golang.org/x/mod/modfile)
// does not return a version for the "main" module even if it is a library package.
type moduleInfo struct {
// the module name, ex: github.com/CrowdStrike/perseus
Expand Down