From 38d3d8824a01bcb2af513a6c82b21939b2fd418a Mon Sep 17 00:00:00 2001 From: James Gill Date: Tue, 15 Oct 2024 11:09:18 -0400 Subject: [PATCH] Add support for de/serializing list-encoded JSON structs [#6558] Currently, a StructArray can only be deserialized from or serialized to a JSON object (e.g. `{a: 1, b: "c"}`), but some services (e.g. Presto and Trino) encode ROW types as JSON lists (e.g. `[1, "c"]`) because this is more compact, and the schema is known. This PR adds the ability to encode and decode JSON lists from and to StructArrays, if StructMode is set to ListOnly. In ListOnly mode, object-encoded structs raise an error. Setting to ObjectOnly (the default) has the original parsing behavior. Some notes/questions/points for discussion: 1. I've made a JsonParseMode struct instead of a bool flag for two reasons. One is that it's self-descriptive (what would `true` be?), and the other is that it allows a future Mixed mode that could deserialize either. The latter isn't currently requested by anyone. 2. I kept the error messages as similar to the old messages as possible. I considered having more specific error messages (like "Encountered a '[' when parsing a Struct, but the StructParseMode is ObjectOnly" or similar), but wanted to hear opinions before I went that route. 3. I'm not attached to any name/code-style/etc, so happy to modify to fit local conventions. 
Fixes #6558 --- arrow-json/src/lib.rs | 85 +++++++++ arrow-json/src/reader/list_array.rs | 3 + arrow-json/src/reader/map_array.rs | 4 + arrow-json/src/reader/mod.rs | 258 +++++++++++++++++++++++++- arrow-json/src/reader/struct_array.rs | 106 +++++++---- arrow-json/src/writer/encoder.rs | 24 ++- arrow-json/src/writer/mod.rs | 100 +++++++++- 7 files changed, 536 insertions(+), 44 deletions(-) diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs index b6c441012b2a..05cf2051f42e 100644 --- a/arrow-json/src/lib.rs +++ b/arrow-json/src/lib.rs @@ -74,6 +74,23 @@ pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer, WriterBuilder}; use half::f16; use serde_json::{Number, Value}; +/// Specifies what is considered valid JSON when parsing StructArrays. +/// +/// If a struct has fields `("a", Int32)` and `("b", Utf8)`, it could be represented as +/// a JSON object (`{"a": 1, "b": "c"}`) or a JSON list (`[1, "c"]`). This enum controls +/// which form(s) the Reader will accept. +/// +/// For objects, the order of the keys does not matter. +/// For lists, the entries must be the same number and in the same order as the struct fields. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub enum StructMode { + #[default] + /// Encode/decode structs as objects (e.g., {"a": 1, "b": "c"}) + ObjectOnly, + /// Encode/decode structs as lists (e.g., [1, "c"]) + ListOnly, +} + /// Trait declaring any type that is serializable to JSON. This includes all primitive types (bool, i32, etc.). 
pub trait JsonSerializable: 'static { /// Converts self into json value if its possible @@ -156,4 +173,72 @@ mod tests { ); assert_eq!(None, f32::NAN.into_json_value()); } + + #[test] + fn test_json_roundtrip_structs() { + use crate::writer::LineDelimited; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Fields; + use arrow_schema::Schema; + use std::sync::Arc; + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "c1", + DataType::Struct(Fields::from(vec![ + Field::new("c11", DataType::Int32, true), + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()), + false, + ), + ])), + false, + ), + Field::new("c2", DataType::Utf8, false), + ])); + + { + let object_input = r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"} +{"c1":{"c12":{"c121":"f"}},"c2":"b"} +{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"} +"# + .as_bytes(); + let object_reader = ReaderBuilder::new(schema.clone()) + .with_struct_mode(StructMode::ObjectOnly) + .build(object_input) + .unwrap(); + + let mut object_output: Vec = Vec::new(); + let mut object_writer = WriterBuilder::new() + .with_struct_mode(StructMode::ObjectOnly) + .build::<_, LineDelimited>(&mut object_output); + for batch_res in object_reader { + object_writer.write(&batch_res.unwrap()).unwrap(); + } + assert_eq!(object_input, &object_output); + } + + { + let list_input = r#"[[1,["e"]],"a"] +[[null,["f"]],"b"] +[[5,["g"]],"c"] +"# + .as_bytes(); + let list_reader = ReaderBuilder::new(schema.clone()) + .with_struct_mode(StructMode::ListOnly) + .build(list_input) + .unwrap(); + + let mut list_output: Vec = Vec::new(); + let mut list_writer = WriterBuilder::new() + .with_struct_mode(StructMode::ListOnly) + .build::<_, LineDelimited>(&mut list_output); + for batch_res in list_reader { + list_writer.write(&batch_res.unwrap()).unwrap(); + } + assert_eq!(list_input, &list_output); + } + } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs 
index b6f8c18ea9c3..1a1dee6a23d4 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -17,6 +17,7 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{make_decoder, ArrayDecoder}; +use crate::StructMode; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_array::OffsetSizeTrait; use arrow_buffer::buffer::NullBuffer; @@ -37,6 +38,7 @@ impl ListArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result { let field = match &data_type { DataType::List(f) if !O::IS_LARGE => f, @@ -48,6 +50,7 @@ impl ListArrayDecoder { coerce_primitive, strict_mode, field.is_nullable(), + struct_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index cd1ca5f71fa9..ee78373a551e 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -17,6 +17,7 @@ use crate::reader::tape::{Tape, TapeElement}; use crate::reader::{make_decoder, ArrayDecoder}; +use crate::StructMode; use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder}; use arrow_buffer::buffer::NullBuffer; use arrow_buffer::ArrowNativeType; @@ -36,6 +37,7 @@ impl MapArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result { let fields = match &data_type { DataType::Map(_, true) => { @@ -59,12 +61,14 @@ impl MapArrayDecoder { coerce_primitive, strict_mode, fields[0].is_nullable(), + struct_mode, )?; let values = make_decoder( fields[1].data_type().clone(), coerce_primitive, strict_mode, fields[1].is_nullable(), + struct_mode, )?; Ok(Self { diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index f857e8813c7e..31ac90537d9a 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -133,6 +133,7 @@ //! ``` //! 
+use crate::StructMode; use std::io::BufRead; use std::sync::Arc; @@ -176,6 +177,7 @@ pub struct ReaderBuilder { coerce_primitive: bool, strict_mode: bool, is_field: bool, + struct_mode: StructMode, schema: SchemaRef, } @@ -195,6 +197,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: false, + struct_mode: StructMode::ObjectOnly, schema, } } @@ -235,6 +238,7 @@ impl ReaderBuilder { coerce_primitive: false, strict_mode: false, is_field: true, + struct_mode: StructMode::ObjectOnly, schema: Arc::new(Schema::new([field.into()])), } } @@ -262,6 +266,26 @@ impl ReaderBuilder { } } + /// Set the [`StructMode`] for the reader, which determines whether + /// structs can be represented by JSON objects, lists, or either. + /// + /// For example, if the RecordBatch Schema is + /// `[("a", Int32), ("r", Struct([("b", Boolean), ("c", Utf8)]))]` + /// then [`StructMode::ObjectOnly`] would read rows of the form + /// `{"a": 1, "r": {"b": true, "c": "cat"}}` + /// while [`StructMode::ListOnly`] would read rows of the form + /// `[1, [true, "cat"]]` + /// + /// JSON data of this form is returned by + /// [Presto](https://prestodb.io/docs/current/develop/client-protocol.html#important-queryresults-attributes) + /// and [Trino](https://trino.io/docs/current/develop/client-protocol.html#important-queryresults-attributes). 
+ pub fn with_struct_mode(self, struct_mode: StructMode) -> Self { + Self { + struct_mode, + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -280,7 +304,13 @@ impl ReaderBuilder { } }; - let decoder = make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?; + let decoder = make_decoder( + data_type, + self.coerce_primitive, + self.strict_mode, + nullable, + self.struct_mode, + )?; let num_fields = self.schema.flattened_fields().len(); @@ -643,6 +673,7 @@ fn make_decoder( coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result, ArrowError> { downcast_integer! { data_type => (primitive_decoder, data_type), @@ -693,13 +724,13 @@ fn make_decoder( DataType::Boolean => Ok(Box::::default()), DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), - DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)), + DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), + DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => { Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON"))) } - DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, 
coerce_primitive, strict_mode, is_nullable)?)), + DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)), d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader"))) } } @@ -715,7 +746,7 @@ mod tests { use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; use arrow_data::ArrayDataBuilder; - use arrow_schema::Field; + use arrow_schema::{Field, Fields}; use super::*; @@ -2343,4 +2374,219 @@ mod tests { .unwrap() ); } + + #[test] + fn test_struct_decoding_list_length() { + use arrow_array::array; + + let row = "[1, 2]"; + + let mut fields = vec![Field::new("a", DataType::Int32, true)]; + let too_few_fields = Fields::from(fields.clone()); + fields.push(Field::new("b", DataType::Int32, true)); + let correct_fields = Fields::from(fields.clone()); + fields.push(Field::new("c", DataType::Int32, true)); + let too_many_fields = Fields::from(fields.clone()); + + let parse = |fields: Fields, as_field: bool| { + let builder = if as_field { + ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true)) + } else { + ReaderBuilder::new(Arc::new(Schema::new(fields))) + }; + builder + .with_struct_mode(StructMode::ListOnly) + .build(Cursor::new(row.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + let expected_row = StructArray::new( + correct_fields.clone(), + vec![ + Arc::new(array::Int32Array::from(vec![1])), + Arc::new(array::Int32Array::from(vec![2])), + ], + None, + ); + let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true); + + assert_eq!( + parse(too_few_fields.clone(), true).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(too_few_fields, false).unwrap_err().to_string(), + "Json error: found extra columns for 1 fields".to_string() + ); + assert_eq!( + parse(correct_fields.clone(), true).unwrap(), + RecordBatch::try_new( 
+ Arc::new(Schema::new(vec![row_field])), + vec![Arc::new(expected_row.clone())] + ) + .unwrap() + ); + assert_eq!( + parse(correct_fields, false).unwrap(), + RecordBatch::from(expected_row) + ); + assert_eq!( + parse(too_many_fields.clone(), true) + .unwrap_err() + .to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + assert_eq!( + parse(too_many_fields, false).unwrap_err().to_string(), + "Json error: found 2 columns for 3 fields".to_string() + ); + } + + #[test] + fn test_struct_decoding() { + use arrow_array::builder; + + let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#; + let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#; + let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#; + + let struct_fields = Fields::from(vec![ + Field::new("b", DataType::new_list(DataType::Int32, true), true), + Field::new_map( + "c", + "entries", + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + false, + false, + ), + ]); + + let list_array = + ListArray::from_iter_primitive::(vec![Some(vec![Some(1), Some(2)])]); + + let map_array = { + let mut map_builder = builder::MapBuilder::new( + None, + builder::StringBuilder::new(), + builder::Int32Builder::new(), + ); + map_builder.keys().append_value("d"); + map_builder.values().append_value(3); + map_builder.append(true).unwrap(); + map_builder.finish() + }; + + let struct_array = StructArray::new( + struct_fields.clone(), + vec![Arc::new(list_array), Arc::new(map_array)], + None, + ); + + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Struct(struct_fields), + true, + )])); + let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap(); + + let parse = |s: &str, mode: StructMode| { + ReaderBuilder::new(schema.clone()) + .with_struct_mode(mode) + .build(Cursor::new(s.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse(nested_object_json, StructMode::ObjectOnly).unwrap(), + expected 
+ ); + assert_eq!( + parse(nested_list_json, StructMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructMode::ObjectOnly) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned() + ); + + assert_eq!( + parse(nested_list_json, StructMode::ListOnly).unwrap(), + expected + ); + assert_eq!( + parse(nested_object_json, StructMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned() + ); + assert_eq!( + parse(nested_mixed_json, StructMode::ListOnly) + .unwrap_err() + .to_string(), + "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned() + ); + } + + // Test cases: + // [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error + // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])] -> Error + // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error + // [[]] -> RecordBatch row with empty struct entry. 
Schema = [('r', [('a', Int32)])] -> Error + #[test] + fn test_struct_decoding_empty_list() { + let int_field = Field::new("a", DataType::Int32, true); + let struct_field = Field::new( + "r", + DataType::Struct(Fields::from(vec![int_field.clone()])), + true, + ); + + let parse = |json: &str, as_field: bool, field: Field| { + let builder = if as_field { + ReaderBuilder::new_with_field(field.clone()) + } else { + ReaderBuilder::new(Arc::new(Schema::new(vec![field].clone()))) + }; + builder + .with_struct_mode(StructMode::ListOnly) + .build(Cursor::new(json.as_bytes())) + .unwrap() + .next() + .unwrap() + }; + + assert_eq!( + parse("[]", true, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, int_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: found 0 columns for 1 fields".to_owned() + ); + assert_eq!( + parse("[[]]", false, struct_field.clone()) + .unwrap_err() + .to_string(), + "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned() + ); + } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 6c805591d390..28ba14dcee8a 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -16,7 +16,7 @@ // under the License. 
use crate::reader::tape::{Tape, TapeElement}; -use crate::reader::{make_decoder, ArrayDecoder}; +use crate::reader::{make_decoder, ArrayDecoder, StructMode}; use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; @@ -27,6 +27,7 @@ pub struct StructArrayDecoder { decoders: Vec>, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, } impl StructArrayDecoder { @@ -35,6 +36,7 @@ impl StructArrayDecoder { coerce_primitive: bool, strict_mode: bool, is_nullable: bool, + struct_mode: StructMode, ) -> Result { let decoders = struct_fields(&data_type) .iter() @@ -48,6 +50,7 @@ impl StructArrayDecoder { coerce_primitive, strict_mode, nullable, + struct_mode, ) }) .collect::, ArrowError>>()?; @@ -57,6 +60,7 @@ impl StructArrayDecoder { decoders, strict_mode, is_nullable, + struct_mode, }) } } @@ -70,43 +74,81 @@ impl ArrayDecoder for StructArrayDecoder { .is_nullable .then(|| BooleanBufferBuilder::new(pos.len())); - for (row, p) in pos.iter().enumerate() { - let end_idx = match (tape.get(*p), nulls.as_mut()) { - (TapeElement::StartObject(end_idx), None) => end_idx, - (TapeElement::StartObject(end_idx), Some(nulls)) => { - nulls.append(true); - end_idx - } - (TapeElement::Null, Some(nulls)) => { - nulls.append(false); - continue; - } - _ => return Err(tape.error(*p, "{")), - }; - - let mut cur_idx = *p + 1; - while cur_idx < end_idx { - // Read field name - let field_name = match tape.get(cur_idx) { - TapeElement::String(s) => tape.get_string(s), - _ => return Err(tape.error(cur_idx, "field name")), + if self.struct_mode == StructMode::ObjectOnly { + for (row, p) in pos.iter().enumerate() { + let end_idx = match (tape.get(*p), nulls.as_mut()) { + (TapeElement::StartObject(end_idx), None) => end_idx, + (TapeElement::StartObject(end_idx), Some(nulls)) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls)) => { + nulls.append(false); + continue; + } + (_, _) => return 
Err(tape.error(*p, "{")), }; - // Update child pos if match found - match fields.iter().position(|x| x.name() == field_name) { - Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, - None => { - if self.strict_mode { - return Err(ArrowError::JsonError(format!( - "column '{}' missing from schema", - field_name - ))); + let mut cur_idx = *p + 1; + while cur_idx < end_idx { + // Read field name + let field_name = match tape.get(cur_idx) { + TapeElement::String(s) => tape.get_string(s), + _ => return Err(tape.error(cur_idx, "field name")), + }; + + // Update child pos if match found + match fields.iter().position(|x| x.name() == field_name) { + Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, + None => { + if self.strict_mode { + return Err(ArrowError::JsonError(format!( + "column '{}' missing from schema", + field_name + ))); + } } } + // Advance to next field + cur_idx = tape.next(cur_idx + 1, "field value")?; } + } + } else { + for (row, p) in pos.iter().enumerate() { + let end_idx = match (tape.get(*p), nulls.as_mut()) { + (TapeElement::StartList(end_idx), None) => end_idx, + (TapeElement::StartList(end_idx), Some(nulls)) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls)) => { + nulls.append(false); + continue; + } + (_, _) => return Err(tape.error(*p, "[")), + }; - // Advance to next field - cur_idx = tape.next(cur_idx + 1, "field value")?; + let mut cur_idx = *p + 1; + let mut entry_idx = 0; + while cur_idx < end_idx { + if entry_idx >= fields.len() { + return Err(ArrowError::JsonError(format!( + "found extra columns for {} fields", + fields.len() + ))); + } + child_pos[entry_idx][row] = cur_idx; + entry_idx += 1; + // Advance to next field + cur_idx = tape.next(cur_idx, "field value")?; + } + if entry_idx != fields.len() { + return Err(ArrowError::JsonError(format!( + "found {} columns for {} fields", + entry_idx, + fields.len() + ))); + } } } diff --git a/arrow-json/src/writer/encoder.rs 
b/arrow-json/src/writer/encoder.rs index ed430fe6a1ec..e2ba08c186ad 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::StructMode; use arrow_array::cast::AsArray; use arrow_array::types::*; use arrow_array::*; @@ -29,6 +30,7 @@ use std::io::Write; #[derive(Debug, Clone, Default)] pub struct EncoderOptions { pub explicit_nulls: bool, + pub struct_mode: StructMode, } /// A trait to format array values as JSON values @@ -135,6 +137,7 @@ fn make_encoder_impl<'a>( let encoder = StructArrayEncoder{ encoders, explicit_nulls: options.explicit_nulls, + struct_mode: options.struct_mode, }; (Box::new(encoder) as _, array.nulls().cloned()) } @@ -172,6 +175,7 @@ struct FieldEncoder<'a> { struct StructArrayEncoder<'a> { encoders: Vec>, explicit_nulls: bool, + struct_mode: StructMode, } /// This API is only stable since 1.70 so can't use it when current MSRV is lower @@ -185,11 +189,16 @@ fn is_some_and(opt: Option, f: impl FnOnce(T) -> bool) -> bool { impl Encoder for StructArrayEncoder<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { - out.push(b'{'); + match self.struct_mode { + StructMode::ObjectOnly => out.push(b'{'), + StructMode::ListOnly => out.push(b'['), + } let mut is_first = true; + // Nulls can only be dropped in ObjectOnly mode when explicit_nulls is disabled + let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls; for field_encoder in &mut self.encoders { let is_null = is_some_and(field_encoder.nulls.as_ref(), |n| n.is_null(idx)); - if is_null && !self.explicit_nulls { + if is_null && drop_nulls { continue; } @@ -198,15 +207,20 @@ impl Encoder for StructArrayEncoder<'_> { } is_first = false; - encode_string(field_encoder.field.name(), out); - out.push(b':'); + if self.struct_mode == StructMode::ObjectOnly { + encode_string(field_encoder.field.name(), out); + out.push(b':'); + } match is_null { true => 
out.extend_from_slice(b"null"), false => field_encoder.encoder.encode(idx, out), } } - out.push(b'}'); + match self.struct_mode { + StructMode::ObjectOnly => out.push(b'}'), + StructMode::ListOnly => out.push(b']'), + } } } diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index ee6d83a0a1f0..eb96f5e3653c 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -108,6 +108,7 @@ mod encoder; use std::{fmt::Debug, io::Write}; +use crate::StructMode; use arrow_array::*; use arrow_schema::*; @@ -247,12 +248,43 @@ impl WriterBuilder { /// {"foo":null,"bar":null} /// ``` /// - /// Default is to skip nulls (set to `false`). + /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`, + /// nulls will be written explicitly regardless of this setting. pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { self.0.explicit_nulls = explicit_nulls; self } + /// Returns whether this writer is configured to write structs as JSON Objects or Arrays. + pub fn struct_mode(&self) -> StructMode { + self.0.struct_mode + } + + /// Set whether to write structs as JSON Objects or Lists. + /// + /// For example, a writer (with [`LineDelimited`] format) writing the schema + /// `[("a", Int32), ("m", Struct([("b", Boolean), ("c", Utf8)]))]` would write with + /// `StructMode::ObjectOnly`: + /// + /// ```json + /// {"a": 1, "m": {"b": true, "c": "cat"}} + /// ``` + /// + /// With `StructMode::ListOnly`: + /// + /// ```json + /// [1, [true, "cat"]] + /// ``` + /// + /// Map columns are not affected by this option. + /// + /// Default is to use `ObjectOnly`. If this is set to `ListOnly`, nulls will + /// be written explicitly regardless of the `explicit_nulls` setting. + pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self { + self.0.struct_mode = struct_mode; + self + } + /// Create a new `Writer` with specified `JsonFormat` and builder options. 
pub fn build(self, writer: W) -> Writer where @@ -1953,4 +1985,70 @@ mod tests { "#, ); } + + #[test] + fn write_structs_as_list() { + let schema = Schema::new(vec![ + Field::new( + "c1", + DataType::Struct(Fields::from(vec![ + Field::new("c11", DataType::Int32, true), + Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()), + false, + ), + ])), + false, + ), + Field::new("c2", DataType::Utf8, false), + ]); + + let c1 = StructArray::from(vec![ + ( + Arc::new(Field::new("c11", DataType::Int32, true)), + Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as ArrayRef, + ), + ( + Arc::new(Field::new( + "c12", + DataType::Struct(vec![Field::new("c121", DataType::Utf8, false)].into()), + false, + )), + Arc::new(StructArray::from(vec![( + Arc::new(Field::new("c121", DataType::Utf8, false)), + Arc::new(StringArray::from(vec![Some("e"), Some("f"), Some("g")])) as ArrayRef, + )])) as ArrayRef, + ), + ]); + let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]); + + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)]).unwrap(); + + let expected = r#"[[1,["e"]],"a"] +[[null,["f"]],"b"] +[[5,["g"]],"c"] +"#; + + let mut buf = Vec::new(); + { + let builder = WriterBuilder::new() + .with_explicit_nulls(true) + .with_struct_mode(StructMode::ListOnly); + let mut writer = builder.build::<_, LineDelimited>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + } + assert_json_eq(&buf, expected); + + let mut buf = Vec::new(); + { + let builder = WriterBuilder::new() + .with_explicit_nulls(false) + .with_struct_mode(StructMode::ListOnly); + let mut writer = builder.build::<_, LineDelimited>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + } + assert_json_eq(&buf, expected); + } }