deduce is_sky_managed
zpoint committed Jan 8, 2025
1 parent 3f9525c · commit ce45001
Showing 2 changed files with 39 additions and 35 deletions.
sky/data/storage.py (37 additions, 25 deletions)
@@ -78,6 +78,11 @@
_STORAGE_LOG_FILE_NAME = 'storage_sync.log'


def _is_sky_managed_intermediate_bucket(bucket_name: str) -> bool:
return re.match(r'skypilot-filemounts-.+-[a-f0-9]{8}',
bucket_name) is not None


def get_cached_enabled_storage_clouds_or_refresh(
raise_if_no_cloud_access: bool = False) -> List[str]:
# This is a temporary solution until https://github.com/skypilot-org/skypilot/issues/1943 # pylint: disable=line-too-long
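
The helper added in this hunk is the core of the commit: a bucket whose name follows SkyPilot's intermediate-bucket naming scheme is treated as sky-managed even if it was not created in the current run (for example, when a controller later reconstructs the store). A small, self-contained sketch of how the check behaves; the sample bucket names below are made up:

    import re

    # Same pattern as the helper added above; only the sample inputs are
    # hypothetical.
    def _is_sky_managed_intermediate_bucket(bucket_name: str) -> bool:
        return re.match(r'skypilot-filemounts-.+-[a-f0-9]{8}',
                        bucket_name) is not None

    # Intermediate buckets created by SkyPilot for file mounts follow the
    # 'skypilot-filemounts-<user>-<8-hex-char-hash>' scheme and match:
    print(_is_sky_managed_intermediate_bucket('skypilot-filemounts-alice-1a2b3c4d'))  # True
    # A user-supplied bucket name does not:
    print(_is_sky_managed_intermediate_bucket('my-team-data'))  # False
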
@@ -392,7 +397,7 @@ def upload(self) -> None:
"""
raise NotImplementedError

def delete(self, force_delete_bucket: bool = False) -> None:
def delete(self) -> None:
"""Removes the Storage from the cloud."""
raise NotImplementedError

@@ -1086,7 +1091,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
else:
global_user_state.set_storage_handle(self.name, self.handle)
elif self.force_delete:
store.delete(force_delete_bucket=True)
store.delete()
# Remove store from bookkeeping
del self.stores[store_type]
else:
@@ -1095,7 +1100,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
self.handle.remove_store(store)
store.delete()
elif self.force_delete:
store.delete(force_delete_bucket=True)
store.delete()
self.stores = {}
# Remove storage from global_user_state if present
global_user_state.remove_storage(self.name)
@@ -1364,7 +1369,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
@@ -1395,9 +1402,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_s3_bucket(self.name)
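
With is_sky_managed deduced at initialization, delete() no longer needs a force_delete_bucket flag. The same signature change recurs in every remaining store class below, and all but the OCI store keep the sub-path guard shown here. A minimal sketch of the decision the new delete() bodies share, using a hypothetical ExampleStore rather than the real classes:

    from typing import Optional

    class ExampleStore:
        """Toy stand-in for the real store classes in this file."""

        def __init__(self, name: str, is_sky_managed: bool,
                     bucket_sub_path: Optional[str] = None):
            self.name = name
            self.is_sky_managed = is_sky_managed
            self._bucket_sub_path = bucket_sub_path

        def delete(self) -> None:
            # A bucket SkyPilot does not manage is never removed outright;
            # only the sub-path SkyPilot wrote into it is cleaned up.
            if self._bucket_sub_path is not None and not self.is_sky_managed:
                return self._delete_sub_path()
            # Sky-managed buckets (newly created, or deduced from the name)
            # are deleted entirely.
            self._delete_bucket(self.name)

        def _delete_sub_path(self) -> None:
            print(f'would delete only {self._bucket_sub_path!r} in {self.name!r}')

        def _delete_bucket(self, name: str) -> None:
            print(f'would delete bucket {name!r}')

    # User bucket with a per-job sub-path: only the sub-path goes away.
    ExampleStore('my-team-data', is_sky_managed=False,
                 bucket_sub_path='job-1/workdir').delete()
    # Sky-managed intermediate bucket: removed entirely.
    ExampleStore('skypilot-filemounts-alice-1a2b3c4d', is_sky_managed=True).delete()
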
@@ -1848,7 +1854,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
@@ -1881,9 +1889,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_gcs_bucket(self.name)
@@ -2424,7 +2431,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def _update_storage_account_name_and_resource(self):
self.storage_account_name, self.resource_group_name = (
@@ -2712,10 +2721,9 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
def delete(self) -> None:
"""Deletes the storage."""
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_az_bucket(self.name)
@@ -3135,7 +3143,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
@@ -3166,9 +3176,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_r2_bucket(self.name)
@@ -3615,7 +3624,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads files from local machine to bucket.
@@ -3649,9 +3660,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

self._delete_cos_bucket()
@@ -4060,7 +4070,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
@@ -4085,7 +4097,7 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
def delete(self) -> None:
deleted_by_skypilot = self._delete_oci_bucket(self.name)
if deleted_by_skypilot:
msg_str = f'Deleted OCI bucket {self.name}.'
sky/utils/controller_utils.py (2 additions, 10 deletions)
@@ -700,11 +700,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
# we store all these files in same bucket from config.
bucket_wth_prefix = skypilot_config.get_nested(('jobs', 'bucket'), None)
store_kwargs: Dict[str, Any] = {}
# Controllers don't have the knowledge of whether the bucket is managed by
# sky or not, By default we consider the sky create and managed the
# intermediate bucket so we let controller delete the buckets after job
# finishes.
force_delete = True
if bucket_wth_prefix is None:
store_type = store_cls = sub_path = None
storage_account_name = region = None
@@ -718,8 +713,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
store_kwargs['storage_account_name'] = storage_account_name
if region is not None:
store_kwargs['region'] = region
# If the bucket is not managed by sky, we should not force delete it.
force_delete = False

# Step 1: Translate the workdir to SkyPilot storage.
new_storage_mounts = {}
if task.workdir is not None:
@@ -751,7 +745,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.COPY,
stores=stores,
_bucket_sub_path=bucket_sub_path)
storage_obj.force_delete = force_delete
new_storage_mounts[constants.SKY_REMOTE_WORKDIR] = storage_obj
# Check of the existence of the workdir in file_mounts is done in
# the task construction.
@@ -789,7 +782,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.COPY,
stores=stores,
_bucket_sub_path=bucket_sub_path)
storage_obj.force_delete = force_delete
new_storage_mounts[dst] = storage_obj
logger.info(f' {colorama.Style.DIM}Folder : {src!r} '
f'-> storage: {bucket_name!r}.{colorama.Style.RESET_ALL}')
@@ -829,7 +821,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.MOUNT,
stores=stores,
_bucket_sub_path=file_mounts_tmp_subpath)
storage_obj.force_delete = force_delete

new_storage_mounts[file_mount_remote_tmp_dir] = storage_obj
if file_mount_remote_tmp_dir in original_storage_mounts:
with ux_utils.print_exception_no_traceback():
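
With the stores deducing is_sky_managed from the bucket name, the controller no longer has to guess whether an intermediate bucket is managed by SkyPilot and thread a force_delete flag through every Storage object; it can simply call delete() and let each store decide how much to remove. A hedged sketch of the resulting call site (the helper name is hypothetical; Storage.delete() is the method shown earlier in this diff):

    from sky.data import storage as storage_lib

    def cleanup_intermediate_storage(storage_obj: storage_lib.Storage) -> None:
        # For a default 'skypilot-filemounts-<user>-<hash>' bucket the deduced
        # is_sky_managed is True and the whole bucket is deleted; for a user
        # bucket configured via `jobs.bucket` only the job's sub-path is
        # cleaned up.
        storage_obj.delete()
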
