Skip to content

Commit

Permalink
added support for run-end encoded array
Browse files Browse the repository at this point in the history
  • Loading branch information
cocoa-xu committed Jun 7, 2024
1 parent 46bcbca commit c0b7f1a
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 45 deletions.
64 changes: 59 additions & 5 deletions c_src/adbc_arrow_array.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema
if (enif_is_identical(childrens[1], kAtomNil)) {
children[child_i] = kAtomNil;
} else {
children[child_i] = make_adbc_column(env, childrens[0], child_type, nullable, child_metadata, childrens[1]);
children[child_i] = make_adbc_column(env, schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
}
}
}
Expand Down Expand Up @@ -228,7 +228,7 @@ int get_arrow_struct(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowAr
if (enif_is_identical(childrens[1], kAtomNil)) {
children[child_i] = kAtomNil;
} else {
children[child_i] = make_adbc_column(env, childrens[0], child_type, nullable, child_metadata, childrens[1]);
children[child_i] = make_adbc_column(env, schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
}
}
}
Expand Down Expand Up @@ -437,6 +437,55 @@ ERL_NIF_TERM get_arrow_array_sparse_union_children(ErlNifEnv *env, struct ArrowS
return get_arrow_array_sparse_union_children(env, schema, values, 0, -1, level);
}

ERL_NIF_TERM get_arrow_run_end_encoded(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, uint64_t level) {
ERL_NIF_TERM error{};
if (schema->n_children != 2 || values->n_children != 2) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), schema->n_children != 2 || values->n_children != 2");
}
if (schema->children == nullptr || values->children == nullptr) {
return erlang::nif::error(env, "invalid ArrowArray (run_end_encoded), schema->children == nullptr || values->children == nullptr");
}
if (strncmp("run_ends", schema->children[0]->name, 8) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), its first child is not named run_ends");
}
if (strncmp("values", schema->children[1]->name, 6) != 0) {
return erlang::nif::error(env, "invalid ArrowSchema (run_end_encoded), its second child is not named values");
}

std::vector<ERL_NIF_TERM> children(2);
for (int64_t child_i = 0; child_i < 2; child_i++) {
std::vector<ERL_NIF_TERM> childrens;
ERL_NIF_TERM child_type;
ERL_NIF_TERM child_metadata;
if (arrow_array_to_nif_term(env, schema->children[child_i], values->children[child_i], 0, -1, level + 1, childrens, child_type, child_metadata, error) == 1) {
return 1;
}

if (childrens.size() == 1) {
children[child_i] = childrens[0];
} else {
bool nullable = child_i == 1 && ((schema->children[child_i]->flags & ARROW_FLAG_NULLABLE) || (values->children[child_i]->null_count > 0));
if (enif_is_identical(childrens[1], kAtomNil)) {
children[child_i] = kAtomNil;
} else {
children[child_i] = make_adbc_column(env, schema, values, childrens[0], child_type, nullable, child_metadata, childrens[1]);
}
}
}

ERL_NIF_TERM run_ends_keys[] = { kAtomRunEnds, kAtomValues };
ERL_NIF_TERM run_ends_values[] = { children[0], children[1] };
ERL_NIF_TERM run_ends_data;
// only fail if there are duplicated keys
// so we don't need to check the return value
enif_make_map_from_arrays(env, run_ends_keys, run_ends_values, 2, &run_ends_data);
return run_ends_data;
}

ERL_NIF_TERM get_arrow_run_end_encoded(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level) {
return get_arrow_run_end_encoded(env, schema, values, 0, -1, level);
}

ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, int64_t offset, int64_t count, uint64_t level, ArrowType list_type, unsigned n_items) {
ERL_NIF_TERM error{};
if (schema->children == nullptr) {
Expand Down Expand Up @@ -497,7 +546,7 @@ ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSchema *
if (enif_is_identical(childrens[1], kAtomNil)) {
children.emplace_back(kAtomNil);
} else {
children.emplace_back(make_adbc_column(env, childrens[0], children_type, children_nullable, children_metadata, childrens[1]));
children.emplace_back(make_adbc_column(env, schema, values, childrens[0], children_type, children_nullable, children_metadata, childrens[1]));
}
}
}
Expand Down Expand Up @@ -536,7 +585,7 @@ ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSchema *
if (enif_is_identical(childrens[1], kAtomNil)) {
children.emplace_back(kAtomNil);
} else {
children.emplace_back(make_adbc_column(env, childrens[0], children_type, children_nullable, children_metadata, childrens[1]));
children.emplace_back(make_adbc_column(env, schema, values, childrens[0], children_type, children_nullable, children_metadata, childrens[1]));
}
}
}
Expand Down Expand Up @@ -608,7 +657,7 @@ ERL_NIF_TERM get_arrow_array_list_view(ErlNifEnv *env, struct ArrowSchema * sche
if (enif_is_identical(childrens[1], kAtomNil)) {
values_term = kAtomNil;
} else {
values_term = make_adbc_column(env, childrens[0], children_type, false, children_metadata, childrens[1]);
values_term = make_adbc_column(env, schema, values, childrens[0], children_type, false, children_metadata, childrens[1]);
}
}

Expand Down Expand Up @@ -1007,6 +1056,11 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
return 1;
}
children_term = enif_make_list_from_array(env, children.data(), (unsigned)children.size());
} else if (strncmp("+r", format, 2) == 0) {
// NANOARROW_TYPE_RUN_END_ENCODED (maybe in nanoarrow v0.6.0)
// https://github.com/apache/arrow-nanoarrow/pull/507
term_type = kAdbcColumnTypeRunEndEncoded;
children_term = get_arrow_run_end_encoded(env, schema, values, offset, count, level);
} else if (strncmp("+m", format, 2) == 0) {
// NANOARROW_TYPE_MAP
term_type = kAdbcColumnTypeMap;
Expand Down
23 changes: 15 additions & 8 deletions c_src/adbc_column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,18 @@ int AdbcColumnNifTerm::from_term(ErlNifEnv *env, ERL_NIF_TERM adbc_column, bool
return 0;
}

ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, ERL_NIF_TERM name_term, ERL_NIF_TERM type_term, bool nullable, ERL_NIF_TERM metadata, ERL_NIF_TERM data) {
ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * array, ERL_NIF_TERM name_term, ERL_NIF_TERM type_term, bool nullable, ERL_NIF_TERM metadata, ERL_NIF_TERM data) {
ERL_NIF_TERM nullable_term = nullable ? kAtomTrue : kAtomFalse;

ERL_NIF_TERM keys[] = {
std::vector<ERL_NIF_TERM> keys = {
kAtomStructKey,
kAtomNameKey,
kAtomTypeKey,
kAtomNullableKey,
kAtomMetadataKey,
kAtomDataKey,
};
ERL_NIF_TERM values[] = {
std::vector<ERL_NIF_TERM> values = {
kAtomAdbcColumnModule,
name_term,
type_term,
Expand All @@ -117,19 +117,26 @@ ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, ERL_NIF_TERM name_term, ERL_NIF_TE
data,
};

if (enif_is_identical(type_term, kAdbcColumnTypeRunEndEncoded)) {
keys.emplace_back(kAtomLength);
values.emplace_back(enif_make_int64(env, array->length));
keys.emplace_back(kAtomOffset);
values.emplace_back(enif_make_int64(env, array->offset));
}

ERL_NIF_TERM adbc_column;
enif_make_map_from_arrays(env, keys, values, sizeof(keys)/sizeof(keys[0]), &adbc_column);
enif_make_map_from_arrays(env, keys.data(), values.data(), (unsigned)values.size(), &adbc_column);
return adbc_column;
}

ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, ERL_NIF_TERM name_term, const char * type, bool nullable, ERL_NIF_TERM metadata, ERL_NIF_TERM data) {
ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, ERL_NIF_TERM name_term, const char * type, bool nullable, ERL_NIF_TERM metadata, ERL_NIF_TERM data) {
ERL_NIF_TERM type_term = erlang::nif::make_binary(env, type);
return make_adbc_column(env, name_term, type_term, nullable, metadata, data);
return make_adbc_column(env, schema, values, name_term, type_term, nullable, metadata, data);
}

ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, const char * name, const char * type, bool nullable, ERL_NIF_TERM metadata, ERL_NIF_TERM data) {
ERL_NIF_TERM make_adbc_column(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, const char * name, const char * type, bool nullable, ERL_NIF_TERM metadata, ERL_NIF_TERM data) {
ERL_NIF_TERM name_term = erlang::nif::make_binary(env, name == nullptr ? "" : name);
return make_adbc_column(env, name_term, type, nullable, metadata, data);
return make_adbc_column(env, schema, values, name_term, type, nullable, metadata, data);
}

template <typename Integer, typename std::enable_if<
Expand Down
4 changes: 4 additions & 0 deletions c_src/adbc_consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ static ERL_NIF_TERM kAtomValidity;
static ERL_NIF_TERM kAtomOffsets;
static ERL_NIF_TERM kAtomSizes;
static ERL_NIF_TERM kAtomValues;
static ERL_NIF_TERM kAtomRunEnds;
static ERL_NIF_TERM kAtomOffset;
static ERL_NIF_TERM kAtomLength;

static ERL_NIF_TERM kAtomDecimal;
static ERL_NIF_TERM kAtomFixedSizeBinary;
Expand Down Expand Up @@ -116,6 +119,7 @@ static ERL_NIF_TERM kAdbcColumnTypeStruct;
static ERL_NIF_TERM kAdbcColumnTypeMap;
static ERL_NIF_TERM kAdbcColumnTypeDenseUnion;
static ERL_NIF_TERM kAdbcColumnTypeSparseUnion;
static ERL_NIF_TERM kAdbcColumnTypeRunEndEncoded;

// error codes
constexpr int kErrorBufferIsNotAMap = 1;
Expand Down
4 changes: 4 additions & 0 deletions c_src/adbc_nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,9 @@ static int on_load(ErlNifEnv *env, void **, ERL_NIF_TERM) {
kAtomOffsets = erlang::nif::atom(env, "offsets");
kAtomSizes = erlang::nif::atom(env, "sizes");
kAtomValues = erlang::nif::atom(env, "values");
kAtomRunEnds = erlang::nif::atom(env, "run_ends");
kAtomOffset = erlang::nif::atom(env, "offset");
kAtomLength = erlang::nif::atom(env, "length");

kAtomDecimal = erlang::nif::atom(env, "decimal");
kAtomFixedSizeBinary = erlang::nif::atom(env, "fixed_size_binary");
Expand Down Expand Up @@ -860,6 +863,7 @@ static int on_load(ErlNifEnv *env, void **, ERL_NIF_TERM) {
kAdbcColumnTypeMap = erlang::nif::atom(env, "map");
kAdbcColumnTypeDenseUnion = erlang::nif::atom(env, "dense_union");
kAdbcColumnTypeSparseUnion = erlang::nif::atom(env, "sparse_union");
kAdbcColumnTypeRunEndEncoded = erlang::nif::atom(env, "run_end_encoded");

return 0;
}
Expand Down
113 changes: 99 additions & 14 deletions lib/adbc_column.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ defmodule Adbc.Column do
`Adbc.Column` corresponds to a column in the table. It contains the column's name, type, and
data. The data is a list of values of the column's data type.
"""
defstruct name: nil,
type: nil,
nullable: false,
metadata: nil,
data: nil
@enforce_keys [:name, :type, :nullable, :data]
defstruct [:name, :type, :nullable, :metadata, :data, :length, :offset]

import Bitwise

@type i8 :: -128..127
@type u8 :: 0..255
Expand Down Expand Up @@ -82,6 +81,7 @@ defmodule Adbc.Column do
sizes: [non_neg_integer()],
values: %Adbc.Column{}
}
@valid_run_end_types [:i16, :i32, :i64]
@type data_type ::
:boolean
| signed_integer
Expand All @@ -105,6 +105,7 @@ defmodule Adbc.Column do
| timestamp_t
| duration_t
| interval_t
| :run_end_encoded
@spec column(data_type(), list() | list_view_data_t(), Keyword.t()) :: %Adbc.Column{}
def column(type, data, opts \\ [])
when (is_atom(type) or is_tuple(type)) and
Expand Down Expand Up @@ -1129,7 +1130,7 @@ defmodule Adbc.Column do
}
}
}
iex> Adbc.Column.list_view_to_list(list_view)
iex> Adbc.Column.to_list(list_view)
%Adbc.Column{
name: nil,
type: :list,
Expand Down Expand Up @@ -1168,8 +1169,8 @@ defmodule Adbc.Column do
]
}
"""
@spec list_view_to_list(%Adbc.Column{data: map()}) :: %Adbc.Column{}
def list_view_to_list(
@spec to_list(%Adbc.Column{data: map()}) :: %Adbc.Column{}
def to_list(
column = %Adbc.Column{
type: type,
data: %{
Expand All @@ -1183,7 +1184,7 @@ defmodule Adbc.Column do
when type in @list_view_types and is_list(validity) and is_list(offsets) and is_list(sizes) do
values =
if values.type in @list_view_types do
Adbc.Column.list_view_to_list(values)
Adbc.Column.to_list(values)
else
values
end
Expand All @@ -1193,8 +1194,7 @@ defmodule Adbc.Column do
if valid do
%{
values
| data:
Enum.map(Enum.slice(values.data, offset, size), &Adbc.Column.list_view_to_list/1)
| data: Enum.map(Enum.slice(values.data, offset, size), &Adbc.Column.to_list/1)
}
else
nil
Expand All @@ -1204,9 +1204,94 @@ defmodule Adbc.Column do
%{column | data: new_data, type: :list}
end

def list_view_to_list(column = %Adbc.Column{data: data}) when is_list(data) do
%{column | data: Enum.map(data, &Adbc.Column.list_view_to_list/1)}
def to_list(
column = %Adbc.Column{
type: :run_end_encoded,
data: %{
run_ends: run_ends = %Adbc.Column{type: run_end_type},
values: values
},
length: length,
offset: offset
}
)
when is_integer(offset) and offset >= 0 and is_integer(length) and length >= 1 do
values = Adbc.Column.to_list(values)

max_allowed_length =
case run_end_type do
:i16 ->
1 <<< 16

:i32 ->
1 <<< 32

:i64 ->
1 <<< 64

_ ->
raise Adbc.Error,
"Invalid run end type: #{inspect(run_end_type)}, expected one of #{inspect(@valid_run_end_types)}"
end

if offset + length > max_allowed_length do
raise Adbc.Error,
"Run end data exceeds maximum allowed length: #{length} + #{offset} > #{max_allowed_length}"
end

run_end_len = Enum.count(run_ends.data)

{run_end_start_index, values_start_index, encoded} =
case Enum.drop_while(run_ends.data, &(&1 < offset)) do
[] ->
raise Adbc.Error,
"Last run end is #{hd(Enum.reverse(run_ends.data))} but it should >= #{offset + length} (offset: #{offset}, length: #{length})"

encoded = [run_end_start_index | _] ->
values_start_index = run_end_len - Enum.count(encoded)

if run_end_start_index == offset do
{run_end_start_index, values_start_index, encoded}
else
{offset, values_start_index, encoded}
end
end

if offset + length > hd(Enum.reverse(run_ends.data)) do
raise Adbc.Error,
"Last run end is #{hd(Enum.reverse(run_ends.data))} but it should >= #{offset + length} (offset: #{offset}, length: #{length})"
end

{_, _, decoded} =
Enum.reduce(encoded, {run_end_start_index, values_start_index, []}, fn run_end,
{index, value_index,
acc} ->
real_end =
if run_end > offset + length do
offset + length
else
run_end
end

if is_map(values.data) do
{run_end, value_index + 1, List.duplicate(to_list(values), real_end - index) ++ acc}
else
{run_end, value_index + 1,
List.duplicate(Enum.at(values.data, value_index), real_end - index) ++ acc}
end
end)

%Adbc.Column{
name: column.name,
type: values.type,
nullable: column.nullable,
data: Enum.reverse(decoded)
}
end

def to_list(column = %Adbc.Column{data: data}) when is_list(data) do
%{column | data: Enum.map(data, &Adbc.Column.to_list/1)}
end

def list_view_to_list(v), do: v
def to_list(v), do: v
end
8 changes: 4 additions & 4 deletions lib/adbc_result.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Adbc.Result do
Returns a map of columns as a result.
"""
def to_map(result = %Adbc.Result{}) do
Map.new(list_view_to_list(result).data, fn %Adbc.Column{name: name, type: type, data: data} ->
Map.new(to_list(result).data, fn %Adbc.Column{name: name, type: type, data: data} ->
case type do
:list -> {name, Enum.map(data, &list_to_map/1)}
_ -> {name, data}
Expand All @@ -30,9 +30,9 @@ defmodule Adbc.Result do
@doc """
Convert any list view in the result set to normal lists.
"""
@spec list_view_to_list(%Adbc.Result{}) :: %Adbc.Result{}
def list_view_to_list(result = %Adbc.Result{data: data}) when is_list(data) do
%{result | data: Enum.map(data, &Adbc.Column.list_view_to_list/1)}
@spec to_list(%Adbc.Result{}) :: %Adbc.Result{}
def to_list(result = %Adbc.Result{data: data}) when is_list(data) do
%{result | data: Enum.map(data, &Adbc.Column.to_list/1)}
end

defp list_to_map(%Adbc.Column{name: name, type: type, data: data}) do
Expand Down
Loading

0 comments on commit c0b7f1a

Please sign in to comment.