diff --git a/plugins/module_utils/sns.py b/plugins/module_utils/sns.py new file mode 100644 index 00000000000..4ec67cfb858 --- /dev/null +++ b/plugins/module_utils/sns.py @@ -0,0 +1,174 @@ +# -*- coding: utf-8 -*- + +# Copyright: Contributors to the Ansible project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +import copy +import re + +try: + import botocore +except ImportError: + pass # handled by AnsibleAWSModule + +from ansible.module_utils.common.dict_transformations import camel_dict_to_snake_dict + +from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry +from ansible_collections.amazon.aws.plugins.module_utils.tagging import ansible_dict_to_boto3_tag_list +from ansible_collections.amazon.aws.plugins.module_utils.tagging import boto3_tag_list_to_ansible_dict +from ansible_collections.amazon.aws.plugins.module_utils.tagging import compare_aws_tags + + +@AWSRetry.jittered_backoff() +def _list_topics_with_backoff(client): + paginator = client.get_paginator("list_topics") + return paginator.paginate().build_full_result()["Topics"] + + +@AWSRetry.jittered_backoff(catch_extra_error_codes=["NotFound"]) +def _list_topic_subscriptions_with_backoff(client, topic_arn): + paginator = client.get_paginator("list_subscriptions_by_topic") + return paginator.paginate(TopicArn=topic_arn).build_full_result()["Subscriptions"] + + +@AWSRetry.jittered_backoff(catch_extra_error_codes=["NotFound"]) +def _list_subscriptions_with_backoff(client): + paginator = client.get_paginator("list_subscriptions") + return paginator.paginate().build_full_result()["Subscriptions"] + + +def list_topic_subscriptions(client, module, topic_arn): + try: + return _list_topic_subscriptions_with_backoff(client, topic_arn) + except is_boto3_error_code("AuthorizationError"): + try: + # potentially AuthorizationError when listing subscriptions for third party topic + return [sub for sub in _list_subscriptions_with_backoff(client) if sub["TopicArn"] == topic_arn] + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg=f"Couldn't get subscriptions list for topic {topic_arn}") + except ( + botocore.exceptions.ClientError, + botocore.exceptions.BotoCoreError, + ) as e: # pylint: disable=duplicate-except + module.fail_json_aws(e, msg=f"Couldn't get subscriptions list for topic {topic_arn}") + + +def list_topics(client, module): + try: + topics = _list_topics_with_backoff(client) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="Couldn't get topic list") + return [t["TopicArn"] for t in topics] + + +def topic_arn_lookup(client, module, name): + # topic names cannot have colons, so this captures the full topic name + all_topics = list_topics(client, module) + lookup_topic = f":{name}" + for topic in all_topics: + if topic.endswith(lookup_topic): + return topic + + +def compare_delivery_policies(policy_a, policy_b): + _policy_a = copy.deepcopy(policy_a) + _policy_b = copy.deepcopy(policy_b) + # AWS automatically injects disableSubscriptionOverrides if you set an + # http policy + if "http" in policy_a: + if "disableSubscriptionOverrides" not in policy_a["http"]: + _policy_a["http"]["disableSubscriptionOverrides"] = False + if "http" in policy_b: + if "disableSubscriptionOverrides" not in policy_b["http"]: + _policy_b["http"]["disableSubscriptionOverrides"] = 
False + return _policy_a != _policy_b + + +def canonicalize_endpoint(protocol, endpoint): + # AWS SNS expects SMS phone numbers in E.164 format and canonicalizes + # them by stripping everything except digits and a leading "+". + # See the SNS SMS publishing documentation for details. + if protocol == "sms": + return re.sub("[^0-9+]*", "", endpoint) + return endpoint + + +def get_tags(client, module, topic_arn): + try: + return boto3_tag_list_to_ansible_dict(client.list_tags_for_resource(ResourceArn=topic_arn)["Tags"]) + except is_boto3_error_code("AuthorizationError"): + module.warn("Permission denied accessing tags") + return {} + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="Couldn't obtain topic tags") + + +def get_info(connection, module, topic_arn): + name = module.params.get("name") + topic_type = module.params.get("topic_type") + state = module.params.get("state") + subscriptions = module.params.get("subscriptions") + purge_subscriptions = module.params.get("purge_subscriptions") + content_based_deduplication = module.params.get("content_based_deduplication") + subscriptions_existing = module.params.get("subscriptions_existing", []) + subscriptions_deleted = module.params.get("subscriptions_deleted", []) + subscriptions_added = module.params.get("subscriptions_added", []) + topic_created = module.params.get("topic_created", False) + topic_deleted = module.params.get("topic_deleted", False) + attributes_set = module.params.get("attributes_set", []) + check_mode = module.check_mode + + info = { + "name": name, + "topic_type": topic_type, + "state": state, + "subscriptions_new": subscriptions, + "subscriptions_existing": subscriptions_existing, + "subscriptions_deleted": subscriptions_deleted, + "subscriptions_added": subscriptions_added, + "subscriptions_purge": purge_subscriptions, + "content_based_deduplication": content_based_deduplication, + "check_mode": check_mode, + "topic_created": topic_created, + "topic_deleted": topic_deleted, + "attributes_set": attributes_set, + } + if state != "absent": + if topic_arn in list_topics(connection, module): + info.update(camel_dict_to_snake_dict(connection.get_topic_attributes(TopicArn=topic_arn)["Attributes"])) + info["delivery_policy"] = info.pop("effective_delivery_policy") + info["subscriptions"] = [ + camel_dict_to_snake_dict(sub) for sub in list_topic_subscriptions(connection, module, topic_arn) + ] + info["tags"] = get_tags(connection, module, topic_arn) + return info + + +def update_tags(client, module, topic_arn): + if module.params.get("tags") is None: + return False + + existing_tags = get_tags(client, module, topic_arn) + to_update, to_delete = compare_aws_tags(existing_tags, module.params["tags"], module.params["purge_tags"]) + + if not bool(to_delete or to_update): + return False + + if module.check_mode: + return True + + if to_update: + try: + client.tag_resource(ResourceArn=topic_arn, Tags=ansible_dict_to_boto3_tag_list(to_update)) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="Couldn't add tags to topic") + if to_delete: + try: + client.untag_resource(ResourceArn=topic_arn, TagKeys=to_delete) + except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: + module.fail_json_aws(e, msg="Couldn't remove tags from topic") + + return True
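Editor's note - an illustrative usage sketch of the helpers above, not part of the patch; the policy values are made up and the import assumes an amazon.aws collection checkout.

from ansible_collections.amazon.aws.plugins.module_utils.sns import (
    canonicalize_endpoint,
    compare_delivery_policies,
)

# SMS endpoints are reduced to digits and a leading "+" before comparison.
assert canonicalize_endpoint("sms", "+1 (555) 010-0000") == "+15550100000"

# AWS injects disableSubscriptionOverrides into stored http policies, so a
# desired policy without that key still compares as unchanged (False).
desired = {"http": {"defaultHealthyRetryPolicy": {"numRetries": 3}}}
current = {
    "http": {
        "defaultHealthyRetryPolicy": {"numRetries": 3},
        "disableSubscriptionOverrides": False,
    }
}
assert compare_delivery_policies(desired, current) is False

diff --git a/plugins/modules/aws_batch_compute_environment.py b/plugins/modules/aws_batch_compute_environment.py index 39ff11e2576..ddfba8dbfac 100644 --- 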
a/plugins/modules/aws_batch_compute_environment.py +++ b/plugins/modules/aws_batch_compute_environment.py @@ -118,6 +118,10 @@ description: - The Amazon Resource Name (ARN) of the Amazon EC2 Spot Fleet IAM role applied to a SPOT compute environment. type: str + launch_template: + description: + - The name or ID and the version of the launch template to use for the compute resources. + type: dict requirements: - boto3 @@ -152,6 +156,9 @@ tag1: value1 tag2: value2 service_role: arn:aws:iam:::role/service-role/ + launch_template: + launch_template_name: batch_launch_template_example + version: 1 register: aws_batch_compute_environment_action - name: show results @@ -198,6 +205,9 @@ Environment: Name: type: MANAGED + launch_template: + launch_template_name: batch_launch_template_example + version: 1 validate_certs: true response: computeEnvironmentArn: "arn:aws:batch:...." @@ -223,6 +233,9 @@ status: VALID statusReason: "ComputeEnvironment Healthy" type: MANAGED + launchTemplate: + launchTemplateName: batch_launch_template_example + version: 1 type: dict ''' @@ -318,7 +331,7 @@ def create_compute_environment(module, client): compute_resources_param_list = ('minv_cpus', 'maxv_cpus', 'desiredv_cpus', 'instance_types', 'image_id', 'subnets', 'security_group_ids', 'ec2_key_pair', 'instance_role', 'tags', 'bid_percentage', - 'spot_iam_fleet_role') + 'spot_iam_fleet_role', 'launch_template') compute_resources_params = set_api_params(module, compute_resources_param_list) if module.params['compute_resource_type'] is not None: @@ -372,6 +385,7 @@ def manage_state(module, client): minv_cpus = module.params['minv_cpus'] maxv_cpus = module.params['maxv_cpus'] desiredv_cpus = module.params['desiredv_cpus'] + # launch_template = snake_dict_to_camel_dict(module.params['launch_template']) action_taken = 'none' update_env_response = '' @@ -391,6 +405,9 @@ # Update configuration if needed compute_resources = {} + # if current_compute_environment['computeResources']['launchTemplate'] != launch_template: + # module.fail_json(msg="""Batch doesn't support updating a compute environment with a new launch template version. + # If you update your launch template, you must create a new compute environment with the new template""") if compute_environment_state and current_compute_environment['state'] != compute_environment_state: compute_kwargs.update({'state': compute_environment_state}) updates = True @@ -465,6 +482,7 @@ def main(): tags=dict(type='dict'), bid_percentage=dict(type='int'), spot_iam_fleet_role=dict(), + launch_template=dict(type='dict'), ) module = AnsibleAWSModule( 
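Editor's note - an illustrative sketch, not part of the patch: assuming set_api_params camelizes the nested launch_template dict (as the documentation above implies), the Batch API ultimately receives a computeResources fragment shaped like this; the template name and version are placeholders.

compute_resources = {
    'launchTemplate': {
        'launchTemplateName': 'batch_launch_template_example',  # or 'launchTemplateId'
        'version': '1',
    },
}

diff --git a/plugins/modules/aws_transfer_family.py b/plugins/modules/aws_transfer_family.py new file mode 100644 index 00000000000..23a13e18beb --- /dev/null +++ b/plugins/modules/aws_transfer_family.py @@ -0,0 +1,453 @@ +#!/usr/bin/python +# aws_transfer_family.py +# Ansible AWS Transfer Plugin +# Copyright (C) 2021 Fayez ALSEDLAH; Trading Central + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 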
+# + # You should have received a copy of the GNU General Public License + # along with this program. If not, see <https://www.gnu.org/licenses/>. + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' +--- +module: aws_transfer_family +short_description: Manage SFTP servers in AWS. +description: + - Manage SFTP servers in AWS using the AWS Transfer Family service. +version_added: "2.4" +requirements: [ boto3 ] +author: "Fayez ALSEDLAH (@falsedlah); Trading Central" +options: + name: + description: + - Fully qualified domain name of the SFTP server to create. + required: true + type: str + state: + description: + - Create or remove the SFTP server. + - C(present) will also apply updates if necessary. + required: false + default: present + choices: [ 'present', 'absent', 'add_user', 'remove_user' ] + type: str + tags: + description: + - Tags dict to apply to the server. + type: dict + purge_tags: + description: + - Whether to remove tags that aren't present in the C(tags) parameter. + type: bool + default: True + endpoint_type: + description: + - The type of endpoint to be used. + type: str + choices: ['PUBLIC', 'VPC_ENDPOINT'] + default: 'PUBLIC' + identity_provider_type: + description: + - The identity provider type. + type: str + choices: ['SERVICE_MANAGED', 'API_GATEWAY'] + default: 'SERVICE_MANAGED' + user_home_directory_type: + description: + - The type of directory that the user is mapped to. + type: str + choices: ['PATH', 'LOGICAL'] + user_home_directory: + description: + - The location of the directory for the user home directory. + type: str + default: '/' + user_home_directory_mappings: + description: + - Mappings for the user home directory on S3 to the local filesystem on the SFTP server. + type: dict + user_name: + description: + - The user name for the account to create on the SFTP server. + type: str + user_policy: + description: + - A JSON-formatted policy to scope down the user, if needed. + type: str + user_role: + description: + - The ARN that points to the role that the user should assume. This role should have access to the S3 bucket. + type: str + user_ssh_public_key_body: + description: + - The body of the public key that will be used (if pre-generated) to access the SFTP server. + type: str + user_tags: + description: + - Tags that should be associated with the user when created. + type: list + host_key: + description: + - The ssh-keygen generated key for this particular host. + - Managing your own SSH host keys is not recommended; this is provided as a convenience for migrations. + type: str + identity_provider_role: + description: + - The ARN of the invocation role used to authenticate user accounts. + - Length Constraints - Minimum length of 20. Maximum length of 2048. + - 'Pattern: arn:.*role/.*' + type: str + identity_provider_url: + description: + - The URL of the service endpoint used to authenticate users. + - Length Constraints - Maximum length of 255. + type: str + logging_role: + description: + - A value that allows the service to write your SFTP users' activity to your Amazon CloudWatch logs for monitoring and auditing purposes. + - Length Constraints - Minimum length of 20. Maximum length of 2048. + - 'Pattern: arn:.*role/.*' + type: str + transfer_endpoint_url: + description: + - The URL for the transfer endpoint. + type: str + vpc_id: + description: + - The VPC to place the created SFTP server into. + type: str +extends_documentation_fragment: + - aws + - ec2 +''' + +EXAMPLES = ''' +# Note: These examples do not set authentication details, see the AWS Guide for details. +
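+# The examples below are an illustrative sketch added in editing; the module
+# FQCN, ARNs and names are placeholders.
+- name: Create a public SFTP server with service-managed identities
+  community.aws.aws_transfer_family:
+    name: sftp.example.com
+    state: present
+    endpoint_type: PUBLIC
+    identity_provider_type: SERVICE_MANAGED
+    logging_role: arn:aws:iam::123456789012:role/transfer-logging-example
+    tags:
+      env: example
+
+- name: Add a user to the server
+  community.aws.aws_transfer_family:
+    name: sftp.example.com
+    state: add_user
+    user_name: example-user
+    user_role: arn:aws:iam::123456789012:role/transfer-s3-access-example
+    user_home_directory: /example-bucket/home/example-user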
+''' + + +from ansible.module_utils._text import to_text +from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ec2_argument_spec, AWSRetry +try: + import boto3 + from botocore.exceptions import BotoCoreError, ClientError, EndpointConnectionError +except ImportError: + pass # handled by AnsibleAWSModule + +SERVER_NAME_KEY = 'aws:transfer:customHostname' + + +def create_or_update_sftp(client, module): + name = module.params.get("name") + purge_tags = module.params.get("purge_tags") + tags = {} + if module.params.get("tags") is not None: + tags = module.params.get("tags") + endpoint_type = module.params.get("endpoint_type") + vpc_id = module.params.get("vpc_id") + host_key = module.params.get("host_key") + identity_provider_type = module.params.get("identity_provider_type") + identity_provider_role = module.params.get("identity_provider_role") + identity_provider_url = module.params.get("identity_provider_url") + logging_role = module.params.get("logging_role") + changed = False + result = {} + sftp_server = None + needs_creation = False + + # TODO: Eventually, this needs to support all of the endpoint details, including vpc endpoint ids. + endpoint_details = None + if endpoint_type != 'PUBLIC' and vpc_id is not None: + endpoint_details = { + # "AddressAllocationIds": [], + # "SubnetIds": [], + # "VpcEndpointId": "", + "VpcId": vpc_id + } + + identity_provider_details = None + if identity_provider_url is not None and identity_provider_role is not None: + identity_provider_details = { + "InvocationRole": identity_provider_role, + "Url": identity_provider_url + } + + name_tag = {'Key': SERVER_NAME_KEY, 'Value': name} + assigned_tags = [name_tag] + + try: + sftp_server = find_sftp_server(client, name) + needs_creation = sftp_server is None + except EndpointConnectionError as e: + module.fail_json_aws(e, msg="Invalid endpoint provided: %s" % to_text(e)) + except (BotoCoreError, ClientError) as e: + module.fail_json_aws(e, msg="Failed to check Transfer presence") + if needs_creation: + result = create_sftp_server(client, endpoint_details, endpoint_type, host_key, + identity_provider_details, identity_provider_type, logging_role, name_tag) + sftp_server_id = result['ServerId'] + changed = True + else: + sftp_server_id = sftp_server['Server']['ServerId'] + if not purge_tags: + assigned_tags = sftp_server['Tags'] + # Update SFTP server details. Merge tags: update an existing tag in + # place, otherwise append it as a new tag. + for key, value in tags.items(): + item = next((tag for tag in assigned_tags if tag['Key'] == key), None) + if item: + item['Value'] = value + else: + assigned_tags.append({'Key': key, 'Value': value}) + update_args = build_server_kwargs(endpoint_details, endpoint_type, host_key, identity_provider_details, + identity_provider_type, logging_role, name, sftp_server_id, is_update=True) + + result = client.update_server(**update_args) + changed = True + + module.exit_json(changed=changed, name=name, **result)
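Editor's note - an illustrative sketch of the tag merge above, not part of the patch; values are placeholders.

assigned_tags = [{'Key': 'aws:transfer:customHostname', 'Value': 'sftp.example.com'},
                 {'Key': 'env', 'Value': 'dev'}]
# Merging tags = {'env': 'prod', 'team': 'data'} updates 'env' in place and
# appends 'team':
# [{'Key': 'aws:transfer:customHostname', 'Value': 'sftp.example.com'},
#  {'Key': 'env', 'Value': 'prod'},
#  {'Key': 'team', 'Value': 'data'}]

+ + +def 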
find_sftp_server(client, server_name): + # The Transfer API has no lookup-by-name call, so load every server and + # match on the custom hostname tag; callers check this function's return + # value for None. + all_server_ids = [server['ServerId'] for server in client.list_servers()['Servers']] + all_servers = [client.describe_server(ServerId=server_id) for server_id in all_server_ids] + host = [server for server in all_servers for tag in server['Server']['Tags'] if tag['Value'] == server_name] + if host: + return host[0] + return None + + +@AWSRetry.exponential_backoff(max_delay=120) +def create_sftp_server(client, endpoint_details, endpoint_type, host_key, + identity_provider_details, identity_provider_type, logging_role, name): + """ + Does the work of actually creating the SFTP server. + :arg client: boto3.session.Session the boto3 client that is used to create the connection + :arg endpoint_details: object The details that are provided to the endpoint - right now vpc_id is the only supported + information. + :arg endpoint_type: str The type of endpoint that the created SFTP server connects to. AWS supports PUBLIC, VPC and + VPC_ENDPOINT. + :arg host_key: str The generated ssh key for the host, the result of ssh-keygen. Do not use this unless you + are transitioning from another SFTP server and need to maintain backward compatibility. + :arg identity_provider_details: object The information for the provided entity type. + See https://docs.aws.amazon.com/transfer/latest/userguide/API_IdentityProviderDetails.html for more details. + :arg identity_provider_type: str Currently supports SERVICE_MANAGED or API_GATEWAY - if using API_GATEWAY, + identity_provider_details becomes required. SERVICE_MANAGED is the default, and allows AWS to manage the SFTP + server. + :arg logging_role: str A value that allows the service to write your SFTP users' activity to your Amazon CloudWatch + logs for monitoring and auditing purposes. + :arg name: dict The name tag of the SFTP server, which also becomes its FQDN. + :rtype: dict A single-entry dictionary that contains the server ID. 
+ """ + kwargDict = build_server_kwargs(endpoint_details, endpoint_type, host_key, identity_provider_details, + identity_provider_type, logging_role, name) + + response = client.create_server(**kwargDict) + # According to the documentation response should be an object containing a single string like this: + # { + # ServerId: 'string(19)' + # } + return response + + +def build_server_kwargs(endpoint_details, endpoint_type, host_key, identity_provider_details, identity_provider_type, + logging_role, name, server_id=None, is_update=False): + kwarg_dict = {} + if not is_update: + kwarg_dict['Tags'] = [name] + if endpoint_details is not None: + kwarg_dict['EndpointDetails'] = endpoint_details + if endpoint_type is not None: + kwarg_dict['EndpointType'] = endpoint_type + if host_key is not None: + kwarg_dict['HostKey'] = host_key + if identity_provider_details is not None: + kwarg_dict['IdentityProviderDetails'] = identity_provider_details + if identity_provider_type is not None and not is_update: + kwarg_dict['IdentityProviderType'] = identity_provider_type + if logging_role is not None: + kwarg_dict['LoggingRole'] = logging_role + if server_id is not None: + kwarg_dict['ServerId'] = server_id + return kwarg_dict + + +def add_sftp_users(client, module): + changed = False + user_name = module.params.get('user_name') + user_home_directory = module.params.get('user_home_directory') + user_home_directory_type = module.params.get('user_home_directory_type') + user_home_directory_mappings = module.params.get('user_home_directory_mappings') + user_policy = module.params.get('user_policy') + user_role = module.params.get('user_role') + user_ssh_public_key_body = module.params.get('user_ssh_public_key_body') + user_tags = module.params.get('user_tags') + name = module.params.get('name') + + result = add_user(client, user_name, user_home_directory, user_home_directory_type, user_home_directory_mappings, + user_policy, user_role, user_ssh_public_key_body, user_tags, name) + changed = True + module.exit_json(changed=changed, **result) + + +@AWSRetry.exponential_backoff(max_delay=120) +def add_user(client, user_name, user_home_directory, user_home_directory_type, + user_home_directory_mappings, user_policy, user_role, user_ssh_public_key_body, user_tags, name): + result = {} + sftp_server = find_sftp_server(client, name) + exists = False + if sftp_server is not None: + sftp_server_id = sftp_server['Server']['ServerId'] + users = client.list_users( + ServerId=sftp_server_id + ) + + if users is not None: + exists = [ user for user in users['Users'] if user['UserName'] == user_name ] + + add_user_kwargs = dict( + Role=user_role, + ServerId=sftp_server_id, + UserName=user_name + ) + + if user_home_directory is not None: + add_user_kwargs['HomeDirectory'] = user_home_directory + if user_home_directory_type is not None: + add_user_kwargs['HomeDirectoryType'] = user_home_directory_type + if user_home_directory_mappings is not None: + add_user_kwargs['HomeDirectoryMappings'] = user_home_directory_mappings + if user_policy is not None: + add_user_kwargs['Policy'] = user_policy + if user_ssh_public_key_body is not None: + add_user_kwargs['SshPublicKeyBody'] = user_ssh_public_key_body + if user_tags is not None: + add_user_kwargs['Tags'] = user_tags + + if not exists: + result = client.create_user(**add_user_kwargs) + else: + result = client.update_user(**add_user_kwargs) + + return result + + +@AWSRetry.exponential_backoff(max_delay=120) +def destroy_sftp_server(client, module): + name = module.params.get('name') + 
response = dict() + sftp_server = find_sftp_server(client, name) + changed = False + if sftp_server is not None: + sftp_server_id = sftp_server['Server']['ServerId'] + response = client.delete_server(ServerId=sftp_server_id) + changed = True + module.exit_json(changed=changed, name=name, **response) + + +@AWSRetry.exponential_backoff(max_delay=120) +def destroy_sftp_users(client, module): + response = dict() + name = module.params.get('name') + user_name = module.params.get('user_name') + sftp_server_id = get_sftp_server_id(client, name) + response = client.delete_user(ServerId=sftp_server_id, UserName=user_name) + changed = True + + module.exit_json(changed=changed, name=name, **response) + + +def get_sftp_server_id(client, name): + sftp_server = find_sftp_server(client, name) + sftp_server_id = sftp_server['Server']['ServerId'] + return sftp_server_id + + +def main(): + argument_spec = ec2_argument_spec() + argument_spec.update( + dict( + name=dict(required=True), + state=dict(default='present', choices=['present', 'absent', 'add_user', 'remove_user']), + tags=dict(type='dict'), + purge_tags=dict(type='bool', default=True), + # Default to PUBLIC because AWS does. + endpoint_type=dict(default="PUBLIC", choices=['PUBLIC', 'VPC_ENDPOINT']), + vpc_id=dict(required=False), + host_key=dict(), + identity_provider_type=dict(default='SERVICE_MANAGED', choices=['SERVICE_MANAGED', 'API_GATEWAY']), + identity_provider_role=dict(), + identity_provider_url=dict(), + transfer_endpoint_url=dict(), + logging_role=dict(), + user_name=dict(type='str'), + user_home_directory=dict(type='str', default='/'), + user_home_directory_type=dict(type='str', choices=['PATH', 'LOGICAL']), + user_home_directory_mappings=dict(type='dict'), + user_policy=dict(type='str'), + user_role=dict(type='str'), + user_ssh_public_key_body=dict(type='str'), + user_tags=dict(type='list'), + ) + ) + + module = AnsibleAWSModule( + argument_spec=argument_spec, + ) + + state = module.params.get("state") + + transfer_client = module.client('transfer') + + if state == 'present': + create_or_update_sftp(transfer_client, module) + elif state == 'absent': + destroy_sftp_server(transfer_client, module) + elif state == 'add_user': + add_sftp_users(transfer_client, module) + elif state == 'remove_user': + destroy_sftp_users(transfer_client, module) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/plugins/modules/dynamodb_table.py b/plugins/modules/dynamodb_table.py index 35d9cd4d64a..9239ca4f1c9 100644 --- a/plugins/modules/dynamodb_table.py +++ b/plugins/modules/dynamodb_table.py @@ -1,8 +1,8 @@ #!/usr/bin/python # Copyright: Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - from __future__ import absolute_import, division, print_function +import traceback __metaclass__ = type @@ -11,14 +11,14 @@ module: dynamodb_table version_added: 1.0.0 short_description: Create, update or delete AWS Dynamo DB tables description: - Create or delete AWS Dynamo DB tables. - Can update the provisioned throughput on existing tables. - Returns the status of the specified table. author: Alan Loi (@loia) requirements: - - "boto >= 2.37.0" - - "boto3 >= 1.4.4 (for tagging)" + - "boto3 >= 1.4.4" options: state: description: @@ -52,6 +52,12 @@ choices: ['STRING', 'NUMBER', 'BINARY'] default: 'STRING' type: str + billing_mode: + version_added: "2.10" + description: + - Controls how you are charged for read and write throughput and how you manage capacity. + choices: ['PROVISIONED', 'PAY_PER_REQUEST'] + type: str read_capacity: description: - Read throughput capacity (units) to provision. default: 1 type: int @@ -62,6 +68,12 @@ - Write throughput capacity (units) to provision. default: 1 type: int + point_in_time_recovery: + version_added: "2.10" + description: + - Enables or disables point-in-time recovery on the table. + default: False + type: bool indexes: description: - list of dictionaries describing indexes to add to the table. global indexes can be updated. local indexes don't support updates or have throughput. @@ -103,22 +115,59 @@ - Write throughput capacity (units) to provision for the index. type: int default: [] + version_added: "2.1" type: list elements: dict tags: + version_added: "2.4" description: - A hash/dictionary of tags to add to the new instance or for starting/stopping instance by tag. - 'For example: C({"key":"value"}) and C({"key":"value","key2":"value2"})' type: dict + purge_tags: + version_added: "2.10" + description: + - Enables or disables purging of tags that are not listed in the C(tags) parameter. + default: False + type: bool wait_for_active_timeout: + version_added: "2.4" description: - how long before wait gives up, in seconds. 
only used when tags is set default: 60 type: int + sse_enabled: + version_added: "2.10" + type: bool + description: + - Whether to enable server-side encryption. + stream_enabled: + version_added: "2.10" + type: bool + description: + - Indicates whether DynamoDB Streams is enabled (true) or disabled (false) on the table. + stream_view_type: + version_added: "2.10" + type: str + choices: ['KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'] + description: + - When an item in the table is modified, I(stream_view_type) determines what information is written to the stream for this table. + sse_type: + version_added: "2.10" + type: str + choices: ['AES256', 'KMS'] + description: + - Server-side encryption type. + sse_kms_master_key_id: + version_added: "2.10" + type: str + description: + - The KMS Master Key (CMK) which should be used for the KMS encryption. + - To specify a CMK, use its key ID, Amazon Resource Name (ARN), alias name, or alias ARN. extends_documentation_fragment: -- amazon.aws.aws -- amazon.aws.ec2 - + - aws + - ec2 ''' EXAMPLES = r''' @@ -172,137 +221,421 @@ sample: ACTIVE ''' -import time -import traceback try: - import boto - import boto.dynamodb2 - from boto.dynamodb2.table import Table - from boto.dynamodb2.fields import HashKey, RangeKey, AllIndex, GlobalAllIndex, GlobalIncludeIndex, GlobalKeysOnlyIndex, IncludeIndex, KeysOnlyIndex - from boto.dynamodb2.types import STRING, NUMBER, BINARY - from boto.exception import BotoServerError, NoAuthHandlerFound, JSONResponseError - from boto.dynamodb2.exceptions import ValidationException - DYNAMO_TYPE_MAP = { - 'STRING': STRING, - 'NUMBER': NUMBER, - 'BINARY': BINARY - } - # Boto 2 is mandatory, Boto3 is only needed for tagging - import botocore + from botocore.exceptions import ( + ClientError, + NoCredentialsError, + ) except ImportError: - pass # Handled by ec2.HAS_BOTO and ec2.HAS_BOTO3 + pass # caught by AnsibleAWSModule - -from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ansible_dict_to_boto3_tag_list -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AnsibleAWSError -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import connect_to_aws -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import get_aws_connection_info -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import HAS_BOTO -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import HAS_BOTO3 +from ansible.module_utils.common.dict_transformations import dict_merge +from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule +from ansible_collections.amazon.aws.plugins.module_utils.core import is_boto3_error_code +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import AWSRetry +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ansible_dict_to_boto3_tag_list +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_tag_list_to_ansible_dict +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_aws_tags DYNAMO_TYPE_DEFAULT = 'STRING' -INDEX_REQUIRED_OPTIONS = ['name', 'type', 'hash_key_name'] -INDEX_OPTIONS = INDEX_REQUIRED_OPTIONS + ['hash_key_type', 'range_key_name', 'range_key_type', 'includes', 'read_capacity', 'write_capacity'] -INDEX_TYPE_OPTIONS = ['all', 'global_all', 'global_include', 'global_keys_only', 'include', 'keys_only'] +INDEX_REQUIRED_OPTIONS = [ + 'name', + 'type', + 'hash_key_name' +] +INDEX_OPTIONS = INDEX_REQUIRED_OPTIONS + [ + 'hash_key_type', + 'range_key_name', + 'range_key_type', + 'includes', + 'read_capacity', + 'write_capacity' +] 
+INDEX_TYPE_OPTIONS = [ + 'all', + 'global_all', + 'global_include', + 'global_keys_only', + 'include', + 'keys_only' +] + + +@AWSRetry.exponential_backoff( + catch_extra_error_codes=['DynamoDBTableNotActive'] +) +def wait_for_table_active(table): + table.load() + + if table.table_status == 'ACTIVE': + return + + elif table.table_status in ('CREATING', 'UPDATING'): + raise ClientError( + { + 'Error': { + 'Code': 'DynamoDBTableNotActive', + 'Message': "Table '{0}' status is {1}. Expecting ACTIVE.".format( + table.table_name, + table.table_status + ) + } + }, + 'DynamoDBTableWaitForActive' + ) + + raise Exception( + "Error validating ACTIVE state of '{0}' DynamoDB table.".format( + table.table_name) + ) + + +@AWSRetry.exponential_backoff() +def get_table_tags_change( + client, + table, + tags, + purge_tags=False): + + table_tags = boto3_tag_list_to_ansible_dict( + client.list_tags_of_resource( + ResourceArn=table.table_arn + )['Tags'] + ) + + tags_add, tags_remove = compare_aws_tags( + table_tags, + tags, + purge_tags=purge_tags + ) + + result_tags = dict( + (k, v) + for k, v in dict_merge(tags, table_tags).items() + if k not in tags_remove + ) + + return { + 'changed': tags_add or tags_remove, + 'add': tags_add, + 'remove': tags_remove, + 'tags': result_tags + } -def create_or_update_dynamo_table(connection, module, boto3_dynamodb=None, boto3_sts=None, region=None): +@AWSRetry.exponential_backoff() +def update_table_tags( + client, + table, + tags, + purge_tags=False): + + tags_change = get_table_tags_change( + client, + table, + tags, + purge_tags + ) + + if not tags_change['changed']: + return tags_change + + if tags_change['remove']: + client.untag_resource( + ResourceArn=table.table_arn, + TagKeys=tags_change['remove'] + ) + + if tags_change['add']: + client.tag_resource( + ResourceArn=table.table_arn, + Tags=ansible_dict_to_boto3_tag_list(tags_change['add']) + ) + + return tags_change + + +@AWSRetry.exponential_backoff() +def has_continuous_backup_changed( + client, + table, + point_in_time_recovery): + + table_continuous_backup = client.describe_continuous_backups( + TableName=table.table_name + )['ContinuousBackupsDescription'] + + current_point_in_time_recovery = ( + table_continuous_backup['PointInTimeRecoveryDescription']['PointInTimeRecoveryStatus'] == 'ENABLED' + ) + + return point_in_time_recovery != current_point_in_time_recovery + + +@AWSRetry.exponential_backoff( + catch_extra_error_codes=['ContinuousBackupsUnavailableException'] +) +def update_table_continuous_backups( + client, + table, + is_point_in_time_recovery): + + if not has_continuous_backup_changed(client, table, is_point_in_time_recovery): + return + + client.update_continuous_backups( + TableName=table.table_name, + PointInTimeRecoverySpecification={ + 'PointInTimeRecoveryEnabled': is_point_in_time_recovery + } + ) + + +def create_or_update_dynamo_table(resource, module): table_name = module.params.get('name') hash_key_name = module.params.get('hash_key_name') hash_key_type = module.params.get('hash_key_type') range_key_name = module.params.get('range_key_name') range_key_type = module.params.get('range_key_type') + billing_mode = module.params.get('billing_mode') read_capacity = module.params.get('read_capacity') write_capacity = module.params.get('write_capacity') all_indexes = module.params.get('indexes') tags = module.params.get('tags') + purge_tags = module.params.get('purge_tags') wait_for_active_timeout = module.params.get('wait_for_active_timeout') + # stream specification + stream_enabled = 
module.params.get('stream_enabled') + stream_view_type = module.params.get('stream_view_type') + + # sse specification + sse_enabled = module.params.get('sse_enabled') + sse_type = module.params.get('sse_type') + sse_kms_master_key_id = module.params.get('sse_kms_master_key_id') + + point_in_time_recovery = module.params.get('point_in_time_recovery') + + key_type_mapping = {'STRING': 'S', 'BINARY': 'B', 'NUMBER': 'N'} + for index in all_indexes: validate_index(index, module) - schema = get_schema_param(hash_key_name, hash_key_type, range_key_name, range_key_type) - throughput = { 'read': read_capacity, 'write': write_capacity } + # Keep the desired throughput around; update_dynamo_table() below needs it. + throughput = {'read': read_capacity, 'write': write_capacity} + stream_specification = None + sse_specification = None + if stream_enabled: + stream_specification = { + 'StreamEnabled': stream_enabled, + 'StreamViewType': stream_view_type + } + if sse_enabled: + sse_specification = { + 'Enabled': sse_enabled, + 'SSEType': sse_type + } + if sse_kms_master_key_id: + sse_specification.update({'KMSMasterKeyId': sse_kms_master_key_id}) - indexes, global_indexes = get_indexes(all_indexes) + ( + local_secondary_indexes, + global_secondary_indexes, + attribute_definitions + ) = serialize_indexes(all_indexes, billing_mode) result = dict( - region=region, table_name=table_name, + billing_mode=billing_mode, + point_in_time_recovery=point_in_time_recovery, hash_key_name=hash_key_name, hash_key_type=hash_key_type, range_key_name=range_key_name, range_key_type=range_key_type, - read_capacity=read_capacity, - write_capacity=write_capacity, indexes=all_indexes, ) try: - table = Table(table_name, connection=connection) + client = module.client('dynamodb') + table = resource.Table(table_name) - if dynamo_table_exists(table): - result['changed'] = update_dynamo_table(table, throughput=throughput, check_mode=module.check_mode, global_indexes=global_indexes) - else: - if not module.check_mode: - Table.create(table_name, connection=connection, schema=schema, throughput=throughput, indexes=indexes, global_indexes=global_indexes) + try: + table_status = table.table_status + except is_boto3_error_code('ResourceNotFoundException'): + table_status = 'TABLE_NOT_FOUND' + + if table_status in ('ACTIVE', 'CREATING', 'UPDATING'): + # The table exists and might need to be updated. + wait_for_table_active(table) + + result.update( + update_dynamo_table( + module, + client, + table, + billing_mode=billing_mode, + throughput=throughput, + stream_spec=stream_specification, + sse_spec=sse_specification, + point_in_time_recovery=point_in_time_recovery, + check_mode=module.check_mode, + global_indexes=global_secondary_indexes, + global_attr_definitions=attribute_definitions + ) + ) + + if tags: + tags_change = ( + get_table_tags_change( + client, + table, + tags, + purge_tags + ) + if module.check_mode + else update_table_tags( + client, + table, + tags, + purge_tags + ) + ) + + if tags_change['changed'] and not result['changed']: + result['changed'] = True + + result['tags'] = tags_change['tags'] + + elif not module.check_mode: + # The table doesn't exist and needs to be created. 
result['changed'] = True - if not module.check_mode: - result['table_status'] = table.describe()['Table']['TableStatus'] - - if tags: - # only tables which are active can be tagged - wait_until_table_active(module, table, wait_for_active_timeout) - account_id = get_account_id(boto3_sts) - boto3_dynamodb.tag_resource( - ResourceArn='arn:aws:dynamodb:' + - region + - ':' + - account_id + - ':table/' + - table_name, - Tags=ansible_dict_to_boto3_tag_list(tags)) + kwargs = {} + key_schema = [] + + if range_key_name: + key_schema.append( + {'AttributeName': hash_key_name, 'KeyType': 'HASH'}) + key_schema.append( + {'AttributeName': range_key_name, 'KeyType': 'RANGE'}) + attribute_definitions.append( + { + 'AttributeName': hash_key_name, + 'AttributeType': key_type_mapping[hash_key_type.upper()] + } + ) + attribute_definitions.append( + { + 'AttributeName': range_key_name, + 'AttributeType': key_type_mapping[range_key_type.upper()] + } + ) + else: + key_schema.append( + {'AttributeName': hash_key_name, 'KeyType': 'HASH'}) + attribute_definitions.append( + { + 'AttributeName': hash_key_name, + 'AttributeType': key_type_mapping[hash_key_type.upper()] + } + ) + + kwargs.update( + { + 'AttributeDefinitions': remove_duplicates(attribute_definitions), + 'TableName': table_name, + 'KeySchema': key_schema + } + ) + + if billing_mode == 'PAY_PER_REQUEST': + kwargs.update({'BillingMode': 'PAY_PER_REQUEST'}) + result['billing_mode'] = 'PAY_PER_REQUEST' + else: + kwargs.update( + { + 'ProvisionedThroughput': { + 'ReadCapacityUnits': read_capacity, + 'WriteCapacityUnits': write_capacity + } + } + ) + result['billing_mode'] = 'PROVISIONED' + + if local_secondary_indexes: + kwargs.update( + {'LocalSecondaryIndexes': local_secondary_indexes}) + if global_secondary_indexes: + kwargs.update( + {'GlobalSecondaryIndexes': global_secondary_indexes}) + if stream_specification: + kwargs.update({'StreamSpecification': stream_specification}) + if sse_specification: + kwargs.update({'SSESpecification': sse_specification}) + + resource.create_table(**kwargs) + + if point_in_time_recovery: + wait_for_table_active(table) + update_table_continuous_backups( + client, + table, + point_in_time_recovery + ) + + if tags: + wait_for_table_active(table) + result['tags'] = update_table_tags( + client, + table, + tags, + purge_tags + )['tags'] + + result['table_status'] = table.table_status + result['global_secondary_indexes'] = table.global_secondary_indexes + result['local_secondary_indexes'] = table.local_secondary_indexes + + else: + # The table doesn't exist and creation skipped due to check_mode. 
+ result['changed'] = True result['tags'] = tags - except BotoServerError: - result['msg'] = 'Failed to create/update dynamo table due to error: ' + traceback.format_exc() - module.fail_json(**result) + except NoCredentialsError as e: + module.fail_json_aws( + e, 'Unable to locate credentials: ' + traceback.format_exc()) + except ClientError as e: + module.fail_json_aws(e, 'Client Error: ' + traceback.format_exc()) + except Exception as e: + module.fail_json_aws( + e, 'Ansible dynamodb operation failed: ' + traceback.format_exc()) else: module.exit_json(**result) -def get_account_id(boto3_sts): - return boto3_sts.get_caller_identity()["Account"] - - -def wait_until_table_active(module, table, wait_timeout): - max_wait_time = time.time() + wait_timeout - while (max_wait_time > time.time()) and (table.describe()['Table']['TableStatus'] != 'ACTIVE'): - time.sleep(5) - if max_wait_time <= time.time(): - # waiting took too long - module.fail_json(msg="timed out waiting for table to exist") - - -def delete_dynamo_table(connection, module): +def delete_dynamo_table(resource, module): table_name = module.params.get('name') result = dict( - region=module.params.get('region'), table_name=table_name, ) try: - table = Table(table_name, connection=connection) + table = resource.Table(table_name) + try: + table_status = table.table_status + except is_boto3_error_code('ResourceNotFoundException'): + table_status = 'TABLE_NOT_FOUND' - if dynamo_table_exists(table): + if table_status == 'ACTIVE': if not module.check_mode: table.delete() result['changed'] = True @@ -310,104 +643,330 @@ def delete_dynamo_table(connection, module): else: result['changed'] = False - except BotoServerError: - result['msg'] = 'Failed to delete dynamo table due to error: ' + traceback.format_exc() - module.fail_json(**result) + except ClientError as e: + module.fail_json_aws( + e, 'Failed to delete dynamo table due to error: ' + traceback.format_exc()) else: module.exit_json(**result) -def dynamo_table_exists(table): - try: - table.describe() - return True +def update_dynamodb_table_args( + table_name, + billing_mode=None, + prov_throughput=None, + stream_spec=None, + sse_spec=None, + global_indexes=None, + global_attr_definitions=None): + kwargs = {} + + if billing_mode is not None: + kwargs.update({'BillingMode': billing_mode}) + + if prov_throughput is not None: + kwargs.update({'ProvisionedThroughput': prov_throughput}) + + if global_indexes is not None: + kwargs.update( + { + 'AttributeDefinitions': global_attr_definitions, + 'GlobalSecondaryIndexUpdates': global_indexes + } + ) + + if stream_spec is not None: + kwargs.update({'StreamSpecification': stream_spec}) + + if isinstance(sse_spec, dict): + # if kms master key id is empty pop it off + if 'KMSMasterKeyId' in sse_spec and sse_spec['KMSMasterKeyId'] == '': + sse_spec.pop('KMSMasterKeyId', None) + + kwargs.update({'SSESpecification': sse_spec}) + + if kwargs: + kwargs.update({'TableName': table_name}) + + return kwargs + + +def update_dynamo_table( + module, + client, + table, + billing_mode=None, + throughput=None, + stream_spec=None, + sse_spec=None, + point_in_time_recovery=None, + check_mode=False, + global_indexes=None, + global_attr_definitions=None): + + if global_indexes is None: + global_indexes = [] + + change_state = { + 'changed': False + } + table_name = table.table_name + + # NOTE: oddly, BillingModeSummary is NOT returned if the table uses + # PROVISIONED but IS returned if it uses PAY_PER_REQUEST + # (https://github.com/boto/boto3/issues/1875) + current_billing_mode = ( + table.billing_mode_summary['BillingMode'] + if table.billing_mode_summary + else 'PROVISIONED' + ) - except JSONResponseError as e: - if e.message and e.message.startswith('Requested resource not found'): - return False - else: - raise e + set_billing_mode = ( + billing_mode + if current_billing_mode != billing_mode + else None + ) + # Ignore ProvisionedThroughput if desired BillingMode is 'PAY_PER_REQUEST'. + if ((set_billing_mode is None and current_billing_mode == 'PROVISIONED') or set_billing_mode == 'PROVISIONED') and \ + has_throughput_changed(table, throughput): + set_prov_throughput = { + 'ReadCapacityUnits': throughput['read'], + 'WriteCapacityUnits': throughput['write'] + } + else: + set_prov_throughput = None -def update_dynamo_table(table, throughput=None, check_mode=False, global_indexes=None): - table.describe() # populate table details - throughput_changed = False - global_indexes_changed = False - if has_throughput_changed(table, throughput): - if not check_mode: - throughput_changed = table.update(throughput=throughput) - else: - throughput_changed = True + set_global_indexes = get_changed_global_indexes( + table.global_secondary_indexes, + global_indexes + ) - removed_indexes, added_indexes, index_throughput_changes = get_changed_global_indexes(table, global_indexes) - if removed_indexes: - if not check_mode: - for name, index in removed_indexes.items(): - global_indexes_changed = table.delete_global_secondary_index(name) or global_indexes_changed - else: - global_indexes_changed = True + # Pass the desired spec through only when it actually differs; the + # has_*_changed() helpers return booleans, which must not be sent to the API. + set_stream_spec = ( + stream_spec + if has_stream_spec_changed(table, stream_spec) + else None + ) - if added_indexes: - if not check_mode: - for name, index in added_indexes.items(): - global_indexes_changed = table.create_global_secondary_index(global_index=index) or global_indexes_changed - else: - global_indexes_changed = True - - if index_throughput_changes: - if not check_mode: - # todo: remove try once boto has https://github.com/boto/boto/pull/3447 fixed - try: - global_indexes_changed = table.update_global_secondary_index(global_indexes=index_throughput_changes) or global_indexes_changed - except ValidationException: - pass - else: - global_indexes_changed = True + set_sse_spec = ( + sse_spec + if has_sse_spec_changed(table, sse_spec) + else None + ) + + set_continuous_backup = has_continuous_backup_changed( + client, + table, + point_in_time_recovery + ) + + kwargs = update_dynamodb_table_args( + table_name, + billing_mode=set_billing_mode, + prov_throughput=set_prov_throughput, + stream_spec=set_stream_spec, + sse_spec=set_sse_spec, + global_indexes=set_global_indexes, + global_attr_definitions=global_attr_definitions + ) + + # Prepare state change response + if set_billing_mode is not None: + # billing_mode is always displayed, therefore only 'changed' is updated. + change_state['changed'] = True + change_state['billing_mode'] = set_billing_mode + else: + change_state['billing_mode'] = current_billing_mode + + if set_prov_throughput is not None: + change_state.update( + { + 'changed': True, + 'read_capacity': set_prov_throughput['ReadCapacityUnits'], + 'write_capacity': set_prov_throughput['WriteCapacityUnits'] + } + ) + elif billing_mode == 'PROVISIONED': + # Display read/write capacity even if not changed when appropriate. 
+ change_state.update( + { + 'read_capacity': throughput['read'], + 'write_capacity': throughput['write'] + } + ) + + if set_global_indexes is not None: + change_state.update( + { + 'changed': True, + 'global_indexes_updates': set_global_indexes + } + ) + + if set_continuous_backup: + # point_in_time_recovery is always displayed, therefore only 'changed' is updated. + change_state['changed'] = True + + if check_mode: + return change_state + + if kwargs: + client.update_table(**kwargs) + + if set_continuous_backup: + update_table_continuous_backups(client, table, point_in_time_recovery) + + return change_state + + +def has_sse_spec_changed(table, new_sse_spec): + if not new_sse_spec: + return False + + # SSEDescription reports a Status string ('ENABLED', 'DISABLED', ...) and + # may omit SSEType/KMSMasterKeyId, so compare defensively. + return ( + table.sse_description is None + or new_sse_spec['Enabled'] != (table.sse_description.get('Status') == 'ENABLED') + or new_sse_spec['SSEType'] != table.sse_description.get('SSEType') + or ( + new_sse_spec.get('KMSMasterKeyId', '') != '' + and new_sse_spec.get('KMSMasterKeyId') != table.sse_description.get('KMSMasterKeyId') + ) + ) + + +def has_stream_spec_changed(table, new_stream_spec): + if not new_stream_spec: + return False - return throughput_changed or global_indexes_changed + return ( + table.stream_specification is None + or new_stream_spec['StreamEnabled'] != table.stream_specification['StreamEnabled'] + or new_stream_spec['StreamViewType'] != table.stream_specification['StreamViewType'] + ) def has_throughput_changed(table, new_throughput): if not new_throughput: return False - return new_throughput['read'] != table.throughput['read'] or \ - new_throughput['write'] != table.throughput['write'] + return ( + new_throughput['read'] != table.provisioned_throughput['ReadCapacityUnits'] + or new_throughput['write'] != table.provisioned_throughput['WriteCapacityUnits'] + ) + + +def remove_duplicates(attr_definitions): + # Keep only the first occurrence of each attribute definition. + seen = set() + new_l = [] + for d in attr_definitions: + t = tuple(d.items()) + if t not in seen: + seen.add(t) + new_l.append(d) + return new_l + + +def get_schema_param(hash_key_name, hash_key_type, range_key_name, range_key_type): if range_key_name: schema = [ - HashKey(hash_key_name, DYNAMO_TYPE_MAP.get(hash_key_type, DYNAMO_TYPE_MAP[DYNAMO_TYPE_DEFAULT])), - RangeKey(range_key_name, DYNAMO_TYPE_MAP.get(range_key_type, DYNAMO_TYPE_MAP[DYNAMO_TYPE_DEFAULT])) + {'AttributeName': hash_key_name, 'KeyType': 'HASH'}, + {'AttributeName': range_key_name, 'KeyType': 'RANGE'} ] else: schema = [ - HashKey(hash_key_name, DYNAMO_TYPE_MAP.get(hash_key_type, DYNAMO_TYPE_MAP[DYNAMO_TYPE_DEFAULT])) + {'AttributeName': hash_key_name, 'KeyType': 'HASH'} ] - return schema + return schema -def get_changed_global_indexes(table, global_indexes): - table.describe() - table_index_info = dict((index.name, index.schema()) for index in table.global_indexes) - table_index_objects = dict((index.name, index) for index in table.global_indexes) - set_index_info = dict((index.name, index.schema()) for index in global_indexes) - set_index_objects = dict((index.name, index) for index in global_indexes) +def deserialize_index_names(indexes, key='IndexName'): + return set( + index[key] + for index in indexes or [] + if key in index + ) - removed_indexes = dict((name, index) for name, index in table_index_info.items() if name not in set_index_info) - added_indexes = dict((name, set_index_objects[name]) for name, index in set_index_info.items() if name not in table_index_info) - # todo: uncomment once boto has 
https://github.com/boto/boto/pull/3447 fixed - # for name, index in set_index_objects.items(): - # if (name not in added_indexes and - # (index.throughput['read'] != str(table_index_objects[name].throughput['read']) or - # index.throughput['write'] != str(table_index_objects[name].throughput['write']))): - # index_throughput_changes[name] = index.throughput - # todo: remove once boto has https://github.com/boto/boto/pull/3447 fixed - index_throughput_changes = dict((name, index.throughput) for name, index in set_index_objects.items() if name not in added_indexes) - return removed_indexes, added_indexes, index_throughput_changes +def filter_index_by_name(key, indexes): + for index in indexes: + if index['IndexName'] == key: + return index + + return None + + +def get_changed_global_indexes(table_gsi_indexes, global_indexes): + # check if this is a new index to be created + if not table_gsi_indexes and not global_indexes: + return None + elif not table_gsi_indexes: + table_gsi_indexes = [] + elif not global_indexes: + global_indexes = [] + + global_indexes_updates = [] + input_gsi_index_names_set = deserialize_index_names(global_indexes) + from_aws_gsi_index_names_set = deserialize_index_names(table_gsi_indexes) + + # Find indexes to be deleted and created + # An index is deleted when index name exists in the from_aws_gsi_index_names_set and not in the input_gsi_index_names_set + # An index is created when index name exists in input_gsi_index_names_set but not in from_aws_gsi_index_names_set + indexes_to_be_deleted = from_aws_gsi_index_names_set - input_gsi_index_names_set + + for index_name in indexes_to_be_deleted: + global_indexes_updates.append({'Delete': {'IndexName': index_name}}) + + indexes_to_be_created = input_gsi_index_names_set - from_aws_gsi_index_names_set + + for index_name in indexes_to_be_created: + index = filter_index_by_name(index_name, global_indexes) + global_indexes_updates.append({'Create': index}) + + # Find indexes that needs to be updated + # only provisioned throughput can be updated on an existing gsi index + indexes_to_be_updated = input_gsi_index_names_set & from_aws_gsi_index_names_set + + input_gsi_indexes_to_be_updated = [ + index + for index in global_indexes + if index['IndexName'] in indexes_to_be_updated + ] + from_aws_gsi_indexes_to_be_updated = [ + index + for index in table_gsi_indexes + if index['IndexName'] in indexes_to_be_updated + ] + + for index_name in indexes_to_be_updated: + input_gsi_index = filter_index_by_name(index_name, global_indexes) + from_aws_gsi_index = filter_index_by_name( + index_name, table_gsi_indexes) + if input_gsi_index and 'ProvisionedThroughput' in input_gsi_index: + input_gsi_index_prov_throughput = input_gsi_index.get( + 'ProvisionedThroughput') + from_aws_gsi_index_prov_throughput = from_aws_gsi_index.get( + 'ProvisionedThroughput') + if (input_gsi_index_prov_throughput.get('ReadCapacityUnits') != from_aws_gsi_index_prov_throughput.get('ReadCapacityUnits') + or input_gsi_index_prov_throughput.get('WriteCapacityUnits') != from_aws_gsi_index_prov_throughput.get('WriteCapacityUnits')): + global_indexes_updates.append( + { + 'Update': { + 'IndexName': index_name, + 'ProvisionedThroughput': input_gsi_index_prov_throughput + } + } + ) + + return ( + global_indexes_updates + if global_indexes_updates + else None + ) def validate_index(index, module): @@ -416,41 +975,104 @@ def validate_index(index, module): module.fail_json(msg='%s is not a valid option for an index' % key) for required_option in INDEX_REQUIRED_OPTIONS: if 
required_option not in index: - module.fail_json(msg='%s is a required option for an index' % required_option) + module.fail_json( + msg='%s is a required option for an index' % required_option) if index['type'] not in INDEX_TYPE_OPTIONS: - module.fail_json(msg='%s is not a valid index type, must be one of %s' % (index['type'], INDEX_TYPE_OPTIONS)) + module.fail_json(msg='%s is not a valid index type, must be one of %s' % ( + index['type'], INDEX_TYPE_OPTIONS)) + + +def serialize_index_to_json(index, billing_mode): + serialized_index = {} + serialized_index_attribute_definitions = [] + name = index['name'] + index_type = index.get('type') + hash_key_name = index.get('hash_key_name') + hash_key_type = index.get('hash_key_type', 'STRING') + range_key_name = index.get('range_key_name') + range_key_type = index.get('range_key_type', 'STRING') + schema = get_schema_param( + hash_key_name, + hash_key_type, + range_key_name, + range_key_type + ) + projection_type = index_type.replace('global_', '') + projection = {'ProjectionType': projection_type.upper()} + index_throughput = { + 'ReadCapacityUnits': index.get('read_capacity', 1), + 'WriteCapacityUnits': index.get('write_capacity', 1) + } + key_type_mapping = {'STRING': 'S', 'BINARY': 'B', 'NUMBER': 'N'} + if projection_type == 'include': + projection.update({'NonKeyAttributes': index['includes']}) -def get_indexes(all_indexes): - indexes = [] - global_indexes = [] - for index in all_indexes: - name = index['name'] - schema = get_schema_param(index.get('hash_key_name'), index.get('hash_key_type'), index.get('range_key_name'), index.get('range_key_type')) - throughput = { - 'read': index.get('read_capacity', 1), - 'write': index.get('write_capacity', 1) + serialized_index.update( + { + 'IndexName': name, + 'KeySchema': schema, + 'Projection': projection } + ) - if index['type'] == 'all': - indexes.append(AllIndex(name, parts=schema)) - - elif index['type'] == 'global_all': - global_indexes.append(GlobalAllIndex(name, parts=schema, throughput=throughput)) - - elif index['type'] == 'global_include': - global_indexes.append(GlobalIncludeIndex(name, parts=schema, throughput=throughput, includes=index['includes'])) - - elif index['type'] == 'global_keys_only': - global_indexes.append(GlobalKeysOnlyIndex(name, parts=schema, throughput=throughput)) + if billing_mode != "PAY_PER_REQUEST" and index_type in ['global_all', 'global_include', 'global_keys_only']: + serialized_index.update({'ProvisionedThroughput': index_throughput}) - elif index['type'] == 'include': - indexes.append(IncludeIndex(name, parts=schema, includes=index['includes'])) + if range_key_name: + serialized_index_attribute_definitions.append( + { + 'AttributeName': hash_key_name, + 'AttributeType': key_type_mapping[hash_key_type.upper()] + } + ) + serialized_index_attribute_definitions.append( + { + 'AttributeName': range_key_name, + 'AttributeType': key_type_mapping[range_key_type.upper()] + } + ) + else: + serialized_index_attribute_definitions.append( + { + 'AttributeName': hash_key_name, + 'AttributeType': key_type_mapping[hash_key_type.upper()] + } + ) + + return ( + index_type, + serialized_index, + serialized_index_attribute_definitions + ) - elif index['type'] == 'keys_only': - indexes.append(KeysOnlyIndex(name, parts=schema)) - return indexes, global_indexes +def serialize_indexes(all_indexes, billing_mode): + local_secondary_indexes = [] + global_secondary_indexes = [] + indexes_attr_definitions = [] + for index in all_indexes: + ( + index_type, + serialized_index_to_json, + 
+def serialize_indexes(all_indexes, billing_mode):
+    local_secondary_indexes = []
+    global_secondary_indexes = []
+    indexes_attr_definitions = []
+    for index in all_indexes:
+        (
+            index_type,
+            serialized_index,
+            serialized_index_attribute_definitions
+        ) = serialize_index_to_json(index, billing_mode)
+
+        for index_attr_definition in serialized_index_attribute_definitions:
+            indexes_attr_definitions.append(index_attr_definition)
+
+        if index_type in ['all', 'include', 'keys_only']:
+            # local secondary indexes
+            local_secondary_indexes.append(serialized_index)
+        elif index_type in ['global_all', 'global_include', 'global_keys_only']:
+            # global secondary indexes
+            global_secondary_indexes.append(serialized_index)
+
+    return (
+        local_secondary_indexes,
+        global_secondary_indexes,
+        remove_duplicates(indexes_attr_definitions)
+    )


 def main():
@@ -458,55 +1080,47 @@ def main():
         state=dict(default='present', choices=['present', 'absent']),
         name=dict(required=True, type='str'),
         hash_key_name=dict(type='str'),
-        hash_key_type=dict(default='STRING', type='str', choices=['STRING', 'NUMBER', 'BINARY']),
+        hash_key_type=dict(default='STRING', type='str', choices=[
+            'STRING', 'NUMBER', 'BINARY']),
         range_key_name=dict(type='str'),
-        range_key_type=dict(default='STRING', type='str', choices=['STRING', 'NUMBER', 'BINARY']),
+        range_key_type=dict(default='STRING', type='str', choices=[
+            'STRING', 'NUMBER', 'BINARY']),
+        billing_mode=dict(default=None, type='str', choices=[
+            'PROVISIONED', 'PAY_PER_REQUEST']),
         read_capacity=dict(default=1, type='int'),
         write_capacity=dict(default=1, type='int'),
         indexes=dict(default=[], type='list', elements='dict'),
         tags=dict(type='dict'),
+        purge_tags=dict(default=False, type='bool'),
         wait_for_active_timeout=dict(default=60, type='int'),
+        stream_enabled=dict(type='bool'),
+        stream_view_type=dict(type='str', choices=[
+            'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES', 'KEYS_ONLY']),
+        sse_enabled=dict(type='bool'),
+        sse_type=dict(type='str', choices=['AES256', 'KMS']),
+        sse_kms_master_key_id=dict(type='str'),
+        point_in_time_recovery=dict(default=False, type='bool')
     )

     module = AnsibleAWSModule(
         argument_spec=argument_spec,
         supports_check_mode=True,
         check_boto3=False,
+        required_if=[
+            ['state', 'present', ['name', 'hash_key_name']],
+            ['sse_enabled', True, ['sse_type']],
+            ['stream_enabled', True, ['stream_view_type']]
+        ]
     )

-    if not HAS_BOTO:
-        module.fail_json(msg='boto required for this module')
-
-    if not HAS_BOTO3 and module.params.get('tags'):
-        module.fail_json(msg='boto3 required when using tags for this module')
-
-    region, ec2_url, aws_connect_params = get_aws_connection_info(module)
-    if not region:
-        module.fail_json(msg='region must be specified')
-
-    try:
-        connection = connect_to_aws(boto.dynamodb2, region, **aws_connect_params)
-    except (NoAuthHandlerFound, AnsibleAWSError) as e:
-        module.fail_json(msg=str(e))
-
-    if module.params.get('tags'):
-        try:
-            boto3_dynamodb = module.client('dynamodb')
-            if not hasattr(boto3_dynamodb, 'tag_resource'):
-                module.fail_json(msg='boto3 connection does not have tag_resource(), likely due to using an old version')
-            boto3_sts = module.client('sts')
-        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
-            module.fail_json_aws(e, msg='Failed to connect to AWS')
-    else:
-        boto3_dynamodb = None
-        boto3_sts = None
-
+    resource = module.resource('dynamodb')
     state = module.params.get('state')
+
     if state == 'present':
-        create_or_update_dynamo_table(connection, module, boto3_dynamodb, boto3_sts, region)
+        create_or_update_dynamo_table(resource, module)
     elif state == 'absent':
-        delete_dynamo_table(connection, module)
+        delete_dynamo_table(resource, module)


 if __name__ == '__main__':
-    main()
+    main()
\ No newline at end of file
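The rewritten main() above drops the hand-rolled boto connection plumbing in favour of AnsibleAWSModule's resource() helper. Outside of Ansible that is roughly equivalent to the following plain boto3 usage; the region and table name are placeholders:

import boto3

# Plain-boto3 equivalent of module.resource('dynamodb'); values are placeholders.
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('example-table')
print(table.table_status)  # e.g. 'ACTIVE'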
diff --git a/plugins/modules/ec2_launch_template.py b/plugins/modules/ec2_launch_template.py
index 4a35812cfb4..bae41381988 100644
--- a/plugins/modules/ec2_launch_template.py
+++ b/plugins/modules/ec2_launch_template.py
@@ -481,11 +481,22 @@ def delete_template(module):
     return {'changed': False}


+def remove_none(obj):
+    if isinstance(obj, (list, tuple, set)):
+        return type(obj)(remove_none(x) for x in obj if x is not None)
+    elif isinstance(obj, dict):
+        return type(obj)((remove_none(k), remove_none(v))
+                         for k, v in obj.items() if k is not None and v is not None)
+    else:
+        return obj
+
+
 def create_or_update(module, template_options):
     ec2 = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff(catch_extra_error_codes=['InvalidLaunchTemplateId.NotFound']))
     template, template_versions = existing_templates(module)
     out = {}
     lt_data = params_to_launch_data(module, dict((k, v) for k, v in module.params.items() if k in template_options))
+    lt_data = remove_none(lt_data)
     if not (template or template_versions):
         # create a full new one
         try:
@@ -705,4 +716,4 @@ def main():


 if __name__ == '__main__':
-    main()
+    main()
\ No newline at end of file
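The remove_none helper added above recursively drops None keys and values before the launch template data reaches botocore, which would otherwise reject explicit nulls. A small illustration, assuming the remove_none defined in the hunk above is in scope; the nested payload is invented:

# remove_none drops None keys/values at every nesting level.
lt_data = {
    'ImageId': 'ami-12345678',
    'KeyName': None,
    'BlockDeviceMappings': [{'DeviceName': '/dev/sda1', 'Ebs': None}],
}
print(remove_none(lt_data))
# {'ImageId': 'ami-12345678', 'BlockDeviceMappings': [{'DeviceName': '/dev/sda1'}]}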
diff --git a/plugins/modules/lambda.py b/plugins/modules/lambda.py
index 9cb2e0286cc..740d98d5470 100644
--- a/plugins/modules/lambda.py
+++ b/plugins/modules/lambda.py
@@ -46,7 +46,7 @@
   zip_file:
     description:
       - A .zip file containing your deployment package.
-      - If I(state=present) then either I(zip_file) or I(s3_bucket) must be present.
+      - If I(state=present) then either I(zip_file), I(s3_bucket) or I(image_uri) must be present.
     aliases: [ 'src' ]
     type: str
   s3_bucket:
@@ -64,6 +64,10 @@
     description:
      - The Amazon S3 object (the deployment package) version you want to upload.
    type: str
+  image_uri:
+    description:
+      - URI of a container image in the Amazon ECR registry.
+    type: str
   description:
     description:
       - A short, user-defined function description. Lambda does not use this value. Assign a meaningful description as you see fit.
@@ -108,6 +112,10 @@
     description:
       - tag dict to apply to the function (requires botocore 1.5.40 or above).
     type: dict
+  image_config:
+    description:
+      - A dict of configuration values that override the container image Dockerfile settings. See U(https://docs.aws.amazon.com/lambda/latest/dg/configuration-images.html).
+    type: dict
 author:
   - 'Steyn Huizinga (@steynovich)'
 extends_documentation_fragment:
@@ -224,7 +232,7 @@
 import re

 try:
-    from botocore.exceptions import ClientError, BotoCoreError
+    from botocore.exceptions import ClientError, BotoCoreError, WaiterError
 except ImportError:
     pass  # protected by AnsibleAWSModule
@@ -319,6 +327,16 @@ def set_tag(client, module, tags, function):
     return changed


+def wait_for_lambda(client, module, name):
+    try:
+        client_active_waiter = client.get_waiter('function_active')
+        client_updated_waiter = client.get_waiter('function_updated')
+        client_active_waiter.wait(FunctionName=name)
+        client_updated_waiter.wait(FunctionName=name)
+    except WaiterError as e:
+        module.fail_json_aws(e, msg='Timeout while waiting on lambda to finish updating')
+    except (ClientError, BotoCoreError) as e:
+        module.fail_json_aws(e, msg='Failed while waiting on lambda to finish updating')
+
+
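wait_for_lambda leans on boto3's built-in Lambda waiters; function_active and function_updated are standard waiter names. A minimal standalone sketch of the same pattern, with a placeholder function name:

import boto3

# Same waiter pattern as wait_for_lambda above; 'my-function' is a placeholder.
client = boto3.client('lambda', region_name='us-east-1')
client.get_waiter('function_active').wait(FunctionName='my-function')
client.get_waiter('function_updated').wait(FunctionName='my-function')
# Each waiter polls the function's State/LastUpdateStatus until it settles,
# raising botocore.exceptions.WaiterError on timeout.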
 def main():
     argument_spec = dict(
@@ -331,6 +349,7 @@ def main():
         s3_bucket=dict(),
         s3_key=dict(),
         s3_object_version=dict(),
+        image_uri=dict(),
         description=dict(default=''),
         timeout=dict(type='int', default=3),
         memory_size=dict(type='int', default=128),
@@ -340,16 +359,21 @@ def main():
         dead_letter_arn=dict(),
         tracing_mode=dict(choices=['Active', 'PassThrough']),
         tags=dict(type='dict'),
+        image_config=dict(type='dict'),
     )

     mutually_exclusive = [['zip_file', 's3_key'],
                           ['zip_file', 's3_bucket'],
-                          ['zip_file', 's3_object_version']]
+                          ['zip_file', 's3_object_version'],
+                          ['zip_file', 'image_uri'],
+                          ['image_uri', 's3_key'],
+                          ['image_uri', 's3_bucket'],
+                          ['image_uri', 's3_object_version']]

     required_together = [['s3_key', 's3_bucket'],
                          ['vpc_subnet_ids', 'vpc_security_group_ids']]

-    required_if = [['state', 'present', ['runtime', 'handler', 'role']]]
+    required_if = [['state', 'present', ['role']],
+                   ['state', 'present', ['runtime', 'image_uri'], True]]

     module = AnsibleAWSModule(argument_spec=argument_spec,
                               supports_check_mode=True,
@@ -366,6 +390,7 @@ def main():
     s3_key = module.params.get('s3_key')
     s3_object_version = module.params.get('s3_object_version')
     zip_file = module.params.get('zip_file')
+    image_uri = module.params.get('image_uri')
     description = module.params.get('description')
     timeout = module.params.get('timeout')
     memory_size = module.params.get('memory_size')
@@ -375,6 +400,7 @@ def main():
     dead_letter_arn = module.params.get('dead_letter_arn')
     tracing_mode = module.params.get('tracing_mode')
     tags = module.params.get('tags')
+    image_config = module.params.get('image_config')

     check_mode = module.check_mode
     changed = False
@@ -454,9 +480,18 @@ def main():
         # No VPC configuration is desired, assure VPC config is empty when present in current config
         if 'VpcConfig' in current_config and current_config['VpcConfig'].get('VpcId'):
             func_kwargs.update({'VpcConfig': {'SubnetIds': [], 'SecurityGroupIds': []}})

+        # Update image configuration
+        if image_config:
+            if 'ImageConfigResponse' in current_config:
+                if image_config != current_config['ImageConfigResponse']['ImageConfig']:
+                    func_kwargs.update({'ImageConfig': image_config})
+        elif 'ImageConfigResponse' in current_config:
+            func_kwargs.update({'ImageConfig': {}})
+
         # Upload new configuration if configuration has changed
         if len(func_kwargs) > 1:
+            if not check_mode:
+                wait_for_lambda(client, module, name)
             try:
                 if not check_mode:
                     response = client.update_function_configuration(aws_retry=True, **func_kwargs)
@@ -467,7 +502,7 @@ def main():

         # Update code configuration
         code_kwargs = {'FunctionName': name, 'Publish': True}

         # Update S3 location
         if s3_bucket and s3_key:
             # If function is stored on S3 always update
@@ -490,6 +525,9 @@ def main():
                 code_kwargs.update({'ZipFile': encoded_zip})
             except IOError as e:
                 module.fail_json(msg=str(e), exception=traceback.format_exc())
+        # Update container image URI
+        elif image_uri and image_uri != current_function['Code'].get('ImageUri'):
+            code_kwargs.update({'ImageUri': image_uri})

         # Tag Function
         if tags is not None:
@@ -498,6 +536,8 @@ def main():

         # Upload new code if needed (e.g. code checksum has changed)
         if len(code_kwargs) > 2:
+            if not check_mode:
+                wait_for_lambda(client, module, name)
             try:
                 if not check_mode:
                     response = client.update_function_code(aws_retry=True, **code_kwargs)
@@ -515,6 +555,7 @@ def main():
         module.exit_json(changed=changed, **camel_dict_to_snake_dict(response))

     # Function doesn't exists, create new Lambda function
+
     elif state == 'present':
         if s3_bucket and s3_key:
             # If function is stored on S3
@@ -531,18 +572,21 @@ def main():
                 code = {'ZipFile': zip_content}
             except IOError as e:
                 module.fail_json(msg=str(e), exception=traceback.format_exc())
+        elif image_uri:
+            code = {'ImageUri': image_uri}
         else:
-            module.fail_json(msg='Either S3 object or path to zipfile required')
+            module.fail_json(msg='Either S3 object, path to zipfile or image URI required')

         func_kwargs = {'FunctionName': name,
                        'Publish': True,
-                       'Runtime': runtime,
                        'Role': role_arn,
                        'Code': code,
                        'Timeout': timeout,
-                       'MemorySize': memory_size,
+                       'MemorySize': memory_size}

+        if runtime is not None:
+            func_kwargs.update({'Runtime': runtime})
+
         if description is not None:
             func_kwargs.update({'Description': description})
@@ -553,6 +597,11 @@ def main():
         if environment_variables:
             func_kwargs.update({'Environment': {'Variables': environment_variables}})

+        if image_uri is not None:
+            func_kwargs.update({'PackageType': 'Image'})
+        if image_config is not None:
+            func_kwargs.update({'ImageConfig': image_config})
+
         if dead_letter_arn:
             func_kwargs.update({'DeadLetterConfig': {'TargetArn': dead_letter_arn}})

diff --git a/plugins/modules/s3_bucket_notification.py b/plugins/modules/s3_bucket_notification.py
index f42c64a0028..0cb12f4226b 100644
--- a/plugins/modules/s3_bucket_notification.py
+++ b/plugins/modules/s3_bucket_notification.py
@@ -120,41 +120,89 @@


 class AmazonBucket:
-    def __init__(self, client, bucket_name):
+    def __init__(self, module, client):
+        self.module = module
         self.client = client
-        self.bucket_name = bucket_name
+        self.bucket_name = module.params["bucket_name"]
+        self.check_mode = module.check_mode
         self._full_config_cache = None

     def full_config(self):
         if self._full_config_cache is None:
-            self._full_config_cache = [Config.from_api(cfg) for cfg in
-                                       self.client.get_bucket_notification_configuration(
-                                           Bucket=self.bucket_name).get(
-                                               'LambdaFunctionConfigurations', list())]
+            self._full_config_cache = dict(
+                QueueConfigurations=[], TopicConfigurations=[], LambdaFunctionConfigurations=[]
+            )
+
+            try:
+                config_lookup = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
+            except (ClientError, BotoCoreError) as e:
+                self.module.fail_json_aws(e, msg="Couldn't get bucket notification configuration")
+
+            # Handle different event targets
+            if config_lookup.get("QueueConfigurations"):
+                for queue_config in config_lookup.get("QueueConfigurations"):
+                    self._full_config_cache["QueueConfigurations"].append(Config.from_api(queue_config))
+
+            if config_lookup.get("TopicConfigurations"):
+                for topic_config in config_lookup.get("TopicConfigurations"):
+                    self._full_config_cache["TopicConfigurations"].append(Config.from_api(topic_config))
+
+            if config_lookup.get("LambdaFunctionConfigurations"):
+                for function_config in config_lookup.get("LambdaFunctionConfigurations"):
+                    self._full_config_cache["LambdaFunctionConfigurations"].append(Config.from_api(function_config))
+
         return self._full_config_cache

     def current_config(self, config_name):
-        for config in self.full_config():
-            if config.raw['Id'] == config_name:
-                return config
+        for target_configs in self.full_config():
+            for config in self.full_config()[target_configs]:
+                if config.raw["Id"] == config_name:
+                    return config

     def apply_config(self, desired):
-        configs = [cfg.raw for cfg in self.full_config() if cfg.name != desired.raw['Id']]
-        configs.append(desired.raw)
+        configs = dict(QueueConfigurations=[], TopicConfigurations=[], LambdaFunctionConfigurations=[])
+
+        # Iterate through existing configs, then add the desired config
+        for target_configs in self.full_config():
+            for config in self.full_config()[target_configs]:
+                if config.name != desired.raw["Id"]:
+                    configs[target_configs].append(config.raw)
+
+        if self.module.params.get("queue_arn"):
+            configs["QueueConfigurations"].append(desired.raw)
+        if self.module.params.get("topic_arn"):
+            configs["TopicConfigurations"].append(desired.raw)
+        if self.module.params.get("lambda_function_arn"):
+            configs["LambdaFunctionConfigurations"].append(desired.raw)
+
         self._upload_bucket_config(configs)
         return configs

     def delete_config(self, desired):
-        configs = [cfg.raw for cfg in self.full_config() if cfg.name != desired.raw['Id']]
+        configs = dict(QueueConfigurations=[], TopicConfigurations=[], LambdaFunctionConfigurations=[])
+
+        # Iterate through existing configs, omitting the specified config
+        for target_configs in self.full_config():
+            for config in self.full_config()[target_configs]:
+                if config.name != desired.raw["Id"]:
+                    configs[target_configs].append(config.raw)
+
         self._upload_bucket_config(configs)
         return configs

-    def _upload_bucket_config(self, config):
-        self.client.put_bucket_notification_configuration(
-            Bucket=self.bucket_name,
-            NotificationConfiguration={
-                'LambdaFunctionConfigurations': config
-            })
+    def _upload_bucket_config(self, configs):
+        api_params = dict(Bucket=self.bucket_name, NotificationConfiguration=dict())
+
+        # Include only the target types that have any configurations
+        for target_configs in configs:
+            if len(configs[target_configs]) > 0:
+                api_params["NotificationConfiguration"][target_configs] = configs[target_configs]
+
+        if not self.check_mode:
+            try:
+                self.client.put_bucket_notification_configuration(**api_params)
+            except (ClientError, BotoCoreError) as e:
+                self.module.fail_json_aws(e, msg="Couldn't put bucket notification configuration")


 class Config:
@@ -173,86 +221,116 @@ def __eq__(self, other):

     @classmethod
     def from_params(cls, **params):
-        function_arn = params['lambda_function_arn']
-
-        qualifier = None
-        if params['lambda_version'] > 0:
-            qualifier = str(params['lambda_version'])
-        elif params['lambda_alias']:
-            qualifier = str(params['lambda_alias'])
-        if qualifier:
-            params['lambda_function_arn'] = '{0}:{1}'.format(function_arn, qualifier)
-
-        return cls({
-            'Id': params['event_name'],
-            'LambdaFunctionArn': params['lambda_function_arn'],
-            'Events': sorted(params['events']),
-            'Filter': {
-                'Key': {
-                    'FilterRules': [{
-                        'Name': 'Prefix',
-                        'Value': params['prefix']
-                    }, {
-                        'Name': 'Suffix',
-                        'Value': params['suffix']
-                    }]
-                }
-            }
-        })
+        bucket_event_params = dict(
+            Id=params["event_name"],
+            Events=sorted(params["events"]),
+            Filter=dict(
+                Key=dict(
+                    FilterRules=[
+                        dict(Name="Prefix", Value=params["prefix"]),
+                        dict(Name="Suffix", Value=params["suffix"]),
+                    ]
+                )
+            ),
+        )
+
+        # Handle different event targets
+        if params.get("queue_arn"):
+            bucket_event_params["QueueArn"] = params["queue_arn"]
+        if params.get("topic_arn"):
+            bucket_event_params["TopicArn"] = params["topic_arn"]
+        if params.get("lambda_function_arn"):
+            function_arn = params["lambda_function_arn"]
+
+            qualifier = None
+            if params["lambda_version"] > 0:
+                qualifier = str(params["lambda_version"])
+            elif params["lambda_alias"]:
+                qualifier = str(params["lambda_alias"])
+            if qualifier:
+                params["lambda_function_arn"] = f"{function_arn}:{qualifier}"
+
+            bucket_event_params["LambdaFunctionArn"] = params["lambda_function_arn"]
+
+        return cls(bucket_event_params)

     @classmethod
     def from_api(cls, config):
         return cls(config)


+def setup_module_object():
+    event_types = [
+        "s3:ObjectCreated:*",
+        "s3:ObjectCreated:Put",
+        "s3:ObjectCreated:Post",
+        "s3:ObjectCreated:Copy",
+        "s3:ObjectCreated:CompleteMultipartUpload",
+        "s3:ObjectRemoved:*",
+        "s3:ObjectRemoved:Delete",
+        "s3:ObjectRemoved:DeleteMarkerCreated",
+        "s3:ObjectRestore:Post",
+        "s3:ObjectRestore:Completed",
+        "s3:ReducedRedundancyLostObject",
+    ]
+
+    argument_spec = dict(
+        state=dict(default="present", choices=["present", "absent"]),
+        event_name=dict(required=True),
+        lambda_function_arn=dict(aliases=["function_arn"]),
+        queue_arn=dict(type="str"),
+        topic_arn=dict(type="str"),
+        bucket_name=dict(required=True),
+        events=dict(type="list", default=[], choices=event_types, elements="str"),
+        prefix=dict(default=""),
+        suffix=dict(default=""),
+        lambda_alias=dict(),
+        lambda_version=dict(type="int", default=0),
+    )
+
+    mutually_exclusive = [
+        ["queue_arn", "topic_arn", "lambda_function_arn"],
+        ["lambda_alias", "lambda_version"],
+    ]
+
+    return AnsibleAWSModule(
+        argument_spec=argument_spec,
+        supports_check_mode=True,
+        mutually_exclusive=mutually_exclusive,
+        required_if=[["state", "present", ["events"]]],
+    )


 def main():
-    event_types = ['s3:ObjectCreated:*', 's3:ObjectCreated:Put', 's3:ObjectCreated:Post',
-                   's3:ObjectCreated:Copy', 's3:ObjectCreated:CompleteMultipartUpload',
-                   's3:ObjectRemoved:*', 's3:ObjectRemoved:Delete',
-                   's3:ObjectRemoved:DeleteMarkerCreated', 's3:ObjectRestore:Post',
-                   's3:ObjectRestore:Completed', 's3:ReducedRedundancyLostObject']
-    argument_spec = dict(
-        state=dict(default='present', choices=['present', 'absent']),
-        event_name=dict(required=True),
-        lambda_function_arn=dict(aliases=['function_arn']),
-        bucket_name=dict(required=True),
-        events=dict(type='list', default=[], choices=event_types, elements='str'),
-        prefix=dict(default=''),
-        suffix=dict(default=''),
-        lambda_alias=dict(),
-        lambda_version=dict(type='int', default=0),
-    )
-
-    module = AnsibleAWSModule(
-        argument_spec=argument_spec,
-        supports_check_mode=True,
-        mutually_exclusive=[['lambda_alias', 'lambda_version']],
-        required_if=[['state', 'present', ['events']]]
-    )
-
-    bucket = AmazonBucket(module.client('s3'), module.params['bucket_name'])
-    current = bucket.current_config(module.params['event_name'])
+    module = setup_module_object()
+
+    client = module.client("s3")
+    bucket = AmazonBucket(module, client)
+    current = bucket.current_config(module.params["event_name"])
     desired = Config.from_params(**module.params)
-    notification_configuration = [cfg.raw for cfg in bucket.full_config()]
-    state = module.params['state']
-    try:
-        if (state == 'present' and current == desired) or (state == 'absent' and not current):
-            changed = False
-        elif module.check_mode:
-            changed = True
-        elif state == 'present':
+    notification_configs = dict(QueueConfigurations=[], TopicConfigurations=[], LambdaFunctionConfigurations=[])
+
+    for target_configs in bucket.full_config():
+        for cfg in bucket.full_config()[target_configs]:
+            notification_configs[target_configs].append(camel_dict_to_snake_dict(cfg.raw))
+
+    state = module.params["state"]
+    updated_configuration = dict()
+    changed = False
+
+    if state == "present":
+        if current != desired:
+            updated_configuration = bucket.apply_config(desired)
             changed = True
-            notification_configuration = bucket.apply_config(desired)
-        elif state == 'absent':
+    elif state == "absent":
+        if current:
+            updated_configuration = bucket.delete_config(desired)
             changed = True
-            notification_configuration = bucket.delete_config(desired)
-    except (ClientError, BotoCoreError) as e:
-        module.fail_json(msg='{0}'.format(e))

-    module.exit_json(**dict(changed=changed,
-                            notification_configuration=[camel_dict_to_snake_dict(cfg) for cfg in
-                                                        notification_configuration]))
+    for target_configs in updated_configuration:
+        notification_configs[target_configs] = []
+        for cfg in updated_configuration.get(target_configs, list()):
+            notification_configs[target_configs].append(camel_dict_to_snake_dict(cfg))
+
+    module.exit_json(changed=changed, notification_configuration=camel_dict_to_snake_dict(notification_configs))


 if __name__ == '__main__':
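Condensed, the reworked s3_bucket_notification flow above reduces to the following decision logic; bucket, current and desired are stand-ins for the real module objects, not the actual plumbing:

# Sketch of the present/absent decision flow; arguments are stand-ins.
def decide(state, current, desired, bucket):
    changed = False
    updated_configuration = {}
    if state == 'present' and current != desired:
        updated_configuration = bucket.apply_config(desired)
        changed = True
    elif state == 'absent' and current:
        updated_configuration = bucket.delete_config(desired)
        changed = True
    return changed, updated_configuration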
diff --git a/plugins/modules/sns_topic.py b/plugins/modules/sns_topic.py
index 79070cbabc5..acb40823271 100644
--- a/plugins/modules/sns_topic.py
+++ b/plugins/modules/sns_topic.py
@@ -12,8 +12,7 @@
 short_description: Manages AWS SNS topics and subscriptions
 version_added: 1.0.0
 description:
-  - The M(community.aws.sns_topic) module allows you to create, delete, and manage subscriptions for AWS SNS topics.
-  - As of 2.6, this module can be use to subscribe and unsubscribe to topics outside of your AWS account.
+  - The M(community.aws.sns_topic) module allows you to create, delete, and manage subscriptions for AWS SNS topics.
 author:
   - "Joel Thompson (@joelthompson)"
   - "Fernando Jose Pando (@nand0p)"
@@ -24,6 +23,16 @@
       - The name or ARN of the SNS topic to manage.
     required: true
     type: str
+  topic_type:
+    description:
+      - The type of topic that should be created, either C(standard) or C(fifo) (first-in, first-out).
+      - Some regions, including GovCloud regions, do not support FIFO topics.
+        Use the default value of C(standard) or omit the option if the region
+        does not support FIFO topics.
+    choices: ["standard", "fifo"]
+    default: 'standard'
+    type: str
+    version_added: 2.0.0
   state:
     description:
       - Whether to create or destroy an SNS topic.
@@ -37,11 +46,80 @@
   policy:
     description:
       - Policy to apply to the SNS topic.
+      - Policy body can be YAML or JSON.
+      - This is required for certain use cases, for example with S3 bucket notifications.
     type: dict
   delivery_policy:
     description:
       - Delivery policy to apply to the SNS topic.
     type: dict
+    suboptions:
+      http:
+        description:
+          - Delivery policy for HTTP(S) messages.
+          - See U(https://docs.aws.amazon.com/sns/latest/dg/sns-message-delivery-retries.html)
+            for more information.
+        type: dict
+        required: false
+        suboptions:
+          disableSubscriptionOverrides:
+            description:
+              - Applies this policy to all subscriptions, even if they have their own policies.
+            type: bool
+            required: false
+          defaultThrottlePolicy:
+            description:
+              - Throttle the rate of messages sent to subscriptions.
+            type: dict
+            suboptions:
+              maxReceivesPerSecond:
+                description:
+                  - The maximum number of deliveries per second per subscription.
+                type: int
+                required: true
+            required: false
+          defaultHealthyRetryPolicy:
+            description:
+              - Retry policy for HTTP(S) messages.
+            type: dict
+            required: true
+            suboptions:
+              minDelayTarget:
+                description:
+                  - The minimum delay for a retry.
+                type: int
+                required: true
+              maxDelayTarget:
+                description:
+                  - The maximum delay for a retry.
+                type: int
+                required: true
+              numRetries:
+                description:
+                  - The total number of retries.
+                type: int
+                required: true
+              numMaxDelayRetries:
+                description:
+                  - The number of retries with the maximum delay between them.
+                type: int
+                required: true
+              numMinDelayRetries:
+                description:
+                  - The number of retries with just the minimum delay between them.
+                type: int
+                required: true
+              numNoDelayRetries:
+                description:
+                  - The number of retries to be performed immediately.
+                type: int
+                required: true
+              backoffFunction:
+                description:
+                  - The function for backoff between retries.
+                type: str
+                required: true
+                choices: ['arithmetic', 'exponential', 'geometric', 'linear']
   subscriptions:
     description:
       - List of subscriptions to apply to the topic. Note that AWS requires
@@ -54,6 +132,10 @@
       protocol:
         description: Protocol of subscription.
         required: true
+      attributes:
+        description: Attributes of subscription. Only supports I(RawMessageDelivery) and I(FilterPolicy) for SQS endpoints.
+        default: {}
+        version_added: "4.1.0"
     type: list
     elements: dict
     default: []
@@ -66,11 +148,21 @@
       Blame Amazon."
     default: true
     type: bool
+  content_based_deduplication:
+    description:
+      - Whether to enable content-based deduplication for this topic.
+      - Ignored unless I(topic_type=fifo).
+      - Defaults to C(disabled).
+    choices: ["disabled", "enabled"]
+    type: str
+    version_added: 5.3.0
+notes:
+  - Support for I(tags) and I(purge_tags) was added in release 5.3.0.
extends_documentation_fragment: -- amazon.aws.aws -- amazon.aws.ec2 - -requirements: [ "boto" ] + - amazon.aws.common.modules + - amazon.aws.region.modules + - amazon.aws.tags.modules + - amazon.aws.boto3 ''' EXAMPLES = r""" @@ -83,20 +175,45 @@ delivery_policy: http: defaultHealthyRetryPolicy: - minDelayTarget: 2 - maxDelayTarget: 4 - numRetries: 3 - numMaxDelayRetries: 5 - backoffFunction: "" + minDelayTarget: 2 + maxDelayTarget: 4 + numRetries: 9 + numMaxDelayRetries: 5 + numMinDelayRetries: 2 + numNoDelayRetries: 2 + backoffFunction: "linear" disableSubscriptionOverrides: True defaultThrottlePolicy: - maxReceivesPerSecond: 10 + maxReceivesPerSecond: 10 subscriptions: - endpoint: "my_email_address@example.com" protocol: "email" - endpoint: "my_mobile_number" protocol: "sms" +- name: Create a topic permitting S3 bucket notifications + community.aws.sns_topic: + name: "S3Notifications" + state: present + display_name: "S3 notifications SNS topic" + policy: + Id: s3-topic-policy + Version: 2012-10-17 + Statement: + - Sid: Statement-id + Effect: Allow + Resource: "arn:aws:sns:*:*:S3Notifications" + Principal: + Service: s3.amazonaws.com + Action: sns:Publish + Condition: + ArnLike: + aws:SourceArn: "arn:aws:s3:*:*:SomeBucket" + +- name: Example deleting a topic + community.aws.sns_topic: + name: "ExampleTopic" + state: absent """ RETURN = r''' @@ -104,8 +221,8 @@ description: The ARN of the topic you are modifying type: str returned: always - sample: "arn:aws:sns:us-east-2:111111111111:my_topic_name" -community.aws.sns_topic: + sample: "arn:aws:sns:us-east-2:123456789012:my_topic_name" +sns_topic: description: Dict of sns topic details type: complex returned: always @@ -120,6 +237,12 @@ returned: always type: bool sample: false + content_based_deduplication: + description: Whether or not content_based_deduplication was set + returned: always + type: str + sample: disabled + version_added: 5.3.0 delivery_policy: description: Delivery policy for the SNS topic returned: when topic is owned by this AWS account @@ -141,14 +264,14 @@ description: AWS account that owns the topic returned: when topic is owned by this AWS account type: str - sample: '111111111111' + sample: '123456789012' policy: description: Policy for the SNS topic returned: when topic is owned by this AWS account type: str sample: > - {"Version":"2012-10-17","Id":"SomePolicyId","Statement":[{"Sid":"ANewSid","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::111111111111:root"}, - "Action":"sns:Subscribe","Resource":"arn:aws:sns:us-east-2:111111111111:ansible-test-dummy-topic","Condition":{"StringEquals":{"sns:Protocol":"email"}}}]} + {"Version":"2012-10-17","Id":"SomePolicyId","Statement":[{"Sid":"ANewSid","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789012:root"}, + "Action":"sns:Subscribe","Resource":"arn:aws:sns:us-east-2:123456789012:ansible-test-dummy-topic","Condition":{"StringEquals":{"sns:Protocol":"email"}}}]} state: description: whether the topic is present or absent returned: always @@ -198,7 +321,7 @@ description: ARN of the SNS topic (equivalent to sns_arn) returned: when topic is owned by this AWS account type: str - sample: arn:aws:sns:us-east-2:111111111111:ansible-test-dummy-topic + sample: arn:aws:sns:us-east-2:123456789012:ansible-test-dummy-topic topic_created: description: Whether the topic was created returned: always @@ -212,16 +335,24 @@ ''' import json -import re -import copy try: import botocore except ImportError: pass # handled by AnsibleAWSModule -from 
ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule, is_boto3_error_code -from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_policies, AWSRetry, camel_dict_to_snake_dict +from ansible_collections.amazon.aws.plugins.module_utils.core import AnsibleAWSModule +from ansible_collections.amazon.aws.plugins.module_utils.core import scrub_none_parameters +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import compare_policies +from ansible_collections.amazon.aws.plugins.module_utils.ec2 import ansible_dict_to_boto3_tag_list + +from ansible_collections.community.aws.plugins.module_utils.sns import list_topics +from ansible_collections.community.aws.plugins.module_utils.sns import topic_arn_lookup +from ansible_collections.community.aws.plugins.module_utils.sns import compare_delivery_policies +from ansible_collections.community.aws.plugins.module_utils.sns import list_topic_subscriptions +from ansible_collections.community.aws.plugins.module_utils.sns import canonicalize_endpoint +from ansible_collections.community.aws.plugins.module_utils.sns import get_info +from ansible_collections.community.aws.plugins.module_utils.sns import update_tags class SnsTopicManager(object): @@ -230,85 +361,66 @@ class SnsTopicManager(object): def __init__(self, module, name, + topic_type, state, display_name, policy, delivery_policy, subscriptions, purge_subscriptions, + tags, + purge_tags, + content_based_deduplication, check_mode): self.connection = module.client('sns') self.module = module self.name = name + self.topic_type = topic_type self.state = state self.display_name = display_name self.policy = policy - self.delivery_policy = delivery_policy + self.delivery_policy = scrub_none_parameters(delivery_policy) if delivery_policy else None self.subscriptions = subscriptions self.subscriptions_existing = [] self.subscriptions_deleted = [] self.subscriptions_added = [] + self.subscriptions_attributes_set = [] + self.desired_subscription_attributes = dict() self.purge_subscriptions = purge_subscriptions + self.content_based_deduplication = content_based_deduplication self.check_mode = check_mode self.topic_created = False self.topic_deleted = False self.topic_arn = None self.attributes_set = [] + self.tags = tags + self.purge_tags = purge_tags - @AWSRetry.jittered_backoff() - def _list_topics_with_backoff(self): - paginator = self.connection.get_paginator('list_topics') - return paginator.paginate().build_full_result()['Topics'] + def _create_topic(self): + attributes = {} + tags = [] - @AWSRetry.jittered_backoff(catch_extra_error_codes=['NotFound']) - def _list_topic_subscriptions_with_backoff(self): - paginator = self.connection.get_paginator('list_subscriptions_by_topic') - return paginator.paginate(TopicArn=self.topic_arn).build_full_result()['Subscriptions'] + # NOTE: Never set FifoTopic = False. Some regions (including GovCloud) + # don't support the attribute being set, even to False. 
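+        # Illustrative call shapes (hypothetical topic names, not part of the
+        # original patch):
+        #   fifo:     create_topic(Name='x.fifo', Attributes={'FifoTopic': 'true'}, Tags=[])
+        #   standard: create_topic(Name='x', Attributes={}, Tags=[])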
+ if self.topic_type == 'fifo': + attributes['FifoTopic'] = 'true' + if not self.name.endswith('.fifo'): + self.name = self.name + '.fifo' - @AWSRetry.jittered_backoff(catch_extra_error_codes=['NotFound']) - def _list_subscriptions_with_backoff(self): - paginator = self.connection.get_paginator('list_subscriptions') - return paginator.paginate().build_full_result()['Subscriptions'] + if self.tags: + tags = ansible_dict_to_boto3_tag_list(self.tags) - def _list_topics(self): - try: - topics = self._list_topics_with_backoff() - except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: - self.module.fail_json_aws(e, msg="Couldn't get topic list") - return [t['TopicArn'] for t in topics] - - def _topic_arn_lookup(self): - # topic names cannot have colons, so this captures the full topic name - all_topics = self._list_topics() - lookup_topic = ':%s' % self.name - for topic in all_topics: - if topic.endswith(lookup_topic): - return topic - - def _create_topic(self): if not self.check_mode: try: - response = self.connection.create_topic(Name=self.name) + response = self.connection.create_topic(Name=self.name, + Attributes=attributes, + Tags=tags) except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: self.module.fail_json_aws(e, msg="Couldn't create topic %s" % self.name) self.topic_arn = response['TopicArn'] return True - def _compare_delivery_policies(self, policy_a, policy_b): - _policy_a = copy.deepcopy(policy_a) - _policy_b = copy.deepcopy(policy_b) - # AWS automatically injects disableSubscriptionOverrides if you set an - # http policy - if 'http' in policy_a: - if 'disableSubscriptionOverrides' not in policy_a['http']: - _policy_a['http']['disableSubscriptionOverrides'] = False - if 'http' in policy_b: - if 'disableSubscriptionOverrides' not in policy_b['http']: - _policy_b['http']['disableSubscriptionOverrides'] = False - comparison = (_policy_a != _policy_b) - return comparison - def _set_topic_attrs(self): changed = False try: @@ -336,8 +448,22 @@ def _set_topic_attrs(self): except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e: self.module.fail_json_aws(e, msg="Couldn't set topic policy") + # Set content-based deduplication attribute. Ignore if topic_type is not fifo. 
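+        # AWS accepts ContentBasedDeduplication only on FIFO topics; the
+        # module's enabled/disabled choice is mapped onto the 'true'/'false'
+        # strings the API expects before comparing with the current value.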
+        if ("FifoTopic" in topic_attributes and topic_attributes["FifoTopic"] == "true") and \
+                self.content_based_deduplication:
+            enabled = "true" if self.content_based_deduplication == 'enabled' else "false"
+            if enabled != topic_attributes['ContentBasedDeduplication']:
+                changed = True
+                self.attributes_set.append('content_based_deduplication')
+                if not self.check_mode:
+                    try:
+                        self.connection.set_topic_attributes(TopicArn=self.topic_arn, AttributeName='ContentBasedDeduplication',
+                                                             AttributeValue=enabled)
+                    except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+                        self.module.fail_json_aws(e, msg="Couldn't set content-based deduplication")
+
         if self.delivery_policy and ('DeliveryPolicy' not in topic_attributes or
-                                     self._compare_delivery_policies(self.delivery_policy, json.loads(topic_attributes['DeliveryPolicy']))):
+                                     compare_delivery_policies(self.delivery_policy, json.loads(topic_attributes['DeliveryPolicy']))):
             changed = True
             self.attributes_set.append('delivery_policy')
             if not self.check_mode:
@@ -348,19 +474,14 @@ def _set_topic_attrs(self):
                 self.module.fail_json_aws(e, msg="Couldn't set topic delivery policy")
         return changed

-    def _canonicalize_endpoint(self, protocol, endpoint):
-        if protocol == 'sms':
-            return re.sub('[^0-9]*', '', endpoint)
-        return endpoint
-
     def _set_topic_subs(self):
         changed = False
         subscriptions_existing_list = set()
         desired_subscriptions = [(sub['protocol'],
-                                  self._canonicalize_endpoint(sub['protocol'], sub['endpoint'])) for sub in
+                                  canonicalize_endpoint(sub['protocol'], sub['endpoint'])) for sub in
                                  self.subscriptions]

-        for sub in self._list_topic_subscriptions():
+        for sub in list_topic_subscriptions(self.connection, self.module, self.topic_arn):
             sub_key = (sub['Protocol'], sub['Endpoint'])
             subscriptions_existing_list.add(sub_key)
             if (self.purge_subscriptions and sub_key not in desired_subscriptions and
@@ -383,23 +504,71 @@ def _set_topic_subs(self):
                 self.module.fail_json_aws(e, msg="Couldn't subscribe to topic %s" % self.topic_arn)
         return changed

-    def _list_topic_subscriptions(self):
-        try:
-            return self._list_topic_subscriptions_with_backoff()
-        except is_boto3_error_code('AuthorizationError'):
+    def _init_desired_subscription_attributes(self):
+        for sub in self.subscriptions:
+            sub_key = (sub['protocol'], canonicalize_endpoint(sub['protocol'], sub['endpoint']))
+            tmp_dict = sub.get('attributes', {})
+            # aws sdk expects values to be strings
+            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html#SNS.Client.set_subscription_attributes
+            for k, v in tmp_dict.items():
+                tmp_dict[k] = str(v)
+
+            self.desired_subscription_attributes[sub_key] = tmp_dict
+
+    def _set_topic_subs_attributes(self):
+        changed = False
+        for sub in list_topic_subscriptions(self.connection, self.module, self.topic_arn):
+            sub_key = (sub['Protocol'], sub['Endpoint'])
+            sub_arn = sub['SubscriptionArn']
+
+            if sub_arn == "PendingConfirmation":
+                # subscription is not confirmed yet, skip it
+                continue
+
+            if not self.desired_subscription_attributes.get(sub_key):
+                # no subscription attributes are desired for this subscription, skip it
+                continue
+
             try:
-                # potentially AuthorizationError when listing subscriptions for third party topic
-                return [sub for sub in self._list_subscriptions_with_backoff()
-                        if sub['TopicArn'] == self.topic_arn]
+                sub_current_attributes = self.connection.get_subscription_attributes(SubscriptionArn=sub_arn)['Attributes']
             except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
-                self.module.fail_json_aws(e, msg="Couldn't get subscriptions list for topic %s" % self.topic_arn)
-        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:  # pylint: disable=duplicate-except
-            self.module.fail_json_aws(e, msg="Couldn't get subscriptions list for topic %s" % self.topic_arn)
+                self.module.fail_json_aws(e, msg="Couldn't get subscription attributes for subscription %s" % sub_arn)
+
+            raw_message = self.desired_subscription_attributes[sub_key].get('RawMessageDelivery')
+            if raw_message is not None and 'RawMessageDelivery' in sub_current_attributes:
+                if sub_current_attributes['RawMessageDelivery'].lower() != raw_message.lower():
+                    changed = True
+                    if not self.check_mode:
+                        try:
+                            self.connection.set_subscription_attributes(SubscriptionArn=sub_arn,
+                                                                        AttributeName='RawMessageDelivery',
+                                                                        AttributeValue=raw_message)
+                        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+                            self.module.fail_json_aws(e, msg="Couldn't set RawMessageDelivery subscription attribute")
+
+            filter_policy = self.desired_subscription_attributes[sub_key].get('FilterPolicy')
+            if filter_policy is not None:
+                filter_policy_changed = False
+                if 'FilterPolicy' not in sub_current_attributes:
+                    filter_policy_changed = True
+                elif sub_current_attributes['FilterPolicy'].lower() != filter_policy.lower():
+                    filter_policy_changed = True
+                if filter_policy_changed:
+                    changed = True
+                    if not self.check_mode:
+                        try:
+                            self.connection.set_subscription_attributes(SubscriptionArn=sub_arn,
+                                                                        AttributeName='FilterPolicy',
+                                                                        AttributeValue=filter_policy)
+                        except (botocore.exceptions.ClientError, botocore.exceptions.BotoCoreError) as e:
+                            self.module.fail_json_aws(e, msg="Couldn't set FilterPolicy subscription attribute")
+
+        return changed

     def _delete_subscriptions(self):
         # NOTE: subscriptions in 'PendingConfirmation' timeout in 3 days
         #       https://forums.aws.amazon.com/thread.jspa?threadID=85993
-        subscriptions = self._list_topic_subscriptions()
+        subscriptions = list_topic_subscriptions(self.connection, self.module, self.topic_arn)
         if not subscriptions:
             return False
         for sub in subscriptions:
@@ -426,97 +595,123 @@ def _name_is_arn(self):

     def ensure_ok(self):
         changed = False
-        if self._name_is_arn():
-            self.topic_arn = self.name
-        else:
-            self.topic_arn = self._topic_arn_lookup()
+        self.populate_topic_arn()
         if not self.topic_arn:
            changed = self._create_topic()
-        if self.topic_arn in self._list_topics():
+        if self.topic_arn in list_topics(self.connection, self.module):
             changed |= self._set_topic_attrs()
         elif self.display_name or self.policy or self.delivery_policy:
             self.module.fail_json(msg="Cannot set display name, policy or delivery policy for SNS topics not owned by this account")
         changed |= self._set_topic_subs()
+        self._init_desired_subscription_attributes()
+        if self.topic_arn in list_topics(self.connection, self.module):
+            changed |= self._set_topic_subs_attributes()
+        elif any(self.desired_subscription_attributes.values()):
+            self.module.fail_json(msg="Cannot set subscription attributes for SNS topics not owned by this account")
+        # Check tagging
+        changed |= update_tags(self.connection, self.module, self.topic_arn)
+
         return changed

     def ensure_gone(self):
         changed = False
-        if self._name_is_arn():
-            self.topic_arn = self.name
-        else:
-            self.topic_arn = self._topic_arn_lookup()
+        self.populate_topic_arn()
         if self.topic_arn:
-            if self.topic_arn not in self._list_topics():
+            if self.topic_arn not in list_topics(self.connection, self.module):
                 self.module.fail_json(msg="Cannot use state=absent with third party ARN.
Use subscribers=[] to unsubscribe") changed = self._delete_subscriptions() changed |= self._delete_topic() return changed - def get_info(self): - info = { - 'name': self.name, - 'state': self.state, - 'subscriptions_new': self.subscriptions, - 'subscriptions_existing': self.subscriptions_existing, - 'subscriptions_deleted': self.subscriptions_deleted, - 'subscriptions_added': self.subscriptions_added, - 'subscriptions_purge': self.purge_subscriptions, - 'check_mode': self.check_mode, - 'topic_created': self.topic_created, - 'topic_deleted': self.topic_deleted, - 'attributes_set': self.attributes_set, - } - if self.state != 'absent': - if self.topic_arn in self._list_topics(): - info.update(camel_dict_to_snake_dict(self.connection.get_topic_attributes(TopicArn=self.topic_arn)['Attributes'])) - info['delivery_policy'] = info.pop('effective_delivery_policy') - info['subscriptions'] = [camel_dict_to_snake_dict(sub) for sub in self._list_topic_subscriptions()] - - return info + def populate_topic_arn(self): + if self._name_is_arn(): + self.topic_arn = self.name + return + + name = self.name + if self.topic_type == 'fifo' and not name.endswith('.fifo'): + name += ".fifo" + self.topic_arn = topic_arn_lookup(self.connection, self.module, name) def main(): + # We're kinda stuck with CamelCase here, it would be nice to switch to + # snake_case, but we'd need to purge out the alias entries + http_retry_args = dict( + minDelayTarget=dict(type='int', required=True), + maxDelayTarget=dict(type='int', required=True), + numRetries=dict(type='int', required=True), + numMaxDelayRetries=dict(type='int', required=True), + numMinDelayRetries=dict(type='int', required=True), + numNoDelayRetries=dict(type='int', required=True), + backoffFunction=dict(type='str', required=True, choices=['arithmetic', 'exponential', 'geometric', 'linear']), + ) + http_delivery_args = dict( + defaultHealthyRetryPolicy=dict(type='dict', required=True, options=http_retry_args), + disableSubscriptionOverrides=dict(type='bool', required=False), + defaultThrottlePolicy=dict( + type='dict', required=False, + options=dict( + maxReceivesPerSecond=dict(type='int', required=True), + ), + ), + ) + delivery_args = dict( + http=dict(type='dict', required=False, options=http_delivery_args), + ) + argument_spec = dict( name=dict(required=True), + topic_type=dict(type='str', default='standard', choices=['standard', 'fifo']), state=dict(default='present', choices=['present', 'absent']), display_name=dict(), policy=dict(type='dict'), - delivery_policy=dict(type='dict'), + delivery_policy=dict(type='dict', options=delivery_args), subscriptions=dict(default=[], type='list', elements='dict'), purge_subscriptions=dict(type='bool', default=True), + tags=dict(type='dict', aliases=['resource_tags']), + purge_tags=dict(type='bool', default=True), + content_based_deduplication=dict(choices=['enabled', 'disabled']) ) module = AnsibleAWSModule(argument_spec=argument_spec, supports_check_mode=True) name = module.params.get('name') + topic_type = module.params.get('topic_type') state = module.params.get('state') display_name = module.params.get('display_name') policy = module.params.get('policy') delivery_policy = module.params.get('delivery_policy') subscriptions = module.params.get('subscriptions') purge_subscriptions = module.params.get('purge_subscriptions') + content_based_deduplication = module.params.get('content_based_deduplication') check_mode = module.check_mode + tags = module.params.get('tags') + purge_tags = module.params.get('purge_tags') sns_topic 
= SnsTopicManager(module, name, + topic_type, state, display_name, policy, delivery_policy, subscriptions, purge_subscriptions, + tags, + purge_tags, + content_based_deduplication, check_mode) if state == 'present': changed = sns_topic.ensure_ok() - elif state == 'absent': changed = sns_topic.ensure_gone() sns_facts = dict(changed=changed, sns_arn=sns_topic.topic_arn, - sns_topic=sns_topic.get_info()) + sns_topic=get_info(sns_topic.connection, module, sns_topic.topic_arn)) module.exit_json(**sns_facts) diff --git a/plugins/modules/sqs_queue.py b/plugins/modules/sqs_queue.py index 5d65967974a..6607b91eb97 100644 --- a/plugins/modules/sqs_queue.py +++ b/plugins/modules/sqs_queue.py @@ -375,7 +375,7 @@ def update_sqs_queue(module, client, queue_url): if isinstance(new_value, bool): new_value = str(new_value).lower() - existing_value = str(existing_value).lower() + value = str(value).lower() if new_value == value: continue
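The sqs_queue hunk above makes the boolean comparison symmetric: SQS returns attribute values as strings, while the desired value arrives as a Python bool, so both sides must be normalized before diffing. A standalone illustration with invented values:

# Without normalizing both sides, True != 'true' and every run reports a change.
new_value = True      # desired, from module params
value = 'true'        # current, as returned by the SQS API

if isinstance(new_value, bool):
    new_value = str(new_value).lower()
    value = str(value).lower()

assert new_value == value  # no spurious change reported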