diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index ad204f6a365a..82cebc19e0cc 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -1,4 +1,5 @@ #include "yql_pq_provider_impl.h" +#include "yql_pq_helpers.h" #include #include @@ -7,6 +8,7 @@ #include #include #include +#include #include #include @@ -156,6 +158,13 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { return TStatus::Error; } + if (const auto maybeSharedReadingSetting = FindSetting(topicSource.Settings().Ptr(), SharedReading)) { + const TExprNode& value = maybeSharedReadingSetting.Cast().Ref(); + if (value.IsAtom() && FromString(value.Content())) { + input.Ptr()->SetTypeAnn(ctx.MakeType(topicSource.RowType().Ref().GetTypeAnn())); + return TStatus::Ok; + } + } if (topic.Metadata().Empty()) { input.Ptr()->SetTypeAnn(ctx.MakeType(ctx.MakeType(EDataSlot::String))); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp index c4fc523825f1..de525c933685 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp @@ -101,5 +101,20 @@ void FillSettingsWithResolvedYdsIds( } } +TMaybeNode FindSetting(TExprNode::TPtr settings, TStringBuf name) { + const auto maybeSettingsList = TMaybeNode(settings); + if (!maybeSettingsList) { + return nullptr; + } + const auto settingsList = maybeSettingsList.Cast(); + + for (size_t i = 0; i < settingsList.Size(); ++i) { + TCoNameValueTuple setting = settingsList.Item(i); + if (setting.Name().Value() == name) { + return setting.Value(); + } + } + return nullptr; +} } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h index 0a64879f0352..bf328554a817 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_helpers.h @@ -21,4 +21,6 @@ void FillSettingsWithResolvedYdsIds( const TPqState::TPtr& state, const TDatabaseResolverResponse::TDatabaseDescriptionMap& fullResolvedIds); +NNodes::TMaybeNode FindSetting(TExprNode::TPtr settings, TStringBuf name); + } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_mkql_compiler.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_mkql_compiler.cpp index b53ea00ddd34..483393d9b3a8 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_mkql_compiler.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_mkql_compiler.cpp @@ -1,4 +1,5 @@ #include "yql_pq_mkql_compiler.h" +#include "yql_pq_helpers.h" #include #include @@ -12,22 +13,6 @@ using namespace NNodes; namespace { -TMaybeNode FindSetting(TExprNode::TPtr settings, TStringBuf name) { - const auto maybeSettingsList = TMaybeNode(settings); - if (!maybeSettingsList) { - return nullptr; - } - const auto settingsList = maybeSettingsList.Cast(); - - for (size_t i = 0; i < settingsList.Size(); ++i) { - TCoNameValueTuple setting = settingsList.Item(i); - if (setting.Name().Value() == name) { - return setting.Value(); - } - } - return nullptr; -} - bool UseSharedReading(TExprNode::TPtr settings) { const auto maybeInnerSettings = FindSetting(settings, "settings"); if (!maybeInnerSettings) { @@ -43,12 +28,35 @@ bool UseSharedReading(TExprNode::TPtr settings) { return value.IsAtom() && FromString(value.Content()); } +TRuntimeNode WrapSharedReading(const TDqSourceWrapBase &wrapper, NCommon::TMkqlBuildContext& ctx) { + const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); + const auto flow = ctx.ProgramBuilder.ToFlow(input); + + const TStructExprType* rowType = wrapper.RowType().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); + const auto* finalItemStructType = static_cast(NCommon::BuildType(wrapper.RowType().Ref(), *rowType, ctx.ProgramBuilder)); + + return ctx.ProgramBuilder.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { + TRuntimeNode::TList fields; + fields.reserve(finalItemStructType->GetMembersCount()); + for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) { + fields.push_back(ctx.ProgramBuilder.Member(item, finalItemStructType->GetMemberName(i))); + } + return fields; + }); +} + } void RegisterDqPqMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { compiler.ChainCallable(TDqSourceWideWrap::CallableName(), [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == PqProviderName) { + if (const auto maybeSettings = wrapper.Settings()) { + if (UseSharedReading(maybeSettings.Cast().Ptr())) { + return WrapSharedReading(wrapper, ctx); + } + } + const auto wrapped = TryWrapWithParser(wrapper, ctx); if (wrapped) { return *wrapped; @@ -56,13 +64,6 @@ void RegisterDqPqMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); auto flow = ctx.ProgramBuilder.ToFlow(input); - - if (const auto maybeSettings = wrapper.Settings()) { - if (UseSharedReading(maybeSettings.Cast().Ptr())) { - return flow; - } - } - return ctx.ProgramBuilder.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item};