Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ci): Ensure integration workflow passes #643

Merged
merged 11 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ services:
- ${NANOARROW_DOCKER_SOURCE_DIR}:/arrow-integration/nanoarrow
environment:
ARCHERY_INTEGRATION_TARGET_IMPLEMENTATIONS: "nanoarrow"
# Rust writes invalid flatbuffers:
# https://github.com/apache/arrow-rs/issues/5052
ARCHERY_INTEGRATION_WITH_RUST: "0"
command:
["echo '::group::Build nanoarrow' &&
conda run --no-capture-output /arrow-integration/ci/scripts/nanoarrow_build.sh /arrow-integration /build &&
Expand Down
16 changes: 9 additions & 7 deletions src/nanoarrow/ipc/decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -1024,12 +1024,11 @@ static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decode

ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
int32_t* prefix_size_bytes,
struct ArrowError* error) {
ArrowIpcDecoderResetHeaderInfo(decoder);
int32_t prefix_size_bytes;
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, &prefix_size_bytes, error));
NANOARROW_UNUSED(prefix_size_bytes);
ArrowIpcDecoderReadHeaderPrefix(decoder, &data, prefix_size_bytes, error));
return NANOARROW_OK;
}

Expand Down Expand Up @@ -1187,15 +1186,15 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
decoder->body_size_bytes = ns(Message_bodyLength(message));

switch (decoder->metadata_version) {
case ns(MetadataVersion_V5):
case ns(MetadataVersion_V4):
case ns(MetadataVersion_V5):
break;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
ArrowErrorSet(error, "Expected metadata version V4 or V5 but found %s",
ns(MetadataVersion_name(ns(Message_version(message)))));
return EINVAL;
case ns(MetadataVersion_V1):
case ns(MetadataVersion_V2):
case ns(MetadataVersion_V3):
default:
ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
decoder->metadata_version);
Expand Down Expand Up @@ -1286,6 +1285,9 @@ ArrowErrorCode ArrowIpcDecoderDecodeFooter(struct ArrowIpcDecoder* decoder,
data.data.as_uint8 + data.size_bytes - footer_and_size_and_magic_size;
ns(Footer_table_t) footer = ns(Footer_as_root(footer_data));

NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderDecodeSchemaHeader(decoder, ns(Footer_schema(footer)), error));

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeSchemaImpl(
ns(Footer_schema(footer)), &private_data->footer.schema, error));

Expand Down
5 changes: 4 additions & 1 deletion src/nanoarrow/ipc/decoder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ TEST(NanoarrowIpcTest, NanoarrowIpcPeekSimpleSchema) {
data.size_bytes = sizeof(kSimpleSchema);

ArrowIpcDecoderInit(&decoder);
EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &error), NANOARROW_OK);
int32_t prefix_size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderPeekHeader(&decoder, data, &prefix_size_bytes, &error),
NANOARROW_OK);
EXPECT_EQ(prefix_size_bytes, 8);
EXPECT_EQ(decoder.header_size_bytes, sizeof(kSimpleSchema));
EXPECT_EQ(decoder.body_size_bytes, 0);

Expand Down
52 changes: 46 additions & 6 deletions src/nanoarrow/ipc/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ static int ArrowIpcArrayStreamReaderNextHeader(
// propagated higher (e.g., if the stream is empty and there's no schema message)
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else if (bytes_read == 4) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this special case should specifically assert the metadata version we're currently reading. It's never valid for a V5 message to omit the continuation and a V4 message is invalid if it includes that.

And we can't write:

Suggested change
} else if (bytes_read == 4) {
// DON'T commit; broken!
} else if (private_data->decoder.metadata_version == NANOARROW_IPC_METADATA_VERSION_V4 && bytes_read == 4) {

because these peek etc functions are used before we've fully unpacked the schema to know what the metadata version is.

So I think what needs to happen is: the first time a decoder peeks a header and observes the lack of a continuation, we set ArrowIpcDecoderPrivate::prefix_length = 4. Later when we decode the metadata version, we can raise an error if the prefix length is not as expected. Also we should raise if the prefix length is inconsistent in any future message peeked by the decoder.

As a pleasant side effect, there will be one fewer argument for you to pass around :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a fun turn of events, not all V4 metadata messages have a 4 byte prefix size (i.e., not in the 0.17.1 golden files). I think I included all the other checks you suggested! (And also opened an issue with instructions for how to skip those cases in the future should we decide they don't need to be supported): #648

// Special case very, very old IPC streams that used 0x00000000 as the
// end-of-stream indicator.
uint32_t last_four_bytes = 0;
memcpy(&last_four_bytes, private_data->header.data, sizeof(uint32_t));
if (last_four_bytes == 0) {
ArrowErrorSet(&private_data->error, "No data available on stream");
return ENODATA;
} else {
ArrowErrorSet(&private_data->error,
"Expected 0x00000000 if exactly four bytes are available at the end "
"of a stream");
return EINVAL;
}
} else if (bytes_read != 8) {
ArrowErrorSet(&private_data->error,
"Expected at least 8 bytes in remainder of stream");
Expand All @@ -241,17 +255,43 @@ static int ArrowIpcArrayStreamReaderNextHeader(
input_view.size_bytes = private_data->header.size_bytes;

// Use PeekHeader to fill in decoder.header_size_bytes
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view,
&private_data->error));
int32_t prefix_size_bytes = 0;
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(
&private_data->decoder, input_view, &prefix_size_bytes, &private_data->error));

// Legacy streams are missing the 0xFFFFFFFF at the start of the message. The
// decoder can handle this; however, verification will fail because flatbuffers
// must be 8-byte aligned. To handle this case, we prepend the continuation
// token to the start of the stream and ensure that we read four fewer bytes
// the next time we issue a read.
int64_t extra_bytes_already_read = 0;
if (prefix_size_bytes == 4) {
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(&private_data->header, 4),
&private_data->error);
memmove(private_data->header.data + 4, private_data->header.data,
private_data->header.size_bytes);
uint32_t continuation = 0xFFFFFFFFU;
memcpy(private_data->header.data, &continuation, sizeof(uint32_t));
private_data->header.size_bytes += 4;
extra_bytes_already_read = 4;

input_view.data.data = private_data->header.data;
input_view.size_bytes = private_data->header.size_bytes;

NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderPeekHeader(
&private_data->decoder, input_view, &prefix_size_bytes, &private_data->error));
}

// Read the header bytes
int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8;
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowBufferReserve(&private_data->header, expected_header_bytes),
ArrowBufferReserve(&private_data->header,
expected_header_bytes - extra_bytes_already_read),
&private_data->error);
NANOARROW_RETURN_NOT_OK(
private_data->input.read(&private_data->input, private_data->header.data + 8,
expected_header_bytes, &bytes_read, &private_data->error));
NANOARROW_RETURN_NOT_OK(private_data->input.read(
&private_data->input, private_data->header.data + private_data->header.size_bytes,
expected_header_bytes - extra_bytes_already_read, &bytes_read,
&private_data->error));
private_data->header.size_bytes += bytes_read;

// Verify + decode the header
Expand Down
5 changes: 5 additions & 0 deletions src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,13 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder);
/// these bytes and returns ESPIPE if there are not enough remaining bytes in data to read
/// the entire header message, EINVAL if the first 8 bytes are not valid, ENODATA if the
/// Arrow end-of-stream indicator has been reached, or NANOARROW_OK otherwise.
///
/// Pre-1.0 messages were not prefixed with 0xFFFFFFFF. For these messages, a value
/// of 4 will be placed into prefix_size_bytes; otherwise a value of 8 will be placed
/// into prefix_size_bytes.
ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
int32_t* prefix_size_bytes,
struct ArrowError* error);

/// \brief Verify a message header
Expand Down
Loading