Skip to content

Commit

Permalink
Support conversion between spark timestamp and ch datetime64 (#119)
Browse files Browse the repository at this point in the history
* support timestamp -> datetime64

* fix bugs

* remove comments

* fix building error

* Revert log level
  • Loading branch information
taiyang-li authored Sep 22, 2022
1 parent 31a4432 commit f5909f2
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 82 deletions.
45 changes: 36 additions & 9 deletions utils/local-engine/Builder/SerializedPlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,71 @@ SchemaPtr SerializedSchemaBuilder::build()
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_i8()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "I32")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_i32()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "I64")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_i64()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "Boolean")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_bool_()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "I16")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_i16()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "String")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_string()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "FP32")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_fp32()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "FP64")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_fp64()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "Date")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_date()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE : substrait::Type_Nullability_NULLABILITY_REQUIRED);
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else if (type == "Timestamp")
{
auto * t = type_struct->mutable_types()->Add();
t->mutable_timestamp()->set_nullability(
this->nullability_map[name] ? substrait::Type_Nullability_NULLABILITY_NULLABLE
: substrait::Type_Nullability_NULLABILITY_REQUIRED);
}
else
{
Expand All @@ -69,6 +85,7 @@ SchemaPtr SerializedSchemaBuilder::build()
}
return std::move(this->schema);
}

SerializedSchemaBuilder & SerializedSchemaBuilder::column(std::string name, std::string type, bool nullable)
{
this->type_map.emplace(name, type);
Expand Down Expand Up @@ -232,4 +249,14 @@ substrait::Expression* literalDate(int32_t value)
literal->set_date(value);
return rel;
}

/// Creates a substrait literal expression that carries a timestamp.
/// @param value Timestamp in units of microseconds since the UNIX epoch.
/// @return Pointer to a heap-allocated expression whose literal holds `value`.
///         This function never frees it; presumably the caller takes ownership (confirm at call sites).
substrait::Expression * literalTimestamp(int64_t value)
{
    auto * expression = new substrait::Expression();
    expression->mutable_literal()->set_timestamp(value);
    return expression;
}

}
5 changes: 4 additions & 1 deletion utils/local-engine/Builder/SerializedPlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class SerializedPlanBuilder
{
public:
SerializedPlanBuilder();

SerializedPlanBuilder & registerSupportedFunctions()
{
this->registerFunction(IS_NOT_NULL, "is_not_null")
Expand All @@ -34,10 +35,11 @@ class SerializedPlanBuilder
.registerFunction(LESS_THAN, "lt")
.registerFunction(MULTIPLY, "multiply")
.registerFunction(SUM, "sum")
.registerFunction(TO_DATE, "TO_DATE")
.registerFunction(TO_DATE, "to_date")
.registerFunction(EQUAL_TO, "equal");
return *this;
}

SerializedPlanBuilder & registerFunction(int id, std::string name);
SerializedPlanBuilder & filter(substrait::Expression * condition);
SerializedPlanBuilder & project(std::vector<substrait::Expression *> projections);
Expand Down Expand Up @@ -85,6 +87,7 @@ substrait::Expression * literal(double_t value);
substrait::Expression * literal(int32_t value);
substrait::Expression * literal(std::string value);
substrait::Expression * literalDate(int32_t value);
substrait::Expression * literalTimestamp(int64_t value);

substrait::Expression * selection(int32_t field_id);

Expand Down
12 changes: 9 additions & 3 deletions utils/local-engine/Common/DebugUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "DebugUtils.h"
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Formats/FormatSettings.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteBufferFromString.h>
Expand Down Expand Up @@ -81,11 +82,16 @@ void headBlock(const DB::Block & block, size_t count)
date_type->getSerialization(DB::ISerialization::Kind::DEFAULT)->serializeText(*nested_col, row, wb, {});
std::cerr << date_string.substr(0, 10) << "\t";
}
else
else if (which.isDateTime64())
{
std::cerr << "N/A"
<< "\t";
const auto * datetime64_type = DB::checkAndGetDataType<DB::DataTypeDateTime64>(nested_type.get());
String datetime64_string;
DB::WriteBufferFromString wb(datetime64_string);
datetime64_type->getSerialization(DB::ISerialization::Kind::DEFAULT)->serializeText(*nested_col, row, wb, {});
std::cerr << datetime64_string << "\t";
}
else
std::cerr << "N/A" << "\t";
}
std::cerr << std::endl;
}
Expand Down
19 changes: 19 additions & 0 deletions utils/local-engine/Parser/CHColumnToSparkRow.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "CHColumnToSparkRow.h"
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeNullable.h>
#include <Core/Types.h>
Expand Down Expand Up @@ -157,6 +158,24 @@ void writeValue(
{
WRITE_VECTOR_COLUMN(UInt32, uint32_t, get64)
}
else if (which.isDateTime64())
{
using ColumnDateTime64 = ColumnDecimal<DateTime64>;
const auto * datetime64_col = checkAndGetColumn<ColumnDateTime64>(*nested_col);
for (auto i=0; i<num_rows; i++)
{
bool is_null = nullable_column && nullable_column->isNullAt(i);
if (is_null)
{
setNullAt(buffer_address, offsets[i], field_offset, col_index);
}
else
{
auto * pointer = reinterpret_cast<int64_t *>(buffer_address + offsets[i] + field_offset);
pointer[0] = datetime64_col->getInt(i);
}
}
}
else if (which.isString())
{
const auto * string_col = checkAndGetColumn<ColumnString>(*nested_col);
Expand Down
Loading

0 comments on commit f5909f2

Please sign in to comment.