Skip to content

Commit

Permalink
Cell maker lib & parsing from json KIKIMR-20673 (ydb-platform#1255)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jan 24, 2024
1 parent 78daf8b commit abb4ce7
Show file tree
Hide file tree
Showing 18 changed files with 276 additions and 99 deletions.
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_import_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/io_formats/csv.h>
#include <ydb/core/io_formats/ydb_dump/csv_ydb_dump.h>

#include <ydb/library/actors/core/hfunc.h>

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ PEERDIR(
ydb/core/grpc_services/cancelation
ydb/core/grpc_services/auth_processor
ydb/core/health_check
ydb/core/io_formats
ydb/core/io_formats/ydb_dump
ydb/core/kesus/tablet
ydb/core/kqp/common
ydb/core/protos
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "csv.h"
#include "csv_arrow.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/serializer/stream.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
#pragma once

#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/public/lib/scheme_types/scheme_type_id.h>

#include <util/generic/strbuf.h>
#include <util/memory/pool.h>
#include <ydb/core/scheme_types/scheme_type_info.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/csv/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>

namespace NKikimr::NFormats {

struct TYdbDump {
// Parse a YdbDump-formatted line: a sequence of ','-separated tokens, one per entry
// of columnOrderTypes (pair of key order and column type), consumed in that order.
//  - An entry whose key order is not -1 is a key column: its cell is stored at
//    keys[keyOrder], and keys is grown when it is too short.
//  - An entry whose key order is -1 is a value column: its cell is appended to values.
// Every token is converted with MakeCell and then validated with CheckCellValue;
// cell data is allocated from pool. numBytes is increased by the size of every
// accepted cell (it is not reset here).
// Returns false and fills strError on the first empty token, conversion failure or
// validation failure; keys/values may already be partially filled in that case.
static bool ParseLine(TStringBuf line, const std::vector<std::pair<i32, NScheme::TTypeInfo>>& columnOrderTypes, TMemoryPool& pool,
TVector<TCell>& keys, TVector<TCell>& values, TString& strError, ui64& numBytes);
};

class TArrowCSV {
public:
static constexpr ui32 DEFAULT_BLOCK_SIZE = 1024 * 1024;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "csv.h"
#include "csv_arrow.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <library/cpp/testing/unittest/registar.h>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
UNITTEST_FOR(ydb/core/io_formats)
UNITTEST_FOR(ydb/core/io_formats/arrow)

SIZE(SMALL)

PEERDIR(
ydb/core/io_formats
ydb/core/io_formats/arrow

# for NYql::NUdf alloc stuff used in binary_json
ydb/library/yql/public/udf/service/exception_policy
Expand All @@ -13,7 +13,7 @@ PEERDIR(
YQL_LAST_ABI_VERSION()

SRCS(
ut_csv.cpp
csv_arrow_ut.cpp
)

END()
20 changes: 20 additions & 0 deletions ydb/core/io_formats/arrow/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Build description of the Arrow CSV reader library (csv_arrow.cpp).

# Unit tests are kept in the ut/ subdirectory.
RECURSE_FOR_TESTS(ut)

LIBRARY()

SRCS(
csv_arrow.cpp
)

# Do not warn about unused function parameters in this library's sources.
CFLAGS(
-Wno-unused-parameter
)

PEERDIR(
# scheme_type_info.h is included by csv_arrow.h
ydb/core/scheme_types
# arrow_helpers.h and serializer/stream.h are included by csv_arrow.cpp
ydb/core/formats/arrow
)

YQL_LAST_ABI_VERSION()

END()
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
#include "csv.h"

#include <contrib/libs/double-conversion/double-conversion/double-conversion.h>
#include "cell_maker.h"

#include <ydb/library/binary_json/write.h>
#include <ydb/library/dynumber/dynumber.h>

#include <library/cpp/string_utils/quote/quote.h>

#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/utils/utf8.h>

#include <contrib/libs/double-conversion/double-conversion/double-conversion.h>
#include <library/cpp/json/json_writer.h>
#include <library/cpp/json/yson/json2yson.h>
#include <library/cpp/string_utils/base64/base64.h>
#include <library/cpp/string_utils/quote/quote.h>

#include <util/datetime/base.h>
#include <util/string/cast.h>

Expand Down Expand Up @@ -178,6 +180,15 @@ namespace {
return false;
}

return Conv(c, t, pool, conv);
}

// Builds cell `c` from an already typed value `v`: `v` is passed through `conv`
// (Implicit<T, U> by default) and stored via Conv. The TString parameter is
// unnamed and unused, so no error text is ever produced here.
static bool MakeDirect(TCell& c, const T& v, TMemoryPool& pool, TString&, TConverter<T, U> conv = &Implicit<T, U>) {
return Conv(c, v, pool, conv);
}

private:
static bool Conv(TCell& c, const T& t, TMemoryPool& pool, TConverter<T, U> conv) {
auto& u = *pool.Allocate<U>();
u = conv(t);
c = TCell(reinterpret_cast<const char*>(&u), sizeof(u));
Expand All @@ -195,10 +206,11 @@ namespace {
return false;
}

const auto u = pool.AppendString(conv(t));
c = TCell(u.data(), u.size());
return Conv(c, t, pool, conv);
}

return true;
// Builds cell `c` from an already typed value `v`: `v` is turned into a TStringBuf
// by `conv` (Implicit<T, TStringBuf> by default) and stored via Conv. The TString
// parameter is unnamed and unused, so no error text is ever produced here.
static bool MakeDirect(TCell& c, const T& v, TMemoryPool& pool, TString&, TConverter<T, TStringBuf> conv = &Implicit<T, TStringBuf>) {
return Conv(c, v, pool, conv);
}

static bool Make(TCell& c, TStringBuf v, TMemoryPool& pool, TString& err, TConverter<T, TStringBuf> conv, void* parseParam) {
Expand All @@ -207,13 +219,31 @@ namespace {
return false;
}

return Conv(c, t, pool, conv);
}

private:
// Converts `t` to a string view with `conv`, appends those bytes to `pool` and
// points cell `c` at the pool-side data. Always returns true.
static bool Conv(TCell& c, const T& t, TMemoryPool& pool, TConverter<T, TStringBuf> conv) {
    const auto stored = pool.AppendString(conv(t));
    c = TCell(stored.data(), stored.size());
    return true;
}
};

// Writer configuration used when a parsed JSON value is serialized back to text:
// UTF-8 validation is switched off and NaN is written as a string.
NJson::TJsonWriterConfig DefaultJsonConfig() {
    NJson::TJsonWriterConfig config;
    config.WriteNanAsString = true;
    config.ValidateUtf8 = false;
    return config;
}

// Serializes `json` to its textual form using DefaultJsonConfig().
TString WriteJson(const NJson::TJsonValue& json) {
    const NJson::TJsonWriterConfig config = DefaultJsonConfig();

    TStringStream out;
    NJson::WriteJson(&out, &json, config);
    return out.Str();
}

} // anonymous

bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err) {
Expand Down Expand Up @@ -272,6 +302,71 @@ bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPoo
}
}

bool MakeCell(TCell& cell, const NJson::TJsonValue& value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err) {
if (value.IsNull()) {
return true;
}

try {
switch (type.GetTypeId()) {
case NScheme::NTypeIds::Bool:
return TCellMaker<bool>::MakeDirect(cell, value.GetBooleanSafe(), pool, err);
case NScheme::NTypeIds::Int8:
return TCellMaker<i8>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint8:
return TCellMaker<ui8>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Int16:
return TCellMaker<i16>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint16:
return TCellMaker<ui16>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Int32:
return TCellMaker<i32>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint32:
return TCellMaker<ui32>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Int64:
return TCellMaker<i64>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::Uint64:
return TCellMaker<ui64>::MakeDirect(cell, value.GetUIntegerSafe(), pool, err);
case NScheme::NTypeIds::Float:
return TCellMaker<float>::MakeDirect(cell, value.GetDoubleSafe(), pool, err);
case NScheme::NTypeIds::Double:
return TCellMaker<double>::MakeDirect(cell, value.GetDoubleSafe(), pool, err);
case NScheme::NTypeIds::Date:
return TCellMaker<TInstant, ui16>::Make(cell, value.GetStringSafe(), pool, err, &Days);
case NScheme::NTypeIds::Datetime:
return TCellMaker<TInstant, ui32>::Make(cell, value.GetStringSafe(), pool, err, &Seconds);
case NScheme::NTypeIds::Timestamp:
return TCellMaker<TInstant, ui64>::Make(cell, value.GetStringSafe(), pool, err, &MicroSeconds);
case NScheme::NTypeIds::Interval:
return TCellMaker<i64>::MakeDirect(cell, value.GetIntegerSafe(), pool, err);
case NScheme::NTypeIds::String:
case NScheme::NTypeIds::String4k:
case NScheme::NTypeIds::String2m:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, Base64Decode(value.GetStringSafe()), pool, err);
case NScheme::NTypeIds::Utf8:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, value.GetStringSafe(), pool, err);
case NScheme::NTypeIds::Yson:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, NJson2Yson::SerializeJsonValueAsYson(value), pool, err);
case NScheme::NTypeIds::Json:
return TCellMaker<TString, TStringBuf>::MakeDirect(cell, NFormats::WriteJson(value), pool, err);
case NScheme::NTypeIds::JsonDocument:
if (const auto& result = NBinaryJson::SerializeToBinaryJson(NFormats::WriteJson(value))) {
return TCellMaker<TMaybe<NBinaryJson::TBinaryJson>, TStringBuf>::MakeDirect(cell, result, pool, err, &BinaryJsonToStringBuf);
} else {
return false;
}
case NScheme::NTypeIds::DyNumber:
return TCellMaker<TMaybe<TString>, TStringBuf>::Make(cell, value.GetStringSafe(), pool, err, &DyNumberToStringBuf);
case NScheme::NTypeIds::Decimal:
return TCellMaker<NYql::NDecimal::TInt128, std::pair<ui64, ui64>>::Make(cell, value.GetStringSafe(), pool, err, &Int128ToPair);
default:
return false;
}
} catch (const yexception&) {
return false;
}
}

bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
if (cell.IsNull()) {
return true;
Expand Down Expand Up @@ -317,45 +412,4 @@ bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type) {
}
}

// Parses one YdbDump line: takes one ','-separated token per entry of columnOrderTypes,
// converts it to a cell of the entry's type and places the cell into keys (at the
// entry's key order) or appends it to values (key order -1). numBytes accumulates the
// sizes of the accepted cells. Stops at the first problem, filling strError.
bool TYdbDump::ParseLine(TStringBuf line, const std::vector<std::pair<i32, NScheme::TTypeInfo>>& columnOrderTypes, TMemoryPool& pool,
        TVector<TCell>& keys, TVector<TCell>& values, TString& strError, ui64& numBytes)
{
    for (const auto& column : columnOrderTypes) {
        const i32 keyOrder = column.first;
        const NScheme::TTypeInfo& columnType = column.second;

        const TStringBuf token = line.NextTok(',');
        if (!token) {
            strError = "Empty token";
            return false;
        }

        // Pick the destination only after the token is known to be non-empty,
        // so that an empty token does not touch keys/values.
        TCell* cell = nullptr;
        if (keyOrder == -1) {
            cell = &values.emplace_back();
        } else {
            if (static_cast<int>(keys.size()) < (keyOrder + 1)) {
                keys.resize(keyOrder + 1);
            }
            cell = &keys.at(keyOrder);
        }

        Y_ABORT_UNLESS(cell);

        TString parseError;
        const bool parsed = MakeCell(*cell, token, columnType, pool, parseError);
        if (!parsed) {
            strError = TStringBuilder() << "Value parse error: '" << token << "' " << parseError;
            return false;
        }

        const bool valid = CheckCellValue(*cell, columnType);
        if (!valid) {
            strError = TStringBuilder() << "Value check error: '" << token << "'";
            return false;
        }

        numBytes += cell->Size();
    }

    return true;
}

}
18 changes: 18 additions & 0 deletions ydb/core/io_formats/cell_maker/cell_maker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/scheme_types/scheme_type_info.h>

#include <library/cpp/json/json_value.h>

#include <util/generic/strbuf.h>
#include <util/generic/string.h>
#include <util/memory/pool.h>

namespace NKikimr::NFormats {

// Builds `cell` for a column of the given `type` from the textual `value`.
// NOTE(review): the implementation is not visible from this header; presumably cell
// data is allocated from `pool` and `err` describes a parse failure — confirm in cell_maker.cpp.
bool MakeCell(TCell& cell, TStringBuf value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err);
// Builds `cell` for a column of the given `type` from a parsed JSON value.
// A JSON null leaves `cell` untouched and returns true. Returns false for type ids
// it does not handle and when a yexception is thrown while reading or converting the value.
bool MakeCell(TCell& cell, const NJson::TJsonValue& value, NScheme::TTypeInfo type, TMemoryPool& pool, TString& err);
// Checks an already built `cell` against `type`; a null cell is always accepted.
bool CheckCellValue(const TCell& cell, NScheme::TTypeInfo type);

}
25 changes: 25 additions & 0 deletions ydb/core/io_formats/cell_maker/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Build description of the cell maker library (cell_maker.cpp): helpers that
# build TCell values from text or JSON input.
LIBRARY()

SRCS(
cell_maker.cpp
)

PEERDIR(
# TCell and scheme type info used in cell_maker.h
ydb/core/scheme
ydb/core/scheme_types
# value encoders included by cell_maker.cpp
ydb/library/binary_json
ydb/library/dynumber
ydb/library/yql/minikql/dom
ydb/library/yql/public/decimal
ydb/library/yql/public/udf
ydb/library/yql/utils
contrib/libs/double-conversion
# JSON input: json_writer.h, json2yson.h and base64.h are included by cell_maker.cpp
library/cpp/json
library/cpp/json/yson
library/cpp/string_utils/base64
library/cpp/string_utils/quote
)

YQL_LAST_ABI_VERSION()

END()
33 changes: 6 additions & 27 deletions ydb/core/io_formats/ya.make
Original file line number Diff line number Diff line change
@@ -1,30 +1,9 @@
RECURSE_FOR_TESTS(ut)

LIBRARY()

SRCS(
csv_ydb_dump.cpp
csv_arrow.cpp
)

CFLAGS(
-Wno-unused-parameter
RECURSE(
arrow
cell_maker
ydb_dump
)

PEERDIR(
contrib/libs/double-conversion
library/cpp/string_utils/quote
ydb/core/formats/arrow
ydb/core/scheme
ydb/library/binary_json
ydb/library/dynumber
ydb/library/yql/minikql/dom
ydb/library/yql/public/decimal
ydb/library/yql/public/udf
ydb/library/yql/utils
ydb/public/lib/scheme_types
RECURSE_FOR_TESTS(
arrow
)

YQL_LAST_ABI_VERSION()

END()
Loading

0 comments on commit abb4ce7

Please sign in to comment.