forked from open-telemetry/opentelemetry-collector
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[chore][graph] Separate node types (open-telemetry#11321)
Having spent some time on open-telemetry#11311, I think it may be time to start refactoring this codebase into a more maintainable state. This PR just moves the various types of nodes into separate files, which I think is a bit more readable when considering changes.
- Loading branch information
1 parent
121eac3
commit 25bee1a
Showing
8 changed files
with
329 additions
and
245 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package graph // import "go.opentelemetry.io/collector/service/internal/graph" | ||
|
||
import ( | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/consumer/consumerprofiles" | ||
"go.opentelemetry.io/collector/pipeline" | ||
) | ||
|
||
const capabilitiesSeed = "capabilities" | ||
|
||
var _ consumerNode = (*capabilitiesNode)(nil) | ||
|
||
// Every pipeline has a "virtual" capabilities node immediately after the receiver(s). | ||
// There are two purposes for this node: | ||
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data. | ||
// 2. Present a consistent "first consumer" for each pipeline. | ||
// The nodeID is derived from "pipeline ID". | ||
type capabilitiesNode struct { | ||
nodeID | ||
pipelineID pipeline.ID | ||
baseConsumer | ||
consumer.ConsumeTracesFunc | ||
consumer.ConsumeMetricsFunc | ||
consumer.ConsumeLogsFunc | ||
consumerprofiles.ConsumeProfilesFunc | ||
} | ||
|
||
func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode { | ||
return &capabilitiesNode{ | ||
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()), | ||
pipelineID: pipelineID, | ||
} | ||
} | ||
|
||
func (n *capabilitiesNode) getConsumer() baseConsumer { | ||
return n | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package graph // import "go.opentelemetry.io/collector/service/internal/graph" | ||
|
||
import ( | ||
"go.opentelemetry.io/collector/consumer" | ||
) | ||
|
||
// baseConsumer redeclared here since not public in consumer package. May consider to make that public. | ||
type baseConsumer interface { | ||
Capabilities() consumer.Capabilities | ||
} | ||
|
||
type consumerNode interface { | ||
getConsumer() baseConsumer | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package graph // import "go.opentelemetry.io/collector/service/internal/graph" | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/component/componentprofiles" | ||
"go.opentelemetry.io/collector/exporter" | ||
"go.opentelemetry.io/collector/pipeline" | ||
"go.opentelemetry.io/collector/service/internal/builders" | ||
"go.opentelemetry.io/collector/service/internal/components" | ||
) | ||
|
||
const exporterSeed = "exporter" | ||
|
||
var _ consumerNode = (*exporterNode)(nil) | ||
|
||
// An exporter instance can be shared by multiple pipelines of the same type. | ||
// Therefore, nodeID is derived from "pipeline type" and "component ID". | ||
type exporterNode struct { | ||
nodeID | ||
componentID component.ID | ||
pipelineType pipeline.Signal | ||
component.Component | ||
} | ||
|
||
func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode { | ||
return &exporterNode{ | ||
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()), | ||
componentID: exprID, | ||
pipelineType: pipelineType, | ||
} | ||
} | ||
|
||
func (n *exporterNode) getConsumer() baseConsumer { | ||
return n.Component.(baseConsumer) | ||
} | ||
|
||
func (n *exporterNode) buildComponent( | ||
ctx context.Context, | ||
tel component.TelemetrySettings, | ||
info component.BuildInfo, | ||
builder *builders.ExporterBuilder, | ||
) error { | ||
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType) | ||
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} | ||
var err error | ||
switch n.pipelineType { | ||
case pipeline.SignalTraces: | ||
n.Component, err = builder.CreateTraces(ctx, set) | ||
case pipeline.SignalMetrics: | ||
n.Component, err = builder.CreateMetrics(ctx, set) | ||
case pipeline.SignalLogs: | ||
n.Component, err = builder.CreateLogs(ctx, set) | ||
case componentprofiles.SignalProfiles: | ||
n.Component, err = builder.CreateProfiles(ctx, set) | ||
default: | ||
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType) | ||
} | ||
if err != nil { | ||
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err) | ||
} | ||
return nil | ||
} |
Oops, something went wrong.