Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repeatedly call adbc_arrow_array_stream_next/1 until end of series #46

Merged
merged 4 commits
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions c_src/adbc_nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ template <typename M> static ERL_NIF_TERM strings_from_buffer(
return enif_make_list_from_array(env, values.data(), (unsigned)values.size());
}

static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &error);
static int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &error, bool *end_of_series = nullptr);

static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &children, ERL_NIF_TERM &error) {
ERL_NIF_TERM children_term{};
Expand All @@ -113,9 +113,9 @@ static int get_arrow_array_children_as_list(ErlNifEnv *env, struct ArrowSchema *
return 1;
}

children.resize(schema->n_children);
if (schema->n_children > 0) {
for (int64_t child_i = 0; child_i < schema->n_children; child_i++) {
children.resize(values->n_children);
if (values->n_children > 0) {
for (int64_t child_i = 0; child_i < values->n_children; child_i++) {
struct ArrowSchema * child_schema = schema->children[child_i];
struct ArrowArray * child_values = values->children[child_i];
std::vector<ERL_NIF_TERM> childrens;
Expand Down Expand Up @@ -277,7 +277,7 @@ static ERL_NIF_TERM get_arrow_array_list_children(ErlNifEnv *env, struct ArrowSc
return enif_make_list_from_array(env, children.data(), (unsigned)items_values->n_children);
}

int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &error) {
int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct ArrowArray * values, uint64_t level, std::vector<ERL_NIF_TERM> &out_terms, ERL_NIF_TERM &error, bool *end_of_series) {
if (schema == nullptr) {
error = erlang::nif::error(env, "invalid ArrowSchema (nullptr) when invoking next");
return 1;
Expand Down Expand Up @@ -430,10 +430,18 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
if (strncmp("+s", format, 2) == 0) {
// only handle and return children if this is a struct
is_struct = true;
if (get_arrow_array_children_as_list(env, schema, values, level, children, error) == 1) {
return 1;

if (schema->n_children == values->n_children) {
if (get_arrow_array_children_as_list(env, schema, values, level, children, error) == 1) {
return 1;
}
children_term = enif_make_list_from_array(env, children.data(), (unsigned)schema->n_children);
} else {
if (end_of_series) {
*end_of_series = true;
}
children_term = erlang::nif::atom(env, "end_of_series");
}
children_term = enif_make_list_from_array(env, children.data(), (unsigned)schema->n_children);
} else if (strncmp("+m", format, 2) == 0) {
children_term = get_arrow_array_map_children(env, schema, values, level);
} else if (strncmp("+l", format, 2) == 0 || strncmp("+L", format, 2) == 0) {
Expand Down Expand Up @@ -1071,28 +1079,38 @@ static ERL_NIF_TERM adbc_arrow_array_stream_next(ErlNifEnv *env, int argc, const
return erlang::nif::error(env, reason ? reason : "unknown error");
}

if (res->private_data == nullptr) {
res->private_data = enif_alloc(sizeof(struct ArrowSchema));
memset(res->private_data, 0, sizeof(struct ArrowSchema));
int code = res->val.get_schema(&res->val, (struct ArrowSchema *)res->private_data);
if (code != 0) {
const char * reason = res->val.get_last_error(&res->val);
enif_free(res->private_data);
res->private_data = nullptr;
return erlang::nif::error(env, reason ? reason : "unknown error");
}
if (res->private_data != nullptr) {
enif_free(res->private_data);
res->private_data = nullptr;
}

res->private_data = enif_alloc(sizeof(struct ArrowSchema));
memset(res->private_data, 0, sizeof(struct ArrowSchema));
code = res->val.get_schema(&res->val, (struct ArrowSchema *)res->private_data);
if (code != 0) {
const char * reason = res->val.get_last_error(&res->val);
enif_free(res->private_data);
res->private_data = nullptr;
return erlang::nif::error(env, reason ? reason : "unknown error");
}

std::vector<ERL_NIF_TERM> out_terms;

auto schema = (struct ArrowSchema*)res->private_data;
if (arrow_array_to_nif_term(env, schema, &out, 0, out_terms, error) == 1) {
bool end_of_series = false;
if (arrow_array_to_nif_term(env, schema, &out, 0, out_terms, error, &end_of_series) == 1) {
if (out.release) out.release(&out);
return error;
}

if (out_terms.size() == 1) {
ret = out_terms[0];
if (end_of_series) {
if (out.release) {
out.release(&out);
}
return ret;
}
} else {
ret = enif_make_tuple2(env, out_terms[0], out_terms[1]);
}
Expand Down
9 changes: 4 additions & 5 deletions lib/adbc_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,12 @@ defmodule Adbc.Connection do

defp stream_results(reference, acc, num_rows) do
case Adbc.Nif.adbc_arrow_array_stream_next(reference) do
{:ok, results, done} ->
{:ok, results, _done} ->
acc = Map.merge(acc, Map.new(results), fn _k, v1, v2 -> v1 ++ v2 end)
stream_results(reference, acc, num_rows)

case done do
0 -> stream_results(reference, acc, num_rows)
1 -> {:ok, %Adbc.Result{data: acc, num_rows: num_rows}}
end
:end_of_series ->
{:ok, %Adbc.Result{data: acc, num_rows: num_rows}}

{:error, reason} ->
{:error, error_to_exception(reason)}
Expand Down
14 changes: 14 additions & 0 deletions test/adbc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ defmodule AdbcTest do
Connection.query(conn, "SELECT 123 as num")
end

# Checks that a query whose result is large enough to be delivered in several
# pieces comes back complete, rather than stopping after the first piece.
test "getting all chunks", %{conn: conn} do
  # One timestamp every 15 minutes from 2000-03-01 00:00 through
  # 2100-03-04 12:00, both endpoints included:
  # 36_527.5 days * 96 steps per day + 1 = 3_506_641 rows.
  sql = """
  SELECT * FROM generate_series('2000-03-01 00:00'::timestamp, '2100-03-04 12:00'::timestamp, '15 minutes')
  """

  result = Connection.query!(conn, sql)

  # A plain match (not an assertion): an unexpected result shape raises MatchError.
  %Adbc.Result{data: %{"generate_series" => timestamps}} = result

  assert Enum.count(timestamps) == 3_506_641
end

test "select with temporal types", %{conn: conn} do
query = """
select
Expand Down