feature(k8s): add general support for multiple K8S clusters
Do following changes to SCT to support multiple K8S clusters:
- Add a new 'k8s_clusters' attr to the Tester Python class.
- Start using that attr in the K8S provisioning methods.
- Add 'k8s_clusters' attr to the Scylla cluster python classes.
- Update all the 'add_nodes' methods to accept different 'dc_idx' values.
- The 'dc_idx' parameter becomes meaningful (a rough sketch of the
  resulting model follows this list).
- Update the 'disrupt_nodetool_flush_and_reshard_on_kubernetes' nemesis
  to work correctly using the new approach.
- Update various K8S python methods to use new 'k8s_clusters' attr.
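
As a rough illustration of the new model (a hedged sketch, not the actual
SCT classes; everything except the 'k8s_clusters' attr and the 'dc_idx'
parameter is a hypothetical stand-in), a pod cluster that owns several K8S
clusters can resolve the target cluster from 'dc_idx' like this:

```python
# Minimal sketch of the dc_idx -> k8s_clusters[dc_idx] dispatch idea.
from typing import List, Optional


class KubernetesCluster:
    """Hypothetical stand-in for an EKS/GKE/local K8S cluster object."""

    def __init__(self, name: str) -> None:
        self.name = name

    def deploy_scylla_pod(self, rack: int) -> str:
        return f"{self.name}/rack-{rack}/pod"


class ScyllaPodCluster:
    def __init__(self, k8s_clusters: List[KubernetesCluster]) -> None:
        # One K8S cluster per datacenter; 'dc_idx' indexes into this list.
        self.k8s_clusters = k8s_clusters
        self.nodes: List[str] = []

    def add_nodes(self, count: int, dc_idx: Optional[int] = None,
                  rack: int = 0) -> List[str]:
        # dc_idx=None keeps the old single-DC behaviour: use the first cluster.
        k8s_cluster = self.k8s_clusters[0 if dc_idx is None else dc_idx]
        new_nodes = [k8s_cluster.deploy_scylla_pod(rack) for _ in range(count)]
        self.nodes += new_nodes
        return new_nodes


cluster = ScyllaPodCluster([KubernetesCluster("eks-eu"), KubernetesCluster("eks-us")])
cluster.add_nodes(count=3, dc_idx=1)  # lands on the second K8S cluster
```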

What it does not do and what will be handled in follow-up changes:
- Updates to the specific K8S backends to be able to create 2+ K8S clusters.
- Scylla-manager update.
- Separation of all the K8S logs. For now the 'scylla-operator',
  'scylla-manager', 'cert-manager' and 'scylla-events' streamed logs
  get written into the same files from each of the K8S clusters.
- Proper SNI support for multiple K8S clusters.
- Support for distinct per-cluster values of a wide range of SCT config
  options such as 'n_db_nodes', 'k8s_n_scylla_pods_per_cluster' and many
  others.
- Monitoring logic update to post the DC info.
- New CI jobs configurations.
vponomaryov authored and fruch committed Nov 10, 2023
1 parent ec76eeb commit 6f6ce12
Showing 11 changed files with 463 additions and 432 deletions.
7 changes: 4 additions & 3 deletions functional_tests/scylla_operator/conftest.py
@@ -40,6 +40,7 @@
#
# Which appears when we define and use fixtures in one single module like we do here.

# TODO: add support for multiDC setups

@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call): # pylint: disable=unused-argument
@@ -109,6 +110,7 @@ def fixture_db_cluster(tester: ScyllaOperatorFunctionalClusterTester):
verification_node=tester.db_cluster.nodes[0])
tester.db_cluster.wait_for_init(node_list=tester.db_cluster.nodes, wait_for_db_logs=True)
tester.db_cluster.prefill_cluster(dataset_name)
tester.db_cluster.k8s_cluster = tester.db_cluster.k8s_clusters[0]
yield tester.db_cluster

if tester.healthy_flag:
@@ -224,11 +226,10 @@ def skip_if_not_supported_scylla_version(request: pytest.FixtureRequest,
def skip_based_on_operator_version(request: pytest.FixtureRequest, tester: ScyllaOperatorFunctionalClusterTester):
# pylint: disable=protected-access
if required_operator := request.node.get_closest_marker('required_operator'):
current_version = tester.k8s_cluster._scylla_operator_chart_version.split("-")[0]
current_version = tester.k8s_clusters[0]._scylla_operator_chart_version.split("-")[0]
required_version = required_operator.args[0]
if version_utils.ComparableScyllaOperatorVersion(current_version) < required_version:
pytest.skip(f"require operator version: {required_operator.args[0]} , "
f"current version: {tester.k8s_cluster._scylla_operator_chart_version}")
pytest.skip(f"require operator version: {required_version}, current version: {current_version}")


@pytest.fixture(autouse=True)
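
For readers unfamiliar with the gate above: the chart version is compared
with any pre-release suffix stripped (the .split("-")[0] part). Outside of
SCT's own ComparableScyllaOperatorVersion helper, an equivalent check could
be sketched with packaging.version (illustrative only; just the '-' splitting
mirrors the diff):

```python
# Illustrative version gate; SCT uses its ComparableScyllaOperatorVersion instead.
from packaging.version import Version


def needs_skip(chart_version: str, required: str) -> bool:
    # "1.9.0-rc.2" is compared as "1.9.0", as the diff does with .split("-")[0]
    return Version(chart_version.split("-")[0]) < Version(required)


assert needs_skip("1.8.0-rc.1", "1.9.0") is True
assert needs_skip("1.9.0", "1.9.0") is False
```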
2 changes: 2 additions & 0 deletions functional_tests/scylla_operator/test_functional.py
@@ -63,6 +63,8 @@

log = logging.getLogger()

# TODO: add support for multiDC setups


@pytest.mark.readonly
def test_single_operator_image_tag_is_everywhere(db_cluster):
5 changes: 3 additions & 2 deletions longevity_operator_multi_tenant_test.py
@@ -28,10 +28,11 @@ def _run_test_on_one_tenant(tenant):

self.log.info("Starting tests worker threads")

self.log.info("Clusters count: %s", self.k8s_cluster.tenants_number)
tenants_number = self.k8s_clusters[0].tenants_number
self.log.info("Clusters count: %s", tenants_number)
object_set = ParallelObject(
timeout=int(self.test_duration) * 60,
objects=[[tenant] for tenant in self.tenants],
num_workers=self.k8s_cluster.tenants_number
num_workers=tenants_number,
)
object_set.run(func=_run_test_on_one_tenant, unpack_objects=True, ignore_exceptions=False)
542 changes: 288 additions & 254 deletions sdcm/cluster_k8s/__init__.py

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions sdcm/cluster_k8s/eks.py
@@ -651,15 +651,14 @@ def terminate_k8s_node(self):


class EksScyllaPodCluster(ScyllaPodCluster):
k8s_cluster: 'EksCluster'
PodContainerClass = EksScyllaPodContainer
nodes: List[EksScyllaPodContainer]

# pylint: disable=too-many-arguments
def add_nodes(self,
count: int,
ec2_user_data: str = "",
dc_idx: int = 0,
dc_idx: int = None,
rack: int = 0,
enable_auto_bootstrap: bool = False) -> List[EksScyllaPodContainer]:
new_nodes = super().add_nodes(count=count,
3 changes: 1 addition & 2 deletions sdcm/cluster_k8s/gke.py
@@ -496,15 +496,14 @@ def terminate_k8s_node(self):


class GkeScyllaPodCluster(ScyllaPodCluster):
k8s_cluster: 'GkeCluster'
node_pool: 'GkeNodePool'
PodContainerClass = GkeScyllaPodContainer

# pylint: disable=too-many-arguments
def add_nodes(self,
count: int,
ec2_user_data: str = "",
dc_idx: int = 0,
dc_idx: int = None,
rack: int = 0,
enable_auto_bootstrap: bool = False) -> List[GkeScyllaPodContainer]:
new_nodes = super().add_nodes(count=count,
3 changes: 1 addition & 2 deletions sdcm/cluster_k8s/mini_k8s.py
@@ -728,7 +728,6 @@ def terminate_k8s_host(self):
# pylint: disable=too-many-ancestors
class LocalMinimalScyllaPodCluster(ScyllaPodCluster):
"""Represents scylla cluster hosted on locally running minimal k8s clusters such as k3d, minikube or kind"""
k8s_cluster: MinimalClusterBase
PodContainerClass = LocalMinimalScyllaPodContainer

def wait_for_nodes_up_and_normal(self, nodes=None, verification_node=None, iterations=20, sleep_time=60, timeout=0): # pylint: disable=too-many-arguments
@@ -746,7 +745,7 @@ def wait_for_init(self, *_, node_list=None, verbose=False, timeout=None, **__):
self.wait_for_nodes_up_and_normal(nodes=node_list)

def upgrade_scylla_cluster(self, new_version: str) -> None:
self.k8s_cluster.docker_pull(f"{self.params.get('docker_image')}:{new_version}")
self.k8s_clusters[0].docker_pull(f"{self.params.get('docker_image')}:{new_version}")
return super().upgrade_scylla_cluster(new_version)

@staticmethod
48 changes: 22 additions & 26 deletions sdcm/nemesis.py
@@ -1287,9 +1287,10 @@ def _terminate_and_wait(self, target_node, sleep_time=300):
def replace_node(self, old_node_ip, host_id, rack=0):
return self._add_and_init_new_cluster_node(old_node_ip, host_id, rack=rack)

def _verify_resharding_on_k8s(self, cpus):
def _verify_resharding_on_k8s(self, cpus, dc_idx):
nodes_data = []
for node in reversed(self.cluster.nodes):
qualified_nodes = [n for n in self.cluster.nodes if n.dc_idx == dc_idx]
for node in reversed(qualified_nodes):
liveness_probe_failures = node.follow_system_log(
patterns=["healthz probe: can't connect to JMX"])
resharding_start = node.follow_system_log(patterns=[DB_LOG_PATTERN_RESHARDING_START])
@@ -1299,17 +1300,12 @@ def _verify_resharding_on_k8s(self, cpus):
self.log.info(
"Update the cpu count to '%s' CPUs to make Scylla start "
"the resharding process on all the nodes 1 by 1", cpus)
# TODO: properly pick up the rack. For now it assumes we have only one.
self.tester.db_cluster.replace_scylla_cluster_value(
"/spec/datacenter/racks/0/resources", {
"limits": {
"cpu": cpus,
"memory": self.target_node.k8s_cluster.scylla_memory_limit,
},
"requests": {
"cpu": cpus,
"memory": self.target_node.k8s_cluster.scylla_memory_limit,
},
})
"limits": {"cpu": cpus, "memory": self.cluster.k8s_clusters[dc_idx].scylla_memory_limit},
"requests": {"cpu": cpus, "memory": self.cluster.k8s_clusters[dc_idx].scylla_memory_limit},
}, dc_idx=dc_idx)

# Wait for the start of the resharding.
# In K8S it starts from the last node of a rack and then goes to previous ones.
@@ -1354,35 +1350,35 @@ def _verify_resharding_on_k8s(self, cpus):

def disrupt_nodetool_flush_and_reshard_on_kubernetes(self):
"""Covers https://github.com/scylladb/scylla-operator/issues/894"""
# NOTE: To check resharding we don't need to trigger it on all the nodes,
# so, pick up only one K8S cluster and only if it is EKS.
if not self._is_it_on_kubernetes():
raise UnsupportedNemesis('It is supported only on kubernetes')
if self.cluster.params.get('cluster_backend') != "k8s-eks" or self.cluster.nodes[0].scylla_shards < 14:
dc_idx = 0
for node in self.cluster.nodes:
if hasattr(node.k8s_cluster, 'eks_cluster_version') and node.scylla_shards >= 14:
dc_idx = node.dc_idx

# Calculate new value for the CPU cores dedicated for Scylla pods
current_cpus = convert_cpu_value_from_k8s_to_units(node.k8s_cluster.scylla_cpu_limit)
new_cpus = convert_cpu_units_to_k8s_value(current_cpus + (1 if current_cpus <= 1 else -1))
break
else:
# NOTE: bug https://github.com/scylladb/scylla-operator/issues/1077 reproduces better
# on slower machines with a smaller number of cores.
# So, allow it to run only on fast K8S-EKS backend having at least 14 cores per pod.
raise UnsupportedNemesis(
"Skipped due to the following bug: "
"https://github.com/scylladb/scylla-operator/issues/1077")

# Calculate new value for the CPU cores dedicated for Scylla pods
current_cpus = convert_cpu_value_from_k8s_to_units(
self.target_node.k8s_cluster.scylla_cpu_limit)
if current_cpus <= 1:
new_cpus = current_cpus + 1
else:
new_cpus = current_cpus - 1
new_cpus = convert_cpu_units_to_k8s_value(new_cpus)
raise UnsupportedNemesis("https://github.com/scylladb/scylla-operator/issues/1077")

# Run 'nodetool flush' command
self.target_node.run_nodetool("flush -- keyspace1")

try:
# Change number of CPUs dedicated for Scylla pods
# and make sure that the resharding process begins and finishes
self._verify_resharding_on_k8s(new_cpus)
self._verify_resharding_on_k8s(new_cpus, dc_idx)
finally:
# Return the cpu count back and wait for the resharding begin and finish
self._verify_resharding_on_k8s(current_cpus)
self._verify_resharding_on_k8s(current_cpus, dc_idx)

def disrupt_drain_kubernetes_node_then_replace_scylla_node(self):
self._disrupt_kubernetes_then_replace_scylla_node('drain_k8s_node')
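
Side note on the CPU arithmetic above: K8S expresses CPU limits either as
(fractional) cores ('2', '1.5') or as millicores ('500m'). The convert_*
helpers presumably round-trip between those two forms; a minimal sketch of
that behaviour (an assumption, not the actual SCT implementation):

```python
# Sketch of K8S CPU-quantity conversion; the real SCT helpers may differ.
def convert_cpu_value_from_k8s_to_units(value: str) -> float:
    """'500m' -> 0.5, '2' -> 2.0"""
    value = str(value)
    if value.endswith("m"):
        return int(value[:-1]) / 1000
    return float(value)


def convert_cpu_units_to_k8s_value(cpus: float) -> str:
    """0.5 -> '500m', 2 -> '2'"""
    if cpus == int(cpus):
        return str(int(cpus))
    return f"{int(cpus * 1000)}m"


assert convert_cpu_value_from_k8s_to_units("500m") == 0.5
assert convert_cpu_units_to_k8s_value(1.5) == "1500m"
assert convert_cpu_units_to_k8s_value(2.0) == "2"
```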