Update cluster state management (#227)
* Fix function return types and dictionary key checks
* Update pre-commit GitHub Actions
* Update cluster state management, i.e. the present, absent, started, restarted, and stopped states (an illustrative usage sketch follows the change summary below)

Signed-off-by: Webster Mudge <[email protected]>
wmudge authored May 17, 2024
1 parent 232995e commit a434cce
Showing 6 changed files with 399 additions and 193 deletions.
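
As a rough usage sketch of the lifecycle states reworked in this commit (not part of the diff; the host and cluster names are placeholders borrowed from the module's EXAMPLES block and tests), the `cloudera.cluster.cluster` module might be driven through the states like so:

```yaml
# Illustrative sketch only — host, cluster name, and version are placeholders.
- name: Ensure the cluster exists (defined, but not necessarily running)
  cloudera.cluster.cluster:
    host: example.cloudera.com
    name: Example_Base
    cluster_version: "7.1.9"
    state: present

- name: Start the cluster (first_run for a never-started cluster, start otherwise)
  cloudera.cluster.cluster:
    host: example.cloudera.com
    name: Example_Base
    state: started

- name: Remove the cluster (per this commit, a running cluster is stopped before deletion)
  cloudera.cluster.cluster:
    host: example.cloudera.com
    name: Example_Base
    state: absent
```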
6 changes: 3 additions & 3 deletions .github/workflows/pre-commit.yml
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

name: pre-commit
name: Execute Precommit Linting and Checks

on:
pull_request:
@@ -23,6 +23,6 @@ jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: pre-commit/[email protected]
35 changes: 13 additions & 22 deletions plugins/modules/cluster.py
@@ -61,7 +61,6 @@
}

DOCUMENTATION = r"""
---
module: cluster
short_description: Manage the lifecycle and state of a cluster
description:
@@ -77,7 +76,6 @@
"""

EXAMPLES = r"""
---
- name: Create an ECS cluster
cloudera.cluster.cluster:
host: example.cloudera.com
@@ -119,7 +117,6 @@
"""

RETURN = r"""
---
cloudera_manager:
description: Details about Cloudera Manager Cluster
type: dict
@@ -276,21 +273,18 @@ def process(self):

elif self.state == "absent":
# Delete cluster

refresh = False

# TODO Check for status when deleting
# if existing and existing.entity_status == "":
# self.wait_for_active_cmd(cluster_api, self.cluster_name)
# elif existing:
if existing:
self.changed = True
if not self.module.check_mode:
self.cluster_api.delete_cluster(cluster_name=self.name)
self.wait_for_active_cmd(self.name)
if existing.entity_status != "STOPPED":
stop = self.cluster_api.stop_command(cluster_name=self.name)
self.wait_command(stop, polling=self.timeout, delay=self.delay)

delete = self.cluster_api.delete_cluster(cluster_name=self.name)
self.wait_command(delete, polling=self.timeout, delay=self.delay)

elif self.state == "started":
# TODO NONE seems to be fresh cluster, never run before
# Already started
if existing and existing.entity_status == "GOOD_HEALTH":
refresh = False
@@ -312,11 +306,11 @@ def process(self):
# If newly created, or created but not yet initialized
if not existing or existing.entity_status == "NONE":
first_run = self.cluster_api.first_run(cluster_name=self.name)
self.wait_for_composite_cmd(first_run.id)
self.wait_command(first_run)
# Start the existing and previously initialized cluster
else:
start = self.cluster_api.start_command(cluster_name=self.name)
self.wait_for_composite_cmd(start.id)
self.wait_command(start, polling=self.timeout, delay=self.delay)

if self.state == "stopped":
# Already stopped
@@ -339,7 +333,7 @@ def process(self):
self.changed = True
if not self.module.check_mode:
stop = self.cluster_api.stop_command(cluster_name=self.name)
self.wait_for_composite_cmd(stop.id)
self.wait_command(stop, polling=self.timeout, delay=self.delay)

if self.state == "restarted":
# Start underway
@@ -357,7 +351,7 @@ def process(self):
self.changed = True
if not self.module.check_mode:
restart = self.cluster_api.restart_command(cluster_name=self.name)
self.wait_for_composite_cmd(restart.id)
self.wait_command(restart, polling=self.timeout, delay=self.delay)

if refresh:
# Retrieve the updated cluster details
@@ -547,7 +541,6 @@ def create_cluster_from_parameters(self):
timeout=self.timeout,
)
parcel.activate()

# Apply host templates
for ht, refs in template_map.items():
self.host_template_api.apply_host_template(
@@ -674,10 +667,10 @@ def create_cluster_from_parameters(self):
if self.auto_assign:
self.cluster_api.auto_assign_roles(cluster_name=self.name)

def marshal_service(self, options: str) -> ApiService:
def marshal_service(self, options: dict) -> ApiService:
service = ApiService(name=options["name"], type=options["type"])

if "display_name" in options:
if options["display_name"]:
service.display_name = options["display_name"]

# Service-wide configuration
@@ -741,9 +734,7 @@ def marshal_hostrefs(self, hosts: dict) -> list[ApiHostRef]:
)
return results

def find_base_role_group_name(
self, service_type: str, role_type: str
) -> ApiRoleConfigGroup:
def find_base_role_group_name(self, service_type: str, role_type: str) -> str:
rcgs = [
rcg
for s in self.service_api.read_services(cluster_name=self.name).items
@@ -365,97 +365,6 @@ def test_present_base_host_role_overrides(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


def test_present_basic_cluster(conn, module_args):
args = """
name: Basic_Cluster
cluster_version: "7.1.9-1.cdh7.1.9.p0.44702451"
type: BASE_CLUSTER
state: present
services:
- name: core-settings-0
type: CORE_SETTINGS
display_name: CORE_SETTINGS_TEST
- name: zookeeper-0
type: ZOOKEEPER
display_name: ZK_TEST
config:
zookeeper_datadir_autocreate: yes
- name: hdfs-0
type: HDFS
display_name: HDFS_TEST
config:
zookeeper_service: zookeeper-0
core_connector: core-settings-0
role_groups:
- type: DATANODE
config:
dfs_data_dir_list: /dfs/dn
- type: NAMENODE
config:
dfs_name_dir_list: /dfs/nn
- type: SECONDARYNAMENODE
config:
fs_checkpoint_dir_list: /dfs/snn
- name: yarn-0
type: YARN
display_name: YARN_TEST
config:
hdfs_service: hdfs-0
zookeeper_service: zookeeper-0
role_groups:
- type: RESOURCEMANAGER
config:
yarn_scheduler_maximum_allocation_mb: 4096
yarn_scheduler_maximum_allocation_vcores: 4
- type: NODEMANAGER
config:
yarn_nodemanager_resource_memory_mb: 4096
yarn_nodemanager_resource_cpu_vcores: 4
yarn_nodemanager_local_dirs: /tmp/nm
yarn_nodemanager_log_dirs: /var/log/nm
- type: GATEWAY
config:
mapred_submit_replication: 3
mapred_reduce_tasks: 6
host_templates:
- name: Master1
role_groups:
- service: HDFS
type: NAMENODE
- service: HDFS
type: SECONDARYNAMENODE
- service: YARN
type: RESOURCEMANAGER
- service: YARN
type: JOBHISTORY
- name: Worker
role_groups:
- service: HDFS
type: DATANODE
- service: YARN
type: NODEMANAGER
- service: ZOOKEEPER
type: SERVER
parcels:
CDH: "7.1.9-1.cdh7.1.9.p0.44702451"
hosts:
- name: test10-worker-free-01.cldr.internal
host_template: Master1
- name: test10-worker-free-02.cldr.internal
host_template: Worker
- name: test10-worker-free-03.cldr.internal
host_template: Worker
"""
conn.update(yaml.safe_load(args))
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_started_base(conn, module_args):
conn.update(
name="PVC-Base",
@@ -470,7 +379,6 @@ def test_started_base(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_restarted_base(conn, module_args):
conn.update(
name="PVC-Base",
@@ -485,24 +393,20 @@ def test_stopped_base(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_stopped_base(conn, module_args):
conn.update(
name="PVC-Base",
cluster_version="7.1.9", # "1.5.1-b626.p0.42068229",
# type="COMPUTE_CLUSTER",
state="stopped",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

# LOG.info(str(e.value))
LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_absent_base(conn, module_args):
conn.update(
name="Example_Base",
@@ -516,78 +420,6 @@ def test_absent_base(conn, module_args):
LOG.info(str(e.value.cloudera_manager))


def test_present_compute_minimum(conn, module_args):
conn.update(
name="Example_Compute",
cluster_version="7.1.9",
contexts=["SDX"],
state="present",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


@pytest.mark.skip(reason="Not yet implemented")
def test_started_compute_minimum(conn, module_args):
conn.update(
name="Example_Compute",
cluster_version="7.1.9",
contexts=["SDX"],
state="started",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_absent_compute(conn, module_args):
conn.update(
name="Example_Compute",
state="absent",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_present_experience_minimum(conn, module_args):
conn.update(
name="Example_Experience",
cluster_version="1.5.3",
type="EXPERIENCE_CLUSTER",
state="present",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_absent_experience(conn, module_args):
conn.update(
name="Example_Experience",
state="absent",
)
module_args(conn)

with pytest.raises(AnsibleExitJson) as e:
cluster.main()

LOG.info(str(e.value.cloudera_manager))


def test_pytest_cluster_with_template(module_args):
module_args(
{
