From 48580c1cd1fdc7465762e1bcf180f9b0d88c5b70 Mon Sep 17 00:00:00 2001 From: Mark Tripod Date: Thu, 30 Nov 2023 10:58:28 -0500 Subject: [PATCH] feat: add get_user_by_email() method to admin.py doc: add report_user_by_email.py to examples --- duo_client/admin.py | 552 ++++++++++++++++--------------- examples/report_user_by_email.py | 8 +- 2 files changed, 284 insertions(+), 276 deletions(-) diff --git a/duo_client/admin.py b/duo_client/admin.py index 884c6c3..ee50170 100644 --- a/duo_client/admin.py +++ b/duo_client/admin.py @@ -194,19 +194,19 @@ TOKEN_YUBIKEY = "yk" VALID_AUTHLOG_REQUEST_PARAMS = [ - "mintime", - "maxtime", - "limit", - "sort", - "next_offset", - "event_types", - "reasons", - "results", - "users", - "applications", - "groups", - "factors", - "api_version", + "mintime", + "maxtime", + "limit", + "sort", + "next_offset", + "event_types", + "reasons", + "results", + "users", + "applications", + "groups", + "factors", + "api_version", ] VALID_ACTIVITY_REQUEST_PARAMS = ["mintime", "maxtime", "limit", "sort", "next_offset"] @@ -220,12 +220,11 @@ def api_call(self, method, path, params): params['account_id'] = self.account_id return super(Admin, self).api_call( - method, - path, - params, + method, + path, + params, ) - @classmethod def _canonicalize_ip_whitelist(klass, ip_whitelist): if isinstance(ip_whitelist, six.string_types): @@ -280,7 +279,7 @@ def get_administrative_units(self, admin_id=None, group_id=None, params) iterator = self.get_administrative_units_iterator( - admin_id, group_id, integration_key) + admin_id, group_id, integration_key) return list(iterator) @@ -339,12 +338,12 @@ def get_administrator_log(self, # Sanity check mintime as unix timestamp, then transform to string mintime = str(int(mintime)) params = { - 'mintime': mintime, + 'mintime': mintime, } response = self.json_api_call( - 'GET', - '/admin/v1/logs/administrator', - params, + 'GET', + '/admin/v1/logs/administrator', + params, ) for row in response: row['eventtype'] = 'administrator' @@ -378,12 +377,12 @@ def get_offline_log(self, # Sanity check mintime as unix timestamp, then transform to string mintime = str(int(mintime)) params = { - 'mintime': mintime, + 'mintime': mintime, } response = self.json_api_call( - 'GET', - '/admin/v1/logs/offline_enrollment', - params, + 'GET', + '/admin/v1/logs/offline_enrollment', + params, ) return response @@ -486,21 +485,21 @@ def get_authentication_log(self, api_version=1, **kwargs): Raises RuntimeError on error. """ - if api_version not in [1,2]: + if api_version not in [1, 2]: raise ValueError("Invalid API Version") params = {} - if api_version == 1: #v1 + if api_version == 1: # v1 params['mintime'] = kwargs['mintime'] if 'mintime' in kwargs else 0; # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) warnings.warn( - 'The v1 Admin API for retrieving authentication log events ' - 'will be deprecated in a future release of the Duo Admin API. ' - 'Please migrate to the v2 API.', - DeprecationWarning) - else: #v2 + 'The v1 Admin API for retrieving authentication log events ' + 'will be deprecated in a future release of the Duo Admin API. ' + 'Please migrate to the v2 API.', + DeprecationWarning) + else: # v2 for k in kwargs: if kwargs[k] is not None and k in VALID_AUTHLOG_REQUEST_PARAMS: params[k] = kwargs[k] @@ -510,17 +509,15 @@ def get_authentication_log(self, api_version=1, **kwargs): # Sanity check mintime as unix timestamp, then transform to string params['mintime'] = '{:d}'.format(int(params['mintime'])) - if 'maxtime' not in params: params['maxtime'] = int(time.time()) * 1000 # Sanity check maxtime as unix timestamp, then transform to string params['maxtime'] = '{:d}'.format(int(params['maxtime'])) - response = self.json_api_call( - 'GET', - '/admin/v{}/logs/authentication'.format(api_version), - params, + 'GET', + '/admin/v{}/logs/authentication'.format(api_version), + params, ) if api_version == 1: @@ -613,16 +610,16 @@ def get_activity_logs(self, **kwargs): params['mintime'] = default_mintime params['mintime'] = str(int(params['mintime'])) if 'maxtime' not in params: - #if maxtime is not provided, the script defaults it to now + # if maxtime is not provided, the script defaults it to now params['maxtime'] = default_maxtime params['maxtime'] = str(int(params['maxtime'])) if 'limit' in params: params['limit'] = str(int(params['limit'])) response = self.json_api_call( - 'GET', - '/admin/v2/logs/activity', - params, + 'GET', + '/admin/v2/logs/activity', + params, ) for row in response['items']: row['eventtype'] = 'activity' @@ -681,7 +678,7 @@ def get_telephony_log(self, mintime=0, api_version=1, **kwargs): Raises RuntimeError on error. """ - if api_version not in [1,2]: + if api_version not in [1, 2]: raise ValueError("Invalid API Version") if api_version == 2: @@ -709,10 +706,28 @@ def get_users(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', '/admin/v1/users', {'limit': limit, 'offset': offset}) + 'GET', '/admin/v1/users', {'limit': limit, 'offset': offset}) return list(self.get_users_iterator()) + def get_user_by_email(self, email_address: str) -> list: + """ + Returns user specified by email_address. + + email_address - E-mail address of user to fetch + + Returns a list of 0 or 1 user objects. + + Raises RuntimeError on error. + """ + params = { + 'email': email_address, + } + response = self.json_api_call('GET', + '/admin/v1/users', + params) + return response + def get_user_by_id(self, user_id): """ Returns user specified by user_id. @@ -739,7 +754,7 @@ def get_users_by_name(self, username): Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } response = self.json_api_call('GET', '/admin/v1/users', @@ -758,7 +773,7 @@ def get_users_by_names(self, usernames): """ username_list = json.dumps(usernames) params = { - 'username_list': username_list, + 'username_list': username_list, } response = self.json_paging_api_call('GET', '/admin/v1/users', @@ -777,7 +792,7 @@ def get_users_by_ids(self, user_ids): """ user_id_list = json.dumps(user_ids) params = { - 'user_id_list': user_id_list, + 'user_id_list': user_id_list, } response = self.json_paging_api_call('GET', '/admin/v1/users', @@ -806,7 +821,7 @@ def add_user(self, username, realname=None, status=None, Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } if realname is not None: params['realname'] = realname @@ -914,8 +929,8 @@ def enroll_user(self, username, email, valid_secs=None): """ path = '/admin/v1/users/enroll' params = { - 'username': username, - 'email': email, + 'username': username, + 'email': email, } if valid_secs is not None: @@ -923,7 +938,8 @@ def enroll_user(self, username, email, valid_secs=None): return self.json_api_call('POST', path, params) - def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_uses=None, codes=None, preserve_existing=None): + def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_uses=None, codes=None, + preserve_existing=None): """ Replace a user's bypass codes with new codes. @@ -953,7 +969,7 @@ def add_user_bypass_codes(self, user_id, count=None, valid_secs=None, remaining_ if codes is not None: params['codes'] = self._canonicalize_bypass_codes(codes) - + if preserve_existing is not None: params['preserve_existing'] = preserve_existing @@ -975,7 +991,6 @@ def get_user_bypass_codes_iterator(self, user_id): path = '/admin/v1/users/' + user_id + '/bypass_codes' return self.json_paging_api_call('GET', path, {}) - def get_user_bypass_codes(self, user_id, limit=None, offset=0): """ Returns a list of bypass codes associated with a user. @@ -995,7 +1010,7 @@ def get_user_bypass_codes(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/bypass_codes' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_bypass_codes_iterator(user_id)) @@ -1030,7 +1045,7 @@ def get_user_phones(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/phones' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_phones_iterator(user_id)) @@ -1048,7 +1063,7 @@ def add_user_phone(self, user_id, phone_id): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/phones' params = { - 'phone_id': phone_id, + 'phone_id': phone_id, } return self.json_api_call('POST', path, params) @@ -1067,7 +1082,7 @@ def delete_user_phone(self, user_id, phone_id): path = '/admin/v1/users/' + user_id + '/phones/' + phone_id params = {} return self.json_api_call('DELETE', path, - params) + params) def get_user_tokens_iterator(self, user_id): """ @@ -1100,7 +1115,7 @@ def get_user_tokens(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/tokens' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_tokens_iterator(user_id)) @@ -1118,7 +1133,7 @@ def add_user_token(self, user_id, token_id): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/tokens' params = { - 'token_id': token_id, + 'token_id': token_id, } return self.json_api_call('POST', path, params) @@ -1173,7 +1188,7 @@ def get_user_u2ftokens(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/u2ftokens' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_u2ftokens_iterator(user_id)) @@ -1213,7 +1228,7 @@ def get_user_webauthncredentials(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/webauthncredentials' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_webauthncredentials_iterator(user_id)) @@ -1248,7 +1263,7 @@ def get_user_groups(self, user_id, limit=None, offset=0): user_id = six.moves.urllib.parse.quote_plus(str(user_id)) path = '/admin/v1/users/' + user_id + '/groups' return self.json_api_call( - 'GET', path, {'limit': limit, 'offset': offset}) + 'GET', path, {'limit': limit, 'offset': offset}) return list(self.get_user_groups_iterator(user_id)) @@ -1329,9 +1344,9 @@ def get_phones_generator(self): Returns a generator yielding phones. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/phones', - {} + 'GET', + '/admin/v1/phones', + {} ) def get_phones(self, limit=None, offset=0): @@ -1349,9 +1364,9 @@ def get_phones(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/phones', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/phones', + {'limit': limit, 'offset': offset} ) return list(self.get_phones_generator()) @@ -1386,7 +1401,7 @@ def get_phones_by_number(self, number, extension=None): if extension is not None: params['extension'] = extension response = self.json_api_call('GET', path, - params) + params) return response def add_phone(self, @@ -1431,7 +1446,7 @@ def add_phone(self, if postdelay is not None: params['postdelay'] = postdelay response = self.json_api_call('POST', path, - params) + params) return response def update_phone(self, phone_id, @@ -1477,7 +1492,7 @@ def update_phone(self, phone_id, if postdelay is not None: params['postdelay'] = postdelay response = self.json_api_call('POST', path, - params) + params) return response def delete_phone(self, phone_id): @@ -1534,7 +1549,7 @@ def send_sms_activation_to_phone(self, phone_id, if activation_msg is not None: params['activation_msg'] = activation_msg return self.json_api_call('POST', path, - params) + params) def create_activation_url(self, phone_id, valid_secs=None, @@ -1592,9 +1607,9 @@ def get_desktoptokens_generator(self): Returns a generator yielding desktoptokens. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/desktoptokens', - {} + 'GET', + '/admin/v1/desktoptokens', + {} ) def get_desktoptokens(self, limit=None, offset=0): @@ -1613,9 +1628,9 @@ def get_desktoptokens(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/desktoptokens', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/desktoptokens', + {'limit': limit, 'offset': offset} ) return list(self.get_desktoptokens_generator()) @@ -1648,7 +1663,7 @@ def add_desktoptoken(self, Raises RuntimeError on error. """ params = { - 'platform': platform, + 'platform': platform, } if name is not None: params['name'] = name @@ -1716,8 +1731,8 @@ def activate_desktoptoken(self, desktoptoken_id, valid_secs=None): params['valid_secs'] = str(valid_secs) quoted_id = six.moves.urllib.parse.quote_plus(desktoptoken_id) response = self.json_api_call('POST', - '/admin/v1/desktoptokens/%s/activate' % quoted_id, - params) + '/admin/v1/desktoptokens/%s/activate' % quoted_id, + params) return response def get_tokens_generator(self): @@ -1725,9 +1740,9 @@ def get_tokens_generator(self): Returns a generator yielding tokens. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/tokens', - {} + 'GET', + '/admin/v1/tokens', + {} ) def get_tokens(self, limit=None, offset=0): @@ -1745,9 +1760,9 @@ def get_tokens(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/tokens', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/tokens', + {'limit': limit, 'offset': offset} ) return list(self.get_tokens_generator()) @@ -1764,7 +1779,7 @@ def get_token_by_id(self, token_id): path = '/admin/v1/tokens/' + token_id params = {} response = self.json_api_call('GET', path, - params) + params) return response def get_tokens_by_serial(self, type, serial): @@ -1777,8 +1792,8 @@ def get_tokens_by_serial(self, type, serial): Returns a list of 0 or 1 token objects. """ params = { - 'type': type, - 'serial': serial, + 'type': type, + 'serial': serial, } response = self.json_api_call('GET', '/admin/v1/tokens', params) return response @@ -1808,7 +1823,7 @@ def add_hotp6_token(self, serial, secret, counter=None): if counter is not None: params['counter'] = str(int(counter)) response = self.json_api_call('POST', path, - params) + params) return response def add_hotp8_token(self, serial, secret, counter=None): @@ -1826,7 +1841,7 @@ def add_hotp8_token(self, serial, secret, counter=None): if counter is not None: params['counter'] = str(int(counter)) response = self.json_api_call('POST', path, - params) + params) return response def add_totp6_token(self, serial, secret, totp_step=None): @@ -1844,7 +1859,7 @@ def add_totp6_token(self, serial, secret, totp_step=None): if totp_step is not None: params['totp_step'] = str(int(totp_step)) response = self.json_api_call('POST', path, - params) + params) return response def add_totp8_token(self, serial, secret, totp_step=None): @@ -1862,7 +1877,7 @@ def add_totp8_token(self, serial, secret, totp_step=None): if totp_step is not None: params['totp_step'] = str(int(totp_step)) response = self.json_api_call('POST', path, - params) + params) return response def update_token(self, token_id, totp_step=None): @@ -1881,7 +1896,7 @@ def update_token(self, token_id, totp_step=None): if totp_step is not None: params['totp_step'] = totp_step response = self.json_api_call('POST', path, - params) + params) return response def add_yubikey_token(self, serial, private_id, aes_key): @@ -1894,10 +1909,10 @@ def add_yubikey_token(self, serial, private_id, aes_key): Returns newly added token object. """ path = '/admin/v1/tokens' - params = {'type': 'yk', 'serial': serial, 'private_id': private_id, + params = {'type': 'yk', 'serial': serial, 'private_id': private_id, 'aes_key': aes_key} response = self.json_api_call('POST', path, - params) + params) return response def resync_hotp_token(self, token_id, code1, code2, code3): @@ -2037,7 +2052,7 @@ def update_settings(self, params['fraud_email'] = fraud_email if fraud_email_enabled is not None: params['fraud_email_enabled'] = ('1' if - fraud_email_enabled else '0') + fraud_email_enabled else '0') if keypress_confirm is not None: params['keypress_confirm'] = keypress_confirm if keypress_fraud is not None: @@ -2054,16 +2069,16 @@ def update_settings(self, params['minimum_password_length'] = str(minimum_password_length) if password_requires_upper_alpha is not None: params['password_requires_upper_alpha'] = ('1' if - password_requires_upper_alpha else '0') + password_requires_upper_alpha else '0') if password_requires_lower_alpha is not None: params['password_requires_lower_alpha'] = ('1' if - password_requires_lower_alpha else '0') + password_requires_lower_alpha else '0') if password_requires_numeric is not None: params['password_requires_numeric'] = ('1' if - password_requires_numeric else '0') + password_requires_numeric else '0') if password_requires_special is not None: params['password_requires_special'] = ('1' if - password_requires_special else '0') + password_requires_special else '0') if helpdesk_bypass is not None: params['helpdesk_bypass'] = str(helpdesk_bypass) if helpdesk_bypass_expiration is not None: @@ -2072,24 +2087,24 @@ def update_settings(self, params['helpdesk_message'] = str(helpdesk_message) if helpdesk_can_send_enroll_email is not None: params['helpdesk_can_send_enroll_email'] = ('1' if - helpdesk_can_send_enroll_email else '0') + helpdesk_can_send_enroll_email else '0') if reactivation_url is not None: params['reactivation_url'] = reactivation_url if reactivation_integration_key is not None: params['reactivation_integration_key'] = reactivation_integration_key if security_checkup_enabled is not None: params['security_checkup_enabled'] = ('1' if - security_checkup_enabled else '0') + security_checkup_enabled else '0') if user_managers_can_put_users_in_bypass is not None: params['user_managers_can_put_users_in_bypass'] = ('1' if - user_managers_can_put_users_in_bypass else '0') + user_managers_can_put_users_in_bypass else '0') if email_activity_notification_enabled is not None: params['email_activity_notification_enabled'] = ( - '1' if email_activity_notification_enabled else '0' + '1' if email_activity_notification_enabled else '0' ) if push_activity_notification_enabled is not None: params['push_activity_notification_enabled'] = ( - '1' if push_activity_notification_enabled else '0' + '1' if push_activity_notification_enabled else '0' ) if not params: @@ -2101,45 +2116,45 @@ def update_settings(self, return response def set_allowed_admin_auth_methods(self, - push_enabled=None, - sms_enabled=None, - voice_enabled=None, - mobile_otp_enabled=None, - yubikey_enabled=None, - hardware_token_enabled=None, - ): + push_enabled=None, + sms_enabled=None, + voice_enabled=None, + mobile_otp_enabled=None, + yubikey_enabled=None, + hardware_token_enabled=None, + ): params = {} if push_enabled is not None: params['push_enabled'] = ( - '1' if push_enabled else '0') + '1' if push_enabled else '0') if sms_enabled is not None: params['sms_enabled'] = ( - '1' if sms_enabled else '0') + '1' if sms_enabled else '0') if mobile_otp_enabled is not None: params['mobile_otp_enabled'] = ( - '1' if mobile_otp_enabled else '0') + '1' if mobile_otp_enabled else '0') if hardware_token_enabled is not None: params['hardware_token_enabled'] = ( - '1' if hardware_token_enabled else '0') + '1' if hardware_token_enabled else '0') if yubikey_enabled is not None: params['yubikey_enabled'] = ( - '1' if yubikey_enabled else '0') + '1' if yubikey_enabled else '0') if voice_enabled is not None: params['voice_enabled'] = ( - '1' if voice_enabled else '0') + '1' if voice_enabled else '0') response = self.json_api_call( - 'POST', - '/admin/v1/admins/allowed_auth_methods', - params + 'POST', + '/admin/v1/admins/allowed_auth_methods', + params ) return response def get_allowed_admin_auth_methods(self): - params={} + params = {} response = self.json_api_call( - 'GET', - '/admin/v1/admins/allowed_auth_methods', - params + 'GET', + '/admin/v1/admins/allowed_auth_methods', + params ) return response @@ -2152,9 +2167,9 @@ def get_info_summary(self): """ params = {} response = self.json_api_call( - 'GET', - '/admin/v1/info/summary', - params + 'GET', + '/admin/v1/info/summary', + params ) return response @@ -2177,9 +2192,9 @@ def get_info_telephony_credits_used(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/telephony_credits_used', - params + 'GET', + '/admin/v1/info/telephony_credits_used', + params ) return response @@ -2212,9 +2227,9 @@ def get_authentication_attempts(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/authentication_attempts', - params + 'GET', + '/admin/v1/info/authentication_attempts', + params ) return response @@ -2249,9 +2264,9 @@ def get_user_authentication_attempts(self, if maxtime is not None: params['maxtime'] = maxtime response = self.json_api_call( - 'GET', - '/admin/v1/info/user_authentication_attempts', - params + 'GET', + '/admin/v1/info/user_authentication_attempts', + params ) return response @@ -2260,9 +2275,9 @@ def get_groups_generator(self): Returns a generator yielding groups. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/groups', - {} + 'GET', + '/admin/v1/groups', + {} ) def get_groups_by_group_ids(self, group_ids): @@ -2277,9 +2292,9 @@ def get_groups_by_group_ids(self, group_ids): """ group_id_list = json.dumps(group_ids) return self.json_api_call( - 'GET', - '/admin/v1/groups', - {'group_id_list': group_id_list} + 'GET', + '/admin/v1/groups', + {'group_id_list': group_id_list} ) def get_groups(self, limit=None, offset=0): @@ -2298,9 +2313,9 @@ def get_groups(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/groups', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/groups', + {'limit': limit, 'offset': offset} ) return list(self.get_groups_generator()) @@ -2320,10 +2335,10 @@ def get_group(self, group_id, api_version=1): if api_version == 1: url = '/admin/v1/groups/' warnings.warn( - 'The v1 Admin API for group details will be deprecated ' - 'in a future release of the Duo Admin API. Please migrate to ' - 'the v2 API.', - DeprecationWarning) + 'The v1 Admin API for group details will be deprecated ' + 'in a future release of the Duo Admin API. Please migrate to ' + 'the v2 API.', + DeprecationWarning) elif api_version == 2: url = '/admin/v2/groups/' else: @@ -2343,12 +2358,12 @@ def get_group_users(self, group_id, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v2/groups/' + group_id + '/users', - { - 'limit': limit, - 'offset': offset, - }) + 'GET', + '/admin/v2/groups/' + group_id + '/users', + { + 'limit': limit, + 'offset': offset, + }) return list(self.get_group_users_iterator(group_id)) def get_group_users_iterator(self, group_id): @@ -2358,20 +2373,20 @@ def get_group_users_iterator(self, group_id): group_id - The id of the group (Required) """ return self.json_paging_api_call( - 'GET', - '/admin/v2/groups/' + group_id + '/users', - {} + 'GET', + '/admin/v2/groups/' + group_id + '/users', + {} ) def create_group(self, name, - desc=None, - status=None, - push_enabled=None, - sms_enabled=None, - voice_enabled=None, - mobile_otp_enabled=None, - u2f_enabled=None, - ): + desc=None, + status=None, + push_enabled=None, + sms_enabled=None, + voice_enabled=None, + mobile_otp_enabled=None, + u2f_enabled=None, + ): """ Create a new group. @@ -2384,7 +2399,7 @@ def create_group(self, name, mobile_otp_enabled - Mobile OTP restriction <>True/False (Optional) """ params = { - 'name': name, + 'name': name, } if desc is not None: params['desc'] = desc @@ -2401,9 +2416,9 @@ def create_group(self, name, if u2f_enabled is not None: params['u2f_enabled'] = '1' if u2f_enabled else '0' response = self.json_api_call( - 'POST', - '/admin/v1/groups', - params + 'POST', + '/admin/v1/groups', + params ) return response @@ -2414,9 +2429,9 @@ def delete_group(self, group_id): group_id - The id of the group (Required) """ return self.json_api_call( - 'DELETE', - '/admin/v1/groups/' + group_id, - {} + 'DELETE', + '/admin/v1/groups/' + group_id, + {} ) def modify_group(self, @@ -2429,7 +2444,7 @@ def modify_group(self, voice_enabled=None, mobile_otp_enabled=None, u2f_enabled=None, - ): + ): """ Modify a group @@ -2461,9 +2476,9 @@ def modify_group(self, if u2f_enabled is not None: params['u2f_enabled'] = '1' if u2f_enabled else '0' response = self.json_api_call( - 'POST', - '/admin/v1/groups/' + group_id, - params + 'POST', + '/admin/v1/groups/' + group_id, + params ) return response @@ -2472,9 +2487,9 @@ def get_integrations_generator(self): Returns a generator yielding integrations. """ return self.json_paging_api_call( - 'GET', - '/admin/v2/integrations', - {}, + 'GET', + '/admin/v2/integrations', + {}, ) def get_integrations(self, limit=None, offset=0): @@ -2492,9 +2507,9 @@ def get_integrations(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v2/integrations', - {'limit': limit, 'offset': offset}, + 'GET', + '/admin/v2/integrations', + {'limit': limit, 'offset': offset}, ) return list(self.get_integrations_generator()) @@ -2511,9 +2526,9 @@ def get_integration(self, integration_key): """ params = {} response = self.json_api_call( - 'GET', - '/admin/v2/integrations/' + integration_key, - params, + 'GET', + '/admin/v2/integrations/' + integration_key, + params, ) return response @@ -2603,12 +2618,12 @@ def create_integration(self, params['adminapi_read_log'] = '1' if adminapi_read_log else '0' if adminapi_read_resource is not None: params['adminapi_read_resource'] = ( - '1' if adminapi_read_resource else '0') + '1' if adminapi_read_resource else '0') if adminapi_settings is not None: params['adminapi_settings'] = '1' if adminapi_settings else '0' if adminapi_write_resource is not None: params['adminapi_write_resource'] = ( - '1' if adminapi_write_resource else '0') + '1' if adminapi_write_resource else '0') if groups_allowed is not None: params['groups_allowed'] = groups_allowed if self_service_allowed is not None: @@ -2618,7 +2633,7 @@ def create_integration(self, response = self.json_api_call('POST', '/admin/v2/integrations', params, - ) + ) return response def delete_integration(self, integration_key): @@ -2632,9 +2647,9 @@ def delete_integration(self, integration_key): integration_key = six.moves.urllib.parse.quote_plus(str(integration_key)) path = '/admin/v2/integrations/%s' % integration_key return self.json_api_call( - 'DELETE', - path, - {}, + 'DELETE', + path, + {}, ) def update_integration(self, @@ -2727,12 +2742,12 @@ def update_integration(self, params['adminapi_read_log'] = '1' if adminapi_read_log else '0' if adminapi_read_resource is not None: params['adminapi_read_resource'] = ( - '1' if adminapi_read_resource else '0') + '1' if adminapi_read_resource else '0') if adminapi_settings is not None: params['adminapi_settings'] = '1' if adminapi_settings else '0' if adminapi_write_resource is not None: params['adminapi_write_resource'] = ( - '1' if adminapi_write_resource else '0') + '1' if adminapi_write_resource else '0') if reset_secret_key is not None: params['reset_secret_key'] = '1' if groups_allowed is not None: @@ -2746,9 +2761,9 @@ def update_integration(self, raise TypeError("No new values were provided") response = self.json_api_call( - 'POST', - path, - params, + 'POST', + path, + params, ) return response @@ -2769,9 +2784,9 @@ def get_admins(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/admins', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/admins', + {'limit': limit, 'offset': offset} ) iterator = self.get_admins_iterator() @@ -2953,13 +2968,13 @@ def get_external_password_mgmt_statuses(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/admins/password_mgmt', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/admins/password_mgmt', + {'limit': limit, 'offset': offset} ) iterator = self.json_paging_api_call( - 'GET', '/admin/v1/admins/password_mgmt', {}) + 'GET', '/admin/v1/admins/password_mgmt', {}) return list(iterator) @@ -2981,8 +2996,8 @@ def get_external_password_mgmt_status_for_admin(self, admin_id): return response def update_admin_password_mgmt_status( - self, admin_id, has_external_password_mgmt=None, - password=None): + self, admin_id, has_external_password_mgmt=None, + password=None): """ Enable or disable an admin for external password management, and optionally set the password for an admin @@ -3033,7 +3048,7 @@ def update_logo(self, logo): Raises RuntimeError on error. """ params = { - 'logo': base64.b64encode(logo).decode(), + 'logo': base64.b64encode(logo).decode(), } return self.json_api_call('POST', '/admin/v1/logo', params) @@ -3182,9 +3197,9 @@ def get_bypass_codes_generator(self): Returns a generator yielding bypass codes. """ return self.json_paging_api_call( - 'GET', - '/admin/v1/bypass_codes', - {} + 'GET', + '/admin/v1/bypass_codes', + {} ) def get_bypass_codes(self, limit=None, offset=0): @@ -3203,9 +3218,9 @@ def get_bypass_codes(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - 'GET', - '/admin/v1/bypass_codes', - {'limit': limit, 'offset': offset} + 'GET', + '/admin/v1/bypass_codes', + {'limit': limit, 'offset': offset} ) return list(self.get_bypass_codes_generator()) @@ -3237,19 +3252,19 @@ def sync_user(self, username, directory_key): Raises RuntimeError on error. """ params = { - 'username': username, + 'username': username, } directory_key = six.moves.urllib.parse.quote_plus(directory_key) path = ( - '/admin/v1/users/directorysync/{directory_key}/syncuser').format( + '/admin/v1/users/directorysync/{directory_key}/syncuser').format( directory_key=directory_key) return self.json_api_call('POST', path, params) def get_trust_monitor_events_iterator( - self, - mintime, - maxtime, - event_type=None, + self, + mintime, + maxtime, + event_type=None, ): """ Returns a generator which yields trust monitor events. @@ -3272,27 +3287,27 @@ def get_trust_monitor_events_iterator( """ params = { - "mintime": "{}".format(mintime), - "maxtime": "{}".format(maxtime), + "mintime": "{}".format(mintime), + "maxtime": "{}".format(maxtime), } if event_type is not None: params["type"] = event_type return self.json_cursor_api_call( - "GET", - "/admin/v1/trust_monitor/events", - params, - lambda resp: resp["events"], + "GET", + "/admin/v1/trust_monitor/events", + params, + lambda resp: resp["events"], ) def get_trust_monitor_events_by_offset( - self, - mintime, - maxtime, - limit=None, - offset=None, - event_type=None, + self, + mintime, + maxtime, + limit=None, + offset=None, + event_type=None, ): """ Fetch Duo Trust Monitor Events from the Admin API. @@ -3319,8 +3334,8 @@ def get_trust_monitor_events_by_offset( """ params = { - "mintime": "{}".format(mintime), - "maxtime": "{}".format(maxtime), + "mintime": "{}".format(mintime), + "maxtime": "{}".format(maxtime), } if limit is not None: @@ -3333,9 +3348,9 @@ def get_trust_monitor_events_by_offset( params["type"] = event_type return self.json_api_call( - "GET", - "/admin/v1/trust_monitor/events", - params, + "GET", + "/admin/v1/trust_monitor/events", + params, ) def _quote_policy_id(self, policy_key): @@ -3348,9 +3363,9 @@ def get_policies_v2_iterator(self): """ return self.json_paging_api_call( - "GET", - "/admin/v2/policies", - {}, + "GET", + "/admin/v2/policies", + {}, ) def get_policies_v2(self, limit=None, offset=0): @@ -3366,9 +3381,9 @@ def get_policies_v2(self, limit=None, offset=0): (limit, offset) = self.normalize_paging_args(limit, offset) if limit: return self.json_api_call( - "GET", - "/admin/v2/policies", - {"limit": limit, "offset": offset}, + "GET", + "/admin/v2/policies", + {"limit": limit, "offset": offset}, ) return list(self.get_policies_v2_iterator()) @@ -3396,8 +3411,8 @@ def update_policy_v2(self, policy_key, json_request): path = "/admin/v2/policies/" + self._quote_policy_id(policy_key) response = self.json_api_call("PUT", path, json_request) return response - - def update_policies_v2(self, sections, sections_to_delete, + + def update_policies_v2(self, sections, sections_to_delete, edit_list, edit_all_policies=False): """ Update the contents of multiple policies. @@ -3413,14 +3428,14 @@ def update_policies_v2(self, sections, sections_to_delete, """ path = "/admin/v2/policies/update" params = { - "policies_to_update": { - "edit_all_policies": edit_all_policies, - "edit_list": edit_list, - }, - "policy_changes": { - "sections": sections, - "sections_to_delete": sections_to_delete, - }, + "policies_to_update": { + "edit_all_policies": edit_all_policies, + "edit_list": edit_list, + }, + "policy_changes": { + "sections": sections, + "sections_to_delete": sections_to_delete, + }, } response = self.json_api_call("PUT", path, params) return response @@ -3435,7 +3450,7 @@ def create_policy_v2(self, json_request): path = "/admin/v2/policies" response = self.json_api_call("POST", path, json_request) return response - + def copy_policy_v2(self, policy_key, new_policy_names_list): """ Copy policy to multiple new policies. @@ -3449,8 +3464,8 @@ def copy_policy_v2(self, policy_key, new_policy_names_list): """ path = "/admin/v2/policies/copy" params = { - "policy_key": policy_key, - "new_policy_names_list": new_policy_names_list + "policy_key": policy_key, + "new_policy_names_list": new_policy_names_list } response = self.json_api_call("POST", path, params) return response @@ -3465,7 +3480,7 @@ def get_policy_v2(self, policy_key): path = "/admin/v2/policies/" + self._quote_policy_id(policy_key) response = self.json_api_call("GET", path, {}) return response - + def get_policy_summary_v2(self): """ Returns (dict) - summary of all policies and the applications @@ -3488,17 +3503,17 @@ def __init__(self, account_id, child_api_host=None, **kwargs): See the Client base class for other parameters. """ if not child_api_host: - child_api_host = Accounts.child_map.get(account_id, None) + child_api_host = Accounts.child_map.get(account_id, None) if child_api_host is None: child_api_host = kwargs.get('host') try: accounts_api = Accounts(**kwargs) accounts_api.get_child_accounts() - child_api_host = Accounts.child_map.get(account_id, kwargs['host']) + child_api_host = Accounts.child_map.get(account_id, kwargs['host']) except RuntimeError: pass kwargs['host'] = child_api_host - + super(AccountAdmin, self).__init__(**kwargs) self.account_id = account_id @@ -3526,7 +3541,7 @@ def set_edition(self, edition): Raises RuntimeError on error. """ params = { - 'edition': edition, + 'edition': edition, } return self.json_api_call('POST', @@ -3542,9 +3557,8 @@ def get_telephony_credits(self): Raises RuntimeError on error. """ return self.json_api_call('GET', - '/admin/v1/billing/telephony_credits', - params={}) - + '/admin/v1/billing/telephony_credits', + params={}) def set_telephony_credits(self, credits): """ @@ -3558,8 +3572,8 @@ def set_telephony_credits(self, credits): Raises RuntimeError on error. """ params = { - 'credits': str(credits), + 'credits': str(credits), } return self.json_api_call('POST', - '/admin/v1/billing/telephony_credits', - params) + '/admin/v1/billing/telephony_credits', + params) diff --git a/examples/report_user_by_email.py b/examples/report_user_by_email.py index 8704772..d8e135a 100755 --- a/examples/report_user_by_email.py +++ b/examples/report_user_by_email.py @@ -35,13 +35,7 @@ def main(): # Retrieve user info from API: email_address = get_next_arg('E-mail address of user to retrieve: ') - req_params = {"email": email_address} - # There is no get_user_by_email in the duo_client_python library, so we use the generic api_call - user = admin_api.json_api_call( - method='GET', - path='/admin/v1/users', - params=req_params - ) + user = admin_api.get_user_by_email(email_address) if user: pprint(user, indent=2)