Skip to content

Commit

Permalink
Merge pull request #281 from multiscale/issue_274_missing_shutdown_wait
Browse files Browse the repository at this point in the history
Fix and improve profiling
  • Loading branch information
LourensVeen authored Dec 18, 2023
2 parents 5a49314 + c8ee1e5 commit 3e7d118
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 56 deletions.
29 changes: 18 additions & 11 deletions libmuscle/cpp/src/libmuscle/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,12 @@ void Instance::Impl::check_port_(std::string const & port_name) {
* @return false iff the port is connected and ClosePort was received.
*/
bool Instance::Impl::receive_settings_() {
Message default_message(0.0, Settings(), Settings());
auto msg = communicator_->receive_message("muscle_settings_in", {}, default_message);
if (!communicator_->settings_in_connected()) {
settings_manager_.overlay = Settings();
return true;
}

auto msg = communicator_->receive_message("muscle_settings_in", {});
if (is_close_port(msg.data()))
return false;

Expand All @@ -826,6 +830,7 @@ bool Instance::Impl::receive_settings_() {
for (auto const & key_val : msg.data().as<Settings>())
settings[key_val.first] = key_val.second;
settings_manager_.overlay = settings;

return true;
}

Expand All @@ -849,16 +854,12 @@ bool Instance::Impl::have_f_init_connections_() {
* @return true iff no ClosePort messages were received.
*/
bool Instance::Impl::pre_receive_() {
ProfileEvent sw_event(ProfileEventType::shutdown_wait, ProfileTimestamp());

bool all_ports_open = receive_settings_();
pre_receive_f_init_();
for (auto const & ref_msg : f_init_cache_)
if (is_close_port(ref_msg.second.data()))
all_ports_open = false;

if (!all_ports_open)
profiler_->record_event(std::move(sw_event));
return all_ports_open;
}

Expand Down Expand Up @@ -937,6 +938,9 @@ bool Instance::Impl::decide_reuse_instance_() {
#ifdef MUSCLE_ENABLE_MPI
if (mpi_barrier_.is_root()) {
#endif
// only actually happens if we don't reuse, start recording just in case
ProfileEvent sw_event(ProfileEventType::shutdown_wait, ProfileTimestamp());

bool f_init_connected = have_f_init_connections_();
if (first_run_.get() && snapshot_manager_->resuming_from_intermediate()) {
// resume from intermediate
Expand All @@ -946,10 +950,10 @@ bool Instance::Impl::decide_reuse_instance_() {
} else if (first_run_.get() && snapshot_manager_->resuming_from_final()) {
// resume from final
if (f_init_connected) {
bool got_f_init_messages = pre_receive_();
bool no_closed_ports = pre_receive_();
do_resume_ = true;
do_init_ = true;
do_reuse = got_f_init_messages;
do_reuse = no_closed_ports;
} else {
do_resume_ = false;
do_init_ = false;
Expand All @@ -965,12 +969,15 @@ bool Instance::Impl::decide_reuse_instance_() {
do_reuse = first_run_.get();
} else {
// not resuming and f_init connected, run while we get messages
bool got_f_init_messages = pre_receive_();
do_init_ = got_f_init_messages;
do_reuse = got_f_init_messages;
bool no_closed_ports = pre_receive_();
do_init_ = no_closed_ports;
do_reuse = no_closed_ports;
}
}

if (!do_reuse)
profiler_->record_event(std::move(sw_event));

#ifdef MUSCLE_ENABLE_MPI
mpi_barrier_.signal();
int do_reuse_mpi[3] = {do_reuse, do_resume_, do_init_};
Expand Down
2 changes: 2 additions & 0 deletions libmuscle/cpp/src/libmuscle/tests/test_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ TEST(libmuscle_instance, receive_parallel_universe) {
port.set_closed();
MockCommunicator::get_port_return_value.emplace("in", port);

MockCommunicator::settings_in_connected_return_value = true;

Settings recv_settings;
recv_settings["test1"] = 12;
MockCommunicator::next_received_message["in"] =
Expand Down
63 changes: 36 additions & 27 deletions libmuscle/python/libmuscle/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -810,38 +810,47 @@ def _decide_reuse_instance(self) -> bool:
elif self._first_run:
self._first_run = False

# resume from intermediate
do_reuse = False

# only actually happens if we don't reuse, start recording just in case
sw_event = ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, ProfileTimestamp())

f_init_connected = self._have_f_init_connections()
if self._first_run and self._snapshot_manager.resuming_from_intermediate():
# resume from intermediate
self._do_resume = True
self._do_init = False
return True

f_init_connected = self._have_f_init_connections()

# resume from final
if self._first_run and self._snapshot_manager.resuming_from_final():
do_reuse = True
elif self._first_run and self._snapshot_manager.resuming_from_final():
# resume from final
if f_init_connected:
got_f_init_messages = self._pre_receive()
no_closed_ports = self._pre_receive()
self._do_resume = True
self._do_init = True
return got_f_init_messages
do_reuse = no_closed_ports
else:
self._do_resume = False # unused
self._do_init = False # unused
return False
do_reuse = False
else:
# fresh start or resuming from implicit snapshot
self._do_resume = False

# fresh start or resuming from implicit snapshot
self._do_resume = False
no_closed_ports = self._pre_receive()

# simple straight single run without resuming
if not f_init_connected:
self._do_init = self._first_run
return self._first_run
if not f_init_connected:
# simple straight single run without resuming
self._do_init = self._first_run
do_reuse = self._first_run
else:
# not resuming and f_init connected, run while we get messages
self._do_init = no_closed_ports
do_reuse = no_closed_ports

# not resuming and f_init connected, run while we get messages
got_f_init_messages = self._pre_receive()
self._do_init = got_f_init_messages
return got_f_init_messages
if not do_reuse:
self._profiler.record_event(sw_event)

return do_reuse

def _save_snapshot(
self, message: Optional[Message], final: bool,
Expand Down Expand Up @@ -1005,17 +1014,12 @@ def _pre_receive(self) -> bool:
Returns:
True iff no ClosePort messages were received.
"""
sw_event = ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, ProfileTimestamp())

all_ports_open = self.__receive_settings()
self.__pre_receive_f_init()
for message in self._f_init_cache.values():
if isinstance(message.data, ClosePort):
all_ports_open = False

if not all_ports_open:
self._profiler.record_event(sw_event)

return all_ports_open

def __receive_settings(self) -> bool:
Expand All @@ -1024,11 +1028,16 @@ def __receive_settings(self) -> bool:
Returns:
False iff the port is connnected and ClosePort was received.
"""
default_message = Message(0.0, None, Settings(), Settings())
if not self._communicator.settings_in_connected():
self._settings_manager.overlay = Settings()
return True

message, saved_until = self._communicator.receive_message(
'muscle_settings_in', None, default_message)
'muscle_settings_in', None)

if isinstance(message.data, ClosePort):
return False

if not isinstance(message.data, Settings):
err_msg = ('"{}" received a message on'
' muscle_settings_in that is not a'
Expand Down
42 changes: 31 additions & 11 deletions libmuscle/python/libmuscle/manager/profile_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading
from types import TracebackType
from typing import Any, cast, Dict, List, Optional, Tuple, Type, Union
from warnings import warn


class ProfileDatabase:
Expand Down Expand Up @@ -94,24 +95,42 @@ def instance_stats(
"""
cur = self._get_cursor()
cur.execute("BEGIN TRANSACTION")
cur.execute("SELECT name FROM instances")
instances = [row[0] for row in cur.fetchall()]

cur.execute(
"SELECT instance, stop_time"
" FROM all_events"
" WHERE type = 'CONNECT'")
start_run = dict(cur.fetchall())

for name in instances:
if name not in start_run:
warn(
f'Instance {name} seems to have never registered with the'
' manager, and will be omitted from the results. Did it crash'
' on startup?')

cur.execute(
"SELECT instance, start_time"
" FROM all_events"
" WHERE type = 'SHUTDOWN_WAIT'")
stop_run = dict(cur.fetchall())

if not stop_run:
cur.execute(
"SELECT instance, start_time"
" FROM all_events"
" WHERE type = 'DEREGISTER'")
stop_run = dict(cur.fetchall())
for name in instances:
if name not in stop_run:
warn(
f'Instance {name} did not shut down cleanly, data may be'
' inaccurate or missing')

cur.execute(
"SELECT stop_time"
" FROM all_events"
" WHERE instance = ?"
" ORDER BY stop_time DESC LIMIT 1", [name])
result = cur.fetchall()
if result:
stop_run[name] = result[0][0]

cur.execute(
"SELECT instance, SUM(stop_time - start_time)"
Expand All @@ -130,20 +149,21 @@ def instance_stats(
cur.execute("COMMIT")
cur.close()

instances = list(start_run.keys())
total_times = [(stop_run[i] - start_run[i]) * 1e-9 for i in instances]
complete_instances = list(set(start_run) & set(stop_run))

total_times = [(stop_run[i] - start_run[i]) * 1e-9 for i in complete_instances]
comm_times = [
(
(comm[i] if i in comm else 0) -
(wait[i] if i in wait else 0)
) * 1e-9
for i in instances]
wait_times = [(wait[i] if i in wait else 0) * 1e-9 for i in instances]
for i in complete_instances]
wait_times = [(wait[i] if i in wait else 0) * 1e-9 for i in complete_instances]
run_times = [
t - c - w
for t, c, w in zip(total_times, comm_times, wait_times)]

return instances, run_times, comm_times, wait_times
return complete_instances, run_times, comm_times, wait_times

def resource_stats(self) -> Dict[str, Dict[str, float]]:
"""Calculate per-core statistics.
Expand Down
14 changes: 8 additions & 6 deletions libmuscle/python/libmuscle/manager/test/test_profile_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def t(offset: int) -> ProfileTimestamp:
ProfileEvent(
ProfileEventType.SEND, t(2600), t(2900),
Port('out', Operator.O_I), None, None, 1000000, 0.0),
ProfileEvent(ProfileEventType.DEREGISTER, t(10000), t(11000))]
ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)),
ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))]

store.add_events(Reference('instance1'), e1)

Expand All @@ -73,7 +74,8 @@ def t(offset: int) -> ProfileTimestamp:
ProfileEvent(
ProfileEventType.RECEIVE_WAIT, t(2600), t(2870),
Port('in', Operator.O_I), None, None, 1000000, 0.0),
ProfileEvent(ProfileEventType.DEREGISTER, t(10000), t(11000))]
ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)),
ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))]

store.add_events(Reference('instance2'), e2)

Expand Down Expand Up @@ -126,12 +128,12 @@ def test_resource_stats(db_file):
def test_time_taken(db_file):
with ProfileDatabase(db_file) as db:
assert 1000.0 == db.time_taken(etype='REGISTER', instance='instance1')
assert 1000.0 == db.time_taken(etype='DEREGISTER')
assert 11000.0 == db.time_taken(
assert 100.0 == db.time_taken(etype='DEREGISTER')
assert 11100.0 == db.time_taken(
etype='REGISTER', instance='instance1', etype2='DEREGISTER')
assert 9000.0 == db.time_taken(
assert 10000.0 == db.time_taken(
etype='REGISTER', instance='instance1', time='stop',
etype2='DEREGISTER', time2='start')
assert 200.0 == db.time_taken(etype='SEND')
assert 2000.0 == db.time_taken(etype='DEREGISTER', aggregate='sum')
assert 200.0 == db.time_taken(etype='DEREGISTER', aggregate='sum')
assert 600.0 == db.time_taken(etype='SEND', aggregate='sum')
3 changes: 2 additions & 1 deletion libmuscle/python/libmuscle/test/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ def get_port(name):
msg = Message(0.0, 1.0, 'message')
msg_with_settings = Message(0.0, 1.0, 'message', overlay_settings)

def receive_message(name, slot, default):
def receive_message(name, slot, default=None):
if 'not_connected' in name:
assert default is not None
return default, 10.0
if 'settings' in name:
return msg_with_settings, 10.0
Expand Down

0 comments on commit 3e7d118

Please sign in to comment.