Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[WIP] Result path conversion and upload to object store #104

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions alchemiscale/compute/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import json
from urllib.parse import urljoin
from functools import wraps
from pathlib import Path

import requests
from requests.auth import HTTPBasicAuth
Expand Down Expand Up @@ -108,3 +109,22 @@ def set_task_result(
pdr_sk = self._post_resource(f"tasks/{task}/results", data)

return ScopedKey.from_dict(pdr_sk)

def push_resultfile(
    self, task: ScopedKey,
    protocoldagresult: ProtocolDAGResult,
    path: Path
) -> ScopedKey:
    """Push a result file produced by a Task to the object store via the
    API service.

    Parameters
    ----------
    task
        ScopedKey of the Task the file belongs to.
    protocoldagresult
        ProtocolDAGResult that produced the file.
    path
        Local filesystem path of the file to ship.
        NOTE(review): `path` is currently unused -- as written this method
        duplicates `set_task_result`, posting only the serialized
        ProtocolDAGResult; the file contents are never uploaded.
        TODO: include the file (and its routing info) in the request.

    Returns
    -------
    ScopedKey
        Key of the stored result, as returned by the API service.
    """
    data = dict(
        protocoldagresult=json.dumps(
            protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder
        )
    )

    pdr_sk = self._post_resource(f"tasks/{task}/results", data)

    return ScopedKey.from_dict(pdr_sk)


def check_exists_resultfile(self, location):
    """Ask the API service whether a result file already exists at
    `location` in the object store.

    NOTE(review): stub -- returns None as written; intended to back
    `ResultFileStorage._exists`. TODO: implement.
    """
    ...
66 changes: 66 additions & 0 deletions alchemiscale/compute/filestorage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pathlib
import shutil
import os
from typing import Union, Tuple, ContextManager

from gufe.storage.externalresource.base import ExternalStorage
from gufe.storage.storagemanager import StorageManager, SingleProcDAGContextManager

from gufe.storage.errors import (
MissingExternalResourceError, ChangedExternalResourceError
)

from ..models import ScopedKey
from .client import AlchemiscaleComputeClient


class ResultFileDAGContextManager(SingleProcDAGContextManager):
    """DAG context manager intended to stage result files for shipment to
    the object store.

    NOTE(review): stub -- currently inherits all behavior unchanged from
    `SingleProcDAGContextManager`. TODO: implement.
    """
    ...


class ResultFileStorageManager(StorageManager):
    """Storage manager intended to route result files through
    `ResultFileStorage` to the object store.

    NOTE(review): stub -- currently inherits all behavior unchanged from
    `StorageManager`. TODO: implement.
    """
    ...


class ResultFileStorage(ExternalStorage):
    """ExternalStorage backend that proxies file operations through an
    `AlchemiscaleComputeClient` to the API service, which translates each
    request into the true location in the object store.

    Parameters
    ----------
    client
        Compute client used to communicate with the API service.
    task_sk
        ScopedKey of the Task whose result files are handled; gives the
        API service the routing handle it needs to place files in the
        object store.
        NOTE(review): whether the Task key is the right handle depends on
        whether paths are shipped *before* or *after* the ProtocolDAGResult
        is uploaded by the executor; *after* is preferable, since the state
        store then already holds the reference to route with -- confirm.
    """

    def __init__(self, client: AlchemiscaleComputeClient, task_sk: ScopedKey):
        self.client = client
        # fixed: retain the Task key for routing requests into the object
        # store; it was previously accepted but silently discarded
        self.task_sk = task_sk

    def _iter_contents(self, prefix=""):
        raise NotImplementedError()

    def _store_bytes(self, location, byte_data):
        """
        For implementers: This should be blocking, even if the storage
        backend allows asynchronous storage.
        """
        raise NotImplementedError()

    def _store_path(self, location, path):
        """
        For implementers: This should be blocking, even if the storage
        backend allows asynchronous storage.
        """
        raise NotImplementedError()

    def _exists(self, location):
        # delegate existence checks to the API service via the compute client
        return self.client.check_exists_resultfile(location)

    def _delete(self, location):
        raise NotImplementedError()

    def _get_filename(self, location):
        raise NotImplementedError()

    def _load_stream(self, location):
        raise NotImplementedError()
19 changes: 14 additions & 5 deletions alchemiscale/compute/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from gufe.protocols.protocoldag import execute_DAG, ProtocolDAG, ProtocolDAGResult

from .client import AlchemiscaleComputeClient
from ..storage.models import Task, TaskHub, ComputeServiceID
from ..storage.models import Task, TaskHub, ComputeServiceID, ObjectStoreRef
from ..models import Scope, ScopedKey


Expand Down Expand Up @@ -289,13 +289,20 @@ def task_to_protocoldag(
)
return protocoldag, transformation, extends_protocoldagresult


def _paths_to_objectstorerefs(self, outputs, task, protocoldagresult):
if isinstance(outputs, dict):
return {key: self.paths_to_objectstorerefs(value, task, protocoldagresult) for key, value in outputs.items()}
elif isinstance(outputs, list):
return [self.paths_to_objectstorerefs(value, task, protocoldagresult) for value in outputs]
elif isinstance(outputs, Path):
return self.client.push_result_path(task, protocoldagresult, outputs)
else:
return outputs

def push_result(
self, task: ScopedKey, protocoldagresult: ProtocolDAGResult
) -> ScopedKey:
# TODO: this method should postprocess any paths,
# leaf nodes in DAG for blob results that should go to object store

# TODO: ship paths to object store

# finally, push ProtocolDAGResult
sk: ScopedKey = self.client.set_task_result(
Expand All @@ -304,6 +311,8 @@ def push_result(

return sk



def execute(self, task: ScopedKey) -> ScopedKey:
"""Executes given Task.

Expand Down
7 changes: 0 additions & 7 deletions alchemiscale/interface/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,13 +668,6 @@ def cancel_tasks(

return [ScopedKey.from_str(i) if i is not None else None for i in canceled_sks]

def _set_task_status(
self, task: ScopedKey, status: TaskStatusEnum
) -> Optional[ScopedKey]:
"""Set the status of a `Task`."""
task_sk = self._post_resource(f"/tasks/{task}/status", status.value)
return ScopedKey.from_str(task_sk) if task_sk is not None else None

async def _set_task_status(
self, tasks: List[ScopedKey], status: TaskStatusEnum
) -> List[Optional[ScopedKey]]:
Expand Down
58 changes: 49 additions & 9 deletions alchemiscale/storage/objectstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,12 @@ def _get_bytes(self, location):
return self.resource.Object(self.bucket, key).get()["Body"].read()

def _store_path(self, location, path):
"""
For implementers: This should be blocking, even if the storage
backend allows asynchronous storage.
"""
"""
For implementers: This should be blocking, even if the storage
backend allows asynchronous storage.
"""
key = os.path.join(self.prefix, location)

with open(path, "rb") as f:
self.resource.Bucket(self.bucket).upload_fileobj(f, key)

b = self.resource.Bucket(self.bucket)
self.resource.Bucket(self.bucket).upload_file(path, key)

def _exists(self, location) -> bool:
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -192,6 +184,54 @@ def _get_filename(self, location):

return url

def push_result_artifact(
    self,
    protocoldagresult: ProtocolDAGResult,
    scope: Scope,
) -> ProtocolDAGResultRef:
    """Push given `ProtocolDAGResult` to this `ObjectStore`.

    Parameters
    ----------
    protocoldagresult
        ProtocolDAGResult to store.
    scope
        Scope to store ProtocolDAGResult under.

    Returns
    -------
    ProtocolDAGResultRef
        Reference to the serialized `ProtocolDAGResult` in the object store.

    """
    # fixed: the signature previously declared `protocoldagresult_key:
    # GufeKey` while the body (and this docstring) used `protocoldagresult`,
    # which would raise NameError on first call

    # failed results are routed separately from successful ones
    ok = protocoldagresult.ok()
    route = "results" if ok else "failures"

    # build `location` based on gufe key
    location = os.path.join(
        "protocoldagresult",
        *scope.to_tuple(),
        protocoldagresult.transformation_key,
        route,
        protocoldagresult.key,
        "obj.json",
    )

    # TODO: add support for compute client-side compressed protocoldagresults
    pdr_jb = json.dumps(
        protocoldagresult.to_dict(), cls=JSON_HANDLER.encoder
    ).encode("utf-8")
    # blocking write; the return value of _store_bytes is not meaningful here
    self._store_bytes(location, pdr_jb)

    return ProtocolDAGResultRef(
        location=location,
        obj_key=protocoldagresult.key,
        scope=scope,
        ok=ok,
    )



def push_protocoldagresult(
self,
protocoldagresult: ProtocolDAGResult,
Expand Down