Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusremotewrite] feat: prom rw exporter add support for rw2 #35888

Open
wants to merge 96 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
8a0b38e
feat: prom rw exporter add support for rw2
jmichalek132 Oct 20, 2024
3d54298
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 21, 2024
4c4ef36
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 25, 2024
d5521bf
chore: try to reduce code duplication
jmichalek132 Oct 25, 2024
146bf31
chore: try to reduce code duplication 2
jmichalek132 Oct 25, 2024
07185a8
chore: removed batching for now to make PR smaller
jmichalek132 Oct 25, 2024
0cdd754
chore: addressed comments from PR
jmichalek132 Oct 25, 2024
b8052ce
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 25, 2024
1b5d693
chore: implement enum for rw2 instead of bool
jmichalek132 Oct 25, 2024
c5518b0
chore: add feature flag for enabling support for rw2
jmichalek132 Oct 25, 2024
446e58a
chore: fix issues pointed out by linter
jmichalek132 Oct 25, 2024
a15354d
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Oct 26, 2024
da15f91
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
81805f3
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 28, 2024
e4ddae3
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
4340649
chore: addressed feedback from review
jmichalek132 Oct 28, 2024
56c09c1
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
a44abba
refactor PushMetrics based on suggestion from review
jmichalek132 Oct 30, 2024
6e2a0d8
chore: undo unneeded changes
jmichalek132 Oct 30, 2024
8bd8bc7
chore: updated unit tests & added an rw2 case
jmichalek132 Oct 30, 2024
6b3bea6
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
c8f832d
chore: ran make gci
jmichalek132 Oct 30, 2024
658ff26
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 30, 2024
cf476d6
chore: move debug line to correct place
jmichalek132 Oct 31, 2024
eae3908
chore: fix typo in feature flag description
jmichalek132 Oct 31, 2024
7e18afb
chore: move debug line to correct place
jmichalek132 Oct 31, 2024
aca5734
chore: small refactor based on review
jmichalek132 Oct 31, 2024
3abd46e
chore: update failing unit test
jmichalek132 Oct 31, 2024
9719de0
chore: updated readme first iteration
jmichalek132 Oct 31, 2024
7f2b253
chore: added changelog draft
jmichalek132 Oct 31, 2024
1770b82
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Oct 31, 2024
047b4ba
chore: ran make generate
jmichalek132 Oct 31, 2024
48a3544
chore: moved v2 function to v2 file
jmichalek132 Oct 31, 2024
bc6c1dc
chore: delete empty file
jmichalek132 Oct 31, 2024
4f04e82
chore: import proto message from prometheus and validate it from config
jmichalek132 Oct 31, 2024
6be771d
chore: ran make generate
jmichalek132 Oct 31, 2024
cc1599c
chore: added check if flag enabled where missing
jmichalek132 Oct 31, 2024
a5bab73
chore: fix broken tests
jmichalek132 Oct 31, 2024
7917ee0
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 11, 2024
018fde1
chore: fixed go.sum after resolving conflicts
jmichalek132 Nov 11, 2024
7fd2376
Update .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
jmichalek132 Nov 11, 2024
a5cea35
chore: updqated changelong entry based on feedback
jmichalek132 Nov 11, 2024
0e8d921
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 11, 2024
7f283c0
chore: ran make gotidy
jmichalek132 Nov 11, 2024
e78df16
Update exporter/prometheusremotewriteexporter/factory.go
jmichalek132 Nov 12, 2024
5b7a338
chore: avoid setting feature flag for all tests
jmichalek132 Nov 12, 2024
9415ebe
chore: ran make gci
jmichalek132 Nov 12, 2024
5747e4a
chore: added log line with proto message
jmichalek132 Nov 12, 2024
f6eb6ab
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 12, 2024
e730d21
Update .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
jmichalek132 Nov 13, 2024
8180fa4
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 13, 2024
18f933d
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
1b6c001
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
4ed165b
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 13, 2024
322d135
Update exporter/prometheusremotewriteexporter/README.md
jmichalek132 Nov 13, 2024
7db3003
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 13, 2024
e53e521
chore: ran go mod tidy
jmichalek132 Nov 13, 2024
8e85d8c
chore: remove unnecessary config option
jmichalek132 Nov 13, 2024
b2198a8
chore: moved validation of proto message into correct place
jmichalek132 Nov 13, 2024
10345a6
chore: updated comment on exportV2 func
jmichalek132 Nov 13, 2024
d770a2c
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 13, 2024
40be25d
chore: ran make generate
jmichalek132 Nov 13, 2024
4b8ca99
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 14, 2024
f6505d1
chore: fix tests and v2 after rebase
jmichalek132 Nov 14, 2024
fdf6b7a
chore: fix linter errors
jmichalek132 Nov 14, 2024
95476c8
chore: ran make generate
jmichalek132 Nov 14, 2024
bfd072f
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 17, 2024
01d77a6
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
dashpole Nov 18, 2024
3ddd34e
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
50caa19
chore: cleanup for pipeline to pass after rebase
jmichalek132 Nov 20, 2024
f950fb4
chore: ran go mod tidy in prom recevier to fix pipeline
jmichalek132 Nov 20, 2024
a65bd59
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
56ba3ba
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 20, 2024
bf37b71
choreL ran make go tidy
jmichalek132 Nov 20, 2024
1dd7935
choreL ran make go tidy
jmichalek132 Nov 20, 2024
fe755ad
Update exporter/prometheusremotewriteexporter/exporter_v2.go
jmichalek132 Nov 27, 2024
217bf6a
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
71327df
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
4e11b39
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
0e29bad
Update exporter/prometheusremotewriteexporter/exporter.go
jmichalek132 Nov 27, 2024
cdb12dc
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Nov 27, 2024
dcad87a
Update exporter/prometheusremotewriteexporter/config.go
jmichalek132 Nov 27, 2024
64f0223
chore: addressed feedback from review
jmichalek132 Nov 27, 2024
53dcdce
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Nov 27, 2024
23a5aae
chore: ran go mod tidy
jmichalek132 Nov 27, 2024
c031d95
chore: ran make gci
jmichalek132 Nov 27, 2024
1f9fac9
chore: explicitly put back buffer after usage
jmichalek132 Nov 27, 2024
deeb455
chore: fix linting issues
jmichalek132 Nov 27, 2024
b23d48d
chore: ran make generate
jmichalek132 Nov 27, 2024
fb4abec
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
jmichalek132 Dec 15, 2024
a037f25
chore: solving conflict
jmichalek132 Dec 15, 2024
e790e84
chore: solving conflict
jmichalek132 Dec 15, 2024
8a929c1
chore: solving conflict
jmichalek132 Dec 15, 2024
702ff70
chore: solving conflict
jmichalek132 Dec 15, 2024
6b8fafa
chore: solving conflict
jmichalek132 Dec 15, 2024
5312467
Merge branch 'main' into jm-prom-rw-exporter-add-support-for-rw2
dashpole Dec 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/jm-prom-rw-exporter-add-support-for-rw2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `exporter.prometheusremotewritexporter.enableSendingRW2` feature gate and configure the exporter to send Prometheus remote write 2.0 version.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33661]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: PRW 2.0 support for the exporter is still under development.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
7 changes: 6 additions & 1 deletion exporter/prometheusremotewriteexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ The following settings can be optionally configured:
- *Note the following headers cannot be changed: `Content-Encoding`, `Content-Type`, `X-Prometheus-Remote-Write-Version`, and `User-Agent`.*
- `namespace`: prefix attached to each exported metric name.
- `add_metric_suffixes`: If set to false, type and unit suffixes will not be added to metrics. Default: true.
- `send_metadata`: If set to true, prometheus metadata will be generated and sent. Default: false.
- `send_metadata`: If set to true, prometheus metadata will be generated and sent. Default: false. This option is ignored when using PRW 2.0, which already includes metadata.
- `remote_write_queue`: fine tuning for queueing and sending of the outgoing remote writes.
- `enabled`: enable the sending queue (default: `true`)
- `queue_size`: number of OTLP metrics that can be queued. Ignored if `enabled` is `false` (default: `10000`)
Expand All @@ -66,6 +66,11 @@ The following settings can be optionally configured:
- `max_batch_size_bytes` (default = `3000000` -> `~2.861 mb`): Maximum size of a batch of
samples to be sent to the remote write endpoint. If the batch size is larger
than this value, it will be split into multiple batches.
- `protobuf_message` (default = `prometheus.WriteRequest`):
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
- Protobuf message to use when writing to the remote write endpoint. This option is ignored unless the `exporter.prometheusremotewritexporter.enableSendingRW2` feature gate is enabled.
- `prometheus.WriteRequest` is the message used in [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/).
- `io.prometheus.write.v2.Request` is the message used in [Remote Write 2.0](https://prometheus.io/docs/specs/remote_write_spec_2_0/). It is more efficient, always includes metadata, and adds support for the created timestamp and native histograms. Your remote storage provider must support PRW 2.0 to be able to use this message. PRW 2.0 support is currently **In Development**, and is only partially implemented.


Example:

Expand Down
9 changes: 9 additions & 0 deletions exporter/prometheusremotewriteexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent
import (
"fmt"

"github.com/prometheus/prometheus/config"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
Expand Down Expand Up @@ -52,6 +53,9 @@ type Config struct {

// SendMetadata controls whether prometheus metadata will be generated and sent
SendMetadata bool `mapstructure:"send_metadata"`

// RemoteWriteProtoMsg controls whether prometheus remote write v1 or v2 is sent
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
RemoteWriteProtoMsg config.RemoteWriteProtoMsg `mapstructure:"protobuf_message,omitempty"`
}

type CreatedMetric struct {
Expand Down Expand Up @@ -115,5 +119,10 @@ func (cfg *Config) Validate() error {
cfg.MaxBatchSizeBytes = 3000000
}

err := cfg.RemoteWriteProtoMsg.Validate()
if err != nil {
return err
}
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved

return nil
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
}
4 changes: 3 additions & 1 deletion exporter/prometheusremotewriteexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/prometheus/prometheus/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -79,7 +80,8 @@ func TestLoadConfig(t *testing.T) {
TargetInfo: &TargetInfo{
Enabled: true,
},
CreatedMetric: &CreatedMetric{Enabled: true},
CreatedMetric: &CreatedMetric{Enabled: true},
RemoteWriteProtoMsg: config.RemoteWriteProtoMsgV1,
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
},
},
{
Expand Down
114 changes: 79 additions & 35 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -85,6 +86,7 @@ type prwExporter struct {
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
RemoteWriteProtoMsg config.RemoteWriteProtoMsg
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
Expand Down Expand Up @@ -118,19 +120,25 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
return nil, err
}

err = config.RemoteWriteProtoMsg.Validate(cfg.RemoteWriteProtoMsg)
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved

userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version)

prwe := &prwExporter{
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.ClientConfig,
settings: set.TelemetrySettings,
retrySettings: cfg.BackOffConfig,
retryOnHTTP429: retryOn429FeatureGate.IsEnabled(),
endpointURL: endpointURL,
wg: new(sync.WaitGroup),
closeChan: make(chan struct{}),
userAgentHeader: userAgentHeader,
maxBatchSizeBytes: cfg.MaxBatchSizeBytes,
concurrency: cfg.RemoteWriteQueue.NumConsumers,
clientSettings: &cfg.ClientConfig,
settings: set.TelemetrySettings,
retrySettings: cfg.BackOffConfig,
retryOnHTTP429: retryOn429FeatureGate.IsEnabled(),
RemoteWriteProtoMsg: cfg.RemoteWriteProtoMsg,
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
exporterSettings: prometheusremotewrite.Settings{
Namespace: cfg.Namespace,
ExternalLabels: sanitizedLabels,
Expand All @@ -143,6 +151,8 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
batchTimeSeriesState: newBatchTimeSericesState(),
}

prwe.settings.Logger.Info("Running with prometheus remote write proto message", zap.Any("ProtoMsg", cfg.RemoteWriteProtoMsg))
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved

prwe.wal = newWAL(cfg.WAL, prwe.export)
return prwe, nil
}
Expand Down Expand Up @@ -176,6 +186,23 @@ func (prwe *prwExporter) Shutdown(context.Context) error {
return err
}

func (prwe *prwExporter) pushMetricsV1(ctx context.Context, md pmetric.Metrics) error {
tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
}
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
}
// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m)
}

// PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
Expand All @@ -188,21 +215,23 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
return errors.New("shutdown has been called")
default:

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
// If feature flag not enabled support only RW1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If feature flag not enabled support only RW1
// If feature flag not enabled support only RW1.

if !enableSendingRW2FeatureGate.IsEnabled() {
return prwe.pushMetricsV1(ctx, md)
}

prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
// If feature flag enabled check if we want to send RW1 or RW2

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If feature flag enabled check if we want to send RW1 or RW2
// If feature flag was enabled check if we want to send RW1 or RW2.

switch prwe.RemoteWriteProtoMsg {
case config.RemoteWriteProtoMsgV1:
// Rw1 case

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too trivial commentary?

Suggested change
// Rw1 case

return prwe.pushMetricsV1(ctx, md)
case config.RemoteWriteProtoMsgV2:
// RW2 case

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// RW2 case

return prwe.pushMetricsV2(ctx, md)

var m []*prompb.MetricMetadata
if prwe.exporterSettings.SendMetadata {
m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes)
default:
return fmt.Errorf("unsupported remote-write protobuf message: %v", prwe.RemoteWriteProtoMsg)
}

// Call export even if a conversion error, since there may be points that were successfully converted.
return prwe.handleExport(ctx, tsMap, m)
}
}

Expand All @@ -223,7 +252,6 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
if len(tsMap) == 0 {
return nil
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
if err != nil {
Expand Down Expand Up @@ -271,7 +299,19 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
if !ok {
return
}
if errExecute := prwe.execute(ctx, request); errExecute != nil {

buf := bufferPool.Get().(*buffer)
buf.protobuf.Reset()
defer bufferPool.Put(buf)

// Uses proto.Marshal to convert the WriteRequest into bytes array
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
errMarshal := buf.protobuf.Marshal(request)
if errMarshal != nil {
errs = multierr.Append(errs, consumererror.NewPermanent(errMarshal))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: risk of race here (errs shared by multiple goroutines and not locked)

return
}
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved

if errExecute := prwe.execute(ctx, buf); errExecute != nil {
mu.Lock()
errs = multierr.Append(errs, consumererror.NewPermanent(errExecute))
mu.Unlock()
Expand All @@ -285,16 +325,7 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
return errs
}

func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
buf := bufferPool.Get().(*buffer)
buf.protobuf.Reset()
defer bufferPool.Put(buf)

// Uses proto.Marshal to convert the WriteRequest into bytes array
errMarshal := buf.protobuf.Marshal(writeReq)
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
func (prwe *prwExporter) execute(ctx context.Context, buf *buffer) error {
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
Expand Down Expand Up @@ -327,10 +358,23 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
// Add necessary headers specified by:
// https://cortexmetrics.io/docs/apis/#remote-api
req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("User-Agent", prwe.userAgentHeader)

// If feature flag not enabled support only RW1
if !enableSendingRW2FeatureGate.IsEnabled() {
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
} else {
switch prwe.RemoteWriteProtoMsg {
case config.RemoteWriteProtoMsgV1:
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
case config.RemoteWriteProtoMsgV2:
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
}
jmichalek132 marked this conversation as resolved.
Show resolved Hide resolved
}

resp, err := prwe.client.Do(req)
if err != nil {
return err
Expand Down
Loading