diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 7b930688506..260a451c27c 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -2177,6 +2177,89 @@ mod tests { assert_eq!(input_batch, output_batch); } + #[test] + fn test_unaligned() { + let batch = RecordBatch::try_from_iter(vec![( + "i32", + Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _, + )]) + .unwrap(); + + let gen = IpcDataGenerator {}; + #[allow(deprecated)] + let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); + let (_, encoded) = gen + .encoded_batch(&batch, &mut dict_tracker, &Default::default()) + .unwrap(); + + let message = root_as_message(&encoded.ipc_message).unwrap(); + + // Construct an unaligned buffer + let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1); + buffer.push(0_u8); + buffer.extend_from_slice(&encoded.arrow_data); + let b = Buffer::from(buffer).slice(1); + assert_ne!(b.as_ptr().align_offset(8), 0); + + let ipc_batch = message.header_as_record_batch().unwrap(); + let roundtrip = ArrayReader::try_new( + &b, + ipc_batch, + batch.schema(), + &Default::default(), + &message.version(), + ) + .unwrap() + .with_require_alignment(false) + .read_record_batch() + .unwrap(); + assert_eq!(batch, roundtrip); + } + + #[test] + fn test_unaligned_throws_error_with_require_alignment() { + let batch = RecordBatch::try_from_iter(vec![( + "i32", + Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _, + )]) + .unwrap(); + + let gen = IpcDataGenerator {}; + #[allow(deprecated)] + let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); + let (_, encoded) = gen + .encoded_batch(&batch, &mut dict_tracker, &Default::default()) + .unwrap(); + + let message = root_as_message(&encoded.ipc_message).unwrap(); + + // Construct an unaligned buffer + let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1); + buffer.push(0_u8); + buffer.extend_from_slice(&encoded.arrow_data); + let b = Buffer::from(buffer).slice(1); + assert_ne!(b.as_ptr().align_offset(8), 0); + + let ipc_batch = message.header_as_record_batch().unwrap(); + let result = ArrayReader::try_new( + &b, + ipc_batch, + batch.schema(), + &Default::default(), + &message.version(), + ) + .unwrap() + .with_require_alignment(true) + .read_record_batch(); + + let error = result.unwrap_err(); + assert_eq!( + error.to_string(), + "Invalid argument error: Misaligned buffers[0] in array of type Int32, \ + offset from expected alignment of 4 by 1" + ); + } + #[test] fn test_file_with_massive_column_count() { // 499_999 is upper limit for default settings (1_000_000) @@ -2332,89 +2415,4 @@ mod tests { assert_eq!(decoded_batch.expect("Failed to read RecordBatch"), batch); }); } - - #[test] - fn test_unaligned() { - let batch = RecordBatch::try_from_iter(vec![( - "i32", - Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _, - )]) - .unwrap(); - - let gen = IpcDataGenerator {}; - #[allow(deprecated)] - let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); - let (_, encoded) = gen - .encoded_batch(&batch, &mut dict_tracker, &Default::default()) - .unwrap(); - - let message = root_as_message(&encoded.ipc_message).unwrap(); - - // Construct an unaligned buffer - let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1); - buffer.push(0_u8); - buffer.extend_from_slice(&encoded.arrow_data); - let b = Buffer::from(buffer).slice(1); - assert_ne!(b.as_ptr().align_offset(8), 0); - - let ipc_batch = message.header_as_record_batch().unwrap(); - - let roundtrip = ArrayReader::try_new( - &b, - ipc_batch, - batch.schema(), - &Default::default(), - &message.version(), - ) - .unwrap() - .with_require_alignment(false) - .read_record_batch() - .unwrap(); - assert_eq!(batch, roundtrip); - } - - #[test] - fn test_unaligned_throws_error_with_require_alignment() { - let batch = RecordBatch::try_from_iter(vec![( - "i32", - Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _, - )]) - .unwrap(); - - let gen = IpcDataGenerator {}; - #[allow(deprecated)] - let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true); - let (_, encoded) = gen - .encoded_batch(&batch, &mut dict_tracker, &Default::default()) - .unwrap(); - - let message = root_as_message(&encoded.ipc_message).unwrap(); - - // Construct an unaligned buffer - let mut buffer = MutableBuffer::with_capacity(encoded.arrow_data.len() + 1); - buffer.push(0_u8); - buffer.extend_from_slice(&encoded.arrow_data); - let b = Buffer::from(buffer).slice(1); - assert_ne!(b.as_ptr().align_offset(8), 0); - - let ipc_batch = message.header_as_record_batch().unwrap(); - - let result = ArrayReader::try_new( - &b, - ipc_batch, - batch.schema(), - &Default::default(), - &message.version(), - ) - .unwrap() - .with_require_alignment(true) - .read_record_batch(); - - let error = result.unwrap_err(); - assert_eq!( - error.to_string(), - "Invalid argument error: Misaligned buffers[0] in array of type Int32, \ - offset from expected alignment of 4 by 1" - ); - } }