diff --git a/internal/server/device/device_utils_disk.go b/internal/server/device/device_utils_disk.go
index b38d59c7ae1..36ceef93383 100644
--- a/internal/server/device/device_utils_disk.go
+++ b/internal/server/device/device_utils_disk.go
@@ -30,25 +30,35 @@ const RBDFormatPrefix = "rbd"
 // RBDFormatSeparator is the field separate used in disk paths for RBD devices.
 const RBDFormatSeparator = " "
 
-// DiskParseRBDFormat parses an rbd formatted string, and returns the pool name, volume name, and list of options.
-func DiskParseRBDFormat(rbd string) (string, string, []string, error) {
-	if !strings.HasPrefix(rbd, fmt.Sprintf("%s%s", RBDFormatPrefix, RBDFormatSeparator)) {
-		return "", "", nil, fmt.Errorf("Invalid rbd format, missing prefix")
+// DiskParseRBDFormat parses an rbd formatted string, and returns the pool name, volume name, and map of options.
+func DiskParseRBDFormat(rbd string) (string, string, map[string]string, error) {
+	// Remove and check the prefix.
+	prefix, rbd, _ := strings.Cut(rbd, RBDFormatSeparator)
+	if prefix != RBDFormatPrefix {
+		return "", "", nil, fmt.Errorf("Invalid rbd format, wrong prefix: %q", prefix)
 	}
 
-	fields := strings.SplitN(rbd, RBDFormatSeparator, 3)
-	if len(fields) != 3 {
-		return "", "", nil, fmt.Errorf("Invalid rbd format, invalid number of fields")
+	// Split the path and options.
+	path, rawOpts, _ := strings.Cut(rbd, RBDFormatSeparator)
+
+	// Check for valid RBD path.
+	pool, volume, validPath := strings.Cut(path, "/")
+	if !validPath {
+		return "", "", nil, fmt.Errorf("Invalid rbd format, missing pool and/or volume: %q", path)
 	}
 
-	opts := fields[2]
+	// Parse options.
+	opts := make(map[string]string)
+	for _, o := range strings.Split(rawOpts, ":") {
+		k, v, isValid := strings.Cut(o, "=")
+		if !isValid {
+			return "", "", nil, fmt.Errorf("Invalid rbd format, bad option: %q", o)
+		}
 
-	fields = strings.SplitN(fields[1], "/", 2)
-	if len(fields) != 2 {
-		return "", "", nil, fmt.Errorf("Invalid rbd format, invalid pool or volume")
+		opts[k] = v
 	}
 
-	return fields[0], fields[1], strings.Split(opts, ":"), nil
+	return pool, volume, opts, nil
 }
 
 // DiskGetRBDFormat returns a rbd formatted string with the given values.
@@ -59,7 +69,6 @@ func DiskGetRBDFormat(clusterName string, userName string, poolName string, volu
 	opts := []string{
 		fmt.Sprintf("id=%s", optEscaper.Replace(userName)),
 		fmt.Sprintf("pool=%s", optEscaper.Replace(poolName)),
-		fmt.Sprintf("conf=/etc/ceph/%s.conf", optEscaper.Replace(clusterName)),
 	}
 
 	return fmt.Sprintf("%s%s%s/%s%s%s", RBDFormatPrefix, RBDFormatSeparator, optEscaper.Replace(poolName), optEscaper.Replace(volumeName), RBDFormatSeparator, strings.Join(opts, ":"))
@@ -77,7 +86,7 @@ func BlockFsDetect(dev string) (string, error) {
 
 // IsBlockdev returns boolean indicating whether device is block type.
 func IsBlockdev(path string) bool {
-	// Get a stat struct from the provided path
+	// Get a stat struct from the provided path.
 	stat := unix.Stat_t{}
 	err := unix.Stat(path, &stat)
 	if err != nil {
@@ -240,6 +249,12 @@ again:
 
 // diskCephfsOptions returns the mntSrcPath and fsOptions to use for mounting a cephfs share.
 func diskCephfsOptions(clusterName string, userName string, fsName string, fsPath string) (string, []string, error) {
+	// Get the FSID.
+	fsid, err := storageDrivers.CephFsid(clusterName)
+	if err != nil {
+		return "", nil, err
+	}
+
 	// Get the monitor list.
 	monAddresses, err := storageDrivers.CephMonitors(clusterName)
 	if err != nil {
@@ -252,14 +267,15 @@ func diskCephfsOptions(clusterName string, userName string, fsName string, fsPat
 		return "", nil, err
 	}
 
-	// Prepare mount entry.
-	fsOptions := []string{
-		fmt.Sprintf("name=%v", userName),
-		fmt.Sprintf("secret=%v", secret),
-		fmt.Sprintf("mds_namespace=%v", fsName),
-	}
+	srcPath, fsOptions := storageDrivers.CephBuildMount(
+		userName,
+		secret,
+		fsid,
+		monAddresses,
+		fsName,
+		fsPath,
+	)
 
-	srcPath := strings.Join(monAddresses, ",") + ":/" + fsPath
 	return srcPath, fsOptions, nil
 }
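
Note: with the `conf=` option dropped, the serialized rbd string now carries only `id` and `pool`. A round-trip sketch of the new format (cluster, pool, volume and user names invented):

```go
// Round-trip of the new rbd string format (illustrative names only).
s := device.DiskGetRBDFormat("ceph", "admin", "mypool", "vm-disk0")
// s == "rbd mypool/vm-disk0 id=admin:pool=mypool"

pool, vol, opts, err := device.DiskParseRBDFormat(s)
if err != nil {
	return err
}
// pool == "mypool", vol == "vm-disk0"
// opts == map[string]string{"id": "admin", "pool": "mypool"}
```

Returning a map instead of a raw `[]string` makes option handling at the call sites order-independent.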
diff --git a/internal/server/instance/drivers/driver_qemu.go b/internal/server/instance/drivers/driver_qemu.go
index f1854a35567..1c7f98b1b9c 100644
--- a/internal/server/instance/drivers/driver_qemu.go
+++ b/internal/server/instance/drivers/driver_qemu.go
@@ -4200,7 +4200,7 @@ func (d *qemu) addDriveConfig(qemuDev map[string]any, bootIndexes map[string]int
 	} else if isRBDImage {
 		blockDev["driver"] = "rbd"
 
-		_, volName, opts, err := device.DiskParseRBDFormat(driveConf.DevPath)
+		poolName, volName, opts, err := device.DiskParseRBDFormat(driveConf.DevPath)
 		if err != nil {
 			return nil, fmt.Errorf("Failed parsing rbd string: %w", err)
 		}
@@ -4225,68 +4225,20 @@ func (d *qemu) addDriveConfig(qemuDev map[string]any, bootIndexes map[string]int
 		vol := storageDrivers.NewVolume(nil, "", volumeType, rbdContentType, volumeName, nil, nil)
 		rbdImageName := storageDrivers.CephGetRBDImageName(vol, "", false)
 
-		// Parse the options (ceph credentials).
-		userName := storageDrivers.CephDefaultUser
-		clusterName := storageDrivers.CephDefaultCluster
-		poolName := ""
-
-		for _, option := range opts {
-			fields := strings.Split(option, "=")
-			if len(fields) != 2 {
-				return nil, fmt.Errorf("Unexpected volume rbd option %q", option)
-			}
-
-			if fields[0] == "id" {
-				userName = fields[1]
-			} else if fields[0] == "pool" {
-				poolName = fields[1]
-			} else if fields[0] == "conf" {
-				baseName := filepath.Base(fields[1])
-				clusterName = strings.TrimSuffix(baseName, ".conf")
+		// Scan & pass through options.
+		blockDev["pool"] = poolName
+		blockDev["image"] = rbdImageName
+		for key, val := range opts {
+			// We use 'id' where qemu uses 'user'.
+			if key == "id" {
+				blockDev["user"] = val
+			} else {
+				blockDev[key] = val
 			}
 		}
 
-		if poolName == "" {
-			return nil, fmt.Errorf("Missing pool name")
-		}
-
 		// The aio option isn't available when using the rbd driver.
 		delete(blockDev, "aio")
 
-		blockDev["pool"] = poolName
-		blockDev["image"] = rbdImageName
-		blockDev["user"] = userName
-		blockDev["server"] = []map[string]string{}
-
-		// Derference ceph config path.
-		cephConfPath := fmt.Sprintf("/etc/ceph/%s.conf", clusterName)
-		target, err := filepath.EvalSymlinks(cephConfPath)
-		if err == nil {
-			cephConfPath = target
-		}
-
-		blockDev["conf"] = cephConfPath
-
-		// Setup the Ceph cluster config (monitors and keyring).
-		monitors, err := storageDrivers.CephMonitors(clusterName)
-		if err != nil {
-			return nil, err
-		}
-
-		for _, monitor := range monitors {
-			idx := strings.LastIndex(monitor, ":")
-			host := monitor[:idx]
-			port := monitor[idx+1:]
-
-			blockDev["server"] = append(blockDev["server"].([]map[string]string), map[string]string{
-				"host": strings.Trim(host, "[]"),
-				"port": port,
-			})
-		}
-
-		rbdSecret, err = storageDrivers.CephKeyring(clusterName, userName)
-		if err != nil {
-			return nil, err
-		}
 	}
 
 	readonly := slices.Contains(driveConf.Opts, "ro")
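
For review context, a sketch of what the pass-through loop leaves in `blockDev` for a DevPath like the one in the earlier example (names invented; the exact image name comes from `CephGetRBDImageName` and depends on the volume type):

```go
// Resulting QEMU blockdev map for "rbd mypool/vm-disk0 id=admin:pool=mypool".
blockDev := map[string]any{
	"driver": "rbd",
	"pool":   "mypool",   // parsed pool name
	"image":  "vm-disk0", // via CephGetRBDImageName (shape varies by volume type)
	"user":   "admin",    // the 'id' option, renamed for QEMU
	// Any other parsed options pass through verbatim.
}
_ = blockDev
```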
diff --git a/internal/server/storage/drivers/driver_cephfs.go b/internal/server/storage/drivers/driver_cephfs.go
index 3ae39aec46a..f83dc1a1aaf 100644
--- a/internal/server/storage/drivers/driver_cephfs.go
+++ b/internal/server/storage/drivers/driver_cephfs.go
@@ -133,12 +133,8 @@ func (d *cephfs) Create() error {
 	d.config["cephfs.path"] = d.config["source"]
 
 	// Parse the namespace / path.
-	fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
-	fsName := fields[0]
-	fsPath := "/"
-	if len(fields) > 1 {
-		fsPath = fields[1]
-	}
+	fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
+	fsPath = "/" + fsPath
 
 	// If the filesystem already exists, disallow keys associated to creating the filesystem.
 	fsExists, err := d.fsExists(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"], fsName)
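
The `strings.Cut` rewrite always yields a path with a leading slash, which the new `CephBuildMount` helper expects. A quick sketch of the behaviour:

```go
// "cephfs.path" with a path component:
fsName, fsPath, _ := strings.Cut("myfs/project/a", "/")
fsPath = "/" + fsPath
// fsName == "myfs", fsPath == "/project/a"

// "cephfs.path" without one:
fsName, fsPath, _ = strings.Cut("myfs", "/")
fsPath = "/" + fsPath
// fsName == "myfs", fsPath == "/"
```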
@@ -265,15 +261,35 @@ func (d *cephfs) Create() error {
 		return fmt.Errorf("Failed to create directory '%s': %w", mountPoint, err)
 	}
 
-	// Get the credentials and host.
-	monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+	// Collect Ceph information.
+	clusterName := d.config["cephfs.cluster_name"]
+	userName := d.config["cephfs.user.name"]
+
+	fsid, err := CephFsid(clusterName)
+	if err != nil {
+		return err
+	}
+
+	monitors, err := CephMonitors(clusterName)
 	if err != nil {
 		return err
 	}
 
+	key, err := CephKeyring(clusterName, userName)
+	if err != nil {
+		return err
+	}
+
+	srcPath, options := CephBuildMount(
+		userName,
+		key,
+		fsid,
+		monitors,
+		fsName, "/",
+	)
+
 	// Mount the pool.
-	srcPath := strings.Join(monAddresses, ",") + ":/"
-	err = TryMount(srcPath, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
+	err = TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
 	if err != nil {
 		return err
 	}
@@ -300,12 +316,8 @@ func (d *cephfs) Create() error {
 
 // Delete clears any local and remote data related to this driver instance.
 func (d *cephfs) Delete(op *operations.Operation) error {
 	// Parse the namespace / path.
-	fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
-	fsName := fields[0]
-	fsPath := "/"
-	if len(fields) > 1 {
-		fsPath = fields[1]
-	}
+	fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
+	fsPath = "/" + fsPath
 
 	// Create a temporary mountpoint.
 	mountPath, err := os.MkdirTemp("", "incus_cephfs_")
@@ -326,15 +338,35 @@ func (d *cephfs) Delete(op *operations.Operation) error {
 		return fmt.Errorf("Failed to create directory '%s': %w", mountPoint, err)
 	}
 
-	// Get the credentials and host.
-	monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+	// Collect Ceph information.
+	clusterName := d.config["cephfs.cluster_name"]
+	userName := d.config["cephfs.user.name"]
+
+	fsid, err := CephFsid(clusterName)
 	if err != nil {
 		return err
 	}
 
+	monitors, err := CephMonitors(clusterName)
+	if err != nil {
+		return err
+	}
+
+	key, err := CephKeyring(clusterName, userName)
+	if err != nil {
+		return err
+	}
+
+	srcPath, options := CephBuildMount(
+		userName,
+		key,
+		fsid,
+		monitors,
+		fsName, "/",
+	)
+
 	// Mount the pool.
-	srcPath := strings.Join(monAddresses, ",") + ":/"
-	err = TryMount(srcPath, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
+	err = TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
 	if err != nil {
 		return err
 	}
@@ -397,28 +429,39 @@ func (d *cephfs) Mount() (bool, error) {
 	}
 
 	// Parse the namespace / path.
-	fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
-	fsName := fields[0]
-	fsPath := ""
-	if len(fields) > 1 {
-		fsPath = fields[1]
+	fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
+	fsPath = "/" + fsPath
+
+	// Collect Ceph information.
+	clusterName := d.config["cephfs.cluster_name"]
+	userName := d.config["cephfs.user.name"]
+
+	fsid, err := CephFsid(clusterName)
+	if err != nil {
+		return false, err
 	}
 
-	// Get the credentials and host.
-	monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
+	monitors, err := CephMonitors(clusterName)
 	if err != nil {
 		return false, err
 	}
 
-	// Mount options.
-	options := fmt.Sprintf("name=%s,secret=%s,mds_namespace=%s", d.config["cephfs.user.name"], userSecret, fsName)
-	if util.IsTrue(d.config["cephfs.fscache"]) {
-		options += ",fsc"
+	key, err := CephKeyring(clusterName, userName)
+	if err != nil {
+		return false, err
 	}
 
+	srcPath, options := CephBuildMount(
+		userName,
+		key,
+		fsid,
+		monitors,
+		fsName,
+		fsPath,
+	)
+
 	// Mount the pool.
-	srcPath := strings.Join(monAddresses, ",") + ":/" + fsPath
-	err = TryMount(srcPath, GetPoolMountPath(d.name), "ceph", 0, options)
+	err = TryMount(srcPath, GetPoolMountPath(d.name), "ceph", 0, strings.Join(options, ","))
 	if err != nil {
 		return false, err
 	}
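
To make the change concrete, here is roughly what `TryMount` receives before and after (monitor addresses, fsid and key are invented):

```go
// Old-style kernel cephfs mount:
//   source:  "10.0.0.1:6789,10.0.0.2:6789:/"
//   options: "name=admin,secret=AQD...,mds_namespace=myfs"
//
// New-style mount built by CephBuildMount:
//   source:  "admin@8f2c766c-52f1-4fe5-8e8f-4e674fed8299.myfs=/"
//   options: "mon_addr=10.0.0.1:3300/10.0.0.2:3300,name=admin,secret=AQD...,ms_mode=prefer-crc"
err := TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
```

The new source syntax identifies the filesystem by user, fsid and fs name rather than by a monitor list, so the monitors move into the `mon_addr` option.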
diff --git a/internal/server/storage/drivers/driver_cephfs_utils.go b/internal/server/storage/drivers/driver_cephfs_utils.go
index c23d689e405..edffd3a9f78 100644
--- a/internal/server/storage/drivers/driver_cephfs_utils.go
+++ b/internal/server/storage/drivers/driver_cephfs_utils.go
@@ -44,20 +44,3 @@ func (d *cephfs) osdPoolExists(clusterName string, userName string, osdPoolName
 
 	return true, nil
 }
-
-// getConfig parses the Ceph configuration file and returns the list of monitors and secret key.
-func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) {
-	// Get the monitor list.
-	monitors, err := CephMonitors(clusterName)
-	if err != nil {
-		return nil, "", err
-	}
-
-	// Get the keyring entry.
-	secret, err := CephKeyring(clusterName, userName)
-	if err != nil {
-		return nil, "", err
-	}
-
-	return monitors, secret, nil
-}
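
Migration sketch for former `getConfig` callers (error handling elided; note that `CephMonitors` now returns the `Monitors` struct introduced in utils_ceph.go below, not a `[]string`):

```go
fsid, _ := CephFsid(clusterName)
mons, _ := CephMonitors(clusterName) // Monitors{V1, V2}
key, _ := CephKeyring(clusterName, userName)
src, opts := CephBuildMount(userName, key, fsid, mons, fsName, fsPath)
_, _ = src, opts
```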
diff --git a/internal/server/storage/drivers/utils_ceph.go b/internal/server/storage/drivers/utils_ceph.go
index 9eb86347a6c..1b036eaa60b 100644
--- a/internal/server/storage/drivers/utils_ceph.go
+++ b/internal/server/storage/drivers/utils_ceph.go
@@ -1,13 +1,13 @@
 package drivers
 
 import (
-	"bufio"
+	"encoding/json"
 	"fmt"
-	"os"
 	"strings"
 
 	"github.com/lxc/incus/v6/shared/api"
-	"github.com/lxc/incus/v6/shared/util"
+	"github.com/lxc/incus/v6/shared/logger"
+	"github.com/lxc/incus/v6/shared/subprocess"
 )
 
 // CephGetRBDImageName returns the RBD image name as it is used in ceph.
@@ -58,194 +58,181 @@ func CephGetRBDImageName(vol Volume, snapName string, zombie bool) string {
 	return out
 }
 
-// CephMonitors gets the mon-host field for the relevant cluster and extracts the list of addresses and ports.
-func CephMonitors(cluster string) ([]string, error) {
-	// Open the CEPH configuration.
-	cephConf, err := os.Open(fmt.Sprintf("/etc/ceph/%s.conf", cluster))
-	if err != nil {
-		return nil, fmt.Errorf("Failed to open %q: %w", fmt.Sprintf("/etc/ceph/%s.conf", cluster), err)
+// CephBuildMount creates a mount string and option list from mount parameters.
+func CephBuildMount(user string, key string, fsid string, monitors Monitors, fsName string, path string) (source string, options []string) {
+	// Ceph mount paths must begin with a '/'; if it doesn't (or is empty),
+	// prefix it now. The leading '/' can be stripped out during option parsing.
+	if !strings.HasPrefix(path, "/") {
+		path = "/" + path
 	}
 
-	// Locate the mon-host key and its values.
-	cephMon := []string{}
-	scan := bufio.NewScanner(cephConf)
-	for scan.Scan() {
-		line := scan.Text()
-		line = strings.TrimSpace(line)
+	msgrV2 := false
+	monAddrs := monitors.V1
+	if len(monitors.V2) > 0 {
+		msgrV2 = true
+		monAddrs = monitors.V2
+	}
 
-		if line == "" {
-			continue
-		}
+	// Build the source path.
+	source = fmt.Sprintf("%s@%s.%s=%s", user, fsid, fsName, path)
 
-		if strings.HasPrefix(line, "mon_host") || strings.HasPrefix(line, "mon-host") || strings.HasPrefix(line, "mon host") {
-			fields := strings.SplitN(line, "=", 2)
-			if len(fields) < 2 {
-				continue
-			}
+	// Build the options list.
+	options = []string{
+		"mon_addr=" + strings.Join(monAddrs, "/"),
+		"name=" + user,
+	}
 
-			// Parsing mon_host is quite tricky.
-			// It supports a space separate list of comma separated lists of:
-			// - DNS names
-			// - IPv4 addresses
-			// - IPv6 addresses (square brackets)
-			// - Optional version indicator
-			// - Optional port numbers
-			// - Optional data (after / separator)
-			// - Tuples of addresses with all the above still applying inside the tuple
-			//
-			// As this function is primarily used for cephfs which
-			// doesn't take the version indication, trailing bits or supports those
-			// tuples, all of those effectively get stripped away to get a clean
-			// address list (with ports).
-			entries := strings.Split(fields[1], " ")
-			for _, entry := range entries {
-				servers := strings.Split(entry, ",")
-				for _, server := range servers {
-					// Trim leading/trailing spaces.
-					server = strings.TrimSpace(server)
-
-					// Trim leading protocol version.
-					server = strings.TrimPrefix(server, "v1:")
-					server = strings.TrimPrefix(server, "v2:")
-					server = strings.TrimPrefix(server, "[v1:")
-					server = strings.TrimPrefix(server, "[v2:")
-
-					// Trim trailing divider.
-					server = strings.Split(server, "/")[0]
-
-					// Handle end of nested blocks.
-					server = strings.ReplaceAll(server, "]]", "]")
-					if !strings.HasPrefix(server, "[") {
-						server = strings.TrimSuffix(server, "]")
-					}
-
-					// Trim any spaces.
-					server = strings.TrimSpace(server)
-
-					// If nothing left, skip.
-					if server == "" {
-						continue
-					}
-
-					// Append the default v1 port if none are present.
-					if !strings.HasSuffix(server, ":6789") && !strings.HasSuffix(server, ":3300") {
-						server += ":6789"
-					}
-
-					cephMon = append(cephMon, strings.TrimSpace(server))
-				}
-			}
-		}
+	// If key is blank assume cephx is disabled.
+	if key != "" {
+		options = append(options, "secret="+key)
 	}
 
-	if len(cephMon) == 0 {
-		return nil, fmt.Errorf("Couldn't find a CEPH mon")
+	// Pick connection mode.
+	if msgrV2 {
+		options = append(options, "ms_mode=prefer-crc")
+	} else {
+		options = append(options, "ms_mode=legacy")
 	}
 
-	return cephMon, nil
+	return source, options
+}
+
+// callCeph makes a call to ceph with the given args.
+func callCeph(args ...string) (string, error) {
+	out, err := subprocess.RunCommand("ceph", args...)
+	logger.Debug("callCeph", logger.Ctx{
+		"cmd":  "ceph",
+		"args": args,
+		"err":  err,
+		"out":  out,
+	})
+	return strings.TrimSpace(out), err
 }
 
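A sketch of `CephBuildMount` in action for a v2-capable cluster (fsid, key and addresses invented; `Monitors` is the struct introduced further down in this file):

```go
mons := Monitors{V2: []string{"10.0.0.1:3300", "10.0.0.2:3300"}}
src, opts := CephBuildMount("admin", "AQDWbepl...", "8f2c766c-52f1-4fe5-8e8f-4e674fed8299", mons, "myfs", "/")
// src  == "admin@8f2c766c-52f1-4fe5-8e8f-4e674fed8299.myfs=/"
// opts == []string{
//     "mon_addr=10.0.0.1:3300/10.0.0.2:3300",
//     "name=admin",
//     "secret=AQDWbepl...",
//     "ms_mode=prefer-crc",
// }
// With only V1 monitors available, ms_mode would be "legacy" instead.
```
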
-func getCephKeyFromFile(path string) (string, error) {
-	cephKeyring, err := os.Open(path)
+// callCephJSON makes a call to the `ceph` admin tool with the given args then parses the json output into `out`.
+func callCephJSON(out any, args ...string) error {
+	// Get as JSON format.
+	args = append([]string{"--format", "json"}, args...)
+
+	// Make the call.
+	jsonOut, err := callCeph(args...)
 	if err != nil {
-		return "", fmt.Errorf("Failed to open %q: %w", path, err)
+		return err
 	}
 
-	// Locate the keyring entry and its value.
-	var cephSecret string
-	scan := bufio.NewScanner(cephKeyring)
-	for scan.Scan() {
-		line := scan.Text()
-		line = strings.TrimSpace(line)
+	// Parse the JSON.
+	err = json.Unmarshal([]byte(jsonOut), &out)
+	return err
+}
 
-		if line == "" {
-			continue
-		}
+// Monitors holds a list of ceph monitor addresses based on which protocol they expect.
+type Monitors struct {
+	V1 []string
+	V2 []string
+}
 
-		if strings.HasPrefix(line, "key") {
-			fields := strings.SplitN(line, "=", 2)
-			if len(fields) < 2 {
-				continue
-			}
+// CephMonitors returns a list of public monitor IP:ports for the given cluster.
+func CephMonitors(cluster string) (Monitors, error) {
+	// Get the monitor dump; there may be better ways, but this is quick and easy.
+	monitors := struct {
+		Mons []struct {
+			PublicAddrs struct {
+				Addrvec []struct {
+					Type string `json:"type"`
+					Addr string `json:"addr"`
+				} `json:"addrvec"`
+			} `json:"public_addrs"`
+		} `json:"mons"`
+	}{}
+
+	err := callCephJSON(&monitors,
+		"--cluster", cluster,
+		"mon", "dump",
+	)
+	if err != nil {
+		return Monitors{}, fmt.Errorf("Ceph mon dump for %q failed: %w", cluster, err)
+	}
 
-			cephSecret = strings.TrimSpace(fields[1])
-			break
+	// Loop through monitors then monitor addresses and add them to the list.
+	var ep Monitors
+	for _, mon := range monitors.Mons {
+		for _, addr := range mon.PublicAddrs.Addrvec {
+			if addr.Type == "v1" {
+				ep.V1 = append(ep.V1, addr.Addr)
+			} else if addr.Type == "v2" {
+				ep.V2 = append(ep.V2, addr.Addr)
+			} else {
+				logger.Warnf("Unknown ceph monitor address type: %q:%q",
+					addr.Type, addr.Addr,
+				)
+			}
 		}
 	}
 
-	if cephSecret == "" {
-		return "", fmt.Errorf("Couldn't find a keyring entry")
+	if len(ep.V2) == 0 {
+		if len(ep.V1) == 0 {
+			return Monitors{}, fmt.Errorf("No ceph monitors for %q", cluster)
+		}
+
+		logger.Warnf("Only found v1 monitors for ceph cluster %q", cluster)
 	}
 
-	return cephSecret, nil
+	return ep, nil
 }
 
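For context, an abridged `ceph mon dump --format json` payload has the shape assumed by the anonymous struct above (addresses invented):

```go
// Abridged, invented monitor dump output.
const monDumpJSON = `{
  "mons": [{
    "public_addrs": {
      "addrvec": [
        {"type": "v2", "addr": "10.0.0.1:3300", "nonce": 0},
        {"type": "v1", "addr": "10.0.0.1:6789", "nonce": 0}
      ]
    }
  }]
}`
// CephMonitors would turn this into:
//   Monitors{V1: []string{"10.0.0.1:6789"}, V2: []string{"10.0.0.1:3300"}}
```
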
-// CephKeyring gets the key for a particular Ceph cluster and client name.
+// CephKeyring retrieves the CephX key for the given entity.
 func CephKeyring(cluster string, client string) (string, error) {
-	var cephSecret string
-	cephConfigPath := fmt.Sprintf("/etc/ceph/%v.conf", cluster)
-
-	keyringPathFull := fmt.Sprintf("/etc/ceph/%v.client.%v.keyring", cluster, client)
-	keyringPathCluster := fmt.Sprintf("/etc/ceph/%v.keyring", cluster)
-	keyringPathGlobal := "/etc/ceph/keyring"
-	keyringPathGlobalBin := "/etc/ceph/keyring.bin"
-
-	if util.PathExists(keyringPathFull) {
-		return getCephKeyFromFile(keyringPathFull)
-	} else if util.PathExists(keyringPathCluster) {
-		return getCephKeyFromFile(keyringPathCluster)
-	} else if util.PathExists(keyringPathGlobal) {
-		return getCephKeyFromFile(keyringPathGlobal)
-	} else if util.PathExists(keyringPathGlobalBin) {
-		return getCephKeyFromFile(keyringPathGlobalBin)
-	} else if util.PathExists(cephConfigPath) {
-		// Open the CEPH config file.
-		cephConfig, err := os.Open(cephConfigPath)
-		if err != nil {
-			return "", fmt.Errorf("Failed to open %q: %w", cephConfigPath, err)
-		}
-
-		// Locate the keyring entry and its value.
-		scan := bufio.NewScanner(cephConfig)
-		for scan.Scan() {
-			line := scan.Text()
-			line = strings.TrimSpace(line)
+	// If client isn't prefixed, prefix it with 'client.'.
+	if !strings.Contains(client, ".") {
+		client = "client." + client
+	}
 
-			if line == "" {
-				continue
-			}
+	// Check that cephx is enabled.
+	authType, err := callCeph(
+		"--cluster", cluster,
+		"config", "get", client, "auth_service_required",
+	)
+	if err != nil {
+		return "", fmt.Errorf(
+			"Failed to query ceph config for auth_service_required: %w",
+			err,
+		)
+	}
 
-			if strings.HasPrefix(line, "key") {
-				fields := strings.SplitN(line, "=", 2)
-				if len(fields) < 2 {
-					continue
-				}
-
-				// Check all key related config keys.
-				switch strings.TrimSpace(fields[0]) {
-				case "key":
-					cephSecret = strings.TrimSpace(fields[1])
-				case "keyfile":
-					key, err := os.ReadFile(fields[1])
-					if err != nil {
-						return "", err
-					}
-
-					cephSecret = strings.TrimSpace(string(key))
-				case "keyring":
-					return getCephKeyFromFile(strings.TrimSpace(fields[1]))
-				}
-			}
+	if authType == "none" {
+		logger.Infof("Ceph cluster %q has disabled cephx", cluster)
+		return "", nil
+	}
 
-			if cephSecret != "" {
-				break
-			}
-		}
+	// Call ceph auth get.
+	key := struct {
+		Key string `json:"key"`
+	}{}
+	err = callCephJSON(&key,
+		"--cluster", cluster,
+		"auth", "get-key", client,
+	)
+	if err != nil {
+		return "", fmt.Errorf(
+			"Failed to get keyring for %q on %q: %w",
+			client, cluster, err,
+		)
 	}
 
-	if cephSecret == "" {
-		return "", fmt.Errorf("Couldn't find a keyring entry")
+	return key.Key, nil
+}
+
+// CephFsid retrieves the FSID for the given cluster.
+func CephFsid(cluster string) (string, error) {
+	// Call ceph fsid.
+	fsid := struct {
+		Fsid string `json:"fsid"`
+	}{}
+
+	err := callCephJSON(&fsid, "--cluster", cluster, "fsid")
+	if err != nil {
+		return "", fmt.Errorf("Couldn't get fsid for %q: %w", cluster, err)
 	}
 
-	return cephSecret, nil
+	return fsid.Fsid, nil
 }
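
Usage sketch for the two new helpers and the CLI calls they wrap (cluster and client names invented; output shapes are assumptions based on `--format json` behaviour):

```go
// ceph --format json --cluster ceph fsid
//   -> {"fsid":"8f2c766c-52f1-4fe5-8e8f-4e674fed8299"}
fsid, err := CephFsid("ceph")
if err != nil {
	return err
}

// ceph --format json --cluster ceph auth get-key client.admin
//   -> {"key":"AQDWbepl..."}
key, err := CephKeyring("ceph", "admin") // expands "admin" to "client.admin"
if err != nil {
	return err
}
_, _ = fsid, key
```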