From 7eb49226f2339d6f6284cbd4e4cba497e61d96fe Mon Sep 17 00:00:00 2001 From: claravox Date: Fri, 13 Oct 2023 11:43:26 +0200 Subject: [PATCH] Linting (only) --- yclienttools/common_rules.py | 268 +++++++++++++++++------------------ yclienttools/importgroups.py | 128 +++++++++++------ 2 files changed, 213 insertions(+), 183 deletions(-) diff --git a/yclienttools/common_rules.py b/yclienttools/common_rules.py index 75c86a2..3dd653a 100644 --- a/yclienttools/common_rules.py +++ b/yclienttools/common_rules.py @@ -9,6 +9,7 @@ from yclienttools import common_queries from yclienttools.exceptions import SizeNotSupportedException + class RuleInterface: def __init__(self, session, yoda_version): @@ -22,12 +23,12 @@ def __init__(self, session, yoda_version): """ self.session = session self.set_re = False if yoda_version == "1.7" else True - self.uuGroupAdd_version = "1.7" if yoda_version in ["1.7", "1.8"] else "1.9" + self.uuGroupAdd_version = "1.7" if yoda_version in [ + "1.7", "1.8"] else "1.9" self.default_rule_engine = 'irods_rule_engine_plugin-irods_rule_language-instance' - def call_rule(self, rulename, params, number_outputs, - rule_engine = None): + rule_engine=None): """Run a rule :param rulename: name of the rule @@ -40,23 +41,25 @@ def call_rule(self, rulename, params, number_outputs, for input_var in params.keys(): body += "*{},".format(input_var) - outparams = list(map(lambda n : '*outparam{}'.format(str(n+1)), range(number_outputs))) + outparams = list( + map(lambda n: '*outparam{}'.format(str(n + 1)), range(number_outputs))) body += '{}); writeLine("stdout","{}")}}'.format( ",".join(outparams), "\n".join(outparams)) - input_params = { "*{}".format(k) : '"{}"'.format(v) for (k,v) in params.items() } + input_params = {"*{}".format(k): '"{}"'.format(v) + for (k, v) in params.items()} output_params = 'ruleExecOut' if self.set_re: - re_config = { 'instance_name': self.default_rule_engine if rule_engine is None - else rule_engine } + re_config = {'instance_name': self.default_rule_engine if rule_engine is None + else rule_engine} else: re_config = {} myrule = Rule( self.session, - rule_file = StringIO(body), + rule_file=StringIO(body), params=input_params, output=output_params, **re_config) @@ -67,163 +70,156 @@ def call_rule(self, rulename, params, number_outputs, return buf[:number_outputs] - def _string_list_to_list(self, s): if s.startswith("[") and s.endswith("]"): return s[1:-1].split(",") else: - raise ValueError("Unable to convert string representation of list to list") - + raise ValueError( + "Unable to convert string representation of list to list") def call_uuGroupGetMembers(self, groupname): - """Returns list of group members""" - parms = OrderedDict([ - ( 'groupname', groupname)] ) - [out] = self.call_rule('uuGroupGetMembers', parms, 1) - if len(out) >= 1023 and not out.endswith("]"): - raise SizeNotSupportedException("Group member list exceeds 1023 bytes") - return self._string_list_to_list(out) - + """Returns list of group members""" + parms = OrderedDict([ + ('groupname', groupname)]) + [out] = self.call_rule('uuGroupGetMembers', parms, 1) + if len(out) >= 1023 and not out.endswith("]"): + raise SizeNotSupportedException( + "Group member list exceeds 1023 bytes") + return self._string_list_to_list(out) def call_uuGroupUserRemove(self, groupname, user): - """Removes a user from a group""" - parms = OrderedDict([ - ( 'groupname', groupname), - ( 'user', user) ]) - return self.call_rule('uuGroupUserRemove', parms, 2) - + """Removes a user from a group""" + parms = OrderedDict([ + ('groupname', groupname), + ('user', user)]) + return self.call_rule('uuGroupUserRemove', parms, 2) def call_uuGroupGetMemberType(self, groupname, user): - """:returns: member type of a group member""" - parms = OrderedDict([ - ( 'groupname', groupname), - ( 'user', user) ]) - return self.call_rule('uuGroupGetMemberType', parms, 1)[0] - - def call_uuGroupUserAddByOtherCreator(self, groupname, username, creator_user, creator_zone): - """Adds user to group on the behalf of a creator user. - - :param: groupname - :param: username - :param: creator_user - :param: creator_zone - :returns: (status, message) ; status !=0 is error - """ - parms = OrderedDict([ - ('groupname', groupname), - ('username', username), - ('creatorUser', creator_user), - ('creatorZone', creator_zone)]) - return self.call_rule('uuGroupUserAdd', parms, 2) + """:returns: member type of a group member""" + parms = OrderedDict([ + ('groupname', groupname), + ('user', user)]) + return self.call_rule('uuGroupGetMemberType', parms, 1)[0] + + def call_uuGroupUserAddByOtherCreator( + self, groupname, username, creator_user, creator_zone): + """Adds user to group on the behalf of a creator user. + + :param: groupname + :param: username + :param: creator_user + :param: creator_zone + :returns: (status, message) ; status !=0 is error + """ + parms = OrderedDict([ + ('groupname', groupname), + ('username', username), + ('creatorUser', creator_user), + ('creatorZone', creator_zone)]) + return self.call_rule('uuGroupUserAdd', parms, 2) def call_uuGroupUserAdd(self, groupname, username): - """Adds user to group. - - :param: groupname - :param: username - :returns: (status, message) ; status !=0 is error - """ - parms = OrderedDict([ - ('groupname', groupname), - ('username', username)]) - return self.call_rule('uuGroupUserAdd', parms, 2) + """Adds user to group. + :param: groupname + :param: username + :returns: (status, message) ; status !=0 is error + """ + parms = OrderedDict([ + ('groupname', groupname), + ('username', username)]) + return self.call_rule('uuGroupUserAdd', parms, 2) def call_uuGroupUserChangeRole(self, groupname, username, newrole): - """Change role of user in group - - :param groupname: name of group - :param username: name of user - :param newrole: new role (can be "manager", "reader", "normal") - :returns: (status, message) ; status != 0 is error - """ - parms = OrderedDict([ - ('groupname', groupname), - ('username', username), - ('newrole', newrole)]) - return self.call_rule('uuGroupUserChangeRole', parms, 2) + """Change role of user in group + :param groupname: name of group + :param username: name of user + :param newrole: new role (can be "manager", "reader", "normal") + :returns: (status, message) ; status != 0 is error + """ + parms = OrderedDict([ + ('groupname', groupname), + ('username', username), + ('newrole', newrole)]) + return self.call_rule('uuGroupUserChangeRole', parms, 2) def call_uuGroupExists(self, groupname): - """Check whether group name exists on Yoda - - :param groupname: name of group - :returns: false/true - """ - parms = OrderedDict([('groupname', groupname)]) - [out] = self.call_rule('uuGroupExists', parms, 1) - return out == 'true' + """Check whether group name exists on Yoda + :param groupname: name of group + :returns: false/true + """ + parms = OrderedDict([('groupname', groupname)]) + [out] = self.call_rule('uuGroupExists', parms, 1) + return out == 'true' def call_uuUserExists(self, username): - """Check whether user name exists on Yoda - - :param username: name of user - :returns: false/true - """ - parms = OrderedDict([('username', username)]) - [out] = self.call_rule('uuUserExists', parms, 1) - return out == 'true' + """Check whether user name exists on Yoda + :param username: name of user + :returns: false/true + """ + parms = OrderedDict([('username', username)]) + [out] = self.call_rule('uuUserExists', parms, 1) + return out == 'true' def call_uuGroupAdd(self, groupname, category, - subcategory, description, classification, schema_id='default-2', expiration_date=''): - """Adds a group - - :param groupname: name of group - :param category: category / community - :param subcategory: subcategory - :param description: description - :param classification: security classification - :param schema_id: schema id - :param expiration_date: expiration date - - :returns: (status, message). Status not 0 means error, - -1089000 means group name already exists - """ - if self.uuGroupAdd_version == "1.7": - parms = OrderedDict([ - ('groupname', groupname), - ('category', category), - ('subcategory', subcategory), - ('description', description), - ('classification', classification)]) - elif self.uuGroupAdd_version == "1.9": - parms = OrderedDict([ - ('groupname', groupname), - ('category', category), - ('subcategory', subcategory), - ('schema_id', schema_id if schema_id not in ("", ".") else "default-2"), - ('expiration_date', expiration_date), - ('description', description), - ('dataClassification', classification), - ('co_identifier', '') - ]) - - return self.call_rule('uuGroupAdd', parms, 2) - + subcategory, description, classification, schema_id='default-2', expiration_date=''): + """Adds a group + + :param groupname: name of group + :param category: category / community + :param subcategory: subcategory + :param description: description + :param classification: security classification + :param schema_id: schema id + :param expiration_date: expiration date + + :returns: (status, message). Status not 0 means error, + -1089000 means group name already exists + """ + if self.uuGroupAdd_version == "1.7": + parms = OrderedDict([ + ('groupname', groupname), + ('category', category), + ('subcategory', subcategory), + ('description', description), + ('classification', classification)]) + elif self.uuGroupAdd_version == "1.9": + parms = OrderedDict([ + ('groupname', groupname), + ('category', category), + ('subcategory', subcategory), + ('schema_id', schema_id if schema_id not in ("", ".") else "default-2"), + ('expiration_date', expiration_date), + ('description', description), + ('dataClassification', classification), + ('co_identifier', '') + ]) + + return self.call_rule('uuGroupAdd', parms, 2) def call_uuGroupModify(self, groupname, property, value): - """Modifies one property of a group - - :param groupname: name of group - :param property: property to change - :param value: value to change the property to + """Modifies one property of a group - :returns: (status, message). Status not 0 means error. - """ - parms = OrderedDict([('groupname', groupname), - ('property', property), - ('value', value)]) - return self.call_rule('uuGroupModify', parms, 2) + :param groupname: name of group + :param property: property to change + :param value: value to change the property to + :returns: (status, message). Status not 0 means error. + """ + parms = OrderedDict([('groupname', groupname), + ('property', property), + ('value', value)]) + return self.call_rule('uuGroupModify', parms, 2) def call_uuGroupRemove(self, groupname): - """Removes an empty group + """Removes an empty group - :param groupname: name of group + :param groupname: name of group - :returns: (status, message). Status not 0 means error. - """ - parms = OrderedDict([('groupname', groupname)]) - return self.call_rule('uuGroupRemove', parms, 2) + :returns: (status, message). Status not 0 means error. + """ + parms = OrderedDict([('groupname', groupname)]) + return self.call_rule('uuGroupRemove', parms, 2) diff --git a/yclienttools/importgroups.py b/yclienttools/importgroups.py index f5dcd44..63ac39d 100644 --- a/yclienttools/importgroups.py +++ b/yclienttools/importgroups.py @@ -20,6 +20,7 @@ # Based on yoda-batch-add script by Ton Smeele + def parse_csv_file(input_file, args, yoda_version): extracted_data = [] @@ -40,9 +41,11 @@ def parse_csv_file(input_file, args, yoda_version): _exit_with_error( 'CSV header is missing compulsory field "{}"'.format(label)) - duplicate_columns = _get_duplicate_columns(reader.fieldnames, yoda_version) - if ( len(duplicate_columns) > 0 ): - _exit_with_error("File has duplicate column(s): " + str(duplicate_columns) ) + duplicate_columns = _get_duplicate_columns( + reader.fieldnames, yoda_version) + if (len(duplicate_columns) > 0): + _exit_with_error( + "File has duplicate column(s): " + str(duplicate_columns)) for line in reader: row_number += 1 @@ -56,13 +59,16 @@ def parse_csv_file(input_file, args, yoda_version): return extracted_data + def _get_csv_required_labels(): return ['category', 'subcategory', 'groupname'] + def _get_csv_1_9_exclusive_labels(): """Returns labels that can only appear with yoda version 1.9 and higher.""" return ['expiration_date', 'schema_id'] + def _get_csv_predefined_labels(yoda_version): if yoda_version in ('1.7', '1.8'): return ['category', 'subcategory', 'groupname'] @@ -76,7 +82,7 @@ def _get_duplicate_columns(fields_list, yoda_version): for field in fields_list: if (field in _get_csv_predefined_labels(yoda_version) or - field.startswith(("manager:", "viewer:", "member:"))): + field.startswith(("manager:", "viewer:", "member:"))): if field in fields_seen: duplicate_fields.add(field) else: @@ -149,15 +155,16 @@ def _process_csv_line(line, args, yoda_version): if not is_valid_expiration_date(expiration_date): return None, '"{}" is not a valid expiration date.'.format(expiration_date) - row_data = (category, subcategory, groupname, managers, members, viewers, schema_id, expiration_date) + row_data = (category, subcategory, groupname, managers, + members, viewers, schema_id, expiration_date) return row_data, None -def _are_roles_equivalent(a,b): +def _are_roles_equivalent(a, b): """Checks whether two roles are equivalent. Needed because Yoda and Yoda-clienttools use slightly different names for the roles.""" - r_role_names = [ "viewer", "reader" ] - m_role_names = [ "member", "normal" ] + r_role_names = ["viewer", "reader"] + m_role_names = ["member", "normal"] if a == b: return True @@ -168,24 +175,29 @@ def _are_roles_equivalent(a,b): else: return False + def is_email(username): return re.search(r'@.*[^\.]+\.[^\.]+$', username) is not None + @lru_cache(maxsize=100) def is_valid_domain(domain): try: - return bool(resolver.query(domain, 'MX')) + return bool(resolver.query(domain, 'MX')) except (resolver.NXDOMAIN, resolver.NoAnswer): - return False + return False + def is_valid_category(name): """Is this name a valid (sub)category name?""" return re.search(r"^[a-zA-Z0-9\-_]+$", name) is not None + def is_valid_groupname(name): """Is this name a valid group name (prefix such as "research-" can be omitted""" return re.search(r"^[a-zA-Z0-9\-]+$", name) is not None + def is_internal_user(username, internal_domains): for domain in internal_domains: domain_pattern = '@{}$'.format(domain) @@ -194,6 +206,7 @@ def is_internal_user(username, internal_domains): return False + def is_valid_expiration_date(expiration_date): """Validation of expiration date. @@ -216,12 +229,14 @@ def is_valid_expiration_date(expiration_date): except ValueError: return False + def is_valid_schema_id(schema_id): """Is this schema at least a correctly formatted schema-id?""" if schema_id == "": return True return re.search(r"^[a-zA-Z0-9\-]+\-[0-9]+$", schema_id) is not None + def validate_data(rule_interface, args, data): errors = [] for (category, subcategory, groupname, managers, members, viewers, schema_id, expiration_date) in data: @@ -233,7 +248,7 @@ def validate_data(rule_interface, args, data): # ensure that external users already have an iRODS account # we do not want to be the actor that creates them (unless # we are creating them in the name of a creator user) - if not rule_interface.call_uuUserExists(user)and not args.creator_user: + if not rule_interface.call_uuUserExists(user) and not args.creator_user: errors.append( 'Group {} has nonexisting external user {}'.format(groupname, user)) @@ -256,7 +271,8 @@ def apply_data(rule_interface, args, data): print( 'WARNING: group "{}" not created, it already exists'.format(groupname)) if schema_id != '': - print('WARNING: group property "schema_id" not updated, as it can only be specified when the group is first created') + print( + 'WARNING: group property "schema_id" not updated, as it can only be specified when the group is first created') elif status != '0': _exit_with_error( 'Error while attempting to create group "{}". Status/message: {} / {}'.format( @@ -269,26 +285,31 @@ def apply_data(rule_interface, args, data): # Now add the users and set their role if other than member allusers = managers + members + viewers for username in list(set(allusers)): # duplicates removed - currentrole = rule_interface.call_uuGroupGetMemberType(groupname, username) + currentrole = rule_interface.call_uuGroupGetMemberType( + groupname, username) if currentrole == "none": if args.creator_user: - [status, msg] = rule_interface.call_uuGroupUserAddByOtherCreator(groupname, username, args.creator_user, args.creator_zone) + [status, msg] = rule_interface.call_uuGroupUserAddByOtherCreator( + groupname, username, args.creator_user, args.creator_zone) else: - [status, msg] = rule_interface.call_uuGroupUserAdd(groupname, username) + [status, msg] = rule_interface.call_uuGroupUserAdd( + groupname, username) if status == '0': - currentrole = "member" - if args.verbose: - print("Notice: added user {} to group {}".format(username,groupname)) + currentrole = "member" + if args.verbose: + print("Notice: added user {} to group {}".format( + username, groupname)) else: print("Warning: error occurred while attempting to add user {} to group {}".format( - username, - groupname)) + username, + groupname)) print("Status: {} , Message: {}".format(status, msg)) else: if args.verbose: - print("Notice: user {} is already present in group {}.".format(username,groupname)) + print("Notice: user {} is already present in group {}.".format( + username, groupname)) # Set requested role. Note that user could be listed in multiple roles. # In case of multiple roles, manager takes precedence over normal, @@ -301,13 +322,15 @@ def apply_data(rule_interface, args, data): if _are_roles_equivalent(role, currentrole): if args.verbose: - print("Notice: user {} already has role {} in group {}.".format(username, role, groupname)) + print("Notice: user {} already has role {} in group {}.".format( + username, role, groupname)) else: [status, msg] = rule_interface.call_uuGroupUserChangeRole( groupname, username, role) if status == '0': if args.verbose: - print("Notice: changed role of user {} in group {} to {}".format(username, groupname, role)) + print("Notice: changed role of user {} in group {} to {}".format( + username, groupname, role)) else: print( "Warning: error while attempting to change role of user {} in group {} to {}".format( @@ -318,25 +341,29 @@ def apply_data(rule_interface, args, data): # Always remove the rods user for new groups, unless it is in the # CSV file. - if ( new_group and "rods" not in allusers and - rule_interface.call_uuGroupGetMemberType(groupname, "rods") != "none" ): - (status,msg) = rule_interface.call_uuGroupUserRemove(groupname, "rods") - if status == "0": - if args.verbose: - print("Notice: removed rods user from group " + groupname) - else: - if status !=0: - print ("Warning: error while attempting to remove user rods from group {}".format(groupname)) - print("Status: {} , Message: {}".format(status, msg)) - + if (new_group and "rods" not in allusers and + rule_interface.call_uuGroupGetMemberType(groupname, "rods") != "none"): + (status, msg) = rule_interface.call_uuGroupUserRemove(groupname, "rods") + if status == "0": + if args.verbose: + print("Notice: removed rods user from group " + groupname) + else: + if status != 0: + print("Warning: error while attempting to remove user rods from group {}".format( + groupname)) + print("Status: {} , Message: {}".format(status, msg)) + # Update expiration date if applicable - if not new_group and expiration_date not in ['','.'] and args.allow_update: - [status, msg] = rule_interface.call_uuGroupModify(groupname, "expiration_date", expiration_date) + if not new_group and expiration_date not in ['', '.'] and args.allow_update: + [status, msg] = rule_interface.call_uuGroupModify( + groupname, "expiration_date", expiration_date) if status == "0": if args.verbose: - print("Notice: updated expiration date to {} for group {}".format(expiration_date, groupname)) + print("Notice: updated expiration date to {} for group {}".format( + expiration_date, groupname)) else: - print("Warning: error while attempting to update expiration date to {} for group {}".format(expiration_date, groupname)) + print("Warning: error while attempting to update expiration date to {} for group {}".format( + expiration_date, groupname)) print("Status: {} , Message: {}".format(status, msg)) # Remove users not in sheet @@ -345,7 +372,8 @@ def apply_data(rule_interface, args, data): try: currentusers = rule_interface.call_uuGroupGetMembers(groupname) except SizeNotSupportedException: - print("Unable to check whether members of group {} need to be deleted.".format(groupname)) + print("Unable to check whether members of group {} need to be deleted.".format( + groupname)) print("Number of current members group is too large.") continue @@ -354,15 +382,18 @@ def apply_data(rule_interface, args, data): if user in managers: if len(managers) == 1: print("Error: cannot remove user {} from group {}, because he/she is the only group manager".format( - user,groupname)) + user, groupname)) continue else: managers.remove(user) if args.verbose: - print("Removing user {} from group {}".format(user,groupname)) - (status,msg) = rule_interface.call_uuGroupUserRemove(groupname, user) + print("Removing user {} from group {}".format( + user, groupname)) + (status, msg) = rule_interface.call_uuGroupUserRemove( + groupname, user) if status != "0": - print ("Warning: error while attempting to remove user {} from group {}".format(user,groupname)) + print("Warning: error while attempting to remove user {} from group {}".format( + user, groupname)) print("Status: {} , Message: {}".format(status, msg)) @@ -396,13 +427,16 @@ def entry(): print_parsed_data(data) if args.delete and not args.allow_update: - _exit_with_error("Using the --delete option without the --allow-update option is not supported.") + _exit_with_error( + "Using the --delete option without the --allow-update option is not supported.") if (args.creator_user and not args.creator_zone) or (not args.creator_user and args.creator_zone): - _exit_with_error("Using the --creator-user option without the --creator-zone option is not supported.") + _exit_with_error( + "Using the --creator-user option without the --creator-zone option is not supported.") - if (args.creator_user and (yoda_version in ('1.7','1.8'))): - _exit_with_error("The --creator-user and --creator-zone options are only supported with Yoda versions 1.9 and higher.") + if (args.creator_user and (yoda_version in ('1.7', '1.8'))): + _exit_with_error( + "The --creator-user and --creator-zone options are only supported with Yoda versions 1.9 and higher.") if args.offline_check: sys.exit(0)