From 2a448fb4cd7578328130576a0affb072396f1817 Mon Sep 17 00:00:00 2001 From: Nir Soffer Date: Wed, 2 Oct 2024 03:32:55 +0300 Subject: [PATCH] Fix packet forwarding between vz and socket_vmnet We used an external package (tcpproxy) for proxying between unix stream and datagram sockets. This package cannot handle the ENOBUFS error, an expected condition on BSD-based systems, and worse, it hides errors and silently stops forwarding packets when a write to the vz socket fails with ENOBUFS[1]. Fix the issues by replacing tcpproxy with a simpler and more direct implementation that will be easier to maintain. Fixes: - Fix error handling if a write to the vz datagram socket fails with ENOBUFS. We retry the write until it succeeds. The same solution is used in gvisor-tap-vsock[2]. - Fix error handling if we could not read the packet header or body from the socket_vmnet stream socket. Previously we logged an error and continued, sending a corrupted packet to VZ. - Fix error handling if writing a packet to the socket_vmnet stream socket returned after writing a partial packet. Now we handle short writes and write the complete packet. Previously this ended with 2 corrupted packets. - Log an error if forwarding packets from vz to socket_vmnet or from socket_vmnet to vz failed. Simplification: - Use binary.Read() and binary.Write() to read and write the qemu packet header. 
[1] https://github.com/lima-vm/socket_vmnet/issues/39 [2] https://github.com/containers/gvisor-tap-vsock/pull/370 Signed-off-by: Nir Soffer --- go.mod | 2 +- pkg/vz/network_darwin.go | 158 +++++++++++++++++++++++++-------------- 2 files changed, 104 insertions(+), 56 deletions(-) diff --git a/go.mod b/go.mod index 30e6105acd18..04f5288d5e9c 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( google.golang.org/protobuf v1.34.2 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 gotest.tools/v3 v3.5.1 - inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) + inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 diff --git a/pkg/vz/network_darwin.go b/pkg/vz/network_darwin.go index 1aceb8e294e1..88d958922e9e 100644 --- a/pkg/vz/network_darwin.go +++ b/pkg/vz/network_darwin.go @@ -3,17 +3,17 @@ package vz import ( - "context" "encoding/binary" + "errors" "io" "net" "os" - "time" + "sync" + "syscall" "github.com/balajiv113/fd" "github.com/sirupsen/logrus" - "inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod ) func PassFDToUnix(unixSock string) (*os.File, error) { @@ -40,7 +40,7 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } - qemuConn := &QEMUPacketConn{unixConn: unixConn} + qemuConn := &QEMUPacketConn{Conn: unixConn} server, client, err := createSockPair() if err != nil { @@ -50,77 +50,125 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } + vzConn := &VZPacketConn{Conn: dgramConn} - remote := tcpproxy.DialProxy{ - DialContext: func(context.Context, string, string) (net.Conn, error) { - return dgramConn, nil - }, - } - go remote.HandleConn(qemuConn) + go forwardPackets(qemuConn, vzConn) return client, nil } +func 
forwardPackets(qemuConn *QEMUPacketConn, vzConn *VZPacketConn) { + defer qemuConn.Close() + defer vzConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + err := qemuConn.ForwardFrom(vzConn) + if err != nil { + logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err) + } + }() + + go func() { + defer wg.Done() + err := vzConn.ForwardFrom(qemuConn) + if err != nil { + logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err) + } + }() + + wg.Wait() +} + // QEMUPacketConn converts raw network packet to a QEMU supported network packet. type QEMUPacketConn struct { - unixConn net.Conn + net.Conn } -var _ net.Conn = (*QEMUPacketConn)(nil) - -// Read gets rid of the QEMU header packet and returns the raw packet as response. -func (v *QEMUPacketConn) Read(b []byte) (n int, err error) { - header := make([]byte, 4) - _, err = io.ReadFull(v.unixConn, header) - if err != nil { - logrus.Errorln("Failed to read header", err) +// Read reads QEMU packet and returns the raw packet. +func (c *QEMUPacketConn) Read(b []byte) (n int, err error) { + var size uint32 + if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil { + // Likely connection closed by peer. + return 0, err } - size := binary.BigEndian.Uint32(header) - reader := io.LimitReader(v.unixConn, int64(size)) + reader := io.LimitReader(c.Conn, int64(size)) _, err = reader.Read(b) if err != nil { - logrus.Errorln("Failed to read packet", err) + // Likely connection closed by peer. + return 0, err } return int(size), nil } -// Write puts QEMU header packet first and then writes the raw packet. 
-func (v *QEMUPacketConn) Write(b []byte) (n int, err error) { - header := make([]byte, 4) - binary.BigEndian.PutUint32(header, uint32(len(b))) - _, err = v.unixConn.Write(header) - if err != nil { - logrus.Errorln("Failed to write header", err) - } - - write, err := v.unixConn.Write(b) - if err != nil { - logrus.Errorln("Failed to write packet", err) +// ForwardFrom forwards packets from vz connection. +func (c *QEMUPacketConn) ForwardFrom(vzConn *VZPacketConn) error { + buf := make([]byte, maxPacketSize) + + for { + nr, err := vzConn.Read(buf) + if nr == 0 || err != nil { + return err + } + + if err := binary.Write(c.Conn, binary.BigEndian, uint32(nr)); err != nil { + return err + } + + pos := 0 + for pos < nr { + nw, err := c.Conn.Write(buf[pos:nr]) + if err != nil { + return err + } + pos += nw + } } - return write, nil } -func (v *QEMUPacketConn) Close() error { - return v.unixConn.Close() +type VZPacketConn struct { + net.Conn } -func (v *QEMUPacketConn) LocalAddr() net.Addr { - return v.unixConn.LocalAddr() -} - -func (v *QEMUPacketConn) RemoteAddr() net.Addr { - return v.unixConn.RemoteAddr() -} - -func (v *QEMUPacketConn) SetDeadline(t time.Time) error { - return v.unixConn.SetDeadline(t) -} - -func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error { - return v.unixConn.SetReadDeadline(t) -} - -func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error { - return v.unixConn.SetWriteDeadline(t) +// Based on Apple virtualization framework docs, minimum packet size is 1500 and +// maximum is 64k. If vmnet_max_packet_size was passed from socket_vmnet we +// could use it here. +// https://developer.apple.com/documentation/virtualization/vzfilehandlenetworkdeviceattachment/3969266-maximumtransmissionunit?language=objc +const maxPacketSize = 64 * 1024 + +// ForwardFrom forwards packets from qemu connection. 
+func (c *VZPacketConn) ForwardFrom(qemuConn *QEMUPacketConn) error { + buf := make([]byte, maxPacketSize) + + for { + nr, err := qemuConn.Read(buf) + if nr == 0 || err != nil { + return err + } + + for { + nw, err := c.Conn.Write(buf[:nr]) + if err != nil && errors.Is(err, syscall.ENOBUFS) { + // This is an expected condition on BSD based system. The kernel + // does not support blocking until buffer space is available. + // The only way to recover is to retry the call until it + // succeeds, or drop the packet. + // Handled in the same in gvisor-tap-vsock: + // https://github.com/containers/gvisor-tap-vsock/issues/367 + continue + } + + if nw < nr { + return errors.New("incmplete write to unixgram socket") + } + if err != nil { + return err + } + break + } + } }