diff --git a/Dockerfile.pg-dev b/Dockerfile.pg-dev index 39f7033..2828b3f 100644 --- a/Dockerfile.pg-dev +++ b/Dockerfile.pg-dev @@ -1,7 +1,7 @@ -ARG POSTGRES_VERSION=14 +ARG POSTGRES_VERSION=16 FROM postgres:${POSTGRES_VERSION} -ARG SEMVER_VERSION=0.31.2 +ARG SEMVER_VERSION=0.40.0 ARG POSTGRES_VERSION WORKDIR /tmp diff --git a/client_config.go b/client_config.go index 92400a5..5082800 100644 --- a/client_config.go +++ b/client_config.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "os" "strconv" + "strings" "connectrpc.com/connect" "github.com/bufbuild/httplb" @@ -86,6 +87,9 @@ func (conf *clientConfig) getClient() (client perseusapiconnect.PerseusServiceCl MinVersion: tls.VersionTLS13, } opts = append(opts, httplb.WithTLSConfig(&tlsc, 0)) + } else if strings.HasPrefix(conf.serverAddr, "http:") { + // switch to H2C if TLS is disabled since we're using gRPC over Connect + conf.serverAddr = "h2c" + conf.serverAddr[4:] } // we include WithGRPC() so that the CLI can hit an existing gRPC-based server instance diff --git a/docker-compose.yml b/docker-compose.yml index 8ca47c9..d6e910c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,10 @@ -version: '3.8' services: db: - build: + build: context: . dockerfile: Dockerfile.pg-dev args: - SEMVER_VERSION: 0.31.2 + SEMVER_VERSION: 0.40.0 restart: always environment: - POSTGRES_DB=perseus @@ -13,7 +12,7 @@ services: - POSTGRES_PASSWORD=postgres ports: - '5432:5432' - volumes: + volumes: - db:/var/lib/postgresql/data volumes: db: diff --git a/internal/server/connect.go b/internal/server/connect.go index cc2052f..87129e2 100644 --- a/internal/server/connect.go +++ b/internal/server/connect.go @@ -76,6 +76,31 @@ func (s *connectServer) ListModules(ctx context.Context, req *connect.Request[pe mod := &perseusapi.Module{ Name: m.Name, } + // include the latest version for each matched module + versionQ := store.ModuleVersionQuery{ + ModuleFilter: m.Name, + LatestOnly: true, + IncludePrerelease: false, + } + vers, _, err := s.store.QueryModuleVersions(ctx, versionQ) + if err != nil { + log.Error(err, "unable to query for latest module version", "moduleFilter", m.Name, "latestOnly", true, "includePrerelease", false) + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Unable to determine latest version for module %s: a database operation failed", m.Name)) + } + // if no stable version exists, try to find a pre-release + if len(vers) == 0 { + versionQ.IncludePrerelease = true + vers, _, err = s.store.QueryModuleVersions(ctx, versionQ) + if err != nil { + log.Error(err, "unable to query for latest module version", "moduleFilter", m.Name, "latestOnly", true, "includePrerelease", true) + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Unable to determine latest version for module %s: a database operation failed", m.Name)) + } + } + // assign the latest version of the module, if found + if len(vers) > 0 { + mod.Versions = []string{"v" + vers[0].Version} + } + resp.Modules = append(resp.Modules, mod) } return connect.NewResponse(resp), nil diff --git a/query.go b/query.go index fb854ad..ed112b3 100644 --- a/query.go +++ b/query.go @@ -348,7 +348,9 @@ func lookupLatestModuleVersion(ctx context.Context, c perseusapiconnect.PerseusS ModuleName: modulePath, VersionOption: perseusapi.ModuleVersionOption_latest, }) - resp, err := c.ListModuleVersions(ctx, req) + resp, err := retryOp(func() (*connect.Response[perseusapi.ListModuleVersionsResponse], error) { + return c.ListModuleVersions(ctx, req) + }) if err != nil { return "", err } @@ -391,7 +393,9 @@ func walkDependencies(ctx context.Context, client perseusapiconnect.PerseusServi Direction: direction, }) for done := false; !done; done = (req.Msg.PageToken != "") { - resp, err := client.QueryDependencies(ctx, req) + resp, err := retryOp(func() (*connect.Response[perseusapi.QueryDependenciesResponse], error) { + return client.QueryDependencies(ctx, req) + }) if err != nil { return dependencyTreeNode{}, err } @@ -424,43 +428,21 @@ func listModules(ctx context.Context, ps perseusapiconnect.PerseusServiceClient, }) for done := false; !done; { status("retrieving modules") - resp, err := ps.ListModules(ctx, req) + resp, err := retryOp(func() (*connect.Response[perseusapi.ListModulesResponse], error) { + return ps.ListModules(ctx, req) + }) if err != nil { - return nil, fmt.Errorf("todo: %w", err) + return nil, fmt.Errorf("Unable to list modules matching the provided filter: %w", err) } for _, mod := range resp.Msg.Modules { - status(fmt.Sprintf("determining latest version for %s", mod.GetName())) - req2 := connect.NewRequest(&perseusapi.ListModuleVersionsRequest{ - ModuleName: mod.GetName(), - VersionOption: perseusapi.ModuleVersionOption_latest, - }) - resp2, err := ps.ListModuleVersions(ctx, req2) - switch { - case err != nil: - return nil, fmt.Errorf("Unable to determine the current version for %s: %w", mod.GetName(), err) - - case len(resp2.Msg.Modules) == 0 || len(resp2.Msg.Modules[0].Versions) == 0: - status(fmt.Sprintf("no stable version found for %s, trying to find a pre-release", mod.GetName())) - req2.Msg.IncludePrerelease = true - resp2, err = ps.ListModuleVersions(ctx, req2) - switch { - case err != nil: - return nil, fmt.Errorf("Unable to determine the current version for %s: %w", mod.GetName(), err) - - case len(resp2.Msg.Modules) == 0 || len(resp2.Msg.Modules[0].Versions) == 0: - return nil, fmt.Errorf("No versions found for %s", mod.GetName()) - - default: - // got it - } - default: - // got it - } - - results = append(results, dependencyItem{ + item := dependencyItem{ Path: mod.GetName(), - Version: resp2.Msg.Modules[0].Versions[0], - }) + Version: "-", // show a dash if we don't have a version + } + if vers := mod.GetVersions(); len(vers) > 0 { + item.Version = vers[0] + } + results = append(results, item) } req.Msg.PageToken = resp.Msg.GetNextPageToken() done = (req.Msg.PageToken != "") @@ -493,7 +475,9 @@ func listModuleVersions(ctx context.Context, ps perseusapiconnect.PerseusService apiRequest.Msg.VersionOption = perseusapi.ModuleVersionOption_latest } req.updateStatus(fmt.Sprintf("retreiving versions for modules matching %q", req.modulePattern)) - resp, err := ps.ListModuleVersions(ctx, apiRequest) + resp, err := retryOp(func() (*connect.Response[perseusapi.ListModuleVersionsResponse], error) { + return ps.ListModuleVersions(ctx, apiRequest) + }) if err != nil { return nil, fmt.Errorf("todo: %w", err) } diff --git a/retry.go b/retry.go new file mode 100644 index 0000000..1758b4c --- /dev/null +++ b/retry.go @@ -0,0 +1,48 @@ +package main + +import ( + "crypto/rand" + "math/big" + "time" + + "connectrpc.com/connect" +) + +var ( + // subsequent retry delays for retryOp() + // - use the first 5 Fibonacci numbers for semi-exponential growth + // - the extra 0 value is a sentinel so we don't do another wait after we've exhausted all 5 retries + backoffDelays = []time.Duration{ + 100 * time.Millisecond, + 200 * time.Millisecond, + 300 * time.Millisecond, + 500 * time.Millisecond, + 800 * time.Millisecond, + 0, + } +) + +// retryOp performs the specified operation, retrying up to 5 times if the request returns a 502-Unavailable +// status to provide resiliency for transient failures due to LB flakiness (especially within K8S). +func retryOp[T any](op func() (T, error)) (result T, err error) { + var zero T + for _, wait := range backoffDelays { + result, err = op() + switch { + case err == nil: + return result, nil + case connect.CodeOf(err) == connect.CodeUnavailable: + if wait > 0 { + // inject up to 20% jitter + maxJitter := big.NewInt(int64(float64(int64(wait)) * 0.2)) + jitter, _ := rand.Int(rand.Reader, maxJitter) + wait += time.Duration(jitter.Int64()) + <-time.After(wait) + } + default: + return zero, err + } + } + // if we get here, err is non-nil and from a 502 but we have exhausted all retries + return zero, err +} diff --git a/update.go b/update.go index 5d5cc4d..3103a5e 100644 --- a/update.go +++ b/update.go @@ -7,6 +7,7 @@ import ( "net/http" "os" "path" + "time" "connectrpc.com/connect" "github.com/spf13/cobra" @@ -192,7 +193,10 @@ func getModuleInfoFromProxy(modulePath string) (moduleInfo, error) { // applyUpdates calls the Perseus server to update the dependencies of the specified module func applyUpdates(conf clientConfig, mod module.Version, deps []module.Version) (err error) { // create the client and call the server - ctx := context.Background() + // . be sure we don't hang "forever". 5s is a bit over 2X the cumulative retry delays (1900 ms) + // so this shouldn't generate any pre-mature aborts + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() client := conf.getClient() req := connect.NewRequest(&perseusapi.UpdateDependenciesRequest{ ModuleName: mod.Path, @@ -205,15 +209,17 @@ func applyUpdates(conf clientConfig, mod module.Version, deps []module.Version) Versions: []string{d.Version}, } } - if _, err = client.UpdateDependencies(ctx, req); err != nil { - return err - } - return nil + + _, err = retryOp(func() (struct{}, error) { + _, err := client.UpdateDependencies(ctx, req) + return struct{}{}, err + }) + return err } // moduleInfo represents the relevant Go module metadata for this application. // -// This struct does not contain a version because the Go module library (golang.org/x/mod/modfile) +// Values may not contain a version because the Go module library (golang.org/x/mod/modfile) // does not return a version for the "main" module even if it is a library package. type moduleInfo struct { // the module name, ex: github.com/CrowdStrike/perseus