diff --git a/go.mod b/go.mod index 30e6105acd18..04f5288d5e9c 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( google.golang.org/protobuf v1.34.2 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 gotest.tools/v3 v3.5.1 - inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) + inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 diff --git a/pkg/vz/network_darwin.go b/pkg/vz/network_darwin.go index 1aceb8e294e1..88d958922e9e 100644 --- a/pkg/vz/network_darwin.go +++ b/pkg/vz/network_darwin.go @@ -3,17 +3,17 @@ package vz import ( - "context" "encoding/binary" + "errors" "io" "net" "os" - "time" + "sync" + "syscall" "github.com/balajiv113/fd" "github.com/sirupsen/logrus" - "inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod ) func PassFDToUnix(unixSock string) (*os.File, error) { @@ -40,7 +40,7 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } - qemuConn := &QEMUPacketConn{unixConn: unixConn} + qemuConn := &QEMUPacketConn{Conn: unixConn} server, client, err := createSockPair() if err != nil { @@ -50,77 +50,125 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } + vzConn := &VZPacketConn{Conn: dgramConn} - remote := tcpproxy.DialProxy{ - DialContext: func(context.Context, string, string) (net.Conn, error) { - return dgramConn, nil - }, - } - go remote.HandleConn(qemuConn) + go forwardPackets(qemuConn, vzConn) return client, nil } +func forwardPackets(qemuConn *QEMUPacketConn, vzConn *VZPacketConn) { + defer qemuConn.Close() + defer vzConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + err := qemuConn.ForwardFrom(vzConn) + if err != nil { + logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err) + } + }() + + go func() { + defer wg.Done() + err := vzConn.ForwardFrom(qemuConn) + if err != nil { + logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err) + } + }() + + wg.Wait() +} + // QEMUPacketConn converts raw network packet to a QEMU supported network packet. type QEMUPacketConn struct { - unixConn net.Conn + net.Conn } -var _ net.Conn = (*QEMUPacketConn)(nil) - -// Read gets rid of the QEMU header packet and returns the raw packet as response. -func (v *QEMUPacketConn) Read(b []byte) (n int, err error) { - header := make([]byte, 4) - _, err = io.ReadFull(v.unixConn, header) - if err != nil { - logrus.Errorln("Failed to read header", err) +// Read reads QEMU packet and returns the raw packet. +func (c *QEMUPacketConn) Read(b []byte) (n int, err error) { + var size uint32 + if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil { + // Likely connection closed by peer. + return 0, err } - size := binary.BigEndian.Uint32(header) - reader := io.LimitReader(v.unixConn, int64(size)) + reader := io.LimitReader(c.Conn, int64(size)) _, err = reader.Read(b) if err != nil { - logrus.Errorln("Failed to read packet", err) + // Likely connection closed by peer. + return 0, err } return int(size), nil } -// Write puts QEMU header packet first and then writes the raw packet. -func (v *QEMUPacketConn) Write(b []byte) (n int, err error) { - header := make([]byte, 4) - binary.BigEndian.PutUint32(header, uint32(len(b))) - _, err = v.unixConn.Write(header) - if err != nil { - logrus.Errorln("Failed to write header", err) - } - - write, err := v.unixConn.Write(b) - if err != nil { - logrus.Errorln("Failed to write packet", err) +// ForwardFrom forwards packets from vz connection. +func (c *QEMUPacketConn) ForwardFrom(vzConn *VZPacketConn) error { + buf := make([]byte, maxPacketSize) + + for { + nr, err := vzConn.Read(buf) + if nr == 0 || err != nil { + return err + } + + if err := binary.Write(c.Conn, binary.BigEndian, uint32(nr)); err != nil { + return err + } + + pos := 0 + for pos < nr { + nw, err := c.Conn.Write(buf[pos:nr]) + if err != nil { + return err + } + pos += nw + } } - return write, nil } -func (v *QEMUPacketConn) Close() error { - return v.unixConn.Close() +type VZPacketConn struct { + net.Conn } -func (v *QEMUPacketConn) LocalAddr() net.Addr { - return v.unixConn.LocalAddr() -} - -func (v *QEMUPacketConn) RemoteAddr() net.Addr { - return v.unixConn.RemoteAddr() -} - -func (v *QEMUPacketConn) SetDeadline(t time.Time) error { - return v.unixConn.SetDeadline(t) -} - -func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error { - return v.unixConn.SetReadDeadline(t) -} - -func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error { - return v.unixConn.SetWriteDeadline(t) +// Based on Apple virtulization framework docs, minimum packet size is 1500 and +// maximum is 64k. If vmnet_max_packet_size was passed from socket_vmnet we +// could uss it here. +// https://developer.apple.com/documentation/virtualization/vzfilehandlenetworkdeviceattachment/3969266-maximumtransmissionunit?language=objc +const maxPacketSize = 64 * 1024 + +// ForwardFrom forwards packets from qemu connection. +func (c *VZPacketConn) ForwardFrom(qemuConn *QEMUPacketConn) error { + buf := make([]byte, maxPacketSize) + + for { + nr, err := qemuConn.Read(buf) + if nr == 0 || err != nil { + return err + } + + for { + nw, err := c.Conn.Write(buf[:nr]) + if err != nil && errors.Is(err, syscall.ENOBUFS) { + // This is an expected condition on BSD based system. The kernel + // does not support blocking until buffer space is available. + // The only way to recover is to retry the call until it + // succeeds, or drop the packet. + // Handled in the same in gvisor-tap-vsock: + // https://github.com/containers/gvisor-tap-vsock/issues/367 + continue + } + + if nw < nr { + return errors.New("incmplete write to unixgram socket") + } + if err != nil { + return err + } + break + } + } }