From de87974174aec430f876135c19a1b41533a283a8 Mon Sep 17 00:00:00 2001
From: Rhys <rhyscampbell@bluewin.ch>
Date: Sun, 13 Oct 2024 08:24:16 +0200
Subject: [PATCH] Cassandra role consistency level (#287)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Added consistency_level parameter for use in cql queries in cassandra_role, cassandra_keyspace and cassandra_table modules. See the project's README.md for details about individual level support.
---------

Co-authored-by: Демьяненко Александр Сергеевич <a.s.demyanenko@tinkoff.ru>
---
 README.md                                     |  20 +++
 plugins/modules/cassandra_keyspace.py         |  86 +++++++--
 plugins/modules/cassandra_role.py             | 165 ++++++++++++------
 plugins/modules/cassandra_table.py            |  83 ++++++++-
 .../targets/cassandra_keyspace/tasks/284.yml  |  43 +++++
 .../targets/cassandra_keyspace/tasks/main.yml |   2 +
 .../targets/cassandra_role/tasks/284.yml      | 115 ++++++++++++
 .../targets/cassandra_role/tasks/main.yml     |   3 +
 .../targets/cassandra_table/tasks/284.yml     |  75 ++++++++
 .../targets/cassandra_table/tasks/main.yml    |   2 +
 tests/sanity/ignore-2.15.txt                  |   1 +
 tests/sanity/ignore-2.16.txt                  |   1 +
 tests/sanity/ignore-2.17.txt                  |   1 +
 tests/sanity/ignore-2.18.txt                  |   1 +
 tests/sanity/ignore-2.19.txt                  |   6 +
 15 files changed, 529 insertions(+), 75 deletions(-)
 create mode 100644 tests/integration/targets/cassandra_keyspace/tasks/284.yml
 create mode 100644 tests/integration/targets/cassandra_role/tasks/284.yml
 create mode 100644 tests/integration/targets/cassandra_table/tasks/284.yml
 create mode 120000 tests/sanity/ignore-2.15.txt
 create mode 120000 tests/sanity/ignore-2.16.txt
 create mode 120000 tests/sanity/ignore-2.17.txt
 create mode 120000 tests/sanity/ignore-2.18.txt
 create mode 100644 tests/sanity/ignore-2.19.txt

diff --git a/README.md b/README.md
index 3fd20010..00d4b5b5 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,26 @@ These roles prepare servers with Debian-based and RHEL-based distributions to ru
 - `cassandra_upgradesstables`- Upgrade SSTables which are not on the current Cassandra version.
 - `cassandra_verify`- Checks the data checksum for one or more tables.
 
+## Module support for Consistency Level
+
+The pure-python modules, currently cassandra_role, cassandra_keyspace & cassandra_table all have a consistency_level parameter, through which the consistency level can be changed. Not all consistency levels are supported by read and write. The table below summarizes this.
+
+| **Consistency Level**   | **Read** | **Write** |
+|-------------------------|----------|-----------|
+| **ANY**                 | No       | Yes       |
+| **ONE**                 | Yes      | Yes       |
+| **TWO**                 | Yes      | Yes       |
+| **THREE**               | Yes      | Yes       |
+| **QUORUM**              | Yes      | Yes       |
+| **ALL**                 | Yes      | Yes       |
+| **LOCAL_ONE**           | Yes      | Yes       |
+| **LOCAL_QUORUM**        | Yes      | Yes       |
+| **EACH_QUORUM**         | No       | Yes       |
+| **SERIAL**              | Yes      | No        |
+| **LOCAL_SERIAL**        | Yes      | No        |
+
+If the chosen consistency level is not supported, by either read or write, then the default *LOCAL_ONE* is used.
+
 ## Supported Cassandra Versions
 
 * 4.0.X
diff --git a/plugins/modules/cassandra_keyspace.py b/plugins/modules/cassandra_keyspace.py
index 491ffbed..d0830f2c 100644
--- a/plugins/modules/cassandra_keyspace.py
+++ b/plugins/modules/cassandra_keyspace.py
@@ -85,6 +85,26 @@
     type: dict
     aliases:
       - data_centers
+  consistency_level:
+    description:
+      - Consistency level to perform cassandra queries with.
+      - Not all consistency levels are supported by read or write connections.\
+        When a level is not supported then LOCAL_ONE, the default is used.
+      - Consult the README.md on GitHub for further details.
+    type: str
+    default: "LOCAL_ONE"
+    choices:
+        - ANY
+        - ONE
+        - TWO
+        - THREE
+        - QUORUM
+        - ALL
+        - LOCAL_QUORUM
+        - EACH_QUORUM
+        - SERIAL
+        - LOCAL_SERIAL
+        - LOCAL_ONE
 
 requirements:
   - cassandra-driver
@@ -147,7 +167,10 @@
 
 try:
     from cassandra.cluster import Cluster, AuthenticationFailed
+    from cassandra.cluster import EXEC_PROFILE_DEFAULT
+    from cassandra.cluster import ExecutionProfile
     from cassandra.auth import PlainTextAuthProvider
+    from cassandra import ConsistencyLevel
     HAS_CASSANDRA_DRIVER = True
 except Exception:
     HAS_CASSANDRA_DRIVER = False
@@ -251,6 +274,38 @@ def keyspace_is_changed(module, cluster, keyspace, replication_factor,
         module.fail_json("Unknown Replication strategy: {0}".format(cfg['class']))
     return keyspace_definition_changed
 
+
+def get_read_and_write_sessions(login_host,
+                                login_port,
+                                auth_provider,
+                                ssl_context,
+                                consistency_level):
+    profile = ExecutionProfile(
+        consistency_level=ConsistencyLevel.name_to_value[consistency_level])
+    if consistency_level in ["ANY", "EACH_QUORUM"]:  # Not supported for reads
+        cluster_r = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context)  # Will be LOCAL_ONE
+    else:
+        cluster_r = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context,
+                            execution_profiles={EXEC_PROFILE_DEFAULT: profile})
+    if consistency_level in ["SERIAL", "LOCAL_SERIAL"]:  # Not supported for writes
+        cluster_w = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context)  # Will be LOCAL_ONE
+    else:
+        cluster_w = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context,
+                            execution_profiles={EXEC_PROFILE_DEFAULT: profile})
+    return (cluster_r, cluster_w)  # Return a tuple of sessions for C* (read, write)
+
 ############################################
 
 
@@ -273,7 +328,11 @@ def main():
             state=dict(type='str', required=True, choices=['present', 'absent']),
             replication_factor=dict(type='int', default=1),
             durable_writes=dict(type='bool', default=True),
-            data_centres=dict(type='dict', aliases=['data_centers'])),
+            data_centres=dict(type='dict', aliases=['data_centers']),
+            consistency_level=dict(type='str',
+                                   required=False,
+                                   default="LOCAL_ONE",
+                                   choices=ConsistencyLevel.name_to_value.keys())),
         supports_check_mode=True
     )
 
@@ -303,6 +362,7 @@ def main():
     replication_factor = module.params['replication_factor']
     durable_writes = module.params['durable_writes']
     data_centres = module.params['data_centres']
+    consistency_level = module.params['consistency_level']
 
     if HAS_SSL_LIBRARY is False and ssl is True:
         msg = ("This module requires the SSL python"
@@ -343,18 +403,24 @@ def main():
             ssl_context.verify_mode = getattr(ssl_lib, module.params['ssl_cert_reqs'])
             if ssl_cert_reqs in ('CERT_REQUIRED', 'CERT_OPTIONAL'):
                 ssl_context.load_verify_locations(module.params['ssl_ca_certs'])
-        cluster = Cluster(login_host,
-                          port=login_port,
-                          auth_provider=auth_provider,
-                          ssl_context=ssl_context)
-        session = cluster.connect()
+
+        sessions = get_read_and_write_sessions(login_host,
+                                               login_port,
+                                               auth_provider,
+                                               ssl_context,
+                                               consistency_level)
+
+        cluster = sessions[1]  # maintain cluster object for comptbility
+        session_r = sessions[0].connect()
+        session_w = sessions[1].connect()
+
     except AuthenticationFailed as excep:
         module.fail_json(msg="Authentication failed: {0}".format(excep))
     except Exception as excep:
         module.fail_json(msg="Error connecting to cluster: {0}".format(excep))
 
     try:
-        if keyspace_exists(session, keyspace):
+        if keyspace_exists(session_r, keyspace):
             if module.check_mode:
                 if state == "present":
                     if keyspace_is_changed(module,
@@ -379,7 +445,7 @@ def main():
                                            data_centres):
 
                         cql = create_alter_keyspace(module,
-                                                    session,
+                                                    session_w,
                                                     keyspace,
                                                     replication_factor,
                                                     durable_writes,
@@ -390,7 +456,7 @@ def main():
                     else:
                         result['changed'] = False
                 elif state == "absent":
-                    drop_keyspace(session, keyspace)
+                    drop_keyspace(session_w, keyspace)
                     result['changed'] = True
         else:
             if module.check_mode:
@@ -401,7 +467,7 @@ def main():
             else:
                 if state == "present":
                     cql = create_alter_keyspace(module,
-                                                session,
+                                                session_w,
                                                 keyspace,
                                                 replication_factor,
                                                 durable_writes,
diff --git a/plugins/modules/cassandra_role.py b/plugins/modules/cassandra_role.py
index f2a28589..be058cb9 100644
--- a/plugins/modules/cassandra_role.py
+++ b/plugins/modules/cassandra_role.py
@@ -108,6 +108,26 @@
       - Additional debug output.
     type: bool
     default: false
+  consistency_level:
+    description:
+      - Consistency level to perform cassandra queries with.
+      - Not all consistency levels are supported by read or write connections.\
+        When a level is not supported then LOCAL_ONE, the default is used.
+      - Consult the README.md on GitHub for further details.
+    type: str
+    default: "LOCAL_ONE"
+    choices:
+        - ANY
+        - ONE
+        - TWO
+        - THREE
+        - QUORUM
+        - ALL
+        - LOCAL_QUORUM
+        - EACH_QUORUM
+        - SERIAL
+        - LOCAL_SERIAL
+        - LOCAL_ONE
 '''
 
 EXAMPLES = r'''
@@ -187,10 +207,13 @@
 
 try:
     from cassandra.cluster import Cluster
+    from cassandra.cluster import EXEC_PROFILE_DEFAULT
+    from cassandra.cluster import ExecutionProfile
     from cassandra.auth import PlainTextAuthProvider
     from cassandra import AuthenticationFailed
     from cassandra.query import dict_factory
     from cassandra import InvalidRequest
+    from cassandra import ConsistencyLevel
     HAS_CASSANDRA_DRIVER = True
 except Exception:
     HAS_CASSANDRA_DRIVER = False
@@ -221,8 +244,8 @@ def role_exists(session, role):
 
 def get_role_properties(session, role):
     cql = "SELECT role, can_login, is_superuser, member_of, salted_hash FROM system_auth.roles WHERE role = '{0}'".format(role)
-    session.row_factory = dict_factory
-    role_properties = session.execute(cql)
+    dict_factory_profile = session.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, row_factory=dict_factory)
+    role_properties = session.execute(cql, execution_profile=dict_factory_profile)
     return role_properties[0]
 
 
@@ -248,7 +271,7 @@ def is_role_changed(role_properties, super_user, login, password,
     return changed
 
 
-def create_alter_role(module, session, role, super_user, login, password,
+def create_alter_role(module, role, super_user, login, password,
                       options, data_centres, alter_role):
     if alter_role is False:
         cql = "CREATE ROLE '{0}' ".format(role)
@@ -275,14 +298,14 @@ def create_alter_role(module, session, role, super_user, login, password,
     return cql
 
 
-def create_role(session, role):
+def create_role(role):
     ''' Used for creating roles that are assigned to other users
     '''
     cql = "CREATE ROLE '{0}'".format(role)
     return cql
 
 
-def grant_role(session, role, grantee):
+def grant_role(role, grantee):
     ''' Assign roles to other roles
     '''
     cql = "GRANT '{0}' TO '{1}'".format(role,
@@ -290,7 +313,7 @@ def grant_role(session, role, grantee):
     return cql
 
 
-def revoke_role(session, role, grantee):
+def revoke_role(role, grantee):
     ''' Revoke a role
     '''
     cql = "REVOKE '{0}' FROM '{1}'".format(role,
@@ -298,7 +321,7 @@ def revoke_role(session, role, grantee):
     return cql
 
 
-def drop_role(session, role):
+def drop_role(role):
     cql = "DROP ROLE '{0}'".format(role)
     return cql
 
@@ -326,7 +349,7 @@ def validate_keyspace_permissions(keyspace_permissions):
     return True
 
 
-def grant_permission(session, permission, role, keyspace):
+def grant_permission(permission, role, keyspace):
     if keyspace == "all_keyspaces":
         cql = "GRANT {0} ON ALL KEYSPACES TO '{1}'".format(permission,
                                                            role)
@@ -337,7 +360,7 @@ def grant_permission(session, permission, role, keyspace):
     return cql
 
 
-def revoke_permission(session, permission, role, keyspace):
+def revoke_permission(permission, role, keyspace):
     cql = "REVOKE {0} ON KEYSPACE {1} FROM '{2}'".format(permission,
                                                          keyspace,
                                                          role)
@@ -365,9 +388,9 @@ def list_role_permissions(session, role):
      Returns a resultset object of dicts
     '''
     cql = "LIST ALL OF '{0}'".format(role)
-    session.row_factory = dict_factory
     try:
-        role_permissions = session.execute(cql)
+        dict_factory_profile = session.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, row_factory=dict_factory)
+        role_permissions = session.execute(cql, execution_profile=dict_factory_profile)
     except InvalidRequest as excep:
         # excep_code = type(excep).__name__
         # if excep_code == 2200: # User does not exist
@@ -437,16 +460,14 @@ def build_role_grants(session,
     if current_roles is not None and roles is not None:
         for r in current_roles:
             if r not in roles:
-                cql = revoke_role(session,
-                                  r,
+                cql = revoke_role(r,
                                   role)
                 roles_dict['revoke'].add(cql)
     # grants
     if roles is not None:
         for r in roles:
             if r not in current_roles:
-                cql = grant_role(session,
-                                 r,
+                cql = grant_role(r,
                                  role)
                 roles_dict['grant'].add(cql)
     return roles_dict
@@ -469,9 +490,6 @@ def build_role_permissions(session,
                    "REVOKE ALL PERMISSIONS ON ALL KEYSPACES FROM legacy_app"]
     }
 
-    # TODO - To support check mode we probably have to remove the sesssion.execs
-    # from here and run them elsewhere
-
     '''
 
     perms_dict = {
@@ -494,8 +512,7 @@ def build_role_permissions(session,
                 if bool:
                     pass  # permission is already assigned
                 else:
-                    cql = grant_permission(session,
-                                           permission,
+                    cql = grant_permission(permission,
                                            role,
                                            keyspace)
                     perms_dict['grant'].add(cql)
@@ -523,8 +540,7 @@ def build_role_permissions(session,
                     if ks in keyspace_permissions.keys() \
                             and permission['permission'] not in keyspace_permissions[ks] \
                             and "ALL PERMISSIONS" not in keyspace_permissions[ks]:
-                        cql = revoke_permission(session,
-                                                permission['permission'],
+                        cql = revoke_permission(permission['permission'],
                                                 role,
                                                 ks)
                         perms_dict['revoke'].add(cql)
@@ -532,8 +548,7 @@ def build_role_permissions(session,
             if permission['resource'].startswith('<keyspace') \
                     and permission['role'] == role \
                     and permission['resource'].split(' ')[1].replace('>', '') not in keyspace_permissions.keys():
-                cql = revoke_permission(session,
-                                        permission['permission'],
+                cql = revoke_permission(permission['permission'],
                                         role,
                                         ks)
                 perms_dict['revoke'].add(cql)
@@ -542,8 +557,7 @@ def build_role_permissions(session,
             if permission['resource'].startswith('<keyspace') \
                     and permission['role'] == role:  # We don't touch other permissions
                 ks = permission['resource'].split(' ')[1].replace('>', '')
-                cql = revoke_permission(session,
-                                        permission['permission'],
+                cql = revoke_permission(permission['permission'],
                                         role,
                                         ks)
                 perms_dict['revoke'].add(cql)
@@ -559,8 +573,40 @@ def process_role_permissions(session,
     return cql_dict
 
 
+def get_read_and_write_sessions(login_host,
+                                login_port,
+                                auth_provider,
+                                ssl_context,
+                                consistency_level):
+    profile = ExecutionProfile(
+        consistency_level=ConsistencyLevel.name_to_value[consistency_level])
+    if consistency_level in ["ANY", "EACH_QUORUM"]:  # Not supported for reads
+        cluster_r = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context)  # Will be LOCAL_ONE
+    else:
+        cluster_r = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context,
+                            execution_profiles={EXEC_PROFILE_DEFAULT: profile})
+    if consistency_level in ["SERIAL", "LOCAL_SERIAL"]:  # Not supported for writes
+        cluster_w = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context)  # Will be LOCAL_ONE
+    else:
+        cluster_w = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context,
+                            execution_profiles={EXEC_PROFILE_DEFAULT: profile})
+    return (cluster_r, cluster_w)  # Return a tuple of sessions for C* (read, write)
+
 ############################################
 
+
 def main():
     module = AnsibleModule(
         argument_spec=dict(
@@ -586,7 +632,11 @@ def main():
             keyspace_permissions=dict(type='dict', no_log=False),
             roles=dict(type='list', elements='str'),
             update_password=dict(type='bool', default=False),
-            debug=dict(type='bool', default=False)),
+            debug=dict(type='bool', default=False),
+            consistency_level=dict(type='str',
+                                   required=False,
+                                   default="LOCAL_ONE",
+                                   choices=ConsistencyLevel.name_to_value.keys())),
         supports_check_mode=True
     )
 
@@ -613,6 +663,7 @@ def main():
     keyspace_permissions = module.params['keyspace_permissions']
     roles = module.params['roles']
     debug = module.params['debug']
+    consistency_level = module.params['consistency_level']
 
     if HAS_SSL_LIBRARY is False and ssl is True:
         msg = ("This module requires the SSL python"
@@ -660,11 +711,16 @@ def main():
             ssl_context.verify_mode = getattr(ssl_lib, module.params['ssl_cert_reqs'])
             if ssl_cert_reqs in ('CERT_REQUIRED', 'CERT_OPTIONAL'):
                 ssl_context.load_verify_locations(module.params['ssl_ca_certs'])
-        cluster = Cluster(login_host,
-                          port=login_port,
-                          auth_provider=auth_provider,
-                          ssl_context=ssl_context)
-        session = cluster.connect()
+
+        sessions = get_read_and_write_sessions(login_host,
+                                               login_port,
+                                               auth_provider,
+                                               ssl_context,
+                                               consistency_level)
+
+        session_r = sessions[0].connect()
+        session_w = sessions[1].connect()
+
     except AuthenticationFailed as auth_failed:
         module.fail_json(msg="Authentication failed: {0}".format(auth_failed))
     except Exception as excep:
@@ -674,11 +730,11 @@ def main():
 
     try:
         if debug:
-            result['role_exists'] = role_exists(session, role)
+            result['role_exists'] = role_exists(session_r, role)
         if login:  # Standard user
-            if role_exists(session, role):
+            if role_exists(session_r, role):
                 # Has the role changed?
-                role_properties = get_role_properties(session,
+                role_properties = get_role_properties(session_r,
                                                       role)
                 has_role_changed = is_role_changed(role_properties,
                                                    super_user,
@@ -699,7 +755,6 @@ def main():
                         # create the role
                         if has_role_changed:
                             cql = create_alter_role(module,
-                                                    session,
                                                     role,
                                                     super_user,
                                                     login,
@@ -707,12 +762,12 @@ def main():
                                                     options,
                                                     data_centres,
                                                     has_role_changed)
-                            session .execute(cql)
+                            session_w.execute(cql)
                             result['changed'] = True
                             result['cql'] = cql
                     elif state == "absent":
-                        cql = drop_role(session, role)
-                        session.execute(cql)
+                        cql = drop_role(role)
+                        session_w.execute(cql)
                         result['changed'] = True
                         result['cql'] = cql
             else:
@@ -724,7 +779,6 @@ def main():
                 else:
                     if state == "present":
                         cql = create_alter_role(module,
-                                                session,
                                                 role,
                                                 super_user,
                                                 login,
@@ -732,34 +786,33 @@ def main():
                                                 options,
                                                 data_centres,
                                                 False)
-                        session .execute(cql)
+                        session_w.execute(cql)
                         result['changed'] = True
                         result['cql'] = cql
                     elif state == "absent":
                         result['changed'] = False
         else:  # This is a role
-            if role_exists(session, role):
+            if role_exists(session_r, role):
                 if module.check_mode:
                     if state == "present":
                         result['changed'] = False
                     elif state == "absent":
-                        cql = drop_role(session, role)
-                        session.execute(cql)
+                        cql = drop_role(role)
+                        session_w.execute(cql)
                         result['changed'] = True
                         result['cql'] = cql
                 else:
                     if state == "present":
                         result['changed'] = False
                     elif state == "absent":
-                        cql = drop_role(session, role)
-                        session.execute(cql)
+                        cql = drop_role(role)
+                        session_w.execute(cql)
                         result['changed'] = True
                         result['cql'] = cql
             else:
                 if module.check_mode:
                     if state == "present":
                         cql = create_alter_role(module,
-                                                session,
                                                 role,
                                                 super_user,
                                                 login,
@@ -767,36 +820,36 @@ def main():
                                                 options,
                                                 data_centres,
                                                 has_role_changed)
-                        session .execute(cql)
+                        session_w.execute(cql)
                         result['changed'] = True
                         result['cql'] = cql
                     elif state == "absent":
                         result['changed'] = False
                 else:
                     if state == "present":
-                        cql = create_role(session, role)
-                        session.execute(cql)
+                        cql = create_role(role)
+                        session_w.execute(cql)
                         result['changed'] = True
                         result['cql'] = cql
                     elif state == "absent":
                         result['changed'] = False
 
         if state == "present":
-            cql_dict = process_role_permissions(session,
+            cql_dict = process_role_permissions(session_r,
                                                 keyspace_permissions,
                                                 role)
             if len(cql_dict['grant']) > 0 or len(cql_dict['revoke']) > 0:
                 for r in cql_dict['revoke']:
                     if not module.check_mode:
-                        session.execute(r)
+                        session_w.execute(r)
                 for g in cql_dict['grant']:
                     if not module.check_mode:
-                        session.execute(g)
+                        session_w.execute(g)
                 result['permissions'] = cql_dict
                 result['changed'] = True
 
             # Process roles
-            roles_dict = build_role_grants(session,
+            roles_dict = build_role_grants(session_r,
                                            role,
                                            roles)
 
@@ -804,10 +857,10 @@ def main():
                 result['roles'] = roles_dict
                 for r in roles_dict['revoke']:
                     if not module.check_mode:
-                        session.execute(r)
+                        session_w.execute(r)
                 for g in roles_dict['grant']:
                     if not module.check_mode:
-                        session.execute(g)
+                        session_w.execute(g)
 
                 result['changed'] = True
 
diff --git a/plugins/modules/cassandra_table.py b/plugins/modules/cassandra_table.py
index e7c046ce..aa021f56 100644
--- a/plugins/modules/cassandra_table.py
+++ b/plugins/modules/cassandra_table.py
@@ -99,6 +99,26 @@
       - Debug flag
     type: bool
     default: false
+  consistency_level:
+    description:
+      - Consistency level to perform cassandra queries with.
+      - Not all consistency levels are supported by read or write connections.\
+        When a level is not supported then LOCAL_ONE, the default is used.
+      - Consult the README.md on GitHub for further details.
+    type: str
+    default: "LOCAL_ONE"
+    choices:
+        - ANY
+        - ONE
+        - TWO
+        - THREE
+        - QUORUM
+        - ALL
+        - LOCAL_QUORUM
+        - EACH_QUORUM
+        - SERIAL
+        - LOCAL_SERIAL
+        - LOCAL_ONE
 '''
 
 EXAMPLES = r'''
@@ -262,8 +282,11 @@
 
 try:
     from cassandra.cluster import Cluster
+    from cassandra.cluster import EXEC_PROFILE_DEFAULT
+    from cassandra.cluster import ExecutionProfile
     from cassandra.auth import PlainTextAuthProvider
     from cassandra import AuthenticationFailed
+    from cassandra import ConsistencyLevel
     HAS_CASSANDRA_DRIVER = True
 except Exception:
     HAS_CASSANDRA_DRIVER = False
@@ -378,8 +401,40 @@ def drop_table(keyspace_name,
     return cql
 
 
+def get_read_and_write_sessions(login_host,
+                                login_port,
+                                auth_provider,
+                                ssl_context,
+                                consistency_level):
+    profile = ExecutionProfile(
+        consistency_level=ConsistencyLevel.name_to_value[consistency_level])
+    if consistency_level in ["ANY", "EACH_QUORUM"]:  # Not supported for reads
+        cluster_r = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context)  # Will be LOCAL_ONE
+    else:
+        cluster_r = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context,
+                            execution_profiles={EXEC_PROFILE_DEFAULT: profile})
+    if consistency_level in ["SERIAL", "LOCAL_SERIAL"]:  # Not supported for writes
+        cluster_w = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context)  # Will be LOCAL_ONE
+    else:
+        cluster_w = Cluster(login_host,
+                            port=login_port,
+                            auth_provider=auth_provider,
+                            ssl_context=ssl_context,
+                            execution_profiles={EXEC_PROFILE_DEFAULT: profile})
+    return (cluster_r, cluster_w)  # Return a tuple of sessions for C* (read, write)
+
 ############################################
 
+
 def main():
 
     # required_if_args = [
@@ -409,7 +464,11 @@ def main():
             partition_key=dict(type='list', elements='str', default=[], no_log=False),
             table_options=dict(type='dict', default=None),
             is_type=dict(type='bool', default=False),
-            debug=dict(type='bool', default=False)),
+            debug=dict(type='bool', default=False),
+            consistency_level=dict(type='str',
+                                   required=False,
+                                   default="LOCAL_ONE",
+                                   choices=ConsistencyLevel.name_to_value.keys())),
         supports_check_mode=True
     )
 
@@ -434,6 +493,7 @@ def main():
     table_options = module.params['table_options']
     is_type = module.params['is_type']
     debug = module.params['debug']
+    consistency_level = module.params['consistency_level']
 
     if HAS_SSL_LIBRARY is False and ssl is True:
         msg = ("This module requires the SSL python"
@@ -477,24 +537,29 @@ def main():
             ssl_context.verify_mode = getattr(ssl_lib, module.params['ssl_cert_reqs'])
             if ssl_cert_reqs in ('CERT_REQUIRED', 'CERT_OPTIONAL'):
                 ssl_context.load_verify_locations(module.params['ssl_ca_certs'])
-        cluster = Cluster(login_host,
-                          port=login_port,
-                          auth_provider=auth_provider,
-                          ssl_context=ssl_context)
-        session = cluster.connect()
+
+        sessions = get_read_and_write_sessions(login_host,
+                                               login_port,
+                                               auth_provider,
+                                               ssl_context,
+                                               consistency_level)
+
+        session_r = sessions[0].connect()
+        session_w = sessions[1].connect()
+
     except AuthenticationFailed as excep:
         module.fail_json(msg="Authentication failed: {0}".format(excep))
     except Exception as excep:
         module.fail_json(msg="Error connecting to cluster: {0}".format(excep))
 
     try:
-        if table_exists(session, keyspace_name, table_name):
+        if table_exists(session_r, keyspace_name, table_name):
             if state == "present":
                 result['changed'] = False
             else:
                 cql = drop_table(keyspace_name, table_name)
                 if not module.check_mode:
-                    session.execute(cql)
+                    session_w.execute(cql)
                 result['changed'] = True
                 result['cql'] = cql
         else:  # Table does not exist
@@ -508,7 +573,7 @@ def main():
                                    table_options,
                                    is_type)
                 if not module.check_mode:
-                    session.execute(cql)
+                    session_w.execute(cql)
                 result['changed'] = True
                 result['cql'] = cql
             else:
diff --git a/tests/integration/targets/cassandra_keyspace/tasks/284.yml b/tests/integration/targets/cassandra_keyspace/tasks/284.yml
new file mode 100644
index 00000000..424c3ea6
--- /dev/null
+++ b/tests/integration/targets/cassandra_keyspace/tasks/284.yml
@@ -0,0 +1,43 @@
+- name: Create a test keyspace - LOCAL_ONE consistency
+  community.cassandra.cassandra_keyspace:
+    name: local_keyspace
+    state: present
+    data_centres:
+      zurich: 3
+      tokyo: 3
+      new_york: 2
+    consistency_level: "LOCAL_ONE"
+  register: local
+
+- assert:
+    that:
+      - local.changed
+
+- name: Create a test keyspace- ANY consistency
+  community.cassandra.cassandra_keyspace:
+    name: any_keyspace
+    state: present
+    data_centres:
+      zurich: 3
+      tokyo: 3
+      new_york: 2
+  register: any
+
+- assert:
+    that:
+      - any.changed
+
+- name: Create a test keyspace - QUORUM consistency
+  community.cassandra.cassandra_keyspace:
+    name: quorum_keyspace
+    state: present
+    data_centres:
+      zurich: 3
+      tokyo: 3
+      new_york: 2
+    consistency_level: "QUORUM"
+  register: quorum
+
+- assert:
+    that:
+      - quorum.changed
diff --git a/tests/integration/targets/cassandra_keyspace/tasks/main.yml b/tests/integration/targets/cassandra_keyspace/tasks/main.yml
index 594bb260..dca33628 100644
--- a/tests/integration/targets/cassandra_keyspace/tasks/main.yml
+++ b/tests/integration/targets/cassandra_keyspace/tasks/main.yml
@@ -569,3 +569,5 @@
           - "'\\'london\\': \\'3\\'' in mykeyspace.stdout"
           - "'\\'new_york\\': \\'3\\'' in mykeyspace.stdout"
           - "multiple_dcs.changed == True"
+
+    - import_tasks: 284.yml
diff --git a/tests/integration/targets/cassandra_role/tasks/284.yml b/tests/integration/targets/cassandra_role/tasks/284.yml
new file mode 100644
index 00000000..4d18c66c
--- /dev/null
+++ b/tests/integration/targets/cassandra_role/tasks/284.yml
@@ -0,0 +1,115 @@
+- name: Create a test role - LOCAL_ONE consistency
+  community.cassandra.cassandra_role:
+    name: local_role
+    password: p4ssw0rd
+    login: true
+    keyspace_permissions:
+      test_keyspace:
+        - "ALL PERMISSIONS"
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+    debug: yes
+    consistency_level: "LOCAL_ONE"
+  register: local
+
+- assert:
+    that:
+      - local.changed
+
+- name: Create a test role - ANY consistency
+  community.cassandra.cassandra_role:
+    name: any_role
+    password: p4ssw0rd
+    login: true
+    keyspace_permissions:
+      test_keyspace:
+        - "ALL PERMISSIONS"
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+    debug: yes
+    consistency_level: "ANY"
+  register: any
+
+- assert:
+    that:
+      - any.changed
+
+- name: Create a test role - QUORUM consistency
+  community.cassandra.cassandra_role:
+    name: quorum_role
+    password: p4ssw0rd
+    login: true
+    keyspace_permissions:
+      test_keyspace:
+        - "ALL PERMISSIONS"
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+    debug: yes
+    consistency_level: "QUORUM"
+  register: quorum
+
+- assert:
+    that:
+      - quorum.changed
+
+- name: Create a test role - SERIAL consistency
+  community.cassandra.cassandra_role:
+    name: serial_role
+    password: p4ssw0rd
+    login: true
+    keyspace_permissions:
+      test_keyspace:
+        - "ALL PERMISSIONS"
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+    debug: yes
+    consistency_level: "SERIAL"
+  register: serial
+
+- assert:
+    that:
+      - serial.changed
+
+- name: Create a test role - LOCAL_QUORUM consistency
+  community.cassandra.cassandra_role:
+    name: local_quorum_role
+    password: p4ssw0rd
+    login: true
+    keyspace_permissions:
+      test_keyspace:
+        - "ALL PERMISSIONS"
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+    debug: yes
+    consistency_level: "LOCAL_QUORUM"
+  register: local_quorum
+
+- assert:
+    that:
+      - local_quorum.changed
+
+- name: Create a test role - TWO consistency
+  community.cassandra.cassandra_role:
+    name: two_role
+    password: p4ssw0rd
+    login: true
+    keyspace_permissions:
+      test_keyspace:
+        - "ALL PERMISSIONS"
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+    debug: yes
+    consistency_level: "TWO"
+  register: two
+  ignore_errors: true
+
+- assert:
+    that:
+      - two.failed
+      - "'Cannot achieve consistency level TWO' in two.msg"
diff --git a/tests/integration/targets/cassandra_role/tasks/main.yml b/tests/integration/targets/cassandra_role/tasks/main.yml
index 7a6795cf..79ae1727 100644
--- a/tests/integration/targets/cassandra_role/tasks/main.yml
+++ b/tests/integration/targets/cassandra_role/tasks/main.yml
@@ -838,3 +838,6 @@
 
 - name: Import tasks for issue 204
   import_tasks: 204.yml
+
+- name: Import testa for issue 284
+  import_tasks: 284.yml
diff --git a/tests/integration/targets/cassandra_table/tasks/284.yml b/tests/integration/targets/cassandra_table/tasks/284.yml
new file mode 100644
index 00000000..b87d458d
--- /dev/null
+++ b/tests/integration/targets/cassandra_table/tasks/284.yml
@@ -0,0 +1,75 @@
+- name: Create a keyspace for tests
+  cassandra_keyspace:
+    name: consistency
+    state: present
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+
+- name: Create a test keyspace - LOCAL_ONE consistency
+  community.cassandra.cassandra_table:
+    name: local
+    keyspace: consistency
+    state: present
+    columns:
+      - id: uuid
+      - username: text
+      - encrypted_password: blob
+      - first_name: text
+      - last_name: text
+      - dob: date
+    primary_key:
+      - username
+    consistency_level: "LOCAL_ONE"
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+  register: local
+
+- assert:
+    that:
+      - local.changed
+
+- name: Create a test keyspace- ANY consistency
+  community.cassandra.cassandra_table:
+    name: any
+    keyspace: consistency
+    state: present
+    columns:
+      - id: uuid
+      - username: text
+      - encrypted_password: blob
+      - first_name: text
+      - last_name: text
+      - dob: date
+    primary_key:
+      - username
+    consistency_level: "ANY"
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+  register: any
+
+- assert:
+    that:
+      - any.changed
+
+- name: Create a test keyspace - QUORUM consistency
+  community.cassandra.cassandra_table:
+    name: quorum
+    keyspace: consistency
+    state: present
+    columns:
+      - id: uuid
+      - username: text
+      - encrypted_password: blob
+      - first_name: text
+      - last_name: text
+      - dob: date
+    primary_key:
+      - username
+    consistency_level: "QUORUM"
+    login_user: "{{ cassandra_admin_user }}"
+    login_password: "{{ cassandra_admin_pwd }}"
+  register: quorum
+
+- assert:
+    that:
+      - quorum.changed
diff --git a/tests/integration/targets/cassandra_table/tasks/main.yml b/tests/integration/targets/cassandra_table/tasks/main.yml
index 18f05976..888e9b64 100644
--- a/tests/integration/targets/cassandra_table/tasks/main.yml
+++ b/tests/integration/targets/cassandra_table/tasks/main.yml
@@ -664,3 +664,5 @@
       - "'LZ4Compressor' in killrvideo.stdout"
       - "'gc_grace_seconds = 864001' in killrvideo.stdout"
       - "'bloom_filter_fp_chance = 0.02' in killrvideo.stdout"
+
+- import_tasks: 284.yml
diff --git a/tests/sanity/ignore-2.15.txt b/tests/sanity/ignore-2.15.txt
new file mode 120000
index 00000000..3f569108
--- /dev/null
+++ b/tests/sanity/ignore-2.15.txt
@@ -0,0 +1 @@
+ignore-2.19.txt
\ No newline at end of file
diff --git a/tests/sanity/ignore-2.16.txt b/tests/sanity/ignore-2.16.txt
new file mode 120000
index 00000000..3f569108
--- /dev/null
+++ b/tests/sanity/ignore-2.16.txt
@@ -0,0 +1 @@
+ignore-2.19.txt
\ No newline at end of file
diff --git a/tests/sanity/ignore-2.17.txt b/tests/sanity/ignore-2.17.txt
new file mode 120000
index 00000000..3f569108
--- /dev/null
+++ b/tests/sanity/ignore-2.17.txt
@@ -0,0 +1 @@
+ignore-2.19.txt
\ No newline at end of file
diff --git a/tests/sanity/ignore-2.18.txt b/tests/sanity/ignore-2.18.txt
new file mode 120000
index 00000000..3f569108
--- /dev/null
+++ b/tests/sanity/ignore-2.18.txt
@@ -0,0 +1 @@
+ignore-2.19.txt
\ No newline at end of file
diff --git a/tests/sanity/ignore-2.19.txt b/tests/sanity/ignore-2.19.txt
new file mode 100644
index 00000000..c7fcae39
--- /dev/null
+++ b/tests/sanity/ignore-2.19.txt
@@ -0,0 +1,6 @@
+plugins/modules/cassandra_keyspace.py validate-modules:import-error
+plugins/modules/cassandra_role.py validate-modules:import-error
+plugins/modules/cassandra_table.py validate-modules:import-error
+plugins/modules/cassandra_keyspace.py import-3.9
+plugins/modules/cassandra_role.py import-3.9
+plugins/modules/cassandra_table.py import-3.9
\ No newline at end of file