Skip to content

Commit

Permalink
[service/extensions] extension lifecycle order (#8768)
Browse files Browse the repository at this point in the history
**Description**:
Enforce order of start and shutdown of extensions according to their
internally declared dependencies

**Link to tracking Issue**:
Resolves #8732

**Motivation**:
This is an alternative approach to #8733 which uses declaration order in
the config to start extensions. That approach (a) enforces order when
it's not always necessary to enforce, and (b) exposes unnecessary
complexity to the user by making them responsible for the order.

This PR instead derives the desired order of extensions based on the
dependencies they declare by implementing a `DependentExtension`
interface. That means that extensions that must depend on others can
expose this interface and be guaranteed to start after their
dependencies, while other extensions can be started in arbitrary order
(same as happens today because of iterating over a map).

The extensions that have dependencies have two options to expose them:
1. if the dependency is always static (e.g. `jaeger_query` extension
depending on `jaeger_storage` as in the OP), the extension can express
this statically as well, by returning a predefined ID of the dependent
extension
2. in cases where dependencies are dynamic, the extension can read the
names of the dependencies from its configuration.

The 2nd scenario is illustrated by the following configuration. Here
each complex extension knows that it needs dependencies that implement
`storage` and `encoding` interfaces (both existing APIs in collector &
contrib), but does not know statically which instances of those, the
actual names are supplied by the user in the configuration.

```yaml
extensions:
  complex_extension_1:
    storage: filestorage
    encoding: otlpencoding
  complex_extension_2:
    storage: dbstorage
    encoding: jsonencoding
  filestorage:
    ...
  dbstorage:
    ...
  otlpencoding:
  jsonencoding:
```

**Changes**:
* Introduce `DependentExtension` optional interface 
* Change `Extensions` constructor to derive the required order using a
directed graph (similar to pipelines)
* Inherited from #8733 - use new ordered list of IDs to
start/stop/notify extensions in the desired order (previously a map was
used to iterate over, which resulted in random order).
* Tests

**Testing**:
Unit tests

---------

Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Antoine Toulme <[email protected]>
  • Loading branch information
yurishkuro and atoulme authored Nov 2, 2023
1 parent 29c16a2 commit 2e44da3
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 42 deletions.
25 changes: 25 additions & 0 deletions .chloggen/extension-lifecycle-order.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/extensions

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow extensions to declare dependencies on other extensions and guarantee start/stop/notification order accordingly.

# One or more tracking issues or pull requests related to the change
issues: [8732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
55 changes: 28 additions & 27 deletions docs/service-extensions.md
Original file line number Diff line number Diff line change
@@ -1,47 +1,47 @@
# OpenTelemetry Collector: Extensions

Besides the pipeline elements (receivers, processors, and exporters) the Collector
uses various service extensions (e.g.: healthcheck, z-pages, etc).
uses various service extensions (e.g.: healthcheck, z-pages, etc).
This document describes the “extensions” design and how they are implemented.

## Configuration and Interface

The configuration follows the same pattern used for pipelines: a base
configuration type and the creation of factories to instantiate the extension
The configuration follows the same pattern used for pipelines: a base
configuration type and the creation of factories to instantiate the extension
objects.

In order to support generic service extensions an interface is defined
In order to support generic service extensions an interface is defined
so the service can interact uniformly with these. At minimum service extensions
need to implement the interface that covers Start and Shutdown.

In addition to this base interface there is support to notify extensions when
pipelines are “ready” and when they are about to be stopped, i.e.: “not ready”
to receive data. These are a necessary addition to allow implementing extensions
that indicate to LBs and external systems if the service instance is ready or
not to receive data
(e.g.: a [k8s readiness probe](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#define-readiness-probes)).
These state changes are under the control of the service server hosting
need to implement the interface that covers Start and Shutdown.

In addition to this base interface there is support to notify extensions when
pipelines are “ready” and when they are about to be stopped, i.e.: “not ready”
to receive data. These are a necessary addition to allow implementing extensions
that indicate to LBs and external systems if the service instance is ready or
not to receive data
(e.g.: a [k8s readiness probe](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/#define-readiness-probes)).
These state changes are under the control of the service server hosting
the extensions.

There are more complex scenarios in which there can be notifications of state
changes from the extensions to their host. These more complex cases are not
There are more complex scenarios in which there can be notifications of state
changes from the extensions to their host. These more complex cases are not
supported at this moment, but this design doesn’t prevent such extensions in the
future[^1].


## Collector State and Extensions

The diagram below shows the basic state transitions of the OpenTelemetry Collector
The diagram below shows the basic state transitions of the OpenTelemetry Collector
and how it will interact with the service extensions.

![ServiceLifeCycle](images/design-service-lifecycle.png)


## Configuration

The config package will be extended to load the service extensions when the
configuration is loaded. The settings for service extensions will live in the
same configuration file as the pipeline elements. Below is an example of how
The config package will be extended to load the service extensions when the
configuration is loaded. The settings for service extensions will live in the
same configuration file as the pipeline elements. Below is an example of how
these sections would look like in the configuration file:

```yaml
Expand All @@ -61,17 +61,18 @@ extensions:
# The service lists extensions not directly related to data pipelines, but used
# by the service.
service:
# extensions lists the extensions added to the service. They are started
# in the order presented below and stopped in the reverse order.
# extensions lists the extensions added to the service. The start-up order respects
# any internal dependencies between extensions, but otherwise is arbitrary.
# The stop order is always the reverse of the start-up order.
extensions: [health_check, pprof, zpages]
```
The configuration base type does not share any common fields.
The configuration, analogous to pipelines, allows to have multiple extensions of
the same type. Implementers of extensions need to take care to return error
the same type. Implementers of extensions need to take care to return error
if it can only execute a single instance. (Note: the configuration uses composite
key names in the form of `type[/name]`
key names in the form of `type[/name]`
as defined in this [this document](https://docs.google.com/document/d/1NeheFG7DmcUYo_h2vLtNRlia9x5wOJMlV4QKEK05FhQ/edit#)).

The factory follows the same pattern established for pipeline configuration:
Expand All @@ -80,7 +81,7 @@ The factory follows the same pattern established for pipeline configuration:
// Factory is a factory interface for extensions to the service.
type Factory interface {
// Type gets the type of the extension created by this factory.
Type() string
Type() string
// CreateDefaultConfig creates the default configuration for the extension.
CreateDefaultConfig() config.Extension
Expand All @@ -93,7 +94,7 @@ type Factory interface {

## Extension Interface

The interface defined below is the minimum required for
The interface defined below is the minimum required for
extensions in use on the service:

```go
Expand Down Expand Up @@ -141,6 +142,6 @@ type Host interface {
## Notes

[^1]:
This can be done by adding specific interfaces to extension types that support
those and having the service checking which of the extension instances support
This can be done by adding specific interfaces to extension types that support
those and having the service checking which of the extension instances support
each interface.
8 changes: 8 additions & 0 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ import (
// to the service, examples: health check endpoint, z-pages, etc.
type Extension = component.Component

// Dependent is an optional interface that can be implemented by extensions
// that depend on other extensions and must be started only after their dependencies.
// See https://github.com/open-telemetry/opentelemetry-collector/pull/8768 for examples.
type Dependent interface {
Extension
Dependencies() []component.ID
}

// PipelineWatcher is an extra interface for Extension hosted by the OpenTelemetry
// Collector that is to be implemented by extensions interested in changes to pipeline
// states. Typically this will be used by extensions that change their behavior if data is
Expand Down
42 changes: 27 additions & 15 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ const zExtensionName = "zextensionname"

// Extensions is a map of extensions created from extension configs.
type Extensions struct {
telemetry servicetelemetry.TelemetrySettings
extMap map[component.ID]extension.Extension
instanceIDs map[component.ID]*component.InstanceID
telemetry servicetelemetry.TelemetrySettings
extMap map[component.ID]extension.Extension
instanceIDs map[component.ID]*component.InstanceID
extensionIDs []component.ID // start order (and reverse stop order)
}

// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for extID, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID)
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting))
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
Expand All @@ -49,8 +51,10 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
func (bes *Extensions) Shutdown(ctx context.Context) error {
bes.telemetry.Logger.Info("Stopping extensions...")
var errs error
for extID, ext := range bes.extMap {
for i := len(bes.extensionIDs) - 1; i >= 0; i-- {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping))
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err))
Expand All @@ -64,7 +68,8 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
}

func (bes *Extensions) NotifyPipelineReady() error {
for extID, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if pw, ok := ext.(extension.PipelineWatcher); ok {
if err := pw.Ready(); err != nil {
return fmt.Errorf("failed to notify extension %q: %w", extID, err)
Expand All @@ -75,9 +80,9 @@ func (bes *Extensions) NotifyPipelineReady() error {
}

func (bes *Extensions) NotifyPipelineNotReady() error {
// Notify extensions in reverse order.
var errs error
for _, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if pw, ok := ext.(extension.PipelineWatcher); ok {
errs = multierr.Append(errs, pw.NotReady())
}
Expand All @@ -87,7 +92,8 @@ func (bes *Extensions) NotifyPipelineNotReady() error {

func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
var errs error
for _, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if cw, ok := ext.(extension.ConfigWatcher); ok {
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
Expand All @@ -97,7 +103,8 @@ func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) err
}

func (bes *Extensions) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) {
for _, ext := range bes.extMap {
for _, extID := range bes.extensionIDs {
ext := bes.extMap[extID]
if sw, ok := ext.(extension.StatusWatcher); ok {
sw.ComponentStatusChanged(source, event)
}
Expand All @@ -120,7 +127,7 @@ func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) {
data := zpages.SummaryExtensionsTableData{}

data.Rows = make([]zpages.SummaryExtensionsTableRowData, 0, len(bes.extMap))
for id := range bes.extMap {
for _, id := range bes.extensionIDs {
row := zpages.SummaryExtensionsTableRowData{FullName: id.String()}
data.Rows = append(data.Rows, row)
}
Expand Down Expand Up @@ -150,9 +157,10 @@ type Settings struct {
// New creates a new Extensions from Config.
func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
exts := &Extensions{
telemetry: set.Telemetry,
extMap: make(map[component.ID]extension.Extension),
instanceIDs: make(map[component.ID]*component.InstanceID),
telemetry: set.Telemetry,
extMap: make(map[component.ID]extension.Extension),
instanceIDs: make(map[component.ID]*component.InstanceID),
extensionIDs: make([]component.ID, 0, len(cfg)),
}
for _, extID := range cfg {
instanceID := &component.InstanceID{
Expand All @@ -179,6 +187,10 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) {
exts.extMap[extID] = ext
exts.instanceIDs[extID] = instanceID
}

order, err := computeOrder(exts)
if err != nil {
return nil, err
}
exts.extensionIDs = order
return exts, nil
}
Loading

0 comments on commit 2e44da3

Please sign in to comment.