diff --git a/meta/runtime.yml b/meta/runtime.yml
index b7056143029..c5a461aebeb 100644
--- a/meta/runtime.yml
+++ b/meta/runtime.yml
@@ -71,6 +71,8 @@ action_groups:
     - aws_inspector_target
     - aws_kms
     - aws_kms_info
+    - aws_msk_cluster
+    - aws_msk_config
     - aws_region_info
     - aws_s3_bucket_info
     - aws_s3_cors
diff --git a/plugins/modules/aws_msk_cluster.py b/plugins/modules/aws_msk_cluster.py
new file mode 100644
index 00000000000..7f85c00a59b
--- /dev/null
+++ b/plugins/modules/aws_msk_cluster.py
@@ -0,0 +1,838 @@
+#!/usr/bin/python
+# Copyright: (c) 2021, Daniil Kupchenko (@oukooveu)
+# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+
+DOCUMENTATION = r"""
+---
+module: aws_msk_cluster
+short_description: Manage Amazon MSK clusters
+version_added: "2.0.0"
+requirements:
+    - botocore >= 1.17.42
+    - boto3 >= 1.17.9
+description:
+    - Create, delete and modify Amazon MSK (Managed Streaming for Apache Kafka) clusters.
+author:
+    - Daniil Kupchenko (@oukooveu)
+options:
+    state:
+        description: Create (C(present)) or delete (C(absent)) cluster.
+        choices: ['present', 'absent']
+        type: str
+        default: 'present'
+    name:
+        description: The name of the cluster.
+        required: true
+        type: str
+    version:
+        description:
+            - The version of Apache Kafka.
+            - This version must exist in the given configuration.
+            - This parameter is required when I(state=present).
+        type: str
+    configuration_arn:
+        description:
+            - ARN of the configuration to use.
+            - This parameter is required when I(state=present).
+        type: str
+    configuration_revision:
+        description:
+            - The revision of the configuration to use.
+            - This parameter is required when I(state=present).
+        type: int
+    nodes:
+        description: The number of broker nodes in the cluster. Must be greater than or equal to two.
+        type: int
+        default: 3
+    instance_type:
+        description:
+            - The type of Amazon EC2 instances to use for Kafka brokers.
+            - The update operation requires boto3 >= 1.16.58.
+        choices:
+            - kafka.t3.small
+            - kafka.m5.large
+            - kafka.m5.xlarge
+            - kafka.m5.2xlarge
+            - kafka.m5.4xlarge
+        default: kafka.t3.small
+        type: str
+    ebs_volume_size:
+        description: The size in GiB of the EBS volume for the data drive on each broker node.
+        type: int
+        default: 100
+    subnets:
+        description:
+            - The list of subnets to connect to in the client virtual private cloud (VPC).
+              AWS creates elastic network interfaces inside these subnets. Client applications use
+              elastic network interfaces to produce and consume data.
+            - Client subnets can't be in Availability Zone us-east-1e.
+            - This parameter is required when I(state=present).
+        type: list
+        elements: str
+    security_groups:
+        description:
+            - The AWS security groups to associate with the elastic network interfaces in order to specify
+              who can connect to and communicate with the Amazon MSK cluster.
+              If you don't specify a security group, Amazon MSK uses the default security group associated with the VPC.
+        type: list
+        elements: str
+    encryption:
+        description:
+            - Includes all encryption-related information.
+            - Effective only at cluster creation and cannot be updated afterwards.
+        type: dict
+        suboptions:
+            kms_key_id:
+                description:
+                    - The ARN of the AWS KMS key for encrypting data at rest. If you don't specify a KMS key, MSK creates one for you and uses it.
+                default: Null
+                type: str
+            in_transit:
+                description: The details for encryption in transit.
+                type: dict
+                suboptions:
+                    in_cluster:
+                        description:
+                            - When set to true, it indicates that data communication among the broker nodes of the cluster is encrypted.
+                              When set to false, the communication happens in plaintext.
+                        type: bool
+                        default: True
+                    client_broker:
+                        description:
+                            - Indicates the encryption setting for data in transit between clients and brokers. The following are the possible values.
+                              TLS means that client-broker communication is enabled with TLS only.
+                              TLS_PLAINTEXT means that client-broker communication is enabled for both TLS-encrypted, as well as plaintext data.
+                              PLAINTEXT means that client-broker communication is enabled in plaintext only.
+                        choices:
+                            - TLS
+                            - TLS_PLAINTEXT
+                            - PLAINTEXT
+                        type: str
+                        default: TLS
+    authentication:
+        description:
+            - Includes all client authentication related information.
+            - Effective only at cluster creation and cannot be updated afterwards.
+        type: dict
+        suboptions:
+            tls_ca_arn:
+                description: List of ACM Certificate Authority ARNs.
+                type: list
+                elements: str
+            sasl_scram:
+                description: Whether SASL/SCRAM authentication is enabled.
+                type: bool
+                default: False
+    enhanced_monitoring:
+        description: Specifies the level of monitoring for the MSK cluster.
+        choices:
+            - DEFAULT
+            - PER_BROKER
+            - PER_TOPIC_PER_BROKER
+            - PER_TOPIC_PER_PARTITION
+        default: DEFAULT
+        type: str
+    open_monitoring:
+        description: The settings for open monitoring.
+        type: dict
+        suboptions:
+            jmx_exporter:
+                description: Indicates whether you want to enable or disable the JMX Exporter.
+                type: bool
+                default: False
+            node_exporter:
+                description: Indicates whether you want to enable or disable the Node Exporter.
+                type: bool
+                default: False
+    logging:
+        description: Logging configuration.
+        type: dict
+        suboptions:
+            cloudwatch:
+                description: Details of the CloudWatch Logs destination for broker logs.
+                type: dict
+                suboptions:
+                    enabled:
+                        description: Specifies whether broker logs get sent to the specified CloudWatch Logs destination.
+                        type: bool
+                        default: False
+                    log_group:
+                        description: The CloudWatch log group that is the destination for broker logs.
+                        type: str
+                        required: False
+            firehose:
+                description: Details of the Kinesis Data Firehose delivery stream that is the destination for broker logs.
+                type: dict
+                suboptions:
+                    enabled:
+                        description: Specifies whether broker logs get sent to the specified Kinesis Data Firehose delivery stream.
+                        type: bool
+                        default: False
+                    delivery_stream:
+                        description: The Kinesis Data Firehose delivery stream that is the destination for broker logs.
+                        type: str
+                        required: False
+            s3:
+                description: Details of the Amazon S3 destination for broker logs.
+                type: dict
+                suboptions:
+                    enabled:
+                        description: Specifies whether broker logs get sent to the specified Amazon S3 destination.
+                        type: bool
+                        default: False
+                    bucket:
+                        description: The name of the S3 bucket that is the destination for broker logs.
+                        type: str
+                        required: False
+                    prefix:
+                        description: The S3 prefix that is the destination for broker logs.
+                        type: str
+                        required: False
+    wait:
+        description: Whether to wait for the cluster to be available or deleted.
+        type: bool
+        default: false
+    wait_timeout:
+        description: How many seconds to wait. Cluster creation can take up to 20-30 minutes.
+        type: int
+        default: 3600
+    tags:
+        description: Tag dictionary to apply to the cluster.
+        type: dict
+    purge_tags:
+        description: Remove tags not listed in I(tags) when tags is specified.
+        default: true
+        type: bool
+extends_documentation_fragment:
+    - amazon.aws.aws
+    - amazon.aws.ec2
+notes:
+    - All operations are time consuming; for example, cluster creation takes 20-30 minutes,
+      a Kafka version update can take more than an hour, and a configuration update takes 10-15 minutes.
+    - The cluster's brokers are distributed evenly across a number of availability zones
+      equal to the number of subnets.
+"""
+
+EXAMPLES = r"""
+# Note: These examples do not set authentication details, see the AWS Guide for details.
+
+- aws_msk_cluster:
+    name: kafka-cluster
+    state: present
+    version: 2.6.1
+    nodes: 6
+    ebs_volume_size: "{{ aws_msk_options.ebs_volume_size }}"
+    subnets:
+      - subnet-e3b48ce7c25861eeb
+      - subnet-2990c8b25b07ddd43
+      - subnet-d9fbeaf46c54bfab6
+    wait: true
+    wait_timeout: 1800
+    configuration_arn: arn:aws:kafka:us-east-1:000000000001:configuration/kafka-cluster-configuration/aaaaaaaa-bbbb-4444-3333-ccccccccc-1
+    configuration_revision: 1
+
+- aws_msk_cluster:
+    name: kafka-cluster
+    state: absent
+"""
+
+RETURN = r"""
+# These are examples of possible return values, and in general should use other names for return values.
+
+bootstrap_broker_string:
+    description: A list of brokers that a client application can use to bootstrap.
+    type: complex
+    contains:
+        plain:
+            description: A string containing one or more hostname:port pairs.
+            type: str
+        tls:
+            description: A string containing one or more DNS names (or IP addresses) and TLS port pairs.
+            type: str
+    returned: I(state=present) and cluster state is I(ACTIVE)
+cluster_info:
+    description: Description of the MSK cluster.
+    type: dict
+    returned: I(state=present)
+response:
+    description: The response from actual API call.
+    type: dict
+    returned: always
+    sample: {}
+"""
+
+import time
+
+try:
+    import botocore
+except ImportError:
+    pass  # handled by AnsibleAWSModule
+
+from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
+from ansible_collections.amazon.aws.plugins.module_utils.ec2 import (
+    camel_dict_to_snake_dict,
+    compare_aws_tags,
+    AWSRetry,
+)
+
+
+@AWSRetry.jittered_backoff(retries=5, delay=5)
+def list_clusters_with_backoff(client, cluster_name):
+    paginator = client.get_paginator("list_clusters")
+    return paginator.paginate(ClusterNameFilter=cluster_name).build_full_result()
+
+
+@AWSRetry.jittered_backoff(retries=5, delay=5)
+def list_nodes_with_backoff(client, cluster_arn):
+    paginator = client.get_paginator("list_nodes")
+    return paginator.paginate(ClusterArn=cluster_arn).build_full_result()
+
+
+def find_cluster_by_name(client, module, cluster_name):
+    try:
+        cluster_list = list_clusters_with_backoff(client, cluster_name).get("ClusterInfoList", [])
+    except (
+        botocore.exceptions.BotoCoreError,
+        botocore.exceptions.ClientError,
+    ) as e:
+        module.fail_json_aws(e, "Failed to find kafka cluster by name")
+    if cluster_list:
+        if len(cluster_list) != 1:
+            module.fail_json(msg="Found more than one cluster with name '{0}'".format(cluster_name))
+        return cluster_list[0]
+    return {}
+
+
+def get_cluster_state(client, module, arn):
+    try:
+        response = client.describe_cluster(ClusterArn=arn, aws_retry=True)
+    except client.exceptions.NotFoundException:
+        return "DELETED"
+    except (
+        botocore.exceptions.BotoCoreError,
+        botocore.exceptions.ClientError,
+    ) as e:
+        module.fail_json_aws(e, "Failed to get kafka cluster state")
+    return response["ClusterInfo"]["State"]
+
+
+def get_cluster_version(client, module, arn):
+    try:
+        response = client.describe_cluster(ClusterArn=arn, aws_retry=True)
+    except (
+        botocore.exceptions.BotoCoreError,
+        botocore.exceptions.ClientError,
+    ) as e:
+        module.fail_json_aws(e, "Failed to get kafka cluster version")
+    return response["ClusterInfo"]["CurrentVersion"]
+
+
+def wait_for_cluster_state(client, module, arn, state="ACTIVE"):
+    # As of 2021-06 boto3 doesn't offer any built-in waiters for MSK
+    start = time.time()
+    timeout = int(module.params.get("wait_timeout"))
+    check_interval = 60
+    while True:
+        current_state = get_cluster_state(client, module, arn)
+        if current_state == state:
+            return
+        if time.time() - start > timeout:
+            module.fail_json(
+                msg="Timeout waiting for cluster state '{1}' (current state is '{0}')".format(
+                    current_state, state
+                )
+            )
+        time.sleep(check_interval)
+
+
+def prepare_create_options(module):
+    """
+    Return data structure for cluster create operation
+    """
+
+    c_params = {
+        "ClusterName": module.params["name"],
+        "KafkaVersion": module.params["version"],
+        "ConfigurationInfo": {
+            "Arn": module.params["configuration_arn"],
+            "Revision": module.params["configuration_revision"],
+        },
+        "NumberOfBrokerNodes": module.params["nodes"],
+        "BrokerNodeGroupInfo": {
+            "ClientSubnets": module.params["subnets"],
+            "InstanceType": module.params["instance_type"],
+        }
+    }
+
+    if module.params["security_groups"]:
+        c_params["BrokerNodeGroupInfo"]["SecurityGroups"] = module.params.get("security_groups")
+
+    if module.params["ebs_volume_size"]:
+        c_params["BrokerNodeGroupInfo"]["StorageInfo"] = {
+            "EbsStorageInfo": {
+                "VolumeSize": module.params.get("ebs_volume_size")
+            }
+        }
+
+    if module.params["encryption"]:
+        c_params["EncryptionInfo"] = {}
+        if module.params["encryption"].get("kms_key_id"):
+            c_params["EncryptionInfo"]["EncryptionAtRest"] = {
+                "DataVolumeKMSKeyId": module.params["encryption"]["kms_key_id"]
+            }
+        # in_transit is an optional suboption, fall back to the documented defaults
+        in_transit = module.params["encryption"].get("in_transit") or {}
+        c_params["EncryptionInfo"]["EncryptionInTransit"] = {
+            "ClientBroker": in_transit.get("client_broker", "TLS"),
+            "InCluster": in_transit.get("in_cluster", True)
+        }
+
+    if module.params["authentication"]:
+        c_params["ClientAuthentication"] = {}
+        if module.params["authentication"].get("sasl_scram"):
+            c_params["ClientAuthentication"]["Sasl"] = {
+                "Scram": module.params["authentication"]["sasl_scram"]
+            }
+        if module.params["authentication"].get("tls_ca_arn"):
+            c_params["ClientAuthentication"]["Tls"] = {
+                "CertificateAuthorityArnList": module.params["authentication"]["tls_ca_arn"]
+            }
+
+    c_params.update(prepare_enhanced_monitoring_options(module))
+    c_params.update(prepare_open_monitoring_options(module))
+    c_params.update(prepare_logging_options(module))
+
+    return c_params
+
+
+def prepare_enhanced_monitoring_options(module):
+    m_params = {}
+    m_params["EnhancedMonitoring"] = module.params["enhanced_monitoring"] or "DEFAULT"
+    return m_params
+
+
+def prepare_open_monitoring_options(module):
+    m_params = {}
+    open_monitoring = module.params["open_monitoring"] or {}
+    m_params["OpenMonitoring"] = {
+        "Prometheus": {
+            "JmxExporter": {
+                "EnabledInBroker": open_monitoring.get("jmx_exporter", False)
+            },
+            "NodeExporter": {
+                "EnabledInBroker": open_monitoring.get("node_exporter", False)
+            }
+        }
+    }
+    return m_params
+
+
+def prepare_logging_options(module):
+    l_params = {}
+    logging_params = module.params["logging"] or {}
+    if logging_params.get("cloudwatch"):
+        l_params["CloudWatchLogs"] = {
+            "Enabled": logging_params["cloudwatch"].get("enabled"),
+            "LogGroup": logging_params["cloudwatch"].get("log_group")
+        }
+    else:
+        l_params["CloudWatchLogs"] = {
+            "Enabled": False
+        }
+    if logging_params.get("firehose"):
+        l_params["Firehose"] = {
+            "Enabled": logging_params["firehose"].get("enabled"),
+            "DeliveryStream": logging_params["firehose"].get("delivery_stream")
+        }
+    else:
+        l_params["Firehose"] = {
+            "Enabled": False
+        }
+    if logging_params.get("s3"):
+        l_params["S3"] = {
+            "Enabled": logging_params["s3"].get("enabled"),
+            "Bucket": logging_params["s3"].get("bucket"),
+            "Prefix": logging_params["s3"].get("prefix")
+        }
+    else:
+        l_params["S3"] = {
+            "Enabled": False
+        }
+    return {
+        "LoggingInfo": {
+            "BrokerLogs": l_params
+        }
+    }
+
+
+def create_or_update_cluster(client, module):
+    """
+    Create new or update existing cluster
+    """
+
+    changed = False
+    response = {}
+
+    cluster = find_cluster_by_name(client, module, module.params["name"])
+
+    if not cluster:
+
+        changed = True
+
+        if module.check_mode:
+            return True, {}
+
+        create_params = prepare_create_options(module)
+
+        try:
+            response = client.create_cluster(aws_retry=True, **create_params)
+        except (
+            botocore.exceptions.BotoCoreError,
+            botocore.exceptions.ClientError,
+        ) as e:
+            module.fail_json_aws(e, "Failed to create kafka cluster")
+
+        if module.params.get("wait"):
+            wait_for_cluster_state(client, module, arn=response["ClusterArn"], state="ACTIVE")
+
+    else:
+
+        response["ClusterArn"] = cluster["ClusterArn"]
+        response["changes"] = {}
+
+        # map each available update method to its current value, target value and request options
+        msk_cluster_changes = {
+            "broker_count": {
+                "current_value": cluster["NumberOfBrokerNodes"],
+                "target_value": module.params.get("nodes"),
+                "update_params": {
+                    "TargetNumberOfBrokerNodes": module.params.get("nodes")
+                }
+            },
+            "broker_storage": {
+                "current_value": cluster["BrokerNodeGroupInfo"]["StorageInfo"]["EbsStorageInfo"]["VolumeSize"],
+                "target_value": module.params.get("ebs_volume_size"),
+                "update_params": {
+                    "TargetBrokerEBSVolumeInfo": [
+                        {"KafkaBrokerNodeId": "All", "VolumeSizeGB": module.params.get("ebs_volume_size")}
+                    ]
+                }
+            },
+            "broker_type": {
+                "boto3_version": "1.16.58",
+                "current_value": cluster["BrokerNodeGroupInfo"]["InstanceType"],
+                "target_value": module.params.get("instance_type"),
+                "update_params": {
+                    "TargetInstanceType": module.params.get("instance_type")
+                }
+            },
+            "cluster_configuration": {
+                "current_value": {
+                    "arn": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationArn"],
+                    "revision": cluster["CurrentBrokerSoftwareInfo"]["ConfigurationRevision"],
+                },
+                "target_value": {
+                    "arn": module.params.get("configuration_arn"),
+                    "revision": module.params.get("configuration_revision"),
+                },
+                "update_params": {
+                    "ConfigurationInfo": {
+                        "Arn": module.params.get("configuration_arn"),
+                        "Revision": module.params.get("configuration_revision")
+                    }
+                }
+            },
+            "cluster_kafka_version": {
+                "current_value": cluster["CurrentBrokerSoftwareInfo"]["KafkaVersion"],
+                "target_value": module.params.get("version"),
+                "update_params": {
+                    "TargetKafkaVersion": module.params.get("version")
+                }
+            },
+            "enhanced_monitoring": {
+                "current_value": cluster["EnhancedMonitoring"],
+                "target_value": module.params.get("enhanced_monitoring"),
+                "update_method": "update_monitoring",
+                "update_params": prepare_enhanced_monitoring_options(module)
+            },
+            "open_monitoring": {
+                "current_value": {
+                    "OpenMonitoring": cluster["OpenMonitoring"]
+                },
+                "target_value": prepare_open_monitoring_options(module),
+                "update_method": "update_monitoring",
+                "update_params": prepare_open_monitoring_options(module)
+            },
+            "logging": {
+                "current_value": {
+                    "LoggingInfo": cluster["LoggingInfo"]
+                },
+                "target_value": prepare_logging_options(module),
+                "update_method": "update_monitoring",
+                "update_params": prepare_logging_options(module)
+            }
+        }
+
+        for method, options in msk_cluster_changes.items():
+
+            if 'boto3_version' in options:
+                if not module.boto3_at_least(options["boto3_version"]):
+                    continue
+
+            try:
+                update_method = getattr(client, options.get("update_method", "update_" + method))
+            except AttributeError as e:
+                module.fail_json_aws(e, "There is no update method 'update_{0}'".format(method))
+
+            if options["current_value"] != options["target_value"]:
+                changed = True
+                if module.check_mode:
+                    return True, {}
+
+                # need to get the cluster version and check the state because several
+                # updates may be requested but only one can be performed at a time
+                version = get_cluster_version(client, module, cluster["ClusterArn"])
+                state = get_cluster_state(client, module, cluster["ClusterArn"])
+                if state != "ACTIVE":
+                    if module.params["wait"]:
+                        wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE")
+                    else:
+                        module.fail_json(
+                            msg="The cluster can only be updated in ACTIVE state; current state is '{0}'. Check the cluster state or use the wait option".format(
+                                state
+                            )
+                        )
+                try:
+                    response["changes"][method] = update_method(
+                        ClusterArn=cluster["ClusterArn"],
+                        CurrentVersion=version,
+                        **options["update_params"]
+                    )
+                except (
+                    botocore.exceptions.BotoCoreError,
+                    botocore.exceptions.ClientError,
+                ) as e:
+                    module.fail_json_aws(
+                        e, "Failed to update cluster via 'update_{0}'".format(method)
+                    )
+
+                if module.params["wait"]:
+                    wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="ACTIVE")
+
+    changed |= update_cluster_tags(client, module, response["ClusterArn"])
+
+    return changed, response
+
+
+def update_cluster_tags(client, module, arn):
+    new_tags = module.params.get('tags')
+    if new_tags is None:
+        return False
+    purge_tags = module.params.get('purge_tags')
+
+    try:
+        existing_tags = client.list_tags_for_resource(ResourceArn=arn, aws_retry=True)['Tags']
+    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+        module.fail_json_aws(e, msg="Unable to retrieve tags for cluster '{0}'".format(arn))
+
+    tags_to_add, tags_to_remove = compare_aws_tags(existing_tags, new_tags, purge_tags=purge_tags)
+
+    if not module.check_mode:
+        try:
+            if tags_to_remove:
+                client.untag_resource(ResourceArn=arn, TagKeys=tags_to_remove, aws_retry=True)
+            if tags_to_add:
+                client.tag_resource(ResourceArn=arn, Tags=tags_to_add, aws_retry=True)
+        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+            module.fail_json_aws(e, msg="Unable to set tags for cluster '{0}'".format(arn))
+
+    changed = bool(tags_to_add) or bool(tags_to_remove)
+    return changed
+
+
+def delete_cluster(client, module):
+
+    cluster = find_cluster_by_name(client, module, module.params["name"])
+
+    if module.check_mode:
+        if cluster:
+            return True, cluster
+        else:
+            return False, {}
+
+    if not cluster:
+        return False, {}
+
+    try:
+        response = client.delete_cluster(
+            ClusterArn=cluster["ClusterArn"],
+            CurrentVersion=cluster["CurrentVersion"],
+        )
+    except (botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError) as e:
+        module.fail_json_aws(e, "Failed to delete kafka cluster")
+
+    if module.params["wait"]:
+        wait_for_cluster_state(client, module, arn=cluster["ClusterArn"], state="DELETED")
+
+    response["bootstrap_broker_string"] = {}
+
+    return True, response
+
+
+def main():
+
+    module_args = dict(
+        name=dict(type="str", required=True),
+        state=dict(type="str", choices=["present", "absent"], default="present"),
+        version=dict(type="str"),
+        configuration_arn=dict(type="str"),
+        configuration_revision=dict(type="int"),
+        nodes=dict(type="int", default=3),
+        instance_type=dict(
+            choices=[
+                "kafka.t3.small",
+                "kafka.m5.large",
+                "kafka.m5.xlarge",
+                "kafka.m5.2xlarge",
+                "kafka.m5.4xlarge",
+            ],
+            default="kafka.t3.small",
+        ),
+        ebs_volume_size=dict(type="int", default=100),
+        subnets=dict(type="list", elements="str"),
+        security_groups=dict(type="list", elements="str", required=False),
+        encryption=dict(
+            type="dict",
+            options=dict(
+                kms_key_id=dict(type="str", required=False),
+                in_transit=dict(
+                    type="dict",
+                    options=dict(
+                        in_cluster=dict(type="bool", default=True),
+                        client_broker=dict(
+                            choices=["TLS", "TLS_PLAINTEXT", "PLAINTEXT"],
+                            default="TLS"
+                        ),
+                    ),
+                ),
+            ),
+        ),
+        authentication=dict(
+            type="dict",
+            options=dict(
+                tls_ca_arn=dict(type="list", elements="str", required=False),
+                sasl_scram=dict(type="bool", default=False),
+            ),
+        ),
+        enhanced_monitoring=dict(
+            choices=[
+                "DEFAULT",
+                "PER_BROKER",
+                "PER_TOPIC_PER_BROKER",
+                "PER_TOPIC_PER_PARTITION",
+            ],
+            default="DEFAULT",
+            required=False,
+        ),
+        open_monitoring=dict(
+            type="dict",
+            options=dict(
+                jmx_exporter=dict(type="bool", default=False),
+                node_exporter=dict(type="bool", default=False),
+            ),
+        ),
+        logging=dict(
+            type="dict",
+            options=dict(
+                cloudwatch=dict(
+                    type="dict",
+                    options=dict(
+                        enabled=dict(type="bool", default=False),
+                        log_group=dict(type="str", required=False),
+                    ),
+                ),
+                firehose=dict(
+                    type="dict",
+                    options=dict(
+                        enabled=dict(type="bool", default=False),
+                        delivery_stream=dict(type="str", required=False),
+                    ),
+                ),
+                s3=dict(
+                    type="dict",
+                    options=dict(
+                        enabled=dict(type="bool", default=False),
+                        bucket=dict(type="str", required=False),
+                        prefix=dict(type="str", required=False),
+                    ),
+                ),
+            ),
+        ),
+        wait=dict(type="bool", default=False),
+        wait_timeout=dict(type="int", default=3600),
+        tags=dict(type='dict'),
+        purge_tags=dict(type='bool', default=True),
+    )
+
+    module = AnsibleAWSModule(
+        argument_spec=module_args,
+        required_if=[['state', 'present', ['version', 'configuration_arn', 'configuration_revision', 'subnets']]],
+        supports_check_mode=True
+    )
+
+    client = module.client("kafka", retry_decorator=AWSRetry.jittered_backoff())
+
+    if module.params["state"] == "present":
+        if len(module.params["subnets"]) < 2:
+            module.fail_json(
+                msg="At least two client subnets should be provided"
+            )
+        if module.params["nodes"] % len(module.params["subnets"]) != 0:
+            module.fail_json(
+                msg="The number of broker nodes must be a multiple of the number of availability zones (subnets)"
+            )
+        if len(module.params["name"]) > 64:
+            module.fail_json(
+                msg='Cluster name "{0}" exceeds 64 character limit'.format(module.params["name"])
+            )
+        changed, response = create_or_update_cluster(client, module)
+    elif module.params["state"] == "absent":
+        changed, response = delete_cluster(client, module)
+
+    cluster_info = {}
+    bootstrap_broker_string = {}
+    if response.get("ClusterArn") and module.params["state"] == "present":
+        try:
+            cluster_info = client.describe_cluster(ClusterArn=response["ClusterArn"], aws_retry=True)[
+                "ClusterInfo"
+            ]
+            if cluster_info.get("State") == "ACTIVE":
+                brokers = 
client.get_bootstrap_brokers(ClusterArn=response["ClusterArn"], aws_retry=True) + if brokers.get("BootstrapBrokerString"): + bootstrap_broker_string["plain"] = brokers["BootstrapBrokerString"] + if brokers.get("BootstrapBrokerStringTls"): + bootstrap_broker_string["tls"] = brokers["BootstrapBrokerStringTls"] + except ( + botocore.exceptions.BotoCoreError, + botocore.exceptions.ClientError, + ) as e: + module.fail_json_aws( + e, + "Can not obtain information about cluster {0}".format( + response["ClusterArn"] + ), + ) + + module.exit_json( + changed=changed, + bootstrap_broker_string=bootstrap_broker_string, + cluster_info=camel_dict_to_snake_dict(cluster_info), + response=camel_dict_to_snake_dict(response), + ) + + +if __name__ == "__main__": + main() diff --git a/plugins/modules/aws_msk_config.py b/plugins/modules/aws_msk_config.py new file mode 100644 index 00000000000..c02769152a5 --- /dev/null +++ b/plugins/modules/aws_msk_config.py @@ -0,0 +1,323 @@ +#!/usr/bin/python +# Copyright: (c) 2021, Daniil Kupchenko (@oukooveu) +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +DOCUMENTATION = r""" +--- +module: aws_msk_config +short_description: Manage Amazon MSK cluster configurations. +version_added: "2.0.0" +requirements: + - botocore >= 1.17.42 + - boto3 >= 1.17.9 +description: + - Create, delete and modify Amazon MSK (Managed Streaming for Apache Kafka) cluster configurations. +author: + - Daniil Kupchenko (@oukooveu) +options: + state: + description: Create (C(present)) or delete (C(absent)) cluster configuration. + choices: ['present', 'absent'] + default: 'present' + type: str + name: + description: The name of the configuration. + required: true + type: str + description: + description: The description of the configuration. + type: str + config: + description: Contents of the server.properties file. + type: dict + aliases: ['configuration'] + kafka_versions: + description: + - The versions of Apache Kafka with which you can use this MSK configuration. + - Required when I(state=present). + type: list + elements: str +extends_documentation_fragment: + - amazon.aws.aws + - amazon.aws.ec2 +""" + +EXAMPLES = r""" +# Note: These examples do not set authentication details, see the AWS Guide for details. + +- aws_msk_config: + name: kafka-cluster-configuration + state: present + kafka_versions: + - 2.6.0 + - 2.6.1 + config: + auto.create.topics.enable: false + num.partitions: 1 + default.replication.factor: 3 + zookeeper.session.timeout.ms: 18000 + +- aws_msk_config: + name: kafka-cluster-configuration + state: absent +""" + +RETURN = r""" +# These are examples of possible return values, and in general should use other names for return values. + +arn: + description: The Amazon Resource Name (ARN) of the configuration. + type: str + returned: I(state=present) + sample: "arn:aws:kafka:::configuration//" +revision: + description: The revision number. + type: int + returned: I(state=present) + sample: 1 +server_properties: + description: Contents of the server.properties file. + type: str + returned: I(state=present) + sample: "default.replication.factor=3\nnum.io.threads=8\nzookeeper.session.timeout.ms=18000" +response: + description: The response from actual API call. 
+    type: dict
+    returned: always
+    sample: {}
+"""
+
+try:
+    import botocore
+except ImportError:
+    pass  # handled by AnsibleAWSModule
+
+from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule
+from ansible_collections.amazon.aws.plugins.module_utils.ec2 import (
+    camel_dict_to_snake_dict,
+    AWSRetry,
+)
+
+
+BOTOCORE_MIN_VERSION = "1.17.42"
+
+
+def dict_to_prop(d):
+    """convert dictionary to multi-line properties"""
+    if len(d) == 0:
+        return ""
+    return "\n".join("{0}={1}".format(k, v) for k, v in d.items())
+
+
+def prop_to_dict(p):
+    """convert properties to dictionary"""
+    if len(p) == 0:
+        return {}
+    r_dict = {}
+    for s in p.decode().split("\n"):
+        # split on the first '=' only, property values may contain '='
+        kv = s.split("=", 1)
+        r_dict[kv[0].strip()] = kv[1].strip()
+    return r_dict
+    # a dict comprehension would be simpler but requires python >= 2.7
+
+
+@AWSRetry.jittered_backoff(retries=5, delay=5)
+def get_configurations_with_backoff(client):
+    paginator = client.get_paginator("list_configurations")
+    return paginator.paginate().build_full_result()
+
+
+def find_active_config(client, module):
+    """
+    look for a configuration by name;
+    the state is not returned by list_configurations in botocore 1.17.42,
+    and the delete_configuration method was added in botocore 1.17.48
+    """
+
+    name = module.params["name"]
+
+    try:
+        all_configs = get_configurations_with_backoff(client)["Configurations"]
+    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+        module.fail_json_aws(e, msg="failed to obtain kafka configurations")
+
+    active_configs = list(
+        item
+        for item in all_configs
+        if item["Name"] == name and item["State"] == "ACTIVE"
+    )
+
+    if active_configs:
+        if len(active_configs) == 1:
+            return active_configs[0]
+        else:
+            module.fail_json(
+                msg="found more than one active config with name '{0}'".format(name)
+            )
+
+    return None
+
+
+def get_configuration_revision(client, module, arn, revision):
+    try:
+        return client.describe_configuration_revision(Arn=arn, Revision=revision, aws_retry=True)
+    except (
+        botocore.exceptions.BotoCoreError,
+        botocore.exceptions.ClientError,
+    ) as e:
+        module.fail_json_aws(e, "failed to describe kafka configuration revision")
+
+
+def is_configuration_changed(module, current):
+    """
+    compare the configuration's description and properties
+    (a dict comprehension would be simpler but requires python >= 2.7)
+    """
+    prop_module = {}
+    for k, v in module.params.get("config").items():
+        prop_module[str(k)] = str(v)
+    if prop_to_dict(current.get("ServerProperties", "")) == prop_module:
+        if current.get("Description", "") == module.params.get("description"):
+            return False
+    return True
+
+
+def create_config(client, module):
+    """create new or update existing configuration"""
+
+    config = find_active_config(client, module)
+
+    # create new configuration
+    if not config:
+
+        if module.check_mode:
+            return True, {}
+
+        try:
+            response = client.create_configuration(
+                Name=module.params.get("name"),
+                Description=module.params.get("description"),
+                KafkaVersions=module.params.get("kafka_versions"),
+                ServerProperties=dict_to_prop(module.params.get("config")).encode(),
+                aws_retry=True
+            )
+        except (
+            botocore.exceptions.BotoCoreError,
+            botocore.exceptions.ClientError,
+        ) as e:
+            module.fail_json_aws(e, "failed to create kafka configuration")
+
+    # update existing configuration (creates a new revision)
+    else:
+        # the find_active_config() result doesn't include 'ServerProperties',
+        # so fetch the latest revision explicitly
+        response = get_configuration_revision(client, module, arn=config["Arn"], revision=config["LatestRevision"]["Revision"])
+
+        if not is_configuration_changed(module, response):
+            return False, response
+
+        if module.check_mode:
+            return True, {}
+
+        try:
+            response = client.update_configuration(
+                Arn=config["Arn"],
+                Description=module.params.get("description"),
+                ServerProperties=dict_to_prop(module.params.get("config")).encode(),
+                aws_retry=True
+            )
+        except (
+            botocore.exceptions.BotoCoreError,
+            botocore.exceptions.ClientError,
+        ) as e:
+            module.fail_json_aws(e, "failed to update kafka configuration")
+
+    arn = response["Arn"]
+    revision = response["LatestRevision"]["Revision"]
+
+    result = get_configuration_revision(client, module, arn=arn, revision=revision)
+
+    return True, result
+
+
+def delete_config(client, module):
+    """delete configuration"""
+
+    config = find_active_config(client, module)
+
+    if module.check_mode:
+        if config:
+            return True, config
+        else:
+            return False, {}
+
+    if config:
+        try:
+            response = client.delete_configuration(Arn=config["Arn"], aws_retry=True)
+        except (
+            botocore.exceptions.BotoCoreError,
+            botocore.exceptions.ClientError,
+        ) as e:
+            module.fail_json_aws(e, "failed to delete the kafka configuration")
+        return True, response
+
+    return False, {}
+
+
+def main():
+
+    module_args = dict(
+        name=dict(type="str", required=True),
+        description=dict(type="str", default=""),
+        state=dict(choices=["present", "absent"], default="present"),
+        config=dict(type="dict", aliases=["configuration"], default={}),
+        kafka_versions=dict(type="list", elements="str"),
+    )
+
+    module = AnsibleAWSModule(argument_spec=module_args, supports_check_mode=True)
+
+    if not module.botocore_at_least(BOTOCORE_MIN_VERSION):
+        module.fail_json(
+            msg="aws_msk_config module requires botocore >= {0}".format(
+                BOTOCORE_MIN_VERSION
+            )
+        )
+
+    client = module.client("kafka", retry_decorator=AWSRetry.jittered_backoff())
+
+    if module.params["state"] == "present":
+        changed, response = create_config(client, module)
+
+    elif module.params["state"] == "absent":
+        changed, response = delete_config(client, module)
+
+    # return placeholder values in check mode if the configuration doesn't exist;
+    # they can be useful when these options are referenced by other modules during a check mode run
+    if module.check_mode and not response.get("Arn"):
+        arn = "arn:aws:kafka:region:account:configuration/name/id"
+        revision = 1
+        server_properties = ""
+    else:
+        arn = response.get("Arn")
+        revision = response.get("Revision")
+        server_properties = response.get("ServerProperties", "")
+
+    module.exit_json(
+        changed=changed,
+        arn=arn,
+        revision=revision,
+        server_properties=server_properties,
+        response=camel_dict_to_snake_dict(response),
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/integration/targets/aws_msk_cluster/aliases b/tests/integration/targets/aws_msk_cluster/aliases
new file mode 100644
index 00000000000..4ef4b2067d0
--- /dev/null
+++ b/tests/integration/targets/aws_msk_cluster/aliases
@@ -0,0 +1 @@
+cloud/aws
diff --git a/tests/integration/targets/aws_msk_cluster/defaults/main.yml b/tests/integration/targets/aws_msk_cluster/defaults/main.yml
new file mode 100644
index 00000000000..25d5dafa689
--- /dev/null
+++ b/tests/integration/targets/aws_msk_cluster/defaults/main.yml
@@ -0,0 +1,19 @@
+---
+vpc_name: "{{ resource_prefix }}"
+vpc_cidr: '10.{{ 256 | random(seed=resource_prefix) }}.0.0/16'
+vpc_subnets:
+  - '10.{{ 256 | random(seed=resource_prefix) 
}}.100.0/24' + - '10.{{ 256 | random(seed=resource_prefix) }}.101.0/24' +vpc_subnet_name_prefix: "{{ resource_prefix }}" + +msk_config_name: "{{ resource_prefix }}-config" +msk_cluster_name: "ansible-test-{{ (resource_prefix | hash('md5'))[:7] }}-msk-cluster" +msk_version: 2.6.0 +msk_broker_nodes: 2 + +tags_create: + key1: "value1" + key2: "value2" +tags_update: + key2: "value2" + key3: "value3" diff --git a/tests/integration/targets/aws_msk_cluster/tasks/main.yml b/tests/integration/targets/aws_msk_cluster/tasks/main.yml new file mode 100644 index 00000000000..ab4f94e3305 --- /dev/null +++ b/tests/integration/targets/aws_msk_cluster/tasks/main.yml @@ -0,0 +1,97 @@ +--- +- name: aws_msk_cluster integration tests + module_defaults: + group/aws: + aws_access_key: "{{ aws_access_key }}" + aws_secret_key: "{{ aws_secret_key }}" + security_token: "{{ security_token | default(omit) }}" + region: "{{ aws_region }}" + collections: + - amazon.aws + block: + + - name: collect availability zone info + aws_az_info: + register: az_info + + - name: assert there are at least two zones + assert: + that: az_info.availability_zones | length >= 2 + + - name: create vpc + ec2_vpc_net: + state: present + cidr_block: '{{ vpc_cidr }}' + name: '{{ vpc_name }}' + register: vpc + + - name: create subnets + ec2_vpc_subnet: + state: present + cidr: '{{ item }}' + az: '{{ az_info.availability_zones[index].zone_name }}' + vpc_id: '{{ vpc.vpc.id }}' + tags: + Name: '{{ vpc_subnet_name_prefix }}-subnet-{{ index }}' + loop: "{{ vpc_subnets }}" + loop_control: + index_var: index + register: subnets + + - set_fact: + subnet_ids: '{{ subnets | community.general.json_query("results[].subnet.id") | list }}' + + - name: create msk configuration + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: + - "{{ msk_version }}" + register: msk_config + + - name: create tests + include_tasks: test_create.yml + + - name: update tests + include_tasks: test_update.yml + + - name: delete tests + include_tasks: test_delete.yml + + always: + + - name: delete msk cluster + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: absent + wait: true + ignore_errors: yes + + - name: remove msk configuration + aws_msk_config: + name: "{{ msk_config_name }}" + state: absent + ignore_errors: yes + + - name: remove subnets + ec2_vpc_subnet: + state: absent + cidr: '{{ item }}' + vpc_id: '{{ vpc.vpc.id }}' + loop: "{{ vpc_subnets }}" + ignore_errors: yes + register: removed_subnets + until: removed_subnets is succeeded + retries: 5 + delay: 5 + + - name: remove the vpc + ec2_vpc_net: + state: absent + cidr_block: '{{ vpc_cidr }}' + name: '{{ vpc_name }}' + ignore_errors: yes + register: removed_vpc + until: removed_vpc is success + retries: 5 + delay: 5 \ No newline at end of file diff --git a/tests/integration/targets/aws_msk_cluster/tasks/test_create.yml b/tests/integration/targets/aws_msk_cluster/tasks/test_create.yml new file mode 100644 index 00000000000..4fd7073cc5c --- /dev/null +++ b/tests/integration/targets/aws_msk_cluster/tasks/test_create.yml @@ -0,0 +1,72 @@ +--- +- name: create msk cluster (check mode) + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "present" + version: "{{ msk_version }}" + nodes: "{{ msk_broker_nodes }}" + ebs_volume_size: 10 + subnets: "{{ subnet_ids }}" + wait: true + tags: "{{ tags_create }}" + configuration_arn: "{{ msk_config.arn }}" + configuration_revision: "{{ msk_config.revision }}" + check_mode: yes + register: msk_cluster + +- name: assert that the msk 
cluster be created + assert: + that: + - msk_cluster is changed + +- name: create msk cluster + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "present" + version: "{{ msk_version }}" + nodes: "{{ msk_broker_nodes }}" + ebs_volume_size: 10 + subnets: "{{ subnet_ids }}" + wait: true + tags: "{{ tags_create }}" + configuration_arn: "{{ msk_config.arn }}" + configuration_revision: "{{ msk_config.revision }}" + register: msk_cluster + +- name: assert that the msk cluster is created + assert: + that: + - msk_cluster is changed + +- name: validate return values + assert: + that: + - "'cluster_info' in msk_cluster" + - "'bootstrap_broker_string' in msk_cluster" + - "'key1' in msk_cluster.cluster_info.tags" + - "msk_cluster.cluster_info.tags.key1 == 'value1'" + - "msk_cluster.cluster_info.cluster_name == msk_cluster_name" + - "msk_cluster.cluster_info.number_of_broker_nodes == msk_broker_nodes" + - "msk_cluster.cluster_info.broker_node_group_info.instance_type == 'kafka.t3.small'" + - "msk_cluster.cluster_info.broker_node_group_info.storage_info.ebs_storage_info.volume_size == 10" + - "msk_cluster.cluster_info.open_monitoring.prometheus.jmx_exporter.enabled_in_broker == false" + - "msk_cluster.cluster_info.cluster_arn.startswith('arn:aws:kafka:{{ aws_region }}:')" + +- name: create msk cluster (idempotency) + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "present" + version: "{{ msk_version }}" + nodes: "{{ msk_broker_nodes }}" + ebs_volume_size: 10 + subnets: "{{ subnet_ids }}" + wait: true + tags: "{{ tags_create }}" + configuration_arn: "{{ msk_config.arn }}" + configuration_revision: "{{ msk_config.revision }}" + register: msk_cluster + +- name: assert that the msk cluster wasn't changed + assert: + that: + - msk_cluster is not changed diff --git a/tests/integration/targets/aws_msk_cluster/tasks/test_delete.yml b/tests/integration/targets/aws_msk_cluster/tasks/test_delete.yml new file mode 100644 index 00000000000..efd90fa14cc --- /dev/null +++ b/tests/integration/targets/aws_msk_cluster/tasks/test_delete.yml @@ -0,0 +1,37 @@ +--- +- name: delete msk cluster (check mode) + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "absent" + wait: true + check_mode: yes + register: msk_cluster + +- name: assert that the msk cluster be changed + assert: + that: + - msk_cluster is changed + +- name: delete msk cluster + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "absent" + wait: true + register: msk_cluster + +- name: assert that the msk cluster is changed + assert: + that: + - msk_cluster is changed + +- name: delete msk cluster (idempotency) + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "absent" + wait: true + register: msk_cluster + +- name: assert that the msk cluster wasn't changed + assert: + that: + - msk_cluster is not changed diff --git a/tests/integration/targets/aws_msk_cluster/tasks/test_update.yml b/tests/integration/targets/aws_msk_cluster/tasks/test_update.yml new file mode 100644 index 00000000000..50ac9171836 --- /dev/null +++ b/tests/integration/targets/aws_msk_cluster/tasks/test_update.yml @@ -0,0 +1,72 @@ +--- +- name: update msk cluster (check mode) + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "present" + version: "{{ msk_version }}" + nodes: "{{ msk_broker_nodes }}" + ebs_volume_size: 20 + open_monitoring: + jmx_exporter: True + subnets: "{{ subnet_ids }}" + wait: true + tags: "{{ tags_update }}" + configuration_arn: "{{ msk_config.arn }}" + configuration_revision: "{{ msk_config.revision }}" + 
check_mode: yes + register: msk_cluster + +- name: assert that the msk cluster be changed + assert: + that: + - msk_cluster is changed + +- name: update msk cluster + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "present" + version: "{{ msk_version }}" + nodes: "{{ msk_broker_nodes }}" + ebs_volume_size: 20 + open_monitoring: + jmx_exporter: True + subnets: "{{ subnet_ids }}" + wait: true + tags: "{{ tags_update }}" + configuration_arn: "{{ msk_config.arn }}" + configuration_revision: "{{ msk_config.revision }}" + register: msk_cluster + +- name: assert that the msk cluster is changed + assert: + that: + - msk_cluster is changed + +- name: validate return values + assert: + that: + - "msk_cluster.cluster_info.broker_node_group_info.storage_info.ebs_storage_info.volume_size == 20" + - "msk_cluster.cluster_info.open_monitoring.prometheus.jmx_exporter.enabled_in_broker == true" + - "'key-1' not in msk_cluster.cluster_info.tags" + - "msk_cluster.cluster_info.tags.key3 == 'value3'" + +- name: update msk cluster (idempotency) + aws_msk_cluster: + name: "{{ msk_cluster_name }}" + state: "present" + version: "{{ msk_version }}" + nodes: "{{ msk_broker_nodes }}" + ebs_volume_size: 20 + open_monitoring: + jmx_exporter: True + subnets: "{{ subnet_ids }}" + wait: true + tags: "{{ tags_update }}" + configuration_arn: "{{ msk_config.arn }}" + configuration_revision: "{{ msk_config.revision }}" + register: msk_cluster + +- name: assert that the msk cluster wasn't changed + assert: + that: + - msk_cluster is not changed diff --git a/tests/integration/targets/aws_msk_config/aliases b/tests/integration/targets/aws_msk_config/aliases new file mode 100644 index 00000000000..4ef4b2067d0 --- /dev/null +++ b/tests/integration/targets/aws_msk_config/aliases @@ -0,0 +1 @@ +cloud/aws diff --git a/tests/integration/targets/aws_msk_config/defaults/main.yml b/tests/integration/targets/aws_msk_config/defaults/main.yml new file mode 100644 index 00000000000..264a2430061 --- /dev/null +++ b/tests/integration/targets/aws_msk_config/defaults/main.yml @@ -0,0 +1,10 @@ +--- +msk_config_name: "{{ resource_prefix }}-config" +msk_configs: + - auto.create.topics.enable: true + zookeeper.session.timeout.ms: 18000 + - num.io.threads: 8 + zookeeper.session.timeout.ms: 36000 +msk_kafka_versions: + - 2.6.0 + - 2.6.1 diff --git a/tests/integration/targets/aws_msk_config/tasks/main.yml b/tests/integration/targets/aws_msk_config/tasks/main.yml new file mode 100644 index 00000000000..f5343139c38 --- /dev/null +++ b/tests/integration/targets/aws_msk_config/tasks/main.yml @@ -0,0 +1,149 @@ +--- +- name: tests for community.aws.aws_msk_config + module_defaults: + group/aws: + aws_access_key: "{{ aws_access_key }}" + aws_secret_key: "{{ aws_secret_key }}" + security_token: "{{ security_token | default(omit) }}" + region: "{{ aws_region }}" + collections: + - amazon.aws + block: + + - name: create msk configuration (check mode) + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: "{{ msk_kafka_versions }}" + config: "{{ msk_configs[0] }}" + check_mode: yes + register: msk_config + + - name: assert that the msk configuration be created + assert: + that: + - msk_config is changed + + - name: create msk configuration + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: "{{ msk_kafka_versions }}" + config: "{{ msk_configs[0] }}" + register: msk_config + + - name: assert that the msk configuration is created + assert: + that: + - msk_config is changed + + - name: 
create msk configuration (idempotency) + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: "{{ msk_kafka_versions }}" + config: "{{ msk_configs[0] }}" + register: msk_config + + - name: assert that the msk configuration wasn't changed + assert: + that: + - msk_config is not changed + + - name: validate return values + assert: + that: + - msk_config.revision == 1 + - "msk_config.arn.startswith('arn:aws:kafka:{{ aws_region }}:')" + - "'auto.create.topics.enable=True' in msk_config.server_properties" + - "'zookeeper.session.timeout.ms=18000' in msk_config.server_properties" + + - name: update msk configuration (check mode) + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: "{{ msk_kafka_versions }}" + config: "{{ msk_configs[1] }}" + check_mode: yes + register: msk_config + + - name: assert that the msk configuration be changed + assert: + that: + - msk_config is changed + + - name: update msk configuration + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: "{{ msk_kafka_versions }}" + config: "{{ msk_configs[1] }}" + register: msk_config + + - name: assert that the msk configuration is changed + assert: + that: + - msk_config is changed + + - name: validate return values (update) + assert: + that: + - msk_config.revision == 2 + - "'auto.create.topics.enable=True' not in msk_config.server_properties" + - "'num.io.threads=8' in msk_config.server_properties" + - "'zookeeper.session.timeout.ms=36000' in msk_config.server_properties" + + - name: update msk configuration (idempotency) + aws_msk_config: + name: "{{ msk_config_name }}" + state: "present" + kafka_versions: "{{ msk_kafka_versions }}" + config: "{{ msk_configs[1] }}" + register: msk_config + + - name: assert that the msk configuration wasn't changed + assert: + that: + - msk_config is not changed + + - name: delete msk configuration (check mode) + aws_msk_config: + name: "{{ msk_config_name }}" + state: "absent" + check_mode: yes + register: msk_config + + - name: assert that the msk configuration be changed + assert: + that: + - msk_config is changed + + - name: delete msk configuration + aws_msk_config: + name: "{{ msk_config_name }}" + state: "absent" + register: msk_config + + - name: assert that the msk configuration is changed + assert: + that: + - msk_config is changed + + - name: delete msk configuration (idempotency) + aws_msk_config: + name: "{{ msk_config_name }}" + state: "absent" + register: msk_config + + - name: assert that the msk configuration wasn't changed + assert: + that: + - msk_config is not changed + + always: + + - name: remove msk configuration + aws_msk_config: + name: "{{ msk_config_name }}" + state: absent + ignore_errors: yes
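---

A note for reviewers trying the two modules together: the EXAMPLES blocks show each module in isolation, while the integration tests chain them by registering the configuration's ARN and revision and feeding them to the cluster module. A minimal standalone sketch of that flow, using only options and return values documented above; the subnet IDs, names, and Kafka version are placeholders:

```yaml
- hosts: localhost
  tasks:
    - name: create a configuration first (placeholder values)
      community.aws.aws_msk_config:
        name: demo-config
        state: present
        kafka_versions:
          - 2.6.1
        config:
          auto.create.topics.enable: false
          num.partitions: 1
      register: msk_config

    - name: create a cluster that uses the registered configuration
      community.aws.aws_msk_cluster:
        name: demo-cluster
        state: present
        version: 2.6.1
        nodes: 2                      # must be a multiple of the number of subnets
        subnets:                      # placeholder subnet IDs in two different AZs
          - subnet-aaaaaaaaaaaaaaaaa
          - subnet-bbbbbbbbbbbbbbbbb
        configuration_arn: "{{ msk_config.arn }}"
        configuration_revision: "{{ msk_config.revision }}"
        wait: true                    # creation can take 20-30 minutes
```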
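Because the MSK update_* API calls are only accepted while the cluster is ACTIVE and one update runs at a time, a task that changes several properties at once (as test_update.yml does with ebs_volume_size and open_monitoring) relies on I(wait=true) so the module can wait for ACTIVE between the successive update calls. A sketch of such an update, reusing the placeholder names from the previous example:

```yaml
- name: apply two updates in one task; the module serializes them (placeholder values)
  community.aws.aws_msk_cluster:
    name: demo-cluster
    state: present
    version: 2.6.1
    nodes: 2
    ebs_volume_size: 200         # handled by update_broker_storage
    open_monitoring:
      jmx_exporter: true         # handled by a separate update_monitoring call
    subnets:
      - subnet-aaaaaaaaaaaaaaaaa
      - subnet-bbbbbbbbbbbbbbbbb
    configuration_arn: "{{ msk_config.arn }}"
    configuration_revision: "{{ msk_config.revision }}"
    wait: true                   # without wait, a second update while one is pending fails with a state error
```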