-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathec.py
206 lines (160 loc) · 6.91 KB
/
ec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from urllib.error import HTTPError
from urllib.parse import urlparse
from urllib.parse import parse_qs
from urllib.parse import unquote
import requests
import time
import json
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class ExtractChangesMGR:
def __init__(self, url, portalContext, serverContext, username, password, arcgis=False):
self.url = url
self.portalContext = portalContext
self.serverContext = serverContext
self.username = username
self.password = password
self.arcgis = arcgis
self.common_headers = {}
self.common_query_parameters = {"f": "json"}
self.token = None
self.token = self._get_token(url, username, password) if not arcgis else None
if self.token:
self.common_query_parameters.update({"token":self.token})
def _get_token(self, url, username, password):
if not self.arcgis:
token_url = "{}/{}/sharing/rest/generateToken".format(url, self.serverContext)
portal_url = "{}/{}/sharing/rest/generateToken".format(url, self.portalContext)
else:
token_url = "{}/sharing/rest/generateToken".format(url)
portal_url = "{}/sharing/rest/generateToken".format(url)
# Token URL in a portal can have redirects
try:
token_response = requests.get(token_url, verify=True, allow_redirects=True)
except Exception as e:
raise Exception
token_url = token_response.url
token_referrer = portal_url
token_params = {
"username": username,
"password": password,
"client": "referer",
"referer": token_referrer,
"expiration": 1440, # 24 hour token
"f": "json",
}
try:
return self._make_request(portal_url, token_params)['token']
except Exception as e:
print("Failed to get a portal token: \n {}".format(e))
def _make_request(self, u, query_parameters=None, headers=None, timeout=None, files=None):
"""Make a POST HTTP request and return the JSON response."""
if not query_parameters:
query_parameters = {}
if not headers:
headers = {}
# Add token and f=pjson to query parameters
query_parameters.update(self.common_query_parameters)
# Convert any lists or dicts in query parameters to string so that requests.post can correctly pass them when
# making REST API call.
post_data = {}
for param_name, param_value in query_parameters.items():
if isinstance(param_value, (dict, list)):
post_data[param_name] = json.dumps(param_value)
else:
post_data[param_name] = param_value
# Add referrer to the headers
headers.update(self.common_headers)
if u:
try:
response = requests.post(u, data=post_data, headers=headers, timeout=timeout, files=files, verify=False)
if response.status_code == 200:
response_json = response.json()
else:
raise HTTPError(u, response.status_code, response.reason)
except Exception as e:
print("ERROR: \n{}".format(e))
raise
if "error" in response_json or ("status" in response_json and response_json["status"] == "error"):
error_reason = []
for key in ("reason", "messages"):
if key in response_json:
value = response_json[key]
if isinstance(value, list):
error_reason.append(", ".join(value))
else:
error_reason.append(value)
print(error_reason)
return response_json
def parseResults(self, resultJSONURL):
resultJSON = self._make_request(resultJSONURL)
if "edits" in resultJSON:
print("Found {} edits in the changes file".format(len(resultJSON['edits'])))
else:
print("No edits found")
return resultJSON
def waitForStauts(self, statusURL):
statusPayload = {"status":"foo"}
counter = 0
while statusPayload["status"].upper() != "COMPLETED":
statusPayload = self._make_request(statusURL)
counter += 1
time.sleep(1)
if counter == 20:
print("No results after 20 seconds, something is probably wrong")
continue
try:
return statusPayload['resultUrl']
except Exception:
print("Failed to get a resultURL")
def getExtractChanges(self, changesURL):
# Depending on the type of changes happening in your feature service,
# the following parameters can be updated to get back specific types
# of changes. Below is set to return any Insert, Update or Delete
if not self.arcgis:
parse = urlparse(changesURL)
extract_params = {
#"token": self.token,
"serverGens": parse_qs(parse.query)['serverGens'][0],
"returnInserts": "true",
"returnUpdates": "true",
"returnDeletes": "true",
"returnAttachments": "false",
"returnAttachmentsDataByUrl": "false"
}
if self.token:
headers = {"token":self.token}
else:
headers = {}
changePayload = self._make_request(changesURL.split("?")[0], extract_params, headers)
else:
noquote = unquote(changesURL)
changePayload = self._make_request(noquote)
if "statusUrl" in changePayload:
return changePayload['statusUrl']
else:
print("No status URL found, cannot proceed")
def doExtractChanges(self, changesURL):
statusURL = self.getExtractChanges(changesURL)
resultJSONURL = self.waitForStauts(statusURL)
changeResults = self.parseResults(resultJSONURL)
return changeResults
''' __main__ used to run this as a stand alone script.
When used in a larger webhook workflow, this script would be imported and the
ExtractChangesMGR object would be used.
'''
if __name__ == "__main__":
url = "https://www.server.com/"
pContext = "portal"
sContext = "server"
username = "admin"
pword = "password"
changeURL = "https://www.server.com/server/rest/services/ServiceName/FeatureServer/extractChanges?serverGens=%5B1730385704442,1730385704443%5D"
ec = ExtractChangesMGR(url, pContext, sContext, username, pword)
changes = ec.doExtractChanges(changeURL)
if changes:
if "edits" in changes:
for e in changes['edits']:
print(e)
else:
print("No edits found in response")