Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create Happy Eyeballs dialer #176

Merged
merged 30 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ use GitHub pull requests for this purpose. Consult
[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more
information on using pull requests.

# Go Documentation

The best way to ensure you got the Go doc formatting right is to visualize it.
To visualize the Go documentation you wrote, run:

```sh
go run golang.org/x/pkgsite/cmd/pkgsite@latest
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
```

Then open http://localhost:8080 on your browser.

# Cross-platform Development

## Building
Expand Down
237 changes: 237 additions & 0 deletions transport/happyeyeballs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2024 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"time"
)

/*
HappyEyeballsStreamDialer is a [StreamDialer] that uses [Happy Eyeballs v2] to establish a connection
to the destination address.

Happy Eyeballs v2 reduces the connection delay when compared to v1, with significant differences when one of the
address lookups times out. V1 will wait for both the IPv4 and IPv6 lookups to return before attempting connections,
while V2 starts connections as soon as it gets a lookup result, with a slight delay if IPv4 arrives before IPv6.

Go and most platforms provide V1 only, so you will benefit from using the HappyEyeballsStreamDialer in place of the
standard dialer, even if you are not using custom transports.

[Happy Eyeballs v2]: https://datatracker.ietf.org/doc/html/rfc8305
*/
type HappyEyeballsStreamDialer struct {
// The base dialer to establish connections. If nil, a direct TCP connection is established.
Dialer StreamDialer
// Function to map a host name to IPv6 addresses. If nil, net.DefaultResolver is used.
LookupIPv6 func(ctx context.Context, host string) ([]netip.Addr, error)
// Function to map a host name to IPv4 addresses. If nil, net.DefaultResolver is used.
LookupIPv4 func(ctx context.Context, host string) ([]netip.Addr, error)
}

var _ StreamDialer = (*HappyEyeballsStreamDialer)(nil)

func (d *HappyEyeballsStreamDialer) dial(ctx context.Context, addr string) (StreamConn, error) {
if d.Dialer != nil {
return d.Dialer.DialStream(ctx, addr)
}
return (&TCPDialer{}).DialStream(ctx, addr)
}

func (d *HappyEyeballsStreamDialer) lookupIPv4(ctx context.Context, host string) ([]netip.Addr, error) {
if d.LookupIPv4 != nil {
return d.LookupIPv4(ctx, host)
}
return net.DefaultResolver.LookupNetIP(ctx, "ip4", host)
}

func (d *HappyEyeballsStreamDialer) lookupIPv6(ctx context.Context, host string) ([]netip.Addr, error) {
if d.LookupIPv6 != nil {
return d.LookupIPv6(ctx, host)
}
return net.DefaultResolver.LookupNetIP(ctx, "ip6", host)
}

func newClosedChan() <-chan struct{} {
closedCh := make(chan struct{})
close(closedCh)
return closedCh
}

// DialStream implements [StreamDialer].
func (d *HappyEyeballsStreamDialer) DialStream(ctx context.Context, addr string) (StreamConn, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("failed to parse address: %w", err)
}
if net.ParseIP(host) != nil {
// Host is already an IP address, just dial the address.
return d.dial(ctx, addr)
}

// Indicates to attempts that the search is done, so they don't get stuck.
searchCtx, searchDone := context.WithCancel(ctx)
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
defer searchDone()

// DOMAIN NAME LOOKUP SECTION
// We start the IPv4 and IPv6 lookups in parallel, writing to lookup4Ch
// and lookup6Ch when they are done.
type LookupResult struct {
IPs []netip.Addr
Err error
}
lookup6Ch := make(chan LookupResult)
lookup4Ch := make(chan LookupResult)
go func(lookup6Ch chan<- LookupResult, host string) {
ips, err := d.lookupIPv6(searchCtx, host)
if err != nil {
err = fmt.Errorf("failed to lookup IPv6 addresses: %w", err)
}
lookup6Ch <- LookupResult{ips, err}
close(lookup6Ch)
}(lookup6Ch, host)
go func(lookup4Ch chan<- LookupResult, host string) {
ips, err := d.lookupIPv4(searchCtx, host)
if err != nil {
err = fmt.Errorf("failed to lookup IPv4 addresses: %w", err)
}
lookup4Ch <- LookupResult{ips, err}
close(lookup4Ch)
}(lookup4Ch, host)

// DIAL ATTEMPTS SECTION
// We keep IPv4s and IPv6 separate and track the last one attempted so we can
// alternate the address family in the connection attempts.
var ip4s []netip.Addr
var ip6s []netip.Addr
var lastDialed netip.Addr
// Keep track of the lookup and dial errors separately. We prefer the dial errors
// when returning.
var lookupErr error
var dialErr error
// Channel to wait for before a new dial attempt. It starts
// with a closed channel that doesn't block because there's no
// wait initially.
var attemptDelayCh <-chan struct{} = newClosedChan()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit it seems this is the only place newClosedChan() is used. Maybe we can simplify close the channel here and deletenewClosedChan function to save some lines?

Suggested change
var attemptDelayCh <-chan struct{} = newClosedChan()
attemptDelayCh := make(chan struct{})
close(attemptDelayCh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your code doesn't work because attemptDelayCh must be of type <-chan struct{}, not chan struct{}, otherwise the assignment attemptDelayCh = waitCtx.Done() doesn't compile.

Instead of creating a channel, closing, and then assigning to attemptDelayCh, I found it better to keep that logic out of the core logic, which is already complicated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG, maybe var attemptDelayCh <-chan struct{} = make(chan struct{}) would work here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that too 😅
The problem is that you can't close a read-only channel.

type DialResult struct {
Conn StreamConn
Err error
}
var dialCh = make(chan DialResult)
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved

// Channel that triggers when a new connection can be made. Starts blocked (nil)
// because we need IPs first.
var readyToDialCh <-chan struct{} = nil
// We keep track of pending operations (lookups and IPs to dial) so we can stop when
// there's no more work to wait for.
for opsPending := 2; opsPending > 0; {
if len(ip6s) == 0 && len(ip4s) == 0 {
// No IPs. Keep dial disabled.
readyToDialCh = nil
} else {
// There are IPs to dial.
if !lastDialed.IsValid() && len(ip6s) == 0 && lookup6Ch != nil {
// Attempts haven't started and IPv6 lookup is not done yet. Set up Resolution Delay, as per
// https://datatracker.ietf.org/doc/html/rfc8305#section-8, if it hasn't been set up yet.
if readyToDialCh == nil {
resolutionDelayCtx, cancelResolutionDelay := context.WithTimeout(searchCtx, 50*time.Millisecond)
defer cancelResolutionDelay()
readyToDialCh = resolutionDelayCtx.Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can these lines be simplified with the following? Because we already have a case <-searchCtx.Done() statement in the select below, which will be triggered once searchCtx is cancelled. In addition, if we linked searchCtx to readyToDialCh, will there be any potential race conditions, because both case <-searchCtx.Done() and case <-readyToDialCh will be ready when searchCtx is cancelled?

Suggested change
resolutionDelayCtx, cancelResolutionDelay := context.WithTimeout(searchCtx, 50*time.Millisecond)
defer cancelResolutionDelay()
readyToDialCh = resolutionDelayCtx.Done()
readyToDialCh = time.Deplay(50*time.Millisecond)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming you meant time.NewTimer(50*time.Milisecond).C. I wasn't sure if we needed to drain the Timer.C channel, but it seems it's buffered and we don't need to?

In any case, I tried based on your suggestion, but the type of channel is different (<-chan time.Time), so it requires more changes, not really simpler. Also, the timer sends the time once, versus a context closes the channel, which is easier to reason about (a read will never block).

Another important point is that I still need to stop the timer, so I need to pass it to the goroutine to call Stop in case the dial returns before the timer triggers.

On the race condition, that's a valid point. A cancellation will potentially trigger an extra dial. I tried different ways to trigger, but couldn't. In any case, I changed the code to use the background context instead.

}
} else {
// Wait for the previous attempt.
readyToDialCh = attemptDelayCh
}
}
select {
// Receive IPv6 results.
case lookupRes := <-lookup6Ch:
opsPending--
// Set to nil to make the read on lookup6Ch block and to signal IPv6 lookup is done.
lookup6Ch = nil
if lookupRes.Err != nil {
lookupErr = errors.Join(lookupRes.Err)
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
opsPending += len(lookupRes.IPs)
// TODO: sort IPs as per https://datatracker.ietf.org/doc/html/rfc8305#section-4
ip6s = lookupRes.IPs

// Receive IPv4 results.
case lookupRes := <-lookup4Ch:
opsPending--
// Set to nil to make the read on lookup4Ch block and to signal IPv4 lookup is done.
lookup4Ch = nil
if lookupRes.Err != nil {
lookupErr = errors.Join(lookupRes.Err)
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
opsPending += len(lookupRes.IPs)
// TODO: sort IPs as per https://datatracker.ietf.org/doc/html/rfc8305#section-4
ip4s = lookupRes.IPs

// Wait for new attempt done. Dial new IP address.
case <-readyToDialCh:
var toDial netip.Addr
if len(ip6s) == 0 || (lastDialed.Is6() && len(ip4s) > 0) {
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
toDial = ip4s[0]
ip4s = ip4s[1:]
} else {
toDial = ip6s[0]
ip6s = ip6s[1:]
}
// Connection Attempt Delay, as per https://datatracker.ietf.org/doc/html/rfc8305#section-8
waitCtx, waitDone := context.WithTimeout(searchCtx, 250*time.Millisecond)
attemptDelayCh = waitCtx.Done()
go func(addr string, waitDone context.CancelFunc) {
// Cancel the wait if the dial return early.
defer waitDone()
conn, err := d.dial(searchCtx, addr)
select {
case <-searchCtx.Done():
if conn != nil {
conn.Close()
}
case dialCh <- DialResult{conn, err}:
}
}(net.JoinHostPort(toDial.String(), port), waitDone)
lastDialed = toDial

// Receive dial result.
case dialRes := <-dialCh:
opsPending--
if dialRes.Err != nil {
dialErr = errors.Join(dialRes.Err)
continue
}
return dialRes.Conn, nil

// Dial has been canceled. Return.
case <-searchCtx.Done():
return nil, searchCtx.Err()
}
}
if dialErr != nil {
return nil, dialErr
}
if lookupErr != nil {
return nil, lookupErr
}
return nil, fmt.Errorf("address lookup returned no IPs")
jyyi1 marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading