From 498bb61c4e314a8ecac4ba1fd2d47251ab4cb11c Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 30 Aug 2024 13:25:55 -0700 Subject: [PATCH 1/7] fix: improve port checks --- internal/cmd/cmd.go | 2 +- internal/cmd/local/check.go | 96 ++++++------ internal/cmd/local/check_test.go | 138 ++++++++++++++++++ internal/cmd/local/docker/docker.go | 25 ---- internal/cmd/local/docker/docker_test.go | 131 +---------------- .../cmd/local/docker/dockertest/dockertest.go | 29 +++- internal/cmd/local/local_credentials.go | 2 +- internal/cmd/local/local_install.go | 22 +-- internal/cmd/local/local_status.go | 2 +- 9 files changed, 230 insertions(+), 217 deletions(-) diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index f61bac5..61ca8c5 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -42,7 +42,7 @@ This could be in indication that the ingress port is already in use by a differe The ingress port can be changed by passing the flag --port.` // helpPort is displayed if ErrPort is ever returned - helpPort = `An error occurred while verifying if the request port is available. + helpPort = `An error occurred while verifying if the requested port is available. This could be in indication that the ingress port is already in use by a different application. The ingress port can be changed by passing the flag --port.` ) diff --git a/internal/cmd/local/check.go b/internal/cmd/local/check.go index 2fad5a0..3c279b6 100644 --- a/internal/cmd/local/check.go +++ b/internal/cmd/local/check.go @@ -4,13 +4,15 @@ import ( "context" "errors" "fmt" - "io" "net" "net/http" + "os" + "runtime" + "strconv" + "syscall" "time" "github.com/airbytehq/abctl/internal/cmd/local/docker" - "github.com/airbytehq/abctl/internal/cmd/local/k8s" "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/telemetry" "github.com/pterm/pterm" @@ -78,68 +80,74 @@ func portAvailable(ctx context.Context, port int) error { // net.Listen doesn't support providing a context lc := &net.ListenConfig{} listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf("localhost:%d", port)) + if isErrorAddressAlreadyInUse(err) { + return fmt.Errorf("%w: port %d is already in use", localerr.ErrPort, port) + } if err != nil { - pterm.Debug.Println(fmt.Sprintf("Unable to listen on port '%d': %s", port, err)) - - // check if an existing airbyte installation is already listening on this port - req, errInner := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://localhost:%d/api/v1/instance_configuration", port), nil) - if errInner != nil { - pterm.Error.Printfln("Port %d request could not be created", port) - return fmt.Errorf("%w: unable to create request: %w", localerr.ErrPort, err) - } - - res, errInner := httpClient.Do(req) - if errInner != nil { - pterm.Error.Printfln("Port %d appears to already be in use", port) - return fmt.Errorf("%w: unable to send request: %w", localerr.ErrPort, err) - } - - if res.StatusCode == http.StatusOK { - pterm.Success.Printfln("Port %d appears to be running a previous Airbyte installation", port) - return nil - } - - // if we're here, we haven't been able to determine why this port may or may not be available - body, errInner := io.ReadAll(res.Body) - if errInner != nil { - pterm.Debug.Println(fmt.Sprintf("Unable to read response body: %s", errInner)) - } - pterm.Debug.Println(fmt.Sprintf( - "Unable to determine if port '%d' is in use:\n StatusCode: %d\n Body: %s", - port, res.StatusCode, body, - )) - - pterm.Error.Println(fmt.Sprintf( - "Unable to determine if port '%d' is available, consider specifying a different port", - port, - )) - return fmt.Errorf("unable to determine if port '%d' is available: %w", port, err) + return fmt.Errorf("%w: unable to determine if port '%d' is available: %w", localerr.ErrPort, port, err) } // if we're able to bind to the port (and then release it), it should be available defer func() { _ = listener.Close() }() - pterm.Success.Printfln("Port %d appears to be available", port) return nil } -func getPort(ctx context.Context, provider k8s.Provider) (int, error) { +func isErrorAddressAlreadyInUse(err error) bool { + var eOsSyscall *os.SyscallError + if !errors.As(err, &eOsSyscall) { + return false + } + var errErrno syscall.Errno // doesn't need a "*" (ptr) because it's already a ptr (uintptr) + if !errors.As(eOsSyscall, &errErrno) { + return false + } + if errors.Is(errErrno, syscall.EADDRINUSE) { + return true + } + const WSAEADDRINUSE = 10048 + if runtime.GOOS == "windows" && errErrno == WSAEADDRINUSE { + return true + } + return false +} + +func getPort(ctx context.Context, clusterName string) (int, error) { var err error if dockerClient == nil { dockerClient, err = docker.New(ctx) if err != nil { - pterm.Error.Printfln("Unable to connect to Docker daemon") return 0, fmt.Errorf("unable to connect to docker: %w", err) } } - clusterPort, err := dockerClient.Port(ctx, fmt.Sprintf("%s-control-plane", provider.ClusterName)) + container := fmt.Sprintf("%s-control-plane", clusterName) + + ci, err := dockerClient.Client.ContainerInspect(ctx, container) if err != nil { - pterm.Error.Printfln("Unable to determine docker port for cluster '%s'", provider.ClusterName) - return 0, errors.New("unable to determine port cluster was installed with") + return 0, fmt.Errorf("unable to inspect container: %w", err) + } + if ci.State == nil || ci.State.Status != "running" { + status := "unknown" + if ci.State != nil { + status = ci.State.Status + } + return 0, fmt.Errorf("container %q is not running (status = %q)", container, status) + } + + for _, bindings := range ci.HostConfig.PortBindings { + for _, ipPort := range bindings { + if ipPort.HostIP == "0.0.0.0" { + port, err := strconv.Atoi(ipPort.HostPort) + if err != nil { + return 0, fmt.Errorf("unable to convert host port %s to integer: %w", ipPort.HostPort, err) + } + return port, nil + } + } } - return clusterPort, nil + return 0, fmt.Errorf("no matching ports found on container %q", container) } diff --git a/internal/cmd/local/check_test.go b/internal/cmd/local/check_test.go index 504861a..3d59eab 100644 --- a/internal/cmd/local/check_test.go +++ b/internal/cmd/local/check_test.go @@ -13,7 +13,9 @@ import ( "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/telemetry" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/system" + "github.com/docker/go-connections/nat" "github.com/google/go-cmp/cmp" "github.com/google/uuid" ) @@ -113,6 +115,142 @@ func TestPortAvailable_Unavailable(t *testing.T) { } } +func TestGetPort_Found(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "running", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "0.0.0.0", + HostPort: "8000", + }}, + }, + }, + }, + }, nil + }, + }, + } + + port, err := getPort(context.Background(), "test") + if err != nil { + t.Errorf("unexpected error: %s", err) + return + } + if port != 8000 { + t.Errorf("expected 8000 but got %d", port) + } +} + +func TestGetPort_NotRunning(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "stopped", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "0.0.0.0", + HostPort: "8000", + }}, + }, + }, + }, + }, nil + }, + }, + } + + _, err := getPort(context.Background(), "test") + if err == nil { + t.Error("expected error") + } +} + +func TestGetPort_Missing(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "running", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "1.2.3.4", + HostPort: "8000", + }}, + }, + }, + }, + }, nil + }, + }, + } + + _, err := getPort(context.Background(), "test") + if err == nil { + t.Error("expected error") + } +} + +func TestGetPort_Invalid(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "running", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "1.2.3.4", + HostPort: "NaN", + }}, + }, + }, + }, + }, nil + }, + }, + } + + _, err := getPort(context.Background(), "test") + if err == nil { + t.Error("expected error") + } +} + // port returns the port from a string value in the format of "ipv4:port" or "ip::v6:port" func port(s string) int { vals := strings.Split(s, ":") diff --git a/internal/cmd/local/docker/docker.go b/internal/cmd/local/docker/docker.go index 0345a76..9fb5500 100644 --- a/internal/cmd/local/docker/docker.go +++ b/internal/cmd/local/docker/docker.go @@ -2,11 +2,9 @@ package docker import ( "context" - "errors" "fmt" "io" "runtime" - "strconv" "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/cmd/local/paths" @@ -153,26 +151,3 @@ func (d *Docker) Version(ctx context.Context) (Version, error) { Platform: ver.Platform.Name, }, nil } - -// Port returns the host-port the underlying docker process is currently bound to, for the given container. -// It determines this by walking through all the ports on the container and finding the one that is bound to ip 0.0.0.0. -func (d *Docker) Port(ctx context.Context, container string) (int, error) { - ci, err := d.Client.ContainerInspect(ctx, container) - if err != nil { - return 0, fmt.Errorf("unable to inspect container: %w", err) - } - - for _, bindings := range ci.NetworkSettings.Ports { - for _, ipPort := range bindings { - if ipPort.HostIP == "0.0.0.0" { - port, err := strconv.Atoi(ipPort.HostPort) - if err != nil { - return 0, fmt.Errorf("unable to convert host port %s to integer: %w", ipPort.HostPort, err) - } - return port, nil - } - } - } - - return 0, errors.New("unable to determine port for container") -} diff --git a/internal/cmd/local/docker/docker_test.go b/internal/cmd/local/docker/docker_test.go index 3f944b6..342471b 100644 --- a/internal/cmd/local/docker/docker_test.go +++ b/internal/cmd/local/docker/docker_test.go @@ -9,7 +9,6 @@ import ( "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/docker/docker/api/types" "github.com/docker/docker/client" - "github.com/docker/go-connections/nat" "github.com/google/go-cmp/cmp" ) @@ -18,12 +17,6 @@ import ( // avoid a circular dependency. var _ Client = (*dockertest.MockClient)(nil) -var expVersion = Version{ - Version: "version", - Arch: "arch", - Platform: "platform name", -} - func TestNewWithOptions(t *testing.T) { tests := []struct { name string @@ -49,10 +42,7 @@ func TestNewWithOptions(t *testing.T) { pingCalled := 0 p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: defaultContainerInspect, - FnServerVersion: defaultServerVersion, - }, + MockClient: dockertest.NewMockClient(), ping: func(ctx context.Context) (types.Ping, error) { pingCalled++ return types.Ping{}, nil @@ -78,19 +68,11 @@ func TestNewWithOptions(t *testing.T) { t.Error("ping called incorrect number of times", d) } - port, err := cli.Port(ctx, "container") - if err != nil { - t.Fatal("failed fetching port", err) - } - if d := cmp.Diff(12345, port); d != "" { - t.Error("unexpected port", d) - } - ver, err := cli.Version(ctx) if err != nil { t.Fatal("failed fetching version", err) } - if d := cmp.Diff(expVersion, ver); d != "" { + if d := cmp.Diff(dockertest.DefaultServerVersion, ver); d != "" { t.Error("unexpected version", d) } }) @@ -245,92 +227,6 @@ func TestVersion_Err(t *testing.T) { } } -func TestPort_Missing(t *testing.T) { - ctx := context.Background() - p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return types.ContainerJSON{ - NetworkSettings: &types.NetworkSettings{ - NetworkSettingsBase: types.NetworkSettingsBase{ - Ports: map[nat.Port][]nat.PortBinding{}, - }, - }, - }, nil - }, - }, - } - - f := func(opts ...client.Opt) (pinger, error) { return p, nil } - - cli, err := newWithOptions(ctx, f, "darwin") - if err != nil { - t.Fatal("failed creating client", err) - } - - _, err = cli.Port(ctx, "container") - if err == nil { - t.Error("expected error") - } -} - -func TestPort_Invalid(t *testing.T) { - ctx := context.Background() - p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return types.ContainerJSON{ - NetworkSettings: &types.NetworkSettings{ - NetworkSettingsBase: types.NetworkSettingsBase{ - Ports: map[nat.Port][]nat.PortBinding{ - "12345": {{ - HostIP: "0.0.0.0", - HostPort: "NaN", - }}, - }, - }, - }, - }, nil - }, - }, - } - - f := func(opts ...client.Opt) (pinger, error) { return p, nil } - - cli, err := newWithOptions(ctx, f, "darwin") - if err != nil { - t.Fatal("failed creating client", err) - } - - _, err = cli.Port(ctx, "container") - if err == nil { - t.Error("expected error") - } -} - -func TestPort_Err(t *testing.T) { - ctx := context.Background() - p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return types.ContainerJSON{}, errors.New("test error") - }, - }, - } - - f := func(opts ...client.Opt) (pinger, error) { return p, nil } - - cli, err := newWithOptions(ctx, f, "darwin") - if err != nil { - t.Fatal("failed creating client", err) - } - - _, err = cli.Port(ctx, "container") - if err == nil { - t.Error("expected error") - } -} - // --- mocks var _ pinger = (*mockPinger)(nil) @@ -346,26 +242,3 @@ func (m mockPinger) Ping(ctx context.Context) (types.Ping, error) { return m.ping(ctx) } - -func defaultContainerInspect(_ context.Context, _ string) (types.ContainerJSON, error) { - return types.ContainerJSON{ - NetworkSettings: &types.NetworkSettings{ - NetworkSettingsBase: types.NetworkSettingsBase{ - Ports: map[nat.Port][]nat.PortBinding{ - "12345": {{ - HostIP: "0.0.0.0", - HostPort: "12345", - }}, - }, - }, - }, - }, nil -} - -func defaultServerVersion(_ context.Context) (types.Version, error) { - return types.Version{ - Version: expVersion.Version, - Arch: expVersion.Arch, - Platform: struct{ Name string }{Name: expVersion.Platform}, - }, nil -} diff --git a/internal/cmd/local/docker/dockertest/dockertest.go b/internal/cmd/local/docker/dockertest/dockertest.go index 7229eb2..42412ae 100644 --- a/internal/cmd/local/docker/dockertest/dockertest.go +++ b/internal/cmd/local/docker/dockertest/dockertest.go @@ -10,6 +10,7 @@ import ( "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/system" "github.com/docker/docker/api/types/volume" + "github.com/docker/go-connections/nat" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -27,7 +28,14 @@ type MockClient struct { FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error) FnServerVersion func(ctx context.Context) (types.Version, error) FnVolumeInspect func(ctx context.Context, volumeID string) (volume.Volume, error) - FnInfo func(ctx context.Context) (system.Info, error) + FnInfo func(ctx context.Context) (system.Info, error) +} + +func NewMockClient() MockClient { + return MockClient{ + FnContainerInspect: func(_ context.Context, _ string) (types.ContainerJSON, error) { return DefaultContainerInspect, nil }, + FnServerVersion: func(_ context.Context) (types.Version, error) { return DefaultServerVersion, nil }, + } } func (m MockClient) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *ocispec.Platform, containerName string) (container.CreateResponse, error) { @@ -85,3 +93,22 @@ func (m MockClient) VolumeInspect(ctx context.Context, volumeID string) (volume. func (m MockClient) Info(ctx context.Context) (system.Info, error) { return m.FnInfo(ctx) } + +var DefaultContainerInspect = types.ContainerJSON{ + NetworkSettings: &types.NetworkSettings{ + NetworkSettingsBase: types.NetworkSettingsBase{ + Ports: map[nat.Port][]nat.PortBinding{ + "12345": {{ + HostIP: "0.0.0.0", + HostPort: "12345", + }}, + }, + }, + }, +} + +var DefaultServerVersion = types.Version{ + Version: "version", + Arch: "arch", + Platform: struct{ Name string }{Name: "platform name"}, +} diff --git a/internal/cmd/local/local_credentials.go b/internal/cmd/local/local_credentials.go index 9ba9c3c..0be3baa 100644 --- a/internal/cmd/local/local_credentials.go +++ b/internal/cmd/local/local_credentials.go @@ -45,7 +45,7 @@ func (cc *CredentialsCmd) Run(ctx context.Context, provider k8s.Provider, telCli clientId := string(secret.Data[secretClientID]) clientSecret := string(secret.Data[secretClientSecret]) - port, err := getPort(ctx, provider) + port, err := getPort(ctx, provider.ClusterName) if err != nil { return err } diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 9d8d27b..6fab843 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" - "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/k8s" "github.com/airbytehq/abctl/internal/cmd/local/local" "github.com/airbytehq/abctl/internal/telemetry" @@ -40,11 +39,6 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return fmt.Errorf("unable to determine docker installation status: %w", err) } - spinner.UpdateText(fmt.Sprintf("Checking if port %d is available", i.Port)) - if err := portAvailable(ctx, i.Port); err != nil { - return fmt.Errorf("port %d is not available: %w", i.Port, err) - } - return telClient.Wrap(ctx, telemetry.Install, func() error { spinner.UpdateText(fmt.Sprintf("Checking for existing Kubernetes cluster '%s'", provider.ClusterName)) @@ -61,16 +55,8 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t // only for kind do we need to check the existing port if provider.Name == k8s.Kind { - if dockerClient == nil { - dockerClient, err = docker.New(ctx) - if err != nil { - pterm.Error.Printfln("Unable to connect to Docker daemon") - return fmt.Errorf("unable to connect to docker: %w", err) - } - } - providedPort := i.Port - i.Port, err = dockerClient.Port(ctx, fmt.Sprintf("%s-control-plane", provider.ClusterName)) + i.Port, err = getPort(ctx, provider.ClusterName) if err != nil { pterm.Warning.Printfln("Unable to determine which port the existing cluster was configured to use.\n" + "Installation will continue but may ultimately fail, in which case it will be necessarily to uninstall first.") @@ -94,6 +80,12 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return err } + spinner.UpdateText(fmt.Sprintf("Checking if port %d is available", i.Port)) + if err := portAvailable(ctx, i.Port); err != nil { + return err + } + pterm.Success.Printfln("Port %d appears to be available", i.Port) + if err := cluster.Create(i.Port, extraVolumeMounts); err != nil { pterm.Error.Printfln("Cluster '%s' could not be created", provider.ClusterName) return err diff --git a/internal/cmd/local/local_status.go b/internal/cmd/local/local_status.go index bee0043..d8d484f 100644 --- a/internal/cmd/local/local_status.go +++ b/internal/cmd/local/local_status.go @@ -53,7 +53,7 @@ func status(ctx context.Context, provider k8s.Provider, telClient telemetry.Clie pterm.Success.Printfln("Existing cluster '%s' found", provider.ClusterName) spinner.UpdateText(fmt.Sprintf("Validating existing cluster '%s'", provider.ClusterName)) - port, err := getPort(ctx, provider) + port, err := getPort(ctx, provider.ClusterName) if err != nil { return err } From a8fd7bf6ce7ea822988db3699186b7d2cafb42c5 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 30 Aug 2024 13:53:57 -0700 Subject: [PATCH 2/7] fail install if getPort() on existing cluster fails --- internal/cmd/local/check.go | 31 +++++++++++++++++++++++++---- internal/cmd/local/check_test.go | 31 ++++++++++++++++++++++++----- internal/cmd/local/local_install.go | 7 ++++--- 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/internal/cmd/local/check.go b/internal/cmd/local/check.go index 3c279b6..77b3890 100644 --- a/internal/cmd/local/check.go +++ b/internal/cmd/local/check.go @@ -127,14 +127,14 @@ func getPort(ctx context.Context, clusterName string) (int, error) { ci, err := dockerClient.Client.ContainerInspect(ctx, container) if err != nil { - return 0, fmt.Errorf("unable to inspect container: %w", err) + return 0, fmt.Errorf("%w: %w", ErrUnableToInspect, err) } if ci.State == nil || ci.State.Status != "running" { status := "unknown" if ci.State != nil { status = ci.State.Status } - return 0, fmt.Errorf("container %q is not running (status = %q)", container, status) + return 0, ContainerNotRunningError{container, status} } for _, bindings := range ci.HostConfig.PortBindings { @@ -142,12 +142,35 @@ func getPort(ctx context.Context, clusterName string) (int, error) { if ipPort.HostIP == "0.0.0.0" { port, err := strconv.Atoi(ipPort.HostPort) if err != nil { - return 0, fmt.Errorf("unable to convert host port %s to integer: %w", ipPort.HostPort, err) + return 0, InvalidPortError{ipPort.HostPort, err} } return port, nil } } } - return 0, fmt.Errorf("no matching ports found on container %q", container) + return 0, fmt.Errorf("%w on container %q", ErrPortNotFound, container) } + +var ErrPortNotFound = errors.New("no matching port found") +var ErrUnableToInspect = errors.New("unable to inspect container") + +type ContainerNotRunningError struct { + Container string + Status string +} + +func (e ContainerNotRunningError) Error() string { + return fmt.Sprintf("container %q is not running (status = %q)", e.Container, e.Status) +} + +type InvalidPortError struct { + Port string + Inner error +} +func (e InvalidPortError) Unwrap() error { + return e.Inner +} +func (e InvalidPortError) Error() string { + return fmt.Sprintf("unable to convert host port %s to integer: %s", e.Port, e.Inner) +} \ No newline at end of file diff --git a/internal/cmd/local/check_test.go b/internal/cmd/local/check_test.go index 3d59eab..f9dfc37 100644 --- a/internal/cmd/local/check_test.go +++ b/internal/cmd/local/check_test.go @@ -180,8 +180,9 @@ func TestGetPort_NotRunning(t *testing.T) { } _, err := getPort(context.Background(), "test") - if err == nil { - t.Error("expected error") + + if !errors.Is(err, ContainerNotRunningError{"test-control-plane", "stopped"}) { + t.Errorf("expected container not running error but got %v", err) } } @@ -234,7 +235,7 @@ func TestGetPort_Invalid(t *testing.T) { HostConfig: &container.HostConfig{ PortBindings: nat.PortMap{ "tcp/80": {{ - HostIP: "1.2.3.4", + HostIP: "0.0.0.0", HostPort: "NaN", }}, }, @@ -246,8 +247,28 @@ func TestGetPort_Invalid(t *testing.T) { } _, err := getPort(context.Background(), "test") - if err == nil { - t.Error("expected error") + var invalidPortErr InvalidPortError + if !errors.As(err, &invalidPortErr) { + t.Errorf("expected invalid port error but got %v", err) + } +} + +func TestGetPort_InpsectErr(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{}, errors.New("test err") + }, + }, + } + + _, err := getPort(context.Background(), "test") + if !errors.Is(err, ErrUnableToInspect) { + t.Errorf("expected ErrUnableToInspect but got %v", err) } } diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 6fab843..906a670 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -58,10 +58,11 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t providedPort := i.Port i.Port, err = getPort(ctx, provider.ClusterName) if err != nil { - pterm.Warning.Printfln("Unable to determine which port the existing cluster was configured to use.\n" + - "Installation will continue but may ultimately fail, in which case it will be necessarily to uninstall first.") + return err + // pterm.Warning.Printfln("Unable to determine which port the existing cluster was configured to use.\n" + + // "Installation will continue but may ultimately fail, in which case it will be necessarily to uninstall first.") // since we can't verify the port is correct, push forward with the provided port - i.Port = providedPort + // i.Port = providedPort } if providedPort != i.Port { pterm.Warning.Printfln("The existing cluster was found to be using port %d, which differs from the provided port %d.\n"+ From b4668202748fc8f3c348ac018356ca83eb2b9bc3 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 30 Aug 2024 14:18:05 -0700 Subject: [PATCH 3/7] fix test --- internal/cmd/local/docker/docker_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/cmd/local/docker/docker_test.go b/internal/cmd/local/docker/docker_test.go index 342471b..26f4600 100644 --- a/internal/cmd/local/docker/docker_test.go +++ b/internal/cmd/local/docker/docker_test.go @@ -35,6 +35,11 @@ func TestNewWithOptions(t *testing.T) { goos: "linux", }, } + expVersion := Version{ + Platform: dockertest.DefaultServerVersion.Platform.Name, + Arch: dockertest.DefaultServerVersion.Arch, + Version: dockertest.DefaultServerVersion.Version, + } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -72,7 +77,7 @@ func TestNewWithOptions(t *testing.T) { if err != nil { t.Fatal("failed fetching version", err) } - if d := cmp.Diff(dockertest.DefaultServerVersion, ver); d != "" { + if d := cmp.Diff(expVersion, ver); d != "" { t.Error("unexpected version", d) } }) From 80b49517e219161211970b99904b092b7908df67 Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Fri, 30 Aug 2024 14:40:03 -0700 Subject: [PATCH 4/7] clean up dead code --- internal/cmd/local/check.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/internal/cmd/local/check.go b/internal/cmd/local/check.go index 77b3890..b5a9488 100644 --- a/internal/cmd/local/check.go +++ b/internal/cmd/local/check.go @@ -5,12 +5,10 @@ import ( "errors" "fmt" "net" - "net/http" "os" "runtime" "strconv" "syscall" - "time" "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/localerr" @@ -55,14 +53,6 @@ func dockerInstalled(ctx context.Context, telClient telemetry.Client) (docker.Ve } -// doer interface for testing purposes -type doer interface { - Do(req *http.Request) (*http.Response, error) -} - -// httpClient can be overwritten for testing purposes -var httpClient doer = &http.Client{Timeout: 3 * time.Second} - // portAvailable returns a nil error if the port is available, or already is use by Airbyte, otherwise returns an error. // // This function works by attempting to establish a tcp listener on a port. @@ -157,7 +147,7 @@ var ErrUnableToInspect = errors.New("unable to inspect container") type ContainerNotRunningError struct { Container string - Status string + Status string } func (e ContainerNotRunningError) Error() string { @@ -165,12 +155,13 @@ func (e ContainerNotRunningError) Error() string { } type InvalidPortError struct { - Port string + Port string Inner error } + func (e InvalidPortError) Unwrap() error { return e.Inner } func (e InvalidPortError) Error() string { return fmt.Sprintf("unable to convert host port %s to integer: %s", e.Port, e.Inner) -} \ No newline at end of file +} From ff0118753cd509b5ad09e49c79e361df4c47e77f Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Tue, 3 Sep 2024 10:13:12 -0700 Subject: [PATCH 5/7] cleanup --- internal/cmd/cmd.go | 31 ----------------------------- internal/cmd/local/local_install.go | 4 ---- 2 files changed, 35 deletions(-) diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index c1630f9..cef2799 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -16,37 +16,6 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -// Help messages to display for specific error situations. -const ( - // helpAirbyteDir is displayed if ErrAirbyteDir is ever returned - helpAirbyteDir = `The ~/.airbyte directory is inaccessible. -You may need to remove this directory before trying your command again.` - - // helpCluster is displayed if ErrClusterNotFound is ever returned - helpCluster = `No cluster was found. If this is unexpected, -you may need to run the "local install" command again.` - - // helpDocker is displayed if ErrDocker is ever returned - helpDocker = `An error occurred while communicating with the Docker daemon. -Ensure that Docker is running and is accessible. You may need to upgrade to a newer version of Docker. -For additional help please visit https://docs.docker.com/get-docker/` - - // helpKubernetes is displayed if ErrKubernetes is ever returned - helpKubernetes = `An error occurred while communicating with the Kubernetes cluster. -If this error persists, you may need to run the uninstall command before attempting to run -the install command again.` - - // helpIngress is displayed if ErrIngress is ever returned - helpIngress = `An error occurred while configuring ingress. -This could be in indication that the ingress port is already in use by a different application. -The ingress port can be changed by passing the flag --port.` - - // helpPort is displayed if ErrPort is ever returned - helpPort = `An error occurred while verifying if the requested port is available. -This could be in indication that the ingress port is already in use by a different application. -The ingress port can be changed by passing the flag --port.` -) - func HandleErr(err error) { if err == nil { return diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 906a670..17aca77 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -59,10 +59,6 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t i.Port, err = getPort(ctx, provider.ClusterName) if err != nil { return err - // pterm.Warning.Printfln("Unable to determine which port the existing cluster was configured to use.\n" + - // "Installation will continue but may ultimately fail, in which case it will be necessarily to uninstall first.") - // since we can't verify the port is correct, push forward with the provided port - // i.Port = providedPort } if providedPort != i.Port { pterm.Warning.Printfln("The existing cluster was found to be using port %d, which differs from the provided port %d.\n"+ From bf7f6410704cf6dde36cd77bcea028449bae7d1f Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Tue, 3 Sep 2024 10:28:34 -0700 Subject: [PATCH 6/7] fix spinner update --- internal/cmd/local/local_install.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 17aca77..a9cf800 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -70,7 +70,6 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t } else { // no existing cluster, need to create one pterm.Info.Println(fmt.Sprintf("No existing cluster found, cluster '%s' will be created", provider.ClusterName)) - spinner.UpdateText(fmt.Sprintf("Creating cluster '%s'", provider.ClusterName)) extraVolumeMounts, err := parseVolumeMounts(i.Volume) if err != nil { @@ -82,6 +81,7 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return err } pterm.Success.Printfln("Port %d appears to be available", i.Port) + spinner.UpdateText(fmt.Sprintf("Creating cluster '%s'", provider.ClusterName)) if err := cluster.Create(i.Port, extraVolumeMounts); err != nil { pterm.Error.Printfln("Cluster '%s' could not be created", provider.ClusterName) From 5b91fafd7e04cce431aeb6f65b191ea5073d91dc Mon Sep 17 00:00:00 2001 From: Alex Buchanan Date: Tue, 3 Sep 2024 11:15:05 -0700 Subject: [PATCH 7/7] add struct keys --- internal/cmd/local/check.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/cmd/local/check.go b/internal/cmd/local/check.go index b5a9488..eee7e51 100644 --- a/internal/cmd/local/check.go +++ b/internal/cmd/local/check.go @@ -124,7 +124,7 @@ func getPort(ctx context.Context, clusterName string) (int, error) { if ci.State != nil { status = ci.State.Status } - return 0, ContainerNotRunningError{container, status} + return 0, ContainerNotRunningError{Container: container, Status: status} } for _, bindings := range ci.HostConfig.PortBindings { @@ -132,7 +132,7 @@ func getPort(ctx context.Context, clusterName string) (int, error) { if ipPort.HostIP == "0.0.0.0" { port, err := strconv.Atoi(ipPort.HostPort) if err != nil { - return 0, InvalidPortError{ipPort.HostPort, err} + return 0, InvalidPortError{Port: ipPort.HostPort, Inner: err} } return port, nil }