Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix packet forwarding between vz and socket_vmnet #2680

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ require (
google.golang.org/protobuf v1.35.1
gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473
gotest.tools/v3 v3.5.1
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
k8s.io/api v0.31.1
k8s.io/apimachinery v0.31.1
k8s.io/client-go v0.31.1
Expand Down Expand Up @@ -127,6 +126,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gvisor.dev/gvisor v0.0.0-20231023213702-2691a8f9b1cf // indirect
inet.af/tcpproxy v0.0.0-20221017015627-91f861402626 // indirect; replaced to github.com/inetaf/tcpproxy (see the bottom of this go.mod file)
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
Expand Down
145 changes: 93 additions & 52 deletions pkg/vz/network_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
package vz

import (
"context"
"encoding/binary"
"errors"
"io"
"net"
"os"
"sync"
"syscall"
"time"

"github.com/balajiv113/fd"

"github.com/sirupsen/logrus"
"inet.af/tcpproxy" // replaced to github.com/inetaf/tcpproxy in go.mod
)

func PassFDToUnix(unixSock string) (*os.File, error) {
Expand All @@ -40,7 +41,7 @@ func DialQemu(unixSock string) (*os.File, error) {
if err != nil {
return nil, err
}
qemuConn := &QEMUPacketConn{unixConn: unixConn}
qemuConn := &qemuPacketConn{Conn: unixConn}

server, client, err := createSockPair()
if err != nil {
Expand All @@ -50,77 +51,117 @@ func DialQemu(unixSock string) (*os.File, error) {
if err != nil {
return nil, err
}
vzConn := &packetConn{Conn: dgramConn}

remote := tcpproxy.DialProxy{
DialContext: func(context.Context, string, string) (net.Conn, error) {
return dgramConn, nil
},
}
go remote.HandleConn(qemuConn)
go forwardPackets(qemuConn, vzConn)

return client, nil
}

// QEMUPacketConn converts raw network packet to a QEMU supported network packet.
type QEMUPacketConn struct {
unixConn net.Conn
func forwardPackets(qemuConn *qemuPacketConn, vzConn *packetConn) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need our custom forwardPackets ??

I would still prefer to use inetproxy itself. Unless it doesn't work even after wrapping

We could simply wrap the packetConn with the fileconn. This way retry on ENOBUFS are present.
https://github.com/lima-vm/lima/blob/master/pkg/vz/network_darwin.go#L49

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need our custom forwardPackets ??

Yes, for 2 reasons:

  • tcpproxy hides errors during io.Copy(), making debugging impossible
  • we don't need tcpproxy since copying bytes between the sockets is trivial

I would still prefer to use inetproxy itself. Unless it doesn't work even after wrapping

Can you explain why?

Looking at https://github.com/inetaf/tcpproxy:

Proxy TCP connections based on static rules, HTTP Host headers, and SNI server names (Go package or binary)

We don't do any of that. We used a tiny bit of tcpproxy for copying bytes around, and this is better done in lima itself, where we can implement it in the best way for lima, and change it easily when needed.

We could simply wrap the packetConn with the fileconn. This way retry on ENOBUFS are present. https://github.com/lima-vm/lima/blob/master/pkg/vz/network_darwin.go#L49

Adding another layer of wrapping to keep an unneeded dependency does not sound like the right way to me.

defer qemuConn.Close()
defer vzConn.Close()

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
if _, err := io.Copy(qemuConn, vzConn); err != nil {
logrus.Errorf("Failed to forward packets from VZ to VMNET: %s", err)
}
}()

go func() {
defer wg.Done()
if _, err := io.Copy(vzConn, qemuConn); err != nil {
logrus.Errorf("Failed to forward packets from VMNET to VZ: %s", err)
}
}()

wg.Wait()
}

var _ net.Conn = (*QEMUPacketConn)(nil)
nirs marked this conversation as resolved.
Show resolved Hide resolved
// qemuPacketConn converts raw network packet to a QEMU supported network packet.
type qemuPacketConn struct {
net.Conn
}

// Read gets rid of the QEMU header packet and returns the raw packet as response.
func (v *QEMUPacketConn) Read(b []byte) (n int, err error) {
header := make([]byte, 4)
_, err = io.ReadFull(v.unixConn, header)
if err != nil {
logrus.Errorln("Failed to read header", err)
// Read reads a QEMU packet and returns the contained raw packet. Returns (len,
// nil) if a packet was read, and (0, err) on error. Errors means the prorocol
// is broken and the socket must be closed.
func (c *qemuPacketConn) Read(b []byte) (n int, err error) {
var size uint32
if err := binary.Read(c.Conn, binary.BigEndian, &size); err != nil {
// Likely connection closed by peer.
return 0, err
}

size := binary.BigEndian.Uint32(header)
reader := io.LimitReader(v.unixConn, int64(size))
reader := io.LimitReader(c.Conn, int64(size))
_, err = reader.Read(b)
if err != nil {
logrus.Errorln("Failed to read packet", err)
// Likely connection closed by peer.
return 0, err
}
return int(size), nil
}

// Write puts QEMU header packet first and then writes the raw packet.
func (v *QEMUPacketConn) Write(b []byte) (n int, err error) {
header := make([]byte, 4)
binary.BigEndian.PutUint32(header, uint32(len(b)))
_, err = v.unixConn.Write(header)
if err != nil {
logrus.Errorln("Failed to write header", err)
// Write writes a QEMU packet containing the raw packet. Returns (len(b), nil)
// if a packet was written, and (0, err) if a packet was not fully written.
// Errors means the prorocol is broken and the socket must be closed.
func (c *qemuPacketConn) Write(b []byte) (int, error) {
size := len(b)
header := uint32(size)
if err := binary.Write(c.Conn, binary.BigEndian, header); err != nil {
return 0, err
}

write, err := v.unixConn.Write(b)
if err != nil {
logrus.Errorln("Failed to write packet", err)
start := 0
for start < size {
nw, err := c.Conn.Write(b[start:])
if err != nil {
return 0, err
}
start += nw
}
return write, nil
return size, nil
}

func (v *QEMUPacketConn) Close() error {
return v.unixConn.Close()
}

func (v *QEMUPacketConn) LocalAddr() net.Addr {
return v.unixConn.LocalAddr()
}

func (v *QEMUPacketConn) RemoteAddr() net.Addr {
return v.unixConn.RemoteAddr()
}

func (v *QEMUPacketConn) SetDeadline(t time.Time) error {
return v.unixConn.SetDeadline(t)
}
// Testing show that retries are very rare (e.g 24 of 62,499,008 packets) and
// requires 1 or 2 retries to complete the write. A 100 microseconds sleep loop
// consumes about 4% CPU on M1 Pro.
const writeRetryDelay = 100 * time.Microsecond

func (v *QEMUPacketConn) SetReadDeadline(t time.Time) error {
return v.unixConn.SetReadDeadline(t)
// packetConn handles ENOBUFS errors when writing to unixgram socket.
type packetConn struct {
net.Conn
}

func (v *QEMUPacketConn) SetWriteDeadline(t time.Time) error {
return v.unixConn.SetWriteDeadline(t)
// Write writes a packet retrying on ENOBUFS errors.
func (c *packetConn) Write(b []byte) (int, error) {
var retries uint64
for {
n, err := c.Conn.Write(b)
if n == 0 && err != nil && errors.Is(err, syscall.ENOBUFS) {
// This is an expected condition on BSD based system. The kernel
// does not support blocking until buffer space is available.
// The only way to recover is to retry the call until it
// succeeds, or drop the packet.
// Handled in a similar way in gvisor-tap-vsock:
// https://github.com/containers/gvisor-tap-vsock/issues/367
time.Sleep(writeRetryDelay)
retries++
continue
}
if err != nil {
return 0, err
}
if n < len(b) {
return n, errors.New("incomplete write to unixgram socket")
}
if retries > 0 {
logrus.Debugf("Write completed after %d retries", retries)
}
return n, nil
}
}
151 changes: 151 additions & 0 deletions pkg/vz/network_darwin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//go:build darwin && !no_vz

package vz

import (
"encoding/binary"
"fmt"
"net"
"path/filepath"
"testing"
)

const vmnetMaxPacketSize = 1514
const packetsCount = 1000

// TestDialQemu connects DialQemu to a fake vmnet server, pushes numbered
// packets through the returned datagram socket, and verifies the echoes.
func TestDialQemu(t *testing.T) {
	listener, err := listenUnix(t.TempDir())
	if err != nil {
		t.Fatal(err)
	}
	defer listener.Close()
	t.Logf("Listening at %q", listener.Addr())

	// Buffered so the sender and server goroutines never block on send.
	errc := make(chan error, 2)

	// Start the fake vmnet server.
	go func() {
		t.Log("Fake vmnet started")
		errc <- serveOneClient(listener)
		t.Log("Fake vmnet finished")
	}()

	// Connect to the fake vmnet server.
	client, err := DialQemu(listener.Addr().String())
	if err != nil {
		t.Fatal(err)
	}
	t.Log("Connected to fake vmnet server")

	dgramConn, err := net.FileConn(client)
	if err != nil {
		t.Fatal(err)
	}

	vzConn := packetConn{Conn: dgramConn}
	defer vzConn.Close()

	// Send numbered packets; the server echoes them back.
	go func() {
		t.Log("Sender started")
		buf := make([]byte, vmnetMaxPacketSize)
		for i := 0; i < vmnetMaxPacketSize; i++ {
			buf[i] = 0x55
		}

		// data packet format:
		// 0-4 packet number
		// 4-1514 0x55 ...
		for i := 0; i < packetsCount; i++ {
			binary.BigEndian.PutUint32(buf, uint32(i))
			if _, err := vzConn.Write(buf); err != nil {
				errc <- err
				return
			}
		}
		t.Logf("Sent %d data packets", packetsCount)

		// quit packet format:
		// 0-4: "quit"
		copy(buf[:4], []byte("quit"))
		if _, err := vzConn.Write(buf[:4]); err != nil {
			errc <- err
			return
		}

		errc <- nil
		t.Log("Sender finished")
	}()

	// Read and verify the packets echoed back by the server.

	buf := make([]byte, vmnetMaxPacketSize)

	t.Logf("Receiving and verifying data packets...")
	for i := 0; i < packetsCount; i++ {
		n, err := vzConn.Read(buf)
		if err != nil {
			t.Fatal(err)
		}
		if n < vmnetMaxPacketSize {
			t.Fatalf("Expected %d bytes, got %d", vmnetMaxPacketSize, n)
		}

		number := binary.BigEndian.Uint32(buf[:4])
		if number != uint32(i) {
			t.Fatalf("Expected packet %d, got packet %d", i, number)
		}

		for j := 4; j < vmnetMaxPacketSize; j++ {
			if buf[j] != 0x55 {
				t.Fatalf("Expected byte 0x55 at offset %d, got 0x%02x", j, buf[j])
			}
		}
	}
	t.Logf("Received and verified %d data packets", packetsCount)

	// Both goroutines must report success.
	for i := 0; i < 2; i++ {
		err := <-errc
		if err != nil {
			t.Fatal(err)
		}
	}
}

// serveOneClient accepts a single connection and echoes every received
// packet back to the client until a "quit" packet arrives.
func serveOneClient(listener *net.UnixListener) error {
	conn, err := listener.Accept()
	if err != nil {
		return err
	}
	server := qemuPacketConn{Conn: conn}
	defer server.Close()

	packet := make([]byte, vmnetMaxPacketSize)
	for {
		n, err := server.Read(packet)
		if err != nil {
			return err
		}
		if string(packet[:4]) == "quit" {
			return nil
		}
		written, err := server.Write(packet[:n])
		if err != nil {
			return err
		}
		if written != n {
			return fmt.Errorf("incomplete write: expected: %d, wrote: %d", n, written)
		}
	}
}

// listenUnix creates and listen to unix socket under dir
func listenUnix(dir string) (*net.UnixListener, error) {
sock := filepath.Join(dir, "sock")
addr, err := net.ResolveUnixAddr("unix", sock)
if err != nil {
return nil, err
}
return net.ListenUnix("unix", addr)
}
Loading