From 34e3abfa3689be834de6222e3fea9ea30acb9bab Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 10 Dec 2024 00:37:36 -0500 Subject: [PATCH 1/2] kv: allocate BatchRequest header and first request together MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids a heap allocation on each BatchStream RPC request. ``` name old time/op new time/op delta Sysbench/KV/1node_remote/oltp_point_select-10 35.2µs ±10% 35.7µs ±12% ~ (p=0.853 n=10+10) name old alloc/op new alloc/op delta Sysbench/KV/1node_remote/oltp_point_select-10 6.74kB ± 2% 6.78kB ± 1% +0.65% (p=0.040 n=9+9) name old allocs/op new allocs/op delta Sysbench/KV/1node_remote/oltp_point_select-10 61.0 ± 0% 60.0 ± 0% -1.64% (p=0.000 n=10+9) ``` --- pkg/server/node.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/server/node.go b/pkg/server/node.go index 5f5a969990a7..0d1b8e422020 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1877,7 +1877,14 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR func (n *Node) BatchStream(stream kvpb.Internal_BatchStreamServer) error { ctx := stream.Context() for { - args, err := stream.Recv() + argsAlloc := new(struct { + args kvpb.BatchRequest + reqs [1]kvpb.RequestUnion + }) + args := &argsAlloc.args + args.Requests = argsAlloc.reqs[:0] + + err := stream.RecvMsg(args) if err != nil { // From grpc.ServerStream.Recv: // > It returns io.EOF when the client has performed a CloseSend. From 0165b5072ea8d1be70dea0804b577654e9599f0c Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 10 Dec 2024 01:07:39 -0500 Subject: [PATCH 2/2] kv: allocate BatchResponse header and responses together MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids a heap allocation on each BatchResponse creation. 
``` name old time/op new time/op delta Sysbench/KV/1node_remote/oltp_point_select-10 39.8µs ±16% 39.6µs ±19% ~ (p=0.968 n=9+10) name old alloc/op new alloc/op delta Sysbench/KV/1node_remote/oltp_point_select-10 6.79kB ± 4% 6.77kB ± 4% ~ (p=0.671 n=10+10) name old allocs/op new allocs/op delta Sysbench/KV/1node_remote/oltp_point_select-10 60.0 ± 0% 59.0 ± 0% -1.67% (p=0.000 n=9+8) ``` --- pkg/kv/kvpb/batch_generated.go | 38 ++++++++++++++++++++++++++++++-- pkg/kv/kvpb/gen/main.go | 40 ++++++++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvpb/batch_generated.go b/pkg/kv/kvpb/batch_generated.go index b721790a728f..15a57c7516af 100644 --- a/pkg/kv/kvpb/batch_generated.go +++ b/pkg/kv/kvpb/batch_generated.go @@ -830,12 +830,46 @@ type linkExternalSSTableResponseAlloc struct { resp LinkExternalSSTableResponse } +func allocBatchResponse(nResps int) *BatchResponse { + if nResps <= 1 { + alloc := new(struct { + br BatchResponse + resps [1]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 2 { + alloc := new(struct { + br BatchResponse + resps [2]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 4 { + alloc := new(struct { + br BatchResponse + resps [4]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 8 { + alloc := new(struct { + br BatchResponse + resps [8]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } + br := &BatchResponse{} + br.Responses = make([]ResponseUnion, nResps) + return br +} + // CreateReply creates replies for each of the contained requests, wrapped in a // BatchResponse. The response objects are batch allocated to minimize // allocation overhead. 
func (ba *BatchRequest) CreateReply() *BatchResponse { - br := &BatchResponse{} - br.Responses = make([]ResponseUnion, len(ba.Requests)) + br := allocBatchResponse(len(ba.Requests)) counts := ba.getReqCounts() diff --git a/pkg/kv/kvpb/gen/main.go b/pkg/kv/kvpb/gen/main.go index aac45035210f..a24ef985a038 100644 --- a/pkg/kv/kvpb/gen/main.go +++ b/pkg/kv/kvpb/gen/main.go @@ -366,13 +366,49 @@ func (ba *BatchRequest) WriteSummary(b *strings.Builder) { allocTypes[resV.variantName] = allocName } + fmt.Fprint(f, ` +func allocBatchResponse(nResps int) *BatchResponse { + if nResps <= 1 { + alloc := new(struct { + br BatchResponse + resps [1]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 2 { + alloc := new(struct { + br BatchResponse + resps [2]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 4 { + alloc := new(struct { + br BatchResponse + resps [4]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } else if nResps <= 8 { + alloc := new(struct { + br BatchResponse + resps [8]ResponseUnion + }) + alloc.br.Responses = alloc.resps[:nResps] + return &alloc.br + } + br := &BatchResponse{} + br.Responses = make([]ResponseUnion, nResps) + return br +} +`) + fmt.Fprint(f, ` // CreateReply creates replies for each of the contained requests, wrapped in a // BatchResponse. The response objects are batch allocated to minimize // allocation overhead. func (ba *BatchRequest) CreateReply() *BatchResponse { - br := &BatchResponse{} - br.Responses = make([]ResponseUnion, len(ba.Requests)) + br := allocBatchResponse(len(ba.Requests)) counts := ba.getReqCounts()