sdk/python: ObjectFile (File-Like Object)
If applied, this MR will add:

- ObjectFile (a file-like object extending BufferedIOBase) with support for retries and error recovery, including tests and a notebook demo.
- iter_from_position in ObjectReader, which returns an iterator over each chunk of bytes in the object starting from the specified byte position, including tests.
- Integration tests for ObjectReader.
- Updated Python SDK documentation (and fixes for minor related issues in AuthN documentation generation).

Signed-off-by: Ryan Koo <[email protected]>
rkoo19 committed Sep 10, 2024
1 parent 2e9edee commit 6a9c9fc
Showing 10 changed files with 831 additions and 15 deletions.
106 changes: 105 additions & 1 deletion docs/python_sdk.md
@@ -150,6 +150,7 @@ or see [https://github.com/NVIDIA/aistore/tree/main/python/aistore](https://github.com/NVIDIA/aistore/tree/main/python/aistore)
* [props](#object.Object.props)
* [head](#object.Object.head)
* [get](#object.Object.get)
* [as\_file](#object.Object.as_file)
* [get\_semantic\_url](#object.Object.get_semantic_url)
* [get\_url](#object.Object.get_url)
* [put\_content](#object.Object.put_content)
@@ -163,9 +164,12 @@ or see [https://github.com/NVIDIA/aistore/tree/main/python/aistore](https://github.com/NVIDIA/aistore/tree/main/python/aistore)
* [ObjectIterator](#object_iterator.ObjectIterator)
* [object\_reader](#object_reader)
* [ObjectReader](#object_reader.ObjectReader)
* [head](#object_reader.ObjectReader.head)
* [attributes](#object_reader.ObjectReader.attributes)
* [chunk\_size](#object_reader.ObjectReader.chunk_size)
* [read\_all](#object_reader.ObjectReader.read_all)
* [raw](#object_reader.ObjectReader.raw)
* [iter\_from\_position](#object_reader.ObjectReader.iter_from_position)
* [\_\_iter\_\_](#object_reader.ObjectReader.__iter__)

<a id="authn.authn_client.AuthNClient"></a>
@@ -2732,6 +2736,57 @@ Creates and returns an ObjectReader with access to object contents and optionally writes to a provided writer.
- `requests.ConnectionTimeout` - Timed out connecting to AIStore
- `requests.ReadTimeout` - Timed out waiting response from AIStore

<a id="object.Object.as_file"></a>

### as\_file

```python
def as_file(max_resume: int = 5,
            archive_settings: ArchiveSettings = None,
            blob_download_settings: BlobDownloadSettings = None,
            chunk_size: int = DEFAULT_CHUNK_SIZE,
            etl_name: str = None,
            writer: BufferedWriter = None,
            latest: bool = False,
            byte_range: str = None) -> ObjectFile
```

Creates an `ObjectFile` for reading object data in chunks, with support for
resuming and retrying from the last known position if the object stream
is prematurely closed due to an unexpected error.

**Arguments**:

- `max_resume` _int, optional_ - If streaming object contents is interrupted, this
defines the maximum number of attempts to resume the connection before
raising an exception.
- `archive_settings` _ArchiveSettings, optional_ - Settings for archive extraction.
- `blob_download_settings` _BlobDownloadSettings, optional_ - Settings for using blob
download (e.g., chunk size, workers).
- `chunk_size` _int, optional_ - The size of chunks to use while reading from the stream.
- `etl_name` _str, optional_ - Name of the ETL (Extract, Transform, Load) transformation
to apply during the get operation.
- `writer` _BufferedWriter, optional_ - A writer for writing content output. User is
responsible for closing the writer.
- `latest` _bool, optional_ - Whether to get the latest version of the object from
a remote bucket (if applicable).
- `byte_range` _str, optional_ - Specify a byte range to fetch a segment of the object
(e.g., "bytes=0-499" for the first 500 bytes).


**Returns**:

- `ObjectFile` - A file-like object that can be used to read the object content.


**Raises**:

- `requests.RequestException` - An ambiguous exception occurred while handling the request.
- `requests.ConnectionError` - A connection error occurred.
- `requests.ConnectionTimeout` - The connection to AIStore timed out.
- `requests.ReadTimeout` - Waiting for a response from AIStore timed out.
- `requests.exceptions.HTTPError(404)` - The object does not exist.
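
A minimal usage sketch (the endpoint, bucket, and object names below are illustrative and assume a reachable AIStore cluster with an existing object):

```python
from aistore.sdk import Client

client = Client("http://localhost:8080")  # hypothetical endpoint
obj = client.bucket("my-bucket").object("my-object.bin")

# ObjectFile extends BufferedIOBase, so it supports standard file-like
# reads and the context-manager protocol; interrupted streams are
# resumed automatically, up to max_resume times.
with obj.as_file(max_resume=3) as file_obj:
    data = file_obj.read()
```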

<a id="object.Object.get_semantic_url"></a>

### get\_semantic\_url
@@ -2982,6 +3037,20 @@ class ObjectReader()
Represents the data returned by the API when getting an object, including access to the content stream and object
attributes.

<a id="object_reader.ObjectReader.head"></a>

### head

```python
def head() -> ObjectAttributes
```

Make a head request to AIS to update and return only object attributes.

**Returns**:

- `ObjectAttributes` - Attributes for this object
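
For example (a sketch continuing the example above; `size` is assumed here to be one of the parsed attribute fields):

```python
reader = obj.get()
attrs = reader.head()  # HEAD request: refreshes attributes, transfers no object data
print(attrs.size)      # assumed attribute field, for illustration
```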

<a id="object_reader.ObjectReader.attributes"></a>

### attributes
@@ -2997,6 +3066,21 @@ Object metadata attributes.

- `ObjectAttributes` - Parsed object attributes from the headers returned by AIS

<a id="object_reader.ObjectReader.chunk_size"></a>

### chunk\_size

```python
@property
def chunk_size() -> int
```

Chunk size.

**Returns**:

- `int` - Current chunk size for reading the object.
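
A short sketch (assuming the chunk size passed to `get` is echoed back by this property):

```python
reader = obj.get(chunk_size=1024 * 1024)  # request 1 MiB chunks
assert reader.chunk_size == 1024 * 1024
```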

<a id="object_reader.ObjectReader.read_all"></a>

### read\_all
@@ -3027,6 +3111,26 @@ Returns the raw byte stream of object content.

- `requests.Response` - Raw byte stream of the object content

<a id="object_reader.ObjectReader.iter_from_position"></a>

### iter\_from\_position

```python
def iter_from_position(start_position: int = 0) -> Iterator[bytes]
```

Make a request to get a stream from the provided object starting at a specific byte position
and yield chunks of the stream content.

**Arguments**:

- `start_position` _int, optional_ - The byte position to start reading from. Defaults to 0.


**Returns**:

- `Iterator[bytes]` - An iterator over each chunk of bytes in the object starting from the specified position.
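
For example, resuming a partial download from a known byte offset (a sketch; the file name and offset are illustrative):

```python
bytes_received = 1024  # hypothetical: bytes already written locally

with open("partial-copy.bin", "ab") as f:
    for chunk in obj.get().iter_from_position(start_position=bytes_received):
        f.write(chunk)
```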

<a id="object_reader.ObjectReader.__iter__"></a>

### \_\_iter\_\_
@@ -3039,5 +3143,5 @@ Make a request to get a stream from the provided object and yield chunks of the stream content.

**Returns**:

- `Iterator[bytes]` - An iterator over each chunk of bytes in the object
- `Iterator[bytes]` - An iterator over each chunk of bytes in the object.
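
Because `ObjectReader` is iterable, streaming an object to disk is a simple loop (sketch):

```python
with open("local-copy.bin", "wb") as f:
    for chunk in obj.get():
        f.write(chunk)
```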

6 changes: 4 additions & 2 deletions python/aistore/sdk/authn/access_attr.py
@@ -20,8 +20,10 @@ class AccessAttr(IntFlag):
    OBJ_UPDATE = 1 << 7
    BCK_HEAD = 1 << 8
    OBJ_LIST = 1 << 9
    PATCH = 1 << 10  # TODO: Not implemented in SDK
    BCK_SET_ACL = 1 << 11  # TODO: Implemented in SDK
    # TODO: Not implemented in SDK
    PATCH = 1 << 10
    # TODO: Not implemented in SDK
    BCK_SET_ACL = 1 << 11
    LIST_BUCKETS = 1 << 12
    SHOW_CLUSTER = 1 << 13
    CREATE_BUCKET = 1 << 14
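
Since `AccessAttr` is an `IntFlag`, individual permissions compose with bitwise OR (a sketch, not part of the diff):

```python
from aistore.sdk.authn.access_attr import AccessAttr

# Build a role's permission mask from individual access bits.
role_access = AccessAttr.BCK_HEAD | AccessAttr.OBJ_LIST | AccessAttr.LIST_BUCKETS
print(AccessAttr.PATCH in role_access)  # False -- the PATCH bit is not set
```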
55 changes: 55 additions & 0 deletions python/aistore/sdk/object.py
@@ -30,6 +30,7 @@
    HEADER_OBJECT_BLOB_WORKERS,
    HEADER_OBJECT_BLOB_CHUNK_SIZE,
)
from aistore.sdk.object_file import ObjectFile
from aistore.sdk.object_reader import ObjectReader
from aistore.sdk.types import (
    ActionMsg,
@@ -174,6 +175,60 @@ def get(
        writer.writelines(obj_reader)
        return obj_reader

    def as_file(
        self,
        max_resume: int = 5,
        archive_settings: ArchiveSettings = None,
        blob_download_settings: BlobDownloadSettings = None,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        etl_name: str = None,
        writer: BufferedWriter = None,
        latest: bool = False,
        byte_range: str = None,
    ) -> ObjectFile:  # pylint: disable=too-many-arguments
        """
        Creates an `ObjectFile` for reading object data in chunks, with support for
        resuming and retrying from the last known position if the object stream
        is prematurely closed due to an unexpected error.

        Args:
            max_resume (int, optional): If streaming object contents is interrupted, this
                defines the maximum number of attempts to resume the connection before
                raising an exception.
            archive_settings (ArchiveSettings, optional): Settings for archive extraction.
            blob_download_settings (BlobDownloadSettings, optional): Settings for using blob
                download (e.g., chunk size, workers).
            chunk_size (int, optional): The size of chunks to use while reading from the stream.
            etl_name (str, optional): Name of the ETL (Extract, Transform, Load) transformation
                to apply during the get operation.
            writer (BufferedWriter, optional): A writer for writing content output. User is
                responsible for closing the writer.
            latest (bool, optional): Whether to get the latest version of the object from
                a remote bucket (if applicable).
            byte_range (str, optional): Specify a byte range to fetch a segment of the object
                (e.g., "bytes=0-499" for the first 500 bytes).

        Returns:
            ObjectFile: A file-like object that can be used to read the object content.

        Raises:
            requests.RequestException: An ambiguous exception occurred while handling the request.
            requests.ConnectionError: A connection error occurred.
            requests.ConnectionTimeout: The connection to AIStore timed out.
            requests.ReadTimeout: Waiting for a response from AIStore timed out.
            requests.exceptions.HTTPError(404): The object does not exist.
        """
        # Open the underlying stream via `get`, then wrap it in an ObjectFile
        # that can resume from the last known position on stream errors.
        object_reader = self.get(
            archive_settings=archive_settings,
            blob_download_settings=blob_download_settings,
            chunk_size=chunk_size,
            etl_name=etl_name,
            writer=writer,
            latest=latest,
            byte_range=byte_range,
        )
        return ObjectFile(object_reader, max_resume=max_resume)

def get_semantic_url(self):
"""
Get the semantic URL to the object
