diff --git a/service/internal/graph/capabilities.go b/service/internal/graph/capabilities.go new file mode 100644 index 00000000000..8a16ae67853 --- /dev/null +++ b/service/internal/graph/capabilities.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pipeline" +) + +const capabilitiesSeed = "capabilities" + +var _ consumerNode = (*capabilitiesNode)(nil) + +// Every pipeline has a "virtual" capabilities node immediately after the receiver(s). +// There are two purposes for this node: +// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data. +// 2. Present a consistent "first consumer" for each pipeline. +// The nodeID is derived from "pipeline ID". +type capabilitiesNode struct { + nodeID + pipelineID pipeline.ID + baseConsumer + consumer.ConsumeTracesFunc + consumer.ConsumeMetricsFunc + consumer.ConsumeLogsFunc + consumerprofiles.ConsumeProfilesFunc +} + +func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode { + return &capabilitiesNode{ + nodeID: newNodeID(capabilitiesSeed, pipelineID.String()), + pipelineID: pipelineID, + } +} + +func (n *capabilitiesNode) getConsumer() baseConsumer { + return n +} diff --git a/service/internal/graph/nodes.go b/service/internal/graph/connector.go similarity index 51% rename from service/internal/graph/nodes.go rename to service/internal/graph/connector.go index 89580dab487..8de04f1994e 100644 --- a/service/internal/graph/nodes.go +++ b/service/internal/graph/connector.go @@ -5,9 +5,6 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph" import ( "context" - "fmt" - "hash/fnv" - "strings" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentprofiles" @@ -15,205 +12,13 @@ import ( "go.opentelemetry.io/collector/connector/connectorprofiles" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/internal/fanoutconsumer" "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" "go.opentelemetry.io/collector/service/internal/components" ) -const ( - receiverSeed = "receiver" - processorSeed = "processor" - exporterSeed = "exporter" - connectorSeed = "connector" - capabilitiesSeed = "capabilities" - fanOutToExporters = "fanout_to_exporters" -) - -// baseConsumer redeclared here since not public in consumer package. May consider to make that public. -type baseConsumer interface { - Capabilities() consumer.Capabilities -} - -type nodeID int64 - -func (n nodeID) ID() int64 { - return int64(n) -} - -func newNodeID(parts ...string) nodeID { - h := fnv.New64a() - h.Write([]byte(strings.Join(parts, "|"))) - return nodeID(h.Sum64()) -} - -type consumerNode interface { - getConsumer() baseConsumer -} - -// A receiver instance can be shared by multiple pipelines of the same type. -// Therefore, nodeID is derived from "pipeline type" and "component ID". -type receiverNode struct { - nodeID - componentID component.ID - pipelineType pipeline.Signal - component.Component -} - -func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { - return &receiverNode{ - nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()), - componentID: recvID, - pipelineType: pipelineType, - } -} - -func (n *receiverNode) buildComponent(ctx context.Context, - tel component.TelemetrySettings, - info component.BuildInfo, - builder *builders.ReceiverBuilder, - nexts []baseConsumer, -) error { - tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType) - set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - var err error - switch n.pipelineType { - case pipeline.SignalTraces: - var consumers []consumer.Traces - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Traces)) - } - n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) - case pipeline.SignalMetrics: - var consumers []consumer.Metrics - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Metrics)) - } - n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) - case pipeline.SignalLogs: - var consumers []consumer.Logs - for _, next := range nexts { - consumers = append(consumers, next.(consumer.Logs)) - } - n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) - case componentprofiles.SignalProfiles: - var consumers []consumerprofiles.Profiles - for _, next := range nexts { - consumers = append(consumers, next.(consumerprofiles.Profiles)) - } - n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers)) - default: - return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType) - } - if err != nil { - return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err) - } - return nil -} - -var _ consumerNode = (*processorNode)(nil) - -// Every processor instance is unique to one pipeline. -// Therefore, nodeID is derived from "pipeline ID" and "component ID". -type processorNode struct { - nodeID - componentID component.ID - pipelineID pipeline.ID - component.Component -} - -func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode { - return &processorNode{ - nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()), - componentID: procID, - pipelineID: pipelineID, - } -} - -func (n *processorNode) getConsumer() baseConsumer { - return n.Component.(baseConsumer) -} - -func (n *processorNode) buildComponent(ctx context.Context, - tel component.TelemetrySettings, - info component.BuildInfo, - builder *builders.ProcessorBuilder, - next baseConsumer, -) error { - tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) - set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - var err error - switch n.pipelineID.Signal() { - case pipeline.SignalTraces: - n.Component, err = builder.CreateTraces(ctx, set, next.(consumer.Traces)) - case pipeline.SignalMetrics: - n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics)) - case pipeline.SignalLogs: - n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs)) - case componentprofiles.SignalProfiles: - n.Component, err = builder.CreateProfiles(ctx, set, next.(consumerprofiles.Profiles)) - default: - return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID.String(), n.pipelineID.Signal()) - } - if err != nil { - return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err) - } - return nil -} - -var _ consumerNode = (*exporterNode)(nil) - -// An exporter instance can be shared by multiple pipelines of the same type. -// Therefore, nodeID is derived from "pipeline type" and "component ID". -type exporterNode struct { - nodeID - componentID component.ID - pipelineType pipeline.Signal - component.Component -} - -func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { - return &exporterNode{ - nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()), - componentID: exprID, - pipelineType: pipelineType, - } -} - -func (n *exporterNode) getConsumer() baseConsumer { - return n.Component.(baseConsumer) -} - -func (n *exporterNode) buildComponent( - ctx context.Context, - tel component.TelemetrySettings, - info component.BuildInfo, - builder *builders.ExporterBuilder, -) error { - tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) - set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} - var err error - switch n.pipelineType { - case pipeline.SignalTraces: - n.Component, err = builder.CreateTraces(ctx, set) - case pipeline.SignalMetrics: - n.Component, err = builder.CreateMetrics(ctx, set) - case pipeline.SignalLogs: - n.Component, err = builder.CreateLogs(ctx, set) - case componentprofiles.SignalProfiles: - n.Component, err = builder.CreateProfiles(ctx, set) - default: - return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType) - } - if err != nil { - return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err) - } - return nil -} +const connectorSeed = "connector" var _ consumerNode = (*connectorNode)(nil) @@ -420,52 +225,3 @@ func (n *connectorNode) buildComponent( } return nil } - -var _ consumerNode = (*capabilitiesNode)(nil) - -// Every pipeline has a "virtual" capabilities node immediately after the receiver(s). -// There are two purposes for this node: -// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data. -// 2. Present a consistent "first consumer" for each pipeline. -// The nodeID is derived from "pipeline ID". -type capabilitiesNode struct { - nodeID - pipelineID pipeline.ID - baseConsumer - consumer.ConsumeTracesFunc - consumer.ConsumeMetricsFunc - consumer.ConsumeLogsFunc - consumerprofiles.ConsumeProfilesFunc -} - -func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode { - return &capabilitiesNode{ - nodeID: newNodeID(capabilitiesSeed, pipelineID.String()), - pipelineID: pipelineID, - } -} - -func (n *capabilitiesNode) getConsumer() baseConsumer { - return n -} - -var _ consumerNode = (*fanOutNode)(nil) - -// Each pipeline has one fan-out node before exporters. -// Therefore, nodeID is derived from "pipeline ID". -type fanOutNode struct { - nodeID - pipelineID pipeline.ID - baseConsumer -} - -func newFanOutNode(pipelineID pipeline.ID) *fanOutNode { - return &fanOutNode{ - nodeID: newNodeID(fanOutToExporters, pipelineID.String()), - pipelineID: pipelineID, - } -} - -func (n *fanOutNode) getConsumer() baseConsumer { - return n.baseConsumer -} diff --git a/service/internal/graph/consumer.go b/service/internal/graph/consumer.go new file mode 100644 index 00000000000..2cdce4a7534 --- /dev/null +++ b/service/internal/graph/consumer.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "go.opentelemetry.io/collector/consumer" +) + +// baseConsumer redeclared here since not public in consumer package. May consider to make that public. +type baseConsumer interface { + Capabilities() consumer.Capabilities +} + +type consumerNode interface { + getConsumer() baseConsumer +} diff --git a/service/internal/graph/exporter.go b/service/internal/graph/exporter.go new file mode 100644 index 00000000000..04532a81992 --- /dev/null +++ b/service/internal/graph/exporter.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/service/internal/builders" + "go.opentelemetry.io/collector/service/internal/components" +) + +const exporterSeed = "exporter" + +var _ consumerNode = (*exporterNode)(nil) + +// An exporter instance can be shared by multiple pipelines of the same type. +// Therefore, nodeID is derived from "pipeline type" and "component ID". +type exporterNode struct { + nodeID + componentID component.ID + pipelineType pipeline.Signal + component.Component +} + +func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { + return &exporterNode{ + nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()), + componentID: exprID, + pipelineType: pipelineType, + } +} + +func (n *exporterNode) getConsumer() baseConsumer { + return n.Component.(baseConsumer) +} + +func (n *exporterNode) buildComponent( + ctx context.Context, + tel component.TelemetrySettings, + info component.BuildInfo, + builder *builders.ExporterBuilder, +) error { + tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) + set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + var err error + switch n.pipelineType { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTraces(ctx, set) + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetrics(ctx, set) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogs(ctx, set) + case componentprofiles.SignalProfiles: + n.Component, err = builder.CreateProfiles(ctx, set) + default: + return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType) + } + if err != nil { + return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err) + } + return nil +} diff --git a/service/internal/graph/fanout.go b/service/internal/graph/fanout.go new file mode 100644 index 00000000000..13c8d4ad1c5 --- /dev/null +++ b/service/internal/graph/fanout.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "go.opentelemetry.io/collector/pipeline" +) + +const fanOutToExporters = "fanout_to_exporters" + +var _ consumerNode = (*fanOutNode)(nil) + +// Each pipeline has one fan-out node before exporters. +// Therefore, nodeID is derived from "pipeline ID". +type fanOutNode struct { + nodeID + pipelineID pipeline.ID + baseConsumer +} + +func newFanOutNode(pipelineID pipeline.ID) *fanOutNode { + return &fanOutNode{ + nodeID: newNodeID(fanOutToExporters, pipelineID.String()), + pipelineID: pipelineID, + } +} + +func (n *fanOutNode) getConsumer() baseConsumer { + return n.baseConsumer +} diff --git a/service/internal/graph/node.go b/service/internal/graph/node.go new file mode 100644 index 00000000000..0e17bb74bf5 --- /dev/null +++ b/service/internal/graph/node.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "hash/fnv" + "strings" +) + +type nodeID int64 + +func (n nodeID) ID() int64 { + return int64(n) +} + +func newNodeID(parts ...string) nodeID { + h := fnv.New64a() + h.Write([]byte(strings.Join(parts, "|"))) + return nodeID(h.Sum64()) +} diff --git a/service/internal/graph/processor.go b/service/internal/graph/processor.go new file mode 100644 index 00000000000..a20e8e8dfd8 --- /dev/null +++ b/service/internal/graph/processor.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/service/internal/builders" + "go.opentelemetry.io/collector/service/internal/components" +) + +const processorSeed = "processor" + +var _ consumerNode = (*processorNode)(nil) + +// Every processor instance is unique to one pipeline. +// Therefore, nodeID is derived from "pipeline ID" and "component ID". +type processorNode struct { + nodeID + componentID component.ID + pipelineID pipeline.ID + component.Component +} + +func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode { + return &processorNode{ + nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()), + componentID: procID, + pipelineID: pipelineID, + } +} + +func (n *processorNode) getConsumer() baseConsumer { + return n.Component.(baseConsumer) +} + +func (n *processorNode) buildComponent(ctx context.Context, + tel component.TelemetrySettings, + info component.BuildInfo, + builder *builders.ProcessorBuilder, + next baseConsumer, +) error { + tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) + set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + var err error + switch n.pipelineID.Signal() { + case pipeline.SignalTraces: + n.Component, err = builder.CreateTraces(ctx, set, next.(consumer.Traces)) + case pipeline.SignalMetrics: + n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics)) + case pipeline.SignalLogs: + n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs)) + case componentprofiles.SignalProfiles: + n.Component, err = builder.CreateProfiles(ctx, set, next.(consumerprofiles.Profiles)) + default: + return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID.String(), n.pipelineID.Signal()) + } + if err != nil { + return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err) + } + return nil +} diff --git a/service/internal/graph/reciever.go b/service/internal/graph/reciever.go new file mode 100644 index 00000000000..e462671bfc7 --- /dev/null +++ b/service/internal/graph/reciever.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph // import "go.opentelemetry.io/collector/service/internal/graph" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/internal/fanoutconsumer" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/service/internal/builders" + "go.opentelemetry.io/collector/service/internal/components" +) + +const receiverSeed = "receiver" + +// A receiver instance can be shared by multiple pipelines of the same type. +// Therefore, nodeID is derived from "pipeline type" and "component ID". +type receiverNode struct { + nodeID + componentID component.ID + pipelineType pipeline.Signal + component.Component +} + +func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode { + return &receiverNode{ + nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()), + componentID: recvID, + pipelineType: pipelineType, + } +} + +func (n *receiverNode) buildComponent(ctx context.Context, + tel component.TelemetrySettings, + info component.BuildInfo, + builder *builders.ReceiverBuilder, + nexts []baseConsumer, +) error { + tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType) + set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + var err error + switch n.pipelineType { + case pipeline.SignalTraces: + var consumers []consumer.Traces + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Traces)) + } + n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers)) + case pipeline.SignalMetrics: + var consumers []consumer.Metrics + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Metrics)) + } + n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers)) + case pipeline.SignalLogs: + var consumers []consumer.Logs + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Logs)) + } + n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers)) + case componentprofiles.SignalProfiles: + var consumers []consumerprofiles.Profiles + for _, next := range nexts { + consumers = append(consumers, next.(consumerprofiles.Profiles)) + } + n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers)) + default: + return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType) + } + if err != nil { + return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err) + } + return nil +}