-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathise-post-internalusers.py
executable file
Β·239 lines (205 loc) Β· 9.7 KB
/
ise-post-internalusers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/usr/bin/env python3
"""
Generates the specified number of ISE internaluser resources using a REST API.
Examples:
ise-post-internalusers.py -h
ise-post-internalusers.py
ise-post-internalusers.py -n 10
ise-post-internalusers.py -n 100 -vt
Requires setting these environment variables using the `export` command:
export ISE_PPAN='1.2.3.4' # hostname or IP address of ISE Primary PAN
export ISE_REST_USERNAME='admin' # ISE ERS admin or operator username
export ISE_REST_PASSWORD='C1sco12345' # ISE ERS admin or operator password
export ISE_CERT_VERIFY=false # validate the ISE certificate
You may add these export lines to a text file and load with `source`:
source ise-env.sh
"""
__author__ = "Thomas Howard"
__email__ = "[email protected]"
__license__ = "MIT - https://mit-license.org/"
import aiohttp
import asyncio
import argparse
from faker import Faker # generate fake users, MACs, IPs
import csv
import io
import json
import os
import random
import sys
import time
# Globals
# Page sizes for paged REST list requests (sent as the `size` query parameter).
REST_PAGE_SIZE_DEFAULT = 20
REST_PAGE_SIZE_MAX = 100
REST_PAGE_SIZE = REST_PAGE_SIZE_MAX  # page size actually used when listing internalusers
WORKERS_MAX = 20  # number of concurrent worker tasks created to POST users
faker = Faker("en-US") # fake data generator
username_cache = {} # usernames already taken (existing in ISE or generated here); keeps new names unique
def make_username(firstname=None, lastname=None):
    """
    Return a unique, lowercase username built from a first and last name.

    The base name is the first initial followed by up to eight characters of
    the last name (e.g. "John", "Smith" -> "jsmith"). If that name is already
    in `username_cache`, a numeric suffix starting at 2 is appended
    ("jsmith2", "jsmith3", ...) until an unused name is found. The returned
    name is recorded in `username_cache`.

    firstname, lastname: name parts. When omitted (None), a random name part
    is generated with faker on each call.
    """
    # Resolve the defaults per call: a `faker.first_name()` default written in
    # the signature is evaluated only once, when the function is defined, so
    # every default call would reuse the very same name.
    if firstname is None:
        firstname = faker.first_name()
    if lastname is None:
        lastname = faker.last_name()

    base = (firstname[0:1] + lastname[0:8]).lower()
    username = base
    n = 1
    while username in username_cache:
        n += 1
        username = f"{base}{n}"
    username_cache[username] = 1  # cache it
    return username
async def get_ise_identitygroup_id(session: aiohttp.ClientSession = None, name: str = "Employee"):
    """
    Look up the ISE identitygroup with the given name and return its id.
    """
    reply = await session.get(f"/ers/config/identitygroup/name/{name}")
    payload = await reply.json()
    # popitem() hands back a (key, value) pair; only the value is needed.
    # Presumably the reply has a single top-level key wrapping the group.
    _, group = payload.popitem()
    return group["id"]
def generate_random_internaluser_data(username: str = None, password: str = None, groupid: str = None):
    """
    Build a randomized ISE internaluser resource as a dict ready to be serialized to JSON.

    A random first and last name are always drawn. `username` falls back to a
    unique name derived from them, and `password` falls back to "C1sco12345".
    `groupid` is stored as-is under "identityGroups".
    """
    firstname = faker.first_name()
    lastname = faker.last_name()
    if username is None:
        username = make_username(firstname, lastname)
    if password is None:
        password = "C1sco12345"

    user = {
        "name": username,
        "description": "",
        "enabled": True,
        "password": password,
        "email": f"{username}@domain.com",
        "firstName": firstname,
        "lastName": lastname,
        "identityGroups": groupid,
        "passwordIDStore": "Internal Users",
        "changePassword": False,
        # Optional and not sent: enablePassword, expiryDate
        "expiryDateEnabled": False,
        # ISE 3.2+ attribute (accountNameAlias and daysForPasswordExpiration are not sent)
        "passwordNeverExpires": True,
        # ISE 3.3+ attributes dateModified and dateCreated are not sent
        # No custom attributes are populated (e.g. owner, department, site, network, iPSK)
        "customAttributes": {},
    }
    return {"InternalUser": user}
async def get_resource(session, url):
    """
    GET one page of a REST list endpoint and return its resources.

    session: aiohttp.ClientSession (or compatible) used to issue the request.
    url: URL of the page to fetch.
    Returns the `SearchResult` -> `resources` list from the JSON reply.
    Raises ValueError when the reply status is not 200.
    """
    # No per-request `ssl=False` here: it would switch certificate verification
    # off for this request no matter how the session's connector is configured.
    async with session.get(url) as resp:
        if resp.status != 200:
            raise ValueError(f"Bad status: {resp}")
        payload = await resp.json()
    return payload["SearchResult"]["resources"]
async def cache_existing_internalusers(session):
    """
    Load the names of the existing ISE internalusers into username_cache and return the cache,
    so that users which already exist are not created again.

    Raises ValueError when the first list request does not return status 200.
    """
    endpoint = "/ers/config/internaluser"
    first_reply = await session.get(f"{endpoint}?size={REST_PAGE_SIZE}")
    if first_reply.status != 200:
        raise ValueError(f"Bad status: {first_reply}")

    search_result = (await first_reply.json())["SearchResult"]
    users = search_result["resources"]
    total = search_result["total"]

    if total > REST_PAGE_SIZE:  # more than one page of users exists
        full_pages, remainder = divmod(total, REST_PAGE_SIZE)
        page_count = full_pages + (1 if remainder else 0)
        # The first page was fetched above, so request pages 2..page_count concurrently
        fetches = [
            asyncio.ensure_future(get_resource(session, f"{endpoint}?size={REST_PAGE_SIZE}&page={page}"))
            for page in range(2, page_count + 1)
        ]
        for page_users in await asyncio.gather(*fetches):
            users.extend(page_users)

    for user in users:  # remember every existing username
        username_cache[user["name"]] = 1
    return username_cache
async def ise_internaluser_creator(queue, session):
    """
    Worker task: take internaluser resources from `queue` and POST each one to ISE.

    queue: asyncio.Queue of dicts shaped like {"InternalUser": {"name": ..., ...}}.
    session: aiohttp.ClientSession (or compatible) used for the POST requests.

    Outcome per item, reported on stderr:
      201   - created; the new resource id is the last segment of the Location header.
      400   - with "Password" in the error title: the user is put back on the queue for a retry.
      401   - credentials rejected: pending queue items are discarded and the worker returns.
      other - the status and the error title are printed and the item is dropped.

    Every item taken from the queue is marked done, even when handling it
    fails, so `queue.join()` never waits on an item this worker took.
    The worker runs until it is cancelled or receives a 401.
    """
    PATH = "/ers/config/internaluser"
    while True:
        user_dict = await queue.get()  # Get an item from the queue
        try:
            name = user_dict["InternalUser"]["name"]
            # `async with` releases the connection even when the body is never read (201)
            async with session.post(PATH, data=json.dumps(user_dict)) as response:
                if response.status == 201:
                    print(
                        f"β {response.status} | {name} | {response.headers['Location'].split('/')[-1]}",
                        file=sys.stderr,
                    )
                elif response.status == 401:
                    details = await response.json()
                    print(f"Set the environment variables and verify your credentials are correct! {details}", file=sys.stderr)
                    # Every remaining request would be rejected the same way:
                    # discard the pending work so that queue.join() can return.
                    while not queue.empty():
                        queue.get_nowait()
                        queue.task_done()
                    return
                else:
                    error = await response.json()
                    title = error["ERSResponse"]["messages"][0]["title"]
                    # Use `in`, not str.find(): find() returns -1 (truthy) when the text
                    # is absent and 0 (falsy) when the title starts with it.
                    if response.status == 400 and "Password" in title:
                        # ISE will randomly complain about Password Policy even though it's fine
                        # NOTE: the retry is not bounded
                        print(f"π Password Policy error: Re-queue {name}", file=sys.stderr)
                        queue.put_nowait(user_dict)
                    else:
                        print(f"β {response.status} {name} {title}", file=sys.stderr)
        finally:
            queue.task_done()  # Notify queue the item is processed
async def main():
    """
    Entrypoint for packaged script.

    Parses the command line, reads the ISE connection settings from the
    environment (ISE_PPAN, ISE_CERT_VERIFY, ISE_REST_USERNAME,
    ISE_REST_PASSWORD), caches the existing internalusers, then creates the
    requested number of random users using WORKERS_MAX concurrent workers.

    The number of users may be given positionally or with -n/--number and
    defaults to 1 when omitted. If both are given, the positional value wins.

    Raises KeyError when one of the environment variables is not set.
    """
    global args
    argp = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
    # A positional without `nargs` is always required and its default is never
    # applied. Make it optional, and also accept the `-n` form that the usage
    # examples in the module docstring show.
    argp.add_argument("number", nargs="?", type=int, default=None, help="Number of users to create (default: 1)")
    argp.add_argument(
        "-n",
        "--number",
        dest="number_option",
        metavar="NUMBER",
        type=int,
        default=None,
        help="Number of users to create (alternative to the positional form)",
    )
    argp.add_argument("-t", "--timer", action="store_true", default=False, help="time", required=False)
    argp.add_argument("-v", "--verbose", action="count", default=0, help="Verbosity")
    args = argp.parse_args()
    if args.number is None:
        args.number = 1 if args.number_option is None else args.number_option

    start_time = time.time()  # always taken; only reported with --timer

    env = os.environ

    # Create HTTP session
    base_url = f"https://{env['ISE_PPAN']}"
    # Verification is turned off only when the value starts with "f" or "n" (false / no)
    verify_cert = env["ISE_CERT_VERIFY"][0:1].lower() not in ("f", "n")
    conn = aiohttp.TCPConnector(ssl=verify_cert)
    basic_auth = aiohttp.BasicAuth(login=env["ISE_REST_USERNAME"], password=env["ISE_REST_PASSWORD"])
    json_headers = {"Accept": "application/json", "Content-Type": "application/json"}
    async with aiohttp.ClientSession(base_url, auth=basic_auth, connector=conn, headers=json_headers) as session:
        # Cache existing ISE users to prevent duplicates and HTTP 400 errors
        username_cache = await asyncio.wait_for(cache_existing_internalusers(session), 60)
        if args.verbose:
            print(f"β Cached {len(username_cache)} existing users")

        users_queue = asyncio.Queue()  # Create a queue for the user workload

        # No guarantee of default identitygroup IDs across ISE deployments!
        identitygroup_id = await get_ise_identitygroup_id(session, "Employee")

        # Create worker tasks to process the queue concurrently
        tasks = [asyncio.create_task(ise_internaluser_creator(users_queue, session)) for _ in range(WORKERS_MAX)]

        # Enqueue one generated user per requested creation
        for _ in range(args.number):
            users_queue.put_nowait(generate_random_internaluser_data(groupid=identitygroup_id))

        await users_queue.join()  # Wait until the queue is finished

    if args.timer:
        print(f"β² {time.time() - start_time:.3f} seconds", file=sys.stderr)
if __name__ == "__main__":
    # Run from script: drive the async entrypoint on a fresh event loop
    asyncio.run(main())