Skip to content

Commit

Permalink
fix: improve port checks
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-airbyte committed Aug 30, 2024
1 parent bedbead commit 498bb61
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 217 deletions.
2 changes: 1 addition & 1 deletion internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ This could be in indication that the ingress port is already in use by a differe
The ingress port can be changed by passing the flag --port.`

// helpPort is displayed if ErrPort is ever returned
helpPort = `An error occurred while verifying if the request port is available.
helpPort = `An error occurred while verifying if the requested port is available.
This could be in indication that the ingress port is already in use by a different application.
The ingress port can be changed by passing the flag --port.`
)
Expand Down
96 changes: 52 additions & 44 deletions internal/cmd/local/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"runtime"
"strconv"
"syscall"
"time"

"github.com/airbytehq/abctl/internal/cmd/local/docker"
"github.com/airbytehq/abctl/internal/cmd/local/k8s"
"github.com/airbytehq/abctl/internal/cmd/local/localerr"
"github.com/airbytehq/abctl/internal/telemetry"
"github.com/pterm/pterm"
Expand Down Expand Up @@ -78,68 +80,74 @@ func portAvailable(ctx context.Context, port int) error {
// net.Listen doesn't support providing a context
lc := &net.ListenConfig{}
listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf("localhost:%d", port))
if isErrorAddressAlreadyInUse(err) {
return fmt.Errorf("%w: port %d is already in use", localerr.ErrPort, port)
}
if err != nil {
pterm.Debug.Println(fmt.Sprintf("Unable to listen on port '%d': %s", port, err))

// check if an existing airbyte installation is already listening on this port
req, errInner := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://localhost:%d/api/v1/instance_configuration", port), nil)
if errInner != nil {
pterm.Error.Printfln("Port %d request could not be created", port)
return fmt.Errorf("%w: unable to create request: %w", localerr.ErrPort, err)
}

res, errInner := httpClient.Do(req)
if errInner != nil {
pterm.Error.Printfln("Port %d appears to already be in use", port)
return fmt.Errorf("%w: unable to send request: %w", localerr.ErrPort, err)
}

if res.StatusCode == http.StatusOK {
pterm.Success.Printfln("Port %d appears to be running a previous Airbyte installation", port)
return nil
}

// if we're here, we haven't been able to determine why this port may or may not be available
body, errInner := io.ReadAll(res.Body)
if errInner != nil {
pterm.Debug.Println(fmt.Sprintf("Unable to read response body: %s", errInner))
}
pterm.Debug.Println(fmt.Sprintf(
"Unable to determine if port '%d' is in use:\n StatusCode: %d\n Body: %s",
port, res.StatusCode, body,
))

pterm.Error.Println(fmt.Sprintf(
"Unable to determine if port '%d' is available, consider specifying a different port",
port,
))
return fmt.Errorf("unable to determine if port '%d' is available: %w", port, err)
return fmt.Errorf("%w: unable to determine if port '%d' is available: %w", localerr.ErrPort, port, err)
}
// if we're able to bind to the port (and then release it), it should be available
defer func() {
_ = listener.Close()
}()

pterm.Success.Printfln("Port %d appears to be available", port)
return nil
}

func getPort(ctx context.Context, provider k8s.Provider) (int, error) {
func isErrorAddressAlreadyInUse(err error) bool {
var eOsSyscall *os.SyscallError
if !errors.As(err, &eOsSyscall) {
return false
}
var errErrno syscall.Errno // doesn't need a "*" (ptr) because it's already a ptr (uintptr)
if !errors.As(eOsSyscall, &errErrno) {
return false
}
if errors.Is(errErrno, syscall.EADDRINUSE) {
return true
}
const WSAEADDRINUSE = 10048
if runtime.GOOS == "windows" && errErrno == WSAEADDRINUSE {
return true
}
return false
}

func getPort(ctx context.Context, clusterName string) (int, error) {
var err error

if dockerClient == nil {
dockerClient, err = docker.New(ctx)
if err != nil {
pterm.Error.Printfln("Unable to connect to Docker daemon")
return 0, fmt.Errorf("unable to connect to docker: %w", err)
}
}

clusterPort, err := dockerClient.Port(ctx, fmt.Sprintf("%s-control-plane", provider.ClusterName))
container := fmt.Sprintf("%s-control-plane", clusterName)

ci, err := dockerClient.Client.ContainerInspect(ctx, container)
if err != nil {
pterm.Error.Printfln("Unable to determine docker port for cluster '%s'", provider.ClusterName)
return 0, errors.New("unable to determine port cluster was installed with")
return 0, fmt.Errorf("unable to inspect container: %w", err)
}
if ci.State == nil || ci.State.Status != "running" {
status := "unknown"
if ci.State != nil {
status = ci.State.Status
}
return 0, fmt.Errorf("container %q is not running (status = %q)", container, status)
}

for _, bindings := range ci.HostConfig.PortBindings {
for _, ipPort := range bindings {
if ipPort.HostIP == "0.0.0.0" {
port, err := strconv.Atoi(ipPort.HostPort)
if err != nil {
return 0, fmt.Errorf("unable to convert host port %s to integer: %w", ipPort.HostPort, err)
}
return port, nil
}
}
}

return clusterPort, nil
return 0, fmt.Errorf("no matching ports found on container %q", container)
}
138 changes: 138 additions & 0 deletions internal/cmd/local/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/airbytehq/abctl/internal/cmd/local/localerr"
"github.com/airbytehq/abctl/internal/telemetry"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/system"
"github.com/docker/go-connections/nat"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -113,6 +115,142 @@ func TestPortAvailable_Unavailable(t *testing.T) {
}
}

func TestGetPort_Found(t *testing.T) {
t.Cleanup(func() {
dockerClient = nil
})

dockerClient = &docker.Docker{
Client: dockertest.MockClient{
FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Status: "running",
},
HostConfig: &container.HostConfig{
PortBindings: nat.PortMap{
"tcp/80": {{
HostIP: "0.0.0.0",
HostPort: "8000",
}},
},
},
},
}, nil
},
},
}

port, err := getPort(context.Background(), "test")
if err != nil {
t.Errorf("unexpected error: %s", err)
return
}
if port != 8000 {
t.Errorf("expected 8000 but got %d", port)
}
}

func TestGetPort_NotRunning(t *testing.T) {
t.Cleanup(func() {
dockerClient = nil
})

dockerClient = &docker.Docker{
Client: dockertest.MockClient{
FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Status: "stopped",
},
HostConfig: &container.HostConfig{
PortBindings: nat.PortMap{
"tcp/80": {{
HostIP: "0.0.0.0",
HostPort: "8000",
}},
},
},
},
}, nil
},
},
}

_, err := getPort(context.Background(), "test")
if err == nil {
t.Error("expected error")
}
}

func TestGetPort_Missing(t *testing.T) {
t.Cleanup(func() {
dockerClient = nil
})

dockerClient = &docker.Docker{
Client: dockertest.MockClient{
FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Status: "running",
},
HostConfig: &container.HostConfig{
PortBindings: nat.PortMap{
"tcp/80": {{
HostIP: "1.2.3.4",
HostPort: "8000",
}},
},
},
},
}, nil
},
},
}

_, err := getPort(context.Background(), "test")
if err == nil {
t.Error("expected error")
}
}

func TestGetPort_Invalid(t *testing.T) {
t.Cleanup(func() {
dockerClient = nil
})

dockerClient = &docker.Docker{
Client: dockertest.MockClient{
FnContainerInspect: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Status: "running",
},
HostConfig: &container.HostConfig{
PortBindings: nat.PortMap{
"tcp/80": {{
HostIP: "1.2.3.4",
HostPort: "NaN",
}},
},
},
},
}, nil
},
},
}

_, err := getPort(context.Background(), "test")
if err == nil {
t.Error("expected error")
}
}

// port returns the port from a string value in the format of "ipv4:port" or "ip::v6:port"
func port(s string) int {
vals := strings.Split(s, ":")
Expand Down
25 changes: 0 additions & 25 deletions internal/cmd/local/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package docker

import (
"context"
"errors"
"fmt"
"io"
"runtime"
"strconv"

"github.com/airbytehq/abctl/internal/cmd/local/localerr"
"github.com/airbytehq/abctl/internal/cmd/local/paths"
Expand Down Expand Up @@ -153,26 +151,3 @@ func (d *Docker) Version(ctx context.Context) (Version, error) {
Platform: ver.Platform.Name,
}, nil
}

// Port returns the host-port the underlying docker process is currently bound to, for the given container.
// It determines this by walking through all the ports on the container and finding the one that is bound to ip 0.0.0.0.
func (d *Docker) Port(ctx context.Context, container string) (int, error) {
ci, err := d.Client.ContainerInspect(ctx, container)
if err != nil {
return 0, fmt.Errorf("unable to inspect container: %w", err)
}

for _, bindings := range ci.NetworkSettings.Ports {
for _, ipPort := range bindings {
if ipPort.HostIP == "0.0.0.0" {
port, err := strconv.Atoi(ipPort.HostPort)
if err != nil {
return 0, fmt.Errorf("unable to convert host port %s to integer: %w", ipPort.HostPort, err)
}
return port, nil
}
}
}

return 0, errors.New("unable to determine port for container")
}
Loading

0 comments on commit 498bb61

Please sign in to comment.