Skip to content

Commit

Permalink
Fix packet forwarding between vz and socket_vmnet
Browse files Browse the repository at this point in the history
We used external package (tcpproxy) for proxying between unix stream and
datagram sockets. This package cannot handle ENOBUFS error, expected
condition on BSD based systems, and worse, it hides errors and stop
forwarding packets silently when write to vz socket fails with
ENOBUFS[1].

Fix the issues by replacing tcpproxy with a simpler and more direct
implementation that will be easier to maintain.

Fixes:

- Fix error handling if write to vz datagram socket fail with ENOBUFS.
  We retry the write until it succeeds with a very short sleep between
  retries.. Similar solution is used in gvisor-tap-vsock[2].

- Fix error handling if we could not read packet header or body from
  socket_vmnet stream socket. Previously we logged an error and
  continue, sending corrupted packet to VZ.

- Fix error handling if writing a packet to socket_vmnet stream socket
  returned after writing partial packet. Now we handle short writes and
  write the complete packet. Previously this ended with 2 corrupted
  packets.

- Log error if forwarding packets from vz to socket_vmnet or from
  socket_vmnet to vz failed.

Simplification:

- Use binary.Read() and binary.Write() to read and write qemu packet
  header.

Visibility:

- Make QEMUPacketConn private since it is an implementation detail
  correct only for lima vz-socket_vmnet use case.

[1] lima-vm/socket_vmnet#39
[2] containers/gvisor-tap-vsock#370

Signed-off-by: Nir Soffer <[email protected]>
  • Loading branch information
nirs committed Oct 3, 2024
1 parent 16fd466 commit 770cbb9
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 56 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ require (
google.golang.org/protobuf v1.34.2
gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473
gotest.tools/v3 v3.5.1
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
Expand Down
169 changes: 114 additions & 55 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
package vz

import (
"context"
"encoding/binary"
"errors"
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"

"github.com/sirupsen/logrus"
"inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod
)

func PassFDToUnix(unixSock string) (*os.File, error) {
Expand All @@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) {
if err != nil {
return nil, err
}
qemuConn := &QEMUPacketConn{unixConn: unixConn}
qemuConn := &qemuPacketConn{Conn: unixConn}

server, client, err := createSockPair()
if err != nil {
Expand All @@ -50,77 +51,135 @@ func DialQemu(unixSock string) (*os.File, error) {
if err != nil {
return nil, err
}
vzConn := &packetConn{Conn: dgramConn}

remote := tcpproxy.DialProxy{
DialContext: func(context.Context, string, string) (net.Conn, error) {
return dgramConn, nil
},
}
go remote.HandleConn(qemuConn)
go forwardPackets(qemuConn, vzConn)

return client, nil
}

// QEMUPacketConn converts raw network packet to a QEMU supported network packet.
type QEMUPacketConn struct {
unixConn net.Conn
func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if err := qemuConn.ForwardFrom(vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
}
}()

go func() {
defer wg.Done()
if err := vzConn.ForwardFrom(qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
}
}()

wg.Wait()
}

var _ net.Conn = (*QEMUPacketConn)(nil)
// qemuPacketConn converts raw network packet to a QEMU supported network packet.
type qemuPacketConn struct {
net.Conn
}

// Read gets rid of the QEMU header packet and returns the raw packet as response.
func (v *QEMUPacketConn) Read(b []byte) (n int, err error) {
header := make([]byte, 4)
_, err = io.ReadFull(v.unixConn, header)
if err != nil {
logrus.Errorln("Failed to read header", err)
// Read reads QEMU packet and returns the raw packet.
func (c *qemuPacketConn) Read(b []byte) (n int, err error) {
var size uint32
if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil {
// Likely connection closed by peer.
return 0, err
}

size := binary.BigEndian.Uint32(header)
reader := io.LimitReader(v.unixConn, int64(size))
reader := io.LimitReader(c.Conn, int64(size))
_, err = reader.Read(b)
if err != nil {
logrus.Errorln("Failed to read packet", err)
// Likely connection closed by peer.
return 0, err
}
return int(size), nil
}

// Write puts QEMU header packet first and then writes the raw packet.
func (v *QEMUPacketConn) Write(b []byte) (n int, err error) {
header := make([]byte, 4)
binary.BigEndian.PutUint32(header, uint32(len(b)))
_, err = v.unixConn.Write(header)
if err != nil {
logrus.Errorln("Failed to write header", err)
}

write, err := v.unixConn.Write(b)
if err != nil {
logrus.Errorln("Failed to write packet", err)
// ForwardFrom forwards packets from vz connection.
func (c *qemuPacketConn) ForwardFrom(vzConn *packetConn) error {
buf := make([]byte, maxPacketSize)

for {
nr, err := vzConn.Read(buf)
if nr == 0 || err != nil {
return err
}

if err := binary.Write(c.Conn, binary.BigEndian, uint32(nr)); err != nil {
return err
}

pos := 0
for pos < nr {
nw, err := c.Conn.Write(buf[pos:nr])
if err != nil {
return err
}
pos += nw
}
}
return write, nil
}

func (v *QEMUPacketConn) Close() error {
return v.unixConn.Close()
}

func (v *QEMUPacketConn) LocalAddr() net.Addr {
return v.unixConn.LocalAddr()
}

func (v *QEMUPacketConn) RemoteAddr() net.Addr {
return v.unixConn.RemoteAddr()
type packetConn struct {
net.Conn
}

func (v *QEMUPacketConn) SetDeadline(t time.Time) error {
return v.unixConn.SetDeadline(t)
}

func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error {
return v.unixConn.SetReadDeadline(t)
}

func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error {
return v.unixConn.SetWriteDeadline(t)
// Based on Apple virtulization framework docs, minimum packet size is 1500 and
// maximum is 64k. If vmnet_max_packet_size was passed from socket_vmnet we
// could uss it here.
// https://developer.apple.com/documentation/virtualization/vzfilehandlenetworkdeviceattachment/3969266-maximumtransmissionunit?language=objc
const maxPacketSize = 64 * 1024

// Testing show that retries are very rare. When running 300 seconds iper3
// benchmark at 2.35 Gbits/sec, sending 208,330 data packets, only 16 packets
// failed with ENOBUFS. 7 completed after 1 retry, 8 after 2 retries, and 1
// after 3 retries.
const writeRetryDelay = 100 * time.Microsecond

// ForwardFrom forwards packets from qemu connection.
func (c *packetConn) ForwardFrom(qemuConn *qemuPacketConn) error {
buf := make([]byte, maxPacketSize)

for {
nr, err := qemuConn.Read(buf)
if nr == 0 || err != nil {
return err
}

var retries uint64
for {
nw, err := c.Conn.Write(buf[:nr])
if err != nil && errors.Is(err, syscall.ENOBUFS) {
// This is an expected condition on BSD based system. The kernel
// does not support blocking until buffer space is available.
// The only way to recover is to retry the call until it
// succeeds, or drop the packet.
// Handled in a similar way in gvisor-tap-vsock:
// https://github.com/containers/gvisor-tap-vsock/issues/367
time.Sleep(writeRetryDelay)
retries++
continue
}

if err != nil {
return err
}
if nw < nr {
return errors.New("incmplete write to unixgram socket")
}
if retries > 0 {
logrus.Debugf("Write completed after %d retries", retries)
}
break
}
}
}

0 comments on commit 770cbb9

Please sign in to comment.