Avoid csidriver collisions by using a name formatter
addyess committed Mar 1, 2025
1 parent 3f03390 commit 6d8fe47
Showing 11 changed files with 287 additions and 160 deletions.
28 changes: 25 additions & 3 deletions charmcraft.yaml
@@ -68,6 +68,28 @@ config:
description: "Whether or not csi-*plugin-provisioner deployments use host-networking"
type: boolean

csidriver-name-formatter:
default: "{name}"
description: |
Formatter for the 2 CSIDrivers managed by this charm:
- rbd.csi.ceph.com
- cephfs.csi.ceph.com
The formatter is a string that can contain the following placeholders:
- {name} - original name from the manifest
- {app} - the name of the juju application
- {namespace} - the charm configured namespace
Example:
juju config ceph-csi csidriver-name-formatter="{name}.{app}"
The default is to use the driver name from the manifest.
NOTE: Can only be specified on deployment since some
attributes of kubernetes resources are non-modifiable.
The admin is responsible for creating the namespace.
type: string
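
Editorial note: the formatter is an ordinary Python format string, so the resulting driver name can be previewed with str.format(). A minimal sketch, assuming a formatter of "{name}.{app}" and an application named "ceph-csi" (illustrative values, not taken from a real deployment):

    # Sketch only: how the charm is expected to resolve the formatter.
    formatter = "{name}.{app}"
    context = {
        "name": "cephfs.csi.ceph.com",   # original name from the manifest
        "app": "ceph-csi",               # name of the juju application
        "namespace": "default",          # the charm configured namespace
    }
    print(formatter.format(**context))   # -> cephfs.csi.ceph.com.ceph-csi

With the default "{name}" every deployment still produces the upstream driver names, so a formatter that includes something application-specific (such as {app}) is what avoids collisions when two ceph-csi applications share a cluster.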

cephfs-storage-class-name-formatter:
default: "cephfs"
description: |
@@ -81,7 +103,7 @@ config:
- {pool-id} - the id of the data-pool
Example:
juju config ceph-csi cephfs-storage-class-name-formatter="{cluster}-{namespace}-{storageclass}"
juju config ceph-csi cephfs-storage-class-name-formatter="cephfs-{namespace}-{pool}"
The default is to use the storage class name
type: string
@@ -151,7 +173,7 @@ config:
- {namespace} - the charm configured namespace
Example:
juju config ceph-csi ceph-xfs-storage-class-name-formatter="{cluster}-{namespace}-{storageclass}"
juju config ceph-csi ceph-xfs-storage-class-name-formatter="ceph-xfs-{app}"
The default is to use the storage class name
type: string
@@ -179,7 +201,7 @@ config:
- {namespace} - the charm configured namespace
Example:
juju config ceph-csi ceph-ext4-storage-class-name-formatter="{cluster}-{namespace}-{storageclass}"
juju config ceph-csi ceph-ext4-storage-class-name-formatter="ceph-ext4-{app}"
The default is to use the storage class name
type: string
10 changes: 10 additions & 0 deletions src/charm.py
@@ -71,6 +71,7 @@ def __init__(self, *args: Any) -> None:
self.stored.set_default(config_hash=0) # hashed value of the provider config once valid
self.stored.set_default(destroying=False) # True when the charm is being shutdown
self.stored.set_default(namespace=self._configured_ns)
self.stored.set_default(drivername=self._configured_drivername)

self.collector = Collector(
ConfigManifests(self),
@@ -127,6 +128,10 @@ def _update_status(self) -> None:
raise status.ReconcilerError("Waiting for deployment")
elif self.stored.namespace != self._configured_ns:
status.add(ops.BlockedStatus("Namespace cannot be changed after deployment"))
elif self.stored.drivername != self._configured_drivername:
status.add(
ops.BlockedStatus("csidriver-name-formatter cannot be changed after deployment")
)
else:
self.unit.set_workload_version(self.collector.short_version)
if self.unit.is_leader():
@@ -146,6 +151,11 @@ def _configured_ns(self) -> str:
"""Currently configured namespace."""
return str(self.config.get("namespace") or self.DEFAULT_NAMESPACE)

@property
def _configured_drivername(self) -> str:
"""Currently configured csi drivername."""
return str(self.config.get("csidriver-name-formatter") or "{name}")

@property
def ceph_data(self) -> Dict[str, Any]:
"""Return Ceph data from ceph-client relation"""
114 changes: 101 additions & 13 deletions src/manifests_base.py
@@ -1,13 +1,13 @@
import logging
import pickle
from functools import cached_property
from hashlib import md5
from typing import Any, Dict, Generator, List, Optional
from typing import Any, Dict, Generator, List, Optional, Tuple, cast

from lightkube.codecs import AnyResource
from lightkube.core.resource import NamespacedResource
from lightkube.models.core_v1 import Toleration
from ops.manifests import Addition, ManifestLabel, Manifests, Patch
from ops.manifests.literals import APP_LABEL
from ops.manifests import Addition, Manifests, Patch

log = logging.getLogger(__name__)

@@ -19,6 +19,13 @@ def hash(self) -> int:
"""Calculate a hash of the current configuration."""
return int(md5(pickle.dumps(self.config)).hexdigest(), 16)

@property
def csidriver(self) -> "CSIDriverAdjustments":
for manipulation in self.manipulations:
if isinstance(manipulation, CSIDriverAdjustments):
return manipulation
raise ValueError("CSIDriverAdjustments not found")

@property
def config(self) -> Dict[str, Any]:
return {} # pragma: no cover
@@ -36,16 +43,6 @@ def __call__(self, obj: AnyResource) -> None:
obj.metadata.namespace = ns


class ManifestLabelExcluder(ManifestLabel):
"""Exclude applying labels to CSIDriver."""

def __call__(self, obj: AnyResource) -> None:
super().__call__(obj)
if obj.kind == "CSIDriver" and obj.metadata and obj.metadata.labels:
# Remove the app label from the CSIDriver to disassociate it from the application
obj.metadata.labels.pop(APP_LABEL, None)


class RbacAdjustments(Patch):
"""Update RBAC Attributes."""

@@ -251,3 +248,94 @@ def from_space_separated(cls, tolerations: str) -> List["CephToleration"]:
return [cls._from_string(toleration) for toleration in tolerations.split()]
except ValueError as e:
raise ValueError(f"Invalid tolerations: {e}") from e


class ProvisionerAdjustments(Patch):
"""Update provisioner manifest objects."""

PROVISIONER_NAME: str
PLUGIN_NAME: str

def tolerations(self) -> Tuple[List[CephToleration], bool]:
return [], False

def adjust_container_specs(self, obj: AnyResource) -> None:
csidriver = cast(SafeManifest, self.manifests).csidriver
original_dn, updated_dn = csidriver.default_name, csidriver.formatted
kubelet_dir = self.manifests.config.get("kubelet_dir", "/var/lib/kubelet")

for c in obj.spec.template.spec.containers:
for idx in range(len(c.args)):
if original_dn in c.args[idx]:
c.args[idx] = c.args[idx].replace(original_dn, updated_dn)
if "/var/lib/kubelet" in c.args[idx]:
c.args[idx] = c.args[idx].replace("/var/lib/kubelet", kubelet_dir)
for m in c.volumeMounts:
m.mountPath = m.mountPath.replace("/var/lib/kubelet", kubelet_dir)
for v in obj.spec.template.spec.volumes:
if v.hostPath:
v.hostPath.path = v.hostPath.path.replace("/var/lib/kubelet", kubelet_dir)
v.hostPath.path = v.hostPath.path.replace(original_dn, updated_dn)

def __call__(self, obj: AnyResource) -> None:
"""Use the provisioner-replicas and enable-host-networking to update obj."""
tolerations, legacy = self.tolerations()

if (
obj.kind == "Deployment"
and obj.metadata
and obj.metadata.name == self.PROVISIONER_NAME
):
obj.spec.replicas = replica = self.manifests.config.get("provisioner-replicas")
log.info(f"Updating deployment replicas to {replica}")

obj.spec.template.spec.tolerations = tolerations
log.info("Updating deployment tolerations")

obj.spec.template.spec.hostNetwork = host_network = self.manifests.config.get(
"enable-host-networking"
)
log.info(f"Updating deployment hostNetwork to {host_network}")

log.info("Updating deployment specs")
self.adjust_container_specs(obj)

if obj.kind == "DaemonSet" and obj.metadata and obj.metadata.name == self.PLUGIN_NAME:
log.info("Updating daemonset tolerations")
obj.spec.template.spec.tolerations = (
tolerations if not legacy else [CephToleration(operator="Exists")]
)

log.info("Updating daemonset specs")
self.adjust_container_specs(obj)


class CSIDriverAdjustments(Patch):
"""Update CSI driver."""

NAME_FORMATTER = "csidriver-name-formatter"
REQUIRED_CONFIG = {NAME_FORMATTER}

def __init__(self, manifests: Manifests, default_name: str):
super().__init__(manifests)
self.default_name = default_name

@cached_property
def formatted(self) -> str:
"""Rename the object."""
formatter = self.manifests.config[self.NAME_FORMATTER]
fmt_context = {
"name": self.default_name,
"app": self.manifests.model.app.name,
"namespace": self.manifests.config["namespace"],
}
return formatter.format(**fmt_context)

def __call__(self, obj: AnyResource) -> None:
"""Format the name for the CSIDriver and and update obj."""
if not obj.metadata or not obj.metadata.name:
log.error("Object is missing metadata or name. %s", obj)
return

if obj.kind == "CSIDriver":
obj.metadata.name = self.formatted
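
Editorial note: a rough, self-contained sketch of the rename performed by CSIDriverAdjustments, using lightkube's CSIDriver model with an illustrative formatter and application name (in the charm these come from csidriver-name-formatter and the juju model; this is not the charm's own test code):

    from lightkube.models.meta_v1 import ObjectMeta
    from lightkube.models.storage_v1 import CSIDriverSpec
    from lightkube.resources.storage_v1 import CSIDriver

    # Default CSIDriver name as shipped in the upstream manifest.
    obj = CSIDriver(metadata=ObjectMeta(name="cephfs.csi.ceph.com"), spec=CSIDriverSpec())

    # With csidriver-name-formatter="{name}.{app}" and an application named
    # "ceph-csi", the patch rewrites the (otherwise immutable) resource name
    # once, at deployment time:
    obj.metadata.name = "{name}.{app}".format(name="cephfs.csi.ceph.com", app="ceph-csi")
    assert obj.metadata.name == "cephfs.csi.ceph.com.ceph-csi"

The StorageClass changes in manifests_cephfs.py below then set provisioner to this same formatted name, so volumes created by each application bind to its own renamed driver.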
60 changes: 15 additions & 45 deletions src/manifests_cephfs.py
@@ -9,14 +9,15 @@
from lightkube.codecs import AnyResource
from lightkube.resources.core_v1 import Secret
from lightkube.resources.storage_v1 import StorageClass
from ops.manifests import Addition, ConfigRegistry, Patch
from ops.manifests import Addition, ConfigRegistry
from ops.manifests.manipulations import Subtraction

from manifests_base import (
AdjustNamespace,
CephToleration,
ConfigureLivenessPrometheus,
ManifestLabelExcluder,
CSIDriverAdjustments,
ProvisionerAdjustments,
RbacAdjustments,
SafeManifest,
StorageClassFactory,
@@ -86,10 +87,10 @@ class CephStorageClass(StorageClassFactory):

FILESYSTEM_LISTING = "fs_list"
REQUIRED_CONFIG = {"fsid", FILESYSTEM_LISTING}
PROVISIONER = "cephfs.csi.ceph.com"

def create(self, param: CephStorageClassParameters) -> AnyResource:
"""Create a storage class object."""
driver_name = cast(SafeManifest, self.manifests).csidriver.formatted

ns = self.manifests.config["namespace"]
metadata: Dict[str, Any] = dict(name=param.storage_class_name)
@@ -117,7 +118,7 @@ def create(self, param: CephStorageClassParameters) -> AnyResource:
return StorageClass.from_dict(
dict(
metadata=metadata,
provisioner=self.PROVISIONER,
provisioner=driver_name,
allowVolumeExpansion=True,
reclaimPolicy="Delete",
parameters=parameters,
@@ -177,12 +178,13 @@ def parameter_list(self) -> List[CephStorageClassParameters]:

def __call__(self) -> List[AnyResource]:
"""Craft the storage class objects."""
driver_name = cast(SafeManifest, self.manifests).csidriver.formatted

if cast(SafeManifest, self.manifests).purging:
# If we are purging, we may not be able to create any storage classes
# Just return a fake storage class to satisfy delete_manifests method
# which will look up all storage classes installed by this app/manifest
return [StorageClass.from_dict(dict(metadata={}, provisioner=self.PROVISIONER))]
return [StorageClass.from_dict(dict(metadata={}, provisioner=driver_name))]

if not self.manifests.config.get("enabled"):
# If cephfs is not enabled, we cannot add any storage classes
@@ -200,7 +202,7 @@ def __call__(self) -> List[AnyResource]:
return [self.create(class_param) for class_param in parameter_list]


class ProvisionerAdjustments(Patch):
class FSProvAdjustments(ProvisionerAdjustments):
"""Update Cephfs provisioner."""

PROVISIONER_NAME = "csi-cephfsplugin-provisioner"
@@ -212,41 +214,6 @@ def tolerations(self) -> Tuple[List[CephToleration], bool]:
return [], True
return CephToleration.from_space_separated(cfg), False

def __call__(self, obj: AnyResource) -> None:
"""Use the provisioner-replicas and enable-host-networking to update obj."""
tolerations, legacy = self.tolerations()
if (
obj.kind == "Deployment"
and obj.metadata
and obj.metadata.name == self.PROVISIONER_NAME
):
obj.spec.replicas = replica = self.manifests.config.get("provisioner-replicas")
log.info(f"Updating deployment replicas to {replica}")

obj.spec.template.spec.tolerations = tolerations
log.info("Updating deployment tolerations")

obj.spec.template.spec.hostNetwork = host_network = self.manifests.config.get(
"enable-host-networking"
)
log.info(f"Updating deployment hostNetwork to {host_network}")
if obj.kind == "DaemonSet" and obj.metadata and obj.metadata.name == self.PLUGIN_NAME:
obj.spec.template.spec.tolerations = (
tolerations if not legacy else [CephToleration(operator="Exists")]
)
log.info("Updating daemonset tolerations")

kubelet_dir = self.manifests.config.get("kubelet_dir", "/var/lib/kubelet")

for c in obj.spec.template.spec.containers:
c.args = [arg.replace("/var/lib/kubelet", kubelet_dir) for arg in c.args]
for m in c.volumeMounts:
m.mountPath = m.mountPath.replace("/var/lib/kubelet", kubelet_dir)
for v in obj.spec.template.spec.volumes:
if v.hostPath:
v.hostPath.path = v.hostPath.path.replace("/var/lib/kubelet", kubelet_dir)
log.info(f"Updating daemonset kubeletDir to {kubelet_dir}")


class RemoveCephFS(Subtraction):
"""Remove all Cephfs resources when disabled."""
@@ -257,7 +224,9 @@ def __call__(self, _obj: AnyResource) -> bool:


class CephFSManifests(SafeManifest):
"""Deployment Specific details for the aws-ebs-csi-driver."""
"""Deployment Specific details for the cephfs.csi.ceph.com driver."""

DRIVER_NAME = "cephfs.csi.ceph.com"

def __init__(self, charm: "CephCsiCharm"):
super().__init__(
@@ -266,10 +235,10 @@ def __init__(self, charm: "CephCsiCharm"):
"upstream/cephfs",
[
StorageSecret(self),
ManifestLabelExcluder(self),
ConfigRegistry(self),
ProvisionerAdjustments(self),
FSProvAdjustments(self),
CephStorageClass(self, STORAGE_TYPE),
CSIDriverAdjustments(self, self.DRIVER_NAME),
RbacAdjustments(self),
RemoveCephFS(self),
AdjustNamespace(self),
@@ -300,9 +269,10 @@ def config(self) -> Dict:
if value == "" or value is None:
del config[key]

config["namespace"] = self.charm.stored.namespace
config["release"] = config.pop("release", None)
config["enabled"] = config.get("cephfs-enable", None)
config["namespace"] = self.charm.stored.namespace
config["csidriver-name-formatter"] = self.charm.stored.drivername
return config

def evaluate(self) -> Optional[str]: