Additional documentation for normalize functions. Switched Schema
normalization to iterative approach.
nglime committed Dec 31, 2024
1 parent 9c9c699 commit 4422add
Showing 2 changed files with 123 additions and 90 deletions.
84 changes: 60 additions & 24 deletions arrow-array/src/record_batch.rs
@@ -397,50 +397,86 @@ impl RecordBatch {
     }
 
     /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    /// If `max_level` is 0, normalizes all levels.
     ///
-    /// If max_level is 0, normalizes all levels.
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
+    /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
+    ///
+    /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
+    /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
+    ///
+    /// let a = Arc::new(StructArray::from(vec![
+    ///     (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+    ///     (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+    /// ]));
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![animals_field, n_legs_field])),
+    ///         false,
+    ///     )
+    /// ]);
+    ///
+    /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
+    ///     .expect("valid conversion")
+    ///     .normalize(".", 0)
+    ///     .expect("valid normalization");
+    ///
+    /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
+    ///     ("a.animals", animals.clone(), true),
+    ///     ("a.n_legs", n_legs.clone(), true),
+    /// ])
+    /// .expect("valid conversion");
+    ///
+    /// assert_eq!(expected, normalized);
+    /// ```
     pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
         if max_level == 0 {
             max_level = usize::MAX;
         }
         if self.num_rows() == 0 {
             // No data, only need to normalize the schema
             return Ok(Self::new_empty(Arc::new(
                 self.schema.normalize(separator, max_level)?,
             )));
         }
-        let mut queue: VecDeque<(usize, (ArrayRef, FieldRef))> = VecDeque::new();
-
+        let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, bool)> = VecDeque::new();
         for (c, f) in self.columns.iter().zip(self.schema.fields()) {
-            queue.push_back((0, ((*c).clone(), (*f).clone())));
+            let name_vec: Vec<&str> = vec![f.name()];
+            queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable()));
         }
 
         let mut columns: Vec<ArrayRef> = Vec::new();
         let mut fields: Vec<FieldRef> = Vec::new();
 
-        while let Some((depth, (c, f))) = queue.pop_front() {
+        while let Some((depth, c, name, data_type, nullable)) = queue.pop_front() {
             if depth < max_level {
-                match f.data_type() {
+                match data_type {
                     DataType::Struct(ff) => {
                         // Need to zip these in reverse to maintain original order
                         for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
-                            let new_key = format!("{}{}{}", f.name(), separator, fff.name());
-                            let updated_field = Field::new(
-                                new_key.as_str(),
-                                fff.data_type().clone(),
-                                fff.is_nullable(),
-                            );
-                            queue.push_front((depth + 1, (cff.clone(), Arc::new(updated_field))))
+                            let mut name = name.clone();
+                            name.push(separator);
+                            name.push(fff.name().as_str());
+                            queue.push_front((
+                                depth + 1,
+                                cff,
+                                name.clone(),
+                                fff.data_type(),
+                                fff.is_nullable(),
+                            ))
                         }
                     }
                     _ => {
-                        columns.push(c);
-                        fields.push(f);
+                        let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
+                        columns.push(c.clone());
+                        fields.push(Arc::new(updated_field));
                     }
                 }
             } else {
-                columns.push(c);
-                fields.push(f);
+                let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
+                columns.push(c.clone());
+                fields.push(Arc::new(updated_field));
             }
         }
         RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
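Since `normalize` pops from the front of the queue and pushes struct children back onto the front in reverse, nested fields are expanded depth-first and come out in their original left-to-right order, ahead of any later sibling columns. Below is a minimal sketch of the observable behavior, including the `max_level` cutoff; the two-level nesting and the names `a`, `info`, `species`, and `count` are invented for illustration.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    let species: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", "Dog"]));
    let count: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));

    let species_field = Arc::new(Field::new("species", DataType::Utf8, true));
    let count_field = Arc::new(Field::new("count", DataType::Int64, true));

    // Inner struct {species, count}, nested under "info" inside top-level column "a".
    let inner: ArrayRef = Arc::new(StructArray::from(vec![
        (species_field.clone(), species),
        (count_field.clone(), count),
    ]));
    let info_field = Arc::new(Field::new(
        "info",
        DataType::Struct(Fields::from(vec![species_field, count_field])),
        false,
    ));
    let outer: ArrayRef = Arc::new(StructArray::from(vec![(info_field.clone(), inner)]));

    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(Fields::from(vec![info_field])),
        false,
    )]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![outer]).expect("valid batch");

    // max_level = 1 flattens only the outermost level: "a.info" survives as a struct.
    let partial = batch.normalize(".", 1).expect("valid normalization");
    assert_eq!(partial.schema().field(0).name(), "a.info");

    // max_level = 0 means "no limit": only leaf columns remain, in original order.
    let full = batch.normalize(".", 0).expect("valid normalization");
    let full_schema = full.schema();
    let names: Vec<&str> = full_schema
        .fields()
        .iter()
        .map(|f| f.name().as_str())
        .collect();
    assert_eq!(names, vec!["a.info.species", "a.info.count"]);
}
```

Pushing children onto the back of the queue instead would expand breadth-first and interleave nested leaves with later top-level columns; the reversed `push_front` is what makes the queue behave like a stack for nested fields.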
@@ -1250,7 +1286,7 @@ mod tests {
     }
 
     #[test]
-    fn normalize() {
+    fn normalize_simple() {
         let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
         let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
         let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));
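The renamed `normalize_simple` test (truncated above) builds a struct column alongside already-flat columns. The following is an assumed sketch of that shape, reconstructed from the arrays visible above rather than copied from the test body: flat columns should pass through `normalize` unchanged while the struct is flattened in place.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
    let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
    let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));

    let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
    let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));

    // One struct column ("a") next to one already-flat column ("year").
    let a: ArrayRef = Arc::new(StructArray::from(vec![
        (animals_field.clone(), animals.clone()),
        (n_legs_field.clone(), n_legs.clone()),
    ]));
    let schema = Schema::new(vec![
        Field::new(
            "a",
            DataType::Struct(Fields::from(vec![animals_field, n_legs_field])),
            false,
        ),
        Field::new("year", DataType::Int64, true),
    ]);

    let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, year.clone()])
        .expect("valid conversion")
        .normalize(".", 0)
        .expect("valid normalization");

    // The struct column is flattened in place; the flat column is unchanged.
    let expected = RecordBatch::try_from_iter_with_nullable(vec![
        ("a.animals", animals, true),
        ("a.n_legs", n_legs, true),
        ("year", year, true),
    ])
    .expect("valid conversion");
    assert_eq!(expected, normalized);
}
```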
129 changes: 63 additions & 66 deletions arrow-schema/src/schema.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::hash::Hash;
 use std::sync::Arc;
@@ -413,79 +413,76 @@ impl Schema {
         &self.metadata
     }
 
-    /// Returns a new schema, normalized based on the max_level
-    /// This carries metadata from the parent schema over as well
+    /// Returns a new schema, normalized based on the `max_level` parameter.
+    /// If `max_level` is 0, normalizes all levels.
+    ///
+    /// This carries metadata from the parent schema over.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_schema::{DataType, Field, Fields, Schema};
+    ///
+    /// let schema = Schema::new(vec![
+    ///     Field::new(
+    ///         "a",
+    ///         DataType::Struct(Fields::from(vec![
+    ///             Arc::new(Field::new("animals", DataType::Utf8, true)),
+    ///             Arc::new(Field::new("n_legs", DataType::Int64, true)),
+    ///         ])),
+    ///         false,
+    ///     ),
+    /// ])
+    /// .normalize(".", 0)
+    /// .expect("valid normalization");
+    ///
+    /// let expected = Schema::new(vec![
+    ///     Field::new("a.animals", DataType::Utf8, true),
+    ///     Field::new("a.n_legs", DataType::Int64, true),
+    /// ]);
+    ///
+    /// assert_eq!(schema, expected);
+    /// ```
     pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
         if max_level == 0 {
             max_level = usize::MAX;
         }
-        let mut new_fields: Vec<FieldRef> = vec![];
-        for field in self.fields() {
-            match field.data_type() {
-                DataType::Struct(nested_fields) => {
-                    let field_name = field.name().as_str();
-                    new_fields = [
-                        new_fields,
-                        Self::normalizer(
-                            nested_fields.to_vec(),
-                            field_name,
-                            separator,
-                            max_level - 1,
-                        ),
-                    ]
-                    .concat();
-                }
-                _ => new_fields.push(Arc::new(Field::new(
-                    field.name(),
-                    field.data_type().clone(),
-                    field.is_nullable(),
-                ))),
-            };
+        let mut queue: VecDeque<(usize, Vec<&str>, &DataType, bool)> = VecDeque::new();
+        for f in self.fields() {
+            let name_vec: Vec<&str> = vec![f.name()];
+            queue.push_back((0, name_vec, f.data_type(), f.is_nullable()));
         }
-        Ok(Self::new_with_metadata(new_fields, self.metadata.clone()))
-    }
 
-    fn normalizer(
-        fields: Vec<FieldRef>,
-        key_string: &str,
-        separator: &str,
-        max_level: usize,
-    ) -> Vec<FieldRef> {
-        let mut new_fields: Vec<FieldRef> = vec![];
-        if max_level > 0 {
-            for field in fields {
-                match field.data_type() {
-                    DataType::Struct(nested_fields) => {
-                        let field_name = field.name().as_str();
-                        let new_key = format!("{key_string}{separator}{field_name}");
-                        new_fields = [
-                            new_fields,
-                            Self::normalizer(
-                                nested_fields.to_vec(),
-                                new_key.as_str(),
-                                separator,
-                                max_level - 1,
-                            ),
-                        ]
-                        .concat();
+        let mut fields: Vec<FieldRef> = Vec::new();
+
+        while let Some((depth, name, data_type, nullable)) = queue.pop_front() {
+            if depth < max_level {
+                match data_type {
+                    DataType::Struct(ff) => {
+                        // Need to iterate in reverse to maintain the original field order
+                        for fff in ff.into_iter().rev() {
+                            let mut name = name.clone();
+                            name.push(separator);
+                            name.push(fff.name().as_str());
+                            queue.push_front((
+                                depth + 1,
+                                name.clone(),
+                                fff.data_type(),
+                                fff.is_nullable(),
+                            ))
+                        }
                     }
-                    _ => new_fields.push(Arc::new(Field::new(
-                        format!("{key_string}{separator}{}", field.name()),
-                        field.data_type().clone(),
-                        field.is_nullable(),
-                    ))),
-                };
-            }
-        } else {
-            for field in fields {
-                new_fields.push(Arc::new(Field::new(
-                    format!("{key_string}{separator}{}", field.name()),
-                    field.data_type().clone(),
-                    field.is_nullable(),
-                )));
+                    _ => {
+                        let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
+                        fields.push(Arc::new(updated_field));
+                    }
+                }
+            } else {
+                let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
+                fields.push(Arc::new(updated_field));
             }
         }
-        new_fields
+        Ok(Self::new_with_metadata(fields, self.metadata.clone()))
     }
 
     /// Look up a column by name and return a immutable reference to the column along with
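As in the `RecordBatch` case, the `max_level` cutoff leaves fields below it unflattened: a nested struct at the cutoff is renamed but keeps its struct type. A small sketch against the `Schema::normalize` API above; the doubly nested schema and its field names are invented for illustration.

```rust
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    let inner = Fields::from(vec![
        Field::new("species", DataType::Utf8, true),
        Field::new("count", DataType::Int64, true),
    ]);
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(Fields::from(vec![Field::new(
            "info",
            DataType::Struct(inner.clone()),
            false,
        )])),
        false,
    )]);

    // One level of flattening: the nested struct under "a" is renamed to
    // "a.info" but its own children are left intact.
    let one_level = schema.normalize(".", 1).expect("valid normalization");
    let expected = Schema::new(vec![Field::new(
        "a.info",
        DataType::Struct(inner),
        false,
    )]);
    assert_eq!(one_level, expected);
}
```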
