From cf54e5b1034fb757450400bee31aef6232d57b69 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 14 Oct 2024 16:54:36 +0300 Subject: [PATCH 01/14] migrate logs to otlphttp format --- exporter/logzioexporter/exporter.go | 33 ++++++++--------------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/exporter/logzioexporter/exporter.go b/exporter/logzioexporter/exporter.go index 297c3eafdcb3..7ea54306eea1 100644 --- a/exporter/logzioexporter/exporter.go +++ b/exporter/logzioexporter/exporter.go @@ -9,6 +9,7 @@ import ( "encoding/json" "errors" "fmt" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" "io" "net/http" "reflect" @@ -72,7 +73,7 @@ func newLogzioTracesExporter(config *Config, set exporter.Settings) (exporter.Tr if err != nil { return nil, err } - exporter.config.ClientConfig.Endpoint, err = generateEndpoint(config) + exporter.config.ClientConfig.Endpoint, err = generateTracesEndpoint(config) if err != nil { return nil, err } @@ -94,7 +95,7 @@ func newLogzioLogsExporter(config *Config, set exporter.Settings) (exporter.Logs if err != nil { return nil, err } - exporter.config.ClientConfig.Endpoint, err = generateEndpoint(config) + exporter.config.ClientConfig.Endpoint, err = generateLogsEndpoint(config) if err != nil { return nil, err } @@ -123,29 +124,13 @@ func (exporter *logzioExporter) start(ctx context.Context, host component.Host) func (exporter *logzioExporter) pushLogData(ctx context.Context, ld plog.Logs) error { var dataBuffer bytes.Buffer - resourceLogs := ld.ResourceLogs() - for i := 0; i < resourceLogs.Len(); i++ { - resource := resourceLogs.At(i).Resource() - scopeLogs := resourceLogs.At(i).ScopeLogs() - for j := 0; j < scopeLogs.Len(); j++ { - logRecords := scopeLogs.At(j).LogRecords() - scope := scopeLogs.At(j).Scope() - for k := 0; k < logRecords.Len(); k++ { - log := logRecords.At(k) - details := mergeMapEntries(resource.Attributes(), scope.Attributes(), log.Attributes()) - details.PutStr(`scopeName`, scope.Name()) - jsonLog, err := json.Marshal(convertLogRecordToJSON(log, details)) - if err != nil { - return err - } - _, err = dataBuffer.Write(append(jsonLog, '\n')) - if err != nil { - return err - } - } - } + tr := plogotlp.NewExportRequestFromLogs(ld) + request, err := tr.MarshalJSON() + if err != nil { + return consumererror.NewPermanent(err) } - err := exporter.export(ctx, exporter.config.ClientConfig.Endpoint, dataBuffer.Bytes()) + dataBuffer.Write(request) + err = exporter.export(ctx, exporter.config.ClientConfig.Endpoint, dataBuffer.Bytes()) // reset the data buffer after each export to prevent duplicated data dataBuffer.Reset() return err From 3fdfd7fb6ad4c1c95983dbec28a89cdd34cd5bd9 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 14 Oct 2024 16:54:50 +0300 Subject: [PATCH 02/14] handle new logs endpoint --- exporter/logzioexporter/factory.go | 52 +++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/exporter/logzioexporter/factory.go b/exporter/logzioexporter/factory.go index 3345e38dac23..75b3682d602b 100644 --- a/exporter/logzioexporter/factory.go +++ b/exporter/logzioexporter/factory.go @@ -48,7 +48,7 @@ func createDefaultConfig() component.Config { } } -func getListenerURL(region string) string { +func getTracesListenerURL(region string) string { var url string lowerCaseRegion := strings.ToLower(region) switch lowerCaseRegion { @@ -72,18 +72,48 @@ func getListenerURL(region string) string { return url } -func generateEndpoint(cfg *Config) (string, error) { - defaultURL := fmt.Sprintf("%s/?token=%s", getListenerURL(""), string(cfg.Token)) - switch { - case cfg.ClientConfig.Endpoint != "": - return cfg.ClientConfig.Endpoint, nil - case cfg.Region != "": - return fmt.Sprintf("%s/?token=%s", getListenerURL(cfg.Region), string(cfg.Token)), nil - case cfg.ClientConfig.Endpoint == "" && cfg.Region == "": - return defaultURL, errors.New("failed to generate endpoint, Endpoint or Region must be set") +func getLogsListenerURL(region string) string { + var url string + lowerCaseRegion := strings.ToLower(region) + switch lowerCaseRegion { + case "us": + url = "https://listener-otlp.logz.io:8071" + case "ca": + url = "https://listener-ca-otlp.logz.io:8071" + case "eu": + url = "https://listener-eu-otlp.logz.io:8071" + case "uk": + url = "https://listener-uk-otlp.logz.io:8071" + case "au": + url = "https://listener-au-otlp.logz.io:8071" + case "nl": + url = "https://listener-nl-otlp.logz.io:8071" + case "wa": + url = "https://listener-wa-otlp.logz.io:8071" default: - return defaultURL, nil + url = "https://listener-otlp.logz.io:8071" + } + return url +} + +func generateTracesEndpoint(cfg *Config) (string, error) { + if cfg.ClientConfig.Endpoint != "" { + return cfg.ClientConfig.Endpoint, nil + } + if cfg.Region != "" { + return fmt.Sprintf("%s/?token=%s", getTracesListenerURL(cfg.Region), string(cfg.Token)), nil + } + return fmt.Sprintf("%s/?token=%s", getTracesListenerURL(""), string(cfg.Token)), errors.New("failed to generate endpoint, Endpoint or Region must be set") +} + +func generateLogsEndpoint(cfg *Config) (string, error) { + if cfg.ClientConfig.Endpoint != "" { + return cfg.ClientConfig.Endpoint, nil + } + if cfg.Region != "" { + return fmt.Sprintf("%s/?token=%s", getLogsListenerURL(cfg.Region), string(cfg.Token)), nil } + return fmt.Sprintf("%s/?token=%s", getLogsListenerURL(""), string(cfg.Token)), errors.New("failed to generate endpoint, Endpoint or Region must be set") } func createTracesExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) { From e60b464995ee990b740e24ec6a1829a9f7a699f2 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Tue, 22 Oct 2024 11:52:42 +0300 Subject: [PATCH 03/14] Dynamically set `content-type` header --- exporter/logzioexporter/exporter.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/exporter/logzioexporter/exporter.go b/exporter/logzioexporter/exporter.go index 7ea54306eea1..92a6031d9f31 100644 --- a/exporter/logzioexporter/exporter.go +++ b/exporter/logzioexporter/exporter.go @@ -9,6 +9,7 @@ import ( "encoding/json" "errors" "fmt" + "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "io" "net/http" @@ -78,6 +79,9 @@ func newLogzioTracesExporter(config *Config, set exporter.Settings) (exporter.Tr return nil, err } config.checkAndWarnDeprecatedOptions(exporter.logger) + exporter.config.ClientConfig.Headers = map[string]configopaque.String{ + "Content-Type": "application/json", + } return exporterhelper.NewTracesExporter( context.TODO(), set, @@ -96,6 +100,10 @@ func newLogzioLogsExporter(config *Config, set exporter.Settings) (exporter.Logs return nil, err } exporter.config.ClientConfig.Endpoint, err = generateLogsEndpoint(config) + exporter.config.ClientConfig.Headers = map[string]configopaque.String{ + "Authorization": "Bearer " + config.Token, + "Content-Type": "application/x-protobuf", + } if err != nil { return nil, err } @@ -123,16 +131,12 @@ func (exporter *logzioExporter) start(ctx context.Context, host component.Host) } func (exporter *logzioExporter) pushLogData(ctx context.Context, ld plog.Logs) error { - var dataBuffer bytes.Buffer tr := plogotlp.NewExportRequestFromLogs(ld) - request, err := tr.MarshalJSON() + request, err := tr.MarshalProto() if err != nil { return consumererror.NewPermanent(err) } - dataBuffer.Write(request) - err = exporter.export(ctx, exporter.config.ClientConfig.Endpoint, dataBuffer.Bytes()) - // reset the data buffer after each export to prevent duplicated data - dataBuffer.Reset() + err = exporter.export(ctx, exporter.config.ClientConfig.Endpoint, request) return err } @@ -216,7 +220,6 @@ func (exporter *logzioExporter) export(ctx context.Context, url string, request if err != nil { return consumererror.NewPermanent(err) } - req.Header.Set("Content-Type", "application/json") resp, err := exporter.client.Do(req) if err != nil { return fmt.Errorf("failed to make an HTTP request: %w", err) From a83d880f00ad5f2df8dce0d376f5db67be1827e0 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Tue, 22 Oct 2024 11:53:24 +0300 Subject: [PATCH 04/14] change endpoint generation --- exporter/logzioexporter/factory.go | 20 ++++++++++---------- exporter/logzioexporter/factory_test.go | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/exporter/logzioexporter/factory.go b/exporter/logzioexporter/factory.go index 75b3682d602b..2a56c795e4d5 100644 --- a/exporter/logzioexporter/factory.go +++ b/exporter/logzioexporter/factory.go @@ -77,21 +77,21 @@ func getLogsListenerURL(region string) string { lowerCaseRegion := strings.ToLower(region) switch lowerCaseRegion { case "us": - url = "https://listener-otlp.logz.io:8071" + url = "https://otlp-listener.logz.io/v1/logs" case "ca": - url = "https://listener-ca-otlp.logz.io:8071" + url = "https://otlp-listener-ca.logz.io/v1/logs" case "eu": - url = "https://listener-eu-otlp.logz.io:8071" + url = "https://otlp-listener-eu.logz.io/v1/logs" case "uk": - url = "https://listener-uk-otlp.logz.io:8071" + url = "https://otlp-listener-uk.logz.io/v1/logs" case "au": - url = "https://listener-au-otlp.logz.io:8071" + url = "https://otlp-listener-au.logz.io/v1/logs" case "nl": - url = "https://listener-nl-otlp.logz.io:8071" + url = "https://otlp-listener-nl.logz.io/v1/logs" case "wa": - url = "https://listener-wa-otlp.logz.io:8071" + url = "https://otlp-listener-wa.logz.io/v1/logs" default: - url = "https://listener-otlp.logz.io:8071" + url = "https://otlp-listener.logz.io/v1/logs" } return url } @@ -111,9 +111,9 @@ func generateLogsEndpoint(cfg *Config) (string, error) { return cfg.ClientConfig.Endpoint, nil } if cfg.Region != "" { - return fmt.Sprintf("%s/?token=%s", getLogsListenerURL(cfg.Region), string(cfg.Token)), nil + return getLogsListenerURL(cfg.Region), nil } - return fmt.Sprintf("%s/?token=%s", getLogsListenerURL(""), string(cfg.Token)), errors.New("failed to generate endpoint, Endpoint or Region must be set") + return getLogsListenerURL(""), errors.New("failed to generate endpoint, Endpoint or Region must be set") } func createTracesExporter(_ context.Context, params exporter.Settings, cfg component.Config) (exporter.Traces, error) { diff --git a/exporter/logzioexporter/factory_test.go b/exporter/logzioexporter/factory_test.go index 4d682b44dea1..c73e2bf70f12 100644 --- a/exporter/logzioexporter/factory_test.go +++ b/exporter/logzioexporter/factory_test.go @@ -67,7 +67,7 @@ func TestGenerateUrl(t *testing.T) { Token: "token", ClientConfig: clientConfig, } - output, _ := generateEndpoint(cfg) + output, _ := generateTracesEndpoint(cfg) require.Equal(t, test.expected, output) } } @@ -91,7 +91,7 @@ func TestGetListenerURL(t *testing.T) { {"Us", "https://listener.logz.io:8071"}, } for _, test := range getListenerURLTests { - output := getListenerURL(test.arg1) + output := getTracesListenerURL(test.arg1) require.Equal(t, test.expected, output) } } From 131c0c24824732831f5b985fb56cc5d4340837b3 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Tue, 22 Oct 2024 16:07:56 +0300 Subject: [PATCH 05/14] Add helper functions --- exporter/logzioexporter/exporter.go | 101 ++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/exporter/logzioexporter/exporter.go b/exporter/logzioexporter/exporter.go index 92a6031d9f31..de2f7cbd0e71 100644 --- a/exporter/logzioexporter/exporter.go +++ b/exporter/logzioexporter/exporter.go @@ -11,6 +11,7 @@ import ( "fmt" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.uber.org/zap" "io" "net/http" "reflect" @@ -83,7 +84,7 @@ func newLogzioTracesExporter(config *Config, set exporter.Settings) (exporter.Tr "Content-Type": "application/json", } return exporterhelper.NewTracesExporter( - context.TODO(), + context.Background(), set, config, exporter.pushTraceData, @@ -109,7 +110,7 @@ func newLogzioLogsExporter(config *Config, set exporter.Settings) (exporter.Logs } config.checkAndWarnDeprecatedOptions(exporter.logger) return exporterhelper.NewLogsExporter( - context.TODO(), + context.Background(), set, config, exporter.pushLogData, @@ -132,12 +133,12 @@ func (exporter *logzioExporter) start(ctx context.Context, host component.Host) func (exporter *logzioExporter) pushLogData(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) + var request []byte request, err := tr.MarshalProto() if err != nil { return consumererror.NewPermanent(err) } - err = exporter.export(ctx, exporter.config.ClientConfig.Endpoint, request) - return err + return exporter.export(ctx, exporter.config.ClientConfig.Endpoint, request) } func mergeMapEntries(maps ...pcommon.Map) pcommon.Map { @@ -220,6 +221,9 @@ func (exporter *logzioExporter) export(ctx context.Context, url string, request if err != nil { return consumererror.NewPermanent(err) } + for key, value := range exporter.config.ClientConfig.Headers { + req.Header.Set(key, string(value)) + } resp, err := exporter.client.Do(req) if err != nil { return fmt.Errorf("failed to make an HTTP request: %w", err) @@ -227,7 +231,7 @@ func (exporter *logzioExporter) export(ctx context.Context, url string, request defer func() { // Discard any remaining response body when we are done reading. - _, _ = io.CopyN(io.Discard, resp.Body, maxHTTPResponseReadBytes) + io.CopyN(io.Discard, resp.Body, maxHTTPResponseReadBytes) // nolint:errcheck resp.Body.Close() }() exporter.logger.Debug(fmt.Sprintf("Response status code: %d", resp.StatusCode)) @@ -235,7 +239,7 @@ func (exporter *logzioExporter) export(ctx context.Context, url string, request // Request is successful. return nil } - respStatus := readResponse(resp) + respStatus := readResponseStatus(resp) // Format the error message. Use the status if it is present in the response. var formattedErr error if respStatus != nil { @@ -270,32 +274,91 @@ func (exporter *logzioExporter) export(ctx context.Context, url string, request return formattedErr } +func readResponseBody(resp *http.Response) ([]byte, error) { + if resp.ContentLength == 0 { + return nil, nil + } + + maxRead := resp.ContentLength + + // if maxRead == -1, the ContentLength header has not been sent, so read up to + // the maximum permitted body size. If it is larger than the permitted body + // size, still try to read from the body in case the value is an error. If the + // body is larger than the maximum size, proto unmarshaling will likely fail. + if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { + maxRead = maxHTTPResponseReadBytes + } + protoBytes := make([]byte, maxRead) + n, err := io.ReadFull(resp.Body, protoBytes) + + // No bytes read and an EOF error indicates there is no body to read. + if n == 0 && (err == nil || errors.Is(err, io.EOF)) { + return nil, nil + } + + // io.ReadFull will return io.ErrorUnexpectedEOF if the Content-Length header + // wasn't set, since we will try to read past the length of the body. If this + // is the case, the body will still have the full message in it, so we want to + // ignore the error and parse the message. + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { + return nil, err + } + + return protoBytes[:n], nil +} + // Read the response and decode the status.Status from the body. // Returns nil if the response is empty or cannot be decoded. -func readResponse(resp *http.Response) *status.Status { +func readResponseStatus(resp *http.Response) *status.Status { var respStatus *status.Status if resp.StatusCode >= 400 && resp.StatusCode <= 599 { // Request failed. Read the body. OTLP spec says: // "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a // Protobuf-encoded Status message that describes the problem." - maxRead := resp.ContentLength - if maxRead == -1 || maxRead > maxHTTPResponseReadBytes { - maxRead = maxHTTPResponseReadBytes + respBytes, err := readResponseBody(resp) + if err != nil { + return nil } - respBytes := make([]byte, maxRead) - n, err := io.ReadFull(resp.Body, respBytes) - if err == nil && n > 0 { - // Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures - respStatus = &status.Status{} - err = proto.Unmarshal(respBytes, respStatus) - if err != nil { - respStatus = nil - } + + // Decode it as Status struct. See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures + respStatus = &status.Status{} + err = proto.Unmarshal(respBytes, respStatus) + if err != nil { + return nil } } return respStatus } +func (exporter *logzioExporter) logsPartialSuccessHandler(protoBytes []byte, contentType string) error { + if protoBytes == nil { + return nil + } + exportResponse := plogotlp.NewExportResponse() + switch contentType { + case "application/x-protobuf": + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + case "application/json": + err := exportResponse.UnmarshalJSON(protoBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } + default: + return nil + } + + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { + exporter.logger.Warn("Partial success response", + zap.String("message", partialSuccess.ErrorMessage()), + zap.Int64("dropped_log_records", partialSuccess.RejectedLogRecords()), + ) + } + return nil +} func (exporter *logzioExporter) dropEmptyTags(tags []model.KeyValue) []model.KeyValue { for i, tag := range tags { From 445199e2684fc54eb77f0d434177e0a87aa7699b Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 28 Oct 2024 14:40:11 +0200 Subject: [PATCH 06/14] replace `hclog2ZapLogger` -> `zap.logger` in `config.go` --- exporter/logzioexporter/config.go | 4 ++-- exporter/logzioexporter/config_test.go | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/exporter/logzioexporter/config.go b/exporter/logzioexporter/config.go index 74c53e495832..e471a71f73a3 100644 --- a/exporter/logzioexporter/config.go +++ b/exporter/logzioexporter/config.go @@ -5,8 +5,8 @@ package logzioexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "errors" + "go.uber.org/zap" - "github.com/hashicorp/go-hclog" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configretry" @@ -34,7 +34,7 @@ func (c *Config) Validate() error { } // CheckAndWarnDeprecatedOptions Is checking for soon deprecated configuration options (queue_max_length, queue_capacity, drain_interval, custom_endpoint) log a warning message and map to the relevant updated option -func (c *Config) checkAndWarnDeprecatedOptions(logger hclog.Logger) { +func (c *Config) checkAndWarnDeprecatedOptions(logger *zap.Logger) { if c.QueueCapacity != 0 { logger.Warn("You are using the deprecated `queue_capacity` option that will be removed in the next release; use exporter helper configuration instead: https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md") } diff --git a/exporter/logzioexporter/config_test.go b/exporter/logzioexporter/config_test.go index bf16dbe27644..23e7a42bf3c0 100644 --- a/exporter/logzioexporter/config_test.go +++ b/exporter/logzioexporter/config_test.go @@ -86,11 +86,8 @@ func TestCheckAndWarnDeprecatedOptions(t *testing.T) { ClientConfig: clientConfig, } params := exportertest.NewNopSettings() - logger := hclog2ZapLogger{ - Zap: params.Logger, - name: loggerName, - } - actualCfg.checkAndWarnDeprecatedOptions(&logger) + logger := params.Logger + actualCfg.checkAndWarnDeprecatedOptions(logger) clientConfigEndpoint := confighttp.NewDefaultClientConfig() clientConfigEndpoint.Timeout = 10 * time.Second From ea2d472acbc943b2edc36265ca98b06264c58cf2 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 28 Oct 2024 14:40:52 +0200 Subject: [PATCH 07/14] `TestGetLogsListenerURL` + `TestGetTracesListenerURL` --- exporter/logzioexporter/factory_test.go | 26 ++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/exporter/logzioexporter/factory_test.go b/exporter/logzioexporter/factory_test.go index c73e2bf70f12..3252c20ecf24 100644 --- a/exporter/logzioexporter/factory_test.go +++ b/exporter/logzioexporter/factory_test.go @@ -72,7 +72,7 @@ func TestGenerateUrl(t *testing.T) { } } -func TestGetListenerURL(t *testing.T) { +func TestGetTracesListenerURL(t *testing.T) { type getListenerURLTest struct { arg1 string expected string @@ -95,3 +95,27 @@ func TestGetListenerURL(t *testing.T) { require.Equal(t, test.expected, output) } } + +func TestGetLogsListenerURL(t *testing.T) { + type getListenerURLTest struct { + arg1 string + expected string + } + var getListenerURLTests = []getListenerURLTest{ + {"us", "https://otlp-listener.logz.io/v1/logs"}, + {"eu", "https://otlp-listener-eu.logz.io/v1/logs"}, + {"au", "https://otlp-listener-au.logz.io/v1/logs"}, + {"ca", "https://otlp-listener-ca.logz.io/v1/logs"}, + {"nl", "https://otlp-listener-nl.logz.io/v1/logs"}, + {"uk", "https://otlp-listener-uk.logz.io/v1/logs"}, + {"wa", "https://otlp-listener-wa.logz.io/v1/logs"}, + {"not-valid", "https://otlp-listener.logz.io/v1/logs"}, + {"", "https://otlp-listener.logz.io/v1/logs"}, + {"US", "https://otlp-listener.logz.io/v1/logs"}, + {"Us", "https://otlp-listener.logz.io/v1/logs"}, + } + for _, test := range getListenerURLTests { + output := getLogsListenerURL(test.arg1) + require.Equal(t, test.expected, output) + } +} From 57aa9b0b447487b16512c970c2daaad3a75b21ac Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 28 Oct 2024 16:30:15 +0200 Subject: [PATCH 08/14] Use `zap.logger` instead of `hclog.Logger` + add `partialSuccessHandler` + mask token from url + set `User-Agent` --- exporter/logzioexporter/exporter.go | 207 +++++++++++++++++----------- 1 file changed, 130 insertions(+), 77 deletions(-) diff --git a/exporter/logzioexporter/exporter.go b/exporter/logzioexporter/exporter.go index de2f7cbd0e71..f4216d438034 100644 --- a/exporter/logzioexporter/exporter.go +++ b/exporter/logzioexporter/exporter.go @@ -11,21 +11,23 @@ import ( "fmt" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.uber.org/zap" + "google.golang.org/grpc/codes" + st "google.golang.org/grpc/status" "io" "net/http" - "reflect" + "regexp" + "runtime" "strconv" "time" - "github.com/hashicorp/go-hclog" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cache" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/ptrace" "google.golang.org/genproto/googleapis/rpc/status" @@ -35,31 +37,31 @@ import ( ) const ( - loggerName = "logzio-exporter" headerRetryAfter = "Retry-After" maxHTTPResponseReadBytes = 64 * 1024 + jsonContentType = "application/json" + protobufContentType = "application/x-protobuf" ) +type partialSuccessHandler func(bytes []byte, contentType string) error + // logzioExporter implements an OpenTelemetry trace exporter that exports all spans to Logz.io type logzioExporter struct { config *Config client *http.Client - logger hclog.Logger + logger *zap.Logger settings component.TelemetrySettings serviceCache cache.Cache } func newLogzioExporter(cfg *Config, params exporter.Settings) (*logzioExporter, error) { - logger := hclog2ZapLogger{ - Zap: params.Logger, - name: loggerName, - } + logger := params.Logger if cfg == nil { return nil, errors.New("exporter config can't be null") } return &logzioExporter{ config: cfg, - logger: &logger, + logger: logger, settings: params.TelemetrySettings, serviceCache: cache.NewLRUWithOptions( 100000, @@ -76,12 +78,11 @@ func newLogzioTracesExporter(config *Config, set exporter.Settings) (exporter.Tr return nil, err } exporter.config.ClientConfig.Endpoint, err = generateTracesEndpoint(config) - if err != nil { - return nil, err - } config.checkAndWarnDeprecatedOptions(exporter.logger) + userAgent := fmt.Sprintf("otel-collector-logzio-traces-exporter-%s-%s-%s", set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) exporter.config.ClientConfig.Headers = map[string]configopaque.String{ - "Content-Type": "application/json", + "Content-Type": jsonContentType, + "User-Agent": configopaque.String(userAgent), } return exporterhelper.NewTracesExporter( context.Background(), @@ -101,12 +102,11 @@ func newLogzioLogsExporter(config *Config, set exporter.Settings) (exporter.Logs return nil, err } exporter.config.ClientConfig.Endpoint, err = generateLogsEndpoint(config) + userAgent := fmt.Sprintf("otel-collector-logzio-logs-exporter-%s-%s-%s", set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) exporter.config.ClientConfig.Headers = map[string]configopaque.String{ "Authorization": "Bearer " + config.Token, - "Content-Type": "application/x-protobuf", - } - if err != nil { - return nil, err + "Content-Type": protobufContentType, + "User-Agent": configopaque.String(userAgent), } config.checkAndWarnDeprecatedOptions(exporter.logger) return exporterhelper.NewLogsExporter( @@ -133,40 +133,13 @@ func (exporter *logzioExporter) start(ctx context.Context, host component.Host) func (exporter *logzioExporter) pushLogData(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) + var err error var request []byte - request, err := tr.MarshalProto() + request, err = tr.MarshalProto() if err != nil { return consumererror.NewPermanent(err) } - return exporter.export(ctx, exporter.config.ClientConfig.Endpoint, request) -} - -func mergeMapEntries(maps ...pcommon.Map) pcommon.Map { - res := map[string]any{} - for _, m := range maps { - for key, val := range m.AsRaw() { - // Check if the key was already added - if resMapValue, keyExists := res[key]; keyExists { - rt := reflect.TypeOf(resMapValue) - switch rt.Kind() { - case reflect.Slice: - res[key] = append(resMapValue.([]any), val) - default: - // Create a new slice and append values if the key exists: - valslice := []any{} - res[key] = append(valslice, resMapValue, val) - } - } else { - res[key] = val - } - } - } - pcommonRes := pcommon.NewMap() - err := pcommonRes.FromRaw(res) - if err != nil { - return pcommon.Map{} - } - return pcommonRes + return exporter.export(ctx, exporter.config.ClientConfig.Endpoint, request, exporter.logsPartialSuccessHandler) } func (exporter *logzioExporter) pushTraceData(ctx context.Context, traces ptrace.Traces) error { @@ -207,23 +180,20 @@ func (exporter *logzioExporter) pushTraceData(ctx context.Context, traces ptrace } } } - err := exporter.export(ctx, exporter.config.ClientConfig.Endpoint, dataBuffer.Bytes()) + err := exporter.export(ctx, exporter.config.ClientConfig.Endpoint, dataBuffer.Bytes(), exporter.tracesPartialSuccessHandler) // reset the data buffer after each export to prevent duplicated data dataBuffer.Reset() return err } -// export is similar to otlphttp export method with changes in log messages + Permanent error for `StatusUnauthorized` and `StatusForbidden` -// https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/otlphttpexporter/otlp.go#L127 -func (exporter *logzioExporter) export(ctx context.Context, url string, request []byte) error { - exporter.logger.Debug(fmt.Sprintf("Preparing to make HTTP request with %d bytes", len(request))) +func (exporter *logzioExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { + maskedURL := regexp.MustCompile(`(token=)[^&]+`).ReplaceAllString(url, `$1****`) + exporter.logger.Debug("Preparing to make HTTP request", zap.String("url", maskedURL), zap.Int("request_size", len(request))) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { return consumererror.NewPermanent(err) } - for key, value := range exporter.config.ClientConfig.Headers { - req.Header.Set(key, string(value)) - } + resp, err := exporter.client.Do(req) if err != nil { return fmt.Errorf("failed to make an HTTP request: %w", err) @@ -235,43 +205,45 @@ func (exporter *logzioExporter) export(ctx context.Context, url string, request resp.Body.Close() }() exporter.logger.Debug(fmt.Sprintf("Response status code: %d", resp.StatusCode)) + if resp.StatusCode >= 200 && resp.StatusCode <= 299 { - // Request is successful. - return nil + return handlePartialSuccessResponse(resp, partialSuccessHandler) } + respStatus := readResponseStatus(resp) + // Format the error message. Use the status if it is present in the response. + var errString string var formattedErr error if respStatus != nil { - formattedErr = fmt.Errorf( + errString = fmt.Sprintf( "error exporting items, request to %s responded with HTTP Status Code %d, Message=%s, Details=%v", url, resp.StatusCode, respStatus.Message, respStatus.Details) } else { - formattedErr = fmt.Errorf( + errString = fmt.Sprintf( "error exporting items, request to %s responded with HTTP Status Code %d", url, resp.StatusCode) } + formattedErr = NewStatusFromMsgAndHTTPCode(errString, resp.StatusCode).Err() - // Check if the server is overwhelmed. - // See spec https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#throttling-1 - if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable { - // Fallback to 0 if the Retry-After header is not present. This will trigger the - // default backoff policy by our caller (retry handler). + if isRetryableStatusCode(resp.StatusCode) { + // A retry duration of 0 seconds will trigger the default backoff policy + // of our caller (retry handler). retryAfter := 0 - if val := resp.Header.Get(headerRetryAfter); val != "" { + + // Check if the server is overwhelmed. + // See spec https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlphttp-throttling + isThrottleError := resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable + if val := resp.Header.Get(headerRetryAfter); isThrottleError && val != "" { if seconds, err2 := strconv.Atoi(val); err2 == nil { retryAfter = seconds } } - // Indicate to our caller to pause for the specified number of seconds. + return exporterhelper.NewThrottleRetry(formattedErr, time.Duration(retryAfter)*time.Second) } - if resp.StatusCode == http.StatusBadRequest || resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { - return consumererror.NewPermanent(formattedErr) - } - // All other errors are retryable, so don't wrap them in consumererror.NewPermanent(). - return formattedErr + return consumererror.NewPermanent(formattedErr) } func readResponseBody(resp *http.Response) ([]byte, error) { @@ -330,18 +302,49 @@ func readResponseStatus(resp *http.Response) *status.Status { return respStatus } + +func (exporter *logzioExporter) tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { + if protoBytes == nil { + return nil + } + exportResponse := ptraceotlp.NewExportResponse() + switch contentType { + case protobufContentType: + err := exportResponse.UnmarshalProto(protoBytes) + if err != nil { + return fmt.Errorf("error parsing protobuf response: %w", err) + } + case jsonContentType: + err := exportResponse.UnmarshalJSON(protoBytes) + if err != nil { + return fmt.Errorf("error parsing json response: %w", err) + } + default: + return nil + } + + partialSuccess := exportResponse.PartialSuccess() + if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { + exporter.logger.Warn("Partial success response", + zap.String("message", exportResponse.PartialSuccess().ErrorMessage()), + zap.Int64("dropped_spans", exportResponse.PartialSuccess().RejectedSpans()), + ) + } + return nil +} + func (exporter *logzioExporter) logsPartialSuccessHandler(protoBytes []byte, contentType string) error { if protoBytes == nil { return nil } exportResponse := plogotlp.NewExportResponse() switch contentType { - case "application/x-protobuf": + case protobufContentType: err := exportResponse.UnmarshalProto(protoBytes) if err != nil { return fmt.Errorf("error parsing protobuf response: %w", err) } - case "application/json": + case jsonContentType: err := exportResponse.UnmarshalJSON(protoBytes) if err != nil { return fmt.Errorf("error parsing json response: %w", err) @@ -349,12 +352,11 @@ func (exporter *logzioExporter) logsPartialSuccessHandler(protoBytes []byte, con default: return nil } - partialSuccess := exportResponse.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { exporter.logger.Warn("Partial success response", - zap.String("message", partialSuccess.ErrorMessage()), - zap.Int64("dropped_log_records", partialSuccess.RejectedLogRecords()), + zap.String("message", exportResponse.PartialSuccess().ErrorMessage()), + zap.Int64("dropped_log_records", exportResponse.PartialSuccess().RejectedLogRecords()), ) } return nil @@ -370,3 +372,54 @@ func (exporter *logzioExporter) dropEmptyTags(tags []model.KeyValue) []model.Key } return tags } + +func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error { + bodyBytes, err := readResponseBody(resp) + if err != nil { + return err + } + + return partialSuccessHandler(bodyBytes, resp.Header.Get("Content-Type")) +} + +// Determine if the status code is retryable according to the specification. +// For more, see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures-1 +func isRetryableStatusCode(code int) bool { + switch code { + case http.StatusTooManyRequests: + return true + case http.StatusBadGateway: + return true + case http.StatusServiceUnavailable: + return true + case http.StatusGatewayTimeout: + return true + default: + return false + } +} + +// NewStatusFromMsgAndHTTPCode returns a gRPC status based on an error message string and a http status code. +// This function is shared between the http receiver and http exporter for error propagation. +func NewStatusFromMsgAndHTTPCode(errMsg string, statusCode int) *st.Status { + var c codes.Code + // Mapping based on https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md + // 429 mapping to ResourceExhausted and 400 mapping to StatusBadRequest are exceptions. + switch statusCode { + case http.StatusBadRequest: + c = codes.InvalidArgument + case http.StatusUnauthorized: + c = codes.Unauthenticated + case http.StatusForbidden: + c = codes.PermissionDenied + case http.StatusNotFound: + c = codes.Unimplemented + case http.StatusTooManyRequests: + c = codes.ResourceExhausted + case http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + c = codes.Unavailable + default: + c = codes.Unknown + } + return st.New(c, errMsg) +} From 447a44e061f1eecd70441c25529b997c698243c1 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 28 Oct 2024 16:30:37 +0200 Subject: [PATCH 09/14] Add new unit tests --- exporter/logzioexporter/exporter_test.go | 252 +++++++++++++++++++---- 1 file changed, 212 insertions(+), 40 deletions(-) diff --git a/exporter/logzioexporter/exporter_test.go b/exporter/logzioexporter/exporter_test.go index fde5d753f365..56646806e8f7 100644 --- a/exporter/logzioexporter/exporter_test.go +++ b/exporter/logzioexporter/exporter_test.go @@ -8,7 +8,15 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/protobuf/proto" "io" "net/http" "net/http/httptest" @@ -276,7 +284,6 @@ func TestPushLogsData(tester *testing.T) { })) clientConfig := confighttp.NewDefaultClientConfig() clientConfig.Endpoint = server.URL - clientConfig.Compression = configcompression.TypeGzip cfg := Config{ Token: "token", Region: "", @@ -289,45 +296,210 @@ func TestPushLogsData(tester *testing.T) { res.Attributes().PutStr(conventions.AttributeHostName, testHost) err := testLogsExporter(ld, tester, &cfg) require.NoError(tester, err) - var jsonLog map[string]any - decoded, _ := gUnzipData(recordedRequests) - requests := strings.Split(string(decoded), "\n") - assert.NoError(tester, json.Unmarshal([]byte(requests[0]), &jsonLog)) - assert.Equal(tester, testHost, jsonLog["host.name"]) - assert.Equal(tester, testService, jsonLog["service.name"]) - assert.Equal(tester, "server", jsonLog["app"]) - assert.Equal(tester, 1.0, jsonLog["instance_num"]) - assert.Equal(tester, "logScopeName", jsonLog["scopeName"]) - assert.Equal(tester, "hello there", jsonLog["message"]) - assert.Equal(tester, "bar", jsonLog["foo"]) - assert.Equal(tester, 45.0, jsonLog["23"]) + requests := plogotlp.NewExportRequest() + err = requests.UnmarshalProto(recordedRequests) + require.NoError(tester, err) + resultLogs := requests.Logs() + assert.Equal(tester, ld.LogRecordCount(), resultLogs.LogRecordCount()) + assert.Equal(tester, ld.ResourceLogs().At(0).Resource().Attributes().AsRaw(), resultLogs.ResourceLogs().At(0).Resource().Attributes().AsRaw()) + assert.Equal(tester, ld.ResourceLogs(), resultLogs.ResourceLogs()) +} + +func TestTracesPartialSuccessHandler(t *testing.T) { + exporter, err := newLogzioExporter(&Config{}, exportertest.NewNopSettings()) + require.NoError(t, err) + + exportResponse := ptraceotlp.NewExportResponse() + partial := exportResponse.PartialSuccess() + partial.SetErrorMessage("Partial success error") + partial.SetRejectedSpans(5) + + protoBytes, err := exportResponse.MarshalProto() + require.NoError(t, err) + + logger, _ := observer.New(zap.WarnLevel) + exporter.logger = zap.New(logger) + + err = exporter.tracesPartialSuccessHandler(protoBytes, protobufContentType) + require.NoError(t, err) + + err = exporter.tracesPartialSuccessHandler([]byte{0xFF}, protobufContentType) + require.Error(t, err) + require.Contains(t, err.Error(), "error parsing protobuf response") + + err = exporter.tracesPartialSuccessHandler(nil, protobufContentType) + require.NoError(t, err) + + err = exporter.tracesPartialSuccessHandler(protoBytes, "unknown/content-type") + require.NoError(t, err) } -func TestMergeMapEntries(tester *testing.T) { - var firstMap = pcommon.NewMap() - var secondMap = pcommon.NewMap() - var expectedMap = pcommon.NewMap() - firstMap.PutStr("name", "exporter") - firstMap.PutStr("host", "localhost") - firstMap.PutStr("instanceNum", "1") - firstMap.PutInt("id", 4) - secondMap.PutStr("tag", "test") - secondMap.PutStr("host", "ec2") - secondMap.PutInt("instanceNum", 3) - secondMap.PutEmptyMap("id").PutInt("instance_a", 1) - expectedMap.PutStr("name", "exporter") - expectedMap.PutStr("tag", "test") - var slice = expectedMap.PutEmptySlice("host") - slice.AppendEmpty().SetStr("localhost") - slice.AppendEmpty().SetStr("ec2") - slice = expectedMap.PutEmptySlice("instanceNum") - var val = slice.AppendEmpty() - val.SetStr("1") - val = slice.AppendEmpty() - val.SetInt(3) - slice = expectedMap.PutEmptySlice("id") - slice.AppendEmpty().SetInt(4) - slice.AppendEmpty().SetEmptyMap().PutInt("instance_a", 1) - var mergedMap = mergeMapEntries(firstMap, secondMap) - assert.Equal(tester, expectedMap.AsRaw(), mergedMap.AsRaw()) +func TestLogsPartialSuccessHandler(t *testing.T) { + exporter, err := newLogzioExporter(&Config{}, exportertest.NewNopSettings()) + require.NoError(t, err) + // Create a valid ExportResponse with PartialSuccess information + exportResponse := plogotlp.NewExportResponse() + partial := exportResponse.PartialSuccess() + partial.SetErrorMessage("Partial success error") + partial.SetRejectedLogRecords(5) + + protoBytes, err := exportResponse.MarshalProto() + require.NoError(t, err) + + logger, logs := observer.New(zap.WarnLevel) + exporter.logger = zap.New(logger) + + err = exporter.logsPartialSuccessHandler(protoBytes, protobufContentType) + require.NoError(t, err) + + warnLogs := logs.FilterLevelExact(zap.WarnLevel).All() + require.Len(t, warnLogs, 1) + require.Contains(t, warnLogs[0].Message, "Partial success response") + require.Equal(t, "Partial success error", warnLogs[0].ContextMap()["message"]) + require.Equal(t, int64(5), warnLogs[0].ContextMap()["dropped_log_records"]) + + // Now test with invalid protoBytes + err = exporter.logsPartialSuccessHandler([]byte{0xFF}, protobufContentType) + require.Error(t, err) + require.Contains(t, err.Error(), "error parsing protobuf response") + + // Test with nil protoBytes + err = exporter.logsPartialSuccessHandler(nil, protobufContentType) + require.NoError(t, err) + + // Test with unknown content type + err = exporter.logsPartialSuccessHandler(protoBytes, "unknown/content-type") + require.NoError(t, err) +} + +func TestReadResponseStatus(t *testing.T) { + tests := []struct { + name string + response *http.Response + expectedStatus *status.Status + }{ + { + name: "Valid status message", + response: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewReader(func() []byte { + statusProto := &status.Status{ + Code: int32(codes.InvalidArgument), + Message: "Invalid argument", + } + statusBytes, _ := proto.Marshal(statusProto) + return statusBytes + }())), + Header: http.Header{"Content-Type": []string{protobufContentType}}, + ContentLength: int64(len(func() []byte { + statusProto := &status.Status{ + Code: int32(codes.InvalidArgument), + Message: "Invalid argument", + } + statusBytes, _ := proto.Marshal(statusProto) + return statusBytes + }())), + }, + expectedStatus: &status.Status{ + Code: int32(codes.InvalidArgument), + Message: "Invalid argument", + }, + }, + { + name: "Invalid status message", + response: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewReader([]byte("invalid"))), + Header: http.Header{"Content-Type": []string{protobufContentType}}, + ContentLength: int64(len([]byte("invalid"))), + }, + expectedStatus: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + respStatus := readResponseStatus(tt.response) + if tt.expectedStatus != nil { + require.NotNil(t, respStatus) + require.Equal(t, tt.expectedStatus.Message, respStatus.Message) + } else { + require.Nil(t, respStatus) + } + }) + } +} + +func TestReadResponseBody(t *testing.T) { + tests := []struct { + name string + response *http.Response + expectedOutput []byte + expectedError error + }{ + { + name: "Empty body", + response: &http.Response{ + ContentLength: 0, + Body: io.NopCloser(bytes.NewReader(nil)), + }, + expectedOutput: nil, + expectedError: nil, + }, + { + name: "Valid body", + response: &http.Response{ + ContentLength: 5, + Body: io.NopCloser(bytes.NewReader([]byte("hello"))), + }, + expectedOutput: []byte("hello"), + expectedError: nil, + }, + { + name: "Body larger than max read", + response: &http.Response{ + ContentLength: 100000, + Body: io.NopCloser(bytes.NewReader(make([]byte, 100000))), + }, + expectedOutput: make([]byte, maxHTTPResponseReadBytes), + expectedError: nil, + }, + { + name: "Unexpected EOF", + response: &http.Response{ + ContentLength: -1, + Body: io.NopCloser(bytes.NewReader([]byte("partial"))), + }, + expectedOutput: []byte("partial"), + expectedError: nil, + }, + { + name: "Error reading body", + response: &http.Response{ + ContentLength: 5, + Body: io.NopCloser(&errorReader{}), + }, + expectedOutput: nil, + expectedError: errors.New("read error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + output, err := readResponseBody(tt.response) + if tt.expectedError != nil { + require.Error(t, err) + require.Contains(t, err.Error(), tt.expectedError.Error()) + } else { + require.NoError(t, err) + } + require.Equal(t, tt.expectedOutput, output) + }) + } +} + +type errorReader struct{} + +func (e *errorReader) Read(p []byte) (n int, err error) { + return 0, errors.New("read error") } From 42f59016c5c3e5173182cbf2da46a938bc318576 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Mon, 28 Oct 2024 16:30:51 +0200 Subject: [PATCH 10/14] Remove unused files --- exporter/logzioexporter/jsonlog.go | 46 ---------- exporter/logzioexporter/jsonlog_test.go | 117 ------------------------ exporter/logzioexporter/logger.go | 115 ----------------------- exporter/logzioexporter/logger_test.go | 50 ---------- 4 files changed, 328 deletions(-) delete mode 100644 exporter/logzioexporter/jsonlog.go delete mode 100644 exporter/logzioexporter/jsonlog_test.go delete mode 100644 exporter/logzioexporter/logger.go delete mode 100644 exporter/logzioexporter/logger_test.go diff --git a/exporter/logzioexporter/jsonlog.go b/exporter/logzioexporter/jsonlog.go deleted file mode 100644 index 391b61e166ea..000000000000 --- a/exporter/logzioexporter/jsonlog.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package logzioexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/logzioexporter" - -import ( - "encoding/hex" - - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" -) - -// convertLogRecordToJSON Takes `plog.LogRecord` and `pcommon.Resource` input, outputs byte array that represents the log record as json string -func convertLogRecordToJSON(log plog.LogRecord, attributes pcommon.Map) map[string]any { - jsonLog := map[string]any{} - if spanID := log.SpanID(); !spanID.IsEmpty() { - jsonLog["spanID"] = hex.EncodeToString(spanID[:]) - } - if traceID := log.TraceID(); !traceID.IsEmpty() { - jsonLog["traceID"] = hex.EncodeToString(traceID[:]) - } - if log.SeverityText() != "" { - jsonLog["level"] = log.SeverityText() - } - // try to set timestamp if exists - if log.Timestamp().AsTime().UnixMilli() != 0 { - jsonLog["@timestamp"] = log.Timestamp().AsTime().UnixMilli() - } - - // Add merged attributed to each json log - attributes.Range(func(k string, v pcommon.Value) bool { - jsonLog[k] = v.AsRaw() - return true - }) - - switch log.Body().Type() { - case pcommon.ValueTypeStr: - jsonLog["message"] = log.Body().Str() - case pcommon.ValueTypeMap: - bodyFieldsMap := log.Body().Map().AsRaw() - for key, value := range bodyFieldsMap { - jsonLog[key] = value - } - } - return jsonLog -} diff --git a/exporter/logzioexporter/jsonlog_test.go b/exporter/logzioexporter/jsonlog_test.go deleted file mode 100644 index 2e4962e20e55..000000000000 --- a/exporter/logzioexporter/jsonlog_test.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package logzioexporter - -import ( - "context" - "encoding/json" - "io" - "net/http" - "net/http/httptest" - "strings" - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configcompression" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/plog" -) - -// Logs -func generateLogRecordWithNestedBody() plog.LogRecord { - lr := plog.NewLogRecord() - fillLogOne(lr) - return lr -} -func generateLogRecordWithMultiTypeValues() plog.LogRecord { - lr := plog.NewLogRecord() - fillLogTwo(lr) - return lr -} - -func TestConvertLogRecordToJSON(t *testing.T) { - type convertLogRecordToJSONTest struct { - log plog.LogRecord - resource pcommon.Resource - expected map[string]any - } - - var convertLogRecordToJSONTests = []convertLogRecordToJSONTest{ - {generateLogRecordWithNestedBody(), - pcommon.NewResource(), - map[string]any{ - "23": float64(45), - "app": "server", - "foo": "bar", - "instance_num": float64(1), - "level": "Info", - "message": "hello there", - "@timestamp": TestLogTimeUnixMilli, - "nested": map[string]any{"number": float64(499), "string": "v1"}, - "spanID": "0102040800000000", - "traceID": "08040201000000000000000000000000", - }, - }, - {generateLogRecordWithMultiTypeValues(), - pcommon.NewResource(), - map[string]any{ - "bool": true, - "customer": "acme", - "env": "dev", - "level": "Info", - "@timestamp": TestLogTimeUnixMilli, - "message": "something happened", - "number": float64(64), - }, - }, - } - for _, test := range convertLogRecordToJSONTests { - output := convertLogRecordToJSON(test.log, test.log.Attributes()) - require.Equal(t, test.expected, output) - } -} - -func TestSetTimeStamp(t *testing.T) { - var recordedRequests []byte - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - recordedRequests, _ = io.ReadAll(req.Body) - rw.WriteHeader(http.StatusOK) - })) - defer func() { server.Close() }() - ld := generateLogsOneEmptyTimestamp() - clientConfig := confighttp.NewDefaultClientConfig() - clientConfig.Endpoint = server.URL - clientConfig.Compression = configcompression.TypeGzip - cfg := &Config{ - Region: "us", - Token: "token", - ClientConfig: clientConfig, - } - var err error - params := exportertest.NewNopSettings() - exporter, err := createLogsExporter(context.Background(), params, cfg) - require.NoError(t, err) - err = exporter.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - ctx := context.Background() - err = exporter.ConsumeLogs(ctx, ld) - require.NoError(t, err) - err = exporter.Shutdown(ctx) - require.NoError(t, err) - var jsonLog map[string]any - var jsonLogNoTimestamp map[string]any - decoded, _ := gUnzipData(recordedRequests) - requests := strings.Split(string(decoded), "\n") - require.NoError(t, json.Unmarshal([]byte(requests[0]), &jsonLog)) - require.NoError(t, json.Unmarshal([]byte(requests[1]), &jsonLogNoTimestamp)) - if jsonLogNoTimestamp["@timestamp"] != nil { - t.Fatalf("did not expect @timestamp") - } - if jsonLog["@timestamp"] == nil { - t.Fatalf("@timestamp does not exist") - } -} diff --git a/exporter/logzioexporter/logger.go b/exporter/logzioexporter/logger.go deleted file mode 100644 index 810e494d5a34..000000000000 --- a/exporter/logzioexporter/logger.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package logzioexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/logzioexporter" - -import ( - "fmt" - "io" - "log" - - "github.com/hashicorp/go-hclog" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -// hclog2ZapLogger implements Hashicorp's hclog.Logger interface using Uber's zap.Logger. It's a workaround for plugin -// system. go-plugin doesn't support other logger than hclog. This logger implements only methods used by the go-plugin. -type hclog2ZapLogger struct { - Zap *zap.Logger - name string -} - -func (l *hclog2ZapLogger) Log(_ hclog.Level, _ string, _ ...any) {} - -func (l *hclog2ZapLogger) ImpliedArgs() []any { - return nil -} - -func (l *hclog2ZapLogger) Name() string { - return l.name -} - -func (l *hclog2ZapLogger) StandardWriter(_ *hclog.StandardLoggerOptions) io.Writer { - return nil -} - -// Trace implementation. -func (l *hclog2ZapLogger) Trace(_ string, _ ...any) {} - -// Debug implementation. -func (l *hclog2ZapLogger) Debug(msg string, args ...any) { - l.Zap.Debug(msg, argsToFields(args...)...) -} - -// Info implementation. -func (l *hclog2ZapLogger) Info(msg string, args ...any) { - l.Zap.Info(msg, argsToFields(args...)...) -} - -// Warn implementation. -func (l *hclog2ZapLogger) Warn(msg string, args ...any) { - l.Zap.Warn(msg, argsToFields(args...)...) -} - -// Error implementation. -func (l *hclog2ZapLogger) Error(msg string, args ...any) { - l.Zap.Error(msg, argsToFields(args...)...) -} - -// IsTrace implementation. -func (l *hclog2ZapLogger) IsTrace() bool { return false } - -// IsDebug implementation. -func (l *hclog2ZapLogger) IsDebug() bool { return false } - -// IsInfo implementation. -func (l *hclog2ZapLogger) IsInfo() bool { return false } - -// IsWarn implementation. -func (l *hclog2ZapLogger) IsWarn() bool { return false } - -// IsError implementation. -func (l *hclog2ZapLogger) IsError() bool { return false } - -// With implementation. -func (l *hclog2ZapLogger) With(args ...any) hclog.Logger { - return &hclog2ZapLogger{Zap: l.Zap.With(argsToFields(args...)...)} -} - -// Named implementation. -func (l *hclog2ZapLogger) Named(name string) hclog.Logger { - return &hclog2ZapLogger{Zap: l.Zap.Named(name)} -} - -// ResetNamed implementation. -func (l *hclog2ZapLogger) ResetNamed(_ string) hclog.Logger { - // no need to implement that as go-plugin doesn't use this method. - return &hclog2ZapLogger{} -} - -// SetLevel implementation. -func (l *hclog2ZapLogger) SetLevel(_ hclog.Level) { - // no need to implement that as go-plugin doesn't use this method. -} - -// GetLevel implementation. -func (l *hclog2ZapLogger) GetLevel() hclog.Level { - // no need to implement that as go-plugin doesn't use this method. - return hclog.NoLevel -} - -// StandardLogger implementation. -func (l *hclog2ZapLogger) StandardLogger(_ *hclog.StandardLoggerOptions) *log.Logger { - // no need to implement that as go-plugin doesn't use this method. - return log.New(io.Discard, "", 0) -} - -func argsToFields(args ...any) []zapcore.Field { - var fields []zapcore.Field - for i := 0; i < len(args); i += 2 { - fields = append(fields, zap.String(args[i].(string), fmt.Sprintf("%v", args[i+1]))) - } - - return fields -} diff --git a/exporter/logzioexporter/logger_test.go b/exporter/logzioexporter/logger_test.go deleted file mode 100644 index 3c74dbf60723..000000000000 --- a/exporter/logzioexporter/logger_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 -package logzioexporter - -import ( - "testing" - - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestLoggerConfigs(tester *testing.T) { - zapLogger := zap.NewExample() - exporterLogger := hclog2ZapLogger{ - Zap: zapLogger, - name: loggerName, - } - - assert.Equal(tester, loggerName, exporterLogger.Name()) - assert.NotNil(tester, exporterLogger.Named("logger")) - assert.NotNil(tester, exporterLogger.With("key", "val")) - assert.NotNil(tester, exporterLogger.ResetNamed(loggerName)) - assert.NotNil(tester, exporterLogger.StandardLogger(nil)) - assert.Nil(tester, exporterLogger.StandardWriter(nil)) - - assert.False(tester, exporterLogger.IsTrace()) - assert.False(tester, exporterLogger.IsDebug()) - assert.False(tester, exporterLogger.IsInfo()) - assert.False(tester, exporterLogger.IsWarn()) - assert.False(tester, exporterLogger.IsError()) -} - -func TestLogger(tester *testing.T) { - zapLogger := zap.NewExample() - exporterLogger := hclog2ZapLogger{ - Zap: zapLogger, - name: loggerName, - } - - loggerFunc := func() { - exporterLogger.Trace("Trace msg") - exporterLogger.Debug("Debug msg") - exporterLogger.Info("Info msg") - exporterLogger.Warn("Warn msg") - exporterLogger.Error("Error msg") - exporterLogger.Log(hclog.Debug, "log msg") - } - assert.NotPanics(tester, loggerFunc, "did not panic") -} From 6fb26e597f6422ca4b9a81bb5b4d2b22d9d5db40 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Tue, 29 Oct 2024 19:09:54 +0200 Subject: [PATCH 11/14] changelog --- .chloggen/logzioexporter-enhancement.yaml | 27 +++++++++++++++++++ .../logzioexporter-hclog-deprecation.yaml | 27 +++++++++++++++++++ .../logzioexporter-jsonlog-deprication.yaml | 27 +++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 .chloggen/logzioexporter-enhancement.yaml create mode 100644 .chloggen/logzioexporter-hclog-deprecation.yaml create mode 100644 .chloggen/logzioexporter-jsonlog-deprication.yaml diff --git a/.chloggen/logzioexporter-enhancement.yaml b/.chloggen/logzioexporter-enhancement.yaml new file mode 100644 index 000000000000..4c76c7ce06dc --- /dev/null +++ b/.chloggen/logzioexporter-enhancement.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: logzioexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support for partial success handlers, support `plogotlp.ExportRequest` struct for logs export, mask secrets from the collector log messages + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/logzioexporter-hclog-deprecation.yaml b/.chloggen/logzioexporter-hclog-deprecation.yaml new file mode 100644 index 000000000000..415cc5355ba9 --- /dev/null +++ b/.chloggen/logzioexporter-hclog-deprecation.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: logzioexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecates the `hclog` logger, and use `zap.logger` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/logzioexporter-jsonlog-deprication.yaml b/.chloggen/logzioexporter-jsonlog-deprication.yaml new file mode 100644 index 000000000000..d3dc58c265d3 --- /dev/null +++ b/.chloggen/logzioexporter-jsonlog-deprication.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: logzioexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecates `jsonlog` usage and replace with `plogotlp.ExportRequest` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 2c11241476226f13b89ea4614f747e157bfee9c8 Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Tue, 29 Oct 2024 19:17:46 +0200 Subject: [PATCH 12/14] link changelog to pr --- .chloggen/logzioexporter-enhancement.yaml | 2 +- .chloggen/logzioexporter-hclog-deprecation.yaml | 2 +- .chloggen/logzioexporter-jsonlog-deprication.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.chloggen/logzioexporter-enhancement.yaml b/.chloggen/logzioexporter-enhancement.yaml index 4c76c7ce06dc..33348b509506 100644 --- a/.chloggen/logzioexporter-enhancement.yaml +++ b/.chloggen/logzioexporter-enhancement.yaml @@ -10,7 +10,7 @@ component: logzioexporter note: Support for partial success handlers, support `plogotlp.ExportRequest` struct for logs export, mask secrets from the collector log messages # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [] +issues: [36073] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. diff --git a/.chloggen/logzioexporter-hclog-deprecation.yaml b/.chloggen/logzioexporter-hclog-deprecation.yaml index 415cc5355ba9..db871b999c39 100644 --- a/.chloggen/logzioexporter-hclog-deprecation.yaml +++ b/.chloggen/logzioexporter-hclog-deprecation.yaml @@ -10,7 +10,7 @@ component: logzioexporter note: Deprecates the `hclog` logger, and use `zap.logger` # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [] +issues: [36073] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. diff --git a/.chloggen/logzioexporter-jsonlog-deprication.yaml b/.chloggen/logzioexporter-jsonlog-deprication.yaml index d3dc58c265d3..eb006f595a07 100644 --- a/.chloggen/logzioexporter-jsonlog-deprication.yaml +++ b/.chloggen/logzioexporter-jsonlog-deprication.yaml @@ -10,7 +10,7 @@ component: logzioexporter note: Deprecates `jsonlog` usage and replace with `plogotlp.ExportRequest` # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [] +issues: [36073] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From 0f190bdfa65739cf34c8c6bc23d085190a8ac58a Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Sun, 17 Nov 2024 14:06:49 +0700 Subject: [PATCH 13/14] fix ineffectual assignment to err --- exporter/logzioexporter/exporter.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/exporter/logzioexporter/exporter.go b/exporter/logzioexporter/exporter.go index 18f71a161562..6310c64fa9fd 100644 --- a/exporter/logzioexporter/exporter.go +++ b/exporter/logzioexporter/exporter.go @@ -77,7 +77,11 @@ func newLogzioTracesExporter(config *Config, set exporter.Settings) (exporter.Tr if err != nil { return nil, err } - exporter.config.ClientConfig.Endpoint, err = generateTracesEndpoint(config) + endpoint, err := generateTracesEndpoint(config) + if err != nil { + return nil, err + } + exporter.config.ClientConfig.Endpoint = endpoint config.checkAndWarnDeprecatedOptions(exporter.logger) userAgent := fmt.Sprintf("otel-collector-logzio-traces-exporter-%s-%s-%s", set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) exporter.config.ClientConfig.Headers = map[string]configopaque.String{ @@ -96,12 +100,17 @@ func newLogzioTracesExporter(config *Config, set exporter.Settings) (exporter.Tr exporterhelper.WithRetry(config.BackOffConfig), ) } + func newLogzioLogsExporter(config *Config, set exporter.Settings) (exporter.Logs, error) { exporter, err := newLogzioExporter(config, set) if err != nil { return nil, err } - exporter.config.ClientConfig.Endpoint, err = generateLogsEndpoint(config) + endpoint, err := generateLogsEndpoint(config) + if err != nil { + return nil, err + } + exporter.config.ClientConfig.Endpoint = endpoint userAgent := fmt.Sprintf("otel-collector-logzio-logs-exporter-%s-%s-%s", set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH) exporter.config.ClientConfig.Headers = map[string]configopaque.String{ "Authorization": "Bearer " + config.Token, From 85927ef37ed5086dcb6539c1ab19e09d4c20c8ba Mon Sep 17 00:00:00 2001 From: Yotam loewenbach Date: Sun, 17 Nov 2024 14:31:26 +0700 Subject: [PATCH 14/14] fix lint --- exporter/logzioexporter/config.go | 2 +- exporter/logzioexporter/exporter.go | 12 +++++----- exporter/logzioexporter/exporter_test.go | 29 +++++++----------------- 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/exporter/logzioexporter/config.go b/exporter/logzioexporter/config.go index e471a71f73a3..67d76991e409 100644 --- a/exporter/logzioexporter/config.go +++ b/exporter/logzioexporter/config.go @@ -5,12 +5,12 @@ package logzioexporter // import "github.com/open-telemetry/opentelemetry-collec import ( "errors" - "go.uber.org/zap" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/zap" ) // Config contains Logz.io specific configuration such as Account TracesToken, Region, etc. diff --git a/exporter/logzioexporter/exporter.go b/exporter/logzioexporter/exporter.go index 6310c64fa9fd..4b912f5d227d 100644 --- a/exporter/logzioexporter/exporter.go +++ b/exporter/logzioexporter/exporter.go @@ -9,12 +9,6 @@ import ( "encoding/json" "errors" "fmt" - "go.opentelemetry.io/collector/config/configopaque" - "go.opentelemetry.io/collector/pdata/plog/plogotlp" - "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" - "go.uber.org/zap" - "google.golang.org/grpc/codes" - st "google.golang.org/grpc/status" "io" "net/http" "regexp" @@ -25,12 +19,18 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cache" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.uber.org/zap" "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + st "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" diff --git a/exporter/logzioexporter/exporter_test.go b/exporter/logzioexporter/exporter_test.go index f2314fc6cd74..a35a7a4fa215 100644 --- a/exporter/logzioexporter/exporter_test.go +++ b/exporter/logzioexporter/exporter_test.go @@ -10,13 +10,6 @@ import ( "encoding/json" "errors" "fmt" - "go.opentelemetry.io/collector/pdata/plog/plogotlp" - "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" - "google.golang.org/genproto/googleapis/rpc/status" - "google.golang.org/grpc/codes" - "google.golang.org/protobuf/proto" "io" "net/http" "net/http/httptest" @@ -32,9 +25,16 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/pdata/testdata" conventions "go.opentelemetry.io/collector/semconv/v1.27.0" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/protobuf/proto" ) const ( @@ -71,19 +71,6 @@ func fillLogOne(log plog.LogRecord) { attNestedMap.PutDouble("number", 499) } -func fillLogTwo(log plog.LogRecord) { - log.SetTimestamp(TestLogTimestamp) - log.SetDroppedAttributesCount(1) - log.SetSeverityNumber(plog.SeverityNumberInfo) - log.SetSeverityText("Info") - attrs := log.Attributes() - attrs.PutStr("customer", "acme") - attrs.PutDouble("number", 64) - attrs.PutBool("bool", true) - attrs.PutStr("env", "dev") - log.Body().SetStr("something happened") -} - func fillLogNoTimestamp(log plog.LogRecord) { log.SetDroppedAttributesCount(1) log.SetSeverityNumber(plog.SeverityNumberInfo) @@ -499,6 +486,6 @@ func TestReadResponseBody(t *testing.T) { type errorReader struct{} -func (e *errorReader) Read(p []byte) (n int, err error) { +func (e *errorReader) Read(_ []byte) (n int, err error) { return 0, errors.New("read error") }