Skip to content

Commit

Permalink
Add methods for bulk policy operations (#229)
Browse files Browse the repository at this point in the history
* New methods

* Tests

* Update policies.py example

* Remove value from testing

* Update docstrings
  • Loading branch information
jaherne-duo authored Oct 12, 2023
1 parent 68ab134 commit a4b6cfe
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 0 deletions.
47 changes: 47 additions & 0 deletions duo_client/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3396,6 +3396,34 @@ def update_policy_v2(self, policy_key, json_request):
path = "/admin/v2/policies/" + self._quote_policy_id(policy_key)
response = self.json_api_call("PUT", path, json_request)
return response

def update_policies_v2(self, sections, sections_to_delete,
edit_list, edit_all_policies=False):
"""
Update the contents of multiple policies.
Args:
sections (dict): policy content to update
sections_to_delete (list): List of section names to delete
edit_list (list): List of new policy keys to apply the changes to.
Ignored if edit_all_policies is True.
edit_all_policies (bool, optional): Apply changes to all policies.
Defaults to False.
Returns (list): all updated policies
"""
path = "/admin/v2/policies/update"
params = {
"policies_to_update": {
"edit_all_policies": edit_all_policies,
"edit_list": edit_list,
},
"policy_changes": {
"sections": sections,
"sections_to_delete": sections_to_delete,
},
}
response = self.json_api_call("PUT", path, params)
return response

def create_policy_v2(self, json_request):
"""
Expand All @@ -3407,6 +3435,25 @@ def create_policy_v2(self, json_request):
path = "/admin/v2/policies"
response = self.json_api_call("POST", path, json_request)
return response

def copy_policy_v2(self, policy_key, new_policy_names_list):
"""
Copy policy to multiple new policies.
Args:
policy_key (str): Unique id of the policy to copy from
new_policy_names_list (array): The policy specified by policy_key
will be copied once for each name
in the list
Returns (list): all new policies
"""
path = "/admin/v2/policies/copy"
params = {
"policy_key": policy_key,
"new_policy_names_list": new_policy_names_list
}
response = self.json_api_call("POST", path, params)
return response

def get_policy_v2(self, policy_key):
"""
Expand Down
25 changes: 25 additions & 0 deletions examples/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ def create_policy_browsers(name, print_response=False):
print(pretty)
return response.get("policy_key")

def copy_policy(name1, name2, copy_from, print_response=False):
"""
Copy the policy `copy_from` to two new policies.
"""
response = admin_api.copy_policy_v2(copy_from, [name1, name2])
if print_response:
pretty = json.dumps(response, indent=4, sort_keys=True, default=str)
print(pretty)
policies = response.get("policies")
return (policies[0].get("policy_key"), policies[1].get("policy_key"))

def bulk_delete_section(policy_keys, print_response=False):
"""
Delete the section "browsers" from the provided policies.
"""
response = admin_api.update_policies_v2("", ["browsers"], policy_keys)
if print_response:
pretty = json.dumps(response, indent=4, sort_keys=True, default=str)
print(pretty)

def update_policy_with_device_health_app(policy_key, print_response=False):
"""
Expand Down Expand Up @@ -131,6 +150,12 @@ def main():
# Create a policy with browser restriction settings.
policy_key_d = create_policy_browsers("Test New Policy - d")

# Copy a policy to 2 new policies.
policy_key_e, policy_key_f = copy_policy("Test New Policy - e", "Test New Policy - f", policy_key_d)

# Delete the browser restriction settings from 2 policies.
bulk_delete_section([policy_key_e, policy_key_f])

# Fetch the global and other custom policy.
get_policy("global")
get_policy(policy_key_b)
Expand Down
19 changes: 19 additions & 0 deletions tests/admin/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def test_update_policy_v2(self):
self.assertEqual(response["method"], "PUT")
self.assertEqual(response["uri"], "/admin/v2/policies/{}".format(policy_key))

def test_update_policies_v2(self):
edit_list = ["POSTGY2G0HVWJR4JO1XT", "POSTGY2G0HVWJR4JO1XU"]
sections = {
"browsers": {
"blocked_browsers_list": ["ie"],
}
}
response = self.client.update_policies_v2(sections, [], edit_list)
self.assertEqual(response["method"], "PUT")
self.assertEqual(response["uri"], "/admin/v2/policies/update")

def test_create_policy_v2(self):
policy_settings = {
"name": "my new policy",
Expand All @@ -53,6 +64,14 @@ def test_create_policy_v2(self):
self.assertEqual(response["method"], "POST")
self.assertEqual(response["uri"], "/admin/v2/policies")

def test_copy_policy_v2(self):
policy_key = "POSTGY2G0HVWJR4JO1XT"
new_policy_names_list = ["Copied Pol 1", "Copied Pol 2"]
response = self.client.copy_policy_v2(policy_key, new_policy_names_list)

self.assertEqual(response["method"], "POST")
self.assertEqual(response["uri"], "/admin/v2/policies/copy")

def test_get_policies_v2(self):
response = self.client.get_policies_v2(limit=3, offset=0)
uri, args = response["uri"].split("?")
Expand Down

0 comments on commit a4b6cfe

Please sign in to comment.