diff --git a/go.mod b/go.mod index d97707f3b01e..54370ebebf94 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,6 @@ require ( google.golang.org/protobuf v1.34.2 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 gotest.tools/v3 v3.5.1 - inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 @@ -127,6 +126,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gvisor.dev/gvisor v0.0.0-20231023213702-2691a8f9b1cf // indirect + inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect diff --git a/pkg/vz/network_darwin.go b/pkg/vz/network_darwin.go index 1aceb8e294e1..4f0c8680e42d 100644 --- a/pkg/vz/network_darwin.go +++ b/pkg/vz/network_darwin.go @@ -3,17 +3,18 @@ package vz import ( - "context" "encoding/binary" + "errors" "io" "net" "os" + "sync" + "syscall" "time" "github.com/balajiv113/fd" "github.com/sirupsen/logrus" - "inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod ) func PassFDToUnix(unixSock string) (*os.File, error) { @@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } - qemuConn := &QEMUPacketConn{unixConn: unixConn} + qemuConn := &qemuPacketConn{Conn: unixConn} server, client, err := createSockPair() if err != nil { @@ -50,77 +51,117 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } + vzConn := &packetConn{Conn: dgramConn} - remote := tcpproxy.DialProxy{ - DialContext: func(context.Context, string, string) (net.Conn, error) { - return dgramConn, nil - }, - } - go remote.HandleConn(qemuConn) + go forwardPackets(qemuConn, vzConn) return client, nil } -// QEMUPacketConn converts raw network packet to a QEMU supported network packet. -type QEMUPacketConn struct { - unixConn net.Conn +func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) { + defer qemuConn.Close() + defer vzConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + if _, err := io.Copy(qemuConn, vzConn); err != nil { + logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err) + } + }() + + go func() { + defer wg.Done() + if _, err := io.Copy(vzConn, qemuConn); err != nil { + logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err) + } + }() + + wg.Wait() } -var _ net.Conn = (*QEMUPacketConn)(nil) +// qemuPacketConn converts raw network packet to a QEMU supported network packet. +type qemuPacketConn struct { + net.Conn +} -// Read gets rid of the QEMU header packet and returns the raw packet as response. -func (v *QEMUPacketConn) Read(b []byte) (n int, err error) { - header := make([]byte, 4) - _, err = io.ReadFull(v.unixConn, header) - if err != nil { - logrus.Errorln("Failed to read header", err) +// Read reads a QEMU packet and returns the contained raw packet. Returns (len, +// nil) if a packet was read, and (0, err) on error. Errors means the prorocol +// is broken and the socket must be closed. +func (c *qemuPacketConn) Read(b []byte) (n int, err error) { + var size uint32 + if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil { + // Likely connection closed by peer. + return 0, err } - size := binary.BigEndian.Uint32(header) - reader := io.LimitReader(v.unixConn, int64(size)) + reader := io.LimitReader(c.Conn, int64(size)) _, err = reader.Read(b) if err != nil { - logrus.Errorln("Failed to read packet", err) + // Likely connection closed by peer. + return 0, err } return int(size), nil } -// Write puts QEMU header packet first and then writes the raw packet. -func (v *QEMUPacketConn) Write(b []byte) (n int, err error) { - header := make([]byte, 4) - binary.BigEndian.PutUint32(header, uint32(len(b))) - _, err = v.unixConn.Write(header) - if err != nil { - logrus.Errorln("Failed to write header", err) +// Write writes a QEMU packet containing the raw packet. Returns (len(b), nil) +// if a packet was written, and (0, err) if a packet was not fully written. +// Errors means the prorocol is broken and the socket must be closed. +func (c *qemuPacketConn) Write(b []byte) (int, error) { + size := len(b) + header := uint32(size) + if err := binary.Write(c.Conn, binary.BigEndian, header); err != nil { + return 0, err } - write, err := v.unixConn.Write(b) - if err != nil { - logrus.Errorln("Failed to write packet", err) + start := 0 + for start < size { + nw, err := c.Conn.Write(b[start:]) + if err != nil { + return 0, err + } + start += nw } - return write, nil + return size, nil } -func (v *QEMUPacketConn) Close() error { - return v.unixConn.Close() -} - -func (v *QEMUPacketConn) LocalAddr() net.Addr { - return v.unixConn.LocalAddr() -} - -func (v *QEMUPacketConn) RemoteAddr() net.Addr { - return v.unixConn.RemoteAddr() -} - -func (v *QEMUPacketConn) SetDeadline(t time.Time) error { - return v.unixConn.SetDeadline(t) -} +// Testing show that retries are very rare (e.g 24 of 62,499,008 packets) and +// requires 1 or 2 retries to complete the write. A 100 microseconds sleep loop +// consumes about 4% CPU on M1 Pro. +const writeRetryDelay = 100 * time.Microsecond -func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error { - return v.unixConn.SetReadDeadline(t) +// packetConn handles ENOBUFS errors when writing to unixgram socket. +type packetConn struct { + net.Conn } -func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error { - return v.unixConn.SetWriteDeadline(t) +// Write writes a packet retrying on ENOBUFS errors. +func (c *packetConn) Write(b []byte) (int, error) { + var retries uint64 + for { + n, err := c.Conn.Write(b) + if n == 0 && err != nil && errors.Is(err, syscall.ENOBUFS) { + // This is an expected condition on BSD based system. The kernel + // does not support blocking until buffer space is available. + // The only way to recover is to retry the call until it + // succeeds, or drop the packet. + // Handled in a similar way in gvisor-tap-vsock: + // https://github.com/containers/gvisor-tap-vsock/issues/367 + time.Sleep(writeRetryDelay) + retries++ + continue + } + if err != nil { + return 0, err + } + if n < len(b) { + return n, errors.New("incomplete write to unixgram socket") + } + if retries > 0 { + logrus.Debugf("Write completed after %d retries", retries) + } + return n, nil + } }