From abb4ce7f401cade25ae56d1786cfb15ca440a221 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Wed, 24 Jan 2024 16:11:36 +0300 Subject: [PATCH] Cell maker lib & parsing from json KIKIMR-20673 (#1255) --- ydb/core/grpc_services/rpc_import_data.cpp | 2 +- ydb/core/grpc_services/ya.make | 2 +- ydb/core/io_formats/{ => arrow}/csv_arrow.cpp | 4 +- .../io_formats/{csv.h => arrow/csv_arrow.h} | 12 +- .../{ut_csv.cpp => arrow/csv_arrow_ut.cpp} | 2 +- ydb/core/io_formats/{ => arrow}/ut/ya.make | 6 +- ydb/core/io_formats/arrow/ya.make | 20 +++ .../cell_maker.cpp} | 154 ++++++++++++------ ydb/core/io_formats/cell_maker/cell_maker.h | 18 ++ ydb/core/io_formats/cell_maker/ya.make | 25 +++ ydb/core/io_formats/ya.make | 33 +--- ydb/core/io_formats/ydb_dump/csv_ydb_dump.cpp | 48 ++++++ ydb/core/io_formats/ydb_dump/csv_ydb_dump.h | 26 +++ ydb/core/io_formats/ydb_dump/ya.make | 15 ++ ydb/core/tx/datashard/import_s3.cpp | 2 +- ydb/core/tx/datashard/ya.make | 2 +- .../tx/tx_proxy/upload_rows_common_impl.h | 2 +- ydb/core/tx/tx_proxy/ya.make | 2 +- 18 files changed, 276 insertions(+), 99 deletions(-) rename ydb/core/io_formats/{ => arrow}/csv_arrow.cpp (99%) rename ydb/core/io_formats/{csv.h => arrow/csv_arrow.h} (82%) rename ydb/core/io_formats/{ut_csv.cpp => arrow/csv_arrow_ut.cpp} (99%) rename ydb/core/io_formats/{ => arrow}/ut/ya.make (69%) create mode 100644 ydb/core/io_formats/arrow/ya.make rename ydb/core/io_formats/{csv_ydb_dump.cpp => cell_maker/cell_maker.cpp} (67%) create mode 100644 ydb/core/io_formats/cell_maker/cell_maker.h create mode 100644 ydb/core/io_formats/cell_maker/ya.make create mode 100644 ydb/core/io_formats/ydb_dump/csv_ydb_dump.cpp create mode 100644 ydb/core/io_formats/ydb_dump/csv_ydb_dump.h create mode 100644 ydb/core/io_formats/ydb_dump/ya.make diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index 0ab87e29f22a..250421386d10 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index aff471b356bc..c3fd88321a38 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -104,7 +104,7 @@ PEERDIR( ydb/core/grpc_services/cancelation ydb/core/grpc_services/auth_processor ydb/core/health_check - ydb/core/io_formats + ydb/core/io_formats/ydb_dump ydb/core/kesus/tablet ydb/core/kqp/common ydb/core/protos diff --git a/ydb/core/io_formats/csv_arrow.cpp b/ydb/core/io_formats/arrow/csv_arrow.cpp similarity index 99% rename from ydb/core/io_formats/csv_arrow.cpp rename to ydb/core/io_formats/arrow/csv_arrow.cpp index 896f535bb976..06b070d76db2 100644 --- a/ydb/core/io_formats/csv_arrow.cpp +++ b/ydb/core/io_formats/arrow/csv_arrow.cpp @@ -1,6 +1,8 @@ -#include "csv.h" +#include "csv_arrow.h" + #include #include + #include #include diff --git a/ydb/core/io_formats/csv.h b/ydb/core/io_formats/arrow/csv_arrow.h similarity index 82% rename from ydb/core/io_formats/csv.h rename to ydb/core/io_formats/arrow/csv_arrow.h index 7c99cef05366..a4e5b912f664 100644 --- a/ydb/core/io_formats/csv.h +++ b/ydb/core/io_formats/arrow/csv_arrow.h @@ -1,22 +1,12 @@ #pragma once -#include -#include - -#include -#include +#include #include #include namespace NKikimr::NFormats { -struct TYdbDump { - // Parse YdbDump-formatted line - static bool ParseLine(TStringBuf line, const std::vector>& columnOrderTypes, TMemoryPool& pool, - TVector& keys, TVector& values, TString& strError, ui64& numBytes); -}; - class TArrowCSV { public: static constexpr ui32 DEFAULT_BLOCK_SIZE = 1024 * 1024; diff --git a/ydb/core/io_formats/ut_csv.cpp b/ydb/core/io_formats/arrow/csv_arrow_ut.cpp similarity index 99% rename from ydb/core/io_formats/ut_csv.cpp rename to ydb/core/io_formats/arrow/csv_arrow_ut.cpp index 0257e923e16a..5716126eb83e 100644 --- a/ydb/core/io_formats/ut_csv.cpp +++ b/ydb/core/io_formats/arrow/csv_arrow_ut.cpp @@ -1,4 +1,4 @@ -#include "csv.h" +#include "csv_arrow.h" #include #include diff --git a/ydb/core/io_formats/ut/ya.make b/ydb/core/io_formats/arrow/ut/ya.make similarity index 69% rename from ydb/core/io_formats/ut/ya.make rename to ydb/core/io_formats/arrow/ut/ya.make index 7b8ba5f60659..5e9b91b525d1 100644 --- a/ydb/core/io_formats/ut/ya.make +++ b/ydb/core/io_formats/arrow/ut/ya.make @@ -1,9 +1,9 @@ -UNITTEST_FOR(ydb/core/io_formats) +UNITTEST_FOR(ydb/core/io_formats/arrow) SIZE(SMALL) PEERDIR( - ydb/core/io_formats + ydb/core/io_formats/arrow # for NYql::NUdf alloc stuff used in binary_json ydb/library/yql/public/udf/service/exception_policy @@ -13,7 +13,7 @@ PEERDIR( YQL_LAST_ABI_VERSION() SRCS( - ut_csv.cpp + csv_arrow_ut.cpp ) END() diff --git a/ydb/core/io_formats/arrow/ya.make b/ydb/core/io_formats/arrow/ya.make new file mode 100644 index 000000000000..58ba9c23ba38 --- /dev/null +++ b/ydb/core/io_formats/arrow/ya.make @@ -0,0 +1,20 @@ +RECURSE_FOR_TESTS(ut) + +LIBRARY() + +SRCS( + csv_arrow.cpp +) + +CFLAGS( + -Wno-unused-parameter +) + +PEERDIR( + ydb/core/scheme_types + ydb/core/formats/arrow +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/io_formats/csv_ydb_dump.cpp b/ydb/core/io_formats/cell_maker/cell_maker.cpp similarity index 67% rename from ydb/core/io_formats/csv_ydb_dump.cpp rename to ydb/core/io_formats/cell_maker/cell_maker.cpp index 53feedfde2f2..1fc259159728 100644 --- a/ydb/core/io_formats/csv_ydb_dump.cpp +++ b/ydb/core/io_formats/cell_maker/cell_maker.cpp @@ -1,18 +1,20 @@ -#include "csv.h" - -#include +#include "cell_maker.h" #include #include -#include - #include #include -#include #include +#include #include +#include +#include +#include +#include +#include + #include #include @@ -178,6 +180,15 @@ namespace { return false; } + return Conv(c, t, pool, conv); + } + + static bool MakeDirect(TCell& c, const T& v, TMemoryPool& pool, TString&, TConverter conv = &Implicit) { + return Conv(c, v, pool, conv); + } + + private: + static bool Conv(TCell& c, const T& t, TMemoryPool& pool, TConverter conv) { auto& u = *pool.Allocate(); u = conv(t); c = TCell(reinterpret_cast(&u), sizeof(u)); @@ -195,10 +206,11 @@ namespace { return false; } - const auto u = pool.AppendString(conv(t)); - c = TCell(u.data(), u.size()); + return Conv(c, t, pool, conv); + } - return true; + static bool MakeDirect(TCell& c, const T& v, TMemoryPool& pool, TString&, TConverter conv = &Implicit) { + return Conv(c, v, pool, conv); } static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter conv, void* parseParam) { @@ -207,6 +219,11 @@ namespace { return false; } + return Conv(c, t, pool, conv); + } + + private: + static bool Conv(TCell& c, const T& t, TMemoryPool& pool, TConverter conv) { const auto u = pool.AppendString(conv(t)); c = TCell(u.data(), u.size()); @@ -214,6 +231,19 @@ namespace { } }; + NJson::TJsonWriterConfig DefaultJsonConfig() { + NJson::TJsonWriterConfig jsonConfig; + jsonConfig.ValidateUtf8 = false; + jsonConfig.WriteNanAsString = true; + return jsonConfig; + } + + TString WriteJson(const NJson::TJsonValue& json) { + TStringStream str; + NJson::WriteJson(&str, &json, DefaultJsonConfig()); + return str.Str(); + } + } // anonymous bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err) { @@ -272,6 +302,71 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo } } +bool MakeCell(TCell& cell, const NJson::TJsonValue& value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err) { + if (value.IsNull()) { + return true; + } + + try { + switch (type.GetTypeId()) { + case NScheme::NTypeIds::Bool: + return TCellMaker::MakeDirect(cell, value.GetBooleanSafe(), pool, err); + case NScheme::NTypeIds::Int8: + return TCellMaker::MakeDirect(cell, value.GetIntegerSafe(), pool, err); + case NScheme::NTypeIds::Uint8: + return TCellMaker::MakeDirect(cell, value.GetUIntegerSafe(), pool, err); + case NScheme::NTypeIds::Int16: + return TCellMaker::MakeDirect(cell, value.GetIntegerSafe(), pool, err); + case NScheme::NTypeIds::Uint16: + return TCellMaker::MakeDirect(cell, value.GetUIntegerSafe(), pool, err); + case NScheme::NTypeIds::Int32: + return TCellMaker::MakeDirect(cell, value.GetIntegerSafe(), pool, err); + case NScheme::NTypeIds::Uint32: + return TCellMaker::MakeDirect(cell, value.GetUIntegerSafe(), pool, err); + case NScheme::NTypeIds::Int64: + return TCellMaker::MakeDirect(cell, value.GetIntegerSafe(), pool, err); + case NScheme::NTypeIds::Uint64: + return TCellMaker::MakeDirect(cell, value.GetUIntegerSafe(), pool, err); + case NScheme::NTypeIds::Float: + return TCellMaker::MakeDirect(cell, value.GetDoubleSafe(), pool, err); + case NScheme::NTypeIds::Double: + return TCellMaker::MakeDirect(cell, value.GetDoubleSafe(), pool, err); + case NScheme::NTypeIds::Date: + return TCellMaker::Make(cell, value.GetStringSafe(), pool, err, &Days); + case NScheme::NTypeIds::Datetime: + return TCellMaker::Make(cell, value.GetStringSafe(), pool, err, &Seconds); + case NScheme::NTypeIds::Timestamp: + return TCellMaker::Make(cell, value.GetStringSafe(), pool, err, &MicroSeconds); + case NScheme::NTypeIds::Interval: + return TCellMaker::MakeDirect(cell, value.GetIntegerSafe(), pool, err); + case NScheme::NTypeIds::String: + case NScheme::NTypeIds::String4k: + case NScheme::NTypeIds::String2m: + return TCellMaker::MakeDirect(cell, Base64Decode(value.GetStringSafe()), pool, err); + case NScheme::NTypeIds::Utf8: + return TCellMaker::MakeDirect(cell, value.GetStringSafe(), pool, err); + case NScheme::NTypeIds::Yson: + return TCellMaker::MakeDirect(cell, NJson2Yson::SerializeJsonValueAsYson(value), pool, err); + case NScheme::NTypeIds::Json: + return TCellMaker::MakeDirect(cell, NFormats::WriteJson(value), pool, err); + case NScheme::NTypeIds::JsonDocument: + if (const auto& result = NBinaryJson::SerializeToBinaryJson(NFormats::WriteJson(value))) { + return TCellMaker, TStringBuf>::MakeDirect(cell, result, pool, err, &BinaryJsonToStringBuf); + } else { + return false; + } + case NScheme::NTypeIds::DyNumber: + return TCellMaker, TStringBuf>::Make(cell, value.GetStringSafe(), pool, err, &DyNumberToStringBuf); + case NScheme::NTypeIds::Decimal: + return TCellMaker>::Make(cell, value.GetStringSafe(), pool, err, &Int128ToPair); + default: + return false; + } + } catch (const yexception&) { + return false; + } +} + bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) { if (cell.IsNull()) { return true; @@ -317,45 +412,4 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) { } } -bool TYdbDump::ParseLine(TStringBuf line, const std::vector>& columnOrderTypes, TMemoryPool& pool, - TVector& keys, TVector& values, TString& strError, ui64& numBytes) -{ - for (const auto& [keyOrder, pType] : columnOrderTypes) { - TStringBuf value = line.NextTok(','); - if (!value) { - strError = "Empty token"; - return false; - } - - TCell* cell = nullptr; - - if (keyOrder != -1) { - if ((int)keys.size() < (keyOrder + 1)) { - keys.resize(keyOrder + 1); - } - - cell = &keys.at(keyOrder); - } else { - cell = &values.emplace_back(); - } - - Y_ABORT_UNLESS(cell); - - TString parseError; - if (!MakeCell(*cell, value, pType, pool, parseError)) { - strError = TStringBuilder() << "Value parse error: '" << value << "' " << parseError; - return false; - } - - if (!CheckCellValue(*cell, pType)) { - strError = TStringBuilder() << "Value check error: '" << value << "'"; - return false; - } - - numBytes += cell->Size(); - } - - return true; -} - } diff --git a/ydb/core/io_formats/cell_maker/cell_maker.h b/ydb/core/io_formats/cell_maker/cell_maker.h new file mode 100644 index 000000000000..a5f198ba3753 --- /dev/null +++ b/ydb/core/io_formats/cell_maker/cell_maker.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +#include + +#include +#include +#include + +namespace NKikimr::NFormats { + +bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err); +bool MakeCell(TCell& cell, const NJson::TJsonValue& value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err); +bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type); + +} diff --git a/ydb/core/io_formats/cell_maker/ya.make b/ydb/core/io_formats/cell_maker/ya.make new file mode 100644 index 000000000000..67a752c4ffc1 --- /dev/null +++ b/ydb/core/io_formats/cell_maker/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +SRCS( + cell_maker.cpp +) + +PEERDIR( + ydb/core/scheme + ydb/core/scheme_types + ydb/library/binary_json + ydb/library/dynumber + ydb/library/yql/minikql/dom + ydb/library/yql/public/decimal + ydb/library/yql/public/udf + ydb/library/yql/utils + contrib/libs/double-conversion + library/cpp/json + library/cpp/json/yson + library/cpp/string_utils/base64 + library/cpp/string_utils/quote +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/io_formats/ya.make b/ydb/core/io_formats/ya.make index 4838ff96e1bf..f1af670980a5 100644 --- a/ydb/core/io_formats/ya.make +++ b/ydb/core/io_formats/ya.make @@ -1,30 +1,9 @@ -RECURSE_FOR_TESTS(ut) - -LIBRARY() - -SRCS( - csv_ydb_dump.cpp - csv_arrow.cpp -) - -CFLAGS( - -Wno-unused-parameter +RECURSE( + arrow + cell_maker + ydb_dump ) -PEERDIR( - contrib/libs/double-conversion - library/cpp/string_utils/quote - ydb/core/formats/arrow - ydb/core/scheme - ydb/library/binary_json - ydb/library/dynumber - ydb/library/yql/minikql/dom - ydb/library/yql/public/decimal - ydb/library/yql/public/udf - ydb/library/yql/utils - ydb/public/lib/scheme_types +RECURSE_FOR_TESTS( + arrow ) - -YQL_LAST_ABI_VERSION() - -END() diff --git a/ydb/core/io_formats/ydb_dump/csv_ydb_dump.cpp b/ydb/core/io_formats/ydb_dump/csv_ydb_dump.cpp new file mode 100644 index 000000000000..e1e266d67ead --- /dev/null +++ b/ydb/core/io_formats/ydb_dump/csv_ydb_dump.cpp @@ -0,0 +1,48 @@ +#include "csv_ydb_dump.h" + +#include + +namespace NKikimr::NFormats { + +bool TYdbDump::ParseLine(TStringBuf line, const std::vector>& columnOrderTypes, TMemoryPool& pool, + TVector& keys, TVector& values, TString& strError, ui64& numBytes) +{ + for (const auto& [keyOrder, pType] : columnOrderTypes) { + TStringBuf value = line.NextTok(','); + if (!value) { + strError = "Empty token"; + return false; + } + + TCell* cell = nullptr; + + if (keyOrder != -1) { + if ((int)keys.size() < (keyOrder + 1)) { + keys.resize(keyOrder + 1); + } + + cell = &keys.at(keyOrder); + } else { + cell = &values.emplace_back(); + } + + Y_ABORT_UNLESS(cell); + + TString parseError; + if (!MakeCell(*cell, value, pType, pool, parseError)) { + strError = TStringBuilder() << "Value parse error: '" << value << "' " << parseError; + return false; + } + + if (!CheckCellValue(*cell, pType)) { + strError = TStringBuilder() << "Value check error: '" << value << "'"; + return false; + } + + numBytes += cell->Size(); + } + + return true; +} + +} diff --git a/ydb/core/io_formats/ydb_dump/csv_ydb_dump.h b/ydb/core/io_formats/ydb_dump/csv_ydb_dump.h new file mode 100644 index 000000000000..aba24449bc4d --- /dev/null +++ b/ydb/core/io_formats/ydb_dump/csv_ydb_dump.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace NKikimr::NFormats { + +struct TYdbDump { + // Parse YdbDump-formatted line. + // Returns true in case of success, false otherwise. + // numBytes will be increased by the size of the processed cells + static bool ParseLine( + TStringBuf line, + const std::vector>& columnOrderTypes, + TMemoryPool& pool, + TVector& keys, + TVector& values, + TString& strError, + ui64& numBytes); +}; + +} diff --git a/ydb/core/io_formats/ydb_dump/ya.make b/ydb/core/io_formats/ydb_dump/ya.make new file mode 100644 index 000000000000..db03fc483eb1 --- /dev/null +++ b/ydb/core/io_formats/ydb_dump/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + csv_ydb_dump.cpp +) + +PEERDIR( + ydb/core/scheme + ydb/core/scheme_types + ydb/core/io_formats/cell_maker +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index 3b25118d9760..c1b6c2dd599d 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index 5cc688f117c8..49541b974798 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -237,7 +237,7 @@ PEERDIR( ydb/core/engine ydb/core/engine/minikql ydb/core/formats - ydb/core/io_formats + ydb/core/io_formats/ydb_dump ydb/core/kqp/runtime ydb/core/persqueue/partition_key_range ydb/core/persqueue/writer diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index bccb959d30c6..16f4964299b5 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/ydb/core/tx/tx_proxy/ya.make b/ydb/core/tx/tx_proxy/ya.make index 64c681e43c4c..32954a8058ae 100644 --- a/ydb/core/tx/tx_proxy/ya.make +++ b/ydb/core/tx/tx_proxy/ya.make @@ -29,7 +29,7 @@ PEERDIR( ydb/core/engine ydb/core/formats ydb/core/grpc_services - ydb/core/io_formats + ydb/core/io_formats/arrow ydb/core/protos ydb/core/scheme ydb/core/sys_view/common