Fixes for localhost tests
JosepSampe committed May 12, 2024
1 parent ac3294b commit 5e98883
Showing 7 changed files with 136 additions and 90 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -19,7 +19,7 @@ To contribute a patch:
3. Make sure that your code passes the functional tests. See the [Functional testing](#functional-testing) section below.
4. Make sure that your code passes the linter. Install `flake8` with `pip3 install flake8` and run the following command until you don't see any linting errors:
```bash
$ flake8 lithops --count --max-line-length=180 --statistics --ignore W605,W503
flake8 lithops --count --max-line-length=180 --statistics --ignore W605,W503
```
6. Add new unit tests for your code.

4 changes: 2 additions & 2 deletions lithops/executors.py
@@ -39,7 +39,7 @@
SERVERLESS, STANDALONE
from lithops.utils import setup_lithops_logger, \
is_lithops_worker, create_executor_id, create_futures_list
from lithops.localhost import LocalhostHandler, LocalhostHandlerV2
from lithops.localhost import LocalhostHandlerV1, LocalhostHandlerV2
from lithops.standalone import StandaloneHandler
from lithops.serverless import ServerlessHandler
from lithops.storage.utils import create_job_key, CloudObject
@@ -119,7 +119,7 @@ def __init__(
if self.mode == LOCALHOST:
localhost_config = extract_localhost_config(self.config)
if localhost_config.get('version', 1) == 1:
self.compute_handler = LocalhostHandler(localhost_config)
self.compute_handler = LocalhostHandlerV1(localhost_config)
else:
self.compute_handler = LocalhostHandlerV2(localhost_config)
elif self.mode == SERVERLESS:
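For reference, a minimal sketch (not part of this commit) of how the executor now picks a localhost handler from the `version` key shown above; the config values here are assumptions:

```python
# Illustrative sketch only; config values are hypothetical.
from lithops.localhost import LocalhostHandlerV1, LocalhostHandlerV2

localhost_config = {'runtime': 'python3', 'version': 2}  # assumed example config

# Version 1 (the default) keeps the original handler; any other value selects v2.
if localhost_config.get('version', 1) == 1:
    compute_handler = LocalhostHandlerV1(localhost_config)
else:
    compute_handler = LocalhostHandlerV2(localhost_config)
```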
4 changes: 2 additions & 2 deletions lithops/libs/inspect/inspect.py
@@ -100,7 +100,7 @@ def _shadowed_dict_from_mro_tuple(mro):
dunder_dict = _get_dunder_dict_of_class(entry)
if '__dict__' in dunder_dict:
class_dict = dunder_dict['__dict__']
if not (type(class_dict) is types.GetSetDescriptorType
if not (isinstance(class_dict, types.GetSetDescriptorType)
and class_dict.__name__ == "__dict__"
and class_dict.__objclass__ is entry):
return class_dict
@@ -128,7 +128,7 @@ def getattr_static(obj, attr, default=_sentinel):
if type not in _static_getmro(objtype):
klass = objtype
dict_attr = _shadowed_dict(klass)
if (dict_attr is _sentinel or type(dict_attr) is types.MemberDescriptorType):
if (dict_attr is _sentinel or isinstance(dict_attr, types.MemberDescriptorType)):
instance_result = _check_instance(obj, attr)
else:
klass = obj
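The `inspect.py` change replaces exact type checks with `isinstance`. A standalone illustration (not from this commit) of the semantic difference, since `isinstance` also accepts subclasses:

```python
# Not part of the diff: shows why `isinstance(x, T)` and `type(x) is T` can differ.
class Base:
    pass

class Child(Base):
    pass

obj = Child()
print(type(obj) is Base)      # False -> exact-type check rejects subclasses
print(isinstance(obj, Base))  # True  -> isinstance matches subclasses too
```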
7 changes: 5 additions & 2 deletions lithops/localhost/__init__.py
@@ -1,7 +1,10 @@
from .v1.localhost import LocalhostHandler
from .v1.localhost import LocalhostHandlerV1
from .v2.localhost import LocalhostHandlerV2

# Set the default localhost handler
LocalhostHandler = LocalhostHandlerV1

__all__ = [
'LocalhostHandler',
'LocalhostHandlerV1',
'LocalhostHandlerV2'
]
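With the alias above, code that still imports `LocalhostHandler` keeps working. A quick compatibility sketch (not part of the diff):

```python
# Both names resolve to the same class after this change.
from lithops.localhost import LocalhostHandler, LocalhostHandlerV1, LocalhostHandlerV2

assert LocalhostHandler is LocalhostHandlerV1
assert LocalhostHandler is not LocalhostHandlerV2
```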
141 changes: 76 additions & 65 deletions lithops/localhost/v1/localhost.py
@@ -30,7 +30,6 @@
from lithops.version import __version__
from lithops.constants import (
LOCALHOST_RUNTIME_DEFAULT,
RN_LOG_FILE,
TEMP_DIR,
USER_TEMP_DIR,
LITHOPS_TEMP_DIR,
@@ -41,16 +40,17 @@
BackendType,
get_docker_path,
is_lithops_worker,
is_podman,
is_unix_system
)

logger = logging.getLogger(__name__)

RUNNER = os.path.join(LITHOPS_TEMP_DIR, 'localhost-runner.py')
RUNNER_FILE = os.path.join(LITHOPS_TEMP_DIR, 'localhost-runner.py')
LITHOPS_LOCATION = os.path.dirname(os.path.abspath(lithops.__file__))


class LocalhostHandler:
class LocalhostHandlerV1:
"""
A localhostHandler object is used by invokers and other components to access
the underlying localhost backend without exposing the implementation details.
@@ -60,11 +60,11 @@ def __init__(self, config):
logger.debug('Creating Localhost compute client')
self.config = config
self.runtime_name = self.config.get('runtime', LOCALHOST_RUNTIME_DEFAULT)
self.env = None

self.env = None
self.job_queue = queue.Queue()
self.job_manager = None
self.should_run = True
self.invocation_in_progress = False

msg = COMPUTE_CLI_MSG.format('Localhost compute v1')
logger.info(f"{msg}")
@@ -80,7 +80,8 @@ def init(self):
Init tasks for localhost
"""
default_env = self.runtime_name.startswith(('python', '/'))
self.env = DefaultEnv(self.config) if default_env else DockerEnv(self.config)
self.env = DefaultEnvironment(self.config) if default_env \
else ContainerEnvironment(self.config)
self.env.setup()

def start_manager(self):
Expand All @@ -90,26 +91,31 @@ def start_manager(self):

def job_manager():
logger.debug('Starting localhost job manager')
self.should_run = True

while self.should_run:
while True:
job_payload, job_filename = self.job_queue.get()

if job_payload is None and job_filename is None:
if self.job_queue.empty():
break
else:
if self.invocation_in_progress or not self.job_queue.empty():
continue
else:
break

executor_id = job_payload['executor_id']
job_id = job_payload['job_id']
total_calls = len(job_payload['call_ids'])
job_key = job_payload['job_key']
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Running '
f'{total_calls} activations in the localhost worker')
process = self.env.run(job_key, job_filename)
process = self.env.run_job(job_key, job_filename)
process.communicate() # blocks until the process finishes
logger.debug(f'ExecutorID {executor_id} | JobID {job_id} - Execution finished')

if self.job_queue.empty():
break
if self.invocation_in_progress:
continue
else:
break

self.job_manager = None
logger.debug("Localhost job manager finished")
@@ -129,6 +135,7 @@ def invoke(self, job_payload):
"""
Run the job description against the selected environment
"""
self.invocation_in_progress = True
executor_id = job_payload['executor_id']
job_id = job_payload['job_id']

@@ -138,6 +145,7 @@
self.job_queue.put((job_payload, job_filename))

self.start_manager()
self.invocation_in_progress = False

def get_runtime_key(self, runtime_name, *args):
"""
@@ -169,8 +177,6 @@ def clear(self, job_keys=None, exception=None):
"""
Kills all running jobs processes
"""
self.should_run = False

while not self.job_queue.empty():
try:
self.job_queue.get(False)
@@ -182,27 +188,26 @@
if self.job_manager:
self.job_queue.put((None, None))

self.should_run = True


class BaseEnv:
class ExecutionEnvironment:
"""
Base environment class for shared methods
"""

def __init__(self, config):
self.config = config
self.runtime_name = self.config['runtime']
self.is_unix_system = is_unix_system()
self.jobs = {} # dict to store executed jobs (job_keys) and PIDs

def _copy_lithops_to_tmp(self):
if is_lithops_worker() and os.path.isfile(RUNNER):
if is_lithops_worker() and os.path.isfile(RUNNER_FILE):
return
os.makedirs(LITHOPS_TEMP_DIR, exist_ok=True)
shutil.rmtree(os.path.join(LITHOPS_TEMP_DIR, 'lithops'), ignore_errors=True)
shutil.copytree(LITHOPS_LOCATION, os.path.join(LITHOPS_TEMP_DIR, 'lithops'))
src_handler = os.path.join(LITHOPS_LOCATION, 'localhost', 'v1', 'runner.py')
copyfile(src_handler, RUNNER)
copyfile(src_handler, RUNNER_FILE)

def prepare_job_file(self, job_payload):
"""
@@ -222,7 +227,7 @@ def prepare_job_file(self, job_payload):
with open(local_job_filename, 'w') as jl:
json.dump(job_payload, jl, default=str)

if isinstance(self, DockerEnv):
if isinstance(self, ContainerEnvironment):
job_filename = f'{docker_job_dir}/{job_file}'
else:
job_filename = local_job_filename
@@ -238,7 +243,7 @@ def kill_job(job_key):
if self.jobs[job_key].poll() is None:
logger.debug(f'Killing job {job_key} with PID {self.jobs[job_key].pid}')
PID = self.jobs[job_key].pid
if is_unix_system():
if self.is_unix_system:
PGID = os.getpgid(PID)
os.killpg(PGID, signal.SIGKILL)
else:
@@ -254,105 +259,111 @@ def kill_job(job_key):
pass


class DefaultEnv(BaseEnv):
class DefaultEnvironment(ExecutionEnvironment):
"""
Default environment uses current python3 installation
"""

def __init__(self, config):
super().__init__(config)
logger.debug(f'Starting python environment for {self.runtime_name}')
logger.debug(f'Starting default environment for {self.runtime_name}')

def setup(self):
logger.debug('Setting up python environment')
logger.debug('Setting up default environment')
self._copy_lithops_to_tmp()

def get_metadata(self):
if not os.path.isfile(RUNNER):
if not os.path.isfile(RUNNER_FILE):
self.setup()

logger.debug(f"Extracting runtime metadata from: {self.runtime_name}")
cmd = [self.runtime_name, RUNNER, 'get_metadata']
process = sp.run(cmd, check=True, stdout=sp.PIPE, universal_newlines=True,
start_new_session=True)
logger.debug(f"Extracting metadata from: {self.runtime_name}")
cmd = [self.runtime_name, RUNNER_FILE, 'get_metadata']
process = sp.run(
cmd, check=True,
stdout=sp.PIPE,
universal_newlines=True,
start_new_session=True
)
runtime_meta = json.loads(process.stdout.strip())
return runtime_meta

def run(self, job_key, job_filename):
def run_job(self, job_key, job_filename):
"""
Runs a job
"""
if not os.path.isfile(RUNNER):
if not os.path.isfile(RUNNER_FILE):
self.setup()

cmd = [self.runtime_name, RUNNER, 'run_job', job_filename]
log = open(RN_LOG_FILE, 'a')
process = sp.Popen(cmd, stdout=log, stderr=log, start_new_session=True)
cmd = [self.runtime_name, RUNNER_FILE, 'run_job', job_filename]
process = sp.Popen(cmd, start_new_session=True)
self.jobs[job_key] = process

return process


class DockerEnv(BaseEnv):
class ContainerEnvironment(ExecutionEnvironment):
"""
Docker environment uses a docker runtime image
"""

def __init__(self, config):
super().__init__(config)
logger.debug(f'Starting container environment for {self.runtime_name}')
self.use_gpu = self.config.get('use_gpu', False)
logger.debug(f'Starting docker environment for {self.runtime_name}')
self.uid = os.getuid() if is_unix_system() else None
self.gid = os.getgid() if is_unix_system() else None
self.docker_path = get_docker_path()
self.is_podman = is_podman(self.docker_path)
self.uid = os.getuid() if self.is_unix_system else None
self.gid = os.getgid() if self.is_unix_system else None

def setup(self):
logger.debug('Setting up Docker environment')
logger.debug('Setting up container environment')
self._copy_lithops_to_tmp()
if self.config.get('pull_runtime', False):
logger.debug('Pulling Docker runtime {}'.format(self.runtime_name))
sp.run(shlex.split(f'docker pull {self.runtime_name}'), check=True,
stdout=sp.PIPE, universal_newlines=True)
logger.debug(f'Pulling runtime {self.runtime_name}')
sp.run(
shlex.split(f'docker pull {self.runtime_name}'), check=True,
stdout=sp.PIPE, universal_newlines=True
)

def get_metadata(self):
if not os.path.isfile(RUNNER):
if not os.path.isfile(RUNNER_FILE):
self.setup()

logger.debug(f"Extracting runtime metadata from: {self.runtime_name}")
logger.debug(f"Extracting metadata from: {self.runtime_name}")

tmp_path = Path(TEMP_DIR).as_posix()
docker_path = get_docker_path()

cmd = f'{docker_path} run --name lithops_metadata '
cmd += f'--user {self.uid}:{self.gid} ' if is_unix_system() else ''
cmd = f'{self.docker_path} run --name lithops_metadata '
cmd += f'--user {self.uid}:{self.gid} ' if self.is_unix_system and not self.is_podman else ''
cmd += f'--env USER={os.getenv("USER", "root")} '
cmd += f'--rm -v {tmp_path}:/tmp --entrypoint "python3" '
cmd += f'{self.runtime_name} /tmp/{USER_TEMP_DIR}/localhost-runner.py get_metadata'

process = sp.run(shlex.split(cmd), check=True, stdout=sp.PIPE,
universal_newlines=True, start_new_session=True)
process = sp.run(
shlex.split(cmd), check=True, stdout=sp.PIPE,
universal_newlines=True, start_new_session=True
)
runtime_meta = json.loads(process.stdout.strip())

return runtime_meta

def run(self, job_key, job_filename):
def run_job(self, job_key, job_filename):
"""
Runs a job
"""
if not os.path.isfile(RUNNER):
if not os.path.isfile(RUNNER_FILE):
self.setup()

tmp_path = Path(TEMP_DIR).as_posix()
docker_path = get_docker_path()

cmd = f'{docker_path} run --name lithops_{job_key} '
cmd = f'{self.docker_path} run --name lithops_{job_key} '
cmd += '--gpus all ' if self.use_gpu else ''
cmd += f'--user {self.uid}:{self.gid} ' if is_unix_system() else ''
cmd += f'--user {self.uid}:{self.gid} ' if self.is_unix_system and not self.is_podman else ''
cmd += f'--env USER={os.getenv("USER", "root")} '
cmd += f'--rm -v {tmp_path}:/tmp --entrypoint "python3" '
cmd += f'{self.runtime_name} /tmp/{USER_TEMP_DIR}/localhost-runner.py run_job {job_filename}'

log = open(RN_LOG_FILE, 'a')
process = sp.Popen(shlex.split(cmd), stdout=log, stderr=log, start_new_session=True)
process = sp.Popen(shlex.split(cmd), start_new_session=True)
self.jobs[job_key] = process

return process
@@ -361,12 +372,12 @@ def stop(self, job_keys=None):
"""
Stops running containers
"""
if job_keys:
for job_key in job_keys:
sp.Popen(shlex.split(f'docker rm -f lithops_{job_key}'),
stdout=sp.DEVNULL, stderr=sp.DEVNULL)
else:
for job_key in self.jobs:
sp.Popen(shlex.split(f'docker rm -f lithops_{job_key}'),
stdout=sp.DEVNULL, stderr=sp.DEVNULL)
jk_to_delete = job_keys or list(self.jobs.keys())

for job_key in jk_to_delete:
sp.Popen(
shlex.split(f'{self.docker_path} rm -f lithops_{job_key}'),
stdout=sp.DEVNULL, stderr=sp.DEVNULL
)

super().stop(job_keys)
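One behavioral change in `ContainerEnvironment` above is that the `--user` mapping is now skipped when podman is detected. A small sketch of that rule (the helper name and the rationale are assumptions, not from the commit):

```python
import os

def build_user_flag(is_unix_system: bool, is_podman: bool) -> str:
    """Mirror the condition used when assembling the container run command:
    the --user uid:gid mapping is only added for docker on unix systems,
    presumably because rootless podman already maps the host user itself."""
    if is_unix_system and not is_podman:
        return f'--user {os.getuid()}:{os.getgid()} '
    return ''
```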
