Skip to content

Commit

Permalink
Adds SSL config switch test to ceph-dashboard
Browse files Browse the repository at this point in the history
Signed-off-by: Utkarsh Bhatt <[email protected]>
  • Loading branch information
UtkarshBhatthere committed Aug 14, 2023
1 parent 931a00c commit c0b2a2c
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 21 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async_generator
# https://github.com/pyca/pyopenssl/commit/a145fc3bc6d2e943434beb2f04bbf9b18930296f
pyopenssl<22.1.0

trustme
boto3<1.25
PyYAML<=4.2,>=3.0; python_version < '3.9'
PyYAML>=5.1; python_version >= '3.9'
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
'futurist<2.0.0',
'async_generator',
'boto3',
'trustme',

# pyopenssl depends on a newer version of cryptography since 22.1.0
# TypeError: deprecated() got an unexpected keyword argument 'name'
Expand Down
166 changes: 145 additions & 21 deletions zaza/openstack/charm_tests/ceph/dashboard/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

"""Encapsulating `ceph-dashboard` testing."""

import collections
import json
import uuid
import logging
import collections
from base64 import b64encode
import requests
import tenacity
import uuid
import trustme

import zaza
import zaza.openstack.charm_tests.test_utils as test_utils
Expand Down Expand Up @@ -122,7 +124,8 @@ def _run_request_get(self, url, verify, allow_redirects):
return requests.get(
url,
verify=verify,
allow_redirects=allow_redirects)
allow_redirects=allow_redirects,
timeout=120)

@tenacity.retry(wait=tenacity.wait_exponential(multiplier=1,
min=5, max=10),
Expand All @@ -149,7 +152,8 @@ def _run_request_post(self, url, verify, data, headers):
url,
data=data,
headers=headers,
verify=verify)
verify=verify,
timeout=120)

@tenacity.retry(wait=tenacity.wait_fixed(2), reraise=True,
stop=tenacity.stop_after_attempt(90))
Expand All @@ -170,20 +174,9 @@ def get_master_dashboard_url(self):
raise tenacity.RetryError(None)
return url.strip('/')

def test_dashboard_units(self):
def test_001_dashboard_units(self):
"""Check dashboard units are configured correctly."""
verify = self.local_ca_cert
units = zaza.model.get_units('ceph-mon')
rcs = collections.defaultdict(list)
for unit in units:
r = self._run_request_get(
'https://{}:8443'.format(
zaza.model.get_unit_public_address(unit)),
verify=verify,
allow_redirects=False)
rcs[r.status_code].append(zaza.model.get_unit_public_address(unit))
self.assertEqual(len(rcs[requests.codes.ok]), 1)
self.assertEqual(len(rcs[requests.codes.see_other]), len(units) - 1)
self.verify_ssl_config(self.local_ca_cert)

def create_user(self, username, role='administrator'):
"""Create a dashboard user.
Expand Down Expand Up @@ -211,7 +204,7 @@ def get_random_username(self):
"""
return "zazauser-{}".format(uuid.uuid1())

def test_create_user(self):
def test_002_create_user(self):
"""Test create user action."""
test_user = self.get_random_username()
action = self.create_user(test_user)
Expand Down Expand Up @@ -244,11 +237,11 @@ def access_dashboard(self, dashboard_url):
headers=headers)
self.assertEqual(r.status_code, requests.codes.created)

def test_access_dashboard(self):
def test_003_access_dashboard(self):
"""Test logging in to the dashboard."""
self.access_dashboard(self.get_master_dashboard_url())

def test_ceph_keys(self):
def test_004_ceph_keys(self):
"""Check that ceph services are properly registered."""
status = zaza.model.get_status()
applications = status.applications.keys()
Expand Down Expand Up @@ -282,7 +275,7 @@ def wait_for_saml_dashboard(self):
return
raise tenacity.RetryError(None)

def test_saml(self):
def test_005_saml(self):
"""Check that the dashboard is accessible with SAML enabled."""
url = self.get_master_dashboard_url()
idp_meta = SAML_IDP_METADATA.format(
Expand All @@ -309,3 +302,134 @@ def test_saml(self):
verify=self.local_ca_cert,
allow_redirects=False)
self.assertEqual(resp.status_code, requests.codes.ok)

def is_app_deployed(self, app_name) -> bool:
"""Check if the provided app is deployed in the zaza model."""
try:
zaza.model.get_application(app_name)
return True
except KeyError:
return False

def _get_wait_for_dashboard_assert_state(
self, state, message_prefix) -> dict:
"""Generate a assert state for ceph-dashboard charm blocked state."""
assert_state = {
'ceph-dashboard': {
"workload-status": state,
"workload-status-message-prefix": message_prefix
}
}
# Telegraf has a non-standard active state message.
if self.is_app_deployed('telegraf'):
assert_state['telegraf'] = {
"workload-status": "active",
"workload-status-message-prefix": "Monitoring ceph"
}

return assert_state

def verify_ssl_config(self, ca_file):
"""Check if request validates the configured SSL cert."""
rcs = collections.defaultdict(list)
units = zaza.model.get_units('ceph-mon')
for unit in units:
req = self._run_request_get(
'https://{}:8443'.format(
zaza.model.get_unit_public_address(unit)),
verify=ca_file,
allow_redirects=False)
rcs[req.status_code].append(
zaza.model.get_unit_public_address(unit)
)
self.assertEqual(len(rcs[requests.codes.ok]), 1)
self.assertEqual(len(rcs[requests.codes.see_other]), len(units) - 1)

def _get_dashboard_hostnames_sans(self):
"""Get a generator for Dashboard unit public addresses."""
yield 'ceph-dashboard' # Include hostname in san as well.
# Since Ceph-Dashboard is a subordinate application,
# we use the principle application to get public addresses.
for unit in zaza.model.get_units('ceph-mon'):
addr = zaza.model.get_unit_public_address(unit)
if addr:
yield addr

def test_006_charm_config_ssl(self):
"""Config charm SSL certs to test the Ceph dashboard application."""
# Use RSA keys not ECSDA
local_ca = trustme.CA(key_type=trustme.KeyType.RSA)
server_cert = local_ca.issue_cert(
*self._get_dashboard_hostnames_sans(),
key_type=trustme.KeyType.RSA
)

ssl_cert = b64encode(server_cert.cert_chain_pems[0].bytes()).decode()
ssl_key = b64encode(server_cert.private_key_pem.bytes()).decode()
ssl_ca = b64encode(local_ca.cert_pem.bytes()).decode()

# Configure local certs in charm config
zaza.model.set_application_config(
'ceph-dashboard',
{
'ssl_cert': ssl_cert, 'ssl_key': ssl_key,
'ssl_ca': ssl_ca
}
)

# Check application status message.
assert_state = self._get_wait_for_dashboard_assert_state(
"blocked", "Conflict: Active SSL from 'certificates' relation"
)
zaza.model.wait_for_application_states(
states=assert_state, timeout=500
)

# Remove certificates relation to trigger configured certs.
zaza.model.remove_relation(
'ceph-dashboard', 'ceph-dashboard:certificates',
'vault:certificates'
)

# Wait for status to clear
assert_state = self._get_wait_for_dashboard_assert_state(
"active", "Unit is ready"
)
zaza.model.wait_for_application_states(
states=assert_state, timeout=500
)

# Verify Certificates.
with local_ca.cert_pem.tempfile() as ca_temp_file:
self.verify_ssl_config(ca_temp_file)

# Re-add certificates relation
zaza.model.add_relation(
'ceph-dashboard', 'ceph-dashboard:certificates',
'vault:certificates'
)

# Check blocked status message
assert_state = self._get_wait_for_dashboard_assert_state(
"blocked", "Conflict: Active SSL from Charm config"
)
zaza.model.wait_for_application_states(
states=assert_state, timeout=500
)

# Remove SSL config
zaza.model.set_application_config(
'ceph-dashboard',
{'ssl_cert': "", 'ssl_key': "", 'ssl_ca': ""}
)

# Wait for status to clear
assert_state = self._get_wait_for_dashboard_assert_state(
"active", "Unit is ready"
)
zaza.model.wait_for_application_states(
states=assert_state, timeout=500
)

# Verify Relation SSL certs.
self.verify_ssl_config(self.local_ca_cert)

0 comments on commit c0b2a2c

Please sign in to comment.