Skip to content

Commit

Permalink
support -elastic-mode=reload (#363)
Browse files Browse the repository at this point in the history
* Fix linking (#361)

* add flag

* remove implicit init

* add setup script for mindspore

* add if linux for flag

* add check for KUNGFU_NO_AUTO_INIT

* init :: -watch-mode reload

* merge

* revert

Co-authored-by: Marcel Wagenländer <[email protected]>
  • Loading branch information
lgarithm and marwage authored Aug 31, 2021
1 parent 874df34 commit 1548ad2
Show file tree
Hide file tree
Showing 20 changed files with 488 additions and 17 deletions.
11 changes: 11 additions & 0 deletions INSTALL
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/sh
# Installs the Python package found in the current directory using the
# python3 interpreter located on PATH.
# The script aborts on the first failing command (set -e), including when
# python3 cannot be found.
set -e

# go install -v ./...

# `command -v` is the POSIX-specified way to locate a program; `which` is not
# guaranteed to exist or to report failure through its exit status.
PYTHON=$(command -v python3)
echo "Using $PYTHON"

# Quoted so that an interpreter path containing spaces is not word-split.
"$PYTHON" -m pip install --no-index -U .

echo "done $0"
4 changes: 4 additions & 0 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ add_cmake_flags() {
add_cmake_flag NCCL_HOME ${NCCL_HOME}
add_cmake_flag KUNGFU_ENABLE_NCCL 1
fi

if [ $(uname) = "Linux" ]; then
add_cmake_flag KUNGFU_ENABLE_AFFINITY 1
fi
}

cleanup_cmake_cache() {
Expand Down
28 changes: 28 additions & 0 deletions scripts/tests/test-elastic-reload.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/sh
# Runs tests/python/integration/test_elastic_reload.py through
# `python3 -m kungfu.cmd` with `-elastic-mode reload`.
# Abort on the first failing command.
set -e

# Interpreter used to launch the kungfu.cmd module below.
PYTHON=$(which python3)
echo "Using $PYTHON"

# Prints the runner flags, one "flag [value]" group per line. The caller
# expands the output unquoted so that every word becomes a separate argument.
runner_flags() {
    printf '%s\n' \
        '-logdir logs' \
        '-q' \
        '-w' \
        '-elastic-mode reload' \
        '-builtin-config-port 9100' \
        '-config-server http://127.0.0.1:9100/config'
}

# elastic_run_n INIT_NP CMD...
# Launches CMD through `python -m kungfu.cmd` with the flags printed by
# runner_flags and `-np INIT_NP`.
elastic_run_n() {
    local init_np=$1
    shift
    # $(runner_flags) is deliberately left unquoted: its output must be split
    # into individual arguments. Everything else is quoted so that paths or
    # arguments containing whitespace or glob characters are passed intact.
    "$PYTHON" -m kungfu.cmd $(runner_flags) -np "$init_np" "$@"
}

# Prints the command line of the program under test, split over two lines;
# the caller expands the output unquoted to obtain the argument list.
app() {
    printf '%s\n' \
        'python3 tests/python/integration/test_elastic_reload.py' \
        '--max-step 100'
}

# Launch the command printed by app with `-np 1`; $(app) is unquoted on
# purpose so that each word becomes its own argument.
elastic_run_n 1 $(app)
111 changes: 111 additions & 0 deletions setup_mindspore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import multiprocessing
import os
import subprocess
import sys
import sysconfig

from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext


class CMakeExtension(Extension):
    """Extension with an empty name and no sources that records a source dir.

    Args:
        sourcedir: directory of the sources; stored as an absolute path in
            ``self.sourcedir``.
    """

    def __init__(self, sourcedir):
        super().__init__('', sources=[])
        absolute_dir = os.path.abspath(sourcedir)
        self.sourcedir = absolute_dir


def ensure_absent(filepath):
    """Delete ``filepath`` when it is an existing regular file.

    Anything else (a missing path, a directory) is left untouched.
    """
    if not os.path.isfile(filepath):
        return
    os.remove(filepath)


def cmake_flag(k, v):
    """Return the command-line definition ``-D<k>=<v>`` as a string."""
    return '-D' + str(k) + '=' + str(v)


def pass_env(keys):
    """Yield a cmake flag for every key in ``keys`` set in the environment.

    Keys that are unset or set to an empty string are skipped. Each forwarded
    key is also reported on stdout.
    """
    for name in keys:
        value = os.environ.get(name)
        if not value:
            continue
        print('Using %s=%s from env' % (name, value))
        yield cmake_flag(name, value)


class CMakeBuild(build_ext):
    """``build_ext`` command that configures and builds an extension via cmake."""

    def build_extension(self, ext):
        """Configure ``ext.sourcedir`` with cmake, then run ``cmake --build``.

        Both cmake invocations run inside ``self.build_temp``. A failing
        invocation raises ``subprocess.CalledProcessError``.

        Args:
            ext: extension object providing ``name`` and ``sourcedir``.
        """
        if not os.path.exists(self.build_temp):
            os.makedirs(self.build_temp)

        extdir = self.get_ext_fullpath(ext.name)
        if not os.path.exists(extdir):
            os.makedirs(extdir)

        install_prefix = os.path.abspath(os.path.dirname(extdir))
        executable_dir = os.path.abspath(os.path.dirname(sys.executable))

        # FIXME: do it in a more standard way
        if '--user' in sys.argv:
            # expanduser() still resolves the home directory when HOME is
            # unset; joining onto os.getenv('HOME') raised TypeError then.
            executable_dir = os.path.expanduser('~/.local/bin')

        cmake_args = [
            # FIXME: use CMAKE_LIBRARY_OUTPUT_DIRECTORY
            cmake_flag('LIBRARY_OUTPUT_PATH',
                       os.path.join(install_prefix, 'kungfu')),
            cmake_flag('KUNGFU_BUILD_TF_OPS', 0),
            cmake_flag('CMAKE_RUNTIME_OUTPUT_DIRECTORY', executable_dir),
            cmake_flag('PYTHON', sys.executable),
        ] + list(
            pass_env([
                'KUNGFU_ENABLE_TRACE',
                'CMAKE_VERBOSE_MAKEFILE',
                'CMAKE_EXPORT_COMPILE_COMMANDS',
            ]))

        # Affinity is switched on for Linux only; HWLOC is forwarded from the
        # environment when set.
        if sys.platform == 'linux':
            cmake_args.append(cmake_flag('KUNGFU_ENABLE_AFFINITY', 1))
            cmake_args.extend(pass_env(['KUNGFU_ENABLE_HWLOC']))

        # NCCL_HOME is only consulted when KUNGFU_ENABLE_NCCL is set.
        use_nccl = os.getenv('KUNGFU_ENABLE_NCCL')
        if use_nccl:
            cmake_args.append(cmake_flag('KUNGFU_ENABLE_NCCL', use_nccl))
            nccl_home = os.getenv('NCCL_HOME')
            if nccl_home:
                cmake_args.append(cmake_flag('NCCL_HOME', nccl_home))

        # Remove any CMakeCache.txt left in the source directory before
        # configuring.
        ensure_absent(os.path.join(ext.sourcedir, 'CMakeCache.txt'))

        subprocess.check_call(
            ['cmake', ext.sourcedir] + cmake_args,
            cwd=self.build_temp,
        )
        # Default the build parallelism to the CPU count unless the caller
        # already chose a level or READTHEDOCS is set.
        if (os.getenv('CMAKE_BUILD_PARALLEL_LEVEL') is None
                and os.getenv('READTHEDOCS') is None):
            os.environ['CMAKE_BUILD_PARALLEL_LEVEL'] = str(
                multiprocessing.cpu_count())
        subprocess.check_call(
            ['cmake', '--build', '.'],
            cwd=self.build_temp,
        )


# Root directory of the Python packages shipped by this distribution.
package_dir = './srcs/python'

setup(
    # Distribution metadata.
    name='kungfu',
    version='0.2.2',
    description='KungFu distributed machine learning framework',
    url='https://github.com/lsds/KungFu',
    # Python packages are discovered under package_dir.
    packages=find_packages(package_dir),
    package_dir={'': package_dir},
    # The extension rooted at '.' is built by the CMakeBuild command.
    cmdclass={'build_ext': CMakeBuild},
    ext_modules=[CMakeExtension('.')],
    setup_requires=[],
    install_requires=[],
    # Installs a `kungfu-run` console script that calls kungfu.cmd:run.
    entry_points=dict(console_scripts=['kungfu-run = kungfu.cmd:run']),
)
4 changes: 4 additions & 0 deletions srcs/cpp/include/kungfu/peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class Peer
// https://www.open-mpi.org/doc/v4.0/man3/MPI_Comm_size.3.php
int Size() const;

int InitProgress() const;

int LocalRank() const;
int LocalSize() const;
int HostCount() const;
Expand Down Expand Up @@ -180,6 +182,8 @@ class Peer
// control APIs
int ResizeCluster(const uint32_t new_size, bool *changed, bool *detached);
int ResizeClusterFromURL(bool *changed, bool *detached);
// will reload all if changed
// progress: value forwarded to the Go runtime together with the two output
// flags `changed` and `detached`.
int ChangeCluster(const uint64_t progress, bool *changed, bool *detached);

int ProposeNewSize(int new_size);

Expand Down
3 changes: 3 additions & 0 deletions srcs/cpp/include/kungfu/python/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ extern void kungfu_print_strategy_stats();
// unstable APIs
extern void kungfu_resize(int n, char *changed, char *detached);
extern void kungfu_resize_from_url(char *p_changed, char *p_detached);
extern void kungfu_change_cluster(int progress /* FIXME: pass uint64_t */,
char *p_changed, char *p_detached);
extern int kungfu_init_progress();

#ifdef __cplusplus
}
Expand Down
15 changes: 15 additions & 0 deletions srcs/cpp/src/peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ int Peer::ResizeClusterFromURL(bool *changed, bool *detached)
reinterpret_cast<char *>(detached));
}

int Peer::ChangeCluster(const uint64_t progress, bool *changed, bool *detached)
{
static_assert(sizeof(bool) == sizeof(char), "");
return GoKungfuChangeCluster(GoInt(progress),
reinterpret_cast<char *>(changed),
reinterpret_cast<char *>(detached));
}

// Returns the value that GoKungfuInitProgress stores through its output
// pointer.
int Peer::InitProgress() const
{
    // Zero-initialised so that a callee that returns without writing the
    // output does not make this function return an indeterminate value.
    int progress = 0;
    GoKungfuInitProgress(&progress);
    return progress;
}

void Peer::LogStats() { GoLogStats(); }

void Peer::CalcStats() { GoCalcStats(); }
Expand Down
13 changes: 13 additions & 0 deletions srcs/cpp/src/python/c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,16 @@ void kungfu_resize_from_url(char *p_changed, char *p_detached)
reinterpret_cast<bool *>(p_changed),
reinterpret_cast<bool *>(p_detached));
}

// C entry point for Peer::ChangeCluster on the default peer. The two char
// output flags are reinterpreted as bool pointers (same size, see the
// static_assert); the status returned by ChangeCluster is discarded.
void kungfu_change_cluster(int progress, char *p_changed, char *p_detached)
{
    static_assert(sizeof(bool) == sizeof(char), "");
    bool *const changed  = reinterpret_cast<bool *>(p_changed);
    bool *const detached = reinterpret_cast<bool *>(p_detached);
    kungfu::Peer::GetDefault()->ChangeCluster(progress, changed, detached);
}

// C entry point returning Peer::InitProgress() of the default peer.
int kungfu_init_progress()
{
    auto &&peer        = kungfu::Peer::GetDefault();
    const int progress = peer->InitProgress();
    return progress;
}
1 change: 1 addition & 0 deletions srcs/go/cmd/kungfu-run/app/kungfu-run.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func Main(args []string) {
Args: f.Args,
LogDir: f.LogDir,
AllowNVLink: f.AllowNVLink,
ElasticMode: f.ElasticMode,
}
ctx, cancel := context.WithCancel(context.Background())
trap(cancel)
Expand Down
62 changes: 61 additions & 1 deletion srcs/go/kungfu/env/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,38 @@ package env
import (
"fmt"
"os"
"strconv"

kb "github.com/lsds/KungFu/srcs/go/kungfu/base"
"github.com/lsds/KungFu/srcs/go/plan"
)

// ElasticMode identifies the elastic mode a job runs in.
type ElasticMode int

// Known elastic modes. ElasticModeDefault is the zero value of ElasticMode.
var (
	ElasticModeDefault = ElasticMode(0)
	ElasticModeReload  = ElasticMode(1)
)

// Set parses val and stores the resulting mode in e, so that *ElasticMode can
// be used as a flag.Value. When val is not a recognised mode, e is left
// unchanged and the parse error is returned.
func (e *ElasticMode) Set(val string) error {
	mode, err := parseElasticMode(val)
	if err == nil {
		*e = *mode
	}
	return err
}

// elasticModeNames maps every known mode to its textual form.
var elasticModeNames = map[ElasticMode]string{
	ElasticModeDefault: "",
	ElasticModeReload:  "reload",
}

// String returns the textual form of e, or the empty string when e is not a
// known mode.
func (e ElasticMode) String() string {
	if name, known := elasticModeNames[e]; known {
		return name
	}
	return ""
}

type Config struct {
ConfigServer string
Parent plan.PeerID
Expand All @@ -16,9 +43,11 @@ type Config struct {
Strategy kb.Strategy

InitClusterVersion string
InitProgress uint64
InitPeers plan.PeerList

Single bool
Single bool
ElasticMode ElasticMode
}

func ParseConfigFromEnv() (*Config, error) {
Expand All @@ -45,6 +74,14 @@ func ParseConfigFromEnv() (*Config, error) {
if err != nil {
return nil, err
}
elasticMode, err := parseElasticMode(os.Getenv(ElasticModeEnvKey))
if err != nil {
return nil, err
}
initProgress, err := parseInitProgress(os.Getenv(InitProgressEnvKey))
if err != nil {
return nil, err
}
return &Config{
ConfigServer: getConfigServerFromEnv(),
Self: *self,
Expand All @@ -53,9 +90,32 @@ func ParseConfigFromEnv() (*Config, error) {
InitPeers: initPeers,
Strategy: *strategy,
InitClusterVersion: os.Getenv(InitClusterVersionEnvKey),
ElasticMode: *elasticMode,
InitProgress: initProgress,
}, nil
}

// parseElasticMode converts the textual form of an elastic mode into its
// value: "" selects ElasticModeDefault and "reload" selects ElasticModeReload.
// Any other input yields a nil mode and an error naming ElasticModeEnvKey.
//
// The returned pointer refers to a fresh copy, never to the package-level mode
// variables, so writing through it cannot alter those variables.
func parseElasticMode(val string) (*ElasticMode, error) {
	var mode ElasticMode
	switch val {
	case "":
		mode = ElasticModeDefault
	case "reload":
		mode = ElasticModeReload
	default:
		return nil, fmt.Errorf("invalid %s: %q", ElasticModeEnvKey, val)
	}
	return &mode, nil
}

// parseInitProgress parses the decimal initial progress value.
//
// An empty string means "not set" and yields 0. The value is parsed as an
// unsigned 64-bit integer, so the whole uint64 range is accepted and negative
// or otherwise malformed input is reported as an error instead of being
// wrapped around to a large unsigned number.
func parseInitProgress(val string) (uint64, error) {
	if val == "" {
		return 0, nil
	}
	n, err := strconv.ParseUint(val, 10, 64)
	if err != nil {
		return 0, err
	}
	return n, nil
}

func SingleMachineEnv(rank, size int) (*Config, error) {
pl, err := plan.DefaultHostList.GenPeerList(size, plan.DefaultPortRange)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions srcs/go/kungfu/env/envs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
RunnerListEnvKey = `KUNGFU_INIT_RUNNERS`
SelfSpecEnvKey = `KUNGFU_SELF_SPEC` // self spec should never change during the life of a process
AllReduceStrategyEnvKey = `KUNGFU_ALLREDUCE_STRATEGY`
ElasticModeEnvKey = `KUNGFU_ELASTIC_MODE`
InitProgressEnvKey = `KUNGFU_INIT_PROGRESS`

JobStartTimestamp = `KUNGFU_JOB_START_TIMESTAMP`
ProcStartTimestamp = `KUNGFU_PROC_START_TIMESTAMP`
Expand Down
7 changes: 5 additions & 2 deletions srcs/go/kungfu/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ type Job struct {
LogDir string

AllowNVLink bool
ElasticMode env.ElasticMode
}

var warnCudaOption = new(sync.Once)

func (j Job) NewProc(peer plan.PeerID, gpuID int, initClusterVersion int, cluster plan.Cluster) proc.Proc {
func (j Job) NewProc(peer plan.PeerID, gpuID int, initClusterVersion int, cluster plan.Cluster, initProgress uint64) proc.Proc {
envs := proc.Envs{
env.JobStartTimestamp: strconv.FormatInt(j.StartTime.Unix(), 10),
env.ProcStartTimestamp: strconv.FormatInt(time.Now().Unix(), 10),
Expand All @@ -43,6 +44,8 @@ func (j Job) NewProc(peer plan.PeerID, gpuID int, initClusterVersion int, cluste
env.AllReduceStrategyEnvKey: j.Strategy.String(),
env.ConfigServerEnvKey: j.ConfigServer,
env.AllowNvLink: fmt.Sprintf("%v", j.AllowNVLink),
env.ElasticModeEnvKey: j.ElasticMode.String(),
env.InitProgressEnvKey: strconv.FormatInt(int64(initProgress), 10),
}
if len(j.ConfigServer) > 0 {
envs[env.ConfigServerEnvKey] = j.ConfigServer
Expand Down Expand Up @@ -80,7 +83,7 @@ func (j Job) CreateProcs(cluster plan.Cluster, host uint32) []proc.Proc {
var ps []proc.Proc
for _, self := range cluster.Workers.On(host) {
localRank, _ := cluster.Workers.LocalRank(self)
proc := j.NewProc(self, localRank, 0, cluster)
proc := j.NewProc(self, localRank, 0, cluster, 0)
ps = append(ps, proc)
}
return ps
Expand Down
Loading

0 comments on commit 1548ad2

Please sign in to comment.