Skip to content

Commit

Permalink
feat: support dpdk interface hotplug for kubevirt (#3438)
Browse files Browse the repository at this point in the history
Signed-off-by: wujixin <[email protected]>
Co-authored-by: wujixin <[email protected]>
  • Loading branch information
2 people authored and bobz965 committed Nov 25, 2023
1 parent ed2186f commit 1186636
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 53 deletions.
46 changes: 24 additions & 22 deletions cmd/cni/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,20 @@ func cmdAdd(args *skel.CmdArgs) error {

client := request.NewCniServerClient(netConf.ServerSocket)
response, err := client.Add(request.CniRequest{
CniType: netConf.Type,
PodName: podName,
PodNamespace: podNamespace,
ContainerID: args.ContainerID,
NetNs: args.Netns,
IfName: args.IfName,
Provider: netConf.Provider,
Routes: netConf.Routes,
DNS: netConf.DNS,
DeviceID: netConf.DeviceID,
VfDriver: netConf.VfDriver,
VhostUserSocketVolumeName: netConf.VhostUserSocketVolumeName,
VhostUserSocketName: netConf.VhostUserSocketName,
CniType: netConf.Type,
PodName: podName,
PodNamespace: podNamespace,
ContainerID: args.ContainerID,
NetNs: args.Netns,
IfName: args.IfName,
Provider: netConf.Provider,
Routes: netConf.Routes,
DNS: netConf.DNS,
DeviceID: netConf.DeviceID,
VfDriver: netConf.VfDriver,
VhostUserSocketVolumeName: netConf.VhostUserSocketVolumeName,
VhostUserSocketName: netConf.VhostUserSocketName,
VhostUserSocketConsumption: netConf.VhostUserSocketConsumption,
})
if err != nil {
return types.NewError(types.ErrTryAgainLater, "RPC failed", err.Error())
Expand Down Expand Up @@ -151,15 +152,16 @@ func cmdDel(args *skel.CmdArgs) error {
}

err = client.Del(request.CniRequest{
CniType: netConf.Type,
PodName: podName,
PodNamespace: podNamespace,
ContainerID: args.ContainerID,
NetNs: args.Netns,
IfName: args.IfName,
Provider: netConf.Provider,
DeviceID: netConf.DeviceID,
VhostUserSocketVolumeName: netConf.VhostUserSocketVolumeName,
CniType: netConf.Type,
PodName: podName,
PodNamespace: podNamespace,
ContainerID: args.ContainerID,
NetNs: args.Netns,
IfName: args.IfName,
Provider: netConf.Provider,
DeviceID: netConf.DeviceID,
VhostUserSocketVolumeName: netConf.VhostUserSocketVolumeName,
VhostUserSocketConsumption: netConf.VhostUserSocketConsumption,
})
if err != nil {
return types.NewError(types.ErrTryAgainLater, "RPC failed", err.Error())
Expand Down
5 changes: 3 additions & 2 deletions cmd/cni/netconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type netConf struct {
DeviceID string `json:"deviceID"`
VfDriver string `json:"vf_driver"`
// for dpdk
VhostUserSocketVolumeName string `json:"vhost_user_socket_volume_name"`
VhostUserSocketName string `json:"vhost_user_socket_name"`
VhostUserSocketVolumeName string `json:"vhost_user_socket_volume_name"`
VhostUserSocketName string `json:"vhost_user_socket_name"`
VhostUserSocketConsumption string `json:"vhost_user_socket_consumption"`
}

func (n *netConf) postLoad() {
Expand Down
5 changes: 3 additions & 2 deletions cmd/cni/netconf_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type netConf struct {
DeviceID string `json:"deviceID"`
VfDriver string `json:"vf_driver"`
// for dpdk
VhostUserSocketVolumeName string `json:"vhost_user_socket_volume_name"`
VhostUserSocketName string `json:"vhost_user_socket_name"`
VhostUserSocketVolumeName string `json:"vhost_user_socket_volume_name"`
VhostUserSocketName string `json:"vhost_user_socket_name"`
VhostUserSocketConsumption string `json:"vhost_user_socket_consumption"`
}

func (n *netConf) postLoad() {
Expand Down
16 changes: 13 additions & 3 deletions pkg/daemon/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,17 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon
ifName = "eth0"
}

// To support KubeVirt DPDK NIC hotplug, the volume name is not configurable: it is always forced to util.VhostUserSocketVolumeName
if podRequest.VhostUserSocketConsumption == util.ConsumptionKubevirt {
podRequest.VhostUserSocketVolumeName = util.VhostUserSocketVolumeName
}

switch {
case podRequest.DeviceID != "":
nicType = util.OffloadType
case podRequest.VhostUserSocketVolumeName != "":
nicType = util.DpdkType
if err = createShortSharedDir(pod, podRequest.VhostUserSocketVolumeName, csh.Config.KubeletDir); err != nil {
if err = createShortSharedDir(pod, podRequest.VhostUserSocketVolumeName, podRequest.VhostUserSocketConsumption, csh.Config.KubeletDir); err != nil {
klog.Error(err.Error())
if err = resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: err.Error()}); err != nil {
klog.Errorf("failed to write response: %v", err)
Expand Down Expand Up @@ -293,7 +298,7 @@ func (csh cniServerHandler) handleAdd(req *restful.Request, resp *restful.Respon
case util.InternalType:
podNicName, err = csh.configureNicWithInternalPort(podRequest.PodName, podRequest.PodNamespace, podRequest.Provider, podRequest.NetNs, podRequest.ContainerID, ifName, macAddr, mtu, ipAddr, gw, isDefaultRoute, detectIPConflict, routes, podRequest.DNS.Nameservers, podRequest.DNS.Search, ingress, egress, podRequest.DeviceID, nicType, latency, limit, loss, jitter, gatewayCheckMode, u2oInterconnectionIP)
case util.DpdkType:
err = csh.configureDpdkNic(podRequest.PodName, podRequest.PodNamespace, podRequest.Provider, podRequest.NetNs, podRequest.ContainerID, ifName, macAddr, mtu, ipAddr, gw, ingress, egress, getShortSharedDir(pod.UID, podRequest.VhostUserSocketVolumeName), podRequest.VhostUserSocketName)
err = csh.configureDpdkNic(podRequest.PodName, podRequest.PodNamespace, podRequest.Provider, podRequest.NetNs, podRequest.ContainerID, ifName, macAddr, mtu, ipAddr, gw, ingress, egress, getShortSharedDir(pod.UID, podRequest.VhostUserSocketVolumeName), podRequest.VhostUserSocketName, podRequest.VhostUserSocketConsumption)
default:
err = csh.configureNic(podRequest.PodName, podRequest.PodNamespace, podRequest.Provider, podRequest.NetNs, podRequest.ContainerID, podRequest.VfDriver, ifName, macAddr, mtu, ipAddr, gw, isDefaultRoute, detectIPConflict, routes, podRequest.DNS.Nameservers, podRequest.DNS.Search, ingress, egress, podRequest.DeviceID, nicType, latency, limit, loss, jitter, gatewayCheckMode, u2oInterconnectionIP, oldPodName)
}
Expand Down Expand Up @@ -419,13 +424,18 @@ func (csh cniServerHandler) handleDel(req *restful.Request, resp *restful.Respon
}
}

// To support KubeVirt DPDK NIC hotplug, the volume name is not configurable: it is always forced to util.VhostUserSocketVolumeName
if podRequest.VhostUserSocketConsumption == util.ConsumptionKubevirt {
podRequest.VhostUserSocketVolumeName = util.VhostUserSocketVolumeName
}

var nicType string
switch {
case podRequest.DeviceID != "":
nicType = util.OffloadType
case podRequest.VhostUserSocketVolumeName != "":
nicType = util.DpdkType
if err = removeShortSharedDir(pod, podRequest.VhostUserSocketVolumeName); err != nil {
if err = removeShortSharedDir(pod, podRequest.VhostUserSocketVolumeName, podRequest.VhostUserSocketConsumption); err != nil {
klog.Error(err.Error())
if err = resp.WriteHeaderAndEntity(http.StatusInternalServerError, request.CniResponse{Err: err.Error()}); err != nil {
klog.Errorf("failed to write response: %v", err)
Expand Down
48 changes: 33 additions & 15 deletions pkg/daemon/handler_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (csh cniServerHandler) validatePodRequest(_ *request.CniRequest) error {
return nil
}

func createShortSharedDir(pod *v1.Pod, volumeName, kubeletDir string) (err error) {
func createShortSharedDir(pod *v1.Pod, volumeName, socketConsumption, kubeletDir string) (err error) {
var volume *v1.Volume
for index, v := range pod.Spec.Volumes {
if v.Name == volumeName {
Expand All @@ -41,32 +41,50 @@ func createShortSharedDir(pod *v1.Pod, volumeName, kubeletDir string) (err error
// set the vhostuser dir mode to 777 so that qemu has permission to create the socket
mask := syscall.Umask(0)
defer syscall.Umask(mask)
if _, err = os.Stat(newSharedDir); os.IsNotExist(err) {
err = os.MkdirAll(newSharedDir, 0o777)
if err != nil {
return fmt.Errorf("createSharedDir: Failed to create dir (%s): %v", newSharedDir, err)
}

if strings.Contains(newSharedDir, util.DefaultHostVhostuserBaseDir) {
klog.Infof("createSharedDir: Mount from %s to %s", originSharedDir, newSharedDir)
err = unix.Mount(originSharedDir, newSharedDir, "", unix.MS_BIND, "")
if _, err = os.Stat(newSharedDir); err != nil {
if os.IsNotExist(err) {
err = os.MkdirAll(newSharedDir, 0o777)
if err != nil {
return fmt.Errorf("createSharedDir: Failed to bind mount: %s", err)
return fmt.Errorf("createSharedDir: Failed to create dir (%s): %v", newSharedDir, err)
}

if strings.Contains(newSharedDir, util.DefaultHostVhostuserBaseDir) {
klog.Infof("createSharedDir: Mount from %s to %s", originSharedDir, newSharedDir)
err = unix.Mount(originSharedDir, newSharedDir, "", unix.MS_BIND, "")
if err != nil {
return fmt.Errorf("createSharedDir: Failed to bind mount: %s", err)
}
}
return nil
}
return nil
return err
}

if socketConsumption != util.ConsumptionKubevirt {
return fmt.Errorf("createSharedDir: voume name %s is exists", volumeName)
}
return err

return nil
}

func removeShortSharedDir(pod *v1.Pod, volumeName string) (err error) {
func removeShortSharedDir(pod *v1.Pod, volumeName, socketConsumption string) (err error) {
sharedDir := getShortSharedDir(pod.UID, volumeName)
if _, err = os.Stat(sharedDir); os.IsNotExist(err) {
klog.Errorf("shared directory %s does not exist to unmount, %s", sharedDir, err)
klog.Infof("shared directory %s does not exist to unmount, %s", sharedDir, err)
return nil
}

// keep the mount until the dpdk socket is no longer used by kubevirt (i.e. while the shared dir still contains files)
if socketConsumption == util.ConsumptionKubevirt {
files, err := os.ReadDir(sharedDir)
if err != nil {
return fmt.Errorf("read file from dpdk share dir error: %s", err)
}
if len(files) != 0 {
return nil
}
}

foundMount, err := mountinfo.Mounted(sharedDir)
if errors.Is(err, fs.ErrNotExist) || (err == nil && !foundMount) {
klog.Infof("volume: %s not mounted, no need to unmount", sharedDir)
Expand Down
4 changes: 2 additions & 2 deletions pkg/daemon/handler_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ func (csh cniServerHandler) validatePodRequest(req *request.CniRequest) error {
return nil
}

func createShortSharedDir(pod *v1.Pod, volumeName, kubeletDir string) error {
// createShortSharedDir is the Windows counterpart of the Linux helper of the
// same name. It ignores all of its arguments (pod, volumeName,
// socketConsumption, kubeletDir), performs no work, and always returns nil.
func createShortSharedDir(pod *v1.Pod, volumeName, socketConsumption, kubeletDir string) error {
// nothing to do on Windows
return nil
}

func removeShortSharedDir(pod *v1.Pod, volumeName string) error {
// removeShortSharedDir is the Windows counterpart of the Linux helper of the
// same name. It ignores all of its arguments (pod, volumeName,
// socketConsumption), performs no work, and always returns nil.
func removeShortSharedDir(pod *v1.Pod, volumeName, socketConsumption string) error {
// nothing to do on Windows
return nil
}
14 changes: 10 additions & 4 deletions pkg/daemon/ovs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,24 @@ import (

var pciAddrRegexp = regexp.MustCompile(`\b([0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.\d{1}\S*)`)

func (csh cniServerHandler) configureDpdkNic(podName, podNamespace, provider, netns, containerID, ifName, _ string, _ int, ip, _, ingress, egress, shortSharedDir, socketName string) error {
func (csh cniServerHandler) configureDpdkNic(podName, podNamespace, provider, netns, containerID, ifName, _ string, _ int, ip, _, ingress, egress, shortSharedDir, socketName, socketConsumption string) error {
sharedDir := filepath.Join("/var", shortSharedDir)
hostNicName, _ := generateNicName(containerID, ifName)

ipStr := util.GetIPWithoutMask(ip)
ifaceID := ovs.PodNameToPortName(podName, podNamespace, provider)
ovs.CleanDuplicatePort(ifaceID, hostNicName)
// Add veth pair host end to ovs port

vhostServerPath := path.Join(sharedDir, socketName)
if socketConsumption == util.ConsumptionKubevirt {
vhostServerPath = path.Join(sharedDir, ifName)
}

// Add vhostuser host end to ovs port
output, err := ovs.Exec(ovs.MayExist, "add-port", "br-int", hostNicName, "--",
"set", "interface", hostNicName,
"type=dpdkvhostuserclient",
fmt.Sprintf("options:vhost-server-path=%s", path.Join(sharedDir, socketName)),
fmt.Sprintf("options:vhost-server-path=%s", vhostServerPath),
fmt.Sprintf("external_ids:iface-id=%s", ifaceID),
fmt.Sprintf("external_ids:pod_name=%s", podName),
fmt.Sprintf("external_ids:pod_namespace=%s", podNamespace),
Expand Down Expand Up @@ -1559,7 +1565,7 @@ func turnOffNicTxChecksum(nicName string) (err error) {
}

// getShortSharedDir returns the cleaned path formed by joining
// util.DefaultHostVhostuserBaseDir, the given UID, and volumeName, in that
// order.
func getShortSharedDir(uid types.UID, volumeName string) string {
return filepath.Join(util.DefaultHostVhostuserBaseDir, string(uid), volumeName)
return filepath.Clean(filepath.Join(util.DefaultHostVhostuserBaseDir, string(uid), volumeName))
}

func linkExists(name string) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/ovs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/kubeovn/kube-ovn/pkg/util"
)

func (csh cniServerHandler) configureDpdkNic(podName, podNamespace, provider, netns, containerID, ifName, mac string, mtu int, ip, gateway, ingress, egress, sharedDir, socketName string) error {
// configureDpdkNic is the Windows stub for DPDK NIC configuration. It ignores
// every argument and always returns the error "DPDK is not supported on Windows".
func (csh cniServerHandler) configureDpdkNic(podName, podNamespace, provider, netns, containerID, ifName, mac string, mtu int, ip, gateway, ingress, egress, sharedDir, socketName, socketConsumption string) error {
return errors.New("DPDK is not supported on Windows")
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/request/cniserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type CniRequest struct {
DeviceID string `json:"deviceID"`
// dpdk
// empty dir volume for sharing vhost user unix socket
VhostUserSocketVolumeName string `json:"vhost_user_socket_volume_name"`
VhostUserSocketName string `json:"vhost_user_socket_name"`
VhostUserSocketVolumeName string `json:"vhost_user_socket_volume_name"`
VhostUserSocketName string `json:"vhost_user_socket_name"`
VhostUserSocketConsumption string `json:"vhost_user_socket_consumption"`
}

// CniResponse is the cniserver response format
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,7 @@ const (
TProxyPreroutingMask = 0x90004

HealthCheckNamedVipTemplate = "%s:%s" // ip name, health check vip

ConsumptionKubevirt = "kubevirt"
VhostUserSocketVolumeName = "vhostuser-sockets"
)

0 comments on commit 1186636

Please sign in to comment.