From 4cb2ac52190ac09a4ed89ac0fab754aeb603cb4a Mon Sep 17 00:00:00 2001 From: Petr Zhizhin Date: Sat, 9 Nov 2024 23:41:40 +0100 Subject: [PATCH] Fix PR comments --- x/configurl/disorder.go | 6 ++-- x/disorder/stream_dialer.go | 54 ++++++++---------------------- x/disorder/writer.go | 66 +++++++++++++++++++++---------------- x/go.mod | 10 +++--- 4 files changed, 60 insertions(+), 76 deletions(-) diff --git a/x/configurl/disorder.go b/x/configurl/disorder.go index da0695c5..5ad5b40a 100644 --- a/x/configurl/disorder.go +++ b/x/configurl/disorder.go @@ -29,11 +29,11 @@ func registerDisorderDialer(r TypeRegistry[transport.StreamDialer], typeID strin if err != nil { return nil, err } - prefixBytesStr := config.URL.Opaque - prefixBytes, err := strconv.Atoi(prefixBytesStr) + disorderPacketNStr := config.URL.Opaque + disorderPacketN, err := strconv.Atoi(disorderPacketNStr) if err != nil { return nil, fmt.Errorf("disoder: could not parse splice position: %v", err) } - return disorder.NewStreamDialer(sd, int64(prefixBytes)) + return disorder.NewStreamDialer(sd, disorderPacketN) }) } diff --git a/x/disorder/stream_dialer.go b/x/disorder/stream_dialer.go index 30989f49..eccdadf7 100644 --- a/x/disorder/stream_dialer.go +++ b/x/disorder/stream_dialer.go @@ -19,18 +19,14 @@ import ( "errors" "fmt" "net" - "net/netip" "github.com/Jigsaw-Code/outline-sdk/transport" - "golang.org/x/net/ipv4" - "golang.org/x/net/ipv6" + "github.com/Jigsaw-Code/outline-sdk/x/sockopt" ) -var defaultTTL = 64 - type disorderDialer struct { - dialer transport.StreamDialer - splitPoint int64 + dialer transport.StreamDialer + disorderPacketN int } var _ transport.StreamDialer = (*disorderDialer)(nil) @@ -43,11 +39,11 @@ var _ transport.StreamDialer = (*disorderDialer)(nil) // * The next part of data is sent normally // * Server notices the lost fragment and requests re-transmission // Currently this only works with Linux kernel (for Windows/Mac a different implementation is required) -func NewStreamDialer(dialer transport.StreamDialer, prefixBytes int64) (transport.StreamDialer, error) { +func NewStreamDialer(dialer transport.StreamDialer, disorderPacketN int) (transport.StreamDialer, error) { if dialer == nil { return nil, errors.New("argument dialer must not be nil") } - return &disorderDialer{dialer: dialer, splitPoint: prefixBytes}, nil + return &disorderDialer{dialer: dialer, disorderPacketN: disorderPacketN}, nil } // DialStream implements [transport.StreamDialer].DialStream. @@ -57,43 +53,21 @@ func (d *disorderDialer) DialStream(ctx context.Context, remoteAddr string) (tra return nil, err } - oldTTL, err := setHopLimit(innerConn, 1) - if err != nil { - return nil, fmt.Errorf("disorder strategy: failed to change ttl: %w", err) + tcpInnerConn, ok := innerConn.(*net.TCPConn) + if !ok { + return nil, fmt.Errorf("disorder strategy: expected base dialer to return TCPConn") } - - dw := NewWriter(innerConn, d.splitPoint, oldTTL) - - return transport.WrapConn(innerConn, innerConn, dw), nil -} - -// setHopLimit changes the socket TTL for IPv4 (or HopLimit for IPv6) and returns the old value -// socket must be `*net.TCPConn` -func setHopLimit(conn net.Conn, ttl int) (oldTTL int, err error) { - addr, err := netip.ParseAddrPort(conn.RemoteAddr().String()) + tcpOptions, err := sockopt.NewTCPOptions(tcpInnerConn) if err != nil { - return 0, fmt.Errorf("could not parse remote addr: %w", err) + return nil, err } - switch { - case addr.Addr().Is4(): - conn := ipv4.NewConn(conn) - oldTTL, _ = conn.TTL() - err = conn.SetTTL(ttl) - case addr.Addr().Is6(): - conn := ipv6.NewConn(conn) - oldTTL, _ = conn.HopLimit() - err = conn.SetHopLimit(ttl) - default: - return 0, fmt.Errorf("unknown remote addr type (%v)", addr.Addr().String()) - } + defaultHopLimit, err := tcpOptions.HopLimit() if err != nil { - return 0, fmt.Errorf("failed to change TTL: %w", err) + return nil, fmt.Errorf("disorder strategy: failed to get base connection HopLimit: %w", err) } - if oldTTL == 0 { - oldTTL = defaultTTL - } + dw := NewWriter(innerConn, tcpOptions, d.disorderPacketN, defaultHopLimit) - return + return transport.WrapConn(innerConn, innerConn, dw), nil } diff --git a/x/disorder/writer.go b/x/disorder/writer.go index d7a87cf8..8804ebc3 100644 --- a/x/disorder/writer.go +++ b/x/disorder/writer.go @@ -17,50 +17,60 @@ package disorder import ( "fmt" "io" - "net" - "sync" + + "github.com/Jigsaw-Code/outline-sdk/x/sockopt" ) type disorderWriter struct { - conn net.Conn - resetTTL sync.Once - prefixBytes int64 - oldTTL int + conn io.Writer + tcpOptions sockopt.TCPOptions + runAtPacketN int + defaultHopLimit int + writeCalls int } var _ io.Writer = (*disorderWriter)(nil) -// TODO -// var _ io.ReaderFrom = (*splitWriterReaderFrom)(nil) +// Setting number of hops to 1 will lead to data to get lost on host +var disorderHopN = 1 -// TODO -func NewWriter(conn net.Conn, prefixBytes int64, oldTTL int) io.Writer { - // TODO support ReaderFrom +func NewWriter(conn io.Writer, tcpOptions sockopt.TCPOptions, runAtPacketN int, defaultHopLimit int) io.Writer { return &disorderWriter{ - conn: conn, - prefixBytes: prefixBytes, - oldTTL: oldTTL, + conn: conn, + tcpOptions: tcpOptions, + runAtPacketN: runAtPacketN, + defaultHopLimit: defaultHopLimit, + writeCalls: 0, } } func (w *disorderWriter) Write(data []byte) (written int, err error) { - if 0 < w.prefixBytes && w.prefixBytes < int64(len(data)) { - written, err = w.conn.Write(data[:w.prefixBytes]) - w.prefixBytes -= int64(written) + shouldDoDisorder := w.writeCalls == w.runAtPacketN + if shouldDoDisorder { + err = w.tcpOptions.SetHopLimit(disorderHopN) if err != nil { - return written, err + return 0, fmt.Errorf("failed to set the hop limit to %d: %w", disorderHopN, err) } - data = data[written:] - } - w.resetTTL.Do(func() { - _, err = setHopLimit(w.conn, w.oldTTL) - }) - if err != nil { - return written, fmt.Errorf("setsockopt IPPROTO_IP/IP_TTL error: %w", err) + + // The packet will get lost at the first send, since the hop limit is too low } n, err := w.conn.Write(data) - written += n - w.prefixBytes -= int64(n) - return written, err + + // TODO: Wait for queued data to be sent by the kernel to the socket + + if shouldDoDisorder { + // The packet with low hop limit was sent + // Make next calls send data normally + // + // The packet with the low hop limit will get resent by the kernel later + // The network filters will receive data out of order + err = w.tcpOptions.SetHopLimit(w.defaultHopLimit) + if err != nil { + return n, fmt.Errorf("failed to set the hop limit error %d: %w", w.defaultHopLimit, err) + } + } + + w.writeCalls += 1 + return n, err } diff --git a/x/go.mod b/x/go.mod index 71db16e8..b432aec4 100644 --- a/x/go.mod +++ b/x/go.mod @@ -13,9 +13,9 @@ require ( github.com/stretchr/testify v1.9.0 github.com/vishvananda/netlink v1.1.0 golang.org/x/mobile v0.0.0-20240520174638-fa72addaaa1b - golang.org/x/net v0.30.0 - golang.org/x/sys v0.26.0 - golang.org/x/term v0.25.0 + golang.org/x/net v0.28.0 + golang.org/x/sys v0.23.0 + golang.org/x/term v0.23.0 ) require ( @@ -72,11 +72,11 @@ require ( github.com/wader/filtertransport v0.0.0-20200316221534-bdd9e61eee78 // indirect gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib v1.5.0 // indirect go.uber.org/mock v0.4.0 // indirect - golang.org/x/crypto v0.28.0 // indirect + golang.org/x/crypto v0.26.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/sync v0.8.0 // indirect - golang.org/x/text v0.19.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect