From 60e4afb8957c27b2a9a1101939b52d8f5167b1d5 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 13:16:21 -0400 Subject: [PATCH 1/9] Collect connection address --- x/config/config.go | 10 +++++-- x/examples/test-connectivity/main.go | 43 ++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/x/config/config.go b/x/config/config.go index fe43c0a9..c9410650 100644 --- a/x/config/config.go +++ b/x/config/config.go @@ -47,7 +47,7 @@ func NewStreamDialer(transportConfig string) (transport.StreamDialer, error) { return WrapStreamDialer(&transport.TCPDialer{}, transportConfig) } -// WrapStreamDialer created a [transport.StreamDialer] according to transportConfig, using dialer as the +// WrapStreamDialer creates a [transport.StreamDialer] according to transportConfig, using dialer as the // base [transport.StreamDialer]. The given dialer must not be nil. func WrapStreamDialer(dialer transport.StreamDialer, transportConfig string) (transport.StreamDialer, error) { if dialer == nil { @@ -110,11 +110,17 @@ func newStreamDialerFromPart(innerDialer transport.StreamDialer, oneDialerConfig // NewPacketDialer creates a new [transport.PacketDialer] according to the given config. func NewPacketDialer(transportConfig string) (dialer transport.PacketDialer, err error) { - dialer = &transport.UDPDialer{} + return WrapPacketDialer(&transport.UDPDialer{}, transportConfig) +} + +// WrapPacketDialer creates a [transport.PacketDialer] according to transportConfig, using dialer as the +// base [transport.PacketDialer]. The given dialer must not be nil. +func WrapPacketDialer(dialer transport.PacketDialer, transportConfig string) (transport.PacketDialer, error) { transportConfig = strings.TrimSpace(transportConfig) if transportConfig == "" { return dialer, nil } + var err error for _, part := range strings.Split(transportConfig, "|") { dialer, err = newPacketDialerFromPart(dialer, part) if err != nil { diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 8cc76c2c..95f503ac 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -31,6 +31,7 @@ import ( "time" "github.com/Jigsaw-Code/outline-sdk/dns" + "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/x/config" "github.com/Jigsaw-Code/outline-sdk/x/connectivity" "github.com/Jigsaw-Code/outline-sdk/x/report" @@ -47,6 +48,9 @@ type connectivityReport struct { // TODO(fortuna): add sanitized transport config. Transport string `json:"transport"` + // The address of the selected connection to the proxy server. + ConnectionAddress addressJSON `json:"connection_address"` + // Observations Time time.Time `json:"time"` DurationMs int64 `json:"duration_ms"` @@ -62,6 +66,19 @@ type errorJSON struct { Msg string `json:"msg,omitempty"` } +type addressJSON struct { + Host string `json:"host"` + Port string `json:"port"` +} + +func newAddressJSON(address string) (addressJSON, error) { + host, port, err := net.SplitHostPort(address) + if err != nil { + return addressJSON{}, err + } + return addressJSON{host, port}, nil +} + func makeErrorRecord(result *connectivity.ConnectivityError) *errorJSON { if result == nil { return nil @@ -167,15 +184,30 @@ func main() { for _, proto := range strings.Split(*protoFlag, ",") { proto = strings.TrimSpace(proto) var resolver dns.Resolver + var connectionAddress string switch proto { case "tcp": - streamDialer, err := config.NewStreamDialer(*transportFlag) + baseDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + conn, err := (&transport.TCPDialer{}).DialStream(ctx, addr) + if conn != nil { + connectionAddress = conn.RemoteAddr().String() + } + return conn, err + }) + streamDialer, err := config.WrapStreamDialer(baseDialer, *transportFlag) if err != nil { log.Fatalf("Failed to create StreamDialer: %v", err) } resolver = dns.NewTCPResolver(streamDialer, resolverAddress) case "udp": - packetDialer, err := config.NewPacketDialer(*transportFlag) + baseDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := (&transport.UDPDialer{}).DialPacket(ctx, addr) + if conn != nil { + connectionAddress = conn.RemoteAddr().String() + } + return conn, err + }) + packetDialer, err := config.WrapPacketDialer(baseDialer, *transportFlag) if err != nil { log.Fatalf("Failed to create PacketDialer: %v", err) } @@ -197,7 +229,7 @@ func main() { if err != nil { log.Fatalf("Failed to sanitize config: %v", err) } - var r report.Report = connectivityReport{ + r := connectivityReport{ Resolver: resolverAddress, Proto: proto, Time: startTime.UTC().Truncate(time.Second), @@ -206,6 +238,11 @@ func main() { DurationMs: testDuration.Milliseconds(), Error: makeErrorRecord(result), } + connectionAddressJSON, err := newAddressJSON(connectionAddress) + if err == nil { + r.ConnectionAddress = connectionAddressJSON + } + if reportCollector != nil { err = reportCollector.Collect(context.Background(), r) if err != nil { From ebecc1f6fdeb9e9a8be79faaddb2753856b25150 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 14:04:49 -0400 Subject: [PATCH 2/9] Try intercepting the transport dialer --- x/connectivity/connectivity.go | 51 ++++++++++++++++++++++++++++ x/examples/test-connectivity/main.go | 42 +++++++---------------- 2 files changed, 64 insertions(+), 29 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index c6d26124..54e6f80f 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -18,13 +18,24 @@ import ( "context" "errors" "fmt" + "log" + "net" "syscall" "time" "github.com/Jigsaw-Code/outline-sdk/dns" + "github.com/Jigsaw-Code/outline-sdk/transport" "golang.org/x/net/dns/dnsmessage" ) +// ConnectivityResult captures the observed result of the connectivity test. +type ConnectivityResult struct { + // Address we connected to + ConnectionAddress string + // Observed error + Error *ConnectivityError +} + // ConnectivityError captures the observed error of the connectivity test. type ConnectivityError struct { // Which operation in the test that failed: "connect", "send" or "receive" @@ -64,6 +75,46 @@ func makeConnectivityError(op string, err error) *ConnectivityError { return &ConnectivityError{Op: op, PosixError: code, Err: err} } +func TestStreamConnectivityWithDNS(ctx context.Context, dialer transport.StreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { + result := &ConnectivityResult{} + captureDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + conn, err := dialer.DialStream(ctx, addr) + if conn != nil { + result.ConnectionAddress = conn.RemoteAddr().String() + log.Println("address", result.ConnectionAddress) + } + return conn, err + }) + resolver := dns.NewTCPResolver(captureDialer, resolverAddress) + var err error + result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + return nil, err + } + return result, nil +} + +func TestPacketConnectivityWithDNS(ctx context.Context, dialer transport.PacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { + result := &ConnectivityResult{} + captureDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := dialer.DialPacket(ctx, addr) + if conn != nil { + // This doesn't work with the PacketListenerDialer we use because it returns the address of the target, not the proxy. + // TODO(fortuna): make PLD use the first hop address or try something else. + result.ConnectionAddress = conn.RemoteAddr().String() + log.Println("address", result.ConnectionAddress) + } + return conn, err + }) + resolver := dns.NewUDPResolver(captureDialer, resolverAddress) + var err error + result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + return nil, err + } + return result, nil +} + // TestConnectivityWithResolver tests weather we can get a response from the given [Resolver]. It can be used // to test connectivity of its underlying [transport.StreamDialer] or [transport.PacketDialer]. // Invalid tests that cannot assert connectivity will return (nil, error). diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 95f503ac..14fa1929 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -30,7 +30,6 @@ import ( "strings" "time" - "github.com/Jigsaw-Code/outline-sdk/dns" "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/x/config" "github.com/Jigsaw-Code/outline-sdk/x/connectivity" @@ -183,48 +182,33 @@ func main() { resolverAddress := net.JoinHostPort(resolverHost, "53") for _, proto := range strings.Split(*protoFlag, ",") { proto = strings.TrimSpace(proto) - var resolver dns.Resolver - var connectionAddress string + var testResult *connectivity.ConnectivityResult + var testErr error + startTime := time.Now() switch proto { case "tcp": - baseDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - conn, err := (&transport.TCPDialer{}).DialStream(ctx, addr) - if conn != nil { - connectionAddress = conn.RemoteAddr().String() - } - return conn, err - }) - streamDialer, err := config.WrapStreamDialer(baseDialer, *transportFlag) + dialer, err := config.WrapStreamDialer(&transport.TCPDialer{}, *transportFlag) if err != nil { log.Fatalf("Failed to create StreamDialer: %v", err) } - resolver = dns.NewTCPResolver(streamDialer, resolverAddress) + testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) case "udp": - baseDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - conn, err := (&transport.UDPDialer{}).DialPacket(ctx, addr) - if conn != nil { - connectionAddress = conn.RemoteAddr().String() - } - return conn, err - }) - packetDialer, err := config.WrapPacketDialer(baseDialer, *transportFlag) + dialer, err := config.WrapPacketDialer(&transport.UDPDialer{}, *transportFlag) if err != nil { log.Fatalf("Failed to create PacketDialer: %v", err) } - resolver = dns.NewUDPResolver(packetDialer, resolverAddress) + testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) default: log.Fatalf(`Invalid proto %v. Must be "tcp" or "udp"`, proto) } - startTime := time.Now() - result, err := connectivity.TestConnectivityWithResolver(context.Background(), resolver, *domainFlag) - if err != nil { - log.Fatalf("Connectivity test failed to run: %v", err) + if testErr != nil { + log.Fatalf("Connectivity test failed to run: %v", testErr) } testDuration := time.Since(startTime) - if result == nil { + if testResult.Error == nil { success = true } - debugLog.Printf("Test %v %v result: %v", proto, resolverAddress, result) + debugLog.Printf("Test %v %v result: %v", proto, resolverAddress, testResult) sanitizedConfig, err := config.SanitizeConfig(*transportFlag) if err != nil { log.Fatalf("Failed to sanitize config: %v", err) @@ -236,9 +220,9 @@ func main() { // TODO(fortuna): Add sanitized config: Transport: sanitizedConfig, DurationMs: testDuration.Milliseconds(), - Error: makeErrorRecord(result), + Error: makeErrorRecord(testResult.Error), } - connectionAddressJSON, err := newAddressJSON(connectionAddress) + connectionAddressJSON, err := newAddressJSON(testResult.ConnectionAddress) if err == nil { r.ConnectionAddress = connectionAddressJSON } From 21e0e1b112c6e3d3335dcaecc4b0eb6d47aefed9 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 14:52:31 -0400 Subject: [PATCH 3/9] Pass wrapper instead --- x/connectivity/connectivity.go | 33 +++++++++++++++++----------- x/examples/test-connectivity/main.go | 14 +++++------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 54e6f80f..d607eb49 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "log" "net" "syscall" "time" @@ -75,18 +74,22 @@ func makeConnectivityError(op string, err error) *ConnectivityError { return &ConnectivityError{Op: op, PosixError: code, Err: err} } -func TestStreamConnectivityWithDNS(ctx context.Context, dialer transport.StreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { +type WrapStreamDialer func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) + +func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} - captureDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - conn, err := dialer.DialStream(ctx, addr) + interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + conn, err := baseDialer.DialStream(ctx, addr) if conn != nil { result.ConnectionAddress = conn.RemoteAddr().String() - log.Println("address", result.ConnectionAddress) } return conn, err }) - resolver := dns.NewTCPResolver(captureDialer, resolverAddress) - var err error + dialer, err := wrap(ctx, interceptDialer) + if err != nil { + return nil, err + } + resolver := dns.NewTCPResolver(dialer, resolverAddress) result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err @@ -94,20 +97,24 @@ func TestStreamConnectivityWithDNS(ctx context.Context, dialer transport.StreamD return result, nil } -func TestPacketConnectivityWithDNS(ctx context.Context, dialer transport.PacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { +type WrapPacketDialer func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) + +func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} - captureDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - conn, err := dialer.DialPacket(ctx, addr) + interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := baseDialer.DialPacket(ctx, addr) if conn != nil { // This doesn't work with the PacketListenerDialer we use because it returns the address of the target, not the proxy. // TODO(fortuna): make PLD use the first hop address or try something else. result.ConnectionAddress = conn.RemoteAddr().String() - log.Println("address", result.ConnectionAddress) } return conn, err }) - resolver := dns.NewUDPResolver(captureDialer, resolverAddress) - var err error + dialer, err := wrap(ctx, interceptDialer) + if err != nil { + return nil, err + } + resolver := dns.NewUDPResolver(dialer, resolverAddress) result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 14fa1929..a1b89009 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -187,17 +187,15 @@ func main() { startTime := time.Now() switch proto { case "tcp": - dialer, err := config.WrapStreamDialer(&transport.TCPDialer{}, *transportFlag) - if err != nil { - log.Fatalf("Failed to create StreamDialer: %v", err) + wrap := func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) { + return config.WrapStreamDialer(baseDialer, *transportFlag) } - testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) + testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), &transport.TCPDialer{}, wrap, resolverAddress, *domainFlag) case "udp": - dialer, err := config.WrapPacketDialer(&transport.UDPDialer{}, *transportFlag) - if err != nil { - log.Fatalf("Failed to create PacketDialer: %v", err) + wrap := func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) { + return config.WrapPacketDialer(baseDialer, *transportFlag) } - testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) + testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), &transport.UDPDialer{}, wrap, resolverAddress, *domainFlag) default: log.Fatalf(`Invalid proto %v. Must be "tcp" or "udp"`, proto) } From 577829fb779cd564aa0a5d4a3051ae980c53b02c Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 15:01:01 -0400 Subject: [PATCH 4/9] Clean up --- x/connectivity/connectivity.go | 14 ++++++++------ x/examples/test-connectivity/main.go | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index d607eb49..bf8e7026 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -74,8 +74,10 @@ func makeConnectivityError(op string, err error) *ConnectivityError { return &ConnectivityError{Op: op, PosixError: code, Err: err} } -type WrapStreamDialer func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) +type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) +// TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. +// It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { @@ -85,7 +87,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str } return conn, err }) - dialer, err := wrap(ctx, interceptDialer) + dialer, err := wrap(interceptDialer) if err != nil { return nil, err } @@ -97,20 +99,20 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str return result, nil } -type WrapPacketDialer func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) +type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) +// TestPacketConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a packet connection. It sends testDomain as the query. +// It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { conn, err := baseDialer.DialPacket(ctx, addr) if conn != nil { - // This doesn't work with the PacketListenerDialer we use because it returns the address of the target, not the proxy. - // TODO(fortuna): make PLD use the first hop address or try something else. result.ConnectionAddress = conn.RemoteAddr().String() } return conn, err }) - dialer, err := wrap(ctx, interceptDialer) + dialer, err := wrap(interceptDialer) if err != nil { return nil, err } diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index a1b89009..9c4030b0 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -187,12 +187,12 @@ func main() { startTime := time.Now() switch proto { case "tcp": - wrap := func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) { + wrap := func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) { return config.WrapStreamDialer(baseDialer, *transportFlag) } testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), &transport.TCPDialer{}, wrap, resolverAddress, *domainFlag) case "udp": - wrap := func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) { + wrap := func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) { return config.WrapPacketDialer(baseDialer, *transportFlag) } testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), &transport.UDPDialer{}, wrap, resolverAddress, *domainFlag) From 85269c2e7dc1ee8454f541e0816ba2edb8c80eb7 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 15:07:44 -0400 Subject: [PATCH 5/9] Comment --- x/connectivity/connectivity.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index bf8e7026..9aa5476a 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -78,6 +78,7 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. +// The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { @@ -103,6 +104,7 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD // TestPacketConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a packet connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. +// The baseDialer is typically UDPDialer, but it can be replaced for remote measurements. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { From db2f334b6aa46715a679b4998520b18adaabf8f7 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Tue, 23 Apr 2024 22:22:30 -0400 Subject: [PATCH 6/9] Collect connections --- x/connectivity/connectivity.go | 70 +++++++++++++++++++++------- x/examples/test-connectivity/main.go | 26 +++++++++-- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 9aa5476a..175c9ede 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -30,7 +30,16 @@ import ( // ConnectivityResult captures the observed result of the connectivity test. type ConnectivityResult struct { // Address we connected to - ConnectionAddress string + Connections []ConnectionResult + // Address of the connection that was selected + SelectedAddress string + // Observed error + Error *ConnectivityError +} + +type ConnectionResult struct { + // Address we connected to + Address string // Observed error Error *ConnectivityError } @@ -80,11 +89,28 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { - result := &ConnectivityResult{} + testResult := &ConnectivityResult{ + Connections: make([]ConnectionResult, 0), + } interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - conn, err := baseDialer.DialStream(ctx, addr) - if conn != nil { - result.ConnectionAddress = conn.RemoteAddr().String() + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) + var conn transport.StreamConn + for _, ip := range ips { + addr := net.JoinHostPort(ip, port) + connResult := ConnectionResult{Address: addr} + conn, err = baseDialer.DialStream(ctx, addr) + if err != nil { + connResult.Error = makeConnectivityError("connect", err) + } + testResult.Connections = append(testResult.Connections, connResult) + if err == nil { + testResult.SelectedAddress = addr + break + } } return conn, err }) @@ -93,11 +119,11 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str return nil, err } resolver := dns.NewTCPResolver(dialer, resolverAddress) - result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err } - return result, nil + return testResult, nil } type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) @@ -106,11 +132,26 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically UDPDialer, but it can be replaced for remote measurements. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { - result := &ConnectivityResult{} + testResult := &ConnectivityResult{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - conn, err := baseDialer.DialPacket(ctx, addr) - if conn != nil { - result.ConnectionAddress = conn.RemoteAddr().String() + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) + var conn net.Conn + for _, ip := range ips { + addr := net.JoinHostPort(ip, port) + connResult := ConnectionResult{Address: addr} + conn, err = baseDialer.DialPacket(ctx, addr) + if err != nil { + connResult.Error = makeConnectivityError("connect", err) + } + testResult.Connections = append(testResult.Connections, connResult) + if err == nil { + testResult.SelectedAddress = addr + break + } } return conn, err }) @@ -119,11 +160,8 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac return nil, err } resolver := dns.NewUDPResolver(dialer, resolverAddress) - result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) - if err != nil { - return nil, err - } - return result, nil + testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + return testResult, err } // TestConnectivityWithResolver tests weather we can get a response from the given [Resolver]. It can be used diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index d1a4301e..c5575b71 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -48,7 +48,8 @@ type connectivityReport struct { Transport string `json:"transport"` // The address of the selected connection to the proxy server. - ConnectionAddress addressJSON `json:"connection_address"` + Connections []connectionJSON `json:"connections"` + SelectedAddress *addressJSON `json:"selected_address,omitempty"` // Observations Time time.Time `json:"time"` @@ -56,6 +57,11 @@ type connectivityReport struct { Error *errorJSON `json:"error"` } +type connectionJSON struct { + Address *addressJSON `json:"address,omitempty"` + Error *errorJSON `json:"error"` +} + type errorJSON struct { // TODO: add Shadowsocks/Transport error Op string `json:"op,omitempty"` @@ -221,9 +227,21 @@ func main() { DurationMs: testDuration.Milliseconds(), Error: makeErrorRecord(testResult.Error), } - connectionAddressJSON, err := newAddressJSON(testResult.ConnectionAddress) - if err == nil { - r.ConnectionAddress = connectionAddressJSON + for _, cr := range testResult.Connections { + cj := connectionJSON{ + Error: makeErrorRecord(cr.Error), + } + addressJSON, err := newAddressJSON(cr.Address) + if err == nil { + cj.Address = &addressJSON + } + r.Connections = append(r.Connections, cj) + } + if testResult.SelectedAddress != "" { + selectedAddressJSON, err := newAddressJSON(testResult.SelectedAddress) + if err == nil { + r.SelectedAddress = &selectedAddressJSON + } } if reportCollector != nil { From 96ae7fb2bc107166368d5571daf45856a186d150 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Wed, 24 Apr 2024 12:01:58 -0400 Subject: [PATCH 7/9] Fix --- x/connectivity/connectivity.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 175c9ede..e7e9a958 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -102,7 +102,10 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str for _, ip := range ips { addr := net.JoinHostPort(ip, port) connResult := ConnectionResult{Address: addr} - conn, err = baseDialer.DialStream(ctx, addr) + deadline := time.Now().Add(1 * time.Second) + ipCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + conn, err = baseDialer.DialStream(ipCtx, addr) if err != nil { connResult.Error = makeConnectivityError("connect", err) } @@ -118,7 +121,13 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str if err != nil { return nil, err } - resolver := dns.NewTCPResolver(dialer, resolverAddress) + resolverConn, err := dialer.DialStream(ctx, resolverAddress) + if err != nil { + return nil, err + } + resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + return resolverConn, nil + }), resolverAddress) testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err From 7542c5a179bd885df39c2bbcdc8d251f2a71567a Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Wed, 24 Apr 2024 12:02:12 -0400 Subject: [PATCH 8/9] Timeout --- x/connectivity/connectivity.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index e7e9a958..cfc1148e 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -102,7 +102,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str for _, ip := range ips { addr := net.JoinHostPort(ip, port) connResult := ConnectionResult{Address: addr} - deadline := time.Now().Add(1 * time.Second) + deadline := time.Now().Add(5 * time.Second) ipCtx, cancel := context.WithDeadline(ctx, deadline) defer cancel() conn, err = baseDialer.DialStream(ipCtx, addr) From 7d8d6020fd1c680e8d45e198d2d06c0391535fe3 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Wed, 24 Apr 2024 16:25:23 -0400 Subject: [PATCH 9/9] To fix --- x/connectivity/connectivity.go | 36 ++++++++++++++++++---------- x/examples/test-connectivity/main.go | 11 ++++++--- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index cfc1148e..628856b7 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -29,21 +29,30 @@ import ( // ConnectivityResult captures the observed result of the connectivity test. type ConnectivityResult struct { - // Address we connected to - Connections []ConnectionResult + // The result of the initial connect attempt + Connect ConnectResult // Address of the connection that was selected SelectedAddress string // Observed error Error *ConnectivityError } -type ConnectionResult struct { - // Address we connected to - Address string +type ConnectResult struct { + // Address we dialed + DialedAddress string + // Address we selected + SelectedAddress string + // Lists each connection attempt + Attempts []ConnectionAttempt // Observed error Error *ConnectivityError } +type ConnectionAttempt struct { + Address string + Error error +} + // ConnectivityError captures the observed error of the connectivity test. type ConnectivityError struct { // Which operation in the test that failed: "connect", "send" or "receive" @@ -89,10 +98,12 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { - testResult := &ConnectivityResult{ - Connections: make([]ConnectionResult, 0), - } + testResult := &ConnectivityResult{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + connectResult := &testResult.Connect + // Captures the address of the first hop, before resolution. + connectResult.DialedAddress = addr + connectResult.Attempts = make([]ConnectionAttempt, 0) host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -101,17 +112,18 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str var conn transport.StreamConn for _, ip := range ips { addr := net.JoinHostPort(ip, port) - connResult := ConnectionResult{Address: addr} + attemptResult := ConnectionAttempt{Address: addr} + // TODO: This is slow. Race and overlap attempts instead. deadline := time.Now().Add(5 * time.Second) ipCtx, cancel := context.WithDeadline(ctx, deadline) defer cancel() conn, err = baseDialer.DialStream(ipCtx, addr) if err != nil { - connResult.Error = makeConnectivityError("connect", err) + attemptResult.Error = err } - testResult.Connections = append(testResult.Connections, connResult) + connectResult.Attempts = append(connectResult.Attempts, attemptResult) if err == nil { - testResult.SelectedAddress = addr + connectResult.SelectedAddress = addr break } } diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index c5575b71..03daad60 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -47,9 +47,9 @@ type connectivityReport struct { // TODO(fortuna): add sanitized transport config. Transport string `json:"transport"` - // The address of the selected connection to the proxy server. - Connections []connectionJSON `json:"connections"` - SelectedAddress *addressJSON `json:"selected_address,omitempty"` + // The result for the connection. + Connect connectAttemptJSON `json:"connect"` + SelectedAddress *addressJSON `json:"selected_address,omitempty"` // Observations Time time.Time `json:"time"` @@ -57,6 +57,11 @@ type connectivityReport struct { Error *errorJSON `json:"error"` } +type connectAttemptJSON struct { + Address *addressJSON `json:"address,omitempty"` + Attempts []connectAttemptJSON `json:"attempts,omitempty"` +} + type connectionJSON struct { Address *addressJSON `json:"address,omitempty"` Error *errorJSON `json:"error"`