Skip to content

Commit

Permalink
[receiver/tlscheck] Implement Scraper
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-burt committed Oct 22, 2024
1 parent 33a5457 commit e70cc3d
Show file tree
Hide file tree
Showing 31 changed files with 520 additions and 319 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_compression-gzip.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Enable gzip compression by default

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35865]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: To disable compression, set config `compression` to `none`.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
27 changes: 27 additions & 0 deletions .chloggen/tlscheckreceiver-implementation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tlscheckreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implement TLS Check Receiver for host-based checks

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35842]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ linters-settings:
- float-compare
- require-error
- suite-subtest-run
- encoded-compare # has false positives that cannot be fixed with testifylint-fix
enable-all: true

linters:
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -395,16 +395,16 @@ define updatehelper
echo "Usage: updatehelper <versions.yaml> <go.mod> <builder-config.yaml>"; \
exit 1; \
fi
grep "go\.opentelemetry\.io" $(1) | sed 's/^\s*-\s*//' | while IFS= read -r line; do \
grep "go\.opentelemetry\.io" $(1) | sed 's/^[[:space:]]*-[[:space:]]*//' | while IFS= read -r line; do \
if grep -qF "$$line" $(2); then \
package=$$(grep -F "$$line" $(2) | head -n 1 | awk '{print $$1}'); \
version=$$(grep -F "$$line" $(2) | head -n 1 | awk '{print $$2}'); \
builder_package=$$(grep -F "$$package" $(3) | awk '{print $$3}'); \
builder_version=$$(grep -F "$$package" $(3) | awk '{print $$4}'); \
if [ "$$builder_package" == "$$package" ]; then \
echo "$$builder_version";\
sed -i -e "s|$$builder_package.*$$builder_version|$$builder_package $$version|" $(3); \
echo "[$(3)]: $$package updated to $$version"; \
sed -i.bak -e "s|$$builder_package.*$$builder_version|$$builder_package $$version|" $(3); \
rm $(3).bak; \
echo "[$(3)]: $$package updated from $$builder_version to $$version"; \
fi; \
fi; \
done
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ service:
### HTTP settings
The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp], except for `compression` (all requests are uncompressed).
The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp]. Gzip compression is enabled by default. To disable compression, set `compression` to `none`.
As a consequence of supporting [confighttp], the Elasticsearch exporter also supports common [TLS Configuration Settings][configtls].

The Elasticsearch exporter sets `timeout` (HTTP request timeout) to 90s by default.
Expand Down
7 changes: 7 additions & 0 deletions exporter/elasticsearchexporter/bulkindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"compress/gzip"
"context"
"errors"
"io"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/elastic/go-docappender/v2"
"github.com/elastic/go-elasticsearch/v7"
"go.opentelemetry.io/collector/config/configcompression"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -68,12 +70,17 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender
maxDocRetries = config.Retry.MaxRetries
}
}
var compressionLevel int
if config.Compression == configcompression.TypeGzip {
compressionLevel = gzip.BestSpeed
}
return docappender.BulkIndexerConfig{
Client: client,
MaxDocumentRetries: maxDocRetries,
Pipeline: config.Pipeline,
RetryOnDocumentStatus: config.Retry.RetryOnStatus,
RequireDataStream: config.MappingMode() == MappingOTel,
CompressionLevel: compressionLevel,
}
}

Expand Down
94 changes: 80 additions & 14 deletions exporter/elasticsearchexporter/bulkindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/elastic/go-elasticsearch/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/confighttp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -62,13 +63,8 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) {
}})
require.NoError(t, err)

bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &cfg)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)
bulkIndexer := runBulkIndexerOnce(t, &cfg, client)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))
assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load())
}

Expand Down Expand Up @@ -157,13 +153,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) {
}})
require.NoError(t, err)

bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config)
require.NoError(t, err)
session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
assert.NoError(t, bulkIndexer.Close(context.Background()))
runBulkIndexerOnce(t, &tt.config, client)

assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh)
})
Expand Down Expand Up @@ -234,14 +224,15 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {

bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg)
require.NoError(t, err)
defer bulkIndexer.Close(context.Background())

session, err := bulkIndexer.StartSession(context.Background())
require.NoError(t, err)

assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
// should flush
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load())
assert.NoError(t, bulkIndexer.Close(context.Background()))
messages := observed.FilterMessage(tt.wantMessage)
require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All())
for _, wantField := range tt.wantFields {
Expand All @@ -250,3 +241,78 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) {
})
}
}

// TestAsyncBulkIndexer_logRoundTrip verifies that the client logger records
// both the request and response bodies for every Elasticsearch round trip,
// regardless of whether gzip compression is enabled.
func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) {
	cases := []struct {
		name   string
		config Config
	}{
		{
			name: "compression none",
			config: Config{
				NumWorkers:   1,
				ClientConfig: confighttp.ClientConfig{Compression: "none"},
			},
		},
		{
			name: "compression gzip",
			config: Config{
				NumWorkers:   1,
				ClientConfig: confighttp.ClientConfig{Compression: "gzip"},
			},
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			core, observed := observer.New(zap.DebugLevel)
			esLogger := clientLogger{
				Logger:          zap.New(core),
				logRequestBody:  true,
				logResponseBody: true,
			}

			// Every round trip gets a canned success response carrying the
			// product header the client checks for.
			transport := &mockTransport{
				RoundTripFunc: func(*http.Request) (*http.Response, error) {
					resp := &http.Response{
						Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
						Body:   io.NopCloser(strings.NewReader(successResp)),
					}
					return resp, nil
				},
			}

			client, err := elasticsearch.NewClient(elasticsearch.Config{
				Transport: transport,
				Logger:    &esLogger,
			})
			require.NoError(t, err)

			runBulkIndexerOnce(t, &tc.config, client)

			records := observed.AllUntimed()
			assert.Len(t, records, 2)

			// First round trip: the product check against "/"; it has no
			// request body, only the canned response.
			assert.Equal(t, "/", records[0].ContextMap()["path"])
			assert.Nil(t, records[0].ContextMap()["request_body"])
			assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string))

			// Second round trip: the bulk indexing request itself.
			assert.Equal(t, "/_bulk", records[1].ContextMap()["path"])
			assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[1].ContextMap()["request_body"])
			assert.JSONEq(t, successResp, records[1].ContextMap()["response_body"].(string))
		})
	}
}

// runBulkIndexerOnce builds an async bulk indexer from config, indexes a
// single document ({"foo": "bar"} into index "foo"), and closes the indexer,
// requiring each step to succeed. It returns the (now closed) indexer so
// callers can inspect its stats.
func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer {
	// Mark this as a helper so assertion failures are attributed to the
	// calling test's line, not this function.
	t.Helper()

	bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config)
	require.NoError(t, err)
	session, err := bulkIndexer.StartSession(context.Background())
	require.NoError(t, err)

	assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil))
	assert.NoError(t, bulkIndexer.Close(context.Background()))

	return bulkIndexer
}
6 changes: 3 additions & 3 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
Expand Down Expand Up @@ -273,9 +274,8 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode)
}

if cfg.Compression != "" {
// TODO support confighttp.ClientConfig.Compression
return errors.New("compression is not currently configurable")
if cfg.Compression != "none" && cfg.Compression != configcompression.TypeGzip {
return errors.New("compression must be one of [none, gzip]")
}

if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 {
Expand Down
30 changes: 27 additions & 3 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package elasticsearchexporter
import (
"net/http"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -38,6 +39,7 @@ func TestConfig(t *testing.T) {

defaultMaxIdleConns := 100
defaultIdleConnTimeout := 90 * time.Second
defaultCompression := configcompression.TypeGzip

tests := []struct {
configFile string
Expand Down Expand Up @@ -80,6 +82,7 @@ func TestConfig(t *testing.T) {
cfg.Headers = map[string]configopaque.String{
"myheader": "test",
}
cfg.Compression = defaultCompression
}),
Authentication: AuthenticationSettings{
User: "elastic",
Expand Down Expand Up @@ -150,6 +153,7 @@ func TestConfig(t *testing.T) {
cfg.Headers = map[string]configopaque.String{
"myheader": "test",
}
cfg.Compression = defaultCompression
}),
Authentication: AuthenticationSettings{
User: "elastic",
Expand Down Expand Up @@ -220,6 +224,7 @@ func TestConfig(t *testing.T) {
cfg.Headers = map[string]configopaque.String{
"myheader": "test",
}
cfg.Compression = defaultCompression
}),
Authentication: AuthenticationSettings{
User: "elastic",
Expand Down Expand Up @@ -301,10 +306,29 @@ func TestConfig(t *testing.T) {
cfg.Batcher.Enabled = &enabled
}),
},
{
id: component.NewIDWithName(metadata.Type, "compression_none"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

cfg.Compression = "none"
}),
},
{
id: component.NewIDWithName(metadata.Type, "compression_gzip"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoint = "https://elastic.example.com:9200"

cfg.Compression = "gzip"
}),
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
tt := tt
t.Run(strings.ReplaceAll(tt.id.String(), "/", "_"), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

Expand Down Expand Up @@ -387,9 +411,9 @@ func TestConfig_Validate(t *testing.T) {
"compression unsupported": {
config: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"http://test:9200"}
cfg.Compression = configcompression.TypeGzip
cfg.Compression = configcompression.TypeSnappy
}),
err: `compression is not currently configurable`,
err: `compression must be one of [none, gzip]`,
},
"both max_retries and max_requests specified": {
config: withDefaultConfig(func(cfg *Config) {
Expand Down
10 changes: 9 additions & 1 deletion exporter/elasticsearchexporter/esclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/elastic/go-elasticsearch/v7"
"github.com/klauspost/compress/gzip"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

Expand All @@ -32,7 +33,14 @@ func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, cl

var fields []zap.Field
if cl.logRequestBody && requ != nil && requ.Body != nil {
if b, err := io.ReadAll(requ.Body); err == nil {
body := requ.Body
if requ.Header.Get("Content-Encoding") == "gzip" {
if r, err := gzip.NewReader(body); err == nil {
defer r.Close()
body = r
}
}
if b, err := io.ReadAll(body); err == nil {
fields = append(fields, zap.ByteString("request_body", b))
}
}
Expand Down
Loading

0 comments on commit e70cc3d

Please sign in to comment.