Fix global/local core/cpu confusion on SMT systems
LourensVeen committed Nov 30, 2024
1 parent 8e5e7eb commit 2601d87
Showing 4 changed files with 32 additions and 19 deletions.
@@ -50,7 +50,7 @@ def __init__(self, agent_dir: Path) -> None:
     def get_resources(self) -> Dict[str, List[FrozenSet[int]]]:
         """Return detected resources.
 
-        This returns a list of tuples of logical hwthread ids for each core per node.
+        This returns a list of sets of logical hwthread ids per core, per node.
 
         Called by NativeInstantiator.
         """
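For orientation, the mapping returned by get_resources() pairs each node name with one set of hwthread ids per core. A hypothetical value for a single two-core node with two-way SMT (the node name and hwthread numbering are invented for illustration) could look like this:

# Illustrative shape of the Dict[str, List[FrozenSet[int]]] result; values are made up.
resources = {
    'node001': [
        frozenset({0, 8}),   # core 0: hwthreads 0 and 8 are SMT siblings
        frozenset({1, 9}),   # core 1: hwthreads 1 and 9
    ],
}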
@@ -26,7 +26,8 @@ class GlobalResources:
     Attributes:
         scheduler: The HPC scheduler we're running under, if any.
         nodes: List of hostnames of available nodes to run on.
-        cores_per_node: Number of cores available on each node. List alongside nodes.
+        logical_cpus_per_node: Number of logical CPUs available on each node.
+            List alongside nodes.
     """
     def __init__(self) -> None:
         """Create a GlobalResources.
@@ -38,16 +39,17 @@ def __init__(self) -> None:
             _logger.info('Detected a SLURM allocation')
             self.scheduler = Scheduler.SLURM
             self.nodes = slurm.get_nodes()
-            self.cores_per_node = slurm.get_cores_per_node()
+            self.logical_cpus_per_node = slurm.get_logical_cpus_per_node()
             _logger.info(
                     f'We have {len(self.nodes)} nodes and a total of'
-                    f' {sum(self.cores_per_node)} cores available')
+                    f' {sum(self.logical_cpus_per_node)} logical CPUs available')
         else:
             _logger.info('Running locally without a cluster scheduler')
             self.scheduler = Scheduler.NONE
             self.nodes = [gethostname()]
-            self.cores_per_node = [psutil.cpu_count(logical=False)]
-            _logger.info(f'We have {self.cores_per_node[0]} cores available')
+            self.logical_cpus_per_node = [psutil.cpu_count(logical=True)]
+            _logger.info(
+                    f'We have {self.logical_cpus_per_node[0]} logical CPUs available')
 
     def on_cluster(self) -> bool:
         """Return whether we're running on a cluster."""
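The local-detection change above hinges on psutil's distinction between physical cores and logical CPUs. A minimal, self-contained sketch of that difference (the printed numbers are hypothetical, for a 4-core machine with two-way SMT):

import psutil

# On a hypothetical 4-core machine with two-way SMT (hyperthreading) enabled:
print(psutil.cpu_count(logical=False))  # physical cores, e.g. 4
print(psutil.cpu_count(logical=True))   # logical CPUs (hwthreads), e.g. 8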
@@ -306,15 +306,16 @@ def _send_resources(self) -> None:
         step outside our bounds even if the cluster doesn't constrain processes to their
         assigned processors.
         """
+        already_logged_smt = False
         resources = Resources()
 
         agent_cores = self._agent_manager.get_resources()
 
-        env_ncores = dict(
-                zip(global_resources().nodes, global_resources().cores_per_node)
+        env_ncpus = dict(
+                zip(global_resources().nodes, global_resources().logical_cpus_per_node)
                 )
 
-        for node in env_ncores:
+        for node in env_ncpus:
             if node not in agent_cores:
                 _logger.warning(
                         f'The environment suggests we should have node {node},'
@@ -323,26 +324,36 @@
             else:
                 resources.cores[node] = set(agent_cores[node])
 
-                env_nncores = env_ncores[node]
+                env_nncpus = env_ncpus[node]
                 ag_nncores = len(agent_cores[node])
-                if ag_nncores < env_nncores:
+                ag_nnthreads = sum((len(ts) for ts in agent_cores[node]))
+
+                if ag_nncores != ag_nnthreads and ag_nnthreads == env_nncpus:
+                    if not already_logged_smt:
+                        _logger.info(
+                                'Detected SMT (hyperthreading) as available and'
+                                ' enabled. Note that MUSCLE3 will assign whole cores to'
+                                ' each thread or MPI process.')
+                        already_logged_smt = True
+
+                elif ag_nncores < env_nncpus:
                     _logger.warning(
-                            f'Node {node} should have {env_nncores} cores available,'
+                            f'Node {node} should have {env_nncpus} cores available,'
                             f' but the agent reports only {ag_nncores} available to it.'
                             f' We\'ll use the {ag_nncores} we seem to have.')
 
                     resources.cores[node] = set(agent_cores[node])
 
-                elif env_nncores < ag_nncores:
+                elif env_nncpus < ag_nncores:
                     _logger.warning(
-                            f'Node {node} should have {env_nncores} cores available,'
+                            f'Node {node} should have {env_nncpus} cores available,'
                             f' but the agent reports {ag_nncores} available to it.'
                             ' Maybe the cluster does not constrain resources? We\'ll'
-                            f' use the {env_nncores} that we should have got.')
-                    resources.cores[node] = set(agent_cores[node][:env_nncores])
+                            f' use the {env_nncpus} that we should have got.')
+                    resources.cores[node] = set(agent_cores[node][:env_nncpus])
 
         for node in agent_cores:
-            if node not in env_ncores:
+            if node not in env_ncpus:
                 _logger.warning(
                         f'An agent is running on node {node} but the environment'
                         ' does not list it as ours. It seems that the node\'s'
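Put differently, the new branch treats a node as SMT-enabled when the agent sees more hwthreads than cores and the hwthread total matches the scheduler's logical CPU count. A small standalone illustration of that check, using made-up per-node data:

# Hypothetical agent data for one node: two cores, each with two SMT hwthreads.
agent_cores_node = [frozenset({0, 8}), frozenset({1, 9})]
env_nncpus = 4  # logical CPUs the environment reports for this node

ag_nncores = len(agent_cores_node)                      # 2 cores
ag_nnthreads = sum(len(ts) for ts in agent_cores_node)  # 4 hwthreads

if ag_nncores != ag_nnthreads and ag_nnthreads == env_nncpus:
    print('SMT detected: whole cores will be assigned per thread or MPI process')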
libmuscle/python/libmuscle/native_instantiator/slurm.py (2 additions, 2 deletions)
@@ -250,8 +250,8 @@ def get_nodes() -> List[str]:
     return parse_slurm_nodelist(nodelist)
 
 
-def get_cores_per_node() -> List[int]:
-    """Return the number of CPU cores per node.
+def get_logical_cpus_per_node() -> List[int]:
+    """Return the number of logical CPUs per node.
 
     This returns a list with the number of cores of each node in the result of
     get_nodes(), which gets read from SLURM_JOB_CPUS_PER_NODE.
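For context, SLURM_JOB_CPUS_PER_NODE uses a compressed format such as '72(x2),36', meaning two nodes with 72 logical CPUs each followed by one node with 36. The sketch below expands that format; it is an illustration only, not the parser used in slurm.py:

import re
from typing import List


def expand_cpus_per_node(value: str) -> List[int]:
    """Expand a value like '72(x2),36' into [72, 72, 36]."""
    counts: List[int] = []
    for part in value.split(','):
        # Each part is a CPU count, optionally followed by a repeat like (x2).
        match = re.fullmatch(r'(\d+)(?:\(x(\d+)\))?', part)
        if match is None:
            raise ValueError(f'Cannot parse CPU count {part!r}')
        count = int(match.group(1))
        repeat = int(match.group(2)) if match.group(2) else 1
        counts.extend([count] * repeat)
    return counts


print(expand_cpus_per_node('72(x2),36'))  # [72, 72, 36]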
