From 8efff4878d29d332e270451c2ae96dd8ef247967 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 6 May 2024 08:56:34 -0700 Subject: [PATCH] [chore] remove duplicate code from the connector (#10082) Signed-off-by: Bogdan Drutu Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- connector/logs_router.go | 10 +++---- connector/metrics_router.go | 39 ++------------------------ connector/router.go | 55 +++++++++++++++++++++++++++++++++++++ connector/traces_router.go | 43 ++++------------------------- 4 files changed, 68 insertions(+), 79 deletions(-) create mode 100644 connector/router.go diff --git a/connector/logs_router.go b/connector/logs_router.go index 2d642c94315..0db9ea7799d 100644 --- a/connector/logs_router.go +++ b/connector/logs_router.go @@ -23,17 +23,17 @@ type LogsRouterAndConsumer interface { type logsRouter struct { consumer.Logs - consumers map[component.ID]consumer.Logs + baseRouter[consumer.Logs] } func NewLogsRouter(cm map[component.ID]consumer.Logs) LogsRouterAndConsumer { consumers := make([]consumer.Logs, 0, len(cm)) - for _, consumer := range cm { - consumers = append(consumers, consumer) + for _, cons := range cm { + consumers = append(consumers, cons) } return &logsRouter{ - Logs: fanoutconsumer.NewLogs(consumers), - consumers: cm, + Logs: fanoutconsumer.NewLogs(consumers), + baseRouter: newBaseRouter(fanoutconsumer.NewLogs, cm), } } diff --git a/connector/metrics_router.go b/connector/metrics_router.go index ad572782225..3e688261bfe 100644 --- a/connector/metrics_router.go +++ b/connector/metrics_router.go @@ -4,10 +4,6 @@ package connector // import "go.opentelemetry.io/collector/connector" import ( - "fmt" - - "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" @@ -23,7 +19,7 @@ type MetricsRouterAndConsumer interface { type metricsRouter struct { consumer.Metrics - consumers map[component.ID]consumer.Metrics + baseRouter[consumer.Metrics] } func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndConsumer { @@ -32,38 +28,9 @@ func NewMetricsRouter(cm map[component.ID]consumer.Metrics) MetricsRouterAndCons consumers = append(consumers, cons) } return &metricsRouter{ - Metrics: fanoutconsumer.NewMetrics(consumers), - consumers: cm, - } -} - -func (r *metricsRouter) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { - ids = append(ids, id) - } - return ids -} - -func (r *metricsRouter) Consumer(pipelineIDs ...component.ID) (consumer.Metrics, error) { - if len(pipelineIDs) == 0 { - return nil, fmt.Errorf("missing consumers") - } - consumers := make([]consumer.Metrics, 0, len(pipelineIDs)) - var errors error - for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] - if ok { - consumers = append(consumers, c) - } else { - errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) - } - } - if errors != nil { - // TODO potentially this could return a NewMetrics with the valid consumers - return nil, errors + Metrics: fanoutconsumer.NewMetrics(consumers), + baseRouter: newBaseRouter(fanoutconsumer.NewMetrics, cm), } - return fanoutconsumer.NewMetrics(consumers), nil } func (r *metricsRouter) privateFunc() {} diff --git a/connector/router.go b/connector/router.go new file mode 100644 index 00000000000..bba7ee76bba --- /dev/null +++ b/connector/router.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package connector // import "go.opentelemetry.io/collector/connector" + +import ( + "fmt" + + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" +) + +type baseRouter[T any] struct { + fanout func([]T) T + consumers map[component.ID]T +} + +func newBaseRouter[T any](fanout func([]T) T, cm map[component.ID]T) baseRouter[T] { + consumers := make(map[component.ID]T, len(cm)) + for k, v := range cm { + consumers[k] = v + } + return baseRouter[T]{fanout: fanout, consumers: consumers} +} + +func (r *baseRouter[T]) PipelineIDs() []component.ID { + ids := make([]component.ID, 0, len(r.consumers)) + for id := range r.consumers { + ids = append(ids, id) + } + return ids +} + +func (r *baseRouter[T]) Consumer(pipelineIDs ...component.ID) (T, error) { + var ret T + if len(pipelineIDs) == 0 { + return ret, fmt.Errorf("missing consumers") + } + consumers := make([]T, 0, len(pipelineIDs)) + var errors error + for _, pipelineID := range pipelineIDs { + c, ok := r.consumers[pipelineID] + if ok { + consumers = append(consumers, c) + } else { + errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) + } + } + if errors != nil { + // TODO potentially this could return a NewTraces with the valid consumers + return ret, errors + } + return r.fanout(consumers), nil +} diff --git a/connector/traces_router.go b/connector/traces_router.go index 293fd723dc7..84eb889c05a 100644 --- a/connector/traces_router.go +++ b/connector/traces_router.go @@ -4,10 +4,6 @@ package connector // import "go.opentelemetry.io/collector/connector" import ( - "fmt" - - "go.uber.org/multierr" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" @@ -23,47 +19,18 @@ type TracesRouterAndConsumer interface { type tracesRouter struct { consumer.Traces - consumers map[component.ID]consumer.Traces + baseRouter[consumer.Traces] } func NewTracesRouter(cm map[component.ID]consumer.Traces) TracesRouterAndConsumer { consumers := make([]consumer.Traces, 0, len(cm)) - for _, c := range cm { - consumers = append(consumers, c) + for _, cons := range cm { + consumers = append(consumers, cons) } return &tracesRouter{ - Traces: fanoutconsumer.NewTraces(consumers), - consumers: cm, - } -} - -func (r *tracesRouter) PipelineIDs() []component.ID { - ids := make([]component.ID, 0, len(r.consumers)) - for id := range r.consumers { - ids = append(ids, id) - } - return ids -} - -func (r *tracesRouter) Consumer(pipelineIDs ...component.ID) (consumer.Traces, error) { - if len(pipelineIDs) == 0 { - return nil, fmt.Errorf("missing consumers") - } - consumers := make([]consumer.Traces, 0, len(pipelineIDs)) - var errors error - for _, pipelineID := range pipelineIDs { - c, ok := r.consumers[pipelineID] - if ok { - consumers = append(consumers, c) - } else { - errors = multierr.Append(errors, fmt.Errorf("missing consumer: %q", pipelineID)) - } - } - if errors != nil { - // TODO potentially this could return a NewTraces with the valid consumers - return nil, errors + Traces: fanoutconsumer.NewTraces(consumers), + baseRouter: newBaseRouter(fanoutconsumer.NewTraces, cm), } - return fanoutconsumer.NewTraces(consumers), nil } func (r *tracesRouter) privateFunc() {}