diff --git a/src/python/txtai/cloud/storage.py b/src/python/txtai/cloud/storage.py index ac13b1ecd..e61a12767 100644 --- a/src/python/txtai/cloud/storage.py +++ b/src/python/txtai/cloud/storage.py @@ -59,7 +59,7 @@ def metadata(self, path=None): try: # If this is an archive path, check if file exists if self.isarchive(path): - return self.client.get_object(self.config["container"], os.path.basename(path)) + return self.client.get_object(self.config["container"], self.objectname(path)) # Otherwise check if container exists return self.client.get_container(self.config["container"]) @@ -69,17 +69,22 @@ def metadata(self, path=None): def load(self, path=None): # Download archive file if self.isarchive(path): - obj = self.client.get_object(self.config["container"], os.path.basename(path)) + obj = self.client.get_object(self.config["container"], self.objectname(path)) obj.download(path, overwrite_existing=True) - # Download all files in container + # Download files in container. Optionally filter with a provided prefix. else: - # Create local directory, if necessary - os.makedirs(path, exist_ok=True) - container = self.client.get_container(self.config["container"]) - for obj in container.list_objects(): - obj.download(os.path.join(path, obj.name), overwrite_existing=True) + for obj in container.list_objects(prefix=self.config.get("prefix")): + # Derive local path and directory + localpath = os.path.join(path, obj.name) + directory = os.path.dirname(localpath) + + # Create local directory, if necessary + os.makedirs(directory, exist_ok=True) + + # Download file locally + obj.download(localpath, overwrite_existing=True) return path @@ -93,4 +98,25 @@ def save(self, path): # Upload files for f in self.listfiles(path): with open(f, "rb") as iterator: - self.client.upload_object_via_stream(iterator=iterator, container=container, object_name=os.path.basename(f)) + self.client.upload_object_via_stream(iterator=iterator, container=container, object_name=self.objectname(f)) + + def objectname(self, name): + """ + Derives an object name. This method checks if a prefix configuration parameter is present and combines + it with the input name parameter. + + Args: + name: input name + + Returns: + object name + """ + + # Get base name + name = os.path.basename(name) + + # Get optional prefix/folder + prefix = self.config.get("prefix") + + # Prepend prefix, if applicable + return f"{prefix}/{name}" if prefix else name diff --git a/src/python/txtai/workflow/task/storage.py b/src/python/txtai/workflow/task/storage.py index 92d1508c6..efcece76b 100644 --- a/src/python/txtai/workflow/task/storage.py +++ b/src/python/txtai/workflow/task/storage.py @@ -89,15 +89,24 @@ def list(self, element): provider = re.sub(StorageTask.PREFIX, r"\1", element.lower()) path = re.sub(StorageTask.PATH, r"\1", element) - key, container = os.path.dirname(path), os.path.basename(path) + # Load key and secret, if applicable + key = self.key if self.key is not None else os.environ.get("ACCESS_KEY") + secret = self.secret if self.secret is not None else os.environ.get("ACCESS_SECRET") + + # Parse key and container + key, container = (os.path.dirname(path), os.path.basename(path)) if key is None else (key, path) + + # Parse optional prefix from container + parts = container.split("/", 1) + container, prefix = (parts[0], parts[1]) if len(parts) > 1 else (container, None) # Get driver for provider driver = get_driver(provider) # Get client connection client = driver( - key if key else self.key if self.key else os.environ.get("ACCESS_KEY"), - self.secret if self.secret else os.environ.get("ACCESS_SECRET"), + key, + secret, host=self.host, port=self.port, token=self.token, @@ -105,4 +114,4 @@ def list(self, element): ) container = client.get_container(container_name=container) - return [client.get_object_cdn_url(obj) for obj in client.list_container_objects(container=container)] + return [client.get_object_cdn_url(obj) for obj in client.list_container_objects(container=container, prefix=prefix)]