Bump github.com/cyphar/filepath-securejoin from 0.2.3 to 0.2.4 in /cmd/otelcontribcol #68

Open · wants to merge 4 commits into main

480 changes: 480 additions & 0 deletions .github/workflows/build-test-publish.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/otelcontribcol/go.mod
@@ -339,7 +339,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cskr/pubsub v1.0.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/denisenkom/go-mssqldb v0.12.2 // indirect
3 changes: 2 additions & 1 deletion cmd/otelcontribcol/go.sum

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions cmd/otelcontribcol/processors_test.go
@@ -74,6 +74,10 @@ func TestDefaultProcessors(t *testing.T) {
processor: "k8sattributes",
skipLifecycle: true, // Requires a k8s API to communicate with
},
{
processor: "logstransform",
skipLifecycle: true,
},
{
processor: "memory_limiter",
getConfigFn: func() component.Config {
213 changes: 152 additions & 61 deletions exporter/clickhouseexporter/exporter_logs.go
@@ -7,9 +7,11 @@ import (
"context"
"database/sql"
"fmt"
"net/url"
"strings"
"time"

_ "github.com/ClickHouse/clickhouse-go/v2" // For register database driver.
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
@@ -28,7 +30,7 @@ type logsExporter struct {
}

func newLogsExporter(logger *zap.Logger, cfg *Config) (*logsExporter, error) {
client, err := newClickhouseClient(cfg)
client, err := newClickHouseConn(cfg)
if err != nil {
return nil, err
}
@@ -57,35 +59,72 @@ func (e *logsExporter) shutdown(_ context.Context) error {
return nil
}

func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
func (e *logsExporter) pushLogsData(_ context.Context, ld plog.Logs) error {
start := time.Now()
err := doWithTx(ctx, e.client, func(tx *sql.Tx) error {
statement, err := tx.PrepareContext(ctx, e.insertSQL)
err := func() error {
scope, err := e.client.Begin()
if err != nil {
return fmt.Errorf("PrepareContext:%w", err)
return fmt.Errorf("Begin:%w", err)
}
defer func() {
_ = statement.Close()
}()

batch, err := scope.Prepare(e.insertSQL)
if err != nil {
return fmt.Errorf("Prepare:%w", err)
}

var serviceName string
for i := 0; i < ld.ResourceLogs().Len(); i++ {
logs := ld.ResourceLogs().At(i)
var podName string
var containerName string
var region string
var cloudProvider string
var cell string

resAttr := make(map[string]string)

resourceLogs := ld.ResourceLogs()
for i := 0; i < resourceLogs.Len(); i++ {
logs := resourceLogs.At(i)
res := logs.Resource()
resURL := logs.SchemaUrl()
resAttr := attributesToMap(res.Attributes())
if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok {
serviceName = v.Str()
}

attrs := res.Attributes()
attributesToMap(attrs, resAttr)

attrs.Range(func(key string, value pcommon.Value) bool {
switch key {
case conventions.AttributeServiceName:
serviceName = value.Str()
case conventions.AttributeK8SPodName:
podName = value.AsString()
case conventions.AttributeK8SContainerName:
containerName = value.AsString()
// TODO use AttributeCloudRegion 'cloud.region'
// https://github.com/ClickHouse/data-plane-application/issues/4155
case "region":
fallthrough
case conventions.AttributeCloudRegion:
region = value.AsString()
case conventions.AttributeCloudProvider:
cloudProvider = value.AsString()
case "cell":
cell = value.AsString()
}
return true
})
for j := 0; j < logs.ScopeLogs().Len(); j++ {
rs := logs.ScopeLogs().At(j).LogRecords()
scopeURL := logs.ScopeLogs().At(j).SchemaUrl()
scopeName := logs.ScopeLogs().At(j).Scope().Name()
scopeVersion := logs.ScopeLogs().At(j).Scope().Version()
scopeAttr := attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes())
scopeAttr := make(map[string]string, attrs.Len())
attributesToMap(logs.ScopeLogs().At(j).Scope().Attributes(), scopeAttr)
for k := 0; k < rs.Len(); k++ {
r := rs.At(k)
logAttr := attributesToMap(r.Attributes())
_, err = statement.ExecContext(ctx,

logAttr := make(map[string]string, attrs.Len())
attributesToMap(r.Attributes(), logAttr)

_, err = batch.Exec(
r.Timestamp().AsTime(),
traceutil.TraceIDToHexOrEmptyString(r.TraceID()),
traceutil.SpanIDToHexOrEmptyString(r.SpanID()),
@@ -95,6 +134,11 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
serviceName,
r.Body().AsString(),
resURL,
podName,
containerName,
region,
cloudProvider,
cell,
resAttr,
scopeURL,
scopeName,
@@ -103,26 +147,31 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
logAttr,
)
if err != nil {
return fmt.Errorf("ExecContext:%w", err)
return fmt.Errorf("Append:%w", err)
}
}
}

// clear map for reuse
for k := range resAttr {
delete(resAttr, k)
}
}
return nil
})

return scope.Commit()
}()

duration := time.Since(start)
e.logger.Debug("insert logs", zap.Int("records", ld.LogRecordCount()),
zap.String("cost", duration.String()))
return err
}

func attributesToMap(attributes pcommon.Map) map[string]string {
m := make(map[string]string, attributes.Len())
func attributesToMap(attributes pcommon.Map, dest map[string]string) {
attributes.Range(func(k string, v pcommon.Value) bool {
m[k] = v.AsString()
dest[k] = v.AsString()
return true
})
return m
}

const (
@@ -136,7 +185,12 @@ CREATE TABLE IF NOT EXISTS %s (
SeverityText LowCardinality(String) CODEC(ZSTD(1)),
SeverityNumber Int32 CODEC(ZSTD(1)),
ServiceName LowCardinality(String) CODEC(ZSTD(1)),
Body String CODEC(ZSTD(1)),
Body LowCardinality(String) CODEC(ZSTD(1)),
PodName LowCardinality(String),
ContainerName LowCardinality(String),
Region LowCardinality(String),
CloudProvider LowCardinality(String),
Cell LowCardinality(String),
ResourceSchemaUrl String CODEC(ZSTD(1)),
ResourceAttributes Map(LowCardinality(String), String) CODEC(ZSTD(1)),
ScopeSchemaUrl String CODEC(ZSTD(1)),
@@ -154,10 +208,11 @@ CREATE TABLE IF NOT EXISTS %s (
INDEX idx_body Body TYPE tokenbf_v1(32768, 3, 0) GRANULARITY 1
) ENGINE MergeTree()
%s
PARTITION BY toDate(Timestamp)
ORDER BY (ServiceName, SeverityText, toUnixTimestamp(Timestamp), TraceId)
PARTITION BY toYYYYMM(Timestamp)
ORDER BY (PodName, ContainerName, SeverityText, Timestamp)
SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
`

// language=ClickHouse SQL
insertLogsSQLTemplate = `INSERT INTO %s (
Timestamp,
@@ -169,42 +224,89 @@ SETTINGS index_granularity=8192, ttl_only_drop_parts = 1;
ServiceName,
Body,
ResourceSchemaUrl,
PodName,
ContainerName,
Region,
CloudProvider,
Cell,
ResourceAttributes,
ScopeSchemaUrl,
ScopeName,
ScopeVersion,
ScopeAttributes,
LogAttributes
)`
inlineinsertLogsSQLTemplate = `INSERT INTO %s SETTINGS async_insert=1, wait_for_async_insert=0 (
Timestamp,
TraceId,
SpanId,
TraceFlags,
SeverityText,
SeverityNumber,
ServiceName,
Body,
ResourceSchemaUrl,
PodName,
ContainerName,
Region,
CloudProvider,
Cell,
ResourceAttributes,
ScopeSchemaUrl,
ScopeName,
ScopeVersion,
ScopeAttributes,
LogAttributes
) VALUES (
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?,
?
)`
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
)

var driverName = "clickhouse" // for testing

// newClickhouseClient create a clickhouse client.
func newClickhouseClient(cfg *Config) (*sql.DB, error) {
// newClickHouseClient creates a clickhouse client.
// used by metrics and traces:
func newClickHouseClient(cfg *Config) (*sql.DB, error) {
db, err := cfg.buildDB(cfg.Database)
if err != nil {
return nil, err
}
return db, nil
}

// used by logs:
func newClickHouseConn(cfg *Config) (*sql.DB, error) {
endpoint := cfg.Endpoint

if len(cfg.ConnectionParams) > 0 {
values := make(url.Values, len(cfg.ConnectionParams))
for k, v := range cfg.ConnectionParams {
values.Add(k, v)
}

if !strings.Contains(endpoint, "?") {
endpoint += "?"
} else if !strings.HasSuffix(endpoint, "&") {
endpoint += "&"
}

endpoint += values.Encode()
}

opts, err := clickhouse.ParseDSN(endpoint)
if err != nil {
return nil, fmt.Errorf("unable to parse endpoint: %w", err)
}

opts.Auth = clickhouse.Auth{
Database: cfg.Database,
Username: cfg.Username,
Password: string(cfg.Password),
}

// can return a "bad" connection if misconfigured; we won't know
// until a Ping, Exec, etc. is done
return clickhouse.OpenDB(opts), nil
}
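
Because clickhouse.OpenDB builds the handle without contacting the server, a misconfiguration only surfaces on first use, as the comment above notes. A minimal sketch of how a caller could validate the handle up front; the helper name and timeout below are illustrative, not part of this change:

// verifyConn is a hypothetical helper, not part of this diff: it checks that
// the *sql.DB returned by newClickHouseConn can actually reach the server.
func verifyConn(ctx context.Context, db *sql.DB) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		return fmt.Errorf("clickhouse ping: %w", err)
	}
	return nil
}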

func createDatabase(ctx context.Context, cfg *Config) error {
// use default database to create new database
if cfg.Database == defaultDatabase {
Expand Down Expand Up @@ -242,19 +344,8 @@ func renderCreateLogsTableSQL(cfg *Config) string {
}

func renderInsertLogsSQL(cfg *Config) string {
return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName)
}

func doWithTx(_ context.Context, db *sql.DB, fn func(tx *sql.Tx) error) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("db.Begin: %w", err)
if strings.HasPrefix(cfg.Endpoint, "tcp") && cfg.ConnectionParams["async_insert"] == "1" {
return fmt.Sprintf(inlineinsertLogsSQLTemplate, cfg.LogsTableName)
}
defer func() {
_ = tx.Rollback()
}()
if err := fn(tx); err != nil {
return err
}
return tx.Commit()
return fmt.Sprintf(insertLogsSQLTemplate, cfg.LogsTableName)
}
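
The new renderInsertLogsSQL only selects the async-insert statement when the endpoint uses the native tcp scheme and the connection parameters set async_insert to "1". A small illustrative sketch; the Config literal and its values below are hypothetical, not taken from this PR:

// Hypothetical example, not part of this diff: choosing the insert statement.
func exampleInsertSQL() string {
	cfg := &Config{
		Endpoint:         "tcp://clickhouse:9000", // native protocol endpoint
		LogsTableName:    "otel_logs",
		ConnectionParams: map[string]string{"async_insert": "1"},
	}
	// With a tcp endpoint and async_insert=1 this returns the inline
	// async-insert template; otherwise the plain INSERT template is used.
	return renderInsertLogsSQL(cfg)
}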