From d49ac31d2ebcfa880b164181ff3e1eb9a15047b3 Mon Sep 17 00:00:00 2001 From: Nir Soffer Date: Wed, 2 Oct 2024 03:32:55 +0300 Subject: [PATCH] Fix packet forwarding between vz and socket_vmnet We used external package (tcpproxy) for proxying between unix stream and datagram sockets. This package cannot handle ENOBUFS error, expected condition on BSD based systems, and worse, it hides errors and stop forwarding packets silently when write to vz socket fails with ENOBUFS[1]. Fix the issues by replacing tcpproxy with a simpler and more direct implementation that will be easier to maintain. Fixes: - Fix error handling if write to vz datagram socket fail with ENOBUFS. We retry the write until it succeeds with a very short sleep between retries. Similar solution is used in gvisor-tap-vsock[2]. - Fix error handling if we could not read packet header or body from socket_vmnet stream socket. Previously we logged an error and continue to send corrupted packet to vz from the point of the failure. - Fix error handling if writing a packet to socket_vmnet stream socket returned after writing partial packet. Now we handle short writes and write the complete packet. Previously would break the protocol and continue to send corrupted packet from the point of the failure. - Log error if forwarding packets from vz to socket_vmnet or from socket_vmnet to vz failed. Simplification: - Use binary.Read() and binary.Write() to read and write qemu packet header. Visibility: - Make QEMUPacketConn private since it is an implementation detail of vz when using socket_vmnet. Testing: - Add a packet forwarding test covering the happy path in 10 milliseconds. [1] https://github.com/lima-vm/socket_vmnet/issues/39 [2] https://github.com/containers/gvisor-tap-vsock/pull/370 Signed-off-by: Nir Soffer --- go.mod | 2 +- pkg/vz/network_darwin.go | 145 ++++++++++++++++++++------------ pkg/vz/network_darwin_test.go | 151 ++++++++++++++++++++++++++++++++++ 3 files changed, 245 insertions(+), 53 deletions(-) create mode 100644 pkg/vz/network_darwin_test.go diff --git a/go.mod b/go.mod index eb1239251d7..accec9d3d81 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,6 @@ require ( google.golang.org/protobuf v1.35.1 gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473 gotest.tools/v3 v3.5.1 - inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 @@ -127,6 +126,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gvisor.dev/gvisor v0.0.0-20231023213702-2691a8f9b1cf // indirect + inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file) k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect diff --git a/pkg/vz/network_darwin.go b/pkg/vz/network_darwin.go index 1aceb8e294e..4f0c8680e42 100644 --- a/pkg/vz/network_darwin.go +++ b/pkg/vz/network_darwin.go @@ -3,17 +3,18 @@ package vz import ( - "context" "encoding/binary" + "errors" "io" "net" "os" + "sync" + "syscall" "time" "github.com/balajiv113/fd" "github.com/sirupsen/logrus" - "inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod ) func PassFDToUnix(unixSock string) (*os.File, error) { @@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } - qemuConn := &QEMUPacketConn{unixConn: unixConn} + qemuConn := &qemuPacketConn{Conn: unixConn} server, client, err := createSockPair() if err != nil { @@ -50,77 +51,117 @@ func DialQemu(unixSock string) (*os.File, error) { if err != nil { return nil, err } + vzConn := &packetConn{Conn: dgramConn} - remote := tcpproxy.DialProxy{ - DialContext: func(context.Context, string, string) (net.Conn, error) { - return dgramConn, nil - }, - } - go remote.HandleConn(qemuConn) + go forwardPackets(qemuConn, vzConn) return client, nil } -// QEMUPacketConn converts raw network packet to a QEMU supported network packet. -type QEMUPacketConn struct { - unixConn net.Conn +func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) { + defer qemuConn.Close() + defer vzConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + if _, err := io.Copy(qemuConn, vzConn); err != nil { + logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err) + } + }() + + go func() { + defer wg.Done() + if _, err := io.Copy(vzConn, qemuConn); err != nil { + logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err) + } + }() + + wg.Wait() } -var _ net.Conn = (*QEMUPacketConn)(nil) +// qemuPacketConn converts raw network packet to a QEMU supported network packet. +type qemuPacketConn struct { + net.Conn +} -// Read gets rid of the QEMU header packet and returns the raw packet as response. -func (v *QEMUPacketConn) Read(b []byte) (n int, err error) { - header := make([]byte, 4) - _, err = io.ReadFull(v.unixConn, header) - if err != nil { - logrus.Errorln("Failed to read header", err) +// Read reads a QEMU packet and returns the contained raw packet. Returns (len, +// nil) if a packet was read, and (0, err) on error. Errors means the prorocol +// is broken and the socket must be closed. +func (c *qemuPacketConn) Read(b []byte) (n int, err error) { + var size uint32 + if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil { + // Likely connection closed by peer. + return 0, err } - size := binary.BigEndian.Uint32(header) - reader := io.LimitReader(v.unixConn, int64(size)) + reader := io.LimitReader(c.Conn, int64(size)) _, err = reader.Read(b) if err != nil { - logrus.Errorln("Failed to read packet", err) + // Likely connection closed by peer. + return 0, err } return int(size), nil } -// Write puts QEMU header packet first and then writes the raw packet. -func (v *QEMUPacketConn) Write(b []byte) (n int, err error) { - header := make([]byte, 4) - binary.BigEndian.PutUint32(header, uint32(len(b))) - _, err = v.unixConn.Write(header) - if err != nil { - logrus.Errorln("Failed to write header", err) +// Write writes a QEMU packet containing the raw packet. Returns (len(b), nil) +// if a packet was written, and (0, err) if a packet was not fully written. +// Errors means the prorocol is broken and the socket must be closed. +func (c *qemuPacketConn) Write(b []byte) (int, error) { + size := len(b) + header := uint32(size) + if err := binary.Write(c.Conn, binary.BigEndian, header); err != nil { + return 0, err } - write, err := v.unixConn.Write(b) - if err != nil { - logrus.Errorln("Failed to write packet", err) + start := 0 + for start < size { + nw, err := c.Conn.Write(b[start:]) + if err != nil { + return 0, err + } + start += nw } - return write, nil + return size, nil } -func (v *QEMUPacketConn) Close() error { - return v.unixConn.Close() -} - -func (v *QEMUPacketConn) LocalAddr() net.Addr { - return v.unixConn.LocalAddr() -} - -func (v *QEMUPacketConn) RemoteAddr() net.Addr { - return v.unixConn.RemoteAddr() -} - -func (v *QEMUPacketConn) SetDeadline(t time.Time) error { - return v.unixConn.SetDeadline(t) -} +// Testing show that retries are very rare (e.g 24 of 62,499,008 packets) and +// requires 1 or 2 retries to complete the write. A 100 microseconds sleep loop +// consumes about 4% CPU on M1 Pro. +const writeRetryDelay = 100 * time.Microsecond -func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error { - return v.unixConn.SetReadDeadline(t) +// packetConn handles ENOBUFS errors when writing to unixgram socket. +type packetConn struct { + net.Conn } -func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error { - return v.unixConn.SetWriteDeadline(t) +// Write writes a packet retrying on ENOBUFS errors. +func (c *packetConn) Write(b []byte) (int, error) { + var retries uint64 + for { + n, err := c.Conn.Write(b) + if n == 0 && err != nil && errors.Is(err, syscall.ENOBUFS) { + // This is an expected condition on BSD based system. The kernel + // does not support blocking until buffer space is available. + // The only way to recover is to retry the call until it + // succeeds, or drop the packet. + // Handled in a similar way in gvisor-tap-vsock: + // https://github.com/containers/gvisor-tap-vsock/issues/367 + time.Sleep(writeRetryDelay) + retries++ + continue + } + if err != nil { + return 0, err + } + if n < len(b) { + return n, errors.New("incomplete write to unixgram socket") + } + if retries > 0 { + logrus.Debugf("Write completed after %d retries", retries) + } + return n, nil + } } diff --git a/pkg/vz/network_darwin_test.go b/pkg/vz/network_darwin_test.go new file mode 100644 index 00000000000..43189e33ca7 --- /dev/null +++ b/pkg/vz/network_darwin_test.go @@ -0,0 +1,151 @@ +//go:build darwin && !no_vz + +package vz + +import ( + "encoding/binary" + "fmt" + "net" + "path/filepath" + "testing" +) + +const vmnetMaxPacketSize = 1514 +const packetsCount = 1000 + +func TestDialQemu(t *testing.T) { + listener, err := listenUnix(t.TempDir()) + if err != nil { + t.Fatal(err) + } + defer listener.Close() + t.Logf("Listening at %q", listener.Addr()) + + errc := make(chan error, 2) + + // Start the fake vmnet server. + go func() { + t.Log("Fake vmnet started") + errc <- serveOneClient(listener) + t.Log("Fake vmnet finished") + }() + + // Connect to the fake vmnet server. + client, err := DialQemu(listener.Addr().String()) + if err != nil { + t.Fatal(err) + } + t.Log("Connected to fake vment server") + + dgramConn, err := net.FileConn(client) + if err != nil { + t.Fatal(err) + } + + vzConn := packetConn{Conn: dgramConn} + defer vzConn.Close() + + go func() { + t.Log("Sender started") + buf := make([]byte, vmnetMaxPacketSize) + for i := 0; i < vmnetMaxPacketSize; i++ { + buf[i] = 0x55 + } + + // data packet format: + // 0-4 packet number + // 4-1514 0x55 ... + for i := 0; i < packetsCount; i++ { + binary.BigEndian.PutUint32(buf, uint32(i)) + if _, err := vzConn.Write(buf); err != nil { + errc <- err + return + } + } + t.Logf("Sent %d data packets", packetsCount) + + // quit packet format: + // 0-4: "quit" + copy(buf[:4], []byte("quit")) + if _, err := vzConn.Write(buf[:4]); err != nil { + errc <- err + return + } + + errc <- nil + t.Log("Sender finished") + }() + + // Read and verify packets to the server. + + buf := make([]byte, vmnetMaxPacketSize) + + t.Logf("Receiving and verifying data packets...") + for i := 0; i < packetsCount; i++ { + n, err := vzConn.Read(buf) + if err != nil { + t.Fatal(err) + } + if n < vmnetMaxPacketSize { + t.Fatalf("Expected %d bytes, got %d", vmnetMaxPacketSize, n) + } + + number := binary.BigEndian.Uint32(buf[:4]) + if number != uint32(i) { + t.Fatalf("Expected packet %d, got packet %d", i, number) + } + + for j := 4; j < vmnetMaxPacketSize; j++ { + if buf[j] != 0x55 { + t.Fatalf("Expected byte 0x55 at offset %d, got 0x%02x", j, buf[j]) + } + } + } + t.Logf("Recived and verified %d data packets", packetsCount) + + for i := 0; i < 2; i++ { + err := <-errc + if err != nil { + t.Fatal(err) + } + } +} + +// serveOneClient accepts one client and echo back received packets until a +// "quit" packet is sent. +func serveOneClient(listener *net.UnixListener) error { + conn, err := listener.Accept() + if err != nil { + return err + } + qemuConn := qemuPacketConn{Conn: conn} + defer qemuConn.Close() + + buf := make([]byte, vmnetMaxPacketSize) + for { + nr, err := qemuConn.Read(buf) + if err != nil { + return err + } + if string(buf[:4]) == "quit" { + return nil + } + nw, err := qemuConn.Write(buf[:nr]) + if err != nil { + return err + } + if nw != nr { + return fmt.Errorf("incomplete write: expected: %d, wrote: %d", nr, nw) + } + } +} + +// listenUnix creates and listen to unix socket under dir +func listenUnix(dir string) (*net.UnixListener, error) { + sock := filepath.Join(dir, "sock") + addr, err := net.ResolveUnixAddr("unix", sock) + if err != nil { + return nil, err + } + return net.ListenUnix("unix", addr) +}