Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create Happy Eyeballs dialer #176

Merged
merged 30 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ use GitHub pull requests for this purpose. Consult
[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more
information on using pull requests.

# Go Documentation

The best way to ensure you got the Go doc formatting right is to visualize it.
To visualize the Go documentation you wrote, run:

```sh
go run golang.org/x/pkgsite/cmd/pkgsite@latest
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
```

Then open http://localhost:8080 on your browser.

# Cross-platform Development

## Building
Expand Down
259 changes: 259 additions & 0 deletions transport/happyeyeballs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright 2024 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"sync/atomic"
"time"
)

/*
HappyEyeballsStreamDialer is a [StreamDialer] that uses [Happy Eyeballs v2] to establish a connection
to the destination address.

Happy Eyeballs v2 reduces the connection delay when compared to v1, with significant differences when one of the
address lookups times out. V1 will wait for both the IPv4 and IPv6 lookups to return before attempting connections,
while V2 starts connections as soon as it gets a lookup result, with a slight delay if IPv4 arrives before IPv6.

Go and most platforms provide V1 only, so you will benefit from using the HappyEyeballsStreamDialer in place of the
standard dialer, even if you are not using custom transports.

[Happy Eyeballs v2]: https://datatracker.ietf.org/doc/html/rfc8305
*/
type HappyEyeballsStreamDialer struct {
// Dialer is used to establish the connection attempts. If nil, a direct TCP connection is established.
Dialer StreamDialer
// Resolve is a function to map a host name to IP addresses. See HappyEyeballsResolver.
Resolve HappyEyeballsResolveFunc
}

// HappyEyeballsResolveFunc performs concurrent hostname resolution for [HappyEyeballsStreamDialer].
//
// The function should return a channel quickly, and then send the resolution results to it
// as they become available. HappyEyeballsStreamDialer will read the resolutions from the channel.
// The returned channel must be closed when there are no
// more resolutions pending, to indicate that the resolution is done. If that is not
// done, HappyEyeballsStreamDialer will keep waiting.
//
// It's recommended to return a buffered channel with size equal to the number of
// lookups, so that it will never block on write.
// If the channel is unbuffered, you must use select when writing to the channel against
// ctx.Done(), to make sure you don't write when HappyEyeballsStreamDialer is no longer reading.
// Othewise your goroutine will get stuck.
//
// It's recommended to resolve IPv6 and IPv4 in parallel, so the connection attempts
// are started as soon as addresses are received. That's the primary benefit of Happy
// Eyeballs v2. If you resolve in series, and only send the addresses when both
// resolutions are done, you will get behavior similar to Happy Eyeballs v1.
type HappyEyeballsResolveFunc = func(ctx context.Context, hostname string) <-chan HappyEyeballsResolution

// HappyEyeballsResolution represents a result of a hostname resolution.
// Happy Eyeballs sorts the IPs in a specific way, updating the order as
// new results are received. It's recommended to returns all IPs you receive
// as a group, rather than one IP at a time, since a later IP may be preferred.
type HappyEyeballsResolution struct {
IPs []netip.Addr
Err error
}

// NewParallelHappyEyeballsResolveFunc creates a [HappyEyeballsResolveFunc] that uses the given list of functions to resolve host names.
// The given functions will all run in parallel, with results being output as they are received.
// Typically you will pass one function for IPv6 and one for IPv4 to achieve Happy Eyballs v2 behavior.
// It takes care of creating the channel and the parallelization and coordination between the calls.
func NewParallelHappyEyeballsResolveFunc(resolveFuncs ...func(ctx context.Context, hostname string) ([]netip.Addr, error)) HappyEyeballsResolveFunc {
return func(ctx context.Context, host string) <-chan HappyEyeballsResolution {
// Use a buffered channel with space for both lookups, to ensure the goroutines won't
// block on channel write if the Happy Eyeballs algorithm is cancelled and no longer reading.
resultsCh := make(chan HappyEyeballsResolution, len(resolveFuncs))
if len(resolveFuncs) == 0 {
close(resultsCh)
return resultsCh
}

var pending atomic.Int32
pending.Store(int32(len(resolveFuncs)))
for _, resolve := range resolveFuncs {
go func(resolve func(ctx context.Context, hostname string) ([]netip.Addr, error), hostname string) {
ips, err := resolve(ctx, hostname)
resultsCh <- HappyEyeballsResolution{ips, err}
if pending.Add(-1) == 0 {
// Close results channel when no other goroutine is pending.
close(resultsCh)
}
}(resolve, host)
}
return resultsCh
}
}

var _ StreamDialer = (*HappyEyeballsStreamDialer)(nil)

func (d *HappyEyeballsStreamDialer) dial(ctx context.Context, addr string) (StreamConn, error) {
if d.Dialer != nil {
return d.Dialer.DialStream(ctx, addr)
}
return (&TCPDialer{}).DialStream(ctx, addr)
}

func newClosedChan() <-chan struct{} {
closedCh := make(chan struct{})
close(closedCh)
return closedCh
}

// DialStream implements [StreamDialer].
func (d *HappyEyeballsStreamDialer) DialStream(ctx context.Context, addr string) (StreamConn, error) {
hostname, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("failed to parse address: %w", err)
}
if net.ParseIP(hostname) != nil {
// Host is already an IP address, just dial the address.
return d.dial(ctx, addr)
}

// Indicates to attempts that the dialing process is done, so they don't get stuck.
ctx, dialDone := context.WithCancel(ctx)
defer dialDone()
Comment on lines +132 to +134
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I mean we did not do anything special with dialDone in the last review. Why not just simply using the ctx parameter directly in the code below?

Suggested change
// Indicates to attempts that the dialing process is done, so they don't get stuck.
ctx, dialDone := context.WithCancel(ctx)
defer dialDone()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no guarantee that the input context will be cancelled. In this code, I cancel the context to make sure all subtasks are notifed and can complete.
If I remove the dialDone, the subtasks may hang for a while.


// HOSTNAME RESOLUTION QUERY HANDLING
// https://datatracker.ietf.org/doc/html/rfc8305#section-3
resolutionCh := d.Resolve(ctx, hostname)

// CONNECTION ATTEMPTS
// https://datatracker.ietf.org/doc/html/rfc8305#section-5
// We keep IPv4s and IPv6 separate and track the last one attempted so we can
// alternate the address family in the connection attempts.
ip4s := make([]netip.Addr, 0, 1)
ip6s := make([]netip.Addr, 0, 1)
var lastDialed netip.Addr
// Keep track of the lookup and dial errors separately. We prefer the dial errors
// when returning.
var lookupErr error
var dialErr error
// Channel to wait for before a new dial attempt. It starts
// with a closed channel that doesn't block because there's no
// wait initially.
var attemptDelayCh <-chan struct{} = newClosedChan()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit it seems this is the only place newClosedChan() is used. Maybe we can simplify close the channel here and deletenewClosedChan function to save some lines?

Suggested change
var attemptDelayCh <-chan struct{} = newClosedChan()
attemptDelayCh := make(chan struct{})
close(attemptDelayCh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your code doesn't work because attemptDelayCh must be of type <-chan struct{}, not chan struct{}, otherwise the assignment attemptDelayCh = waitCtx.Done() doesn't compile.

Instead of creating a channel, closing, and then assigning to attemptDelayCh, I found it better to keep that logic out of the core logic, which is already complicated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG, maybe var attemptDelayCh <-chan struct{} = make(chan struct{}) would work here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that too 😅
The problem is that you can't close a read-only channel.

type DialResult struct {
Conn StreamConn
Err error
}
dialCh := make(chan DialResult)

// Channel that triggers when a new connection can be made. Starts blocked (nil)
// because we need IPs first.
var readyToDialCh <-chan struct{} = nil
// We keep track of pending operations (lookups and IPs to dial) so we can stop when
// there's no more work to wait for.
for opsPending := 1; opsPending > 0; {
if len(ip6s) == 0 && len(ip4s) == 0 {
// No IPs. Keep dial disabled.
readyToDialCh = nil
} else {
// There are IPs to dial.
if !lastDialed.IsValid() && len(ip6s) == 0 && resolutionCh != nil {
// Attempts haven't started and IPv6 lookup is not done yet. Set up Resolution Delay, as per
// https://datatracker.ietf.org/doc/html/rfc8305#section-8, if it hasn't been set up yet.
if readyToDialCh == nil {
resolutionDelayCtx, cancelResolutionDelay := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancelResolutionDelay()
readyToDialCh = resolutionDelayCtx.Done()
}
} else {
// Wait for the previous attempt.
readyToDialCh = attemptDelayCh
}
}
select {
// Receive lookup results.
case lookupRes, ok := <-resolutionCh:
if !ok {
opsPending--
// Set to nil to make the read on lookupCh block and to signal lookup is done.
resolutionCh = nil
}
if lookupRes.Err != nil {
lookupErr = errors.Join(lookupErr, lookupRes.Err)
continue
}
opsPending += len(lookupRes.IPs)
// TODO: sort IPs as per https://datatracker.ietf.org/doc/html/rfc8305#section-4
for _, ip := range lookupRes.IPs {
if ip.Is6() {
ip6s = append(ip6s, ip)
} else {
ip4s = append(ip4s, ip)
}
}

// Wait for Connection Attempt Delay or attempt done.
// This case is disabled above when len(ip6s) == 0 && len(ip4s) == 0.
case <-readyToDialCh:
var toDial netip.Addr
// Alternate between IPv6 and IPv4.
if len(ip6s) == 0 || (lastDialed.Is6() && len(ip4s) > 0) {
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
toDial = ip4s[0]
ip4s = ip4s[1:]
} else {
toDial = ip6s[0]
ip6s = ip6s[1:]
}
// Reset Connection Attempt Delay, as per https://datatracker.ietf.org/doc/html/rfc8305#section-8
// We don't tie the delay context to the parent because we don't want the readyToDialCh case
// to trigger on the parent cancellation.
delayCtx, cancelDelay := context.WithTimeout(context.Background(), 250*time.Millisecond)
attemptDelayCh = delayCtx.Done()
go func(addr string, cancelDelay context.CancelFunc) {
// Cancel the wait if the dial return early.
defer cancelDelay()
conn, err := d.dial(ctx, addr)
select {
case <-ctx.Done():
if conn != nil {
conn.Close()
}
case dialCh <- DialResult{conn, err}:
}
}(net.JoinHostPort(toDial.String(), port), cancelDelay)
lastDialed = toDial

// Receive dial result.
case dialRes := <-dialCh:
opsPending--
if dialRes.Err != nil {
dialErr = errors.Join(dialErr, dialRes.Err)
continue
}
return dialRes.Conn, nil

// Dial has been canceled. Return.
case <-ctx.Done():
return nil, ctx.Err()
}
}
if dialErr != nil {
return nil, dialErr
}
if lookupErr != nil {
return nil, lookupErr
}
return nil, errors.New("address lookup returned no IPs")
}
Loading
Loading