Skip to content

Commit

Permalink
Merge branch 'main' into observer/cfgardenobserver-3
Browse files Browse the repository at this point in the history
  • Loading branch information
m1rp authored Oct 31, 2024
2 parents e9e6fcc + 909b1d3 commit 92677c2
Show file tree
Hide file tree
Showing 38 changed files with 446 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Makes the Supervisor's OpAmp server port configurable with 'agent::opamp_server_port'.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36001]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ linters:
- unconvert
- unparam
- unused
- usestdlibvars
- wastedassign

issues:
Expand Down
65 changes: 65 additions & 0 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,6 +1443,71 @@ func TestSupervisorLogging(t *testing.T) {
require.NoError(t, logFile.Close())
}

func TestSupervisorOpAmpServerPort(t *testing.T) {
var agentConfig atomic.Value
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
agentConfig.Store(string(config.Body))
}
}

return &protobufs.ServerToAgent{}
},
})

supervisorOpAmpServerPort, err := findRandomPort()
require.NoError(t, err)

s := newSupervisor(t, "server_port", map[string]string{"url": server.addr, "supervisor_opamp_server_port": fmt.Sprintf("%d", supervisorOpAmpServerPort)})

require.Nil(t, s.Start())
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t)

server.sendToSupervisor(&protobufs.ServerToAgent{
RemoteConfig: &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: cfg.Bytes()},
},
},
ConfigHash: hash,
},
})

require.Eventually(t, func() bool {
cfg, ok := agentConfig.Load().(string)
if ok {
// The effective config may be structurally different compared to what was sent,
// and will also have some data redacted,
// so just check that it includes the filelog receiver
return strings.Contains(cfg, "filelog")
}

return false
}, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config")

n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n")
require.NotZero(t, n, "Could not write to input file")
require.NoError(t, err)

require.Eventually(t, func() bool {
logRecord := make([]byte, 1024)
n, _ := outputFile.Read(logRecord)

return n != 0
}, 10*time.Second, 500*time.Millisecond, "Log never appeared in output")
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
9 changes: 8 additions & 1 deletion cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,14 @@ agent:
client.id: "01HWWSK84BMT7J45663MBJMTPJ"
non_identifying_attributes:
custom.attribute: "custom-value"


# The port the Collector's health check extension will be configured to use
health_check_port:

# The port the Supervisor will start its OpAmp server on and the Collector's
# OpAmp extension will connect to
opamp_server_port:

```

### Operation When OpAMP Server is Unavailable
Expand Down
5 changes: 5 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type Agent struct {
Description AgentDescription `mapstructure:"description"`
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
OpAMPServerPort int `mapstructure:"opamp_server_port"`
PassthroughLogs bool `mapstructure:"passthrough_logs"`
}

Expand All @@ -171,6 +172,10 @@ func (a Agent) Validate() error {
return errors.New("agent::health_check_port must be a valid port number")
}

if a.OpAMPServerPort < 0 || a.OpAMPServerPort > 65535 {
return errors.New("agent::opamp_server_port must be a valid port number")
}

if a.Executable == "" {
return errors.New("agent::executable must be specified")
}
Expand Down
53 changes: 50 additions & 3 deletions cmd/opampsupervisor/supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestValidate(t *testing.T) {
expectedError: "agent::orphan_detection_interval must be positive",
},
{
name: "Invalid port number",
name: "Invalid health check port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Expand All @@ -254,7 +254,7 @@ func TestValidate(t *testing.T) {
expectedError: "agent::health_check_port must be a valid port number",
},
{
name: "Zero value port number",
name: "Zero value health check port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Expand All @@ -280,7 +280,7 @@ func TestValidate(t *testing.T) {
},
},
{
name: "Normal port number",
name: "Normal health check port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Expand Down Expand Up @@ -331,6 +331,53 @@ func TestValidate(t *testing.T) {
},
expectedError: "agent::bootstrap_timeout must be positive",
},
{
name: "Invalid opamp server port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
OpAMPServerPort: 65536,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
expectedError: "agent::opamp_server_port must be a valid port number",
},
{
name: "Zero value opamp server port number",
config: Supervisor{
Server: OpAMPServer{
Endpoint: "wss://localhost:9090/opamp",
Headers: http.Header{
"Header1": []string{"HeaderValue"},
},
},
Agent: Agent{
Executable: "${file_path}",
OrphanDetectionInterval: 5 * time.Second,
OpAMPServerPort: 0,
BootstrapTimeout: 5 * time.Second,
},
Capabilities: Capabilities{
AcceptsRemoteConfig: true,
},
Storage: Storage{
Directory: "/etc/opamp-supervisor/storage",
},
},
},
}

// create some fake files for validating agent config
Expand Down
11 changes: 9 additions & 2 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (s *Supervisor) createTemplates() error {
// shuts down the Collector. This only needs to happen
// once per Collector binary.
func (s *Supervisor) getBootstrapInfo() (err error) {
s.opampServerPort, err = s.findRandomPort()
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
return err
}
Expand Down Expand Up @@ -457,7 +457,7 @@ func (s *Supervisor) startOpAMPServer() error {
s.opampServer = server.New(newLoggerFromZap(s.logger))

var err error
s.opampServerPort, err = s.findRandomPort()
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
return err
}
Expand Down Expand Up @@ -1345,6 +1345,13 @@ func (s *Supervisor) agentConfigFilePath() string {
return filepath.Join(s.config.Storage.Directory, agentConfigFileName)
}

func (s *Supervisor) getSupervisorOpAMPServerPort() (int, error) {
if s.config.Agent.OpAMPServerPort != 0 {
return s.config.Agent.OpAMPServerPort, nil
}
return s.findRandomPort()
}

func (s *Supervisor) findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
server:
endpoint: ws://{{.url}}/v1/opamp

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true

storage:
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
opamp_server_port: {{ .supervisor_opamp_server_port }}
3 changes: 2 additions & 1 deletion exporter/datadogexporter/internal/clientutil/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package clientutil // import "github.com/open-telemetry/opentelemetry-collector-
import (
"context"
"errors"
"net/http"

"github.com/DataDog/datadog-api-client-go/v2/api/datadog"
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV1"
Expand Down Expand Up @@ -45,7 +46,7 @@ func ValidateAPIKey(ctx context.Context, apiKey string, logger *zap.Logger, apiC
return nil
}
if err != nil {
if httpresp != nil && httpresp.StatusCode == 403 {
if httpresp != nil && httpresp.StatusCode == http.StatusForbidden {
return WrapError(ErrInvalidAPI, httpresp)
}
logger.Warn("Error while validating API key", zap.Error(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func WrapError(err error, resp *http.Response) error {
}

func isNonRetriable(resp *http.Response) bool {
return resp.StatusCode == 400 || resp.StatusCode == 404 || resp.StatusCode == 413 || resp.StatusCode == 403
return resp.StatusCode == http.StatusBadRequest || resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusRequestEntityTooLarge || resp.StatusCode == http.StatusForbidden
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

func TestWrapError(t *testing.T) {
respOK := http.Response{StatusCode: 200}
respRetriable := http.Response{StatusCode: 402}
respNonRetriable := http.Response{StatusCode: 404}
respOK := http.Response{StatusCode: http.StatusOK}
respRetriable := http.Response{StatusCode: http.StatusPaymentRequired}
respNonRetriable := http.Response{StatusCode: http.StatusNotFound}
err := fmt.Errorf("Test error")
assert.False(t, consumererror.IsPermanent(WrapError(err, &respOK)))
assert.False(t, consumererror.IsPermanent(WrapError(err, &respRetriable)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestNoRetriesOnPermanentError(t *testing.T) {
scrubber := scrub.NewScrubber()
retrier := NewRetrier(zap.NewNop(), configretry.NewDefaultBackOffConfig(), scrubber)
ctx := context.Background()
respNonRetriable := http.Response{StatusCode: 404}
respNonRetriable := http.Response{StatusCode: http.StatusNotFound}

retryNum, err := retrier.DoWithRetries(ctx, func(context.Context) error {
return WrapError(fmt.Errorf("test"), &respNonRetriable)
Expand Down
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
name: "500",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 500,
StatusCode: http.StatusInternalServerError,
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader("error")),
}, nil
Expand All @@ -182,7 +182,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
name: "429",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 429,
StatusCode: http.StatusTooManyRequests,
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader("error")),
}, nil
Expand All @@ -200,7 +200,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
name: "known version conflict error",
roundTripFunc: func(*http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
StatusCode: http.StatusOK,
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
Body: io.NopCloser(strings.NewReader(
`{"items":[{"create":{"_index":".ds-metrics-generic.otel-default","status":400,"error":{"type":"version_conflict_engine_exception","reason":""}}}]}`)),
Expand Down
6 changes: 3 additions & 3 deletions exporter/influxdbexporter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ func (b *influxHTTPWriterBatch) WriteBatch(ctx context.Context) error {
if err = res.Body.Close(); err != nil {
return err
}
switch res.StatusCode / 100 {
case 2: // Success
switch {
case res.StatusCode >= 200 && res.StatusCode < 300: // Success
break
case 5: // Retryable error
case res.StatusCode >= 500 && res.StatusCode < 600: // Retryable error
return fmt.Errorf("line protocol write returned %q %q", res.Status, string(body))
default: // Terminal error
return consumererror.NewPermanent(fmt.Errorf("line protocol write returned %q %q", res.Status, string(body)))
Expand Down
2 changes: 1 addition & 1 deletion exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (l *lokiExporter) sendPushRequest(ctx context.Context, tenant string, reque
return consumererror.NewPermanent(err)
}

req, err := http.NewRequestWithContext(ctx, "POST", l.config.ClientConfig.Endpoint, bytes.NewReader(buf))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, l.config.ClientConfig.Endpoint, bytes.NewReader(buf))
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
}

// Create the HTTP POST request to send to the endpoint
req, err := http.NewRequestWithContext(ctx, "POST", prwe.endpointURL.String(), bytes.NewReader(compressedData))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, prwe.endpointURL.String(), bytes.NewReader(compressedData))
if err != nil {
return backoff.Permanent(consumererror.NewPermanent(err))
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/dpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *sfxDPClient) postData(ctx context.Context, body io.Reader, headers map[
if !strings.HasSuffix(datapointURL.Path, "v2/datapoint") {
datapointURL.Path = path.Join(datapointURL.Path, "v2/datapoint")
}
req, err := http.NewRequestWithContext(ctx, "POST", datapointURL.String(), body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, datapointURL.String(), body)
if err != nil {
return consumererror.NewPermanent(err)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/eventclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *sfxEventClient) pushLogsData(ctx context.Context, ld plog.Logs) (int, e
if !strings.HasSuffix(eventURL.Path, "v2/event") {
eventURL.Path = path.Join(eventURL.Path, "v2/event")
}
req, err := http.NewRequestWithContext(ctx, "POST", eventURL.String(), body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, eventURL.String(), body)
if err != nil {
return ld.LogRecordCount(), consumererror.NewPermanent(err)
}
Expand Down
Loading

0 comments on commit 92677c2

Please sign in to comment.