Skip to content

Commit

Permalink
Fixed pq rd source types
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 25, 2024
1 parent 851dc32 commit 2264103
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "yql_pq_provider_impl.h"
#include "yql_pq_helpers.h"

#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
Expand All @@ -7,6 +8,7 @@
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/pushdown/type_ann.h>
#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h>
#include <ydb/library/yql/providers/pq/common/yql_names.h>
#include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>

#include <yql/essentials/utils/log/log.h>
Expand Down Expand Up @@ -156,6 +158,13 @@ class TPqDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

if (const auto maybeSharedReadingSetting = FindSetting(topicSource.Settings().Ptr(), SharedReading)) {
const TExprNode& value = maybeSharedReadingSetting.Cast().Ref();
if (value.IsAtom() && FromString<bool>(value.Content())) {
input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(topicSource.RowType().Ref().GetTypeAnn()));
return TStatus::Ok;
}
}

if (topic.Metadata().Empty()) {
input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
Expand Down
15 changes: 15 additions & 0 deletions ydb/library/yql/providers/pq/provider/yql_pq_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,20 @@ void FillSettingsWithResolvedYdsIds(
}
}

TMaybeNode<TExprBase> FindSetting(TExprNode::TPtr settings, TStringBuf name) {
const auto maybeSettingsList = TMaybeNode<TCoNameValueTupleList>(settings);
if (!maybeSettingsList) {
return nullptr;
}
const auto settingsList = maybeSettingsList.Cast();

for (size_t i = 0; i < settingsList.Size(); ++i) {
TCoNameValueTuple setting = settingsList.Item(i);
if (setting.Name().Value() == name) {
return setting.Value();
}
}
return nullptr;
}

} // namespace NYql
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/pq/provider/yql_pq_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ void FillSettingsWithResolvedYdsIds(
const TPqState::TPtr& state,
const TDatabaseResolverResponse::TDatabaseDescriptionMap& fullResolvedIds);

NNodes::TMaybeNode<NNodes::TExprBase> FindSetting(TExprNode::TPtr settings, TStringBuf name);

} // namespace NYql
47 changes: 24 additions & 23 deletions ydb/library/yql/providers/pq/provider/yql_pq_mkql_compiler.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "yql_pq_mkql_compiler.h"
#include "yql_pq_helpers.h"

#include <ydb/library/yql/providers/pq/common/yql_names.h>
#include <ydb/library/yql/providers/pq/expr_nodes/yql_pq_expr_nodes.h>
Expand All @@ -12,22 +13,6 @@ using namespace NNodes;

namespace {

TMaybeNode<TExprBase> FindSetting(TExprNode::TPtr settings, TStringBuf name) {
const auto maybeSettingsList = TMaybeNode<TCoNameValueTupleList>(settings);
if (!maybeSettingsList) {
return nullptr;
}
const auto settingsList = maybeSettingsList.Cast();

for (size_t i = 0; i < settingsList.Size(); ++i) {
TCoNameValueTuple setting = settingsList.Item(i);
if (setting.Name().Value() == name) {
return setting.Value();
}
}
return nullptr;
}

bool UseSharedReading(TExprNode::TPtr settings) {
const auto maybeInnerSettings = FindSetting(settings, "settings");
if (!maybeInnerSettings) {
Expand All @@ -43,26 +28,42 @@ bool UseSharedReading(TExprNode::TPtr settings) {
return value.IsAtom() && FromString<bool>(value.Content());
}

TRuntimeNode WrapSharedReading(const TDqSourceWrapBase &wrapper, NCommon::TMkqlBuildContext& ctx) {
const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
const auto flow = ctx.ProgramBuilder.ToFlow(input);

const TStructExprType* rowType = wrapper.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
const auto* finalItemStructType = static_cast<TStructType*>(NCommon::BuildType(wrapper.RowType().Ref(), *rowType, ctx.ProgramBuilder));

return ctx.ProgramBuilder.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
TRuntimeNode::TList fields;
fields.reserve(finalItemStructType->GetMembersCount());
for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
fields.push_back(ctx.ProgramBuilder.Member(item, finalItemStructType->GetMemberName(i)));
}
return fields;
});
}

}

void RegisterDqPqMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) {
compiler.ChainCallable(TDqSourceWideWrap::CallableName(),
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == PqProviderName) {
if (const auto maybeSettings = wrapper.Settings()) {
if (UseSharedReading(maybeSettings.Cast().Ptr())) {
return WrapSharedReading(wrapper, ctx);
}
}

const auto wrapped = TryWrapWithParser(wrapper, ctx);
if (wrapped) {
return *wrapped;
}

const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
auto flow = ctx.ProgramBuilder.ToFlow(input);

if (const auto maybeSettings = wrapper.Settings()) {
if (UseSharedReading(maybeSettings.Cast().Ptr())) {
return flow;
}
}

return ctx.ProgramBuilder.ExpandMap(flow,
[&](TRuntimeNode item) -> TRuntimeNode::TList {
return {item};
Expand Down

0 comments on commit 2264103

Please sign in to comment.