Skip to content

Commit

Permalink
feature(k8s-serverless): scylla-bench cloud bundle support
Browse files Browse the repository at this point in the history
* switch to newer scylla-bench with (scylladb/scylla-bench#108)
* add suport in `scylla_bench_thread.py`
* add unit test (that can only work if you have a bundle available,
  for now)
  • Loading branch information
fruch committed Nov 30, 2022
1 parent 65901b4 commit 49564b1
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 22 deletions.
2 changes: 1 addition & 1 deletion defaults/test_default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ stress_image:
ycsb: 'scylladb/hydra-loaders:ycsb-jdk8-20220918'
nosqlbench: 'scylladb/hydra-loaders:nosqlbench-4.15.49'
cassandra-stress: '' # default would be same version as scylla under test
scylla-bench: 'scylladb/hydra-loaders:scylla-bench-v0.1.12'
scylla-bench: 'scylladb/hydra-loaders:scylla-bench-v0.1.14'
gemini: 'scylladb/hydra-loaders:gemini-1.7.7'
alternator-dns: 'scylladb/hydra-loaders:alternator-dns-0.1'
cdc-stresser: 'scylladb/hydra-loaders:cdc-stresser-20210630'
Expand Down
2 changes: 1 addition & 1 deletion docker/scylla-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ docker push ${SCYLLA_BENCH_DOCKER_IMAGE}
```
export SCYLLA_BENCH_BRANCH=heads/some_fixes
export SCYLLA_BENCH_FORK=fruch/scylla-bench
export NAME=`echo $SCYLLA_BENCH_VERSION | cut -d "/" -f 2`
export NAME=`echo $SCYLLA_BENCH_BRANCH | cut -d "/" -f 2`
export SCYLLA_BENCH_DOCKER_IMAGE=scylladb/hydra-loaders:scylla-bench-${NAME}
docker build . -t ${SCYLLA_BENCH_DOCKER_IMAGE} --build-arg version=${SCYLLA_BENCH_BRANCH} --build-arg fork=${SCYLLA_BENCH_FORK}
docker push ${SCYLLA_BENCH_DOCKER_IMAGE}
Expand Down
21 changes: 21 additions & 0 deletions functional_tests/scylla_operator/test_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,3 +828,24 @@ def test_cloud_bundle_connectivity_cassandra_stress(tester):

assert "latency 99th percentile" in output[0]
assert float(output[0]["latency 99th percentile"]) > 0


@pytest.mark.required_operator("v1.8.0")
@pytest.mark.requires_tls
def test_cloud_bundle_connectivity_scylla_bench(tester):

assert tester.db_cluster.connection_bundle_file, "cloud bundle wasn't found"

cmd = (
"scylla-bench -workload=sequential -mode=write -replication-factor=1 -partition-count=10 "
"-clustering-row-count=5555 -clustering-row-size=uniform:10..20 -concurrency=10 "
"-connection-count=10 -consistency-level=one -rows-per-request=10 -timeout=60s -duration=1m"
)

stress_obj = tester.run_stress_thread(cmd, stop_test_on_failure=False)
summaries, errors = stress_obj.verify_results()
assert not errors
assert summaries[0]["Clustering row size"] == "Uniform(min=10, max=20)"

# TODO: add verification that the output say it's using the cloud bundle
# (need to add that to log output in scylla-bench)
9 changes: 9 additions & 0 deletions sdcm/cluster_k8s/mini_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,15 @@ def start_k8s_software(self) -> None:
"""
for _ in range(self.params.get("n_db_nodes")):
script_start_part += scylla_node_definition

loader_node_definition = f"""
- role: worker
labels:
{POOL_LABEL_NAME}: {self.LOADER_POOL_NAME}
"""
for _ in range(self.params.get("n_loaders")):
script_start_part += loader_node_definition

script_end_part = """
EndOfSpec
/var/tmp/kind delete cluster || true
Expand Down
23 changes: 17 additions & 6 deletions sdcm/scylla_bench_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ def verify_results(self):

return sb_summary, errors

def create_stress_cmd(self, stress_cmd):
if self.connection_bundle_file:
stress_cmd = f'{stress_cmd.strip()} -cloud-config-path={self.target_connection_bundle_file}'
else:
# Select first seed node to send the scylla-bench cmds
ips = ",".join([n.cql_ip_address for n in self.node_list])
stress_cmd = f'{stress_cmd.strip()} -nodes {ips}'

return stress_cmd

def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-locals
cmd_runner = None
if "k8s" in self.params.get("cluster_backend") and self.params.get(
Expand All @@ -159,10 +169,13 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
cpu_options = f'--cpuset-cpus="{cpu_idx}"'
cmd_runner = cleanup_context = RemoteDocker(
loader, self.params.get('stress_image.scylla-bench'),
extra_docker_opts=(
f'{cpu_options} --label shell_marker={self.shell_marker} --network=host'))
extra_docker_opts=f'{cpu_options} --label shell_marker={self.shell_marker} --network=host',
)
cmd_runner_name = loader.ip_address

if self.connection_bundle_file:
cmd_runner.send_files(str(self.connection_bundle_file), self.target_connection_bundle_file)

if self.sb_mode == ScyllaBenchModes.WRITE and self.sb_workload == ScyllaBenchWorkloads.TIMESERIES:
loader.parent_cluster.sb_write_timeseries_ts = write_timestamp = time.time_ns()
LOGGER.debug("Set start-time: %s", write_timestamp)
Expand All @@ -186,9 +199,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
os.makedirs(loader.logdir, exist_ok=True)

log_file_name = os.path.join(loader.logdir, f'scylla-bench-l{loader_idx}-{uuid.uuid4()}.log')
# Select first seed node to send the scylla-bench cmds
ips = ",".join([n.cql_ip_address for n in self.node_list])

stress_cmd = self.create_stress_cmd(stress_cmd)
with ScyllaBenchStressExporter(instance_name=cmd_runner_name,
metrics=nemesis_metrics_obj(),
stress_operation=self.sb_mode,
Expand All @@ -202,7 +213,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): # pylint: disable=too-many-
result = None
try:
result = cmd_runner.run(
cmd="{name} -nodes {ips}".format(name=stress_cmd.strip(), ips=ips),
cmd=stress_cmd,
timeout=self.timeout,
log_file=log_file_name,
retry=0,
Expand Down
9 changes: 9 additions & 0 deletions sdcm/stress/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import random
import concurrent.futures
from pathlib import Path

from sdcm.cluster import BaseLoaderSet
from sdcm.utils.common import generate_random_string
Expand Down Expand Up @@ -116,6 +117,14 @@ def db_node_to_query(self, loader):
return node_to_query.cql_ip_address
return self.node_list[0].cql_ip_address

@property
def connection_bundle_file(self) -> Path:
return self.node_list[0].parent_cluster.connection_bundle_file

@property
def target_connection_bundle_file(self) -> str:
return str(Path('/tmp/') / self.connection_bundle_file.name)


def format_stress_cmd_error(exc: Exception) -> str:
"""Format nicely the exception from a stress command failure."""
Expand Down
4 changes: 2 additions & 2 deletions sdcm/stress_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ def _run_cs_stress(self, loader, loader_idx, cpu_idx, keyspace_idx): # pylint:
str(Path('/etc/scylla/ssl_conf/client') / ssl_file.name),
verbose=True)

if connection_bundle_file := self.node_list[0].parent_cluster.connection_bundle_file:
if connection_bundle_file := self.connection_bundle_file:
cmd_runner.send_files(str(connection_bundle_file),
str(Path('/tmp') / connection_bundle_file.name), delete_dst=True, verbose=True)
self.target_connection_bundle_file, delete_dst=True, verbose=True)

# Get next word after `cassandra-stress' in stress_cmd.
# Do it this way because stress_cmd can contain env variables before `cassandra-stress'.
Expand Down
28 changes: 20 additions & 8 deletions sdcm/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,11 +1255,6 @@ def get_cluster_baremetal(self):
def get_cluster_k8s_local_kind_cluster(self):
self.credentials.append(UserRemoteCredentials(key_file=self.params.get('user_credentials_path')))

container_node_params = dict(
docker_image=self.params.get('docker_image'),
docker_image_tag=self.params.get('scylla_version'),
node_key_file=self.credentials[0].key_file,
)
self.k8s_cluster = mini_k8s.LocalKindCluster(
software_version=self.params.get("mini_k8s_version"),
user_prefix=self.params.get("user_prefix"),
Expand All @@ -1270,6 +1265,16 @@ def get_cluster_k8s_local_kind_cluster(self):
self.k8s_cluster.set_nodeselector_for_deployments(
pool_name=self.k8s_cluster.AUXILIARY_POOL_NAME, namespace=namespace)

loader_pool = None
if self.params.get("n_loaders"):
loader_pool = mini_k8s.MinimalK8SNodePool(
k8s_cluster=self.k8s_cluster,
name=self.k8s_cluster.LOADER_POOL_NAME,
num_nodes=self.params.get("n_loaders"),
image_type="fake-image-type",
instance_type="fake-instance-type")
self.k8s_cluster.deploy_node_pool(loader_pool, wait_till_ready=False)

scylla_pool = mini_k8s.MinimalK8SNodePool(
k8s_cluster=self.k8s_cluster,
name=self.k8s_cluster.SCYLLA_POOL_NAME,
Expand All @@ -1295,20 +1300,27 @@ def get_cluster_k8s_local_kind_cluster(self):
n_nodes=self.params.get("k8s_n_scylla_pods_per_cluster") or self.params.get("n_db_nodes"),
params=self.params,
node_pool=scylla_pool,
add_nodes=False,
)

if self.params.get('k8s_deploy_monitoring'):
self.k8s_cluster.deploy_monitoring_cluster(
is_manager_deployed=self.params.get('use_mgmt')
)

if self.params.get("n_loaders"):
self.loaders = cluster_docker.LoaderSetDocker(
**container_node_params,
n_nodes=self.params.get("n_loaders"),
self.loaders = cluster_k8s.LoaderPodCluster(
k8s_cluster=self.k8s_cluster,
loader_cluster_name=self.params.get("k8s_loader_cluster_name"),
user_prefix=self.params.get("user_prefix"),
n_nodes=self.params.get("k8s_n_loader_pods_per_cluster") or self.params.get("n_loaders"),
params=self.params,
node_pool=loader_pool,
add_nodes=False,
)

self._add_and_wait_for_cluster_nodes_in_parallel([self.db_cluster, self.loaders])

if self.params.get("n_monitor_nodes") > 0:
self.monitors = cluster_docker.MonitorSetDocker(
n_nodes=self.params.get("n_monitor_nodes"),
Expand Down
3 changes: 3 additions & 0 deletions test-cases/scylla-operator/functional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ n_db_nodes: 4
k8s_n_scylla_pods_per_cluster: 3

n_loaders: 1
k8s_loader_run_type: 'dynamic'
k8s_loader_cluster_name: 'sct-loaders'

n_monitor_nodes: 0

use_mgmt: true
Expand Down
16 changes: 12 additions & 4 deletions unit_tests/test_scylla_bench_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@


@pytest.mark.integration
@pytest.mark.parametrize("extra_cmd", argvalues=[
pytest.param('', id="regular"),
pytest.param('-tls', id="tls", marks=[pytest.mark.docker_scylla_args(ssl=True)])])
@pytest.mark.parametrize(
"extra_cmd",
argvalues=[
pytest.param("", id="regular"),
pytest.param("-tls", id="tls", marks=[pytest.mark.docker_scylla_args(ssl=True)]),
pytest.param("cloud-config", id="sni_proxy", marks=pytest.mark.skip(reason="manual test only")),
],
)
def test_01_scylla_bench(request, docker_scylla, params, extra_cmd):
loader_set = LocalLoaderSetDummy()

if extra_cmd == "cloud-config":
params["k8s_connection_bundle_file"] = "/home/fruch/Downloads/k8s_config.yaml"
docker_scylla.parent_cluster.params = params
extra_cmd = ""
cmd = (
f"scylla-bench -workload=sequential {extra_cmd} -mode=write -replication-factor=1 -partition-count=10 "
"-clustering-row-count=5555 -clustering-row-size=uniform:10..20 -concurrency=10 "
Expand Down

0 comments on commit 49564b1

Please sign in to comment.