From e3d398eb590859027d410798b22eaeadb2c652cb Mon Sep 17 00:00:00 2001 From: Petr Zhizhin Date: Sun, 10 Nov 2024 01:23:18 +0100 Subject: [PATCH 1/3] Allow for a connection to check if it's sending bytes --- x/sockopt/is_sending_bytes_linux.go | 39 ++++++++++++++ x/sockopt/is_sending_bytes_not_implemented.go | 17 ++++++ x/sockopt/sockopt.go | 53 ++++++++++++++++++- 3 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 x/sockopt/is_sending_bytes_linux.go create mode 100644 x/sockopt/is_sending_bytes_not_implemented.go diff --git a/x/sockopt/is_sending_bytes_linux.go b/x/sockopt/is_sending_bytes_linux.go new file mode 100644 index 00000000..01ce4fa9 --- /dev/null +++ b/x/sockopt/is_sending_bytes_linux.go @@ -0,0 +1,39 @@ +//go:build linux + +package sockopt + +import ( + "net" + + "golang.org/x/sys/unix" +) + +func isSocketFdSendingBytes(fd int) (bool, error) { + tcpInfo, err := unix.GetsockoptTCPInfo(fd, unix.IPPROTO_TCP, unix.TCP_INFO) + if err != nil { + return false, err + } + + // 1 == TCP_ESTABLISHED, but for some reason not available in the package + if tcpInfo.State != unix.BPF_TCP_ESTABLISHED { + // If the connection is not established, the socket is not sending bytes + return false, nil + } + + return tcpInfo.Notsent_bytes != 0, nil +} + +func isConnectionSendingBytesImplemented() bool { + return true +} + +func isConnectionSendingBytes(conn *net.TCPConn) (result bool, err error) { + syscallConn, err := conn.SyscallConn() + if err != nil { + return false, err + } + syscallConn.Control(func(fd uintptr) { + result, err = isSocketFdSendingBytes(int(fd)) + }) + return +} diff --git a/x/sockopt/is_sending_bytes_not_implemented.go b/x/sockopt/is_sending_bytes_not_implemented.go new file mode 100644 index 00000000..b76024d4 --- /dev/null +++ b/x/sockopt/is_sending_bytes_not_implemented.go @@ -0,0 +1,17 @@ +//go:build !linux + +package sockopt + +import ( + "errors" + "fmt" + "net" +) + +func isConnectionSendingBytesImplemented() bool { + return false +} + +func isConnectionSendingBytes(_ *net.TCPConn) (bool, error) { + return false, fmt.Errorf("%w: checking if socket is sending bytes is not implemented on this platform", errors.ErrUnsupported) +} diff --git a/x/sockopt/sockopt.go b/x/sockopt/sockopt.go index 44cb575e..e22744d6 100644 --- a/x/sockopt/sockopt.go +++ b/x/sockopt/sockopt.go @@ -19,11 +19,21 @@ import ( "fmt" "net" "net/netip" + "time" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" ) +type HasWaitUntilBytesAreSent interface { + // Wait until all bytes are sent to the socket. + // Returns ErrUnsupported if the platform doesn't support it. + // May return a different error. + WaitUntilBytesAreSent() error + // Checks if the OS supports waiting until the bytes are sent + OsSupportsWaitingUntilBytesAreSent() bool +} + // HasHopLimit enables manipulation of the hop limit option. type HasHopLimit interface { // HopLimit returns the hop limit field value for outgoing packets. @@ -50,15 +60,51 @@ var _ HasHopLimit = (*hopLimitOption)(nil) // TCPOptions represents options for TCP connections. type TCPOptions interface { + HasWaitUntilBytesAreSent HasHopLimit } type tcpOptions struct { hopLimitOption + + conn *net.TCPConn + + // Timeout after which we return an error + waitingTimeout time.Duration + // Delay between checking the socket + waitingDelay time.Duration } var _ TCPOptions = (*tcpOptions)(nil) +func (o *tcpOptions) SetWaitingTimeout(timeout time.Duration) { + o.waitingTimeout = timeout +} + +func (o *tcpOptions) SetWaitingDelay(delay time.Duration) { + o.waitingDelay = delay +} + +func (o *tcpOptions) OsSupportsWaitingUntilBytesAreSent() bool { + return isConnectionSendingBytesImplemented() +} + +func (o *tcpOptions) WaitUntilBytesAreSent() error { + startTime := time.Now() + for time.Since(startTime) < o.waitingTimeout { + isSendingBytes, err := isConnectionSendingBytes(o.conn) + if err != nil { + return err + } + if !isSendingBytes { + return nil + } + + time.Sleep(o.waitingDelay) + } + return fmt.Errorf("waiting for socket to send all bytes: timeout exceeded") +} + // newHopLimit creates a hopLimitOption from a [net.Conn]. Works for both TCP or UDP. func newHopLimit(conn net.Conn) (*hopLimitOption, error) { addr, err := netip.ParseAddrPort(conn.LocalAddr().String()) @@ -87,5 +133,10 @@ func NewTCPOptions(conn *net.TCPConn) (TCPOptions, error) { if err != nil { return nil, err } - return &tcpOptions{hopLimitOption: *hopLimit}, nil + return &tcpOptions{ + hopLimitOption: *hopLimit, + conn: conn, + waitingTimeout: 10 * time.Millisecond, + waitingDelay: 100 * time.Microsecond, + }, nil } From 3f82afb726b6a0edd3dd9e9bc6243c3c3c2cfd7c Mon Sep 17 00:00:00 2001 From: Petr Zhizhin Date: Tue, 12 Nov 2024 22:30:00 +0100 Subject: [PATCH 2/3] Add waitstream strategy --- x/configurl/module.go | 2 ++ x/configurl/wait_stream.go | 32 +++++++++++++++++++ x/wait_stream/stream_dialer.go | 58 ++++++++++++++++++++++++++++++++++ x/wait_stream/writer.go | 49 ++++++++++++++++++++++++++++ 4 files changed, 141 insertions(+) create mode 100644 x/configurl/wait_stream.go create mode 100644 x/wait_stream/stream_dialer.go create mode 100644 x/wait_stream/writer.go diff --git a/x/configurl/module.go b/x/configurl/module.go index 83e14b89..3e11f10c 100644 --- a/x/configurl/module.go +++ b/x/configurl/module.go @@ -65,6 +65,8 @@ func RegisterDefaultProviders(c *ProviderContainer) *ProviderContainer { registerWebsocketStreamDialer(&c.StreamDialers, "ws", c.StreamDialers.NewInstance) registerWebsocketPacketDialer(&c.PacketDialers, "ws", c.StreamDialers.NewInstance) + registerWaitStreamDialer(&c.StreamDialers, "waitstream", c.StreamDialers.NewInstance) + return c } diff --git a/x/configurl/wait_stream.go b/x/configurl/wait_stream.go new file mode 100644 index 00000000..337f6b05 --- /dev/null +++ b/x/configurl/wait_stream.go @@ -0,0 +1,32 @@ +// Copyright 2024 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configurl + +import ( + "context" + + "github.com/Jigsaw-Code/outline-sdk/transport" + "github.com/Jigsaw-Code/outline-sdk/x/wait_stream" +) + +func registerWaitStreamDialer(r TypeRegistry[transport.StreamDialer], typeID string, newSD BuildFunc[transport.StreamDialer]) { + r.RegisterType(typeID, func(ctx context.Context, config *Config) (transport.StreamDialer, error) { + sd, err := newSD(ctx, config.BaseConfig) + if err != nil { + return nil, err + } + return wait_stream.NewStreamDialer(sd) + }) +} diff --git a/x/wait_stream/stream_dialer.go b/x/wait_stream/stream_dialer.go new file mode 100644 index 00000000..e2ffff98 --- /dev/null +++ b/x/wait_stream/stream_dialer.go @@ -0,0 +1,58 @@ +// Copyright 2024 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait_stream + +import ( + "context" + "errors" + "net" + + "github.com/Jigsaw-Code/outline-sdk/transport" + "github.com/Jigsaw-Code/outline-sdk/x/sockopt" +) + +type waitStreamDialer struct { + dialer transport.StreamDialer +} + +var _ transport.StreamDialer = (*waitStreamDialer)(nil) + +func NewStreamDialer(dialer transport.StreamDialer) (transport.StreamDialer, error) { + if dialer == nil { + return nil, errors.New("argument dialer must not be nil") + } + return &waitStreamDialer{dialer: dialer}, nil +} + +func (d *waitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) { + innerConn, err := d.dialer.DialStream(ctx, remoteAddr) + if err != nil { + return nil, err + } + + tcpInnerConn, ok := innerConn.(*net.TCPConn) + if !ok { + return nil, errors.New("wait_stream strategy: expected base dialer to return TCPConn") + } + + tcpOptions, err := sockopt.NewTCPOptions(tcpInnerConn) + if err != nil { + return nil, err + } + + dw := NewWriter(innerConn, tcpOptions) + + return transport.WrapConn(innerConn, innerConn, dw), nil +} diff --git a/x/wait_stream/writer.go b/x/wait_stream/writer.go new file mode 100644 index 00000000..103a3a64 --- /dev/null +++ b/x/wait_stream/writer.go @@ -0,0 +1,49 @@ +// Copyright 2024 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wait_stream + +import ( + "errors" + "fmt" + "io" + + "github.com/Jigsaw-Code/outline-sdk/x/sockopt" +) + +type waitStreamWriter struct { + conn io.Writer + tcpOptions sockopt.TCPOptions +} + +var _ io.Writer = (*waitStreamWriter)(nil) + +func NewWriter(conn io.Writer, tcpOptions sockopt.TCPOptions) io.Writer { + return &waitStreamWriter{ + conn: conn, + tcpOptions: tcpOptions, + } +} + +func (w *waitStreamWriter) Write(data []byte) (written int, err error) { + written, err = w.conn.Write(data) + + // This may not be implemented, so it's best effort really. + waitUntilBytesAreSentErr := w.tcpOptions.WaitUntilBytesAreSent() + if waitUntilBytesAreSentErr != nil && !errors.Is(waitUntilBytesAreSentErr, errors.ErrUnsupported) { + return written, fmt.Errorf("error when waiting for stream to send all bytes: %w", waitUntilBytesAreSentErr) + } + + return +} From 699975616b5d00162e1cd69690c25642c3461955 Mon Sep 17 00:00:00 2001 From: Petr Zhizhin Date: Wed, 13 Nov 2024 02:07:34 +0100 Subject: [PATCH 3/3] Review fixes --- x/configurl/wait_stream.go | 32 ++++++++++- x/sockopt/is_sending_bytes_linux.go | 39 -------------- x/sockopt/sockopt.go | 53 +----------------- x/wait_stream/is_sending_bytes_linux.go | 15 ++++++ .../is_sending_bytes_not_implemented.go | 9 +--- x/wait_stream/stream_dialer.go | 42 ++++++++++----- x/wait_stream/writer.go | 54 ++++++++++++++----- 7 files changed, 121 insertions(+), 123 deletions(-) delete mode 100644 x/sockopt/is_sending_bytes_linux.go create mode 100644 x/wait_stream/is_sending_bytes_linux.go rename x/{sockopt => wait_stream}/is_sending_bytes_not_implemented.go (54%) diff --git a/x/configurl/wait_stream.go b/x/configurl/wait_stream.go index 337f6b05..f6e1f4bf 100644 --- a/x/configurl/wait_stream.go +++ b/x/configurl/wait_stream.go @@ -16,6 +16,9 @@ package configurl import ( "context" + "fmt" + "net/url" + "time" "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/x/wait_stream" @@ -27,6 +30,33 @@ func registerWaitStreamDialer(r TypeRegistry[transport.StreamDialer], typeID str if err != nil { return nil, err } - return wait_stream.NewStreamDialer(sd) + + queryUrlParameters, err := url.ParseQuery(config.URL.Opaque) + if err != nil { + return nil, fmt.Errorf("waitstream: failed to parse URL parameters: %w", err) + } + + resultStreamDialer, err := wait_stream.NewStreamDialer(sd) + if err != nil { + return nil, err + } + + if queryUrlParameters.Has("timeout") { + timeout, err := time.ParseDuration(queryUrlParameters.Get("timeout")) + if err != nil { + return nil, fmt.Errorf("waitstream: failed to parse timeout parameter: %w", err) + } + resultStreamDialer.SetWaitingTimeout(timeout) + } + + if queryUrlParameters.Has("delay") { + delay, err := time.ParseDuration(queryUrlParameters.Get("delay")) + if err != nil { + return nil, fmt.Errorf("waitstream: failed to parse delay parameter: %w", err) + } + resultStreamDialer.SetWaitingDelay(delay) + } + + return resultStreamDialer, err }) } diff --git a/x/sockopt/is_sending_bytes_linux.go b/x/sockopt/is_sending_bytes_linux.go deleted file mode 100644 index 01ce4fa9..00000000 --- a/x/sockopt/is_sending_bytes_linux.go +++ /dev/null @@ -1,39 +0,0 @@ -//go:build linux - -package sockopt - -import ( - "net" - - "golang.org/x/sys/unix" -) - -func isSocketFdSendingBytes(fd int) (bool, error) { - tcpInfo, err := unix.GetsockoptTCPInfo(fd, unix.IPPROTO_TCP, unix.TCP_INFO) - if err != nil { - return false, err - } - - // 1 == TCP_ESTABLISHED, but for some reason not available in the package - if tcpInfo.State != unix.BPF_TCP_ESTABLISHED { - // If the connection is not established, the socket is not sending bytes - return false, nil - } - - return tcpInfo.Notsent_bytes != 0, nil -} - -func isConnectionSendingBytesImplemented() bool { - return true -} - -func isConnectionSendingBytes(conn *net.TCPConn) (result bool, err error) { - syscallConn, err := conn.SyscallConn() - if err != nil { - return false, err - } - syscallConn.Control(func(fd uintptr) { - result, err = isSocketFdSendingBytes(int(fd)) - }) - return -} diff --git a/x/sockopt/sockopt.go b/x/sockopt/sockopt.go index e22744d6..44cb575e 100644 --- a/x/sockopt/sockopt.go +++ b/x/sockopt/sockopt.go @@ -19,21 +19,11 @@ import ( "fmt" "net" "net/netip" - "time" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" ) -type HasWaitUntilBytesAreSent interface { - // Wait until all bytes are sent to the socket. - // Returns ErrUnsupported if the platform doesn't support it. - // May return a different error. - WaitUntilBytesAreSent() error - // Checks if the OS supports waiting until the bytes are sent - OsSupportsWaitingUntilBytesAreSent() bool -} - // HasHopLimit enables manipulation of the hop limit option. type HasHopLimit interface { // HopLimit returns the hop limit field value for outgoing packets. @@ -60,51 +50,15 @@ var _ HasHopLimit = (*hopLimitOption)(nil) // TCPOptions represents options for TCP connections. type TCPOptions interface { - HasWaitUntilBytesAreSent HasHopLimit } type tcpOptions struct { hopLimitOption - - conn *net.TCPConn - - // Timeout after which we return an error - waitingTimeout time.Duration - // Delay between checking the socket - waitingDelay time.Duration } var _ TCPOptions = (*tcpOptions)(nil) -func (o *tcpOptions) SetWaitingTimeout(timeout time.Duration) { - o.waitingTimeout = timeout -} - -func (o *tcpOptions) SetWaitingDelay(delay time.Duration) { - o.waitingDelay = delay -} - -func (o *tcpOptions) OsSupportsWaitingUntilBytesAreSent() bool { - return isConnectionSendingBytesImplemented() -} - -func (o *tcpOptions) WaitUntilBytesAreSent() error { - startTime := time.Now() - for time.Since(startTime) < o.waitingTimeout { - isSendingBytes, err := isConnectionSendingBytes(o.conn) - if err != nil { - return err - } - if !isSendingBytes { - return nil - } - - time.Sleep(o.waitingDelay) - } - return fmt.Errorf("waiting for socket to send all bytes: timeout exceeded") -} - // newHopLimit creates a hopLimitOption from a [net.Conn]. Works for both TCP or UDP. func newHopLimit(conn net.Conn) (*hopLimitOption, error) { addr, err := netip.ParseAddrPort(conn.LocalAddr().String()) @@ -133,10 +87,5 @@ func NewTCPOptions(conn *net.TCPConn) (TCPOptions, error) { if err != nil { return nil, err } - return &tcpOptions{ - hopLimitOption: *hopLimit, - conn: conn, - waitingTimeout: 10 * time.Millisecond, - waitingDelay: 100 * time.Microsecond, - }, nil + return &tcpOptions{hopLimitOption: *hopLimit}, nil } diff --git a/x/wait_stream/is_sending_bytes_linux.go b/x/wait_stream/is_sending_bytes_linux.go new file mode 100644 index 00000000..751a7045 --- /dev/null +++ b/x/wait_stream/is_sending_bytes_linux.go @@ -0,0 +1,15 @@ +//go:build linux + +package wait_stream + +import ( + "golang.org/x/sys/unix" +) + +func isSocketFdSendingBytes(fd int) (bool, error) { + tcpInfo, err := unix.GetsockoptTCPInfo(fd, unix.IPPROTO_TCP, unix.TCP_INFO) + if err != nil { + return false, err + } + return tcpInfo.Notsent_bytes != 0, nil +} diff --git a/x/sockopt/is_sending_bytes_not_implemented.go b/x/wait_stream/is_sending_bytes_not_implemented.go similarity index 54% rename from x/sockopt/is_sending_bytes_not_implemented.go rename to x/wait_stream/is_sending_bytes_not_implemented.go index b76024d4..2c89203a 100644 --- a/x/sockopt/is_sending_bytes_not_implemented.go +++ b/x/wait_stream/is_sending_bytes_not_implemented.go @@ -1,17 +1,12 @@ //go:build !linux -package sockopt +package wait_stream import ( "errors" "fmt" - "net" ) -func isConnectionSendingBytesImplemented() bool { - return false -} - -func isConnectionSendingBytes(_ *net.TCPConn) (bool, error) { +func isSocketFdSendingBytes(_ int) (bool, error) { return false, fmt.Errorf("%w: checking if socket is sending bytes is not implemented on this platform", errors.ErrUnsupported) } diff --git a/x/wait_stream/stream_dialer.go b/x/wait_stream/stream_dialer.go index e2ffff98..4a1f8bc1 100644 --- a/x/wait_stream/stream_dialer.go +++ b/x/wait_stream/stream_dialer.go @@ -18,25 +18,48 @@ import ( "context" "errors" "net" + "time" "github.com/Jigsaw-Code/outline-sdk/transport" - "github.com/Jigsaw-Code/outline-sdk/x/sockopt" ) -type waitStreamDialer struct { +type WaitStreamDialer struct { dialer transport.StreamDialer + + // Stop waiting on a packet after this timeout + waitingTimeout time.Duration + // Check if socket is sending bytes that often + waitingDelay time.Duration } -var _ transport.StreamDialer = (*waitStreamDialer)(nil) +var _ transport.StreamDialer = (*WaitStreamDialer)(nil) + +// byeDPI uses a default delay of 500ms with 1ms sleep +// We might reconsider the defaults later, if needed. +// https://github.com/hufrea/byedpi/blob/main/desync.c#L90 +var defaultTimeout = time.Millisecond * 10 +var defaultDelay = time.Microsecond * 1 -func NewStreamDialer(dialer transport.StreamDialer) (transport.StreamDialer, error) { +func NewStreamDialer(dialer transport.StreamDialer) (*WaitStreamDialer, error) { if dialer == nil { return nil, errors.New("argument dialer must not be nil") } - return &waitStreamDialer{dialer: dialer}, nil + return &WaitStreamDialer{ + dialer: dialer, + waitingTimeout: defaultTimeout, + waitingDelay: defaultDelay, + }, nil +} + +func (d *WaitStreamDialer) SetWaitingTimeout(timeout time.Duration) { + d.waitingTimeout = timeout +} + +func (d *WaitStreamDialer) SetWaitingDelay(timeout time.Duration) { + d.waitingDelay = timeout } -func (d *waitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) { +func (d *WaitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) { innerConn, err := d.dialer.DialStream(ctx, remoteAddr) if err != nil { return nil, err @@ -47,12 +70,7 @@ func (d *waitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (t return nil, errors.New("wait_stream strategy: expected base dialer to return TCPConn") } - tcpOptions, err := sockopt.NewTCPOptions(tcpInnerConn) - if err != nil { - return nil, err - } - - dw := NewWriter(innerConn, tcpOptions) + dw := NewWriter(tcpInnerConn, d.waitingTimeout, d.waitingDelay) return transport.WrapConn(innerConn, innerConn, dw), nil } diff --git a/x/wait_stream/writer.go b/x/wait_stream/writer.go index 103a3a64..ab59e361 100644 --- a/x/wait_stream/writer.go +++ b/x/wait_stream/writer.go @@ -18,32 +18,62 @@ import ( "errors" "fmt" "io" - - "github.com/Jigsaw-Code/outline-sdk/x/sockopt" + "net" + "time" ) type waitStreamWriter struct { - conn io.Writer - tcpOptions sockopt.TCPOptions + conn *net.TCPConn + + waitingTimeout time.Duration + waitingDelay time.Duration } var _ io.Writer = (*waitStreamWriter)(nil) -func NewWriter(conn io.Writer, tcpOptions sockopt.TCPOptions) io.Writer { +func NewWriter(conn *net.TCPConn, waitingTimeout time.Duration, waitingDelay time.Duration) io.Writer { return &waitStreamWriter{ - conn: conn, - tcpOptions: tcpOptions, + conn: conn, + waitingTimeout: waitingTimeout, + waitingDelay: waitingDelay, } } -func (w *waitStreamWriter) Write(data []byte) (written int, err error) { - written, err = w.conn.Write(data) +func isConnectionSendingBytes(conn *net.TCPConn) (result bool, err error) { + syscallConn, err := conn.SyscallConn() + if err != nil { + return false, err + } + syscallConn.Control(func(fd uintptr) { + result, err = isSocketFdSendingBytes(int(fd)) + }) + return +} +func waitUntilBytesAreSent(conn *net.TCPConn, waitingTimeout time.Duration, waitingDelay time.Duration) error { + startTime := time.Now() + for time.Since(startTime) < waitingTimeout { + isSendingBytes, err := isConnectionSendingBytes(conn) + if err != nil { + return err + } + if !isSendingBytes { + return nil + } + + time.Sleep(waitingDelay) + } + // not sure about the right behaviour here: fail or give up waiting? + // giving up feels safer, and matches byeDPI behavior + return nil +} + +func (w *waitStreamWriter) Write(data []byte) (written int, err error) { // This may not be implemented, so it's best effort really. - waitUntilBytesAreSentErr := w.tcpOptions.WaitUntilBytesAreSent() + waitUntilBytesAreSentErr := waitUntilBytesAreSent(w.conn, w.waitingTimeout, w.waitingDelay) if waitUntilBytesAreSentErr != nil && !errors.Is(waitUntilBytesAreSentErr, errors.ErrUnsupported) { - return written, fmt.Errorf("error when waiting for stream to send all bytes: %w", waitUntilBytesAreSentErr) + return 0, fmt.Errorf("error when waiting for stream to send all bytes: %w", waitUntilBytesAreSentErr) } - return + return w.conn.Write(data) }