Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: data races #2843

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion commons-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ test-%: $(GOBIN)/gotestsum
-- \
-v \
-coverprofile=coverage.out \
-timeout=30m
-timeout=30m \
-race

.PHONY: tools
tools:
Expand Down
179 changes: 97 additions & 82 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -762,11 +763,15 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
// Setup the log production context which will be used to stop the log production.
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

go func() {
err := c.logProducer(stdout, stderr)
// Set context cancel cause, if not already set.
c.logProductionCancel(err)
}()
// We capture the context cancel function to avoid a data race between
// multiple calls to startLogProduction.
go func(cancel context.CancelCauseFunc) {
// Ensure the context is cancelled when log production completes
// so that GetLogProductionErrorChannel functions correctly.
defer cancel(nil)

c.logProducer(stdout, stderr)
}(c.logProductionCancel)

return nil
}
Expand All @@ -775,40 +780,49 @@ func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogPro
// - logProductionCtx is done
// - A fatal error occurs
// - No more logs are available
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) {
// Clean up idle client connections.
defer c.provider.Close()

// Setup the log options, start from the beginning.
options := container.LogsOptions{
options := &container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}

for {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()
// Use a separate method so that timeout cancel function is
// called correctly.
for c.copyLogsTimeout(stdout, stderr, options) {
}
}

err := c.copyLogs(timeoutCtx, stdout, stderr, options)
switch {
case err == nil:
// No more logs available.
return nil
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return nil
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}
// copyLogsTimeout copies logs from the container to stdout and stderr with a timeout.
// It returns true if the log production should be retried, false otherwise.
func (c *DockerContainer) copyLogsTimeout(stdout, stderr io.Writer, options *container.LogsOptions) bool {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()

// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
err := c.copyLogs(timeoutCtx, stdout, stderr, *options)
switch {
case err == nil:
// No more logs available.
return false
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return false
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}

// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))

return true
}

// copyLogs copies logs from the container to stdout and stderr.
Expand Down Expand Up @@ -866,10 +880,12 @@ func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
}

errCh := make(chan error, 1)
go func() {
<-c.logProductionCtx.Done()
errCh <- context.Cause(c.logProductionCtx)
}()
go func(ctx context.Context) {
<-ctx.Done()
errCh <- context.Cause(ctx)
close(errCh)
}(c.logProductionCtx)

return errCh
}

Expand Down Expand Up @@ -906,6 +922,7 @@ type DockerProvider struct {
host string
hostCache string
config config.Config
mtx sync.Mutex
}

// Client gets the docker client used by the provider
Expand Down Expand Up @@ -984,29 +1001,26 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
// Defer closing the Docker client connection as early as possible.
defer p.Close()

// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
if p.DefaultNetwork == "" {
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
if err != nil {
return nil, err
}
var defaultNetwork string
defaultNetwork, err = p.ensureDefaultNetwork(ctx)
if err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

// If default network is not bridge make sure it is attached to the request
// as container won't be attached to it automatically
// in case of Podman the bridge network is called 'podman' as 'bridge' would conflict
if p.DefaultNetwork != p.defaultBridgeNetworkName {
if defaultNetwork != p.defaultBridgeNetworkName {
isAttached := false
for _, net := range req.Networks {
if net == p.DefaultNetwork {
if net == defaultNetwork {
isAttached = true
break
}
}

if !isAttached {
req.Networks = append(req.Networks, p.DefaultNetwork)
req.Networks = append(req.Networks, defaultNetwork)
}
}

Expand Down Expand Up @@ -1461,12 +1475,8 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest)
// Defer closing the Docker client connection as early as possible.
defer p.Close()

// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
if p.DefaultNetwork == "" {
if p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client); err != nil {
return nil, err
}
if _, err = p.ensureDefaultNetwork(ctx); err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

if req.Labels == nil {
Expand Down Expand Up @@ -1537,14 +1547,12 @@ func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (ne

func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
// Use a default network as defined in the DockerProvider
if p.DefaultNetwork == "" {
var err error
p.DefaultNetwork, err = p.getDefaultNetwork(ctx, p.client)
if err != nil {
return "", err
}
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
if err != nil {
return "", fmt.Errorf("ensure default network: %w", err)
}
nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.DefaultNetwork})

nw, err := p.GetNetwork(ctx, NetworkRequest{Name: defaultNetwork})
if err != nil {
return "", err
}
Expand All @@ -1563,43 +1571,50 @@ func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
return ip, nil
}

func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APIClient) (string, error) {
// Get list of available networks
networkResources, err := cli.NetworkList(ctx, network.ListOptions{})
if err != nil {
return "", err
}
// ensureDefaultNetwork ensures that defaultNetwork is set and creates
// it if it does not exist, returning its value.
// It is safe to call this method concurrently.
func (p *DockerProvider) ensureDefaultNetwork(ctx context.Context) (string, error) {
stevenh marked this conversation as resolved.
Show resolved Hide resolved
p.mtx.Lock()
defer p.mtx.Unlock()

reaperNetwork := ReaperDefault
if p.defaultNetwork != "" {
// Already set.
return p.defaultNetwork, nil
}

reaperNetworkExists := false
networkResources, err := p.client.NetworkList(ctx, network.ListOptions{})
if err != nil {
return "", fmt.Errorf("network list: %w", err)
}

for _, net := range networkResources {
if net.Name == p.defaultBridgeNetworkName {
return p.defaultBridgeNetworkName, nil
}

if net.Name == reaperNetwork {
reaperNetworkExists = true
switch net.Name {
case p.defaultBridgeNetworkName:
p.defaultNetwork = p.defaultBridgeNetworkName
return p.defaultNetwork, nil
case ReaperDefault:
p.defaultNetwork = ReaperDefault
return p.defaultNetwork, nil
}
}

// Create a bridge network for the container communications
if !reaperNetworkExists {
_, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: GenericLabels(),
})
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", err
}
// Create a bridge network for the container communications.
_, err = p.client.NetworkCreate(ctx, ReaperDefault, network.CreateOptions{
Driver: Bridge,
Attachable: true,
Labels: GenericLabels(),
})
// If the network already exists, we can ignore the error as that can
// happen if we are running multiple tests in parallel and we only
// need to ensure that the network exists.
if err != nil && !errdefs.IsConflict(err) {
return "", fmt.Errorf("network create: %w", err)
}

return reaperNetwork, nil
p.defaultNetwork = ReaperDefault

return p.defaultNetwork, nil
}

// containerFromDockerResponse builds a Docker container struct from the response of the Docker API
Expand Down
11 changes: 7 additions & 4 deletions docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,11 +1790,14 @@ func TestGetGatewayIP(t *testing.T) {
require.NoError(t, err)
defer provider.Close()

ip, err := provider.(*DockerProvider).GetGatewayIP(context.Background())
require.NoError(t, err)
if ip == "" {
t.Fatal("could not get gateway ip")
dockerProvider, ok := provider.(*DockerProvider)
if !ok {
t.Skip("provider is not a DockerProvider")
}

ip, err := dockerProvider.GetGatewayIP(context.Background())
require.NoError(t, err)
require.NotEmpty(t, ip)
}

func TestNetworkModeWithContainerReference(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions network.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ type DefaultNetwork string

// Deprecated: will be removed in the future.
func (n DefaultNetwork) ApplyGenericTo(opts *GenericProviderOptions) {
opts.DefaultNetwork = string(n)
opts.defaultNetwork = string(n)
}

// Deprecated: will be removed in the future.
func (n DefaultNetwork) ApplyDockerTo(opts *DockerProviderOptions) {
opts.DefaultNetwork = string(n)
opts.defaultNetwork = string(n)
}

// Deprecated: will be removed in the future.
Expand Down
2 changes: 1 addition & 1 deletion provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type (
// GenericProviderOptions defines options applicable to all providers
GenericProviderOptions struct {
Logger Logging
DefaultNetwork string
defaultNetwork string
}

// GenericProviderOption defines a common interface to modify GenericProviderOptions
Expand Down
7 changes: 6 additions & 1 deletion reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,12 @@ func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provide

// Attach reaper container to a requested network if it is specified
if p, ok := provider.(*DockerProvider); ok {
req.Networks = append(req.Networks, p.DefaultNetwork)
defaultNetwork, err := p.ensureDefaultNetwork(ctx)
if err != nil {
return nil, fmt.Errorf("ensure default network: %w", err)
}

req.Networks = append(req.Networks, defaultNetwork)
}

c, err := provider.RunContainer(ctx, req)
Expand Down