Skip to content
This repository has been archived by the owner on Dec 5, 2022. It is now read-only.

Commit

Permalink
probe for master readiness and number of connected slaves
Browse files Browse the repository at this point in the history
  • Loading branch information
scherniavsky committed Nov 29, 2018
1 parent 1f19a41 commit 1451724
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 91 deletions.
6 changes: 2 additions & 4 deletions example/simple.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from locust import HttpLocust
from locust import TaskSet
from locust import task
from locust.web import app

# For HTML reporting
from locust.web import app
from src import report

# For reporting
app.add_url_rule('/htmlreport', 'htmlreport', report.download_report)


class SimpleBehavior(TaskSet):

@task
Expand Down
5 changes: 2 additions & 3 deletions example/simple_post.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from locust import HttpLocust
from locust import TaskSet
from locust import task
from locust.web import app

# For HTML reporting
from locust.web import app
from src import report

# For reporting
app.add_url_rule('/htmlreport', 'htmlreport', report.download_report)

# Read json file
Expand Down
161 changes: 92 additions & 69 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import multiprocessing
import os

import requests
import signal
import subprocess
import sys

import requests
import time

processes = []
logging.basicConfig()
Expand Down Expand Up @@ -50,6 +50,9 @@ def bootstrap(_return=0):

logger.info('target host: {target}, locust file: {file}, master: {master}, multiplier: {multiplier}'.format(
target=target_host, file=locust_file, master=master_host, multiplier=multiplier))

wait_for_master()

for _ in range(multiplier):
logger.info('Started Process')
s = subprocess.Popen([
Expand All @@ -72,81 +75,76 @@ def bootstrap(_return=0):
os.getenv('SLAVE_MUL', multiprocessing.cpu_count()))
# Default time duration to wait all slaves to be connected is 1 minutes / 60 seconds
slaves_check_timeout = float(os.getenv('SLAVES_CHECK_TIMEOUT', 60))
# Default sleep time interval is 10 seconds
slaves_check_interval = float(os.getenv('SLAVES_CHECK_INTERVAL', 5))
# Default sleep time interval is 3 seconds
slaves_check_interval = float(os.getenv('SLAVES_CHECK_INTERVAL', 3))
users = int(get_or_raise('USERS'))
hatch_rate = int(get_or_raise('HATCH_RATE'))
duration = int(get_or_raise('DURATION'))
logger.info(
'master url: {url}, users: {users}, hatch_rate: {rate}, duration: {duration}'.format(
url=master_url, users=users, rate=hatch_rate, duration=duration))

for _ in range(0, 5):
import time
time.sleep(3)

res = requests.get(url=master_url)
if res.ok:
timeout = time.time() + slaves_check_timeout
connected_slaves = 0
while time.time() < timeout:
try:
logger.info('Checking if all slave(s) are connected.')
stats_url = '/'.join([master_url, 'stats/requests'])
res = requests.get(url=stats_url)
connected_slaves = res.json().get('slave_count')

if connected_slaves >= total_slaves:
break
else:
logger.info('Currently connected slaves: {con}'.format(con=connected_slaves))
time.sleep(slaves_check_interval)
except ValueError as v_err:
logger.error(v_err.message)
else:
logger.warning('Connected slaves:{con} != defined slaves:{dfn}'.format(
con=connected_slaves, dfn=total_slaves))

logger.info('All slaves are succesfully connected! '
'Start load test automatically for {duration} seconds.'.format(duration=duration))
payload = {'locust_count': users, 'hatch_rate': hatch_rate}
res = requests.post(url=master_url + '/swarm', data=payload)

if res.ok:
time.sleep(duration)
requests.get(url=master_url + '/stop')
logger.info('Load test is stopped.')

time.sleep(4)

logging.info('Creating report folder.')
report_path = os.path.join(os.getcwd(), 'reports')
if not os.path.exists(report_path):
os.makedirs(report_path)

logger.info('Creating reports...')
for _url in ['requests', 'distribution']:
res = requests.get(url=master_url + '/stats/' + _url + '/csv')
with open(os.path.join(report_path, _url + '.csv'), "wb") as file:
file.write(res.content)

if _url == 'distribution':
continue
res = requests.get(url=master_url + '/stats/' + _url)
with open(os.path.join(report_path, _url + '.json'), "wb") as file:
file.write(res.content)

res = requests.get(url=master_url + '/htmlreport')
with open(os.path.join(report_path, 'reports.html'), "wb") as file:
file.write(res.content)
logger.info('Reports have been successfully created.')
wait_for_master()

timeout = time.time() + slaves_check_timeout
connected_slaves = 0
while time.time() < timeout:
try:
logger.info('Checking if all slave(s) are connected.')
stats_url = '/'.join([master_url, 'stats/requests'])
res = requests.get(url=stats_url)
connected_slaves = res.json().get('slave_count')

if connected_slaves >= total_slaves:
break
else:
logger.error('Locust cannot be started. Please check logs!')
logger.info('Currently connected slaves: {con}'.format(con=connected_slaves))

except ValueError as v_err:
logger.error(v_err.message)

time.sleep(slaves_check_interval)
else:
logger.error('Connected slaves:{con} < defined slaves:{dfn}'.format(
con=connected_slaves, dfn=total_slaves))
raise RuntimeError('The Slaves did not connect in time.')

logger.info('All slaves are succesfully connected! '
'Start load test automatically for {duration} seconds.'.format(duration=duration))
payload = {'locust_count': users, 'hatch_rate': hatch_rate}
res = requests.post(url=master_url + '/swarm', data=payload)

if res.ok:
time.sleep(duration)
requests.get(url=master_url + '/stop')
logger.info('Load test is stopped.')

time.sleep(4)

logging.info('Creating reports folder.')
report_path = os.path.join(os.getcwd(), 'reports')
if not os.path.exists(report_path):
os.makedirs(report_path)

logger.info('Creating reports...')
for _url in ['requests', 'distribution']:
res = requests.get(url=master_url + '/stats/' + _url + '/csv')
with open(os.path.join(report_path, _url + '.csv'), "wb") as file:
file.write(res.content)

if _url == 'distribution':
continue
res = requests.get(url=master_url + '/stats/' + _url)
with open(os.path.join(report_path, _url + '.json'), "wb") as file:
file.write(res.content)

res = requests.get(url=master_url + '/htmlreport')
with open(os.path.join(report_path, 'reports.html'), "wb") as file:
file.write(res.content)
logger.info('Reports have been successfully created.')
else:
logger.error('Locust cannot be started. Please check logs!')

break
else:
logger.error('Attempt: {attempt}. Locust master might not ready yet.'
'Status code: {status}'.format(attempt=_, status=res.status_code))
except ValueError as v_err:
logger.error(v_err)

Expand All @@ -164,7 +162,7 @@ def bootstrap(_return=0):
sys.exit(0)

else:
raise RuntimeError('Invalid ROLE value. Valid Options: master, slave, controller.')
raise RuntimeError('Invalid ROLE value. Valid Options: master, slave, controller, standalone.')

if _return:
return
Expand Down Expand Up @@ -352,6 +350,31 @@ def kill(signal, frame):
s.kill(s)


def wait_for_master():
master_host = get_or_raise('MASTER_HOST')
master_url = 'http://{master}:8089'.format(master=master_host)

# Wait for the master to come online during SLAVES_CHECK_TIMEOUT
master_check_timeout = float(os.getenv('MASTER_CHECK_TIMEOUT', 60))
# Default sleep time interval is 3 seconds
master_check_interval = float(os.getenv('MASTER_CHECK_INTERVAL', 3))

timeout = time.time() + master_check_timeout
cnt = 1
while time.time() < timeout:
try:
res = requests.get(url=master_url, timeout=1)
if res.ok:
logger.info('Locust master is ready.')
return
except requests.exceptions.ConnectionError:
pass
logger.warning('Attempt: {attempt}. Locust master is not ready yet.'.format(attempt=cnt))
cnt += 1
time.sleep(master_check_interval)
raise RuntimeError('The master did not start in time.')


if __name__ == '__main__':
logger.setLevel(logging.INFO)
logger.info('Started main')
Expand Down
42 changes: 27 additions & 15 deletions src/tests/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,34 @@ def test_valid_master(self, popen, mocked_send_usage):
self.assertTrue(mocked_send_usage.called)

@mock.patch('subprocess.Popen')
def test_valid_slave(self, mocked_popen):
def test_master_not_ready_in_time(self, popen):
os.environ['ROLE'] = 'slave'
os.environ['TARGET_HOST'] = 'https://test.com'
os.environ['MASTER_HOST'] = '127.0.0.1'
os.environ['SLAVE_MUL'] = '1'
os.environ['MASTER_CHECK_TIMEOUT'] = '0.3'
os.environ['MASTER_CHECK_INTERVAL'] = '0.1'

with mock.patch('src.app.get_locust_file') as file:
with self.assertRaises(RuntimeError) as e:
bootstrap()
self.assertFalse(file.called)
self.assertFalse(popen.called)
self.assertEqual('The Master did not start in time.', str(e.exception))


@mock.patch('subprocess.Popen')
@requests_mock.Mocker()
def test_valid_slave(self, mocked_popen, mocked_request):
os.environ['ROLE'] = 'slave'
os.environ['TARGET_HOST'] = 'https://test.com'
os.environ['MASTER_HOST'] = '127.0.0.1'
os.environ['SLAVE_MUL'] = '3'
os.environ['SLAVES_CHECK_TIMEOUT'] = '0.3'
os.environ['SLAVES_CHECK_INTERVAL'] = '0.1'

MASTER_URL = 'http://127.0.0.1:8089'
mocked_request.get(url=MASTER_URL, text='ok')
with mock.patch('src.app.get_locust_file') as file:
bootstrap()
self.assertTrue(file.called)
Expand Down Expand Up @@ -81,14 +103,9 @@ def test_valid_controller_automatic(self, mocked_timeout, mocked_dir, mocked_ope
for endpoint in ['stop', 'stats/requests/csv', 'stats/distribution/csv', 'htmlreport']:
mocked_request.get(url='/'.join([MASTER_URL, endpoint]), text='ok')

self.assertFalse(mocked_timeout.called)
self.assertFalse(mocked_request.called)
#self.assertFalse(mocked_dir.called)
self.assertFalse(mocked_open.called)
bootstrap()
self.assertTrue(mocked_timeout.called)
self.assertTrue(mocked_request.called)
#self.assertTrue(mocked_dir.called)
self.assertTrue(mocked_open.called)

@mock.patch('time.sleep')
Expand All @@ -113,15 +130,10 @@ def test_slaves_not_fully_connected(self, mocked_timeout, mocked_dir, mocked_ope
for endpoint in ['stop', 'stats/requests/csv', 'stats/distribution/csv', 'htmlreport']:
mocked_request.get(url='/'.join([MASTER_URL, endpoint]), text='ok')

self.assertFalse(mocked_timeout.called)
self.assertFalse(mocked_request.called)
#self.assertFalse(mocked_dir.called)
self.assertFalse(mocked_open.called)
bootstrap()
self.assertTrue(mocked_timeout.called)
self.assertTrue(mocked_request.called)
#self.assertTrue(mocked_dir.called)
self.assertTrue(mocked_open.called)
with self.assertRaises(RuntimeError):
bootstrap()
self.assertFalse(mocked_request.called)
self.assertFalse(mocked_open.called)

def test_invalid_role(self):
os.environ['ROLE'] = 'unknown'
Expand Down

0 comments on commit 1451724

Please sign in to comment.