fix: multiple filters (#173)
Fix multiple filters, which previously only applied the removal of the
last iterated set.

Add tests to validate that:
* Multiple filters are correctly processed
* Duplicate filters are correctly ignored
* Immediate shutdown occurs if no clients are connected

This increases code coverage to 89.4%.

Use best-effort removal if resource changes are detected after a
shutdown has been signalled, so that as much as possible is cleaned
up.

This includes a new setting, ChangesRetryInterval, exposed via the
env variable RYUK_CHANGES_RETRY_INTERVAL. It defaults to 1 second
and controls the interval between retries when resource changes are
detected.
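
As a rough illustration of the behaviour described above (a simplified, hypothetical sketch, not the actual reaper code; the helper name and signature are invented for clarity), the retry loop waits for the changes retry interval while changes are detected and falls back to a best-effort result once the shutdown deadline has passed:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errChangesDetected mirrors the sentinel error the reaper uses when a
// resource newer than the prune cut-off is found.
var errChangesDetected = errors.New("changes detected")

// pruneWithRetries is a simplified stand-in for the prune wait loop: while
// changes are detected it sleeps for changesRetryInterval between attempts,
// and once the shutdown deadline has passed it returns whatever the last,
// best-effort pass produced.
func pruneWithRetries(
	collect func() ([]string, error),
	changesRetryInterval, shutdownTimeout time.Duration,
) ([]string, error) {
	deadline := time.Now().Add(shutdownTimeout)
	for {
		resources, err := collect()
		if errors.Is(err, errChangesDetected) && time.Now().Before(deadline) {
			// Changes detected and the deadline has not passed: wait and retry.
			time.Sleep(changesRetryInterval)
			continue
		}
		// Success, a non-retryable error, or the deadline has passed:
		// return the best-effort result.
		return resources, err
	}
}

func main() {
	attempts := 0
	collect := func() ([]string, error) {
		attempts++
		if attempts < 3 {
			return nil, errChangesDetected
		}
		return []string{"container-1", "network-1"}, nil
	}

	resources, err := pruneWithRetries(collect, 100*time.Millisecond, time.Second)
	fmt.Println(resources, err, "attempts:", attempts)
}
```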
stevenh authored Oct 23, 2024
1 parent cbdc142 commit e08866d
Showing 6 changed files with 345 additions and 160 deletions.
23 changes: 12 additions & 11 deletions README.md
@@ -48,7 +48,7 @@ printf "label=something_else" | nc -N localhost 8080
In the ryuk window you'll see containers/networks/volumes deleted after 10s

```log
time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s port=8080 verbose=false
time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s changes_retry_interval=1s port=8080 verbose=false
time=2024-09-30T19:42:30.001+01:00 level=INFO msg="Started"
time=2024-09-30T19:42:30.001+01:00 level=INFO msg="client processing started"
time=2024-09-30T19:42:38.002+01:00 level=INFO msg="client connected" address=127.0.0.1:56432 clients=1
@@ -68,13 +68,14 @@ time=2024-09-30T19:42:52.216+01:00 level=INFO msg=done

The following environment variables can be configured to change the behaviour:

| Environment Variable | Default | Format | Description |
| --------------------------- | ------- | ------- | ------------ |
| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections |
| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource |
| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed |
| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging |
| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
| Environment Variable | Default | Format | Description |
| ----------------------------- | ------- | ------- | ------------ |
| `RYUK_CONNECTION_TIMEOUT` | `60s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
| `RYUK_PORT` | `8080` | `uint16` | The port to listen on for connections |
| `RYUK_RECONNECTION_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
| `RYUK_REQUEST_TIMEOUT` | `10s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
| `RYUK_REMOVE_RETRIES` | `10` | `int` | The number of times to retry removing a resource |
| `RYUK_RETRY_OFFSET` | `-1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in use resources are not removed |
| `RYUK_CHANGES_RETRY_INTERVAL` | `1s` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The interval between retries if resource changes (containers, networks, images, and volumes) are detected while pruning |
| `RYUK_VERBOSE` | `false` | `bool` | Whether to enable verbose aka debug logging |
| `RYUK_SHUTDOWN_TIMEOUT` | `10m` | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
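
As a usage note, the new variable is parsed as a standard Go duration and can be overridden like the other settings. The sketch below is a hypothetical test following the pattern already used in config_test.go; the test name is invented and it assumes the code lives in the same package as config.go:

```go
package main // assumption: ryuk builds as a single package alongside config.go

import (
	"testing"
	"time"
)

// TestChangesRetryIntervalOverride is a hypothetical test in the style of
// config_test.go: it overrides the new variable and checks the parsed value.
func TestChangesRetryIntervalOverride(t *testing.T) {
	t.Setenv("RYUK_CHANGES_RETRY_INTERVAL", "2s")

	cfg, err := loadConfig()
	if err != nil {
		t.Fatalf("load config: %v", err)
	}

	if cfg.ChangesRetryInterval != 2*time.Second {
		t.Fatalf("expected 2s, got %s", cfg.ChangesRetryInterval)
	}
}
```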
5 changes: 5 additions & 0 deletions config.go
@@ -28,6 +28,10 @@ type config struct {
// calculated time will trigger a retry to ensure in use resources are not removed.
RetryOffset time.Duration `env:"RYUK_RETRY_OFFSET" envDefault:"-1s"`

// ChangesRetryInterval is the interval between retries if resource changes (containers,
// networks, images, and volumes) are detected while pruning.
ChangesRetryInterval time.Duration `env:"RYUK_CHANGES_RETRY_INTERVAL" envDefault:"1s"`

// ShutdownTimeout is the maximum amount of time the reaper will wait
// for once signalled to shutdown before it terminates even if connections
// are still established.
@@ -49,6 +53,7 @@ func (c config) LogAttrs() []slog.Attr {
slog.Duration("shutdown_timeout", c.ShutdownTimeout),
slog.Int("remove_retries", c.RemoveRetries),
slog.Duration("retry_offset", c.RetryOffset),
slog.Duration("changes_retry_interval", c.ChangesRetryInterval),
slog.Int("port", int(c.Port)),
slog.Bool("verbose", c.Verbose),
}
33 changes: 18 additions & 15 deletions config_test.go
@@ -30,13 +30,14 @@ func Test_loadConfig(t *testing.T) {

t.Run("defaults", func(t *testing.T) {
expected := config{
Port: 8080,
ConnectionTimeout: time.Minute,
ReconnectionTimeout: time.Second * 10,
ShutdownTimeout: time.Minute * 10,
RemoveRetries: 10,
RequestTimeout: time.Second * 10,
RetryOffset: -time.Second,
Port: 8080,
ConnectionTimeout: time.Minute,
ReconnectionTimeout: time.Second * 10,
ShutdownTimeout: time.Minute * 10,
RemoveRetries: 10,
RequestTimeout: time.Second * 10,
RetryOffset: -time.Second,
ChangesRetryInterval: time.Second,
}

cfg, err := loadConfig()
@@ -53,16 +54,18 @@ func Test_loadConfig(t *testing.T) {
t.Setenv("RYUK_REQUEST_TIMEOUT", "4s")
t.Setenv("RYUK_REMOVE_RETRIES", "5")
t.Setenv("RYUK_RETRY_OFFSET", "-6s")
t.Setenv("RYUK_CHANGES_RETRY_INTERVAL", "8s")

expected := config{
Port: 1234,
ConnectionTimeout: time.Second * 2,
ReconnectionTimeout: time.Second * 3,
ShutdownTimeout: time.Second * 7,
Verbose: true,
RemoveRetries: 5,
RequestTimeout: time.Second * 4,
RetryOffset: -time.Second * 6,
Port: 1234,
ConnectionTimeout: time.Second * 2,
ReconnectionTimeout: time.Second * 3,
ShutdownTimeout: time.Second * 7,
Verbose: true,
RemoveRetries: 5,
RequestTimeout: time.Second * 4,
RetryOffset: -time.Second * 6,
ChangesRetryInterval: time.Second * 8,
}

cfg, err := loadConfig()
79 changes: 56 additions & 23 deletions reaper.go
@@ -286,6 +286,7 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
clients := 0
pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout)
done := ctx.Done()
var shutdownDeadline time.Time
for {
select {
case addr := <-r.connected:
@@ -308,6 +309,7 @@
// a pruneCheck after a timeout and setting done
// to nil so we don't enter this case again.
r.shutdownListener()
shutdownDeadline = time.Now().Add(r.cfg.ShutdownTimeout)
timeout := r.cfg.ShutdownTimeout
if clients == 0 {
// No clients connected, shutdown immediately.
@@ -317,17 +319,23 @@
pruneCheck.Reset(timeout)
done = nil
case now := <-pruneCheck.C:
r.logger.Info("prune check", fieldClients, clients)

level := slog.LevelInfo
if clients > 0 {
r.logger.Warn("shutdown timeout", fieldClients, clients)
level = slog.LevelWarn
}
r.logger.Log(context.Background(), level, "prune check", fieldClients, clients) //nolint:contextcheck // Ensure log is written.

resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes.
if err != nil {
if errors.Is(err, errChangesDetected) {
r.logger.Warn("change detected, waiting again", fieldError, err)
continue
if shutdownDeadline.IsZero() || now.Before(shutdownDeadline) {
r.logger.Warn("change detected, waiting again", fieldError, err)
pruneCheck.Reset(r.cfg.ChangesRetryInterval)
continue
}

// Still changes detected after shutdown timeout, force best effort prune.
r.logger.Warn("shutdown timeout reached, forcing prune", fieldError, err)
}

return resources, fmt.Errorf("resources: %w", err)
@@ -338,58 +346,73 @@ func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
}
}

// resources returns the resources that match the collected filters.
// resources returns the resources that match the collected filters
// for which there are no changes detected.
func (r *reaper) resources(since time.Time) (*resources, error) {
var ret resources
var err error
var errs []error
// We combine errors so we can do best effort removal.
for _, args := range r.filterArgs() {
if ret.containers, err = r.affectedContainers(since, args); err != nil {
containers, err := r.affectedContainers(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected containers", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected containers: %w", err))
}

if ret.networks, err = r.affectedNetworks(since, args); err != nil {
ret.containers = append(ret.containers, containers...)

networks, err := r.affectedNetworks(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected networks", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected networks: %w", err))
}

if ret.volumes, err = r.affectedVolumes(since, args); err != nil {
ret.networks = append(ret.networks, networks...)

volumes, err := r.affectedVolumes(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected volumes", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected volumes: %w", err))
}

if ret.images, err = r.affectedImages(since, args); err != nil {
ret.volumes = append(ret.volumes, volumes...)

images, err := r.affectedImages(since, args)
if err != nil {
if !errors.Is(err, errChangesDetected) {
r.logger.Error("affected images", fieldError, err)
}
errs = append(errs, fmt.Errorf("affected images: %w", err))
}

ret.images = append(ret.images, images...)
}

return &ret, errors.Join(errs...)
}

// affectedContainers returns a slice of container IDs that match the filters.
// If a matching container was created after since, an error is returned.
// If a matching container was created after since, an error is returned and
// the container is not included in the list.
func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()

// List all containers including stopped ones.
options := container.ListOptions{All: true, Filters: args}
r.logger.Debug("listing containers", "filter", options)
containers, err := r.client.ContainerList(ctx, options)
if err != nil {
return nil, fmt.Errorf("container list: %w", err)
}

var errChanges []error
containerIDs := make([]string, 0, len(containers))
for _, container := range containers {
if container.Labels[ryukLabel] == "true" {
@@ -416,17 +439,19 @@ func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]strin
if changed {
// It's not safe to remove a container which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("container %s: %w", container.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("container %s: %w", container.ID, errChangesDetected))
continue
}

containerIDs = append(containerIDs, container.ID)
}

return containerIDs, nil
return containerIDs, errors.Join(errChanges...)
}

// affectedNetworks returns a list of network IDs that match the filters.
// If a matching network was created after since, an error is returned.
// If a matching network was created after since, an error is returned and
// the network is not included in the list.
func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -438,6 +463,7 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string,
return nil, fmt.Errorf("network list: %w", err)
}

var errChanges []error
networks := make([]string, 0, len(report))
for _, network := range report {
changed := network.Created.After(since)
@@ -451,17 +477,19 @@ func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string,
if changed {
// It's not safe to remove a network which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("network %s: %w", network.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("network %s: %w", network.ID, errChangesDetected))
continue
}

networks = append(networks, network.ID)
}

return networks, nil
return networks, errors.Join(errChanges...)
}

// affectedVolumes returns a list of volume names that match the filters.
// If a matching volume was created after since, an error is returned.
// If a matching volume was created after since, an error is returned and
// the volume is not included in the list.
func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -473,6 +501,7 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string,
return nil, fmt.Errorf("volume list: %w", err)
}

var errChanges []error
volumes := make([]string, 0, len(report.Volumes))
for _, volume := range report.Volumes {
created, perr := time.Parse(time.RFC3339, volume.CreatedAt)
@@ -493,17 +522,19 @@ func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string,
if changed {
// It's not safe to remove a volume which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected))
continue
}

volumes = append(volumes, volume.Name)
}

return volumes, nil
return volumes, errors.Join(errChanges...)
}

// affectedImages returns a list of image IDs that match the filters.
// If a matching volume was created after since, an error is returned.
// If a matching image was created after since, an error is returned and
// the image is not included in the list.
func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
defer cancel()
@@ -515,6 +546,7 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e
return nil, fmt.Errorf("image list: %w", err)
}

var errChanges []error
images := make([]string, 0, len(report))
for _, image := range report {
created := time.Unix(image.Created, 0)
@@ -529,13 +561,14 @@ func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, e
if changed {
// It's not safe to remove an image which was created after
// the prune was initiated, as this may lead to unexpected behaviour.
return nil, fmt.Errorf("image %s: %w", image.ID, errChangesDetected)
errChanges = append(errChanges, fmt.Errorf("image %s: %w", image.ID, errChangesDetected))
continue
}

images = append(images, image.ID)
}

return images, nil
return images, errors.Join(errChanges...)
}

// addFilter adds a filter to prune.