diff --git a/internal/cmd/local/check.go b/internal/cmd/local/check.go index 2fad5a0..eee7e51 100644 --- a/internal/cmd/local/check.go +++ b/internal/cmd/local/check.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" - "io" "net" - "net/http" - "time" + "os" + "runtime" + "strconv" + "syscall" "github.com/airbytehq/abctl/internal/cmd/local/docker" - "github.com/airbytehq/abctl/internal/cmd/local/k8s" "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/telemetry" "github.com/pterm/pterm" @@ -53,14 +53,6 @@ func dockerInstalled(ctx context.Context, telClient telemetry.Client) (docker.Ve } -// doer interface for testing purposes -type doer interface { - Do(req *http.Request) (*http.Response, error) -} - -// httpClient can be overwritten for testing purposes -var httpClient doer = &http.Client{Timeout: 3 * time.Second} - // portAvailable returns a nil error if the port is available, or already is use by Airbyte, otherwise returns an error. // // This function works by attempting to establish a tcp listener on a port. @@ -78,68 +70,98 @@ func portAvailable(ctx context.Context, port int) error { // net.Listen doesn't support providing a context lc := &net.ListenConfig{} listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf("localhost:%d", port)) + if isErrorAddressAlreadyInUse(err) { + return fmt.Errorf("%w: port %d is already in use", localerr.ErrPort, port) + } if err != nil { - pterm.Debug.Println(fmt.Sprintf("Unable to listen on port '%d': %s", port, err)) - - // check if an existing airbyte installation is already listening on this port - req, errInner := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://localhost:%d/api/v1/instance_configuration", port), nil) - if errInner != nil { - pterm.Error.Printfln("Port %d request could not be created", port) - return fmt.Errorf("%w: unable to create request: %w", localerr.ErrPort, err) - } - - res, errInner := httpClient.Do(req) - if errInner != nil { - pterm.Error.Printfln("Port %d appears to already be in use", port) - return fmt.Errorf("%w: unable to send request: %w", localerr.ErrPort, err) - } - - if res.StatusCode == http.StatusOK { - pterm.Success.Printfln("Port %d appears to be running a previous Airbyte installation", port) - return nil - } - - // if we're here, we haven't been able to determine why this port may or may not be available - body, errInner := io.ReadAll(res.Body) - if errInner != nil { - pterm.Debug.Println(fmt.Sprintf("Unable to read response body: %s", errInner)) - } - pterm.Debug.Println(fmt.Sprintf( - "Unable to determine if port '%d' is in use:\n StatusCode: %d\n Body: %s", - port, res.StatusCode, body, - )) - - pterm.Error.Println(fmt.Sprintf( - "Unable to determine if port '%d' is available, consider specifying a different port", - port, - )) - return fmt.Errorf("unable to determine if port '%d' is available: %w", port, err) + return fmt.Errorf("%w: unable to determine if port '%d' is available: %w", localerr.ErrPort, port, err) } // if we're able to bind to the port (and then release it), it should be available defer func() { _ = listener.Close() }() - pterm.Success.Printfln("Port %d appears to be available", port) return nil } -func getPort(ctx context.Context, provider k8s.Provider) (int, error) { +func isErrorAddressAlreadyInUse(err error) bool { + var eOsSyscall *os.SyscallError + if !errors.As(err, &eOsSyscall) { + return false + } + var errErrno syscall.Errno // doesn't need a "*" (ptr) because it's already a ptr (uintptr) + if !errors.As(eOsSyscall, &errErrno) { + return false + } + if errors.Is(errErrno, syscall.EADDRINUSE) { + return true + } + const WSAEADDRINUSE = 10048 + if runtime.GOOS == "windows" && errErrno == WSAEADDRINUSE { + return true + } + return false +} + +func getPort(ctx context.Context, clusterName string) (int, error) { var err error if dockerClient == nil { dockerClient, err = docker.New(ctx) if err != nil { - pterm.Error.Printfln("Unable to connect to Docker daemon") return 0, fmt.Errorf("unable to connect to docker: %w", err) } } - clusterPort, err := dockerClient.Port(ctx, fmt.Sprintf("%s-control-plane", provider.ClusterName)) + container := fmt.Sprintf("%s-control-plane", clusterName) + + ci, err := dockerClient.Client.ContainerInspect(ctx, container) if err != nil { - pterm.Error.Printfln("Unable to determine docker port for cluster '%s'", provider.ClusterName) - return 0, errors.New("unable to determine port cluster was installed with") + return 0, fmt.Errorf("%w: %w", ErrUnableToInspect, err) } + if ci.State == nil || ci.State.Status != "running" { + status := "unknown" + if ci.State != nil { + status = ci.State.Status + } + return 0, ContainerNotRunningError{Container: container, Status: status} + } + + for _, bindings := range ci.HostConfig.PortBindings { + for _, ipPort := range bindings { + if ipPort.HostIP == "0.0.0.0" { + port, err := strconv.Atoi(ipPort.HostPort) + if err != nil { + return 0, InvalidPortError{Port: ipPort.HostPort, Inner: err} + } + return port, nil + } + } + } + + return 0, fmt.Errorf("%w on container %q", ErrPortNotFound, container) +} - return clusterPort, nil +var ErrPortNotFound = errors.New("no matching port found") +var ErrUnableToInspect = errors.New("unable to inspect container") + +type ContainerNotRunningError struct { + Container string + Status string +} + +func (e ContainerNotRunningError) Error() string { + return fmt.Sprintf("container %q is not running (status = %q)", e.Container, e.Status) +} + +type InvalidPortError struct { + Port string + Inner error +} + +func (e InvalidPortError) Unwrap() error { + return e.Inner +} +func (e InvalidPortError) Error() string { + return fmt.Sprintf("unable to convert host port %s to integer: %s", e.Port, e.Inner) } diff --git a/internal/cmd/local/check_test.go b/internal/cmd/local/check_test.go index 504861a..f9dfc37 100644 --- a/internal/cmd/local/check_test.go +++ b/internal/cmd/local/check_test.go @@ -13,7 +13,9 @@ import ( "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/telemetry" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/system" + "github.com/docker/go-connections/nat" "github.com/google/go-cmp/cmp" "github.com/google/uuid" ) @@ -113,6 +115,163 @@ func TestPortAvailable_Unavailable(t *testing.T) { } } +func TestGetPort_Found(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "running", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "0.0.0.0", + HostPort: "8000", + }}, + }, + }, + }, + }, nil + }, + }, + } + + port, err := getPort(context.Background(), "test") + if err != nil { + t.Errorf("unexpected error: %s", err) + return + } + if port != 8000 { + t.Errorf("expected 8000 but got %d", port) + } +} + +func TestGetPort_NotRunning(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "stopped", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "0.0.0.0", + HostPort: "8000", + }}, + }, + }, + }, + }, nil + }, + }, + } + + _, err := getPort(context.Background(), "test") + + if !errors.Is(err, ContainerNotRunningError{"test-control-plane", "stopped"}) { + t.Errorf("expected container not running error but got %v", err) + } +} + +func TestGetPort_Missing(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "running", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "1.2.3.4", + HostPort: "8000", + }}, + }, + }, + }, + }, nil + }, + }, + } + + _, err := getPort(context.Background(), "test") + if err == nil { + t.Error("expected error") + } +} + +func TestGetPort_Invalid(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{ + ContainerJSONBase: &types.ContainerJSONBase{ + State: &types.ContainerState{ + Status: "running", + }, + HostConfig: &container.HostConfig{ + PortBindings: nat.PortMap{ + "tcp/80": {{ + HostIP: "0.0.0.0", + HostPort: "NaN", + }}, + }, + }, + }, + }, nil + }, + }, + } + + _, err := getPort(context.Background(), "test") + var invalidPortErr InvalidPortError + if !errors.As(err, &invalidPortErr) { + t.Errorf("expected invalid port error but got %v", err) + } +} + +func TestGetPort_InpsectErr(t *testing.T) { + t.Cleanup(func() { + dockerClient = nil + }) + + dockerClient = &docker.Docker{ + Client: dockertest.MockClient{ + FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { + return types.ContainerJSON{}, errors.New("test err") + }, + }, + } + + _, err := getPort(context.Background(), "test") + if !errors.Is(err, ErrUnableToInspect) { + t.Errorf("expected ErrUnableToInspect but got %v", err) + } +} + // port returns the port from a string value in the format of "ipv4:port" or "ip::v6:port" func port(s string) int { vals := strings.Split(s, ":") diff --git a/internal/cmd/local/docker/docker.go b/internal/cmd/local/docker/docker.go index 0345a76..9fb5500 100644 --- a/internal/cmd/local/docker/docker.go +++ b/internal/cmd/local/docker/docker.go @@ -2,11 +2,9 @@ package docker import ( "context" - "errors" "fmt" "io" "runtime" - "strconv" "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/cmd/local/paths" @@ -153,26 +151,3 @@ func (d *Docker) Version(ctx context.Context) (Version, error) { Platform: ver.Platform.Name, }, nil } - -// Port returns the host-port the underlying docker process is currently bound to, for the given container. -// It determines this by walking through all the ports on the container and finding the one that is bound to ip 0.0.0.0. -func (d *Docker) Port(ctx context.Context, container string) (int, error) { - ci, err := d.Client.ContainerInspect(ctx, container) - if err != nil { - return 0, fmt.Errorf("unable to inspect container: %w", err) - } - - for _, bindings := range ci.NetworkSettings.Ports { - for _, ipPort := range bindings { - if ipPort.HostIP == "0.0.0.0" { - port, err := strconv.Atoi(ipPort.HostPort) - if err != nil { - return 0, fmt.Errorf("unable to convert host port %s to integer: %w", ipPort.HostPort, err) - } - return port, nil - } - } - } - - return 0, errors.New("unable to determine port for container") -} diff --git a/internal/cmd/local/docker/docker_test.go b/internal/cmd/local/docker/docker_test.go index 3f944b6..26f4600 100644 --- a/internal/cmd/local/docker/docker_test.go +++ b/internal/cmd/local/docker/docker_test.go @@ -9,7 +9,6 @@ import ( "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/docker/docker/api/types" "github.com/docker/docker/client" - "github.com/docker/go-connections/nat" "github.com/google/go-cmp/cmp" ) @@ -18,12 +17,6 @@ import ( // avoid a circular dependency. var _ Client = (*dockertest.MockClient)(nil) -var expVersion = Version{ - Version: "version", - Arch: "arch", - Platform: "platform name", -} - func TestNewWithOptions(t *testing.T) { tests := []struct { name string @@ -42,6 +35,11 @@ func TestNewWithOptions(t *testing.T) { goos: "linux", }, } + expVersion := Version{ + Platform: dockertest.DefaultServerVersion.Platform.Name, + Arch: dockertest.DefaultServerVersion.Arch, + Version: dockertest.DefaultServerVersion.Version, + } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -49,10 +47,7 @@ func TestNewWithOptions(t *testing.T) { pingCalled := 0 p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: defaultContainerInspect, - FnServerVersion: defaultServerVersion, - }, + MockClient: dockertest.NewMockClient(), ping: func(ctx context.Context) (types.Ping, error) { pingCalled++ return types.Ping{}, nil @@ -78,14 +73,6 @@ func TestNewWithOptions(t *testing.T) { t.Error("ping called incorrect number of times", d) } - port, err := cli.Port(ctx, "container") - if err != nil { - t.Fatal("failed fetching port", err) - } - if d := cmp.Diff(12345, port); d != "" { - t.Error("unexpected port", d) - } - ver, err := cli.Version(ctx) if err != nil { t.Fatal("failed fetching version", err) @@ -245,92 +232,6 @@ func TestVersion_Err(t *testing.T) { } } -func TestPort_Missing(t *testing.T) { - ctx := context.Background() - p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return types.ContainerJSON{ - NetworkSettings: &types.NetworkSettings{ - NetworkSettingsBase: types.NetworkSettingsBase{ - Ports: map[nat.Port][]nat.PortBinding{}, - }, - }, - }, nil - }, - }, - } - - f := func(opts ...client.Opt) (pinger, error) { return p, nil } - - cli, err := newWithOptions(ctx, f, "darwin") - if err != nil { - t.Fatal("failed creating client", err) - } - - _, err = cli.Port(ctx, "container") - if err == nil { - t.Error("expected error") - } -} - -func TestPort_Invalid(t *testing.T) { - ctx := context.Background() - p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return types.ContainerJSON{ - NetworkSettings: &types.NetworkSettings{ - NetworkSettingsBase: types.NetworkSettingsBase{ - Ports: map[nat.Port][]nat.PortBinding{ - "12345": {{ - HostIP: "0.0.0.0", - HostPort: "NaN", - }}, - }, - }, - }, - }, nil - }, - }, - } - - f := func(opts ...client.Opt) (pinger, error) { return p, nil } - - cli, err := newWithOptions(ctx, f, "darwin") - if err != nil { - t.Fatal("failed creating client", err) - } - - _, err = cli.Port(ctx, "container") - if err == nil { - t.Error("expected error") - } -} - -func TestPort_Err(t *testing.T) { - ctx := context.Background() - p := mockPinger{ - MockClient: dockertest.MockClient{ - FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) { - return types.ContainerJSON{}, errors.New("test error") - }, - }, - } - - f := func(opts ...client.Opt) (pinger, error) { return p, nil } - - cli, err := newWithOptions(ctx, f, "darwin") - if err != nil { - t.Fatal("failed creating client", err) - } - - _, err = cli.Port(ctx, "container") - if err == nil { - t.Error("expected error") - } -} - // --- mocks var _ pinger = (*mockPinger)(nil) @@ -346,26 +247,3 @@ func (m mockPinger) Ping(ctx context.Context) (types.Ping, error) { return m.ping(ctx) } - -func defaultContainerInspect(_ context.Context, _ string) (types.ContainerJSON, error) { - return types.ContainerJSON{ - NetworkSettings: &types.NetworkSettings{ - NetworkSettingsBase: types.NetworkSettingsBase{ - Ports: map[nat.Port][]nat.PortBinding{ - "12345": {{ - HostIP: "0.0.0.0", - HostPort: "12345", - }}, - }, - }, - }, - }, nil -} - -func defaultServerVersion(_ context.Context) (types.Version, error) { - return types.Version{ - Version: expVersion.Version, - Arch: expVersion.Arch, - Platform: struct{ Name string }{Name: expVersion.Platform}, - }, nil -} diff --git a/internal/cmd/local/docker/dockertest/dockertest.go b/internal/cmd/local/docker/dockertest/dockertest.go index 7229eb2..42412ae 100644 --- a/internal/cmd/local/docker/dockertest/dockertest.go +++ b/internal/cmd/local/docker/dockertest/dockertest.go @@ -10,6 +10,7 @@ import ( "github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/system" "github.com/docker/docker/api/types/volume" + "github.com/docker/go-connections/nat" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -27,7 +28,14 @@ type MockClient struct { FnImagePull func(ctx context.Context, refStr string, options image.PullOptions) (io.ReadCloser, error) FnServerVersion func(ctx context.Context) (types.Version, error) FnVolumeInspect func(ctx context.Context, volumeID string) (volume.Volume, error) - FnInfo func(ctx context.Context) (system.Info, error) + FnInfo func(ctx context.Context) (system.Info, error) +} + +func NewMockClient() MockClient { + return MockClient{ + FnContainerInspect: func(_ context.Context, _ string) (types.ContainerJSON, error) { return DefaultContainerInspect, nil }, + FnServerVersion: func(_ context.Context) (types.Version, error) { return DefaultServerVersion, nil }, + } } func (m MockClient) ContainerCreate(ctx context.Context, config *container.Config, hostConfig *container.HostConfig, networkingConfig *network.NetworkingConfig, platform *ocispec.Platform, containerName string) (container.CreateResponse, error) { @@ -85,3 +93,22 @@ func (m MockClient) VolumeInspect(ctx context.Context, volumeID string) (volume. func (m MockClient) Info(ctx context.Context) (system.Info, error) { return m.FnInfo(ctx) } + +var DefaultContainerInspect = types.ContainerJSON{ + NetworkSettings: &types.NetworkSettings{ + NetworkSettingsBase: types.NetworkSettingsBase{ + Ports: map[nat.Port][]nat.PortBinding{ + "12345": {{ + HostIP: "0.0.0.0", + HostPort: "12345", + }}, + }, + }, + }, +} + +var DefaultServerVersion = types.Version{ + Version: "version", + Arch: "arch", + Platform: struct{ Name string }{Name: "platform name"}, +} diff --git a/internal/cmd/local/local_credentials.go b/internal/cmd/local/local_credentials.go index 9ba9c3c..0be3baa 100644 --- a/internal/cmd/local/local_credentials.go +++ b/internal/cmd/local/local_credentials.go @@ -45,7 +45,7 @@ func (cc *CredentialsCmd) Run(ctx context.Context, provider k8s.Provider, telCli clientId := string(secret.Data[secretClientID]) clientSecret := string(secret.Data[secretClientSecret]) - port, err := getPort(ctx, provider) + port, err := getPort(ctx, provider.ClusterName) if err != nil { return err } diff --git a/internal/cmd/local/local_install.go b/internal/cmd/local/local_install.go index 9d8d27b..a9cf800 100644 --- a/internal/cmd/local/local_install.go +++ b/internal/cmd/local/local_install.go @@ -5,7 +5,6 @@ import ( "fmt" "strings" - "github.com/airbytehq/abctl/internal/cmd/local/docker" "github.com/airbytehq/abctl/internal/cmd/local/k8s" "github.com/airbytehq/abctl/internal/cmd/local/local" "github.com/airbytehq/abctl/internal/telemetry" @@ -40,11 +39,6 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t return fmt.Errorf("unable to determine docker installation status: %w", err) } - spinner.UpdateText(fmt.Sprintf("Checking if port %d is available", i.Port)) - if err := portAvailable(ctx, i.Port); err != nil { - return fmt.Errorf("port %d is not available: %w", i.Port, err) - } - return telClient.Wrap(ctx, telemetry.Install, func() error { spinner.UpdateText(fmt.Sprintf("Checking for existing Kubernetes cluster '%s'", provider.ClusterName)) @@ -61,21 +55,10 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t // only for kind do we need to check the existing port if provider.Name == k8s.Kind { - if dockerClient == nil { - dockerClient, err = docker.New(ctx) - if err != nil { - pterm.Error.Printfln("Unable to connect to Docker daemon") - return fmt.Errorf("unable to connect to docker: %w", err) - } - } - providedPort := i.Port - i.Port, err = dockerClient.Port(ctx, fmt.Sprintf("%s-control-plane", provider.ClusterName)) + i.Port, err = getPort(ctx, provider.ClusterName) if err != nil { - pterm.Warning.Printfln("Unable to determine which port the existing cluster was configured to use.\n" + - "Installation will continue but may ultimately fail, in which case it will be necessarily to uninstall first.") - // since we can't verify the port is correct, push forward with the provided port - i.Port = providedPort + return err } if providedPort != i.Port { pterm.Warning.Printfln("The existing cluster was found to be using port %d, which differs from the provided port %d.\n"+ @@ -87,13 +70,19 @@ func (i *InstallCmd) Run(ctx context.Context, provider k8s.Provider, telClient t } else { // no existing cluster, need to create one pterm.Info.Println(fmt.Sprintf("No existing cluster found, cluster '%s' will be created", provider.ClusterName)) - spinner.UpdateText(fmt.Sprintf("Creating cluster '%s'", provider.ClusterName)) extraVolumeMounts, err := parseVolumeMounts(i.Volume) if err != nil { return err } + spinner.UpdateText(fmt.Sprintf("Checking if port %d is available", i.Port)) + if err := portAvailable(ctx, i.Port); err != nil { + return err + } + pterm.Success.Printfln("Port %d appears to be available", i.Port) + spinner.UpdateText(fmt.Sprintf("Creating cluster '%s'", provider.ClusterName)) + if err := cluster.Create(i.Port, extraVolumeMounts); err != nil { pterm.Error.Printfln("Cluster '%s' could not be created", provider.ClusterName) return err diff --git a/internal/cmd/local/local_status.go b/internal/cmd/local/local_status.go index bee0043..d8d484f 100644 --- a/internal/cmd/local/local_status.go +++ b/internal/cmd/local/local_status.go @@ -53,7 +53,7 @@ func status(ctx context.Context, provider k8s.Provider, telClient telemetry.Clie pterm.Success.Printfln("Existing cluster '%s' found", provider.ClusterName) spinner.UpdateText(fmt.Sprintf("Validating existing cluster '%s'", provider.ClusterName)) - port, err := getPort(ctx, provider) + port, err := getPort(ctx, provider.ClusterName) if err != nil { return err }