This repository has been archived by the owner on Dec 5, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathapp.py
executable file
·360 lines (299 loc) · 12.6 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
#!/usr/bin/env python2
import logging
import multiprocessing
import os
import signal
import subprocess
import sys
import requests
# Child processes (subprocess.Popen objects) spawned by bootstrap(); the
# kill() signal handler iterates this list to terminate them.
processes = []
# Install the default root logging handler so log records are emitted.
logging.basicConfig()
# Module-wide logger used by every function in this file.
logger = logging.getLogger('bootstrap')
def bootstrap(_return=0):
    """
    Initialize role of running docker container.

    master: web interface / API.
    slave: node that load test given url.
    controller: node that control the automatic run.
    standalone: master and slave(s) (and, if AUTOMATIC is set, the controller)
    inside one container.

    :param _return: when truthy, return as soon as the role has been set up
        instead of blocking on the spawned locust processes
    :type _return: int
    """
    role = get_or_raise('ROLE')
    logger.info('Role :{role}'.format(role=role))

    if role == 'master':
        _start_master()
    elif role == 'slave':
        _start_slaves()
    elif role == 'controller':
        _run_controller()
    elif role == 'standalone':
        automatic = convert_str_to_bool(os.getenv('AUTOMATIC', str(False)))
        os.environ["MASTER_HOST"] = '127.0.0.1'
        # Re-enter bootstrap once per sub-role; _return=1 keeps the nested
        # calls from blocking on the processes they spawn.
        for sub_role in ('master', 'slave'):
            os.environ['ROLE'] = sub_role
            bootstrap(1)
        if automatic:
            os.environ['ROLE'] = 'controller'
            bootstrap(1)
            # The automatic run is finished; do not wait for the locust
            # processes, which would otherwise run forever.
            sys.exit(0)
    else:
        raise RuntimeError('Invalid ROLE value. Valid Options: master, slave, controller, standalone.')

    if _return:
        return

    # Block until every spawned locust process has exited.
    for s in processes:
        s.communicate()


def _start_master():
    """Spawn the locust master process and register it in ``processes``."""
    target_host = get_or_raise('TARGET_HOST')
    send_kpi = convert_str_to_bool(os.getenv('SEND_ANONYMOUS_USAGE_INFO', str(False)))
    if send_kpi:
        send_usage_statistics(target_host)
    locust_file = get_locust_file()
    logger.info('target host: {target}, locust file: {file}'.format(target=target_host, file=locust_file))
    s = subprocess.Popen([
        'locust', '-H', target_host, '--loglevel', 'debug', '--master', '-f', locust_file
    ])
    processes.append(s)


def _start_slaves():
    """Spawn SLAVE_MUL (default: CPU count) locust slave processes."""
    try:
        target_host = get_or_raise('TARGET_HOST')
        locust_file = get_locust_file()
        master_host = get_or_raise('MASTER_HOST')
        multiplier = int(os.getenv('SLAVE_MUL', multiprocessing.cpu_count()))
    except ValueError as verr:
        logger.error(verr)
        # Without a valid configuration nothing below can run; re-raise
        # instead of failing later on unbound local variables.
        raise

    logger.info('target host: {target}, locust file: {file}, master: {master}, multiplier: {multiplier}'.format(
        target=target_host, file=locust_file, master=master_host, multiplier=multiplier))
    for _ in range(multiplier):
        logger.info('Started Process')
        s = subprocess.Popen([
            'locust', '-H', target_host, '--loglevel', 'debug', '--slave', '-f', locust_file,
            '--master-host', master_host
        ])
        processes.append(s)


def _run_controller():
    """Drive one automatic load test against the master and store its reports."""
    import time  # local import: not part of the module-level imports

    automatic = convert_str_to_bool(os.getenv('AUTOMATIC', str(False)))
    logger.info('Automatic run: {auto}'.format(auto=automatic))
    if not automatic:
        return

    try:
        master_host = get_or_raise('MASTER_HOST')
        master_url = 'http://{master}:8089'.format(master=master_host)
        # TOTAL_SLAVES wins; otherwise fall back to SLAVE_MUL, then CPU count.
        total_slaves = int(os.getenv('TOTAL_SLAVES') or os.getenv('SLAVE_MUL', multiprocessing.cpu_count()))
        # Default time duration to wait all slaves to be connected is 1 minute / 60 seconds
        slaves_check_timeout = float(os.getenv('SLAVES_CHECK_TIMEOUT', 60))
        # Default sleep time interval is 5 seconds
        slaves_check_interval = float(os.getenv('SLAVES_CHECK_INTERVAL', 5))
        users = int(get_or_raise('USERS'))
        hatch_rate = int(get_or_raise('HATCH_RATE'))
        duration = int(get_or_raise('DURATION'))
        logger.info(
            'master url: {url}, users: {users}, hatch_rate: {rate}, duration: {duration}'.format(
                url=master_url, users=users, rate=hatch_rate, duration=duration))

        for attempt in range(0, 5):
            time.sleep(3)
            try:
                res = requests.get(url=master_url)
            except requests.exceptions.RequestException as req_err:
                # The master may not be listening yet; retry instead of crashing.
                logger.error('Attempt: {attempt}. Locust master might not ready yet. '
                             'Error: {err}'.format(attempt=attempt, err=req_err))
                continue
            if not res.ok:
                logger.error('Attempt: {attempt}. Locust master might not ready yet.'
                             'Status code: {status}'.format(attempt=attempt, status=res.status_code))
                continue

            all_connected = _wait_for_slaves(master_url, total_slaves, slaves_check_timeout, slaves_check_interval)
            if all_connected:
                logger.info('All slaves are succesfully connected! '
                            'Start load test automatically for {duration} seconds.'.format(duration=duration))
            else:
                # The wait timed out (already warned about); start anyway.
                logger.info('Start load test automatically for {duration} seconds.'.format(duration=duration))

            payload = {'locust_count': users, 'hatch_rate': hatch_rate}
            res = requests.post(url=master_url + '/swarm', data=payload)
            if res.ok:
                time.sleep(duration)
                requests.get(url=master_url + '/stop')
                logger.info('Load test is stopped.')
                # Pause before the statistics are downloaded.
                time.sleep(4)
                _write_reports(master_url)
            else:
                logger.error('Locust cannot be started. Please check logs!')
            break
    except ValueError as v_err:
        logger.error(v_err)


def _wait_for_slaves(master_url, total_slaves, check_timeout, check_interval):
    """Poll the master until ``total_slaves`` are connected; return False on timeout."""
    import time  # local import: not part of the module-level imports

    stats_url = '/'.join([master_url, 'stats/requests'])
    deadline = time.time() + check_timeout
    connected_slaves = 0
    while time.time() < deadline:
        logger.info('Checking if all slave(s) are connected.')
        try:
            res = requests.get(url=stats_url)
            # 'slave_count' may be absent from the payload; count that as zero.
            connected_slaves = res.json().get('slave_count') or 0
        except ValueError as v_err:
            logger.error(v_err)
        else:
            if connected_slaves >= total_slaves:
                return True
            logger.info('Currently connected slaves: {con}'.format(con=connected_slaves))
        # Sleep on every unsuccessful poll (including bad responses) so the
        # master is not hammered in a tight loop.
        time.sleep(check_interval)

    logger.warning('Connected slaves:{con} != defined slaves:{dfn}'.format(
        con=connected_slaves, dfn=total_slaves))
    return False


def _write_reports(master_url):
    """Download the CSV/JSON/HTML statistics from the master into ./reports."""
    logger.info('Creating report folder.')
    report_path = os.path.join(os.getcwd(), 'reports')
    if not os.path.exists(report_path):
        os.makedirs(report_path)

    logger.info('Creating reports...')
    for _url in ['requests', 'distribution']:
        res = requests.get(url=master_url + '/stats/' + _url + '/csv')
        with open(os.path.join(report_path, _url + '.csv'), "wb") as report:
            report.write(res.content)
        # Only the 'requests' statistics are additionally stored as JSON.
        if _url == 'distribution':
            continue
        res = requests.get(url=master_url + '/stats/' + _url)
        with open(os.path.join(report_path, _url + '.json'), "wb") as report:
            report.write(res.content)

    res = requests.get(url=master_url + '/htmlreport')
    with open(os.path.join(report_path, 'reports.html'), "wb") as report:
        report.write(res.content)
    logger.info('Reports have been successfully created.')
def send_usage_statistics(target_host):
    """
    Send user usage to Google Analytics.

    This is best effort: every failure is logged as a warning and never
    propagated to the caller.

    :param target_host: host that will be load tested
    :type target_host: str
    """
    try:
        ga_endpoint = 'https://www.google-analytics.com/collect'
        ga_tracking_id = 'UA-110383676-1'
        app_id = os.getenv('APPLICATION_ID')
        build_url = os.getenv('BUILD_URL')
        cdp_target_repository = os.getenv('CDP_TARGET_REPOSITORY')
        image_version = os.getenv('DL_IMAGE_VERSION', 'unknown')

        # The target host is only reported when it looks like a zalando host.
        host_parts = target_host.split('.')
        contains_zalando = (
            target_host.endswith('zalan.do') or
            (len(host_parts) >= 2 and 'zalando' in host_parts[-2])
        )

        # Derive who is running the test from whichever environment variable is set.
        if app_id:
            user_type = 'internal'
            user = app_id.split('-')[0]
            description = 'AWS'
        elif build_url and build_url.endswith('zalan.do'):
            user_type = 'internal'
            user = build_url.split('/')[2].split('.')[0]
            description = 'Jenkins'
        elif cdp_target_repository and 'github' in cdp_target_repository:
            user_type = 'internal'
            user = cdp_target_repository.split('/')[1]
            description = 'CDP'
        else:
            user_type = 'external/local-machine'
            with open('/proc/version', 'r') as v:
                # Drop every word containing '@' from the version string.
                user = '_'.join(w for w in v.read().split(' ') if '@' not in w)
            description = '-'

        if contains_zalando:
            description += '; {host}'.format(host=target_host)

        payload = {
            'v': '1',  # API Version.
            'tid': ga_tracking_id,
            'cid': user,
            't': 'event',  # Event hit type.
            'ec': user_type,
            'ea': user,
            'el': description,
            'an': 'docker-locust',
            'av': image_version,
        }

        for attempt in range(1, 4):
            logger.info('attempt: {attempt}'.format(attempt=attempt))
            # The timeout keeps an unreachable endpoint from blocking start-up.
            res = requests.post(ga_endpoint, data=payload, timeout=10)
            if res.ok:
                logger.info('Usage statistics is successfully sent!')
                break
            else:
                logger.warning('Usage statistics cannot be sent! response: {res}'.format(res=res.text))
    except Exception as e:
        # Telemetry must never break the bootstrap; log and carry on.
        logger.warning(e)
def get_locust_file():
    """
    Find locust file.

    Every entry returned by get_files() is fetched; the single entry ending
    with '.py' becomes the locust file.

    Possible parameters are:
    1. S3 Bucket
    2. Any http or https url e.g. raw url from GitHub
    3. File from 'locust-script' folder

    :return: file_name
    :rtype: str
    """
    files = get_files()
    file_name = None

    for entry in files:
        # Download from s3 bucket
        if entry.startswith('s3://'):
            local_name = os.path.basename(entry)
            if entry.endswith('.py'):
                file_name = local_name
            _, _, bucket, path = entry.split('/', 3)
            # Imported lazily so they are only required when s3 is used.
            import boto3
            import botocore
            s3 = boto3.resource('s3')
            try:
                s3.Bucket(bucket).download_file(path, local_name)
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == "404":
                    logger.error('File cannot be found!')
                else:
                    raise

        # Download from http or https
        elif entry.startswith('http'):
            logger.info('Load test script from http or https url')
            # Imported lazily so it is only required when a url is used.
            import wget
            try:
                if entry.endswith('.py'):
                    file_name = wget.download(entry)
                else:
                    wget.download(entry)
            except Exception as err:
                # Deliberately not a bare except: KeyboardInterrupt and
                # SystemExit must still be able to stop the process.
                logger.error('File cannot be downloaded! Please check given url!')
                logger.error(err)

        # Share volume with local machine
        else:
            logger.info('Load test script from local machine')
            if entry.endswith('.py'):
                # Relative names are looked up inside the 'script' folder.
                file_name = entry if entry.startswith('/') else '/'.join(['script', entry])

    logger.info('load test file: {f}'.format(f=file_name))
    return file_name
def get_files():
    """
    Check user input and return all valid files.

    The LOCUST_FILE environment variable holds a comma separated list of
    files. Exactly one of them must end with '.py'; otherwise the process
    exits with status 1.

    :return: files
    :rtype: list
    """
    given_input = get_or_raise('LOCUST_FILE')
    logger.info('Given input: {input}'.format(input=given_input))

    # Ignore empty entries produced by stray or trailing commas.
    files = [name for name in (part.strip() for part in given_input.split(',')) if name]
    logger.info('Files: {files}'.format(files=files))

    # Use the same '.py' suffix test as get_locust_file() so a name such as
    # 'happy' is not mistaken for a python file.
    python_files = [name for name in files if name.endswith('.py')]

    if not python_files:
        logger.error('There is no python file!')
        sys.exit(1)
    if len(python_files) > 1:
        logger.error('There are more than 1 python files!')
        sys.exit(1)

    logger.info('Check passed!')
    return files
def convert_str_to_bool(str):
    """
    Convert string to boolean.

    'yes', 'true', 't' and '1' (case-insensitive) are True; every other
    string and every non-string value is False.

    :param str: given string
    :type str: str
    :return: converted string
    :rtype: bool
    """
    try:
        string_types = basestring  # Python 2: covers both str and unicode
    except NameError:
        # Python 3 has no basestring. The builtin ``str`` is shadowed by the
        # parameter name, so the text type is obtained from a literal.
        string_types = type(u'')

    if not isinstance(str, string_types):
        return False
    return str.lower() in ('yes', 'true', 't', '1')
def get_or_raise(env):
    """
    Return the value of a required environment variable.

    :param env: name of the environment variable
    :type env: str
    :return: the non-empty value of the variable
    :rtype: str
    :raises RuntimeError: if the variable is unset or empty
    """
    value = os.environ.get(env)
    if value:
        return value
    raise RuntimeError('The environment variable {0:s} should be set.'.format(env))
def kill(signal, frame):
    """
    Signal handler that kills every spawned locust process.

    :param signal: number of the received signal (unused)
    :type signal: int
    :param frame: stack frame that was interrupted (unused)
    """
    logger.info('Received KILL signal')
    for s in processes:
        # Popen.kill() takes no arguments; passing one raises TypeError and
        # leaves the child processes running.
        s.kill()
# Script entry point: configure logging, install the SIGTERM handler, then
# start the role selected by the ROLE environment variable.
if __name__ == '__main__':
    # Raise the level first so the info messages below are emitted.
    logger.setLevel(logging.INFO)
    logger.info('Started main')
    # Kill the spawned locust processes when SIGTERM is received.
    signal.signal(signal.SIGTERM, kill)
    bootstrap()