Skip to content

Commit

Permalink
[DPE-5126] Use admin server instead of the 4lw commands (#102)
Browse files: browse the repository at this point in the history
* refactor: Replace nc commands with curl

* refactor: Remove temp variable

* feat: Port VM changes
  • Loading branch information
Batalex authored Sep 13, 2024
1 parent 50813f1 commit 6fd7fd6
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 103 deletions.
1 change: 0 additions & 1 deletion metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,3 @@ storage:
description: Directories where snapshot and transaction data is stored
minimum-size: 10G
location: /var/lib/zookeeper

2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 20 additions & 19 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ jsonschema = ">=4.10"
lightkube = "^0.15.0"
typing-extensions = "^4.9.0"
boto3 = "^1.34.159"
boto3-stubs = {extras = ["s3"], version = "^1.35.8"}
boto3-stubs = { extras = ["s3"], version = "^1.35.8" }
httpx = "^0.27.2"

[tool.poetry.group.fmt]
optional = true
Expand All @@ -69,7 +70,7 @@ optional = true

[tool.poetry.group.unit.dependencies]
pytest = ">=7.2"
coverage = {extras = ["toml"], version = ">7.0"}
coverage = { extras = ["toml"], version = ">7.0" }
jsonschema = ">=4.10"
pytest-mock = "^3.11.1"
ops-scenario = "^6.0.0"
Expand All @@ -82,42 +83,42 @@ pytest = ">=7.2"
# FIXME (libjuju): Unpin once GH#1093 is fixed
# (impossible to parse storage constraints)
juju = "^3.2.0,<3.5.2"
coverage = {extras = ["toml"], version = ">7.0"}
coverage = { extras = ["toml"], version = ">7.0" }
pytest-operator = ">0.20"
requests = ">2.25"
kazoo = ">=2.8"
jsonschema = ">=4.10"
pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v20.0.2", subdirectory = "python/pytest_plugins/pytest_operator_cache"}
pytest-operator-cache = { git = "https://github.com/canonical/data-platform-workflows", tag = "v20.0.2", subdirectory = "python/pytest_plugins/pytest_operator_cache" }
# To be enabled if we are using groups on integration tests
# pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v6.1.1", subdirectory = "python/pytest_plugins/pytest_operator_groups"}
pytest-microceph = {git = "https://github.com/canonical/data-platform-workflows", tag = "v20.0.2", subdirectory = "python/pytest_plugins/microceph"}
pytest-microceph = { git = "https://github.com/canonical/data-platform-workflows", tag = "v20.0.2", subdirectory = "python/pytest_plugins/microceph" }

[tool.poetry.group.format.dependencies]
pyright = "^1.1.301"

[tool.ruff]
line-length = 99
extend-exclude = ["__pycache__", "*.egg_info", "tests/integration/app-charm/lib"]
target-version="py310"
target-version = "py310"
src = ["src", "tests"]

[tool.ruff.lint]
select = ["E", "W", "F", "C", "N", "D", "I001"]
ignore = ["E501", "D107"]
per-file-ignores = {"tests/*" = ["D100","D101","D102","D103","D104", "E999"], "src/literals.py" = ["D101"]}
per-file-ignores = { "tests/*" = ["D100", "D101", "D102", "D103", "D104", "E999"], "src/literals.py" = ["D101"] }
extend-ignore = [
"D203",
"D204",
"D213",
"D215",
"D400",
"D401",
"D404",
"D406",
"D407",
"D408",
"D409",
"D413",
"D203",
"D204",
"D213",
"D215",
"D400",
"D401",
"D404",
"D406",
"D407",
"D408",
"D409",
"D413",
]
mccabe.max-complexity = 10

Expand Down
1 change: 1 addition & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
CLIENT_PORT = 2181
SECURE_CLIENT_PORT = 2182
SERVER_PORT = 2888
ADMIN_SERVER_PORT = 8080
ELECTION_PORT = 3888
JMX_PORT = 9998
METRICS_PROVIDER_PORT = 7000
Expand Down
55 changes: 17 additions & 38 deletions src/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

"""Implementation of WorkloadBase for running on K8s."""
import logging
import re
import secrets
import string
from subprocess import CalledProcessError

import httpx
from ops.model import Container
from ops.pebble import ChangeError, ExecError, Layer
from ops.pebble import ChangeError, Layer
from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed
from typing_extensions import override

from core.workload import WorkloadBase
from literals import CLIENT_PORT
from literals import ADMIN_SERVER_PORT

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,7 +58,7 @@ def write(self, content: str, path: str) -> None:

@override
def exec(self, command: list[str], working_dir: str | None = None) -> str:
return str(self.container.exec(command, working_dir=working_dir).wait_output())
return self.container.exec(command, working_dir=working_dir).wait_output()[0]

@property
@override
Expand Down Expand Up @@ -87,27 +86,14 @@ def healthy(self) -> bool:
if not self.alive:
return False

# netcat isn't a default utility, so can't guarantee it's on the charm containers
# this ugly hack avoids needing netcat
bash_netcat = (
f"echo '4lw' | (exec 3<>/dev/tcp/localhost/{CLIENT_PORT}; cat >&3; cat <&3; exec 3<&-)"
)
ruok = [bash_netcat.replace("4lw", "ruok")]
srvr = [bash_netcat.replace("4lw", "srvr")]
try:
response = httpx.get(f"http://localhost:{ADMIN_SERVER_PORT}/commands/ruok", timeout=10)
response.raise_for_status()

# timeout needed as it can sometimes hang forever if there's a problem
# for example when the endpoint is unreachable
timeout = ["timeout", "10s", "bash", "-c"]
except httpx.HTTPError:
return False

try:
ruok_response = self.exec(command=timeout + ruok)
if not ruok_response or "imok" not in ruok_response:
return False

srvr_response = self.exec(command=timeout + srvr)
if not srvr_response or "not currently serving requests" in srvr_response:
return False
except (ExecError, CalledProcessError):
if response.json().get("error", None):
return False

return True
Expand All @@ -134,21 +120,14 @@ def get_version(self) -> str:
if not self.healthy:
return ""

stat = [
"bash",
"-c",
f"echo 'stat' | (exec 3<>/dev/tcp/localhost/{CLIENT_PORT}; cat >&3; cat <&3; exec 3<&-)",
]

try:
stat_response = self.exec(command=stat)
if not stat_response:
return ""

matcher = re.search(r"(?P<version>\d\.\d\.\d)", stat_response)
version = matcher.group("version") if matcher else ""
response = httpx.get(f"http://localhost:{ADMIN_SERVER_PORT}/commands/srvr", timeout=10)
response.raise_for_status()

except (ExecError, CalledProcessError):
except httpx.HTTPError:
return ""

return version
if not (full_version := response.json().get("version", "")):
return full_version
else:
return full_version.split("-")[0]
33 changes: 18 additions & 15 deletions tests/integration/ha/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import logging
import os
import re
import string
import subprocess
import tempfile
Expand All @@ -15,6 +14,8 @@
from pytest_operator.plugin import OpsTest
from tenacity import retry, retry_if_not_result, stop_after_attempt, wait_fixed

from literals import ADMIN_SERVER_PORT

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
Expand Down Expand Up @@ -48,28 +49,26 @@ async def wait_idle(ops_test, apps: list[str] = [APP_NAME], units: int = 3) -> N
stop=stop_after_attempt(60),
reraise=True,
)
def srvr(host: str) -> dict:
"""Calls srvr 4lw command to specified host.
def srvr(model_full_name: str, unit: str) -> dict:
"""Calls srvr 4lw command to specified unit.
Args:
host: ZooKeeper address and port to issue srvr 4lw command to
model_full_name: Current test model
unit: ZooKeeper unit to issue srvr 4lw command to
Returns:
Dict of srvr command output key/values
"""
response = subprocess.check_output(
f"echo srvr | nc {host} 2181", stderr=subprocess.PIPE, shell=True, universal_newlines=True
f"JUJU_MODEL={model_full_name} juju ssh {unit} sudo -i 'curl localhost:{ADMIN_SERVER_PORT}/commands/srvr -m 10'",
stderr=subprocess.PIPE,
shell=True,
universal_newlines=True,
)

assert response, "ZooKeeper not running"

result = {}
for item in response.splitlines():
k = re.split(": ", item)[0]
v = re.split(": ", item)[1]
result[k] = v

return result
return json.loads(response)


def get_unit_address_map(ops_test: OpsTest, app_name: str = APP_NAME) -> dict[str, str]:
Expand Down Expand Up @@ -164,13 +163,17 @@ def get_leader_name(ops_test: OpsTest, hosts: str, app_name: str = APP_NAME) ->
String of unit name of the ZooKeeper quorum leader
"""
for host in hosts.split(","):
unit_name = get_unit_name_from_host(ops_test, host, app_name)
try:
mode = srvr(host.split(":")[0])["Mode"]
mode = (
srvr(ops_test.model_full_name, unit_name)
.get("server_stats", {})
.get("server_state", "")
)
except subprocess.CalledProcessError: # unit is down
continue
if mode == "leader":
leader_name = get_unit_name_from_host(ops_test, host, app_name)
return leader_name
return unit_name

return ""

Expand Down
30 changes: 18 additions & 12 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from kazoo.client import KazooClient
from pytest_operator.plugin import OpsTest

from literals import ADMIN_SERVER_PORT

from . import APP_NAME

PEER = "cluster"
Expand Down Expand Up @@ -89,29 +91,32 @@ def check_key(host: str, password: str, username: str = "super") -> None:
raise KeyError


def srvr(host: str) -> Dict:
def srvr(model_full_name: str, unit: str) -> dict:
"""Retrieves attributes returned from the 'srvr' 4lw command.
Specifically for this test, we are interested in the "Mode" of the ZK server,
which allows checking quorum leadership and follower active status.
"""
response = check_output(
f"echo srvr | nc {host} 2181", stderr=PIPE, shell=True, universal_newlines=True
f"JUJU_MODEL={model_full_name} juju ssh {unit} sudo -i 'curl localhost:{ADMIN_SERVER_PORT}/commands/srvr -m 10'",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

result = {}
for item in response.splitlines():
k = re.split(": ", item)[0]
v = re.split(": ", item)[1]
result[k] = v
assert response, "ZooKeeper not running"

return result
return json.loads(response)


async def ping_servers(ops_test: OpsTest) -> bool:
for unit in ops_test.model.applications[APP_NAME].units:
host = unit.public_address
mode = srvr(host)["Mode"]
srvr_response = srvr(ops_test.model_full_name, unit.name)

if srvr_response.get("error", None) is not None:
return False

mode = srvr_response.get("server_stats", {}).get("server_state", "")
if mode not in ["leader", "follower"]:
return False

Expand Down Expand Up @@ -170,8 +175,9 @@ def _get_show_unit_json(model_full_name: str, unit: str) -> Dict:

async def correct_version_running(ops_test: OpsTest, expected_version: str) -> bool:
for unit in ops_test.model.applications[APP_NAME].units:
host = unit.public_address
if expected_version not in srvr(host)["Zookeeper version"]:
srvr_response = srvr(ops_test.model_full_name, unit.name)

if expected_version not in srvr_response.get("version", ""):
return False

return True
Expand Down
33 changes: 16 additions & 17 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
from unittest.mock import Mock, PropertyMock, patch

import httpx
import pytest
import yaml
from charms.rolling_ops.v0.rollingops import WaitingStatus
Expand Down Expand Up @@ -1084,36 +1085,34 @@ def test_update_relation_data(harness):

@pytest.mark.nopatched_version
def test_workload_version_is_setted(harness, monkeypatch):
output_install = (
"Zookeeper version: 3.8.1-ubuntu0-${mvngit.commit.id}, built on 2023-11-21 15:33 UTC"
)
output_changed = (
"Zookeeper version: 3.8.2-ubuntu0-${mvngit.commit.id}, built on 2023-11-21 15:33 UTC"
)
output_install = {
"version": "3.8.1-ubuntu0-${mvngit.commit.id}, built on 2023-11-21 15:33 UTC"
}
output_changed = {
"version": "3.8.2-ubuntu0-${mvngit.commit.id}, built on 2023-11-21 15:33 UTC"
}
response_mock = Mock()
response_mock.return_value.json.side_effect = [output_install, output_changed]
monkeypatch.setattr(
harness.charm.workload,
"exec",
Mock(side_effect=[output_install, output_changed]),
httpx,
"get",
response_mock,
)
monkeypatch.setattr(harness.charm.workload, "install", Mock(return_value=True))
monkeypatch.setattr(harness.charm.workload, "healthy", Mock(return_value=True))

harness.add_relation(PEER, CHARM_KEY)
harness.charm.on.install.emit()
assert harness.get_workload_version() == "3.8.1"

with (
patch("charm.ZooKeeperCharm.init_server"),
patch("charm.ZooKeeperCharm.update_quorum"),
patch("managers.config.ConfigManager.config_changed"),
patch("core.cluster.ClusterState.all_units_related"),
patch("core.cluster.ClusterState.all_units_declaring_ip"),
patch("core.cluster.ClusterState.unit_server"),
patch("core.cluster.ClusterState.cluster"),
patch(
"core.cluster.ClusterState.peer_relation",
new_callable=PropertyMock,
),
patch("events.upgrade.ZKUpgradeEvents.idle", return_value=True),
):
harness.charm.on.install.emit()
assert harness.get_workload_version() == "3.8.1"
harness.charm.on.config_changed.emit()

assert harness.get_workload_version() == "3.8.2"

0 comments on commit 6fd7fd6

Please sign in to comment.