Skip to content

Commit

Permalink
fix: make it compile
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored and ion-elgreco committed Feb 3, 2025
1 parent 90db540 commit c75b3e6
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 11 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ mod tests {
use super::*;
use datafusion::assert_batches_sorted_eq;
use datafusion::catalog::CatalogProvider;
use datafusion::catalog_common::MemoryCatalogProvider;
use datafusion::catalog::MemoryCatalogProvider;
use datafusion::execution::context::SessionContext;

#[test]
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
TableReference, ToDFSchema,
config::ConfigOptions, Column, Constraints, DFSchema, DataFusionError,
Result as DataFusionResult, TableReference, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::logical_plan::CreateExternalTable;
Expand Down Expand Up @@ -659,6 +659,7 @@ impl<'a> DeltaScanBuilder<'a> {
} else {
file_groups.into_values().collect()
},
constraints: Constraints::default(),
statistics: stats,
projection: self.projection.cloned(),
limit: self.limit,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ mod datafusion {
null_count,
max_value,
min_value,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
})
}
Expand All @@ -653,6 +654,7 @@ mod datafusion {
null_count: self.null_count.add(&other.null_count),
max_value: self.max_value.max(&other.max_value),
min_value: self.min_value.min(&other.min_value),
sum_value: Precision::Absent,
distinct_count: self.distinct_count.add(&other.distinct_count),
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::prelude::SessionContext;
use datafusion_common::{ScalarValue, Statistics};
use datafusion_common::{Constraints, ScalarValue, Statistics};
use datafusion_physical_expr::expressions;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -389,6 +389,7 @@ impl CdfLoadBuilder {
object_store_url: self.log_store.object_store_url(),
file_schema: cdc_file_schema.clone(),
file_groups: cdc_file_groups.into_values().collect(),
constraints: Constraints::default(),
statistics: Statistics::new_unknown(&cdc_file_schema),
projection: None,
limit: None,
Expand All @@ -406,6 +407,7 @@ impl CdfLoadBuilder {
object_store_url: self.log_store.object_store_url(),
file_schema: add_remove_file_schema.clone(),
file_groups: add_file_groups.into_values().collect(),
constraints: Constraints::default(),
statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
projection: None,
limit: None,
Expand All @@ -423,6 +425,7 @@ impl CdfLoadBuilder {
object_store_url: self.log_store.object_store_url(),
file_schema: add_remove_file_schema.clone(),
file_groups: remove_file_groups.into_values().collect(),
constraints: Constraints::default(),
statistics: Statistics::new_unknown(&add_remove_file_schema),
projection: None,
limit: None,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/writer/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
ParquetError::IndexOutOfBound(u.to_owned(), v.to_owned())
}
ParquetError::NYI(msg) => ParquetError::NYI(msg.to_owned()),
// ParquetError is non-exhaustive, so have a fallback
e => ParquetError::General(e.to_string()),
},
skipped_values: partial_writes,
}
Expand Down
14 changes: 9 additions & 5 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::RawDeltaTable;
use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut};
use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use parking_lot::Mutex;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes, PyType};
Expand Down Expand Up @@ -519,7 +520,9 @@ impl ObjectInputFile {
// TODO: the C++ implementation tracks an internal lock on all random access files. Do we need this here?
#[pyclass(weakref, module = "deltalake._internal")]
pub struct ObjectOutputStream {
upload: Box<dyn MultipartUpload>,
// wrap in mutex as rustc says `MultipartUpload` can't be
// shared across threads (it isn't `Sync`)
upload: Mutex<Box<dyn MultipartUpload>>,
pos: i64,
#[pyo3(get)]
closed: bool,
Expand All @@ -537,7 +540,7 @@ impl ObjectOutputStream {
) -> Result<Self, ObjectStoreError> {
let upload = store.put_multipart(&path).await?;
Ok(Self {
upload,
upload: Mutex::new(upload),
pos: 0,
closed: false,
mode: "wb".into(),
Expand All @@ -555,14 +558,15 @@ impl ObjectOutputStream {
}

fn abort(&mut self) -> PyResult<()> {
rt().block_on(self.upload.abort())
rt().block_on(self.upload.lock().abort())
.map_err(PythonError::from)?;
Ok(())
}

fn upload_buffer(&mut self) -> PyResult<()> {
let payload = std::mem::take(&mut self.buffer).freeze();
match rt().block_on(self.upload.put_part(payload)) {
let res = rt().block_on(self.upload.lock().put_part(payload));
match res {
Ok(_) => Ok(()),
Err(err) => {
self.abort()?;
Expand All @@ -580,7 +584,7 @@ impl ObjectOutputStream {
if !self.buffer.is_empty() {
self.upload_buffer()?;
}
match rt().block_on(self.upload.complete()) {
match rt().block_on(self.upload.lock().complete()) {
Ok(_) => Ok(()),
Err(err) => Err(PyIOError::new_err(err.to_string())),
}
Expand Down
6 changes: 4 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1525,10 +1525,12 @@ impl RawDeltaTable {
&self,
py: Python<'py>,
) -> PyResult<Bound<'py, PyCapsule>> {
// tokio runtime handle?
let handle = None;
let name = CString::new("datafusion_table_provider").unwrap();

let table = self.with_table(|t| Ok(Arc::new(t.clone())))?;
let provider = FFI_TableProvider::new(table, false);
let provider = FFI_TableProvider::new(table, false, handle);

PyCapsule::new_bound(py, provider, Some(name.clone()))
}
Expand Down Expand Up @@ -1782,7 +1784,7 @@ fn filestats_to_expression_next<'py>(
})?
.data_type()
.clone();
let column_type = PyArrowType(column_type).into_py(py);
let column_type = PyArrowType(column_type).into_pyobject(py)?;
pa.call_method1("scalar", (value,))?
.call_method1("cast", (column_type,))
};
Expand Down

0 comments on commit c75b3e6

Please sign in to comment.