Skip to content

Commit

Permalink
fix!: data races (#2843)
Browse files Browse the repository at this point in the history
* ci: enable test race checks

Enable race checks for all tests in CI.

* fix!: data various data races

Fix data race when determining default network, this required making
DockerProviderOptions.DefaultNetwork field private which is a breaking
change.

Fix data race in test bufLogger.

Fix data races on log production context cancellation and context timeout
not being cancelled in read loop.

BREAKING_CHANGE!

---------

Co-authored-by: Manuel de la Peña <[email protected]>
  • Loading branch information
stevenh and mdelapenya authored Oct 28, 2024
1 parent 11eb809 commit 032a69f
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 91 deletions.
3 changes: 2 additions & 1 deletion commons-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ test-%: $(GOBIN)/gotestsum
-- \
-v \
-coverprofile=coverage.out \
-timeout=30m
-timeout=30m \
-race

.PHONY: tools
tools:
Expand Down
179 changes: 97 additions & 82 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -762,11 +763,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
// Setup the log production context which will be used to stop the log production.
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

go func() {
err := c.logProducer(stdout, stderr)
// Set context cancel cause, if not already set.
c.logProductionCancel(err)
}()
// We capture context cancel function to avoid data race with multiple
// calls to startLogProduction.
go func(cancel context.CancelCauseFunc) {
// Ensure the context is cancelled when log productions completes
// so that GetLogProductionErrorChannel functions correctly.
defer cancel(nil)

c.logProducer(stdout, stderr)
}(c.logProductionCancel)

return nil
}
Expand All @@ -775,40 +780,49 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
// - logProductionCtx is done
// - A fatal error occurs
// - No more logs are available
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) {
// Clean up idle client connections.
defer c.provider.Close()

// Setup the log options, start from the beginning.
options := container.LogsOptions{
options := &container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}

for {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()
// Use a separate method so that timeout cancel function is
// called correctly.
for c.copyLogsTimeout(stdout, stderr, options) {
}
}

err := c.copyLogs(timeoutCtx, stdout, stderr, options)
switch {
case err == nil:
// No more logs available.
return nil
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return nil
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}
// copyLogsTimeout copies logs from the container to stdout and stderr with a timeout.
// It returns true if the log production should be retried, false otherwise.
func (c *DockerContainer) copyLogsTimeout(stdout, stderr io.Writer, options *container.LogsOptions) bool {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()

// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
err := c.copyLogs(timeoutCtx, stdout, stderr, *options)
switch {
case err == nil:
// No more logs available.
return false
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return false
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}

// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))

return true
}

// copyLogs copies logs from the container to stdout and stderr.
Expand Down Expand Up @@ -866,10 +880,12 @@ func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
}

errCh := make(chan error, 1)
go func() {
<-c.logProductionCtx.Done()
errCh <- context.Cause(c.logProductionCtx)
}()
go func(ctx context.Context) {
<-ctx.Done()
errCh <- context.Cause(ctx)
close(errCh)
}(c.logProductionCtx)

return errCh
}

Expand Down Expand Up @@ -906,6 +922,7 @@ type DockerProvider struct {
host string
hostCache string
config config.Config
mtx sync.Mutex
}

// Client gets the docker client used by the provider
Expand Down Expand Up @@ -984,29 +1001,26 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
// defer the close of the Docker client connection the soonest
defer p.Close()

// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
if p.DefaultNetwork == "" {
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
if err != nil {
return nil, err
}
var defaultNetwork string
defaultNetwork, err = p.ensureDefaultNetwork(ctx)
if err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

// If default network is not bridge make sure it is attached to the request
// as container won't be attached to it automatically
// in case of Podman the bridge network is called 'podman' as 'bridge' would conflict
if p.DefaultNetwork != p.defaultBridgeNetworkName {
if defaultNetwork != p.defaultBridgeNetworkName {
isAttached := false
for _, net := range req.Networks {
if net == p.DefaultNetwork {
if net == defaultNetwork {
isAttached = true
break
}
}

if !isAttached {
req.Networks = append(req.Networks, p.DefaultNetwork)
req.Networks = append(req.Networks, defaultNetwork)
}
}

Expand Down Expand Up @@ -1461,12 +1475,8 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
// defer the close of the Docker client connection the soonest
defer p.Close()

// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
if p.DefaultNetwork == "" {
if p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client); err != nil {
return nil, err
}
if _, err = p.ensureDefaultNetwork(ctx); err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

if req.Labels == nil {
Expand Down Expand Up @@ -1537,14 +1547,12 @@ func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (ne

func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
// Use a default network as defined in the DockerProvider
if p.DefaultNetwork == "" {
var err error
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
if err != nil {
return "", err
}
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
if err != nil {
return "", fmt.Errorf("ensure default network: %w", err)
}
nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.DefaultNetwork})

nw, err := p.GetNetwork(ctx, NetworkRequest{Name: defaultNetwork})
if err != nil {
return "", err
}
Expand All @@ -1563,43 +1571,50 @@ func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
return ip, nil
}

func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APIClient) (string, error) {
// Get list of available networks
networkResources, err := cli.NetworkList(ctx, network.ListOptions{})
if err != nil {
return "", err
}
// ensureDefaultNetwork ensures that defaultNetwork is set and creates
// it if it does not exist, returning its value.
// It is safe to call this method concurrently.
func (p *DockerProvider) ensureDefaultNetwork(ctx context.Context) (string, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

reaperNetwork := ReaperDefault
if p.defaultNetwork != "" {
// Already set.
return p.defaultNetwork, nil
}

reaperNetworkExists := false
networkResources, err := p.client.NetworkList(ctx, network.ListOptions{})
if err != nil {
return "", fmt.Errorf("network list: %w", err)
}

for _, net := range networkResources {
if net.Name == p.defaultBridgeNetworkName {
return p.defaultBridgeNetworkName, nil
}

if net.Name == reaperNetwork {
reaperNetworkExists = true
switch net.Name {
case p.defaultBridgeNetworkName:
p.defaultNetwork = p.defaultBridgeNetworkName
return p.defaultNetwork, nil
case ReaperDefault:
p.defaultNetwork = ReaperDefault
return p.defaultNetwork, nil
}
}

// Create a bridge network for the container communications
if !reaperNetworkExists {
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: GenericLabels(),
})
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", err
}
// Create a bridge network for the container communications.
_, err = p.client.NetworkCreate(ctx, ReaperDefault, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: GenericLabels(),
})
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", fmt.Errorf("network create: %w", err)
}

return reaperNetwork, nil
p.defaultNetwork = ReaperDefault

return p.defaultNetwork, nil
}

// containerFromDockerResponse builds a Docker container struct from the response of the Docker API
Expand Down
11 changes: 7 additions & 4 deletions docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,11 +1790,14 @@ func TestGetGatewayIP(t *testing.T) {
require.NoError(t, err)
defer provider.Close()

ip, err := provider.(*DockerProvider).GetGatewayIP(context.Background())
require.NoError(t, err)
if ip == "" {
t.Fatal("could not get gateway ip")
dockerProvider, ok := provider.(*DockerProvider)
if !ok {
t.Skip("provider is not a DockerProvider")
}

ip, err := dockerProvider.GetGatewayIP(context.Background())
require.NoError(t, err)
require.NotEmpty(t, ip)
}

func TestNetworkModeWithContainerReference(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions network.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ type DefaultNetwork string

// Deprecated: will be removed in the future.
func (n DefaultNetwork) ApplyGenericTo(opts *GenericProviderOptions) {
opts.DefaultNetwork = string(n)
opts.defaultNetwork = string(n)
}

// Deprecated: will be removed in the future.
func (n DefaultNetwork) ApplyDockerTo(opts *DockerProviderOptions) {
opts.DefaultNetwork = string(n)
opts.defaultNetwork = string(n)
}

// Deprecated: will be removed in the future
Expand Down
2 changes: 1 addition & 1 deletion provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type (
// GenericProviderOptions defines options applicable to all providers
GenericProviderOptions struct {
Logger Logging
DefaultNetwork string
defaultNetwork string
}

// GenericProviderOption defines a common interface to modify GenericProviderOptions
Expand Down
7 changes: 6 additions & 1 deletion reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,12 @@ func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provide

// Attach reaper container to a requested network if it is specified
if p, ok := provider.(*DockerProvider); ok {
req.Networks = append(req.Networks, p.DefaultNetwork)
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
if err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

req.Networks = append(req.Networks, defaultNetwork)
}

c, err := provider.RunContainer(ctx, req)
Expand Down

0 comments on commit 032a69f

Please sign in to comment.