Skip to content

Commit

Permalink
Merge pull request #105 from go-faster/feat/otel-add-query-metrics
Browse files Browse the repository at this point in the history
feat(otelch): add query metrics
  • Loading branch information
ernado authored May 24, 2022
2 parents 3dd10d7 + f49fefb commit 04d1945
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 8 deletions.
36 changes: 36 additions & 0 deletions otelch/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,44 @@ const (
ServerNameKey = attribute.Key("ch.server.name")
ErrorCodeKey = attribute.Key("ch.error.code")
ErrorNameKey = attribute.Key("ch.error.name")
BlocksSentKey = attribute.Key("ch.blocks_sent")
BlocksReceivedKey = attribute.Key("ch.blocks_received")
RowsKey = attribute.Key("ch.rows")
BytesKey = attribute.Key("ch.bytes")
)

// BlocksSent is cumulative blocks sent count during query execution.
func BlocksSent(v int) attribute.KeyValue {
return attribute.KeyValue{
Key: BlocksSentKey,
Value: attribute.IntValue(v),
}
}

// BlocksReceived is cumulative received sent count during query execution.
func BlocksReceived(v int) attribute.KeyValue {
return attribute.KeyValue{
Key: BlocksReceivedKey,
Value: attribute.IntValue(v),
}
}

// Rows is cumulative rows processed count during query execution.
func Rows(v int) attribute.KeyValue {
return attribute.KeyValue{
Key: RowsKey,
Value: attribute.IntValue(v),
}
}

// Bytes is cumulative bytes processed count during query execution.
func Bytes(v int) attribute.KeyValue {
return attribute.KeyValue{
Key: BytesKey,
Value: attribute.IntValue(v),
}
}

// QueryID attribute.
func QueryID(v string) attribute.KeyValue {
return attribute.KeyValue{
Expand Down
26 changes: 18 additions & 8 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,12 @@ func (c *Client) sendQuery(ctx context.Context, q Query) error {
// Resembling behavior of clickhouse-client.
q.ExternalTable = "_data"
}
if err := c.encodeBlock(q.ExternalTable, q.ExternalData); err != nil {
if err := c.encodeBlock(ctx, q.ExternalTable, q.ExternalData); err != nil {
return errors.Wrap(err, "external data")
}
}
// End of external data.
if err := c.encodeBlankBlock(); err != nil {
if err := c.encodeBlankBlock(ctx); err != nil {
return errors.Wrap(err, "external data end")
}

Expand Down Expand Up @@ -227,6 +227,7 @@ func (c *Client) decodeBlock(ctx context.Context, opt decodeOptions) error {
if block.End() {
return nil
}
c.metricsInc(ctx, queryMetrics{BlocksReceived: 1})
if err := opt.Handler(ctx, block); err != nil {
return errors.Wrap(err, "handler")
}
Expand All @@ -237,7 +238,7 @@ func (c *Client) decodeBlock(ctx context.Context, opt decodeOptions) error {
//
// If input length is zero, blank block will be encoded, which is special case
// for "end of data".
func (c *Client) encodeBlock(tableName string, input []proto.InputColumn) error {
func (c *Client) encodeBlock(ctx context.Context, tableName string, input []proto.InputColumn) error {
proto.ClientCodeData.Encode(c.buf)
clientData := proto.ClientData{
// External data table name.
Expand All @@ -252,6 +253,7 @@ func (c *Client) encodeBlock(tableName string, input []proto.InputColumn) error
Columns: len(input),
}
if len(input) > 0 {
c.metricsInc(ctx, queryMetrics{BlocksSent: 1})
b.Rows = input[0].Data.Rows()
b.Info = proto.BlockInfo{
// TODO(ernado): investigate and document
Expand Down Expand Up @@ -279,8 +281,8 @@ func (c *Client) encodeBlock(tableName string, input []proto.InputColumn) error

// encodeBlankBlock encodes block with zero columns and rows which is special
// case for "end of data".
func (c *Client) encodeBlankBlock() error {
return c.encodeBlock("", nil)
func (c *Client) encodeBlankBlock(ctx context.Context) error {
return c.encodeBlock(ctx, "", nil)
}

func (c *Client) sendInput(ctx context.Context, info proto.ColInfoInput, q Query) error {
Expand Down Expand Up @@ -323,7 +325,7 @@ func (c *Client) sendInput(ctx context.Context, info proto.ColInfoInput, q Query
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "context")
}
if err := c.encodeBlock("", q.Input); err != nil {
if err := c.encodeBlock(ctx, "", q.Input); err != nil {
return errors.Wrap(err, "write block")
}
if f == nil {
Expand All @@ -347,7 +349,7 @@ End:
// End of input stream.
//
// Encoding that there are no more data.
if err := c.encodeBlankBlock(); err != nil {
if err := c.encodeBlankBlock(ctx); err != nil {
return errors.Wrap(err, "write end of data")
}

Expand Down Expand Up @@ -391,6 +393,7 @@ func (c *Client) handlePacket(ctx context.Context, p proto.ServerCode, q Query)
if err != nil {
return errors.Wrap(err, "progress")
}
c.metricsInc(ctx, queryMetrics{Rows: int(p.Rows), Bytes: int(p.Bytes)})
if ce := c.lg.Check(zap.DebugLevel, "Progress"); ce != nil {
ce.Write(
zap.Uint64("rows", p.Rows),
Expand Down Expand Up @@ -522,8 +525,15 @@ func (c *Client) Do(ctx context.Context, q Query) (err error) {
otelch.QueryID(q.QueryID),
),
)
ctx = newCtx
m := new(queryMetrics)
ctx = context.WithValue(newCtx, ctxQueryKey{}, m)
defer func() {
span.SetAttributes(
otelch.BlocksSent(m.BlocksSent),
otelch.BlocksReceived(m.BlocksReceived),
otelch.Rows(m.Rows),
otelch.Bytes(m.Bytes),
)
if err != nil {
span.RecordError(err)
status := "Failed"
Expand Down
28 changes: 28 additions & 0 deletions query_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ch

import "context"

type (
ctxQueryKey struct{}
queryMetrics struct {
BlocksReceived int
BlocksSent int
Rows int
Bytes int
}
)

func (c *Client) metricsInc(ctx context.Context, delta queryMetrics) {
if !c.otel {
return
}
v, ok := ctx.Value(ctxQueryKey{}).(*queryMetrics)
if !ok {
return
}

v.Bytes += delta.Bytes
v.Rows += delta.Rows
v.BlocksReceived += delta.BlocksReceived
v.BlocksSent += delta.BlocksSent
}

0 comments on commit 04d1945

Please sign in to comment.