Skip to content

Commit

Permalink
wait each node ready before joining another
Browse files Browse the repository at this point in the history
refactor wait_until_k8s_ready
  • Loading branch information
eaudetcobello committed Jan 29, 2025
1 parent e086756 commit 27a2f54
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
8 changes: 7 additions & 1 deletion tests/integration/tests/test_annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ def test_no_remove(instances: List[harness.Instance]):
join_token_2 = util.get_join_token(cluster_node, joining_cp_2)
join_token_worker = util.get_join_token(cluster_node, joining_worker, "--worker")
util.join_cluster(joining_cp, join_token)
util.wait_until_k8s_ready(cluster_node, [joining_cp])
util.join_cluster(joining_cp_2, join_token_2)
util.wait_until_k8s_ready(cluster_node, [joining_cp, joining_cp_2])
util.join_cluster(joining_worker, join_token_worker)

util.wait_until_k8s_ready(cluster_node, instances)

nodes = util.ready_nodes(cluster_node)
assert len(nodes) == 4, "nodes should have joined cluster"

Expand Down Expand Up @@ -71,9 +73,13 @@ def test_skip_services_stop_on_remove(instances: List[harness.Instance]):
join_token = util.get_join_token(cluster_node, joining_cp)
util.join_cluster(joining_cp, join_token)

util.wait_until_k8s_ready(cluster_node, [joining_cp])

join_token_2 = util.get_join_token(cluster_node, joining_cp_2)
util.join_cluster(joining_cp_2, join_token_2)

util.wait_until_k8s_ready(cluster_node, [joining_cp, joining_cp_2])

join_token_worker = util.get_join_token(cluster_node, worker, "--worker")
util.join_cluster(worker, join_token_worker)

Expand Down
39 changes: 25 additions & 14 deletions tests/integration/tests/test_util/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def wait_until_k8s_ready(
instances: List[harness.Instance],
retries: int = config.DEFAULT_WAIT_RETRIES,
delay_s: int = config.DEFAULT_WAIT_DELAY_S,
node_names: Mapping[str, str] = {},
node_names: Optional[dict[str, str]] = None,
):
"""
Validates that the K8s node is in Ready state.
Expand All @@ -261,21 +261,32 @@ def wait_until_k8s_ready(
If the instance name is different from the hostname, the instance name should be passed to the
node_names dictionary, e.g. {"instance_id": "node_name"}.
"""
if node_names is None:
node_names = {}

instance_id_node_name_map = {}
for instance in instances:
node_name = node_names.get(instance.id)
if node_name is None:
node_name = hostname(instance)

result = (
stubbornly(retries=retries, delay_s=delay_s)
.on(control_node)
.until(lambda p: " Ready" in p.stdout.decode())
.exec(["k8s", "kubectl", "get", "node", node_name, "--no-headers"])
)
LOG.info(f"Kubelet registered successfully on instance '{instance.id}'")
LOG.info("%s", result.stdout.decode())
instance_id_node_name_map[instance.id] = node_name
node_name = node_names.get(instance.id, hostname(instance))
LOG.info(f"Checking if Kubelet is ready on '{instance.id}' (node: {node_name})")

try:
result = (
stubbornly(retries=retries, delay_s=delay_s)
.on(control_node)
.until(lambda p: " Ready" in p.stdout.decode())
.exec(["k8s", "kubectl", "get", "node", node_name, "--no-headers"])
)
instance_id_node_name_map[instance.id] = node_name
LOG.info(
f"Kubelet registered successfully on instance '{instance.id}' (node: {node_name})"
)
LOG.info("Command output: %s", result.stdout.decode())

except Exception as e:
LOG.error(
f"Failed to verify readiness of instance '{instance.id}' (node: {node_name}): {e}"
)
raise
LOG.info(
"Successfully checked Kubelet registered on all harness instances: "
f"{instance_id_node_name_map}"
Expand Down

0 comments on commit 27a2f54

Please sign in to comment.