Skip to content

Commit

Permalink
Add prefix parameter for object storage, closes #568
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmezzetti committed Oct 3, 2023
1 parent 68fc138 commit 46a8ef8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
44 changes: 35 additions & 9 deletions src/python/txtai/cloud/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def metadata(self, path=None):
try:
# If this is an archive path, check if file exists
if self.isarchive(path):
return self.client.get_object(self.config["container"], os.path.basename(path))
return self.client.get_object(self.config["container"], self.objectname(path))

# Otherwise check if container exists
return self.client.get_container(self.config["container"])
Expand All @@ -69,17 +69,22 @@ def metadata(self, path=None):
def load(self, path=None):
# Download archive file
if self.isarchive(path):
obj = self.client.get_object(self.config["container"], os.path.basename(path))
obj = self.client.get_object(self.config["container"], self.objectname(path))
obj.download(path, overwrite_existing=True)

# Download all files in container
# Download files in container. Optionally filter with a provided prefix.
else:
# Create local directory, if necessary
os.makedirs(path, exist_ok=True)

container = self.client.get_container(self.config["container"])
for obj in container.list_objects():
obj.download(os.path.join(path, obj.name), overwrite_existing=True)
for obj in container.list_objects(prefix=self.config.get("prefix")):
# Derive local path and directory
localpath = os.path.join(path, obj.name)
directory = os.path.dirname(localpath)

# Create local directory, if necessary
os.makedirs(directory, exist_ok=True)

# Download file locally
obj.download(localpath, overwrite_existing=True)

return path

Expand All @@ -93,4 +98,25 @@ def save(self, path):
# Upload files
for f in self.listfiles(path):
with open(f, "rb") as iterator:
self.client.upload_object_via_stream(iterator=iterator, container=container, object_name=os.path.basename(f))
self.client.upload_object_via_stream(iterator=iterator, container=container, object_name=self.objectname(f))

def objectname(self, name):
    """
    Builds the object storage name for a local file name or path.

    Only the final path component of the input is kept. When the "prefix" configuration
    parameter is set to a non-empty value, the object name is the prefix and the base name
    joined with a "/". Otherwise, the base name is returned as is.

    Args:
        name: input name

    Returns:
        object name
    """

    # Strip leading directories, only the file name is used as the object key
    base = os.path.basename(name)

    # Read optional prefix/folder from configuration
    prefix = self.config.get("prefix")

    # No prefix configured (missing or empty), object is stored at the top level
    if not prefix:
        return base

    # Store object under the configured prefix
    return f"{prefix}/{base}"
17 changes: 13 additions & 4 deletions src/python/txtai/workflow/task/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,29 @@ def list(self, element):
provider = re.sub(StorageTask.PREFIX, r"\1", element.lower())
path = re.sub(StorageTask.PATH, r"\1", element)

key, container = os.path.dirname(path), os.path.basename(path)
# Load key and secret, if applicable
key = self.key if self.key is not None else os.environ.get("ACCESS_KEY")
secret = self.secret if self.secret is not None else os.environ.get("ACCESS_SECRET")

# Parse key and container
key, container = (os.path.dirname(path), os.path.basename(path)) if key is None else (key, path)

# Parse optional prefix from container
parts = container.split("/", 1)
container, prefix = (parts[0], parts[1]) if len(parts) > 1 else (container, None)

# Get driver for provider
driver = get_driver(provider)

# Get client connection
client = driver(
key if key else self.key if self.key else os.environ.get("ACCESS_KEY"),
self.secret if self.secret else os.environ.get("ACCESS_SECRET"),
key,
secret,
host=self.host,
port=self.port,
token=self.token,
region=self.region,
)

container = client.get_container(container_name=container)
return [client.get_object_cdn_url(obj) for obj in client.list_container_objects(container=container)]
return [client.get_object_cdn_url(obj) for obj in client.list_container_objects(container=container, prefix=prefix)]

0 comments on commit 46a8ef8

Please sign in to comment.