Skip to content

Commit

Permalink
Generic pyo3 integration for object-store (#229)
Browse files Browse the repository at this point in the history
* wip pyo3-object-store

* wip

* fix import

* Implement ObjectStore creation APIs

* rename

* Cleanup

* Add local and in-memory object-store

* Fix module add

* Cleaner store errors

* remove test_s3

* remove test_fsspec
  • Loading branch information
kylebarron authored Oct 15, 2024
1 parent 2d3b583 commit 6d513ea
Show file tree
Hide file tree
Showing 27 changed files with 2,333 additions and 29 deletions.
1,180 changes: 1,157 additions & 23 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
[workspace]
members = ["arro3-compute", "arro3-core", "arro3-io", "pyo3-arrow"]
members = [
"arro3-compute",
"arro3-core",
"arro3-io",
"pyo3-arrow",
"pyo3-object_store",
]
resolver = "2"

[workspace.package]
Expand All @@ -26,6 +32,7 @@ arrow-select = "53"
half = "2"
indexmap = "2"
numpy = "0.22"
object_store = "0.11"
parquet = "53"
pyo3 = { version = "0.22", features = ["macros", "indexmap"] }
pyo3-file = "0.9"
Expand Down
21 changes: 19 additions & 2 deletions arro3-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,33 @@ rust-version = { workspace = true }
name = "_io"
crate-type = ["cdylib"]

[features]
default = ["async"]
# Include async code. This feature won't compile for pyodide.
async = [
"dep:pyo3-object_store",
"dep:pyo3-async-runtimes",
"parquet/object_store",
"dep:object_store",
"dep:futures",
]

[dependencies]
arrow-array = { workspace = true }
arrow = { workspace = true, features = ["ffi"] }
arrow-buffer = { workspace = true }
arrow-csv = { workspace = true }
arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
arrow = { workspace = true, features = ["ffi"] }
bytes = "1.7.0"
futures = { version = "0.3.30", optional = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true }
pyo3 = { workspace = true }
pyo3-arrow = { path = "../pyo3-arrow" }
pyo3-async-runtimes = { git = "https://github.com/PyO3/pyo3-async-runtimes", features = [
"tokio-runtime",
], optional = true }
pyo3-file = { workspace = true }
pyo3-object_store = { path = "../pyo3-object_store", optional = true }
thiserror = { workspace = true }
pyo3-arrow = { path = "../pyo3-arrow" }
2 changes: 1 addition & 1 deletion arro3-io/python/arro3/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._io import *
from ._io import ___version
from ._io import ___version, store

__version__: str = ___version()
12 changes: 12 additions & 0 deletions arro3-io/python/arro3/io/_io.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ from typing import IO, Literal, Sequence
import arro3.core as core
import arro3.core.types as types

from ._pyo3_object_store import ObjectStore

#### CSV

def infer_csv_schema(
Expand Down Expand Up @@ -275,6 +277,16 @@ def read_parquet(file: IO[bytes] | Path | str) -> core.RecordBatchReader:
The loaded Arrow data.
"""

async def read_parquet_async(path: str, *, store: ObjectStore) -> core.Table:
"""Read a Parquet file to an Arrow Table in an async fashion
Args:
file: The path to the Parquet file in the given store
Returns:
The loaded Arrow data.
"""

def write_parquet(
data: types.ArrowStreamExportable | types.ArrowArrayExportable,
file: IO[bytes] | Path | str,
Expand Down
115 changes: 115 additions & 0 deletions arro3-io/python/arro3/io/_pyo3_object_store.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# TODO: move this to a standalone package/docs website that can be shared across
# multiple python packages.

from __future__ import annotations

from datetime import timedelta
from typing import Dict, TypedDict

import boto3
import botocore
import botocore.session

class BackoffConfig(TypedDict):
init_backoff: timedelta
max_backoff: timedelta
base: int | float

class RetryConfig(TypedDict):
backoff: BackoffConfig
max_retries: int
retry_timeout: timedelta

class AzureStore:
@classmethod
def from_env(
cls,
container: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...
@classmethod
def from_url(
cls,
url: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...

class GCSStore:
@classmethod
def from_env(
cls,
bucket: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...
@classmethod
def from_url(
cls,
url: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...

class HTTPStore:
@classmethod
def from_url(
cls,
url: str,
*,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...

class S3Store:
@classmethod
def from_env(
cls,
bucket: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...
@classmethod
def from_session(
cls,
session: boto3.Session | botocore.session.Session,
bucket: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...
@classmethod
def from_url(
cls,
url: str,
*,
config: Dict[str, str] | None = None,
client_options: Dict[str, str] | None = None,
retry_config: RetryConfig | None = None,
) -> S3Store: ...

class LocalStore:
"""
Local filesystem storage providing an ObjectStore interface to files on local disk.
Can optionally be created with a directory prefix.
"""
def __init__(self, prefix: str | None = None) -> None: ...

class MemoryStore:
"""A fully in-memory implementation of ObjectStore."""
def __init__(self) -> None: ...

ObjectStore = AzureStore | GCSStore | HTTPStore | S3Store | LocalStore | MemoryStore
7 changes: 7 additions & 0 deletions arro3-io/python/arro3/io/store.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# TODO: move to reusable types package
from ._pyo3_object_store import AzureStore as AzureStore
from ._pyo3_object_store import GCSStore as GCSStore
from ._pyo3_object_store import HTTPStore as HTTPStore
from ._pyo3_object_store import LocalStore as LocalStore
from ._pyo3_object_store import MemoryStore as MemoryStore
from ._pyo3_object_store import S3Store as S3Store
50 changes: 50 additions & 0 deletions arro3-io/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Contains the [`Arro3IoError`], the Error returned by most fallible functions in this crate.
use pyo3::exceptions::{PyException, PyValueError};
use pyo3::prelude::*;
use pyo3::DowncastError;
use thiserror::Error;

/// The Error variants returned by this crate.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum Arro3IoError {
/// A wrapped [arrow::error::ArrowError]
#[error(transparent)]
ArrowError(#[from] arrow::error::ArrowError),

/// A wrapped [object_store::Error]
#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),

/// A wrapped [parquet::errors::ParquetError]
#[error(transparent)]
ParquetError(#[from] parquet::errors::ParquetError),

/// A wrapped [PyErr]
#[error(transparent)]
PyErr(#[from] PyErr),
}

impl From<Arro3IoError> for PyErr {
fn from(error: Arro3IoError) -> Self {
match error {
Arro3IoError::PyErr(err) => err,
Arro3IoError::ArrowError(err) => PyException::new_err(err.to_string()),
Arro3IoError::ObjectStoreError(err) => PyException::new_err(err.to_string()),
Arro3IoError::ParquetError(err) => PyException::new_err(err.to_string()),
}
}
}

impl<'a, 'py> From<DowncastError<'a, 'py>> for Arro3IoError {
fn from(other: DowncastError<'a, 'py>) -> Self {
Self::PyErr(PyValueError::new_err(format!(
"Could not downcast: {}",
other
)))
}
}

/// A type wrapper around `Result<T, Arro3IoError>`.
pub type Arro3IoResult<T> = Result<T, Arro3IoError>;
6 changes: 5 additions & 1 deletion arro3-io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pyo3::prelude::*;

mod csv;
mod error;
mod ipc;
mod json;
mod parquet;
Expand All @@ -14,9 +15,11 @@ fn ___version() -> &'static str {
}

#[pymodule]
fn _io(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
fn _io(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(___version))?;

pyo3_object_store::register_store_module(py, m, "arro3.io")?;

m.add_wrapped(wrap_pyfunction!(csv::infer_csv_schema))?;
m.add_wrapped(wrap_pyfunction!(csv::read_csv))?;
m.add_wrapped(wrap_pyfunction!(csv::write_csv))?;
Expand All @@ -32,6 +35,7 @@ fn _io(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(ipc::write_ipc_stream))?;

m.add_wrapped(wrap_pyfunction!(parquet::read_parquet))?;
m.add_wrapped(wrap_pyfunction!(parquet::read_parquet_async))?;
m.add_wrapped(wrap_pyfunction!(parquet::write_parquet))?;

Ok(())
Expand Down
40 changes: 39 additions & 1 deletion arro3-io/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::arrow_writer::ArrowWriterOptions;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::{WriterProperties, WriterVersion};
Expand All @@ -14,8 +15,10 @@ use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3_arrow::error::PyArrowResult;
use pyo3_arrow::input::AnyRecordBatch;
use pyo3_arrow::PyRecordBatchReader;
use pyo3_arrow::{PyRecordBatchReader, PyTable};
use pyo3_object_store::PyObjectStore;

use crate::error::Arro3IoResult;
use crate::utils::{FileReader, FileWriter};

#[pyfunction]
Expand All @@ -40,6 +43,41 @@ pub fn read_parquet(py: Python, file: FileReader) -> PyArrowResult<PyObject> {
Ok(PyRecordBatchReader::new(iter).to_arro3(py)?)
}

#[pyfunction]
#[pyo3(signature = (path, *, store))]
pub fn read_parquet_async(
py: Python,
path: String,
store: PyObjectStore,
) -> PyArrowResult<PyObject> {
let fut = pyo3_async_runtimes::tokio::future_into_py(py, async move {
Ok(read_parquet_async_inner(store.into_inner(), path).await?)
})?;

Ok(fut.into())
}

async fn read_parquet_async_inner(
store: Arc<dyn object_store::ObjectStore>,
path: String,
) -> Arro3IoResult<PyTable> {
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

let meta = store.head(&path.into()).await?;

let object_reader = ParquetObjectReader::new(store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(object_reader).await?;

let metadata = builder.schema().metadata().clone();
let reader = builder.build()?;

let arrow_schema = Arc::new(reader.schema().as_ref().clone().with_metadata(metadata));

let batches = reader.try_collect::<Vec<_>>().await?;
Ok(PyTable::try_new(batches, arrow_schema)?)
}

pub(crate) struct PyWriterVersion(WriterVersion);

impl<'py> FromPyObject<'py> for PyWriterVersion {
Expand Down
27 changes: 27 additions & 0 deletions pyo3-object_store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "pyo3-object_store"
version = "0.1.0"
authors = { workspace = true }
edition = { workspace = true }
description = "object_store integration for pyo3."
readme = "README.md"
repository = { workspace = true }
license = { workspace = true }
keywords = { workspace = true }
categories = { workspace = true }
rust-version = { workspace = true }

[dependencies]
futures = "0.3.30"
object_store = { workspace = true, features = ["aws", "azure", "gcp", "http"] }
pyo3 = { workspace = true, features = ["chrono", "indexmap"] }
pyo3-async-runtimes = { git = "https://github.com/PyO3/pyo3-async-runtimes", features = [
"tokio-runtime",
] }
thiserror = { workspace = true }

[dev-dependencies]
arrow-select = { workspace = true }

[lib]
crate-type = ["rlib"]
3 changes: 3 additions & 0 deletions pyo3-object_store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# pyo3-object_store

Use `object-store` in your pyo3-based libraries.
Loading

0 comments on commit 6d513ea

Please sign in to comment.