diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 017a1d1a..f161a939 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -20,6 +20,7 @@ jobs: - 21.11.11.1-stable - 21.12.4.1-stable - 22.1.3.7-stable + - 22.2.2.1-stable steps: - uses: actions/checkout@v2 diff --git a/cht/cht.go b/cht/cht.go index b8391208..ac0362cb 100644 --- a/cht/cht.go +++ b/cht/cht.go @@ -207,6 +207,13 @@ func BinOrSkip(t testing.TB) string { return binaryPath } +type OpenTelemetry struct { + Engine string `xml:"engine,omitempty"` + Database string `xml:"database,omitempty"` + Table string `xml:"table,omitempty"` + FlushIntervalMs int `xml:"flush_interval_milliseconds,omitempty"` +} + // New creates new ClickHouse server and returns it. // Use Many to start multiple servers at once. // @@ -251,6 +258,14 @@ func New(t testing.TB, opts ...Option) Server { MarkCacheSize: 5368709120, MMAPCacheSize: 1000, + OpenTelemetrySpanLog: &OpenTelemetry{ + Table: "opentelemetry_span_log", + Database: "system", + Engine: `engine MergeTree + partition by toYYYYMM(finish_date) + order by (finish_date, finish_time_us, trace_id)`, + }, + UserDirectories: UserDir{ UsersXML: UsersXML{ Path: userCfgPath, diff --git a/cht/config.go b/cht/config.go index 5af94203..4ce12c2e 100644 --- a/cht/config.go +++ b/cht/config.go @@ -113,6 +113,8 @@ type Config struct { MarkCacheSize int64 `xml:"mark_cache_size"` MMAPCacheSize int64 `xml:"mmap_cache_size"` + OpenTelemetrySpanLog *OpenTelemetry `xml:"opentelemetry_span_log,omitempty"` + // ZooKeeper configures ZooKeeper nodes. ZooKeeper []ZooKeeperNode `xml:"zookeeper>node,omitempty"` Macros Map `xml:"macros,omitempty"` diff --git a/go.mod b/go.mod index 40fb8d9f..b19e398b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.7.0 go.opentelemetry.io/otel v1.4.1 go.opentelemetry.io/otel/metric v0.27.0 + go.opentelemetry.io/otel/sdk v1.4.1 go.opentelemetry.io/otel/trace v1.4.1 go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.7.0 diff --git a/go.sum b/go.sum index 273f0558..8f45ce07 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ go.opentelemetry.io/otel/internal/metric v0.27.0 h1:9dAVGAfFiiEq5NVB9FUJ5et+btbD go.opentelemetry.io/otel/internal/metric v0.27.0/go.mod h1:n1CVxRqKqYZtqyTh9U/onvKapPGv7y/rpyOTI+LFNzw= go.opentelemetry.io/otel/metric v0.27.0 h1:HhJPsGhJoKRSegPQILFbODU56NS/L1UE4fS1sC5kIwQ= go.opentelemetry.io/otel/metric v0.27.0/go.mod h1:raXDJ7uP2/Jc0nVZWQjJtzoyssOYWu/+pjZqRzfvZ7g= +go.opentelemetry.io/otel/sdk v1.4.1 h1:J7EaW71E0v87qflB4cDolaqq3AcujGrtyIPGQoZOB0Y= +go.opentelemetry.io/otel/sdk v1.4.1/go.mod h1:NBwHDgDIBYjwK2WNu1OPgsIc2IJzmBXNnvIJxJc8BpE= go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE= go.opentelemetry.io/otel/trace v1.4.1 h1:O+16qcdTrT7zxv2J6GejTPFinSwA++cYerC5iSiF8EQ= go.opentelemetry.io/otel/trace v1.4.1/go.mod h1:iYEVbroFCNut9QkwEczV9vMRPHNKSSwYZjulEtsmhFc= @@ -94,6 +96,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211110154304-99a53858aa08 h1:WecRHqgE09JBkh/584XIE6PMz5KKE/vER4izNUi30AQ= golang.org/x/sys v0.0.0-20211110154304-99a53858aa08/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/proto/_golden/client_info_otel.hex b/proto/_golden/client_info_otel.hex index c241c92b..5fe1fe2e 100644 --- a/proto/_golden/client_info_otel.hex +++ b/proto/_golden/client_info_otel.hex @@ -3,5 +3,5 @@ 00000020 39 61 61 37 30 63 33 09 30 2e 30 2e 30 2e 30 3a |9aa70c3.0.0.0.0:| 00000030 30 00 00 00 00 00 00 00 00 01 04 75 73 65 72 08 |0..........user.| 00000040 68 6f 73 74 6e 61 6d 65 04 4e 61 6d 65 15 0b 9d |hostname.Name...| -00000050 a9 03 00 00 04 01 01 02 03 04 05 06 07 08 09 0a |................| -00000060 0b 0c 0d 0e 0f 10 01 02 03 04 05 06 07 08 00 00 |................| +00000050 a9 03 00 00 04 01 08 07 06 05 04 03 02 01 10 0f |................| +00000060 0e 0d 0c 0b 0a 09 08 07 06 05 04 03 02 01 00 00 |................| diff --git a/proto/_golden/client_info_otel.raw b/proto/_golden/client_info_otel.raw index 51abf4a8..f8811f8c 100644 Binary files a/proto/_golden/client_info_otel.raw and b/proto/_golden/client_info_otel.raw differ diff --git a/proto/_golden/query_otel.hex b/proto/_golden/query_otel.hex index 234aec2f..39f21b1d 100644 --- a/proto/_golden/query_otel.hex +++ b/proto/_golden/query_otel.hex @@ -6,8 +6,8 @@ 00000050 0c 31 2e 31 2e 31 2e 31 3a 31 34 34 38 00 00 00 |.1.1.1.1:1448...| 00000060 00 00 00 00 00 01 05 61 67 65 6e 74 05 6e 65 78 |.......agent.nex| 00000070 75 73 06 4d 61 74 72 69 78 15 0e e3 99 03 06 75 |us.Matrix......u| -00000080 2d 39 37 64 63 00 c0 53 01 01 02 03 04 00 00 00 |-97dc..S........| -00000090 00 00 00 00 00 00 00 00 00 06 07 08 09 0a 00 00 |................| -000000a0 00 00 01 00 06 73 65 63 72 65 74 02 01 15 43 52 |.....secret...CR| +00000080 2d 39 37 64 63 00 c0 53 01 00 00 00 00 04 03 02 |-97dc..S........| +00000090 01 00 00 00 00 00 00 00 00 00 00 00 0a 09 08 07 |................| +000000a0 06 00 01 00 06 73 65 63 72 65 74 02 01 15 43 52 |.....secret...CR| 000000b0 45 41 54 45 20 44 41 54 41 42 41 53 45 20 74 65 |EATE DATABASE te| 000000c0 73 74 3b |st;| diff --git a/proto/_golden/query_otel.raw b/proto/_golden/query_otel.raw index a15f3422..4fd14f16 100644 Binary files a/proto/_golden/query_otel.raw and b/proto/_golden/query_otel.raw differ diff --git a/proto/client_info.go b/proto/client_info.go index 193f6e7b..39ac0ddb 100644 --- a/proto/client_info.go +++ b/proto/client_info.go @@ -2,6 +2,7 @@ package proto import ( "github.com/go-faster/errors" + "github.com/segmentio/asm/bswap" "go.opentelemetry.io/otel/trace" ) @@ -89,11 +90,15 @@ func (c ClientInfo) EncodeAware(b *Buffer, version int) { b.PutByte(1) { v := c.Span.TraceID() + start := len(b.Buf) b.Buf = append(b.Buf, v[:]...) + bswap.Swap64(b.Buf[start:]) // https://github.com/ClickHouse/ClickHouse/issues/34369 } { v := c.Span.SpanID() + start := len(b.Buf) b.Buf = append(b.Buf, v[:]...) + bswap.Swap64(b.Buf[start:]) // https://github.com/ClickHouse/ClickHouse/issues/34369 } b.PutString(c.Span.TraceState().String()) b.PutByte(byte(c.Span.TraceFlags())) @@ -239,6 +244,7 @@ func (c *ClientInfo) DecodeAware(r *Reader, version int) error { if err != nil { return errors.Wrap(err, "trace id") } + bswap.Swap64(v) // https://github.com/ClickHouse/ClickHouse/issues/34369 copy(cfg.TraceID[:], v) } { @@ -246,6 +252,7 @@ func (c *ClientInfo) DecodeAware(r *Reader, version int) error { if err != nil { return errors.Wrap(err, "span id") } + bswap.Swap64(v) // https://github.com/ClickHouse/ClickHouse/issues/34369 copy(cfg.SpanID[:], v) } { diff --git a/query_test.go b/query_test.go index 23bc0bd3..3c3a8a2d 100644 --- a/query_test.go +++ b/query_test.go @@ -979,6 +979,10 @@ func BenchmarkClient_decodeBlock(b *testing.B) { } } +func discardResult() proto.Result { + return (&proto.Results{}).Auto() +} + func TestClient_ResultsAuto(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1011,6 +1015,6 @@ func TestClient_OpenTelemetryInstrumentation(t *testing.T) { }) require.NoError(t, conn.Do(ctx, Query{ Body: "SELECT 1 as v", - Result: (&proto.Results{}).Auto(), + Result: discardResult(), }), "select") } diff --git a/tracing_test.go b/tracing_test.go new file mode 100644 index 00000000..e953c58f --- /dev/null +++ b/tracing_test.go @@ -0,0 +1,100 @@ +package ch + +import ( + "context" + "fmt" + "math/rand" + "sync" + "testing" + + "github.com/stretchr/testify/require" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" + + "github.com/go-faster/ch/proto" +) + +type randomIDGenerator struct { + sync.Mutex + rand *rand.Rand +} + +// NewSpanID returns a non-zero span ID from a randomly-chosen sequence. +func (gen *randomIDGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) (sid trace.SpanID) { + gen.Lock() + defer gen.Unlock() + gen.rand.Read(sid[:]) + return sid +} + +// NewIDs returns a non-zero trace ID and a non-zero span ID from a +// randomly-chosen sequence. +func (gen *randomIDGenerator) NewIDs(ctx context.Context) (tid trace.TraceID, sid trace.SpanID) { + gen.Lock() + defer gen.Unlock() + gen.rand.Read(tid[:]) + gen.rand.Read(sid[:]) + return tid, sid +} + +func TestClient_Do_tracing(t *testing.T) { + ctx := context.Background() + exporter := tracetest.NewInMemoryExporter() + randSource := rand.NewSource(15) + tp := tracesdk.NewTracerProvider( + // Using deterministic random ids. + tracesdk.WithIDGenerator(&randomIDGenerator{ + rand: rand.New(randSource), + }), + tracesdk.WithBatcher(exporter, + tracesdk.WithBatchTimeout(0), // instant + ), + ) + conn := ConnOpt(t, Options{ + OpenTelemetryInstrumentation: true, + TracerProvider: tp, + Settings: []Setting{ + { + Key: "send_logs_level", + Value: "trace", + Important: true, + }, + }, + }) + + if v := conn.ServerInfo(); (v.Major < 22) || (v.Major == 22 && v.Minor < 2) { + t.Skip("Skipping (not supported)") + } + + // Should record trace and spans. + var traceID trace.TraceID + require.NoError(t, conn.Do(ctx, Query{ + Body: "SELECT 1", + Result: discardResult(), + OnLog: func(ctx context.Context, l Log) error { + sc := trace.SpanContextFromContext(ctx) + traceID = sc.TraceID() + t.Log(l.Text, sc.TraceID(), sc.SpanID()) + return nil + }, + })) + + require.True(t, traceID.IsValid(), "trace id not registered") + + // Force flushing. + require.NoError(t, tp.ForceFlush(ctx)) + spans := exporter.GetSpans() + require.NotEmpty(t, spans) + require.NoError(t, conn.Do(ctx, Query{Body: "system flush logs"})) + + var total proto.ColUInt64 + require.NoError(t, conn.Do(ctx, Query{ + Body: fmt.Sprintf("SELECT count() as total FROM system.opentelemetry_span_log WHERE lower(hex(trace_id)) = '%s'", traceID), + Result: proto.Results{ + {Name: "total", Data: &total}, + }, + })) + + require.Greater(t, total.Row(0), uint64(1), "spans should be recorded") +}