Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Take for UnionArray #44

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
- name: Install python dev
run: |
apt update
apt install -y libpython3.9-dev
apt install -y libpython3.11-dev
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
Expand Down
2 changes: 1 addition & 1 deletion arrow-buffer/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub trait ToByteSlice {
// Exposes a slice of native values as a read-only view over its bytes.
//
// NOTE(review): this block is rendered from a diff hunk ("1 addition & 1 deletion")
// without +/- markers, so both versions of the `raw_ptr` line appear below.
impl<T: ArrowNativeType> ToByteSlice for [T] {
#[inline]
fn to_byte_slice(&self) -> &[u8] {
// Presumably the removed line: casts through an intermediate `*const T` step.
let raw_ptr = self.as_ptr() as *const T as *const u8;
// Presumably the added line: casts the slice pointer straight to `*const u8`.
let raw_ptr = self.as_ptr() as *const u8;
// The byte view starts at the slice's own pointer and is `size_of_val(self)` bytes
// long, i.e. it spans the memory occupied by the whole slice.
// NOTE(review): soundness presumably relies on `ArrowNativeType` values being
// plain data that can be read as bytes; confirm against the trait definition.
unsafe { std::slice::from_raw_parts(raw_ptr, std::mem::size_of_val(self)) }
}
}
Expand Down
2 changes: 1 addition & 1 deletion arrow-buffer/src/util/bit_chunk_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl<'a> UnalignedBitChunk<'a> {
self.prefix
.into_iter()
.chain(self.chunks.iter().cloned())
.chain(self.suffix.into_iter())
.chain(self.suffix)
}

/// Counts the number of ones
Expand Down
54 changes: 53 additions & 1 deletion arrow-select/src/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow_buffer::{
ScalarBuffer,
};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType, FieldRef};
use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};

use num::{One, Zero};

Expand Down Expand Up @@ -207,6 +207,21 @@ fn take_impl<IndexType: ArrowPrimitiveType>(
Ok(new_null_array(&DataType::Null, indices.len()))
}
}
DataType::Union(fields, UnionMode::Sparse) => {
let mut field_type_ids = Vec::with_capacity(fields.len());
let mut children = Vec::with_capacity(fields.len());
let values = values.as_any().downcast_ref::<UnionArray>().unwrap();
let type_ids = take_native(values.type_ids(), indices).into_inner();
for (type_id, field) in fields.iter() {
let values = values.child(type_id);
let values = take_impl(values, indices, None)?;
let field = (**field).clone();
children.push((field, values));
field_type_ids.push(type_id);
}
let array = UnionArray::try_new(field_type_ids.as_slice(), type_ids, None, children)?;
Ok(Arc::new(array))
}
t => unimplemented!("Take not supported for data type {:?}", t)
}
}
Expand Down Expand Up @@ -1949,4 +1964,41 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(&values, &[Some(23), Some(4), None, None])
}

#[test]
fn test_take_union() {
    // First child: a struct column built by the shared test helper, ending in a null.
    let struct_child = create_test_struct(vec![
        Some((Some(true), Some(42))),
        Some((Some(false), Some(28))),
        Some((Some(false), Some(19))),
        Some((Some(true), Some(31))),
        None,
    ]);
    // Second child: a string column with nulls at positions 1 and 3.
    let string_child = StringArray::from(vec![Some("a"), None, Some("c"), None, Some("d")]);

    // Pair each child with a nullable field describing it.
    let struct_field = Field::new("f1", struct_child.data_type().clone(), true);
    let string_field = Field::new("f2", string_child.data_type().clone(), true);
    let children: Vec<(Field, Arc<dyn Array>)> = vec![
        (struct_field, Arc::new(struct_child)),
        (string_field, Arc::new(string_child)),
    ];

    // Every one of the five slots carries type id 1; the third argument
    // to `try_new` is `None` (presumably selecting a sparse union).
    let type_ids = Buffer::from_slice_ref(vec![1i8; 5]);
    let union = UnionArray::try_new(&[0, 1], type_ids, None, children).unwrap();

    // Take with repeated and out-of-order positions.
    let index = UInt32Array::from(vec![0, 3, 1, 0, 2, 4]);
    let taken = take(&union, &index, None).unwrap();
    let taken = taken.as_any().downcast_ref::<UnionArray>().unwrap();

    // Only the string child (type id 1) is inspected: it must hold the
    // source strings gathered at the requested positions.
    let taken_strings = taken.child(1);
    let taken_strings = taken_strings
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    let actual: Vec<_> = taken_strings.iter().collect();
    let expected = vec![Some("a"), None, None, Some("a"), Some("c"), Some("d")];
    assert_eq!(expected, actual);
}
}
2 changes: 1 addition & 1 deletion object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl AsyncWrite for InMemoryAppend {

if let Some((bytes, _)) = writer.remove(&self.location) {
let buf = std::mem::take(&mut self.data);
let concat = Bytes::from_iter(bytes.into_iter().chain(buf.into_iter()));
let concat = Bytes::from_iter(bytes.into_iter().chain(buf));
writer.insert(self.location.clone(), (concat, Utc::now()));
} else {
writer.insert(
Expand Down
5 changes: 3 additions & 2 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ where
D: serde::Deserializer<'de>,
{
let s: String = serde::Deserialize::deserialize(deserializer)?;
chrono::TimeZone::datetime_from_str(&chrono::Utc, &s, RFC1123_FMT)
.map_err(serde::de::Error::custom)
let naive = chrono::NaiveDateTime::parse_from_str(&s, RFC1123_FMT)
.map_err(serde::de::Error::custom)?;
Ok(chrono::TimeZone::from_utc_datetime(&chrono::Utc, &naive))
}

#[cfg(any(feature = "aws", feature = "azure"))]
Expand Down