Merge pull request #1538 from MadnessASAP/ceph-refactor
Refactor Ceph configuration parsing
stgraber authored Feb 27, 2025
2 parents 7a6fac5 + 0e8e519 · commit d863e58
Showing 5 changed files with 271 additions and 290 deletions.
58 changes: 37 additions & 21 deletions internal/server/device/device_utils_disk.go
@@ -30,25 +30,35 @@ const RBDFormatPrefix = "rbd"
// RBDFormatSeparator is the field separate used in disk paths for RBD devices.
const RBDFormatSeparator = " "

// DiskParseRBDFormat parses an rbd formatted string, and returns the pool name, volume name, and list of options.
func DiskParseRBDFormat(rbd string) (string, string, []string, error) {
if !strings.HasPrefix(rbd, fmt.Sprintf("%s%s", RBDFormatPrefix, RBDFormatSeparator)) {
return "", "", nil, fmt.Errorf("Invalid rbd format, missing prefix")
// DiskParseRBDFormat parses an rbd formatted string, and returns the pool name, volume name, and map of options.
func DiskParseRBDFormat(rbd string) (string, string, map[string]string, error) {
// Remove and check the prefix.
prefix, rbd, _ := strings.Cut(rbd, RBDFormatSeparator)
if prefix != RBDFormatPrefix {
return "", "", nil, fmt.Errorf("Invalid rbd format, wrong prefix: %q", prefix)
}

fields := strings.SplitN(rbd, RBDFormatSeparator, 3)
if len(fields) != 3 {
return "", "", nil, fmt.Errorf("Invalid rbd format, invalid number of fields")
// Split the path and options.
path, rawOpts, _ := strings.Cut(rbd, RBDFormatSeparator)

// Check for valid RBD path.
pool, volume, validPath := strings.Cut(path, "/")
if !validPath {
return "", "", nil, fmt.Errorf("Invalid rbd format, missing pool and/or volume: %q", path)
}

opts := fields[2]
// Parse options.
opts := make(map[string]string)
for _, o := range strings.Split(rawOpts, ":") {
k, v, isValid := strings.Cut(o, "=")
if !isValid {
return "", "", nil, fmt.Errorf("Invalid rbd format, bad option: %q", o)
}

fields = strings.SplitN(fields[1], "/", 2)
if len(fields) != 2 {
return "", "", nil, fmt.Errorf("Invalid rbd format, invalid pool or volume")
opts[k] = v
}

return fields[0], fields[1], strings.Split(opts, ":"), nil
return pool, volume, opts, nil
}

// DiskGetRBDFormat returns a rbd formatted string with the given values.
@@ -59,7 +69,6 @@ func DiskGetRBDFormat(clusterName string, userName string, poolName string, volu
opts := []string{
fmt.Sprintf("id=%s", optEscaper.Replace(userName)),
fmt.Sprintf("pool=%s", optEscaper.Replace(poolName)),
fmt.Sprintf("conf=/etc/ceph/%s.conf", optEscaper.Replace(clusterName)),
}

return fmt.Sprintf("%s%s%s/%s%s%s", RBDFormatPrefix, RBDFormatSeparator, optEscaper.Replace(poolName), optEscaper.Replace(volumeName), RBDFormatSeparator, strings.Join(opts, ":"))
@@ -77,7 +86,7 @@ func BlockFsDetect(dev string) (string, error) {

// IsBlockdev returns boolean indicating whether device is block type.
func IsBlockdev(path string) bool {
// Get a stat struct from the provided path
// Get a stat struct from the provided path.
stat := unix.Stat_t{}
err := unix.Stat(path, &stat)
if err != nil {
@@ -240,6 +249,12 @@ again:

// diskCephfsOptions returns the mntSrcPath and fsOptions to use for mounting a cephfs share.
func diskCephfsOptions(clusterName string, userName string, fsName string, fsPath string) (string, []string, error) {
// Get the FSID.
fsid, err := storageDrivers.CephFsid(clusterName)
if err != nil {
return "", nil, err
}

// Get the monitor list.
monAddresses, err := storageDrivers.CephMonitors(clusterName)
if err != nil {
@@ -252,14 +267,15 @@ func diskCephfsOptions(clusterName string, userName string, fsName string, fsPat
return "", nil, err
}

// Prepare mount entry.
fsOptions := []string{
fmt.Sprintf("name=%v", userName),
fmt.Sprintf("secret=%v", secret),
fmt.Sprintf("mds_namespace=%v", fsName),
}
srcPath, fsOptions := storageDrivers.CephBuildMount(
userName,
secret,
fsid,
monAddresses,
fsName,
fsPath,
)

srcPath := strings.Join(monAddresses, ",") + ":/" + fsPath
return srcPath, fsOptions, nil
}

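For reference, the refactored DiskParseRBDFormat above swaps the SplitN-based parsing for strings.Cut and returns the options as a map instead of a slice. The following is a minimal standalone sketch of that parsing approach; the function name parseRBD and the sample string are illustrative only and not part of the commit.

package main

import (
	"fmt"
	"strings"
)

// parseRBD mirrors the refactored logic for strings of the form
// "rbd pool/volume key=value:key=value".
func parseRBD(rbd string) (string, string, map[string]string, error) {
	// Remove and check the prefix.
	prefix, rest, _ := strings.Cut(rbd, " ")
	if prefix != "rbd" {
		return "", "", nil, fmt.Errorf("wrong prefix: %q", prefix)
	}

	// Split the path and options.
	path, rawOpts, _ := strings.Cut(rest, " ")

	// Check for a valid pool/volume path.
	pool, volume, ok := strings.Cut(path, "/")
	if !ok {
		return "", "", nil, fmt.Errorf("missing pool and/or volume: %q", path)
	}

	// Parse the options into a map.
	opts := make(map[string]string)
	for _, o := range strings.Split(rawOpts, ":") {
		k, v, found := strings.Cut(o, "=")
		if !found {
			return "", "", nil, fmt.Errorf("bad option: %q", o)
		}

		opts[k] = v
	}

	return pool, volume, opts, nil
}

func main() {
	pool, volume, opts, err := parseRBD("rbd mypool/container_foo.block id=admin:pool=mypool")
	if err != nil {
		panic(err)
	}

	fmt.Println(pool, volume, opts) // mypool container_foo.block map[id:admin pool:mypool]
}

Note that, as in the committed code, an empty options segment is rejected: strings.Split on an empty string yields a single empty element, which then fails the key=value check.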
68 changes: 10 additions & 58 deletions internal/server/instance/drivers/driver_qemu.go
@@ -4200,7 +4200,7 @@ func (d *qemu) addDriveConfig(qemuDev map[string]any, bootIndexes map[string]int
} else if isRBDImage {
blockDev["driver"] = "rbd"

_, volName, opts, err := device.DiskParseRBDFormat(driveConf.DevPath)
poolName, volName, opts, err := device.DiskParseRBDFormat(driveConf.DevPath)
if err != nil {
return nil, fmt.Errorf("Failed parsing rbd string: %w", err)
}
@@ -4225,68 +4225,20 @@ func (d *qemu) addDriveConfig(qemuDev map[string]any, bootIndexes map[string]int
vol := storageDrivers.NewVolume(nil, "", volumeType, rbdContentType, volumeName, nil, nil)
rbdImageName := storageDrivers.CephGetRBDImageName(vol, "", false)

// Parse the options (ceph credentials).
userName := storageDrivers.CephDefaultUser
clusterName := storageDrivers.CephDefaultCluster
poolName := ""

for _, option := range opts {
fields := strings.Split(option, "=")
if len(fields) != 2 {
return nil, fmt.Errorf("Unexpected volume rbd option %q", option)
}

if fields[0] == "id" {
userName = fields[1]
} else if fields[0] == "pool" {
poolName = fields[1]
} else if fields[0] == "conf" {
baseName := filepath.Base(fields[1])
clusterName = strings.TrimSuffix(baseName, ".conf")
// Scan & pass through options.
blockDev["pool"] = poolName
blockDev["image"] = rbdImageName
for key, val := range opts {
// We use 'id' where qemu uses 'user'.
if key == "id" {
blockDev["user"] = val
} else {
blockDev[key] = val
}
}

if poolName == "" {
return nil, fmt.Errorf("Missing pool name")
}

// The aio option isn't available when using the rbd driver.
delete(blockDev, "aio")
blockDev["pool"] = poolName
blockDev["image"] = rbdImageName
blockDev["user"] = userName
blockDev["server"] = []map[string]string{}

// Derference ceph config path.
cephConfPath := fmt.Sprintf("/etc/ceph/%s.conf", clusterName)
target, err := filepath.EvalSymlinks(cephConfPath)
if err == nil {
cephConfPath = target
}

blockDev["conf"] = cephConfPath

// Setup the Ceph cluster config (monitors and keyring).
monitors, err := storageDrivers.CephMonitors(clusterName)
if err != nil {
return nil, err
}

for _, monitor := range monitors {
idx := strings.LastIndex(monitor, ":")
host := monitor[:idx]
port := monitor[idx+1:]

blockDev["server"] = append(blockDev["server"].([]map[string]string), map[string]string{
"host": strings.Trim(host, "[]"),
"port": port,
})
}

rbdSecret, err = storageDrivers.CephKeyring(clusterName, userName)
if err != nil {
return nil, err
}
}

readonly := slices.Contains(driveConf.Opts, "ro")
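The driver_qemu.go hunk above drops the manual option-parsing loop and instead passes the map returned by DiskParseRBDFormat straight through to the QEMU blockdev definition, renaming only the id key to user. A small illustrative sketch of that pass-through; the helper name buildRBDBlockDev and the sample values are hypothetical.

package main

import "fmt"

// buildRBDBlockDev copies every parsed RBD option into a QEMU blockdev map,
// renaming "id" to "user", which is the key QEMU's rbd driver expects for the
// Ceph client ID.
func buildRBDBlockDev(poolName string, imageName string, opts map[string]string) map[string]any {
	blockDev := map[string]any{
		"driver": "rbd",
		"pool":   poolName,
		"image":  imageName,
	}

	for key, val := range opts {
		if key == "id" {
			blockDev["user"] = val
		} else {
			blockDev[key] = val
		}
	}

	return blockDev
}

func main() {
	opts := map[string]string{"id": "admin", "pool": "mypool"}
	fmt.Println(buildRBDBlockDev("mypool", "virtual-machine_foo.block", opts))
}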
109 changes: 76 additions & 33 deletions internal/server/storage/drivers/driver_cephfs.go
@@ -133,12 +133,8 @@ func (d *cephfs) Create() error {
d.config["cephfs.path"] = d.config["source"]

// Parse the namespace / path.
fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
fsName := fields[0]
fsPath := "/"
if len(fields) > 1 {
fsPath = fields[1]
}
fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
fsPath = "/" + fsPath

// If the filesystem already exists, disallow keys associated to creating the filesystem.
fsExists, err := d.fsExists(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"], fsName)
@@ -265,15 +261,35 @@ func (d *cephfs) Create() error {
return fmt.Errorf("Failed to create directory '%s': %w", mountPoint, err)
}

// Get the credentials and host.
monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
// Collect Ceph information.
clusterName := d.config["cephfs.cluster_name"]
userName := d.config["cephfs.user.name"]

fsid, err := CephFsid(clusterName)
if err != nil {
return err
}

monitors, err := CephMonitors(clusterName)
if err != nil {
return err
}

key, err := CephKeyring(clusterName, userName)
if err != nil {
return err
}

srcPath, options := CephBuildMount(
userName,
key,
fsid,
monitors,
fsName, "/",
)

// Mount the pool.
srcPath := strings.Join(monAddresses, ",") + ":/"
err = TryMount(srcPath, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
err = TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
if err != nil {
return err
}
@@ -300,12 +316,8 @@ func (d *cephfs) Create() error {
// Delete clears any local and remote data related to this driver instance.
func (d *cephfs) Delete(op *operations.Operation) error {
// Parse the namespace / path.
fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
fsName := fields[0]
fsPath := "/"
if len(fields) > 1 {
fsPath = fields[1]
}
fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
fsPath = "/" + fsPath

// Create a temporary mountpoint.
mountPath, err := os.MkdirTemp("", "incus_cephfs_")
@@ -326,15 +338,35 @@ func (d *cephfs) Delete(op *operations.Operation) error {
return fmt.Errorf("Failed to create directory '%s': %w", mountPoint, err)
}

// Get the credentials and host.
monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
// Collect Ceph information.
clusterName := d.config["cephfs.cluster_name"]
userName := d.config["cephfs.user.name"]

fsid, err := CephFsid(clusterName)
if err != nil {
return err
}

monitors, err := CephMonitors(clusterName)
if err != nil {
return err
}

key, err := CephKeyring(clusterName, userName)
if err != nil {
return err
}

srcPath, options := CephBuildMount(
userName,
key,
fsid,
monitors,
fsName, "/",
)

// Mount the pool.
srcPath := strings.Join(monAddresses, ",") + ":/"
err = TryMount(srcPath, mountPoint, "ceph", 0, fmt.Sprintf("name=%v,secret=%v,mds_namespace=%v", d.config["cephfs.user.name"], userSecret, fsName))
err = TryMount(srcPath, mountPoint, "ceph", 0, strings.Join(options, ","))
if err != nil {
return err
}
@@ -397,28 +429,39 @@ func (d *cephfs) Mount() (bool, error) {
}

// Parse the namespace / path.
fields := strings.SplitN(d.config["cephfs.path"], "/", 2)
fsName := fields[0]
fsPath := ""
if len(fields) > 1 {
fsPath = fields[1]
fsName, fsPath, _ := strings.Cut(d.config["cephfs.path"], "/")
fsPath = "/" + fsPath

// Collect Ceph information.
clusterName := d.config["cephfs.cluster_name"]
userName := d.config["cephfs.user.name"]

fsid, err := CephFsid(clusterName)
if err != nil {
return false, err
}

// Get the credentials and host.
monAddresses, userSecret, err := d.getConfig(d.config["cephfs.cluster_name"], d.config["cephfs.user.name"])
monitors, err := CephMonitors(clusterName)
if err != nil {
return false, err
}

// Mount options.
options := fmt.Sprintf("name=%s,secret=%s,mds_namespace=%s", d.config["cephfs.user.name"], userSecret, fsName)
if util.IsTrue(d.config["cephfs.fscache"]) {
options += ",fsc"
key, err := CephKeyring(clusterName, userName)
if err != nil {
return false, err
}

srcPath, options := CephBuildMount(
userName,
key,
fsid,
monitors,
fsName,
fsPath,
)

// Mount the pool.
srcPath := strings.Join(monAddresses, ",") + ":/" + fsPath
err = TryMount(srcPath, GetPoolMountPath(d.name), "ceph", 0, options)
err = TryMount(srcPath, GetPoolMountPath(d.name), "ceph", 0, strings.Join(options, ","))
if err != nil {
return false, err
}
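All three cephfs paths above (Create, Delete, Mount) now gather the FSID, monitors, and keyring and hand them to a shared CephBuildMount helper, whose implementation is not shown in the rendered files of this diff. As an assumption for illustration only, a helper with that signature would plausibly build the new-style kernel CephFS device string (user@fsid.fsname=path, with monitors passed via mon_addr) rather than the old monitor-list prefix; a rough sketch:

package main

import (
	"fmt"
	"strings"
)

// buildCephMount is an illustrative guess at what a CephBuildMount-style
// helper could return; the real implementation lives outside the files shown
// here. It assumes the new-style kernel CephFS device syntax
// "user@fsid.fsname=path" with monitors joined by "/" in a mon_addr option.
func buildCephMount(user string, key string, fsid string, monitors []string, fsName string, fsPath string) (string, []string) {
	options := []string{
		"mon_addr=" + strings.Join(monitors, "/"),
		"name=" + user,
		"secret=" + key,
	}

	srcPath := fmt.Sprintf("%s@%s.%s=%s", user, fsid, fsName, fsPath)

	return srcPath, options
}

func main() {
	src, opts := buildCephMount("admin", "AQBexampleSecretKey==", "8a2b7c1e-1111-2222-3333-444455556666", []string{"10.0.0.1:6789", "10.0.0.2:6789"}, "cephfs", "/")
	fmt.Printf("mount -t ceph %s -o %s\n", src, strings.Join(opts, ","))
}

The removed lines above show the old form for comparison: the mount source was the comma-joined monitor list followed by ":/path", and the options were name, secret, and mds_namespace.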
17 changes: 0 additions & 17 deletions internal/server/storage/drivers/driver_cephfs_utils.go
@@ -44,20 +44,3 @@ func (d *cephfs) osdPoolExists(clusterName string, userName string, osdPoolName

return true, nil
}

// getConfig parses the Ceph configuration file and returns the list of monitors and secret key.
func (d *cephfs) getConfig(clusterName string, userName string) ([]string, string, error) {
// Get the monitor list.
monitors, err := CephMonitors(clusterName)
if err != nil {
return nil, "", err
}

// Get the keyring entry.
secret, err := CephKeyring(clusterName, userName)
if err != nil {
return nil, "", err
}

return monitors, secret, nil
}