Skip to content

Commit

Permalink
Parquet DateTime insert is now supported (YQ-2570) (ydb-platform#1410)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Feb 26, 2024
1 parent e18a4d2 commit 0cc37c5
Show file tree
Hide file tree
Showing 44 changed files with 107 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#if USE_ARROW || USE_PARQUET

#include <common/DateLUTImpl.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
Expand Down Expand Up @@ -394,6 +396,43 @@ namespace NDB
}
}

static void fillArrowArrayWithDateTime64ColumnData(
const DataTypePtr & type,
ColumnPtr write_column,
const PaddedPODArray<UInt8> * null_bytemap,
const String & format_name,
arrow::ArrayBuilder* array_builder,
size_t start,
size_t end)
{
const auto * datetime64_type = assert_cast<const DataTypeDateTime64 *>(type.get());
const auto & column = assert_cast<const ColumnDecimal<DateTime64> &>(*write_column);
arrow::TimestampBuilder & builder = assert_cast<arrow::TimestampBuilder &>(*array_builder);
arrow::Status status;

auto scale = datetime64_type->getScale();
bool need_rescale = scale % 3;
auto rescale_multiplier = DecimalUtils::scaleMultiplier<DateTime64::NativeType>(3 - scale % 3);
for (size_t value_i = start; value_i < end; ++value_i)
{
if (null_bytemap && (*null_bytemap)[value_i])
{
status = builder.AppendNull();
}
else
{
auto value = static_cast<Int64>(column[value_i].get<DecimalField<DateTime64>>().getValue());
if (need_rescale)
{
if (common::mulOverflow(value, rescale_multiplier, value))
throw Exception(ErrorCodes::DECIMAL_OVERFLOW, "Decimal math overflow");
}
status = builder.Append(value);
}
checkStatus(status, write_column->getName(), format_name);
}
}

static void fillArrowArray(
const String & column_name,
ColumnPtr & column,
Expand Down Expand Up @@ -454,6 +493,10 @@ namespace NDB
DataTypePtr array_type = assert_cast<const DataTypeMap *>(column_type.get())->getNestedType();
fillArrowArrayWithArrayColumnData<arrow::MapBuilder>(column_name, column_array, array_type, null_bytemap, array_builder, format_name, start, end, dictionary_values);
}
else if (isDateTime64(column_type))
{
fillArrowArrayWithDateTime64ColumnData(column_type, column, null_bytemap, format_name, array_builder, start, end);
}
else if (isDecimal(column_type))
{
auto fill_decimal = [&](const auto & types) -> bool
Expand Down Expand Up @@ -548,6 +591,18 @@ namespace NDB
}
}

/// Picks the Arrow time unit for a DateTime64 type from its scale:
/// 0 -> SECOND, 1..3 -> MILLI, 4..6 -> MICRO, anything larger -> NANO.
static arrow::TimeUnit::type getArrowTimeUnit(const DataTypeDateTime64 * type)
{
    const UInt32 fractional_digits = type->getScale();

    /// The scale is unsigned, so each upper bound alone identifies its range
    /// once the smaller ranges have been ruled out.
    return fractional_digits == 0 ? arrow::TimeUnit::SECOND
         : fractional_digits <= 3 ? arrow::TimeUnit::MILLI
         : fractional_digits <= 6 ? arrow::TimeUnit::MICRO
                                  : arrow::TimeUnit::NANO;
}

static std::shared_ptr<arrow::DataType> getArrowType(
DataTypePtr column_type, ColumnPtr column, const std::string & column_name, const std::string & format_name, bool * out_is_column_nullable)
{
Expand Down Expand Up @@ -630,6 +685,12 @@ namespace NDB
getArrowType(val_type, columns[1], column_name, format_name, out_is_column_nullable));
}

if (isDateTime64(column_type))
{
const auto * datetime64_type = assert_cast<const DataTypeDateTime64 *>(column_type.get());
return arrow::timestamp(getArrowTimeUnit(datetime64_type), datetime64_type->getTimeZone().getTimeZone());
}

const std::string type_name = column_type->getFamilyName();
if (const auto * arrow_type_it = std::find_if(
internal_type_to_arrow_type.begin(),
Expand Down
36 changes: 36 additions & 0 deletions ydb/tests/fq/s3/canondata/result.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v1-common/simple_format/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v1-common_simple_format_test.json-json_each_row_/timestamp_format_common_simple_format_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v1-common/simple_format/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v1-common_simple_format_test.parquet-parquet_/timestamp_format_common_simple_format_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v1-common/simple_format/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v1-common_simple_format_test.tsv-tsv_with_names_/timestamp_format_common_simple_format_test.tsv"
},
Expand All @@ -86,6 +89,9 @@
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v2-common/simple_format/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v2-common_simple_format_test.json-json_each_row_/timestamp_format_common_simple_format_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v2-common/simple_format/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v2-common_simple_format_test.parquet-parquet_/timestamp_format_common_simple_format_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_format_insert[v2-common/simple_format/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_format_insert_v2-common_simple_format_test.tsv-tsv_with_names_/timestamp_format_common_simple_format_test.tsv"
},
Expand All @@ -95,6 +101,9 @@
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v1-timestamp/simple_iso/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v1-timestamp_simple_iso_test.json-json_each_row_/timestamp_simple_iso_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v1-timestamp/simple_iso/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v1-timestamp_simple_iso_test.parquet-parquet_/timestamp_simple_iso_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v1-timestamp/simple_iso/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v1-timestamp_simple_iso_test.tsv-tsv_with_names_/timestamp_simple_iso_test.tsv"
},
Expand All @@ -104,6 +113,9 @@
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v2-timestamp/simple_iso/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v2-timestamp_simple_iso_test.json-json_each_row_/timestamp_simple_iso_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v2-timestamp/simple_iso/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v2-timestamp_simple_iso_test.parquet-parquet_/timestamp_simple_iso_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_iso_insert[v2-timestamp/simple_iso/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_iso_insert_v2-timestamp_simple_iso_test.tsv-tsv_with_names_/timestamp_simple_iso_test.tsv"
},
Expand All @@ -113,6 +125,9 @@
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v1-common/simple_posix/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v1-common_simple_posix_test.json-json_each_row_/common_simple_posix_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v1-common/simple_posix/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v1-common_simple_posix_test.parquet-parquet_/common_simple_posix_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v1-common/simple_posix/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v1-common_simple_posix_test.tsv-tsv_with_names_/common_simple_posix_test.tsv"
},
Expand All @@ -122,6 +137,9 @@
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v2-common/simple_posix/test.json-json_each_row]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v2-common_simple_posix_test.json-json_each_row_/common_simple_posix_test.json"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v2-common/simple_posix/test.parquet-parquet]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v2-common_simple_posix_test.parquet-parquet_/common_simple_posix_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_simple_posix_insert[v2-common/simple_posix/test.tsv-tsv_with_names]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_simple_posix_insert_v2-common_simple_posix_test.tsv-tsv_with_names_/common_simple_posix_test.tsv"
},
Expand All @@ -143,6 +161,15 @@
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.json-json_each_row-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.json-json_each_row-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.json"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MILLISECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MILLISECONDS_/UNIX_TIME_MILLISECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v1-timestamp/unix_time/test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v1-timestamp_unix_time_test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.tsv"
},
Expand Down Expand Up @@ -170,6 +197,15 @@
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.json-json_each_row-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.json-json_each_row-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.json"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_MILLISECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_MILLISECONDS_/UNIX_TIME_MILLISECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.parquet-parquet-UNIX_TIME_SECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.parquet-parquet-UNIX_TIME_SECONDS_/UNIX_TIME_SECONDS_timestamp_unix_time_test.parquet"
},
"test_format_setting.TestS3.test_timestamp_unix_time_insert[v2-timestamp/unix_time/test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS]": {
"uri": "file://test_format_setting.TestS3.test_timestamp_unix_time_insert_v2-timestamp_unix_time_test.tsv-tsv_with_names-UNIX_TIME_MICROSECONDS_/UNIX_TIME_MICROSECONDS_timestamp_unix_time_test.tsv"
},
Expand Down

This file was deleted.

This file was deleted.

Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
15 changes: 10 additions & 5 deletions ydb/tests/fq/s3/test_format_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ def test_timestamp_simple_iso(self, kikimr, s3, client, filename, type_format):
@pytest.mark.parametrize("filename, type_format", [
("timestamp/simple_iso/test.csv", "csv_with_names"),
("timestamp/simple_iso/test.tsv", "tsv_with_names"),
("timestamp/simple_iso/test.json", "json_each_row")
("timestamp/simple_iso/test.json", "json_each_row"),
("timestamp/simple_iso/test.parquet", "parquet")
])
def test_timestamp_simple_iso_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -383,7 +384,8 @@ def test_timestamp_simple_posix(self, kikimr, s3, client, filename, type_format)
@pytest.mark.parametrize("filename, type_format", [
("common/simple_posix/test.csv", "csv_with_names"),
("common/simple_posix/test.tsv", "tsv_with_names"),
("common/simple_posix/test.json", "json_each_row")
("common/simple_posix/test.json", "json_each_row"),
("common/simple_posix/test.parquet", "parquet")
])
def test_timestamp_simple_posix_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -432,7 +434,8 @@ def test_date_time_simple_iso(self, kikimr, s3, client, filename, type_format):
@pytest.mark.parametrize("filename, type_format", [
("date_time/simple_iso/test.csv", "csv_with_names"),
("date_time/simple_iso/test.tsv", "tsv_with_names"),
("date_time/simple_iso/test.json", "json_each_row")
("date_time/simple_iso/test.json", "json_each_row"),
("date_time/simple_iso/test.parquet", "parquet")
])
def test_date_time_simple_iso_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down Expand Up @@ -507,7 +510,8 @@ def test_date_time_simple_posix_insert(self, kikimr, s3, client, filename, type_
@pytest.mark.parametrize("filename, type_format", [
("timestamp/unix_time/test.csv", "csv_with_names"),
("timestamp/unix_time/test.tsv", "tsv_with_names"),
("timestamp/unix_time/test.json", "json_each_row")
("timestamp/unix_time/test.json", "json_each_row"),
("timestamp/unix_time/test.parquet", "parquet")
])
def test_timestamp_unix_time_insert(self, kikimr, s3, client, filename, type_format, timestamp_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand All @@ -531,7 +535,8 @@ def test_timestamp_unix_time_insert(self, kikimr, s3, client, filename, type_for
@pytest.mark.parametrize("filename, type_format", [
("common/simple_format/test.csv", "csv_with_names"),
("common/simple_format/test.tsv", "tsv_with_names"),
("common/simple_format/test.json", "json_each_row")
("common/simple_format/test.json", "json_each_row"),
("common/simple_format/test.parquet", "parquet")
])
def test_timestamp_simple_format_insert(self, kikimr, s3, client, filename, type_format):
self.create_bucket_and_upload_file(filename, s3, kikimr)
Expand Down

0 comments on commit 0cc37c5

Please sign in to comment.