diff --git a/ydb/library/yql/dq/actors/common/retry_queue.h b/ydb/library/yql/dq/actors/common/retry_queue.h index e85e0cef6bfd..2124a0e9f6e2 100644 --- a/ydb/library/yql/dq/actors/common/retry_queue.h +++ b/ydb/library/yql/dq/actors/common/retry_queue.h @@ -186,7 +186,7 @@ class TRetryEventsQueue { THolder ev = MakeHolder(); ev->Record = Event->Record; ev->Record.MutableTransportMeta()->SetConfirmedSeqNo(confirmedSeqNo); - for (ui64 i = 0; i < Event->GetPayloadCount(); ++i) { + for (ui32 i = 0; i < Event->GetPayloadCount(); ++i) { ev->AddPayload(TRope(Event->GetPayload(i))); } return MakeHolder(Recipient, Sender, ev.Release(), NActors::IEventHandle::FlagTrackDelivery, Cookie); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 6a1b38845d15..79e8c80fb5aa 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -123,7 +123,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: } public: - TVector Data; // vector of (serialized messages, number messages) + TVector Data; i64 UsedSpace = 0; ui64 NextOffset = 0; ui64 PartitionId; @@ -172,8 +172,8 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: TCounters Counters; // Parsing info - TVector> ColumnIndexes; // Output column index in schema passed into RowDispatcher - const TType* InputDataType = nullptr; + std::vector> ColumnIndexes; // Output column index in schema passed into RowDispatcher + const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher) std::unique_ptr> DataUnpacker; struct SessionInfo { @@ -274,7 +274,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: void PassAway() override; i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe& watermark, bool&, i64 freeSpace) override; std::vector GetPartitionsToRead() const; - void AddItem(TRope&& data, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer); + void AddMessageBatch(TRope&& serializedBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer); void ProcessState(); void Stop(NDqProto::StatusIds::StatusCode status, TIssues issues); void StopSessions(); @@ -310,22 +310,25 @@ TDqPqRdReadActor::TDqPqRdReadActor( , HolderFactory(holderFactory) , MaxBufferSize(bufferSize) { - const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); - const auto pb = std::make_unique(typeEnv, functionRegistry); - const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(SourceParams.GetRowType()), *pb, Cerr); - YQL_ENSURE(outputItemType, "Failed to parse row type: " << SourceParams.GetRowType()); - YQL_ENSURE(outputItemType->IsStruct(), "Row type is not struct"); + const auto programBuilder = std::make_unique(typeEnv, *holderFactory.GetFunctionRegistry()); + + // Parse output schema (expected struct output type) + const auto& outputTypeYson = SourceParams.GetRowType(); + const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(outputTypeYson), *programBuilder, Cerr); + YQL_ENSURE(outputItemType, "Failed to parse output type: " << outputItemType); + YQL_ENSURE(outputItemType->IsStruct(), "Output type " << outputItemType << " is not struct"); const auto structType = static_cast(outputItemType); - TVector inputTypeParts; + // Build input schema and unpacker (for data comes from RowDispatcher) + TVector inputTypeParts; inputTypeParts.reserve(SourceParams.ColumnsSize()); ColumnIndexes.resize(structType->GetMembersCount()); - for (ui64 i = 0; i < SourceParams.ColumnsSize(); ++i) { + for (size_t i = 0; i < SourceParams.ColumnsSize(); ++i) { const auto index = structType->GetMemberIndex(SourceParams.GetColumns().Get(i)); inputTypeParts.emplace_back(structType->GetMemberType(index)); ColumnIndexes[index] = i; } - InputDataType = pb->NewMultiType(inputTypeParts); + InputDataType = programBuilder->NewMultiType(inputTypeParts); DataUnpacker = std::make_unique>(InputDataType); IngressStats.Level = statsLevel; @@ -452,9 +455,10 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b auto readyBatch = std::move(ReadyBuffer.front()); ReadyBuffer.pop(); - for (auto& message : readyBatch.Data) { - AddItem(std::move(message), buffer); + for (auto& messageBatch : readyBatch.Data) { + AddMessageBatch(std::move(messageBatch), buffer); } + usedSpace += readyBatch.UsedSpace; freeSpace -= readyBatch.UsedSpace; TPartitionKey partitionKey{TString{}, readyBatch.PartitionId}; @@ -773,9 +777,11 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) NotifyCA(); } -void TDqPqRdReadActor::AddItem(TRope&& data, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) { +void TDqPqRdReadActor::AddMessageBatch(TRope&& messageBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) { + // TDOD: pass multi type directly to CA, without transforming it into struct + NKikimr::NMiniKQL::TUnboxedValueBatch parsedData(InputDataType); - DataUnpacker->UnpackBatch(MakeChunkedBuffer(std::move(data)), HolderFactory, parsedData); + DataUnpacker->UnpackBatch(MakeChunkedBuffer(std::move(messageBatch)), HolderFactory, parsedData); while (!parsedData.empty()) { const auto* parsedRow = parsedData.Head(); @@ -783,12 +789,16 @@ void TDqPqRdReadActor::AddItem(TRope&& data, NKikimr::NMiniKQL::TUnboxedValueBat NUdf::TUnboxedValue* itemPtr; NUdf::TUnboxedValuePod item = HolderFactory.CreateDirectArrayHolder(ColumnIndexes.size(), itemPtr); for (const auto index : ColumnIndexes) { - // TODO: support metadata fields - YQL_ENSURE(index < parsedData.Width(), "Unexpected data width " << parsedData.Width() << ", failed to extract column by index " << index); - *(itemPtr++) = index ? parsedRow[*index] : NUdf::TUnboxedValue(); + if (index) { + YQL_ENSURE(*index < parsedData.Width(), "Unexpected data width " << parsedData.Width() << ", failed to extract column by index " << index); + *(itemPtr++) = parsedRow[*index]; + } else { + // TODO: support metadata fields here + *(itemPtr++) = NUdf::TUnboxedValue(); + } } - buffer.push_back(std::move(item)); + buffer.emplace_back(std::move(item)); parsedData.Pop(); } } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h index 5028a93078c2..7b965b7f94b4 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h @@ -1,6 +1,8 @@ #pragma once #include +#include +#include namespace NYql::NDq::NInternal {