Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix memory leak #733

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion connectorx-python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ license = "MIT"
maintainers = ["Weiyuan Wu <[email protected]>"]
name = "connectorx"
readme = "README.md" # Markdown files are supported
version = "0.4.1-alpha1"

[project]
name = "connectorx" # Target file name of maturin build
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.10"
version = "0.4.1-alpha1"
dynamic = ["version"]

[tool.poetry.dependencies]
dask = {version = "^2021", optional = true, extras = ["dataframe"]}
Expand Down
22 changes: 11 additions & 11 deletions connectorx-python/src/pandas/destination.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
pandas_columns::{
ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn,
Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock,
ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, ExtractBlockFromBound, Float64Block,
HasPandasColumn, Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock,
},
pystring::PyString,
typesystem::{PandasArrayType, PandasBlockType, PandasTypeSystem},
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<'py> Destination for PandasDestination<'py> {
let buf = &self.block_datas[idx];
match block.dt {
PandasBlockType::Boolean(_) => {
let bblock = buf.extract::<BooleanBlock>()?;
let bblock = BooleanBlock::extract_block(buf)?;

let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
Expand All @@ -227,7 +227,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Float64 => {
let fblock = buf.extract::<Float64Block>()?;
let fblock = Float64Block::extract_block(buf)?;
let fcols = fblock.split()?;
for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
partitioned_columns[cid] = fcol
Expand All @@ -238,7 +238,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::BooleanArray => {
let bblock = buf.extract::<ArrayBlock<bool>>()?;
let bblock = ArrayBlock::<bool>::extract_block(buf)?;
let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
partitioned_columns[cid] = bcol
Expand All @@ -249,7 +249,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Float64Array => {
let fblock = buf.extract::<ArrayBlock<f64>>()?;
let fblock = ArrayBlock::<f64>::extract_block(buf)?;
let fcols = fblock.split()?;
for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
partitioned_columns[cid] = fcol
Expand All @@ -260,7 +260,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Int64Array => {
let fblock = buf.extract::<ArrayBlock<i64>>()?;
let fblock = ArrayBlock::<i64>::extract_block(buf)?;
let fcols = fblock.split()?;
for (&cid, fcol) in block.cids.iter().zip_eq(fcols) {
partitioned_columns[cid] = fcol
Expand All @@ -271,7 +271,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Int64(_) => {
let ublock = buf.extract::<Int64Block>()?;
let ublock = Int64Block::extract_block(buf)?;
let ucols = ublock.split()?;
for (&cid, ucol) in block.cids.iter().zip_eq(ucols) {
partitioned_columns[cid] = ucol
Expand All @@ -282,7 +282,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::String => {
let sblock = buf.extract::<StringBlock>()?;
let sblock = StringBlock::extract_block(buf)?;
let scols = sblock.split()?;
for (&cid, scol) in block.cids.iter().zip_eq(scols) {
partitioned_columns[cid] = scol
Expand All @@ -293,7 +293,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::Bytes => {
let bblock = buf.extract::<BytesBlock>()?;
let bblock = BytesBlock::extract_block(buf)?;
let bcols = bblock.split()?;
for (&cid, bcol) in block.cids.iter().zip_eq(bcols) {
partitioned_columns[cid] = bcol
Expand All @@ -304,7 +304,7 @@ impl<'py> Destination for PandasDestination<'py> {
}
}
PandasBlockType::DateTime => {
let dblock = buf.extract::<DateTimeBlock>()?;
let dblock = DateTimeBlock::extract_block(buf)?;
let dcols = dblock.split()?;
for (&cid, dcol) in block.cids.iter().zip_eq(dcols) {
partitioned_columns[cid] = dcol
Expand Down
17 changes: 8 additions & 9 deletions connectorx-python/src/pandas/pandas_columns/array.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
GIL_MUTEX,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::{Element, PyArray, PyArrayDescr};
use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, Python, ToPyObject};
use numpy::{Element, PyArray, PyArrayDescr, PyArrayMethods};
use pyo3::{types::PyAnyMethods, Bound, Py, PyAny, PyResult, Python, ToPyObject};
use std::any::TypeId;
use std::marker::PhantomData;

Expand All @@ -30,8 +33,8 @@ pub struct ArrayBlock<'a, V> {
_value_type: PhantomData<V>,
}

impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a, V> ExtractBlockFromBound<'a> for ArrayBlock<'a, V> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "object")?;
let array = ob.downcast::<PyArray<PyList, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Expand All @@ -41,10 +44,6 @@ impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> {
_value_type: PhantomData,
})
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a, V> ArrayBlock<'a, V> {
Expand Down
24 changes: 13 additions & 11 deletions connectorx-python/src/pandas/pandas_columns/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2};
use numpy::{PyArray, PyArray1};
use pyo3::{types::PyTuple, FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArray1, PyArrayMethods};
use pyo3::{
types::{PyAnyMethods, PyTuple, PyTupleMethods},
PyAny, PyResult,
};
use std::any::TypeId;

/// Mutable view over the buffer that backs a block of boolean columns.
///
/// The extraction code that follows this type builds `NumPy` when the Python
/// object downcasts to a 2-D `bool` array, and otherwise builds the second
/// variant from items 0 and 1 of a tuple.
// NOTE(review): "Extention" is a misspelling of "Extension"; it is left as is
// here because the variant name is referenced outside this definition.
pub enum BooleanBlock<'a> {
/// A 2-D mutable view of `bool` values.
NumPy(ArrayViewMut2<'a, bool>),
/// Two 1-D mutable `bool` views: the data array first, then the mask array.
/// NOTE(review): the mask presumably flags missing entries — confirm polarity.
Extention(ArrayViewMut1<'a, bool>, ArrayViewMut1<'a, bool>),
}
impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {

impl<'a> ExtractBlockFromBound<'a> for BooleanBlock<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
if let Ok(array) = ob.downcast::<PyArray<bool, Ix2>>() {
// if numpy array
check_dtype(ob, "bool")?;
Expand All @@ -22,8 +28,8 @@ impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
} else {
// if extension array
let tuple = ob.downcast::<PyTuple>()?;
let data = tuple.get_item(0)?;
let mask = tuple.get_item(1)?;
let data = tuple.as_slice().get(0).unwrap();
let mask = tuple.as_slice().get(1).unwrap();
check_dtype(data, "bool")?;
check_dtype(mask, "bool")?;

Expand All @@ -33,10 +39,6 @@ impl<'a> FromPyObject<'a> for BooleanBlock<'a> {
))
}
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> BooleanBlock<'a> {
Expand Down
17 changes: 8 additions & 9 deletions connectorx-python/src/pandas/pandas_columns/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
GIL_MUTEX,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::{Element, PyArray, PyArrayDescr};
use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, Python};
use numpy::{Element, PyArray, PyArrayDescr, PyArrayMethods};
use pyo3::{types::PyAnyMethods, Bound, Py, PyAny, PyResult, Python};
use std::any::TypeId;

#[derive(Clone)]
Expand All @@ -28,8 +31,8 @@ pub struct BytesBlock<'a> {
buf_size_mb: usize,
}

impl<'a> FromPyObject<'a> for BytesBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a> ExtractBlockFromBound<'a> for BytesBlock<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "object")?;
let array = ob.downcast::<PyArray<PyBytes, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Expand All @@ -38,10 +41,6 @@ impl<'a> FromPyObject<'a> for BytesBlock<'a> {
buf_size_mb: 16, // in MB
})
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> BytesBlock<'a> {
Expand Down
16 changes: 7 additions & 9 deletions connectorx-python/src/pandas/pandas_columns/datetime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::PyArray;
use pyo3::{FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArrayMethods};
use pyo3::{types::PyAnyMethods, PyAny, PyResult};
use std::any::TypeId;

// datetime64 is represented in int64 in numpy
Expand All @@ -14,17 +16,13 @@ pub struct DateTimeBlock<'a> {
data: ArrayViewMut2<'a, i64>,
}

impl<'a> FromPyObject<'a> for DateTimeBlock<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a> ExtractBlockFromBound<'a> for DateTimeBlock<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "int64")?;
let array = ob.downcast::<PyArray<i64, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
Ok(DateTimeBlock { data })
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> DateTimeBlock<'a> {
Expand Down
20 changes: 9 additions & 11 deletions connectorx-python/src/pandas/pandas_columns/float64.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut2, Axis, Ix2};
use numpy::PyArray;
use pyo3::{FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArrayMethods};
use pyo3::{types::PyAnyMethods, PyAny, PyResult};
use std::any::TypeId;

/// Mutable view over the buffer that backs a block of `f64` columns.
///
/// The extraction code that follows this type fills `data` from a 2-D
/// `f64` array after checking that its dtype is "float64".
pub struct Float64Block<'a> {
/// A 2-D mutable view of the block's `f64` values.
data: ArrayViewMut2<'a, f64>,
}

impl<'a> FromPyObject<'a> for Float64Block<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
impl<'a> ExtractBlockFromBound<'a> for Float64Block<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
check_dtype(ob, "float64")?;
let array = ob.downcast::<PyArray<f64, Ix2>>()?;
let data = unsafe { array.as_array_mut() };
let array: &pyo3::Bound<'a, PyArray<f64, Ix2>> = ob.downcast()?;
let data: ArrayViewMut2<'a, f64> = unsafe { array.as_array_mut() };
Ok(Float64Block { data })
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> Float64Block<'a> {
Expand Down
26 changes: 14 additions & 12 deletions connectorx-python/src/pandas/pandas_columns/int64.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject};
use super::{
check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject,
};
use crate::errors::ConnectorXPythonError;
use anyhow::anyhow;
use fehler::throws;
use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2};
use numpy::{PyArray, PyArray1};
use pyo3::{types::PyTuple, FromPyObject, PyAny, PyResult};
use numpy::{PyArray, PyArray1, PyArrayMethods};
use pyo3::{
types::{PyAnyMethods, PyTuple, PyTupleMethods},
PyAny, PyResult,
};
use std::any::TypeId;

/// Mutable view over the buffer that backs a block of `i64` columns.
///
/// The extraction code that follows this type builds `NumPy` when the Python
/// object downcasts to a 2-D `i64` array, and otherwise builds the second
/// variant from items 0 and 1 of a tuple.
// NOTE(review): "Extention" is a misspelling of "Extension"; it is left as is
// here because the variant name is referenced outside this definition.
pub enum Int64Block<'a> {
/// A 2-D mutable view of `i64` values.
NumPy(ArrayViewMut2<'a, i64>),
/// A 1-D mutable `i64` data view followed by a 1-D mutable `bool` mask view.
/// NOTE(review): the mask presumably flags missing entries — confirm polarity.
Extention(ArrayViewMut1<'a, i64>, ArrayViewMut1<'a, bool>),
}
impl<'a> FromPyObject<'a> for Int64Block<'a> {
fn extract(ob: &'a PyAny) -> PyResult<Self> {

impl<'a> ExtractBlockFromBound<'a> for Int64Block<'a> {
fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
if let Ok(array) = ob.downcast::<PyArray<i64, Ix2>>() {
check_dtype(ob, "int64")?;
let data = unsafe { array.as_array_mut() };
Ok(Int64Block::NumPy(data))
} else {
let tuple = ob.downcast::<PyTuple>()?;
let data = tuple.get_item(0)?;
let mask = tuple.get_item(1)?;
// let data = tuple.get_borrowed_item(0)?;
let data = tuple.as_slice().get(0).unwrap();
let mask = tuple.as_slice().get(1).unwrap();
check_dtype(data, "int64")?;
check_dtype(mask, "bool")?;

Ok(Int64Block::Extention(
unsafe { data.downcast::<PyArray1<i64>>()?.as_array_mut() },
unsafe { mask.downcast::<PyArray1<bool>>()?.as_array_mut() },
))
}
}

fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult<Self> {
Self::extract(ob.clone().into_gil_ref())
}
}

impl<'a> Int64Block<'a> {
Expand Down
Loading
Loading