Skip to content

Commit

Permalink
Correct Rows count in Block Output Channels (#11893)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Nov 23, 2024
1 parent b83d01c commit 398fb41
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 21 deletions.
3 changes: 3 additions & 0 deletions ydb/library/yql/dq/common/dq_serialized_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ TChunkedBuffer SaveForSpilling(TDqSerializedBatch&& batch) {

ui32 transportversion = batch.Proto.GetTransportVersion();
ui32 rowCount = batch.Proto.GetRows();
ui32 chunkCount = batch.Proto.GetChunks();

TChunkedBuffer protoPayload(std::move(*batch.Proto.MutableRaw()));

AppendNumber(result, transportversion);
AppendNumber(result, rowCount);
AppendNumber(result, chunkCount);
AppendNumber(result, protoPayload.Size());
result.Append(std::move(protoPayload));
AppendNumber(result, batch.Payload.Size());
Expand All @@ -85,6 +87,7 @@ TDqSerializedBatch LoadSpilled(TBuffer&& blob) {
TDqSerializedBatch result;
result.Proto.SetTransportVersion(ReadNumber<ui32>(source));
result.Proto.SetRows(ReadNumber<ui32>(source));
result.Proto.SetChunks(ReadNumber<ui32>(source));

size_t protoSize = ReadNumber<size_t>(source);
YQL_ENSURE(source.size() >= protoSize, "Premature end of spilled data");
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/dq/common/dq_serialized_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ struct TDqSerializedBatch {
return Proto.GetRows();
}

ui32 ChunkCount() const {
return Proto.GetChunks();
}

void Clear() {
Payload.Clear();
Proto.Clear();
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/proto/dq_transport.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enum EDataTransportVersion {
message TData {
uint32 TransportVersion = 1;
bytes Raw = 2;
uint32 Rows = 3;
uint32 Rows = 5;
uint32 Chunks = 3;
optional uint32 PayloadId = 4;
}
5 changes: 3 additions & 2 deletions ydb/library/yql/dq/runtime/dq_input_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TDqInputChannel : public IDqInputChannel {

void PushImpl(TDqSerializedBatch&& data) {
const i64 space = data.Size();
const size_t rowCount = data.RowCount();
const size_t chunkCount = data.ChunkCount();
auto inputType = Impl.GetInputType();
NKikimr::NMiniKQL::TUnboxedValueBatch batch(inputType);
if (Y_UNLIKELY(PushStats.CollectProfile())) {
Expand All @@ -58,7 +58,8 @@ class TDqInputChannel : public IDqInputChannel {
DataSerializer.Deserialize(std::move(data), inputType, batch);
}

YQL_ENSURE(batch.RowCount() == rowCount);
// single batch row is chunk and may be Arrow block
YQL_ENSURE(batch.RowCount() == chunkCount);
Impl.AddBatch(std::move(batch), space);
}

Expand Down
69 changes: 51 additions & 18 deletions ydb/library/yql/dq/runtime/dq_output_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TDqOutputChannel : public IDqOutputChannel {
}

ui64 GetValuesCount() const override {
return SpilledRowCount + PackedRowCount + ChunkRowCount;
return SpilledRowCount + PackedRowCount + PackerCurrentRowCount;
}

const TDqOutputStats& GetPushStats() const override {
Expand Down Expand Up @@ -95,8 +95,12 @@ class TDqOutputChannel : public IDqOutputChannel {
return;
}

ui32 rows = Packer.IsBlock() ?
NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value
: 1;

if (PushStats.CollectBasic()) {
PushStats.Rows++;
PushStats.Rows += rows;
PushStats.Chunks++;
PushStats.Resume();
}
Expand All @@ -110,7 +114,8 @@ class TDqOutputChannel : public IDqOutputChannel {
values[i] = {};
}

ChunkRowCount++;
PackerCurrentRowCount += rows;
PackerCurrentChunkCount++;

size_t packerSize = Packer.PackedSizeEstimate();
if (packerSize >= MaxChunkBytes) {
Expand All @@ -120,9 +125,12 @@ class TDqOutputChannel : public IDqOutputChannel {
PushStats.Bytes += Data.back().Buffer.Size();
}
PackedDataSize += Data.back().Buffer.Size();
PackedRowCount += ChunkRowCount;
Data.back().RowCount = ChunkRowCount;
ChunkRowCount = 0;
PackedRowCount += PackerCurrentRowCount;
PackedChunkCount += PackerCurrentChunkCount;
Data.back().RowCount = PackerCurrentRowCount;
Data.back().ChunkCount = PackerCurrentChunkCount;
PackerCurrentRowCount = 0;
PackerCurrentChunkCount = 0;
packerSize = 0;
}

Expand All @@ -134,11 +142,13 @@ class TDqOutputChannel : public IDqOutputChannel {
TDqSerializedBatch data;
data.Proto.SetTransportVersion(TransportVersion);
data.Proto.SetRows(head.RowCount);
data.Proto.SetChunks(head.ChunkCount);
data.SetPayload(std::move(head.Buffer));
Storage->Put(NextStoredId++, SaveForSpilling(std::move(data)));

PackedDataSize -= bufSize;
PackedRowCount -= head.RowCount;
PackedChunkCount -= head.ChunkCount;

SpilledRowCount += head.RowCount;

Expand Down Expand Up @@ -199,22 +209,29 @@ class TDqOutputChannel : public IDqOutputChannel {
} else if (!Data.empty()) {
auto& packed = Data.front();
PackedRowCount -= packed.RowCount;
PackedChunkCount -= packed.ChunkCount;
PackedDataSize -= packed.Buffer.Size();
data.Proto.SetRows(packed.RowCount);
data.Proto.SetChunks(packed.ChunkCount);
data.SetPayload(std::move(packed.Buffer));
Data.pop_front();
} else {
data.Proto.SetRows(ChunkRowCount);
data.Proto.SetRows(PackerCurrentRowCount);
data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
ChunkRowCount = 0;
if (PushStats.CollectBasic()) {
PushStats.Bytes += data.Payload.Size();
}
PackerCurrentRowCount = 0;
PackerCurrentChunkCount = 0;
}

DLOG("Took " << data.RowCount() << " rows");

if (PopStats.CollectBasic()) {
PopStats.Bytes += data.Size();
PopStats.Rows += data.RowCount();
PopStats.Chunks++;
PopStats.Rows += data.RowCount();
PopStats.Chunks++; // pop chunks do not match push chunks
if (!IsFull() || FirstStoredId == NextStoredId) {
PopStats.Resume();
}
Expand Down Expand Up @@ -256,20 +273,31 @@ class TDqOutputChannel : public IDqOutputChannel {
data.Clear();
data.Proto.SetTransportVersion(TransportVersion);
if (SpilledRowCount == 0 && PackedRowCount == 0) {
data.Proto.SetRows(ChunkRowCount);
data.Proto.SetRows(PackerCurrentRowCount);
data.Proto.SetChunks(PackerCurrentChunkCount);
data.SetPayload(FinishPackAndCheckSize());
ChunkRowCount = 0;
if (PushStats.CollectBasic()) {
PushStats.Bytes += data.Payload.Size();
}
PackerCurrentRowCount = 0;
PackerCurrentChunkCount = 0;
return true;
}

// Repack all - thats why PopAll should never be used
if (ChunkRowCount) {
if (PackerCurrentRowCount) {
Data.emplace_back();
Data.back().Buffer = FinishPackAndCheckSize();
if (PushStats.CollectBasic()) {
PushStats.Bytes += Data.back().Buffer.Size();
}
PackedDataSize += Data.back().Buffer.Size();
PackedRowCount += ChunkRowCount;
Data.back().RowCount = ChunkRowCount;
ChunkRowCount = 0;
PackedRowCount += PackerCurrentRowCount;
PackedChunkCount += PackerCurrentChunkCount;
Data.back().RowCount = PackerCurrentRowCount;
Data.back().ChunkCount = PackerCurrentChunkCount;
PackerCurrentRowCount = 0;
PackerCurrentChunkCount = 0;
}

NKikimr::NMiniKQL::TUnboxedValueBatch rows(OutputType);
Expand Down Expand Up @@ -332,7 +360,9 @@ class TDqOutputChannel : public IDqOutputChannel {
ui64 rows = GetValuesCount();
Data.clear();
Packer.Clear();
SpilledRowCount = PackedDataSize = PackedRowCount = ChunkRowCount = 0;
PackedDataSize = 0;
SpilledRowCount = PackedRowCount = PackerCurrentRowCount = 0;
PackedChunkCount = PackerCurrentChunkCount = 0;
FirstStoredId = NextStoredId;
return rows;
}
Expand All @@ -359,6 +389,7 @@ class TDqOutputChannel : public IDqOutputChannel {
struct TSerializedBatch {
TChunkedBuffer Buffer;
ui64 RowCount = 0;
ui64 ChunkCount = 0;
};
std::deque<TSerializedBatch> Data;

Expand All @@ -368,8 +399,10 @@ class TDqOutputChannel : public IDqOutputChannel {

size_t PackedDataSize = 0;
size_t PackedRowCount = 0;
size_t PackedChunkCount = 0;

size_t ChunkRowCount = 0;
size_t PackerCurrentRowCount = 0;
size_t PackerCurrentChunkCount = 0;

bool Finished = false;

Expand Down

0 comments on commit 398fb41

Please sign in to comment.