diff --git a/README.md b/README.md index 217b47d..952877b 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ printf "label=something_else" | nc -N localhost 8080 In the ryuk window you'll see containers/networks/volumes deleted after 10s ```log -time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s port=8080 verbose=false +time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s changes_retry_interval=1s port=8080 verbose=false time=2024-09-30T19:42:30.001+01:00 level=INFO msg="Started" time=2024-09-30T19:42:30.001+01:00 level=INFO msg="client processing started" time=2024-09-30T19:42:38.002+01:00 level=INFO msg="client connected" address=127.0.0.1:56432 clients=1 @@ -68,13 +68,14 @@ time=2024-09-30T19:42:52.216+01:00 level=INFO msg=done The following environment variables can be configured to change the behaviour: -| Environment Variable | Default | Format | Description | -| --------------------------- | ------- | ------- | ------------ | -| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown | -| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections | -| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown | -| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests | -| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource | -| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added 
to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed | -| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging | -| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start | +| Environment Variable | Default | Format | Description | +| ----------------------------- | ------- | ------- | ------------ | +| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown | +| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections | +| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown | +| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests | +| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource | +| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. 
Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed | +| `RYUK_CHANGES_RETRY_INTERVAL` | `1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The interval between retries if resource changes (containers, networks, images, and volumes) are detected while pruning | +| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging | +| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start | diff --git a/config.go b/config.go index d594747..ca8607c 100644 --- a/config.go +++ b/config.go @@ -28,6 +28,10 @@ type config struct { // calculated time will trigger a retry to ensure in use resources are not removed. RetryOffset time.Duration `env:"RYUK_RETRY_OFFSET" envDefault:"-1s"` + // ChangesRetryInterval is the interval between retries if resource changes (containers, + // networks, images, and volumes) are detected while pruning. + ChangesRetryInterval time.Duration `env:"RYUK_CHANGES_RETRY_INTERVAL" envDefault:"1s"` + // ShutdownTimeout is the maximum amount of time the reaper will wait // for once signalled to shutdown before it terminates even if connections // are still established. 
@@ -49,6 +53,7 @@ func (c config) LogAttrs() []slog.Attr { slog.Duration("shutdown_timeout", c.ShutdownTimeout), slog.Int("remove_retries", c.RemoveRetries), slog.Duration("retry_offset", c.RetryOffset), + slog.Duration("changes_retry_interval", c.ChangesRetryInterval), slog.Int("port", int(c.Port)), slog.Bool("verbose", c.Verbose), } diff --git a/config_test.go b/config_test.go index 43f847f..9c5c84d 100644 --- a/config_test.go +++ b/config_test.go @@ -30,13 +30,14 @@ func Test_loadConfig(t *testing.T) { t.Run("defaults", func(t *testing.T) { expected := config{ - Port: 8080, - ConnectionTimeout: time.Minute, - ReconnectionTimeout: time.Second * 10, - ShutdownTimeout: time.Minute * 10, - RemoveRetries: 10, - RequestTimeout: time.Second * 10, - RetryOffset: -time.Second, + Port: 8080, + ConnectionTimeout: time.Minute, + ReconnectionTimeout: time.Second * 10, + ShutdownTimeout: time.Minute * 10, + RemoveRetries: 10, + RequestTimeout: time.Second * 10, + RetryOffset: -time.Second, + ChangesRetryInterval: time.Second, } cfg, err := loadConfig() @@ -53,16 +54,18 @@ func Test_loadConfig(t *testing.T) { t.Setenv("RYUK_REQUEST_TIMEOUT", "4s") t.Setenv("RYUK_REMOVE_RETRIES", "5") t.Setenv("RYUK_RETRY_OFFSET", "-6s") + t.Setenv("RYUK_CHANGES_RETRY_INTERVAL", "8s") expected := config{ - Port: 1234, - ConnectionTimeout: time.Second * 2, - ReconnectionTimeout: time.Second * 3, - ShutdownTimeout: time.Second * 7, - Verbose: true, - RemoveRetries: 5, - RequestTimeout: time.Second * 4, - RetryOffset: -time.Second * 6, + Port: 1234, + ConnectionTimeout: time.Second * 2, + ReconnectionTimeout: time.Second * 3, + ShutdownTimeout: time.Second * 7, + Verbose: true, + RemoveRetries: 5, + RequestTimeout: time.Second * 4, + RetryOffset: -time.Second * 6, + ChangesRetryInterval: time.Second * 8, } cfg, err := loadConfig() diff --git a/reaper.go b/reaper.go index ff5916f..3c6ec0c 100644 --- a/reaper.go +++ b/reaper.go @@ -286,6 +286,7 @@ func (r *reaper) pruneWait(ctx context.Context) 
(*resources, error) { clients := 0 pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout) done := ctx.Done() + var shutdownDeadline time.Time for { select { case addr := <-r.connected: @@ -308,6 +309,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { // a pruneCheck after a timeout and setting done // to nil so we don't enter this case again. r.shutdownListener() + shutdownDeadline = time.Now().Add(r.cfg.ShutdownTimeout) timeout := r.cfg.ShutdownTimeout if clients == 0 { // No clients connected, shutdown immediately. @@ -317,17 +319,23 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { pruneCheck.Reset(timeout) done = nil case now := <-pruneCheck.C: - r.logger.Info("prune check", fieldClients, clients) - + level := slog.LevelInfo if clients > 0 { - r.logger.Warn("shutdown timeout", fieldClients, clients) + level = slog.LevelWarn } + r.logger.Log(context.Background(), level, "prune check", fieldClients, clients) //nolint:contextcheck // Ensure log is written. resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes. if err != nil { if errors.Is(err, errChangesDetected) { - r.logger.Warn("change detected, waiting again", fieldError, err) - continue + if shutdownDeadline.IsZero() || now.Before(shutdownDeadline) { + r.logger.Warn("change detected, waiting again", fieldError, err) + pruneCheck.Reset(r.cfg.ChangesRetryInterval) + continue + } + + // Still changes detected after shutdown timeout, force best effort prune. + r.logger.Warn("shutdown timeout reached, forcing prune", fieldError, err) } return resources, fmt.Errorf("resources: %w", err) @@ -338,51 +346,65 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) { } } -// resources returns the resources that match the collected filters. +// resources returns the resources that match the collected filters +// for which there are no changes detected. 
func (r *reaper) resources(since time.Time) (*resources, error) { var ret resources - var err error var errs []error // We combine errors so we can do best effort removal. for _, args := range r.filterArgs() { - if ret.containers, err = r.affectedContainers(since, args); err != nil { + containers, err := r.affectedContainers(since, args) + if err != nil { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected containers", fieldError, err) } errs = append(errs, fmt.Errorf("affected containers: %w", err)) } - if ret.networks, err = r.affectedNetworks(since, args); err != nil { + ret.containers = append(ret.containers, containers...) + + networks, err := r.affectedNetworks(since, args) + if err != nil { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected networks", fieldError, err) } errs = append(errs, fmt.Errorf("affected networks: %w", err)) } - if ret.volumes, err = r.affectedVolumes(since, args); err != nil { + ret.networks = append(ret.networks, networks...) + + volumes, err := r.affectedVolumes(since, args) + if err != nil { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected volumes", fieldError, err) } errs = append(errs, fmt.Errorf("affected volumes: %w", err)) } - if ret.images, err = r.affectedImages(since, args); err != nil { + ret.volumes = append(ret.volumes, volumes...) + + images, err := r.affectedImages(since, args) + if err != nil { if !errors.Is(err, errChangesDetected) { r.logger.Error("affected images", fieldError, err) } errs = append(errs, fmt.Errorf("affected images: %w", err)) } + + ret.images = append(ret.images, images...) } return &ret, errors.Join(errs...) } // affectedContainers returns a slice of container IDs that match the filters. -// If a matching container was created after since, an error is returned. +// If a matching container was created after since, an error is returned and +// the container is not included in the list. 
func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) defer cancel() + // List all containers including stopped ones. options := container.ListOptions{All: true, Filters: args} r.logger.Debug("listing containers", "filter", options) containers, err := r.client.ContainerList(ctx, options) @@ -390,6 +412,7 @@ func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]strin return nil, fmt.Errorf("container list: %w", err) } + var errChanges []error containerIDs := make([]string, 0, len(containers)) for _, container := range containers { if container.Labels[ryukLabel] == "true" { @@ -416,17 +439,19 @@ func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]strin if changed { // Its not safe to remove a container which was created after // the prune was initiated, as this may lead to unexpected behaviour. - return nil, fmt.Errorf("container %s: %w", container.ID, errChangesDetected) + errChanges = append(errChanges, fmt.Errorf("container %s: %w", container.ID, errChangesDetected)) + continue } containerIDs = append(containerIDs, container.ID) } - return containerIDs, nil + return containerIDs, errors.Join(errChanges...) } // affectedNetworks returns a list of network IDs that match the filters. -// If a matching network was created after since, an error is returned. +// If a matching network was created after since, an error is returned and +// the network is not included in the list. 
func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) defer cancel() @@ -438,6 +463,7 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, return nil, fmt.Errorf("network list: %w", err) } + var errChanges []error networks := make([]string, 0, len(report)) for _, network := range report { changed := network.Created.After(since) @@ -451,17 +477,19 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, if changed { // Its not safe to remove a network which was created after // the prune was initiated, as this may lead to unexpected behaviour. - return nil, fmt.Errorf("network %s: %w", network.ID, errChangesDetected) + errChanges = append(errChanges, fmt.Errorf("network %s: %w", network.ID, errChangesDetected)) + continue } networks = append(networks, network.ID) } - return networks, nil + return networks, errors.Join(errChanges...) } // affectedVolumes returns a list of volume names that match the filters. -// If a matching volume was created after since, an error is returned. +// If a matching volume was created after since, an error is returned and +// the volume is not included in the list. 
func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) defer cancel() @@ -473,6 +501,7 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, return nil, fmt.Errorf("volume list: %w", err) } + var errChanges []error volumes := make([]string, 0, len(report.Volumes)) for _, volume := range report.Volumes { created, perr := time.Parse(time.RFC3339, volume.CreatedAt) @@ -493,17 +522,19 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, if changed { // Its not safe to remove a volume which was created after // the prune was initiated, as this may lead to unexpected behaviour. - return nil, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected) + errChanges = append(errChanges, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected)) + continue } volumes = append(volumes, volume.Name) } - return volumes, nil + return volumes, errors.Join(errChanges...) } // affectedImages returns a list of image IDs that match the filters. -// If a matching volume was created after since, an error is returned. +// If a matching image was created after since, an error is returned and +// the image is not included in the list. 
func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) defer cancel() @@ -515,6 +546,7 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e return nil, fmt.Errorf("image list: %w", err) } + var errChanges []error images := make([]string, 0, len(report)) for _, image := range report { created := time.Unix(image.Created, 0) @@ -529,13 +561,14 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e if changed { // Its not safe to remove an image which was created after // the prune was initiated, as this may lead to unexpected behaviour. - return nil, fmt.Errorf("image %s: %w", image.ID, errChangesDetected) + errChanges = append(errChanges, fmt.Errorf("image %s: %w", image.ID, errChangesDetected)) + continue } images = append(images, image.ID) } - return images, nil + return images, errors.Join(errChanges...) } // addFilter adds a filter to prune. diff --git a/reaper_test.go b/reaper_test.go index c7e306d..ee17a45 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -9,6 +9,7 @@ import ( "io" "log/slog" "net" + "strconv" "strings" "sync" "syscall" @@ -17,6 +18,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/volume" @@ -44,24 +46,34 @@ const ( var ( // testConfig is a config used for testing. 
testConfig = withConfig(config{ - Port: 0, - ConnectionTimeout: time.Millisecond * 500, - ReconnectionTimeout: time.Millisecond * 100, - RequestTimeout: time.Millisecond * 50, - ShutdownTimeout: time.Second * 2, - RemoveRetries: 1, - RetryOffset: -time.Second * 2, - Verbose: true, + Port: 0, + ConnectionTimeout: time.Millisecond * 500, + ReconnectionTimeout: time.Millisecond * 100, + RequestTimeout: time.Millisecond * 50, + ShutdownTimeout: time.Second * 2, + RemoveRetries: 1, + RetryOffset: -time.Second * 2, + ChangesRetryInterval: time.Millisecond * 100, + Verbose: true, }) // discardLogger is a logger that discards all logs. discardLogger = withLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) - // testLabels is a set of test labels. - testLabels = map[string]string{ + // testLabels1 is a set of unique test labels. + testLabels1 = map[string]string{ labelBase: "true", - labelBase + ".sessionID": "test-session", + labelBase + ".sessionID": testID(), labelBase + ".version": "0.1.0", + labelBase + ".second": "true", + } + + // testLabels2 is a second set of unique test labels. + testLabels2 = map[string]string{ + labelBase: "true", + labelBase + ".sessionID": testID(), + labelBase + ".version": "0.1.0", + labelBase + ".first": "true", } // mockContext is a matcher that matches any context. @@ -103,15 +115,15 @@ func Test_newReaper(t *testing.T) { // testConnect connects to the given endpoint, sends filter labels, // and expects an ACK. The connection is closed when the context is done. 
-func testConnect(ctx context.Context, t *testing.T, endpoint string) { +func testConnect(ctx context.Context, t *testing.T, endpoint string, labels map[string]string) { t.Helper() var d net.Dialer conn, err := d.DialContext(ctx, "tcp", endpoint) require.NoError(t, err) - labelFilters := make([]string, 0, len(testLabels)) - for l, v := range testLabels { + labelFilters := make([]string, 0, len(labels)) + for l, v := range labels { labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) } @@ -167,6 +179,16 @@ func newRunTest() *runTest { } } +// filterArgs returns a new filter args for the given labels. +func filterArgs(labels map[string]string) filters.Args { + args := filters.NewArgs() + for k, v := range labels { + args.Add("label", k+"="+v) + } + + return args +} + // newMockClient returns a new mock client for the given test case. func newMockClient(tc *runTest) *mockClient { cli := &mockClient{} @@ -174,7 +196,9 @@ func newMockClient(tc *runTest) *mockClient { cli.On("NegotiateAPIVersion", mockContext).Return() // Mock the container list and remove calls. 
- cli.On("ContainerList", mockContext, mock.Anything).Return([]types.Container{ + filters1 := filterArgs(testLabels1) + filters2 := filterArgs(testLabels2) + cli.On("ContainerList", mockContext, container.ListOptions{All: true, Filters: filters1}).Return([]types.Container{ { ID: containerID1, Created: tc.createdAt1.Unix(), @@ -186,8 +210,10 @@ func newMockClient(tc *runTest) *mockClient { Type: "tcp", }}, State: "running", - Labels: testLabels, + Labels: testLabels1, }, + }, tc.containerListErr) + cli.On("ContainerList", mockContext, container.ListOptions{All: true, Filters: filters2}).Return([]types.Container{ { ID: containerID2, Created: tc.containerCreated2.Unix(), @@ -199,7 +225,7 @@ func newMockClient(tc *runTest) *mockClient { Type: "tcp", }}, State: "running", - Labels: testLabels, + Labels: testLabels2, }, }, tc.containerListErr) @@ -209,9 +235,12 @@ func newMockClient(tc *runTest) *mockClient { Return(tc.containerRemoveErr2) // Mock the network list and remove calls. - cli.On("NetworkList", mockContext, mock.Anything). + cli.On("NetworkList", mockContext, network.ListOptions{Filters: filters1}). Return([]network.Summary{ {ID: networkID1, Created: tc.createdAt1}, + }, tc.networkListErr) + cli.On("NetworkList", mockContext, network.ListOptions{Filters: filters2}). + Return([]network.Summary{ {ID: networkID2, Created: tc.networkCreated2}, }, tc.networkListErr) cli.On("NetworkRemove", mockContext, networkID1). @@ -220,10 +249,15 @@ func newMockClient(tc *runTest) *mockClient { Return(tc.networkRemoveErr2) // Mock the volume list and remove calls. - cli.On("VolumeList", mockContext, mock.Anything). + cli.On("VolumeList", mockContext, volume.ListOptions{Filters: filters1}). Return(volume.ListResponse{ Volumes: []*volume.Volume{ {Name: volumeName1, CreatedAt: tc.createdAt1.Format(time.RFC3339)}, + }, + }, tc.volumeListErr) + cli.On("VolumeList", mockContext, volume.ListOptions{Filters: filters2}). 
+ Return(volume.ListResponse{ + Volumes: []*volume.Volume{ {Name: volumeName2, CreatedAt: tc.volumeCreated2.Format(time.RFC3339)}, }, }, tc.volumeListErr) @@ -233,8 +267,10 @@ func newMockClient(tc *runTest) *mockClient { Return(tc.volumeRemoveErr2) // Mock the image list and remove calls. - cli.On("ImageList", mockContext, mock.Anything).Return([]image.Summary{ + cli.On("ImageList", mockContext, image.ListOptions{Filters: filters1}).Return([]image.Summary{ {ID: imageID1, Created: tc.createdAt1.Unix()}, + }, tc.imageListErr) + cli.On("ImageList", mockContext, image.ListOptions{Filters: filters2}).Return([]image.Summary{ {ID: imageID2, Created: tc.imageCreated2.Unix()}, }, tc.imageListErr) cli.On("ImageRemove", mockContext, imageID1, imageRemoveOptions). @@ -269,8 +305,9 @@ func testReaperRun(t *testing.T, tc *runTest) (string, error) { t.Cleanup(clientCancel) addr := r.listener.Addr().String() - testConnect(clientCtx, t, addr) - testConnect(clientCtx, t, addr) + // Connect twice with different labels. 
+ testConnect(clientCtx, t, addr, testLabels1) + testConnect(clientCtx, t, addr, testLabels2) select { case err = <-errCh: @@ -515,7 +552,7 @@ func TestAbortedClient(t *testing.T) { require.Contains(t, log.String(), "shutdown, aborting client") } -func TestShutdownTimeout(t *testing.T) { +func TestShutdownSignal(t *testing.T) { t.Run("slow-timeout", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) t.Cleanup(cancel) @@ -536,7 +573,7 @@ func TestShutdownTimeout(t *testing.T) { errCh <- r.run(runCtx) }() - testConnect(ctx, t, r.listener.Addr().String()) + testConnect(ctx, t, r.listener.Addr().String(), testLabels1) runCancel() select { @@ -548,7 +585,7 @@ func TestShutdownTimeout(t *testing.T) { data := log.String() require.Contains(t, data, "signal received") - require.Contains(t, data, "shutdown timeout") + require.Contains(t, data, `WARN msg="prune check" clients=1`) require.Contains(t, data, "done") }) @@ -574,7 +611,7 @@ func TestShutdownTimeout(t *testing.T) { connectCtx, connectCancel := context.WithTimeout(ctx, time.Millisecond*100) t.Cleanup(connectCancel) - testConnect(connectCtx, t, r.listener.Addr().String()) + testConnect(connectCtx, t, r.listener.Addr().String(), testLabels1) runCancel() select { @@ -586,7 +623,83 @@ func TestShutdownTimeout(t *testing.T) { data := log.String() require.Contains(t, data, "signal received") - require.NotContains(t, data, "shutdown timeout") + require.NotContains(t, data, `WARN msg="prune check" clients=1`) + require.Contains(t, data, "done") + }) + + t.Run("immediate-no-clients", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + t.Cleanup(cancel) + + var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + tc := newRunTest() + cli := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(cli), testConfig) + require.NoError(t, err) + + errCh := 
make(chan error, 1) + runCtx, runCancel := context.WithCancel(ctx) + t.Cleanup(runCancel) + go func() { + errCh <- r.run(runCtx) + }() + runCancel() + + select { + case err = <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + t.Fatal("timeout", log.String()) + } + + data := log.String() + require.Contains(t, data, "signal received") + require.NotContains(t, data, `WARN msg="prune check" clients=1`) + require.Contains(t, data, "done") + }) + + t.Run("shutdown-timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + t.Cleanup(cancel) + + var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + tc := newRunTest() + // Always trigger a change. + tc.containerCreated2 = time.Now().Add(time.Hour) + cli := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(cli), testConfig) + require.NoError(t, err) + + errCh := make(chan error, 1) + runCtx, runCancel := context.WithCancel(ctx) + t.Cleanup(runCancel) + go func() { + errCh <- r.run(runCtx) + }() + + connectCtx, connectCancel := context.WithCancel(ctx) + t.Cleanup(connectCancel) + testConnect(connectCtx, t, r.listener.Addr().String(), testLabels2) + connectCancel() + runCancel() + + select { + case err = <-errCh: + require.EqualError(t, err, "prune wait: resources: affected containers: container container2: changes detected") + case <-ctx.Done(): + t.Fatal("timeout", log.String()) + } + + data := log.String() + require.Contains(t, data, "signal received") + require.Contains(t, data, "change detected, waiting again") + require.Contains(t, data, "shutdown timeout reached, forcing prune") require.Contains(t, data, "done") }) } @@ -595,34 +708,38 @@ func TestReapContainer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(cancel) - // Run a test container. + // Run two containers with different labels. 
cli := testClient(t) - config := &container.Config{ - Image: testImage, - Cmd: []string{"sleep", "10"}, - Labels: testLabels, - } - resp, err := cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) - if errdefs.IsNotFound(err) { - // Image not found, pull it. - var rc io.ReadCloser - rc, err = cli.ImagePull(ctx, testImage, image.PullOptions{}) + ids := make([]string, 2) + for i, labels := range []map[string]string{testLabels1, testLabels2} { + config := &container.Config{ + Image: testImage, + Cmd: []string{"sleep", "10"}, + Labels: labels, + } + resp, err := cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) + if errdefs.IsNotFound(err) { + // Image not found, pull it. + var rc io.ReadCloser + rc, err = cli.ImagePull(ctx, testImage, image.PullOptions{}) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, rc.Close()) + }) + _, err = io.Copy(io.Discard, rc) + require.NoError(t, err) + resp, err = cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) + } require.NoError(t, err) + ids[i] = resp.ID + t.Cleanup(func() { - require.NoError(t, rc.Close()) + // Ensure the container was / is removed. + err = cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{}) + require.Error(t, err) + require.True(t, errdefs.IsNotFound(err)) }) - _, err = io.Copy(io.Discard, rc) - require.NoError(t, err) - resp, err = cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) } - require.NoError(t, err) - - t.Cleanup(func() { - // Ensure the container was / is removed. - err = cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{}) - require.Error(t, err) - require.True(t, errdefs.IsNotFound(err)) - }) // Speed up reaper for testing. 
t.Setenv("RYUK_RECONNECTION_TIMEOUT", "10ms") @@ -630,8 +747,9 @@ func TestReapContainer(t *testing.T) { t.Setenv("RYUK_PORT", "0") testReaper(ctx, t, - "msg=removed containers=1 networks=0 volumes=0 images=0", - "msg=remove resource=container id="+resp.ID, + "msg=removed containers=2 networks=0 volumes=0 images=0", + "msg=remove resource=container id="+ids[0], + "msg=remove resource=container id="+ids[1], ) } @@ -639,23 +757,28 @@ func TestReapNetwork(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(cancel) - // Create a test network. + // Create two networks with different labels. cli := testClient(t) - resp, err := cli.NetworkCreate(ctx, testID(), network.CreateOptions{ - Labels: testLabels, - }) - require.NoError(t, err) + ids := make([]string, 2) + for i, labels := range []map[string]string{testLabels1, testLabels2} { + resp, err := cli.NetworkCreate(ctx, testID(), network.CreateOptions{ + Labels: labels, + }) + require.NoError(t, err) + ids[i] = resp.ID - t.Cleanup(func() { - // Ensure the network was / is removed. - err = cli.NetworkRemove(ctx, resp.ID) - require.Error(t, err) - require.True(t, errdefs.IsNotFound(err)) - }) + t.Cleanup(func() { + // Ensure the network was / is removed. + err = cli.NetworkRemove(ctx, resp.ID) + require.Error(t, err) + require.True(t, errdefs.IsNotFound(err)) + }) + } testReaper(ctx, t, - "msg=removed containers=0 networks=1 volumes=0 images=0", - "msg=remove resource=network id="+resp.ID, + "msg=removed containers=0 networks=2 volumes=0 images=0", + "msg=remove resource=network id="+ids[0], + "msg=remove resource=network id="+ids[1], ) } @@ -663,23 +786,28 @@ func TestReapVolume(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(cancel) - // Create a test volume. + // Create two volumes with different labels. 
cli := testClient(t) - resp, err := cli.VolumeCreate(ctx, volume.CreateOptions{ - Labels: testLabels, - }) - require.NoError(t, err) + ids := make([]string, 2) + for i, labels := range []map[string]string{testLabels1, testLabels2} { + resp, err := cli.VolumeCreate(ctx, volume.CreateOptions{ + Labels: labels, + }) + require.NoError(t, err) + ids[i] = resp.Name - t.Cleanup(func() { - // Ensure the volume was / is removed. - err = cli.VolumeRemove(ctx, resp.Name, false) - require.Error(t, err) - require.True(t, errdefs.IsNotFound(err)) - }) + t.Cleanup(func() { + // Ensure the volume was / is removed. + err = cli.VolumeRemove(ctx, resp.Name, false) + require.Error(t, err) + require.True(t, errdefs.IsNotFound(err)) + }) + } testReaper(ctx, t, - "msg=removed containers=0 networks=0 volumes=1 images=0", - "msg=remove resource=volume id="+resp.Name, + "msg=removed containers=0 networks=0 volumes=2 images=0", + "msg=remove resource=volume id="+ids[0], + "msg=remove resource=volume id="+ids[1], ) } @@ -687,45 +815,55 @@ func TestReapImage(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000) t.Cleanup(cancel) - // Create a test image. + // Create two images with different labels. 
cli := testClient(t) - context, err := archive.Tar("testdata", archive.Uncompressed) - require.NoError(t, err) - resp, err := cli.ImageBuild(ctx, context, types.ImageBuildOptions{ - Version: types.BuilderBuildKit, - Labels: testLabels, - }) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, resp.Body.Close()) - }) + ids := make([]string, 2) + for i, labels := range []map[string]string{testLabels1, testLabels2} { + context, err := archive.Tar("testdata", archive.Uncompressed) + require.NoError(t, err) + + arg1 := strconv.Itoa(i) + resp, err := cli.ImageBuild(ctx, context, types.ImageBuildOptions{ + Version: types.BuilderBuildKit, + BuildArgs: map[string]*string{ + "arg1": &arg1, + }, + Labels: labels, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, resp.Body.Close()) + }) - // Process the build output, discarding it so we catch any - // errors and get the image ID. - var imageID string - auxCallback := func(msg jsonmessage.JSONMessage) { - if msg.ID != imageBuildResult { - return + // Process the build output, discarding it so we catch any + // errors and get the image ID. + var imageID string + auxCallback := func(msg jsonmessage.JSONMessage) { + if msg.ID != imageBuildResult { + return + } + var result types.BuildResult + err = json.Unmarshal(*msg.Aux, &result) + require.NoError(t, err) + imageID = result.ID } - var result types.BuildResult - err = json.Unmarshal(*msg.Aux, &result) + err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, auxCallback) require.NoError(t, err) - imageID = result.ID - } - err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, auxCallback) - require.NoError(t, err) - require.NotEmpty(t, imageID) + require.NotEmpty(t, imageID) + ids[i] = imageID - t.Cleanup(func() { - // Ensure the image was / is removed. 
- resp, errc := cli.ImageRemove(ctx, imageID, image.RemoveOptions{}) - require.Error(t, errc) - require.Empty(t, resp) - }) + t.Cleanup(func() { + // Ensure the image was / is removed. + resp, errc := cli.ImageRemove(ctx, imageID, image.RemoveOptions{}) + require.Error(t, errc) + require.Empty(t, resp) + }) + } testReaper(ctx, t, - "msg=removed containers=0 networks=0 volumes=0 images=1", - "msg=remove resource=image id="+imageID, + "msg=removed containers=0 networks=0 volumes=0 images=2", + "msg=remove resource=image id="+ids[0], + "msg=remove resource=image id="+ids[1], ) } @@ -777,7 +915,10 @@ func testReaper(ctx context.Context, t *testing.T, expect ...string) { addr := r.listener.Addr().String() clientCtx, clientCancel := context.WithCancel(ctx) t.Cleanup(clientCancel) // Ensure the clientCtx is cancelled on failure. - testConnect(clientCtx, t, addr) + // Connect multiple times with different labels. + testConnect(clientCtx, t, addr, testLabels1) + testConnect(clientCtx, t, addr, testLabels2) + testConnect(clientCtx, t, addr, testLabels1) // Duplicate should be ignored. clientCancel() select { diff --git a/testdata/Dockerfile b/testdata/Dockerfile index c35f1b5..9ffcdda 100644 --- a/testdata/Dockerfile +++ b/testdata/Dockerfile @@ -1 +1,3 @@ FROM scratch +ARG arg1 +WORKDIR ${arg1}}