Skip to content

Commit

Permalink
Fixed dq_pq_rd_read_actor.cpp
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 7, 2024
1 parent 9062f4a commit 04417a1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 21 deletions.
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/common/retry_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class TRetryEventsQueue {
THolder<T> ev = MakeHolder<T>();
ev->Record = Event->Record;
ev->Record.MutableTransportMeta()->SetConfirmedSeqNo(confirmedSeqNo);
for (ui64 i = 0; i < Event->GetPayloadCount(); ++i) {
for (ui32 i = 0; i < Event->GetPayloadCount(); ++i) {
ev->AddPayload(TRope(Event->GetPayload(i)));
}
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), NActors::IEventHandle::FlagTrackDelivery, Cookie);
Expand Down
50 changes: 30 additions & 20 deletions ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
}

public:
TVector<TRope> Data; // vector of (serialized messages, number messages)
TVector<TRope> Data;
i64 UsedSpace = 0;
ui64 NextOffset = 0;
ui64 PartitionId;
Expand Down Expand Up @@ -172,8 +172,8 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
TCounters Counters;

// Parsing info
TVector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher
const TType* InputDataType = nullptr;
std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher
const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher)
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataUnpacker;

struct SessionInfo {
Expand Down Expand Up @@ -274,7 +274,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void PassAway() override;
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override;
std::vector<ui64> GetPartitionsToRead() const;
void AddItem(TRope&& data, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer);
void AddMessageBatch(TRope&& serializedBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer);
void ProcessState();
void Stop(NDqProto::StatusIds::StatusCode status, TIssues issues);
void StopSessions();
Expand Down Expand Up @@ -310,22 +310,25 @@ TDqPqRdReadActor::TDqPqRdReadActor(
, HolderFactory(holderFactory)
, MaxBufferSize(bufferSize)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry);
const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(SourceParams.GetRowType()), *pb, Cerr);
YQL_ENSURE(outputItemType, "Failed to parse row type: " << SourceParams.GetRowType());
YQL_ENSURE(outputItemType->IsStruct(), "Row type is not struct");
const auto programBuilder = std::make_unique<TProgramBuilder>(typeEnv, *holderFactory.GetFunctionRegistry());

// Parse output schema (expected struct output type)
const auto& outputTypeYson = SourceParams.GetRowType();
const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(outputTypeYson), *programBuilder, Cerr);
YQL_ENSURE(outputItemType, "Failed to parse output type: " << outputItemType);
YQL_ENSURE(outputItemType->IsStruct(), "Output type " << outputItemType << " is not struct");
const auto structType = static_cast<TStructType*>(outputItemType);

TVector<TType*> inputTypeParts;
// Build input schema and unpacker (for data comes from RowDispatcher)
TVector<TType* const> inputTypeParts;
inputTypeParts.reserve(SourceParams.ColumnsSize());
ColumnIndexes.resize(structType->GetMembersCount());
for (ui64 i = 0; i < SourceParams.ColumnsSize(); ++i) {
for (size_t i = 0; i < SourceParams.ColumnsSize(); ++i) {
const auto index = structType->GetMemberIndex(SourceParams.GetColumns().Get(i));
inputTypeParts.emplace_back(structType->GetMemberType(index));
ColumnIndexes[index] = i;
}
InputDataType = pb->NewMultiType(inputTypeParts);
InputDataType = programBuilder->NewMultiType(inputTypeParts);
DataUnpacker = std::make_unique<NKikimr::NMiniKQL::TValuePackerTransport<true>>(InputDataType);

IngressStats.Level = statsLevel;
Expand Down Expand Up @@ -452,9 +455,10 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
auto readyBatch = std::move(ReadyBuffer.front());
ReadyBuffer.pop();

for (auto& message : readyBatch.Data) {
AddItem(std::move(message), buffer);
for (auto& messageBatch : readyBatch.Data) {
AddMessageBatch(std::move(messageBatch), buffer);
}

usedSpace += readyBatch.UsedSpace;
freeSpace -= readyBatch.UsedSpace;
TPartitionKey partitionKey{TString{}, readyBatch.PartitionId};
Expand Down Expand Up @@ -773,22 +777,28 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
NotifyCA();
}

void TDqPqRdReadActor::AddItem(TRope&& data, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) {
void TDqPqRdReadActor::AddMessageBatch(TRope&& messageBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) {
// TDOD: pass multi type directly to CA, without transforming it into struct

NKikimr::NMiniKQL::TUnboxedValueBatch parsedData(InputDataType);
DataUnpacker->UnpackBatch(MakeChunkedBuffer(std::move(data)), HolderFactory, parsedData);
DataUnpacker->UnpackBatch(MakeChunkedBuffer(std::move(messageBatch)), HolderFactory, parsedData);

while (!parsedData.empty()) {
const auto* parsedRow = parsedData.Head();

NUdf::TUnboxedValue* itemPtr;
NUdf::TUnboxedValuePod item = HolderFactory.CreateDirectArrayHolder(ColumnIndexes.size(), itemPtr);
for (const auto index : ColumnIndexes) {
// TODO: support metadata fields
YQL_ENSURE(index < parsedData.Width(), "Unexpected data width " << parsedData.Width() << ", failed to extract column by index " << index);
*(itemPtr++) = index ? parsedRow[*index] : NUdf::TUnboxedValue();
if (index) {
YQL_ENSURE(*index < parsedData.Width(), "Unexpected data width " << parsedData.Width() << ", failed to extract column by index " << index);
*(itemPtr++) = parsedRow[*index];
} else {
// TODO: support metadata fields here
*(itemPtr++) = NUdf::TUnboxedValue();
}
}

buffer.push_back(std::move(item));
buffer.emplace_back(std::move(item));
parsedData.Pop();
}
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/proto/dq_task_params.pb.h>

namespace NYql::NDq::NInternal {

Expand Down

0 comments on commit 04417a1

Please sign in to comment.