Skip to content

Commit

Permalink
Embed BufferHolder
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Oct 21, 2024
1 parent 8012ab6 commit ba307e7
Show file tree
Hide file tree
Showing 18 changed files with 147 additions and 301 deletions.
22 changes: 0 additions & 22 deletions pkg/frontend/querymiddleware/custom.go

This file was deleted.

7 changes: 3 additions & 4 deletions pkg/frontend/querymiddleware/model.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 9 additions & 17 deletions pkg/frontend/querymiddleware/model.pb.go.expdiff
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
diff --git a/pkg/frontend/querymiddleware/model.pb.go b/pkg/frontend/querymiddleware/model.pb.go
index fe81fabf6..47f80838c 100644
index 315ed4eed..47f80838c 100644
--- a/pkg/frontend/querymiddleware/model.pb.go
+++ b/pkg/frontend/querymiddleware/model.pb.go
@@ -13,7 +13,6 @@ import (
types "github.com/gogo/protobuf/types"
github_com_grafana_mimir_pkg_mimirpb "github.com/grafana/mimir/pkg/mimirpb"
mimirpb "github.com/grafana/mimir/pkg/mimirpb"
- "google.golang.org/grpc/mem"
io "io"
math "math"
math_bits "math/bits"
@@ -91,9 +90,6 @@ type PrometheusResponse struct {
Headers []*PrometheusHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"`
Warnings []string `protobuf:"bytes,6,rep,name=Warnings,proto3" json:"warnings,omitempty"`
Infos []string `protobuf:"bytes,7,rep,name=Infos,proto3" json:"infos,omitempty"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
@@ -83,9 +83,6 @@ func (m *PrometheusHeader) GetValues() []string {
}

func (m *PrometheusResponse) Reset() { *m = PrometheusResponse{} }
type PrometheusResponse struct {
- // Keep reference to buffer for unsafe references.
- github_com_grafana_mimir_pkg_mimirpb.BufferHolder
-
Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"`
Data *PrometheusData `protobuf:"bytes,2,opt,name=Data,proto3" json:"data,omitempty"`
ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"`
67 changes: 0 additions & 67 deletions pkg/ingester/client/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"google.golang.org/grpc/mem"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/chunk"
)

var _ mimirpb.BufferHolder = &QueryResponse{}

func (m *QueryResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *QueryResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

var _ mimirpb.BufferHolder = &QueryStreamResponse{}

func (m *QueryStreamResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *QueryStreamResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

var _ mimirpb.BufferHolder = &ExemplarQueryResponse{}

func (m *ExemplarQueryResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *ExemplarQueryResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

func ChunksCount(series []TimeSeriesChunk) int {
if len(series) == 0 {
return 0
Expand Down Expand Up @@ -106,29 +65,3 @@ func ChunkFromMeta(meta chunks.Meta) (Chunk, error) {
func DefaultMetricsMetadataRequest() *MetricsMetadataRequest {
return &MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""}
}

var _ mimirpb.BufferHolder = &MetricsForLabelMatchersResponse{}

func (m *MetricsForLabelMatchersResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *MetricsForLabelMatchersResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}

var _ mimirpb.BufferHolder = &ActiveSeriesResponse{}

func (m *ActiveSeriesResponse) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *ActiveSeriesResponse) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
}
}
31 changes: 15 additions & 16 deletions pkg/ingester/client/ingester.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 32 additions & 40 deletions pkg/ingester/client/ingester.pb.go.expdiff
Original file line number Diff line number Diff line change
@@ -1,62 +1,54 @@
diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go
index 36860bb02..bbefc14b1 100644
index 9398a5d80..bbefc14b1 100644
--- a/pkg/ingester/client/ingester.pb.go
+++ b/pkg/ingester/client/ingester.pb.go
@@ -16,7 +16,6 @@ import (
mimirpb "github.com/grafana/mimir/pkg/mimirpb"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
- "google.golang.org/grpc/mem"
status "google.golang.org/grpc/status"
io "io"
math "math"
@@ -584,9 +583,6 @@ func (m *ActiveSeriesRequest) GetType() ActiveSeriesRequest_RequestType {
@@ -582,9 +582,6 @@ func (m *ActiveSeriesRequest) GetType() ActiveSeriesRequest_RequestType {
}

type QueryResponse struct {
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
- mimirpb.BufferHolder
-
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
}

func (m *QueryResponse) Reset() { *m = QueryResponse{} }
@@ -642,9 +638,6 @@ type QueryStreamResponse struct {
StreamingSeries []QueryStreamSeries `protobuf:"bytes,3,rep,name=streaming_series,json=streamingSeries,proto3" json:"streaming_series"`
IsEndOfSeriesStream bool `protobuf:"varint,4,opt,name=is_end_of_series_stream,json=isEndOfSeriesStream,proto3" json:"is_end_of_series_stream,omitempty"`
StreamingSeriesChunks []QueryStreamSeriesChunks `protobuf:"bytes,5,rep,name=streaming_series_chunks,json=streamingSeriesChunks,proto3" json:"streaming_series_chunks"`
-
@@ -636,9 +633,6 @@ func (m *QueryResponse) GetTimeseries() []mimirpb.TimeSeries {
//
// Only one of these two options will be populated.
type QueryStreamResponse struct {
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
- mimirpb.BufferHolder
-
Chunkseries []TimeSeriesChunk `protobuf:"bytes,1,rep,name=chunkseries,proto3" json:"chunkseries"`
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,2,rep,name=timeseries,proto3" json:"timeseries"`
StreamingSeries []QueryStreamSeries `protobuf:"bytes,3,rep,name=streaming_series,json=streamingSeries,proto3" json:"streaming_series"`
@@ -809,9 +803,6 @@ func (m *QueryStreamSeriesChunks) GetChunks() []Chunk {
}

func (m *QueryStreamResponse) Reset() { *m = QueryStreamResponse{} }
@@ -811,9 +804,6 @@ func (m *QueryStreamSeriesChunks) GetChunks() []Chunk {

type ExemplarQueryResponse struct {
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
- mimirpb.BufferHolder
-
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
}

func (m *ExemplarQueryResponse) Reset() { *m = ExemplarQueryResponse{} }
@@ -1332,9 +1322,6 @@ func (m *MetricsForLabelMatchersRequest) GetMatchersSet() []*LabelMatchers {
@@ -1330,9 +1321,6 @@ func (m *MetricsForLabelMatchersRequest) GetMatchersSet() []*LabelMatchers {
}

type MetricsForLabelMatchersResponse struct {
Metric []*mimirpb.Metric `protobuf:"bytes,1,rep,name=metric,proto3" json:"metric,omitempty"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
- mimirpb.BufferHolder
-
Metric []*mimirpb.Metric `protobuf:"bytes,1,rep,name=metric,proto3" json:"metric,omitempty"`
}

func (m *MetricsForLabelMatchersResponse) Reset() { *m = MetricsForLabelMatchersResponse{} }
@@ -1483,9 +1470,6 @@ type ActiveSeriesResponse struct {
// bucket_count is only used when the request type was NATIVE_HISTOGRAM_SERIES.
// bucket_count contains the native histogram active buckets count for each series in "metric" above.
BucketCount []uint64 `protobuf:"varint,2,rep,packed,name=bucket_count,json=bucketCount,proto3" json:"bucket_count,omitempty"`
-
- // Keep reference to buffer for unsafe references.
- buffer mem.Buffer
@@ -1478,9 +1466,6 @@ func (m *MetricsMetadataResponse) GetMetadata() []*mimirpb.MetricMetadata {
}

func (m *ActiveSeriesResponse) Reset() { *m = ActiveSeriesResponse{} }
type ActiveSeriesResponse struct {
- // Keep reference to buffer for unsafe references.
- mimirpb.BufferHolder
-
Metric []*mimirpb.Metric `protobuf:"bytes,1,rep,name=metric,proto3" json:"metric,omitempty"`
// bucket_count is only used when the request type was NATIVE_HISTOGRAM_SERIES.
// bucket_count contains the native histogram active buckets count for each series in "metric" above.
24 changes: 11 additions & 13 deletions pkg/mimirpb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,23 @@ func (c *codecV2) Marshal(v any) (mem.BufferSlice, error) {
}

// Unmarshal customizes gRPC unmarshalling.
// If v implements the BufferHolder interface, its SetBuffer method is called with the unmarshalling buffer.
// If v wraps BufferHolder, its SetBuffer method is called with the unmarshalling buffer.
func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) error {
vv := messageV2Of(v)
buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
// Decrement buf's reference count. Note though that if v implements BufferHolder,
// Decrement buf's reference count. Note though that if v wraps BufferHolder,
// we increase buf's reference count first so it doesn't go to zero.
defer buf.Free()

if err := protobufproto.Unmarshal(buf.ReadOnlyData(), vv); err != nil {
return err
}

unmarshaler, ok := v.(BufferHolder)
if ok {
if holder, ok := v.(interface {
SetBuffer(mem.Buffer)
}); ok {
buf.Ref()
unmarshaler.SetBuffer(buf)
holder.SetBuffer(buf)
}

return nil
Expand All @@ -68,20 +69,17 @@ func (c *codecV2) Name() string {
return c.codec.Name()
}

// BufferHolder is an interface for protobuf messages that keep unsafe references to the unmarshalling buffer.
// BufferHolder is a base type for protobuf messages that keep unsafe references to the unmarshalling buffer.
// Implementations of this interface should keep a reference to said buffer.
type BufferHolder interface {
// SetBuffer sets the unmarshalling buffer.
SetBuffer(mem.Buffer)
type BufferHolder struct {
buffer mem.Buffer
}

var _ BufferHolder = &WriteRequest{}

func (m *WriteRequest) SetBuffer(buf mem.Buffer) {
func (m *BufferHolder) SetBuffer(buf mem.Buffer) {
m.buffer = buf
}

func (m *WriteRequest) FreeBuffer() {
func (m *BufferHolder) FreeBuffer() {
if m.buffer != nil {
m.buffer.Free()
m.buffer = nil
Expand Down
Loading

0 comments on commit ba307e7

Please sign in to comment.