Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for duration types #76

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions c_src/adbc_arrow_array.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}
term_type = enif_make_tuple3(env, kAtomTimestamp, term_unit, term_timezone);

using value_type = uint64_t;
using value_type = int64_t;
if (count == -1) count = values->length;
if (values->n_buffers != 2) {
error = erlang::nif::error(env, "invalid n_buffers value for ArrowArray (format=ts), values->n_buffers != 2");
Expand Down Expand Up @@ -996,9 +996,9 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
count,
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
[unit, us_precision, naive_dt_module, calendar_iso, &keys](ErlNifEnv *env, uint64_t val) -> ERL_NIF_TERM {
[unit, us_precision, naive_dt_module, calendar_iso, &keys](ErlNifEnv *env, int64_t val) -> ERL_NIF_TERM {
// Elixir only supports microsecond precision
uint64_t us = val * unit / 1000;
int64_t us = val * unit / 1000;
time_t t = (time_t)(us / 1000000);
tm* time = gmtime(&t);
us = us % 1000000;
Expand All @@ -1021,6 +1021,42 @@ int arrow_array_to_nif_term(ErlNifEnv *env, struct ArrowSchema * schema, struct
}
);
}
} else if (strncmp("tD", format, 2) == 0) {
// NANOARROW_TYPE_DURATION
switch (format[2]) {
case 's': // seconds
term_type = kAdbcColumnTypeDurationSeconds;
break;
case 'm': // milliseconds
term_type = kAdbcColumnTypeDurationMilliseconds;
break;
case 'u': // microseconds
term_type = kAdbcColumnTypeDurationMicroseconds;
break;
case 'n': // nanoseconds
term_type = kAdbcColumnTypeDurationNanoseconds;
break;
default:
format_processed = false;
}

if (format_processed) {
using value_type = int64_t;
if (count == -1) count = values->length;
if (values->n_buffers != 2) {
error = erlang::nif::error(env, "invalid n_buffers value for ArrowArray (format=tD), values->n_buffers != 2");
return 1;
}

current_term = values_from_buffer(
env,
offset,
count,
(const uint8_t *)values->buffers[bitmap_buffer_index],
(const value_type *)values->buffers[data_buffer_index],
enif_make_int64
);
}
} else {
format_processed = false;
}
Expand Down
56 changes: 53 additions & 3 deletions c_src/adbc_column.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,42 @@ int do_get_list_timestamp(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable, Arro
}
}

// Walks the Erlang list `list` and reports every element through `callback`.
//
// - an element identical to the `nil` atom is reported as `callback(0, true)`
// - an element readable as a signed 64-bit integer is reported as `callback(value, false)`
// - any other element stops the walk and makes the function return 1
//
// `nullable` is accepted but not consulted here: `nil` is reported to the
// callback regardless of its value.
//
// Returns 0 once the walk finishes without hitting an unreadable element.
int get_list_duration(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable, const std::function<void(int64_t val, bool is_nil)> &callback) {
    ERL_NIF_TERM item;
    ERL_NIF_TERM rest = list;
    while (enif_get_list_cell(env, rest, &item, &rest)) {
        if (enif_is_identical(item, kAtomNil)) {
            callback(0, true);
            continue;
        }

        int64_t duration;
        if (!erlang::nif::get(env, item, &duration)) {
            // neither nil nor an int64
            return 1;
        }
        callback(duration, false);
    }
    return 0;
}

// Builds an Arrow duration array from an Erlang list of integers.
//
// Sets `schema_out` to `nanoarrow_type` with `time_unit` (no timezone),
// initialises `array_out` from that schema and appends one slot per list entry:
//
// - nullable == true : a `nil` entry becomes a single null slot, any other
//   entry becomes its integer value
// - nullable == false: every entry is appended as the integer handed to the
//   callback (get_list_duration hands over 0 for `nil`)
//
// Returns 0 on success. A non-zero value is returned when schema/array setup
// fails, when get_list_duration rejects an element, or when appending to
// `array_out` fails.
int do_get_list_duration(ErlNifEnv *env, ERL_NIF_TERM list, bool nullable, ArrowType nanoarrow_type, enum ArrowTimeUnit time_unit, struct ArrowArray* array_out, struct ArrowSchema* schema_out, struct ArrowError* error_out) {
    NANOARROW_RETURN_NOT_OK(ArrowSchemaSetTypeDateTime(schema_out, nanoarrow_type, time_unit, NULL));
    NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(array_out, schema_out, error_out));
    NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(array_out));

    // The callback returns void, so the first append failure is remembered
    // here and all later appends are skipped.
    int append_status = 0;
    int list_status = 0;
    if (nullable) {
        list_status = get_list_duration(env, list, nullable, [&array_out, &append_status](int64_t val, bool is_nil) -> void {
            if (append_status != 0) return;
            // Exactly one slot per entry: either a null or the value, never
            // both, otherwise the array ends up longer than the input list.
            append_status = is_nil ? ArrowArrayAppendNull(array_out, 1)
                                   : ArrowArrayAppendInt(array_out, val);
        });
    } else {
        list_status = get_list_duration(env, list, nullable, [&array_out, &append_status](int64_t val, bool) -> void {
            if (append_status != 0) return;
            append_status = ArrowArrayAppendInt(array_out, val);
        });
    }

    // A malformed list element takes precedence over an append failure.
    return list_status != 0 ? list_status : append_status;
}

// non-zero return value indicating errors
int adbc_column_to_adbc_field(ErlNifEnv *env, ERL_NIF_TERM adbc_buffer, struct ArrowArray* array_out, struct ArrowSchema* schema_out, struct ArrowError* error_out) {
array_out->release = NULL;
Expand Down Expand Up @@ -665,17 +701,31 @@ int adbc_column_to_adbc_field(ErlNifEnv *env, ERL_NIF_TERM adbc_buffer, struct A
ret = do_get_list_date(env, data_term, nullable, NANOARROW_TYPE_DATE64, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeBool)) {
ret = do_get_list_boolean(env, data_term, nullable, NANOARROW_TYPE_BOOL, array_out, schema_out, error_out);
} else if (enif_is_tuple(env, type_term)) {
// NANOARROW_TYPE_TIME32
// NANOARROW_TYPE_TIME64
} else if (enif_is_tuple(env, type_term)) {
if (enif_is_identical(type_term, kAdbcColumnTypeTime32Seconds)) {
// NANOARROW_TYPE_TIME32
ret = do_get_list_time(env, data_term, nullable, NANOARROW_TYPE_TIME32, NANOARROW_TIME_UNIT_SECOND, 1000000000, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeTime32Milliseconds)) {
// NANOARROW_TYPE_TIME32
ret = do_get_list_time(env, data_term, nullable, NANOARROW_TYPE_TIME32, NANOARROW_TIME_UNIT_MILLI, 1000000, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeTime64Microseconds)) {
// NANOARROW_TYPE_TIME64
ret = do_get_list_time(env, data_term, nullable, NANOARROW_TYPE_TIME64, NANOARROW_TIME_UNIT_MICRO, 1000, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeTime64Nanoseconds)) {
// NANOARROW_TYPE_TIME64
ret = do_get_list_time(env, data_term, nullable, NANOARROW_TYPE_TIME64, NANOARROW_TIME_UNIT_NANO, 1, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeDurationSeconds)) {
// NANOARROW_TYPE_DURATION
ret = do_get_list_duration(env, data_term, nullable, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_SECOND, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeDurationMilliseconds)) {
// NANOARROW_TYPE_DURATION
ret = do_get_list_duration(env, data_term, nullable, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MILLI, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeDurationMicroseconds)) {
// NANOARROW_TYPE_DURATION
ret = do_get_list_duration(env, data_term, nullable, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MICRO, array_out, schema_out, error_out);
} else if (enif_is_identical(type_term, kAdbcColumnTypeDurationNanoseconds)) {
// NANOARROW_TYPE_DURATION
ret = do_get_list_duration(env, data_term, nullable, NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_NANO, array_out, schema_out, error_out);
} else {
const ERL_NIF_TERM *tuple = nullptr;
int arity;
Expand Down
5 changes: 5 additions & 0 deletions c_src/adbc_consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ static ERL_NIF_TERM kAtomEndOfSeries;
static ERL_NIF_TERM kAtomStructKey;
static ERL_NIF_TERM kAtomTime32;
static ERL_NIF_TERM kAtomTime64;
static ERL_NIF_TERM kAtomDuration;
static ERL_NIF_TERM kAtomSeconds;
static ERL_NIF_TERM kAtomMilliseconds;
static ERL_NIF_TERM kAtomMicroseconds;
Expand Down Expand Up @@ -72,6 +73,10 @@ static ERL_NIF_TERM kAdbcColumnTypeBool;
// Column type tags for time32/time64 and duration columns, written as
// {kind, unit} 2-tuples. Each macro expands to an enif_make_tuple2 call that
// references `env`, so a variable named `env` must be in scope wherever the
// macro is used, and the call is evaluated again on every use.
#define kAdbcColumnTypeTime32Milliseconds enif_make_tuple2(env, kAtomTime32, kAtomMilliseconds)
#define kAdbcColumnTypeTime64Microseconds enif_make_tuple2(env, kAtomTime64, kAtomMicroseconds)
#define kAdbcColumnTypeTime64Nanoseconds enif_make_tuple2(env, kAtomTime64, kAtomNanoseconds)
// Duration tags: {duration, seconds | milliseconds | microseconds | nanoseconds}
#define kAdbcColumnTypeDurationSeconds enif_make_tuple2(env, kAtomDuration, kAtomSeconds)
#define kAdbcColumnTypeDurationMilliseconds enif_make_tuple2(env, kAtomDuration, kAtomMilliseconds)
#define kAdbcColumnTypeDurationMicroseconds enif_make_tuple2(env, kAtomDuration, kAtomMicroseconds)
#define kAdbcColumnTypeDurationNanoseconds enif_make_tuple2(env, kAtomDuration, kAtomNanoseconds)

// error codes
constexpr int kErrorBufferIsNotAMap = 1;
Expand Down
1 change: 1 addition & 0 deletions c_src/adbc_nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ static int on_load(ErlNifEnv *env, void **, ERL_NIF_TERM) {
kAtomStructKey = erlang::nif::atom(env, "__struct__");
kAtomTime32 = erlang::nif::atom(env, "time32");
kAtomTime64 = erlang::nif::atom(env, "time64");
kAtomDuration = erlang::nif::atom(env, "duration");
kAtomSeconds = erlang::nif::atom(env, "seconds");
kAtomMilliseconds = erlang::nif::atom(env, "milliseconds");
kAtomMicroseconds = erlang::nif::atom(env, "microseconds");
Expand Down
46 changes: 46 additions & 0 deletions lib/adbc_column.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ defmodule Adbc.Column do
| {:timestamp, :milliseconds, String.t()}
| {:timestamp, :microseconds, String.t()}
| {:timestamp, :nanoseconds, String.t()}
# Type tag of a duration column: `{:duration, unit}`, where `unit` is one of
# the four time units listed below.
@type duration_t ::
{:duration, :seconds}
| {:duration, :milliseconds}
| {:duration, :microseconds}
| {:duration, :nanoseconds}
@type data_type ::
:boolean
| signed_integer
Expand All @@ -55,6 +60,7 @@ defmodule Adbc.Column do
| time32_t
| time64_t
| timestamp_t
| duration_t

@spec column(data_type(), list, Keyword.t()) :: %Adbc.Column{}
def column(type, data, opts \\ [])
Expand Down Expand Up @@ -737,4 +743,44 @@ defmodule Adbc.Column do
when is_list(data) and is_binary(timezone) and is_list(opts) do
column({:timestamp, :nanoseconds, timezone}, data, opts)
end

@doc """
Builds a column of durations, each stored as a signed 64-bit integer.

## Arguments

  * `data`: a list of integers, each one a duration counted in `unit`

  * `unit`: the unit the values are counted in, one of the following:
    * `:seconds`
    * `:milliseconds`
    * `:microseconds`
    * `:nanoseconds`

  * `opts`: A keyword list of options

## Options

  * `:name` - The name of the column
  * `:nullable` - A boolean value indicating whether the column is nullable
  * `:metadata` - A map of metadata
"""
@spec duration([integer()], time_unit(), Keyword.t()) :: %Adbc.Column{}
def duration(data, unit, opts \\ [])
    when unit in [:seconds, :milliseconds, :microseconds, :nanoseconds] and
           is_list(data) and is_list(opts) do
  # The column type is the unit wrapped in a `{:duration, unit}` tuple.
  column({:duration, unit}, data, opts)
end
end
12 changes: 10 additions & 2 deletions test/adbc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ defmodule AdbcTest do
select
'2023-03-01T10:23:45'::timestamp as datetime,
'2023-03-01T10:23:45.123456'::timestamp as datetime_usec,
'2023-03-01T10:23:45 PST'::timestamptz as datetime_tz,
'2023-03-01T10:23:45 PST'::timestamptz as datetime_tz_8601,
'2023-03-01T10:23:45+02'::timestamptz as datetime_tz_offset,
'2023-03-01'::date as date,
'10:23:45'::time as time,
'10:23:45.123456'::time as time_usec
Expand All @@ -186,12 +187,19 @@ defmodule AdbcTest do
data: [~N[2023-03-01 10:23:45.123456]]
},
%Adbc.Column{
name: "datetime_tz",
name: "datetime_tz_8601",
type: {:timestamp, :microseconds, "UTC"},
nullable: true,
metadata: nil,
data: [~N[2023-03-01 18:23:45.000000]]
},
%Adbc.Column{
name: "datetime_tz_offset",
type: {:timestamp, :microseconds, "UTC"},
nullable: true,
metadata: nil,
data: [~N[2023-03-01 08:23:45.000000]]
},
%Adbc.Column{
name: "date",
type: :date32,
Expand Down