Skip to content

Commit

Permalink
Use wrapper function instead of defer
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Nov 22, 2024
1 parent 763363a commit 0942592
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 49 deletions.
8 changes: 4 additions & 4 deletions exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func createTraces(
oce := newExporter(cfg, set)
oCfg := cfg.(*Config)
return exporterhelper.NewTraces(ctx, set, cfg,
oce.pushTraces,
oce.pushTracesWithStatus,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutConfig),
exporterhelper.WithRetry(oCfg.RetryConfig),
Expand All @@ -78,7 +78,7 @@ func createMetrics(
oce := newExporter(cfg, set)
oCfg := cfg.(*Config)
return exporterhelper.NewMetrics(ctx, set, cfg,
oce.pushMetrics,
oce.pushMetricsWithStatus,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutConfig),
exporterhelper.WithRetry(oCfg.RetryConfig),
Expand All @@ -97,7 +97,7 @@ func createLogs(
oce := newExporter(cfg, set)
oCfg := cfg.(*Config)
return exporterhelper.NewLogs(ctx, set, cfg,
oce.pushLogs,
oce.pushLogsWithStatus,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutConfig),
exporterhelper.WithRetry(oCfg.RetryConfig),
Expand All @@ -116,7 +116,7 @@ func createProfilesExporter(
oce := newExporter(cfg, set)
oCfg := cfg.(*Config)
return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg,
oce.pushProfiles,
oce.pushProfilesWithStatus,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oCfg.TimeoutConfig),
exporterhelper.WithRetry(oCfg.RetryConfig),
Expand Down
77 changes: 48 additions & 29 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,11 @@ func (e *baseExporter) shutdown(context.Context) error {
return nil
}

func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err error) {
defer func() {
e.reportStatusFromError(err)
}()
func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
req := ptraceotlp.NewExportRequestFromTraces(td)
resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...)
if err = processError(respErr); err != nil {
return
if err := processError(respErr); err != nil {
return err
}
partialSuccess := resp.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) {
Expand All @@ -108,17 +105,23 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err er
zap.Int64("dropped_spans", resp.PartialSuccess().RejectedSpans()),
)
}
return
return nil
}

func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error {
if err := e.pushTraces(ctx, td); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err error) {
defer func() {
e.reportStatusFromError(err)
}()
func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
req := pmetricotlp.NewExportRequestFromMetrics(md)
resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...)
if err = processError(respErr); err != nil {
return
if err := processError(respErr); err != nil {
return err
}
partialSuccess := resp.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) {
Expand All @@ -127,17 +130,23 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err
zap.Int64("dropped_data_points", resp.PartialSuccess().RejectedDataPoints()),
)
}
return
return nil
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) {
defer func() {
e.reportStatusFromError(err)
}()
func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error {
if err := e.pushMetrics(ctx, md); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(ld)
resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...)
if err = processError(respErr); err != nil {
return
if err := processError(respErr); err != nil {
return err
}
partialSuccess := resp.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) {
Expand All @@ -146,7 +155,16 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) {
zap.Int64("dropped_log_records", resp.PartialSuccess().RejectedLogRecords()),
)
}
return
return nil
}

func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error {
if err := e.pushLogs(ctx, ld); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error {
Expand All @@ -165,6 +183,15 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e
return nil
}

func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error {
if err := e.pushProfiles(ctx, td); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) enhanceContext(ctx context.Context) context.Context {
if e.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, e.metadata)
Expand Down Expand Up @@ -203,14 +230,6 @@ func processError(err error) error {
return err
}

func (e *baseExporter) reportStatusFromError(err error) {
if err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
}

func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool {
switch code {
case codes.Canceled,
Expand Down
8 changes: 4 additions & 4 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func createTraces(
}

return exporterhelper.NewTraces(ctx, set, cfg,
oce.pushTraces,
oce.pushTracesWithStatus,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
Expand All @@ -116,7 +116,7 @@ func createMetrics(
}

return exporterhelper.NewMetrics(ctx, set, cfg,
oce.pushMetrics,
oce.pushMetricsWithStatus,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
Expand All @@ -141,7 +141,7 @@ func createLogs(
}

return exporterhelper.NewLogs(ctx, set, cfg,
oce.pushLogs,
oce.pushLogsWithStatus,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
Expand All @@ -167,7 +167,7 @@ func createProfiles(
}

return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg,
oce.pushProfiles,
oce.pushProfilesWithStatus,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
Expand Down
49 changes: 37 additions & 12 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
return e.export(ctx, e.tracesURL, request, e.tracesPartialSuccessHandler)
}

func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error {
if err := e.pushTraces(ctx, td); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
tr := pmetricotlp.NewExportRequestFromMetrics(md)

Expand All @@ -134,6 +143,15 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
return e.export(ctx, e.metricsURL, request, e.metricsPartialSuccessHandler)
}

func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error {
if err := e.pushMetrics(ctx, md); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
tr := plogotlp.NewExportRequestFromLogs(ld)

Expand All @@ -155,6 +173,15 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
return e.export(ctx, e.logsURL, request, e.logsPartialSuccessHandler)
}

func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error {
if err := e.pushLogs(ctx, ld); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error {
tr := pprofileotlp.NewExportRequestFromProfiles(td)

Expand All @@ -176,10 +203,16 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e
return e.export(ctx, e.profilesURL, request, e.profilesPartialSuccessHandler)
}

func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) (err error) {
defer func() {
e.reportStatusFromError(err)
}()
func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error {
if err := e.pushProfiles(ctx, td); err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return err
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
return nil
}

func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error {
e.logger.Debug("Preparing to make HTTP request", zap.String("url", url))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request))
if err != nil {
Expand Down Expand Up @@ -248,14 +281,6 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
return consumererror.NewPermanent(formattedErr)
}

func (e *baseExporter) reportStatusFromError(err error) {
if err != nil {
componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err))
return
}
componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK))
}

// Determine if the status code is retryable according to the specification.
// For more, see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures-1
func isRetryableStatusCode(code int) bool {
Expand Down

0 comments on commit 0942592

Please sign in to comment.