Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for a connection to check if it's sending bytes #324

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions x/configurl/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func RegisterDefaultProviders(c *ProviderContainer) *ProviderContainer {
registerWebsocketStreamDialer(&c.StreamDialers, "ws", c.StreamDialers.NewInstance)
registerWebsocketPacketDialer(&c.PacketDialers, "ws", c.StreamDialers.NewInstance)

registerWaitStreamDialer(&c.StreamDialers, "waitstream", c.StreamDialers.NewInstance)

return c
}

Expand Down
32 changes: 32 additions & 0 deletions x/configurl/wait_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 The Outline Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configurl

import (
"context"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/x/wait_stream"
)

func registerWaitStreamDialer(r TypeRegistry[transport.StreamDialer], typeID string, newSD BuildFunc[transport.StreamDialer]) {
r.RegisterType(typeID, func(ctx context.Context, config *Config) (transport.StreamDialer, error) {
sd, err := newSD(ctx, config.BaseConfig)
if err != nil {
return nil, err
}
return wait_stream.NewStreamDialer(sd)
})
}
39 changes: 39 additions & 0 deletions x/sockopt/is_sending_bytes_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build linux

package sockopt

import (
"net"

"golang.org/x/sys/unix"
)

func isSocketFdSendingBytes(fd int) (bool, error) {
tcpInfo, err := unix.GetsockoptTCPInfo(fd, unix.IPPROTO_TCP, unix.TCP_INFO)
if err != nil {
return false, err
}

// 1 == TCP_ESTABLISHED, but for some reason not available in the package
if tcpInfo.State != unix.BPF_TCP_ESTABLISHED {
// If the connection is not established, the socket is not sending bytes
return false, nil
}

return tcpInfo.Notsent_bytes != 0, nil
}

func isConnectionSendingBytesImplemented() bool {
return true
}

func isConnectionSendingBytes(conn *net.TCPConn) (result bool, err error) {
syscallConn, err := conn.SyscallConn()
if err != nil {
return false, err
}
syscallConn.Control(func(fd uintptr) {
result, err = isSocketFdSendingBytes(int(fd))
})
return
}
17 changes: 17 additions & 0 deletions x/sockopt/is_sending_bytes_not_implemented.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build !linux
PeterZhizhin marked this conversation as resolved.
Show resolved Hide resolved

package sockopt

import (
"errors"
"fmt"
"net"
)

func isConnectionSendingBytesImplemented() bool {
return false
}

func isConnectionSendingBytes(_ *net.TCPConn) (bool, error) {
return false, fmt.Errorf("%w: checking if socket is sending bytes is not implemented on this platform", errors.ErrUnsupported)
}
53 changes: 52 additions & 1 deletion x/sockopt/sockopt.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I worry about having TCPOptions have options that are not cross-platform. I'm still trying to figure out what a good design for this stuff is, and we'll only know better when we try different things.

In favor of simpler APIs, for now, let's go with a simple standalone function BytesToSend(Rawconn) that lives in the fake code, since that's the only strategy that really needs it. It shouldn't be defined in platforms that don't support it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that lives in the fake code, since that's the only strategy that really needs it. It shouldn't be defined in platforms that don't support it.

at least on linux, more strategies need it (split, esp. when it performs multiple small splits, disorder)

Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ import (
"fmt"
"net"
"net/netip"
"time"

"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

type HasWaitUntilBytesAreSent interface {
// Wait until all bytes are sent to the socket.
// Returns ErrUnsupported if the platform doesn't support it.
// May return a different error.
WaitUntilBytesAreSent() error
// Checks if the OS supports waiting until the bytes are sent
OsSupportsWaitingUntilBytesAreSent() bool
}

// HasHopLimit enables manipulation of the hop limit option.
type HasHopLimit interface {
// HopLimit returns the hop limit field value for outgoing packets.
Expand All @@ -50,15 +60,51 @@ var _ HasHopLimit = (*hopLimitOption)(nil)

// TCPOptions represents options for TCP connections.
type TCPOptions interface {
HasWaitUntilBytesAreSent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't have such high-level functionality here. As mentioned below, we should perhaps expose lower level functionality instead.

HasHopLimit
}

type tcpOptions struct {
hopLimitOption

conn *net.TCPConn

// Timeout after which we return an error
waitingTimeout time.Duration
// Delay between checking the socket
waitingDelay time.Duration
}

var _ TCPOptions = (*tcpOptions)(nil)

func (o *tcpOptions) SetWaitingTimeout(timeout time.Duration) {
o.waitingTimeout = timeout
}

func (o *tcpOptions) SetWaitingDelay(delay time.Duration) {
o.waitingDelay = delay
}

func (o *tcpOptions) OsSupportsWaitingUntilBytesAreSent() bool {
return isConnectionSendingBytesImplemented()
}

func (o *tcpOptions) WaitUntilBytesAreSent() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This functionality is too high-level to live here.

I'd say we just need the bytes not sent and state options.

Then you can implement the logic to wait completely separate.

startTime := time.Now()
for time.Since(startTime) < o.waitingTimeout {
isSendingBytes, err := isConnectionSendingBytes(o.conn)
if err != nil {
return err
}
if !isSendingBytes {
return nil
}

time.Sleep(o.waitingDelay)
}
return fmt.Errorf("waiting for socket to send all bytes: timeout exceeded")
}

// newHopLimit creates a hopLimitOption from a [net.Conn]. Works for both TCP or UDP.
func newHopLimit(conn net.Conn) (*hopLimitOption, error) {
addr, err := netip.ParseAddrPort(conn.LocalAddr().String())
Expand Down Expand Up @@ -87,5 +133,10 @@ func NewTCPOptions(conn *net.TCPConn) (TCPOptions, error) {
if err != nil {
return nil, err
}
return &tcpOptions{hopLimitOption: *hopLimit}, nil
return &tcpOptions{
hopLimitOption: *hopLimit,
conn: conn,
waitingTimeout: 10 * time.Millisecond,
waitingDelay: 100 * time.Microsecond,
}, nil
}
58 changes: 58 additions & 0 deletions x/wait_stream/stream_dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 The Outline Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait_stream

import (
"context"
"errors"
"net"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/x/sockopt"
)

type waitStreamDialer struct {
dialer transport.StreamDialer
}

var _ transport.StreamDialer = (*waitStreamDialer)(nil)

func NewStreamDialer(dialer transport.StreamDialer) (transport.StreamDialer, error) {
if dialer == nil {
return nil, errors.New("argument dialer must not be nil")
}
return &waitStreamDialer{dialer: dialer}, nil
}

func (d *waitStreamDialer) DialStream(ctx context.Context, remoteAddr string) (transport.StreamConn, error) {
innerConn, err := d.dialer.DialStream(ctx, remoteAddr)
if err != nil {
return nil, err
}

tcpInnerConn, ok := innerConn.(*net.TCPConn)
if !ok {
return nil, errors.New("wait_stream strategy: expected base dialer to return TCPConn")
}

tcpOptions, err := sockopt.NewTCPOptions(tcpInnerConn)
if err != nil {
return nil, err
}

dw := NewWriter(innerConn, tcpOptions)

return transport.WrapConn(innerConn, innerConn, dw), nil
}
49 changes: 49 additions & 0 deletions x/wait_stream/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 The Outline Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wait_stream

import (
"errors"
"fmt"
"io"

"github.com/Jigsaw-Code/outline-sdk/x/sockopt"
)

type waitStreamWriter struct {
conn io.Writer
tcpOptions sockopt.TCPOptions
}

var _ io.Writer = (*waitStreamWriter)(nil)

func NewWriter(conn io.Writer, tcpOptions sockopt.TCPOptions) io.Writer {
return &waitStreamWriter{
conn: conn,
tcpOptions: tcpOptions,
}
}

func (w *waitStreamWriter) Write(data []byte) (written int, err error) {
written, err = w.conn.Write(data)

// This may not be implemented, so it's best effort really.
waitUntilBytesAreSentErr := w.tcpOptions.WaitUntilBytesAreSent()
PeterZhizhin marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be before the Write is done.

Also, this may significantly slow down communication. What's the performance impact? We will need to understand that for practical use.

We may want to restrict the wait for only when it's needed (on splits).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used a fetch-speed tool:

go run *.go -timeout 100500s -transport "waitstream"  http://speedtest.belwue.net/100M
Downloaded 100.00 MiB in 9.01s
Downloaded Speed: 11.10 MiB/s

Baseline:

go run *.go -timeout 100500s -transport ""  http://speedtest.belwue.net/100M                                                                                          
Downloaded 100.00 MiB in 9.75s
Downloaded Speed: 10.26 MiB/s

if waitUntilBytesAreSentErr != nil && !errors.Is(waitUntilBytesAreSentErr, errors.ErrUnsupported) {
return written, fmt.Errorf("error when waiting for stream to send all bytes: %w", waitUntilBytesAreSentErr)
}

return
}
Loading