diff --git a/config.go b/config.go index 36c6e8c..73fb90c 100644 --- a/config.go +++ b/config.go @@ -6,20 +6,23 @@ package trpc import ( + "errors" "flag" "fmt" - "math/rand" "net" "os" "strconv" + "strings" "sync/atomic" "time" "trpc.group/trpc-go/trpc-go/client" "trpc.group/trpc-go/trpc-go/codec" "trpc.group/trpc-go/trpc-go/errs" + "trpc.group/trpc-go/trpc-go/internal/rand" "trpc.group/trpc-go/trpc-go/plugin" "trpc.group/trpc-go/trpc-go/rpcz" + trpcpb "trpc.group/trpc/trpc-protocol/pb/go/trpc" "gopkg.in/yaml.v3" ) @@ -106,63 +109,392 @@ func (c *RPCZConfig) generate() *rpcz.Config { c.Capacity = defaultCapacity } - return &rpcz.Config{ - Fraction: c.Fraction, - Capacity: c.Capacity, - ShouldRecord: c.RecordWhen.shouldRecord(), + config := &rpcz.Config{ + Fraction: c.Fraction, + Capacity: c.Capacity, } + if c.RecordWhen != nil { + config.ShouldRecord = c.RecordWhen.shouldRecord() + } + return config +} + +type node interface { + yaml.Unmarshaler + shouldRecorder +} + +type nodeKind string + +const ( + kindAND nodeKind = "AND" + kindOR nodeKind = "OR" + kindNOT nodeKind = "NOT" + kindMinDuration nodeKind = "__min_duration" + kindMinRequestSize nodeKind = "__min_request_size" + kindMinResponseSize nodeKind = "__min_response_size" + kindRPCName nodeKind = "__rpc_name" + kindErrorCodes nodeKind = "__error_code" + kindErrorMessages nodeKind = "__error_message" + kindSamplingFraction nodeKind = "__sampling_fraction" + kindHasAttributes nodeKind = "__has_attribute" +) + +var kindToNode = map[nodeKind]func() node{ + kindAND: func() node { return &andNode{} }, + kindOR: func() node { return &orNode{} }, + kindNOT: func() node { return &notNode{} }, + kindMinDuration: func() node { return &minMinDurationNode{} }, + kindMinRequestSize: func() node { return &minRequestSizeNode{} }, + kindMinResponseSize: func() node { return &minResponseSizeNode{} }, + kindRPCName: func() node { return &rpcNameNode{} }, + kindErrorCodes: func() node { return &errorCodeNode{} }, + 
kindErrorMessages: func() node { return &errorMessageNode{} }, + kindSamplingFraction: func() node { return &samplingFractionNode{} }, + kindHasAttributes: func() node { return &hasAttributeNode{} }, +} + +var kinds = func() []nodeKind { + ks := make([]nodeKind, 0, len(kindToNode)) + for k := range kindToNode { + ks = append(ks, k) + } + return ks +}() + +func generate(k nodeKind) (node, error) { + if fn, ok := kindToNode[k]; ok { + return fn(), nil + } + return nil, fmt.Errorf("unknown node: %s, valid node must be one of %v", k, kinds) +} + +type shouldRecorder interface { + shouldRecord() rpcz.ShouldRecord +} + +type recorder struct { + rpcz.ShouldRecord +} + +func (n *recorder) shouldRecord() rpcz.ShouldRecord { + return n.ShouldRecord } // RecordWhenConfig stores the RecordWhenConfig field of Config. type RecordWhenConfig struct { - ErrorCodes []int `yaml:"error_codes,omitempty"` - MinDuration *time.Duration `yaml:"min_duration"` - SamplingFraction float64 `yaml:"sampling_fraction"` + andNode } -func (c *RecordWhenConfig) shouldRecord() rpcz.ShouldRecord { - if c == nil { - return rpcz.AlwaysRecord +// UnmarshalYAML customizes RecordWhenConfig's behavior when being unmarshalled from a YAML document. 
+func (c *RecordWhenConfig) UnmarshalYAML(node *yaml.Node) error { + if err := node.Decode(&c.andNode); err != nil { + return fmt.Errorf("decoding RecordWhenConfig's andNode: %w", err) } + return nil +} - return func(s rpcz.Span) bool { - if c.checkErrorCodes(s) || c.checkoutMinDuration(s) { - return c.checkoutSamplingFraction() +type nodeList struct { + shouldRecords []rpcz.ShouldRecord +} + +func (nl *nodeList) UnmarshalYAML(node *yaml.Node) error { + var nodes []map[nodeKind]yaml.Node + if err := node.Decode(&nodes); err != nil { + return fmt.Errorf("decoding []map[nodeKind]yaml.Node: %w", err) + } + nl.shouldRecords = make([]rpcz.ShouldRecord, 0, len(nodes)) + for _, n := range nodes { + if size := len(n); size != 1 { + return fmt.Errorf("%v node has %d element currently, "+ + "but the valid number of elements can only be 1", n, size) + } + for nodeKind, value := range n { + if valueEmpty(&value) { + return fmt.Errorf("decoding %s node: value is empty", nodeKind) + } + node, err := generate(nodeKind) + if err != nil { + return fmt.Errorf("generating %s node: %w", nodeKind, err) + } + if err := value.Decode(node); err != nil { + return fmt.Errorf("decoding %s node: %w", nodeKind, err) + } + nl.shouldRecords = append(nl.shouldRecords, node.shouldRecord()) } - return false } + return nil } -func (c *RecordWhenConfig) checkErrorCodes(s rpcz.Span) bool { - err, ok := s.Attribute(rpcz.TRPCAttributeError) - if !ok { - return false +func valueEmpty(node *yaml.Node) bool { + return len(node.Content) == 0 && len(node.Value) == 0 +} + +type andNode struct { + recorder +} + +func (n *andNode) UnmarshalYAML(node *yaml.Node) error { + nl := &nodeList{} + if err := node.Decode(nl); err != nil { + return fmt.Errorf("decoding andNode: %w", err) } + n.ShouldRecord = func(s rpcz.Span) bool { + if len(nl.shouldRecords) == 0 { + return false + } + for _, r := range nl.shouldRecords { + if !r(s) { + return false + } + } + return true + } + return nil +} - e, ok := err.(error) - if 
!ok || e == nil { +type orNode struct { + recorder +} + +func (n *orNode) UnmarshalYAML(node *yaml.Node) error { + nl := &nodeList{} + if err := node.Decode(nl); err != nil { + return fmt.Errorf("decoding orNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + for _, r := range nl.shouldRecords { + if r(s) { + return true + } + } return false } + return nil +} + +type notNode struct { + recorder +} + +func (n *notNode) UnmarshalYAML(node *yaml.Node) error { + var not map[nodeKind]yaml.Node + if err := node.Decode(&not); err != nil { + return fmt.Errorf("decoding notNode: %w", err) + } + const numValidChildren = 1 + if n := len(not); n != numValidChildren { + return fmt.Errorf("NOT node has %d child node currently, "+ + "but the valid number of child node can only be %d", n, numValidChildren) + } + for nodeKind, value := range not { + node, err := generate(nodeKind) + if err != nil { + return fmt.Errorf("generating %s node: %w", nodeKind, err) + } + if err := value.Decode(node); err != nil { + return fmt.Errorf("decoding %s node: %w", nodeKind, err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + return !node.shouldRecord()(s) + } + } + return nil +} + +type hasAttributeNode struct { + recorder +} + +func (n *hasAttributeNode) UnmarshalYAML(node *yaml.Node) error { + var attribute string + if err := node.Decode(&attribute); err != nil { + return fmt.Errorf("decoding hasAttributeNode: %w", err) + } + + key, value, err := parse(attribute) + if err != nil { + return fmt.Errorf("parsing attribute %s : %w", attribute, err) + } - code := errs.Code(e) - for _, c := range c.ErrorCodes { - if c == int(code) { + n.ShouldRecord = func(s rpcz.Span) bool { + v, ok := s.Attribute(key) + return ok && strings.Contains(fmt.Sprintf("%s", v), value) + } + return nil +} + +var errInvalidAttribute = errors.New("invalid attribute form [ valid attribute form: (key, value), " + + "only one space character after comma character, and key can't contain comma(',') character 
]") + +func parse(attribute string) (key string, value string, err error) { + if len(attribute) == 0 || attribute[0] != '(' { + return "", "", errInvalidAttribute + } + attribute = attribute[1:] + + if n := len(attribute); n == 0 || attribute[n-1] != ')' { + return "", "", errInvalidAttribute + } + attribute = attribute[:len(attribute)-1] + + const delimiter = ", " + i := strings.Index(attribute, delimiter) + if i == -1 { + return "", "", errInvalidAttribute + } + return attribute[:i], attribute[i+len(delimiter):], nil +} + +type minRequestSizeNode struct { + recorder +} + +func (n *minRequestSizeNode) UnmarshalYAML(node *yaml.Node) error { + var minRequestSize int + if err := node.Decode(&minRequestSize); err != nil { + return fmt.Errorf("decoding minRequestSizeNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + size, ok := s.Attribute(rpcz.TRPCAttributeRequestSize) + if !ok { + return false + } + if size, ok := size.(int); !ok || size < minRequestSize { + return false + } + return true + } + return nil +} + +type minResponseSizeNode struct { + recorder +} + +func (n *minResponseSizeNode) UnmarshalYAML(node *yaml.Node) error { + var minResponseSize int + if err := node.Decode(&minResponseSize); err != nil { + return fmt.Errorf("decoding minResponseSizeNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + responseSize, ok := s.Attribute(rpcz.TRPCAttributeResponseSize) + if !ok { + return false + } + if size, ok := responseSize.(int); !ok || size < minResponseSize { + return false + } + return true + } + return nil +} + +type minMinDurationNode struct { + recorder +} + +func (n *minMinDurationNode) UnmarshalYAML(node *yaml.Node) error { + var dur time.Duration + if err := node.Decode(&dur); err != nil { + return fmt.Errorf("decoding minMinDurationNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + if dur == 0 { return true } + et := s.EndTime() + return et.IsZero() || et.Sub(s.StartTime()) >= dur } - return false + return nil 
} -func (c *RecordWhenConfig) checkoutMinDuration(s rpcz.Span) bool { - if c.MinDuration == nil { - return false +type rpcNameNode struct { + recorder +} + +func (n *rpcNameNode) UnmarshalYAML(node *yaml.Node) error { + var rpcName string + if err := node.Decode(&rpcName); err != nil { + return fmt.Errorf("decoding rpcNameNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + name, ok := s.Attribute(rpcz.TRPCAttributeRPCName) + if !ok { + return false + } + if name, ok := name.(string); !ok || !strings.Contains(name, rpcName) { + return false + } + return true + } + return nil +} + +type samplingFractionNode struct { + recorder +} + +var safeRand = rand.NewSafeRand(time.Now().UnixNano()) + +func (n *samplingFractionNode) UnmarshalYAML(node *yaml.Node) error { + var f float64 + if err := node.Decode(&f); err != nil { + return fmt.Errorf("decoding samplingFractionNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + return f > safeRand.Float64() + } + return nil +} + +type errorCodeNode struct { + recorder +} + +func (n *errorCodeNode) UnmarshalYAML(node *yaml.Node) error { + var code trpcpb.TrpcRetCode + if err := node.Decode(&code); err != nil { + return fmt.Errorf("decoding errorCodeNode: %w", err) } - et := s.EndTime() - return et.IsZero() || et.Sub(s.StartTime()) > *c.MinDuration + n.ShouldRecord = func(s rpcz.Span) bool { + err, ok := extractError(s) + if !ok { + return false + } + c := errs.Code(err) + return c == code + } + return nil } -func (c *RecordWhenConfig) checkoutSamplingFraction() bool { - return c.SamplingFraction > rand.Float64() +type errorMessageNode struct { + recorder +} + +func (n *errorMessageNode) UnmarshalYAML(node *yaml.Node) error { + var message string + if err := node.Decode(&message); err != nil { + return fmt.Errorf("decoding errorMessageNode: %w", err) + } + n.ShouldRecord = func(s rpcz.Span) bool { + err, ok := extractError(s) + if !ok { + return false + } + return strings.Contains(errs.Msg(err), message) + } 
+ return nil +} + +func extractError(span rpcz.Span) (error, bool) { + err, ok := span.Attribute(rpcz.TRPCAttributeError) + if !ok { + return nil, false + } + + e, ok := err.(error) + return e, ok } // ServiceConfig is a configuration for a single service. A server process might have multiple services. diff --git a/config_test.go b/config_test.go index ecd072f..819180a 100644 --- a/config_test.go +++ b/config_test.go @@ -196,27 +196,11 @@ stream_filter: } func TestRecordWhen(t *testing.T) { - t.Run("empty MinDuration", func(t *testing.T) { - var rw RecordWhenConfig - require.Nil(t, yaml.Unmarshal([]byte(`min_duration:`), &rw)) - require.Nil(t, rw.MinDuration) - bts, err := yaml.Marshal(rw) - require.Nil(t, err) - require.Equal(t, "min_duration: null\nsampling_fraction: 0\n", string(bts)) - }) - t.Run("MinDuration", func(t *testing.T) { - var rw RecordWhenConfig - require.Nil(t, yaml.Unmarshal([]byte(`min_duration: 1000ms`), &rw)) - require.Equal(t, *rw.MinDuration, time.Second) - *rw.MinDuration = time.Minute - bts, err := yaml.Marshal(rw) - require.Nil(t, err) - require.Equal(t, "min_duration: 1m0s\nsampling_fraction: 0\n", string(bts)) - }) t.Run("empty record-when", func(t *testing.T) { config := &RPCZConfig{} require.Nil(t, yaml.Unmarshal( - []byte(`fraction: 1.0 + []byte(` +fraction: 1.0 capacity: 10`), config, )) @@ -232,73 +216,223 @@ capacity: 10`), require.True(t, ok) } }) + t.Run("unknown node", func(t *testing.T) { + config := &RecordWhenConfig{} + err := yaml.Unmarshal( + []byte(` +- XOR: + - __min_request_size: 30 + - __min_response_size: 40 +`), + config, + ) + require.Contains(t, errs.Msg(err), "unknown node: XOR") + }) + t.Run("AND node is map type", func(t *testing.T) { + config := &RecordWhenConfig{} + err := yaml.Unmarshal( + []byte(` +- AND: {__rpc_name: "/trpc.app.server.service/method"} +`), + config, + ) + require.Contains(t, errs.Msg(err), "cannot unmarshal !!map into []map[trpc.nodeKind]yaml.Node") + }) + t.Run("OR node is map type", func(t 
*testing.T) { + config := &RecordWhenConfig{} + err := yaml.Unmarshal( + []byte(` +- OR: {__rpc_name: "/trpc.app.server.service/method"} +`), + config, + ) + require.Contains(t, errs.Msg(err), "cannot unmarshal !!map into []map[trpc.nodeKind]yaml.Node") + }) + } -func TestRPCZ_RecordWhen_SamplingFraction(t *testing.T) { +func TestRecordWhen_NotNode(t *testing.T) { + t.Run("NOT node is empty", func(t *testing.T) { + config := &RPCZConfig{} + err := yaml.Unmarshal([]byte(` +record_when: + - NOT: +`), + config, + ) + require.ErrorContains(t, err, "value is empty") + }) + t.Run("NOT node has two children", func(t *testing.T) { + config := &RecordWhenConfig{} + err := yaml.Unmarshal( + []byte(` + - NOT: {__rpc_name: "/trpc.app.server.service/method", __min_duration: 1000ms} + `), + config, + ) + require.Contains(t, errs.Msg(err), "the valid number of child node can only be 1") + }) + t.Run("NOT has a leaf child", func(t *testing.T) { + config := &RecordWhenConfig{} + require.Nil(t, yaml.Unmarshal( + []byte(` + - NOT: + __rpc_name: "/trpc.app.server.service/method" + `), + config, + )) + }) + t.Run("NOT has a internal child", func(t *testing.T) { + config := &RecordWhenConfig{} + require.Nil(t, yaml.Unmarshal( + []byte(` +- NOT: + OR: + - __min_duration: 1000ms + - __rpc_name: "/trpc.app.server.service/method" +`), + config, + )) + }) + t.Run("NOT node is slice type", func(t *testing.T) { + config := &RecordWhenConfig{} + err := yaml.Unmarshal( + []byte(` +- NOT: + - __rpc_name: "/trpc.app.server.service/method" +`), + config, + ) + require.Contains(t, errs.Msg(err), "cannot unmarshal !!seq into map[trpc.nodeKind]yaml.Node") + }) +} +func TestRecordWhen_ANDNode(t *testing.T) { + t.Run("AND node is empty", func(t *testing.T) { + config := &RPCZConfig{} + err := yaml.Unmarshal([]byte(` +record_when: + - AND: +`), + config, + ) + require.ErrorContains(t, err, "value is empty") + }) + t.Run("AND node has two children", func(t *testing.T) { + config := &RecordWhenConfig{} + 
require.Nil(t, yaml.Unmarshal( + []byte(` +- AND: + - __rpc_name: "/trpc.app.server.service/method" + - __min_duration: 1000ms +`), + config, + )) + }) + t.Run("AND has a leaf child", func(t *testing.T) { + config := &RecordWhenConfig{} + require.Nil(t, yaml.Unmarshal( + []byte(` +- AND: + - __rpc_name: "/trpc.app.server.service/method" +`), + config, + )) + }) + t.Run("AND has a internal child", func(t *testing.T) { + config := &RecordWhenConfig{} + require.Nil(t, yaml.Unmarshal( + []byte(` +- AND: + - OR: + - __min_duration: 1000ms + - __rpc_name: "/trpc.app.server.service/method" +`), + config, + )) + }) + t.Run("AND node is map type", func(t *testing.T) { + config := &RecordWhenConfig{} + err := yaml.Unmarshal( + []byte(` +- AND: + __rpc_name: "/trpc.app.server.service/method" +`), + config, + ) + require.Contains(t, errs.Msg(err), "cannot unmarshal !!map into []map[trpc.nodeKind]yaml.Node") + }) +} +func TestRPCZ_RecordWhen_ErrorCode(t *testing.T) { config := &RPCZConfig{} - require.Nil(t, yaml.Unmarshal( - []byte( - `fraction: 1.0 + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 capacity: 10 record_when: - error_codes: [0] - min_duration: 10ms - sample_rate: 0 -`), config)) + - __sampling_fraction: 1 + - OR: + - __error_code: 1 # RetServerDecodeFail = 1 + - __error_code: 2 # RetServerEncodeFail = 2 + - __error_message: "service codec" + - __error_message: "client codec" + - NOT: + OR: + - __error_code: 1 + - __error_message: "service codec" +`), config) + r := rpcz.NewRPCZ(config.generate()) - var unexpectedIDs []rpcz.SpanID + var ( + expectedIDs []rpcz.SpanID + unexpectedIDs []rpcz.SpanID + ) { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) - // mimic some time-consuming operation. 
- time.Sleep(15 * time.Millisecond) + s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetServerDecodeFail, "service codec")) unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) - // mimic some time-consuming operation. - time.Sleep(20 * time.Millisecond) + s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetServerEncodeFail, "service codec")) unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } - { - s, _ := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) - unexpectedIDs = append(unexpectedIDs, s.ID()) - } { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) - // mimic some time-consuming operation. - time.Sleep(1 * time.Millisecond) + s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetServerDecodeFail, "client codec")) unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) - // mimic some time-consuming operation. 
- time.Sleep(2 * time.Millisecond) - unexpectedIDs = append(unexpectedIDs, s.ID()) + s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetServerEncodeFail, "client codec")) + expectedIDs = append(expectedIDs, s.ID()) ender.End() } - for _, id := range unexpectedIDs { + for i, id := range expectedIDs { _, ok := r.Query(id) - require.False(t, ok) + require.True(t, ok, i) + } + for i, id := range unexpectedIDs { + _, ok := r.Query(id) + require.False(t, ok, i) } } -func TestRPCZ_RecordWhen_ErrorCode(t *testing.T) { +func TestRPC_RecordWhen_CustomAttribute(t *testing.T) { config := &RPCZConfig{} - require.Nil(t, yaml.Unmarshal( - []byte( - `fraction: 1.0 + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 capacity: 10 record_when: - error_codes: [1, 2] # RetServerDecodeFail = 1, RetServerEncodeFail = 2 - min_duration: 1s - sampling_fraction: 1 -`), config)) + - __sampling_fraction: 1 + - OR: + - __has_attribute: (race, elf) + - __has_attribute: (class, wizard) + - NOT: + OR: + - __has_attribute: (race, dwarf) + - __has_attribute: (class, warlock) +`), config) + r := rpcz.NewRPCZ(config.generate()) var ( expectedIDs []rpcz.SpanID @@ -306,61 +440,88 @@ record_when: ) { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetServerDecodeFail, "")) + s.SetAttribute("race", "elf") + s.SetAttribute("class", "wizard") expectedIDs = append(expectedIDs, s.ID()) ender.End() } { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetServerEncodeFail, "")) - expectedIDs = append(expectedIDs, s.ID()) - ender.End() - } - { - s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetUnknown, "")) + s.SetAttribute("race", "elf") + s.SetAttribute("class", "wizard, warlock") unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } { s, ender := r.NewChild("") + s.SetAttribute("race", "elf, dwarf") + s.SetAttribute("class", "wizard") unexpectedIDs = 
append(unexpectedIDs, s.ID()) ender.End() } { s, ender := r.NewChild("") - s.SetAttribute(rpcz.TRPCAttributeError, nil) + s.SetAttribute("race", "elf, dwarf") + s.SetAttribute("class", "wizard, warlock") unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } - for _, id := range expectedIDs { + for i, id := range expectedIDs { _, ok := r.Query(id) - require.True(t, ok) + require.True(t, ok, i) } - for _, id := range unexpectedIDs { + for i, id := range unexpectedIDs { _, ok := r.Query(id) - require.False(t, ok) + require.False(t, ok, i) } } - +func TestRPC_RecordWhen_InvalidCustomAttribute(t *testing.T) { + t.Run("miss left parenthesis", func(t *testing.T) { + config := &RPCZConfig{} + require.ErrorContains(t, yaml.Unmarshal([]byte(` +record_when: + - __has_attribute: race, elf) +`), config), "invalid attribute form") + }) + t.Run("miss right parenthesis", func(t *testing.T) { + config := &RPCZConfig{} + require.ErrorContains(t, yaml.Unmarshal([]byte(` +record_when: + - __has_attribute: (race, elf +`), config), "invalid attribute form") + }) + t.Run("middle delimiter space", func(t *testing.T) { + config := &RPCZConfig{} + require.ErrorContains(t, yaml.Unmarshal([]byte(` +record_when: + - __has_attribute: (race,elf) +`), config), "invalid attribute form") + }) + t.Run("middle delimiter comma", func(t *testing.T) { + config := &RPCZConfig{} + require.ErrorContains(t, yaml.Unmarshal([]byte(` +record_when: + - __has_attribute: (race elf) +`), config), "invalid attribute form") + }) +} func TestRPCZ_RecordWhen_MinDuration(t *testing.T) { t.Run("not empty", func(t *testing.T) { config := &RPCZConfig{} - require.Nil(t, yaml.Unmarshal( - []byte( - `fraction: 1.0 + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 capacity: 10 record_when: - error_codes: [0,] - min_duration: 100ms - sampling_fraction: 1 -`), config)) + - __error_code: 999 # RetUnknown = 0 + - __min_duration: 100ms + - __sampling_fraction: 1 +`), config) + r := rpcz.NewRPCZ(config.generate()) var ( 
expectedIDs []rpcz.SpanID unexpectedIDs []rpcz.SpanID ) - { s, ender := r.NewChild("") s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetUnknown, "")) @@ -410,14 +571,14 @@ record_when: }) t.Run("empty", func(t *testing.T) { config := &RPCZConfig{} - require.Nil(t, yaml.Unmarshal( - []byte( - `fraction: 1.0 + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 capacity: 10 record_when: - error_codes: [0,] - sampling_fraction: 1 -`), config)) + - __error_code: 0 # RetOK = 0 + - __sampling_fraction: 1 +`), config) + r := rpcz.NewRPCZ(config.generate()) var ( unexpectedID rpcz.SpanID @@ -446,17 +607,139 @@ record_when: require.True(t, ok) }) } +func TestRPCZ_RecordWhen_MinRequestSize(t *testing.T) { + config := &RPCZConfig{} + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 +capacity: 10 +record_when: + - __sampling_fraction: 1 + - __min_request_size: 30 +`), config) + + r := rpcz.NewRPCZ(config.generate()) + t.Run("unset request size", func(t *testing.T) { + s, ender := r.NewChild("") + unexpectedID := s.ID() + ender.End() + _, ok := r.Query(unexpectedID) + require.False(t, ok) + }) + t.Run("request size less than min_request_size", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeRequestSize, 29) + unexpectedID := s.ID() + ender.End() + _, ok := r.Query(unexpectedID) + require.False(t, ok) + }) + t.Run("request size equals to min_request_size", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeRequestSize, 30) + expectedID := s.ID() + ender.End() + _, ok := r.Query(expectedID) + require.True(t, ok) + }) + t.Run("request size greater than min_request_size", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeRequestSize, 31) + expectedID := s.ID() + ender.End() + _, ok := r.Query(expectedID) + require.True(t, ok) + }) +} +func TestRPCZ_RecordWhen_MinResponseSize(t *testing.T) { + config := &RPCZConfig{} + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 
+capacity: 10 +record_when: + - __sampling_fraction: 1 + - __min_response_size: 40 +`), config) + + r := rpcz.NewRPCZ(config.generate()) + t.Run("unset response size", func(t *testing.T) { + s, ender := r.NewChild("") + unexpectedID := s.ID() + ender.End() + _, ok := r.Query(unexpectedID) + require.False(t, ok) + }) + t.Run("request size less than min_response_size", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeResponseSize, 39) + unexpectedID := s.ID() + ender.End() + _, ok := r.Query(unexpectedID) + require.False(t, ok) + }) + t.Run("request size equals to min_response_size", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeResponseSize, 40) + expectedID := s.ID() + ender.End() + _, ok := r.Query(expectedID) + require.True(t, ok) + }) + t.Run("request size greater than min_response_size", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeResponseSize, 41) + expectedID := s.ID() + ender.End() + _, ok := r.Query(expectedID) + require.True(t, ok) + }) +} +func TestRPCZ_RecordWhen_RPCName(t *testing.T) { + config := &RPCZConfig{} + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 +capacity: 10 +record_when: + - __sampling_fraction: 1 + - __rpc_name: trpc.app.server.service +`), config) + + r := rpcz.NewRPCZ(config.generate()) + t.Run("unset RPCName", func(t *testing.T) { + s, ender := r.NewChild("") + unexpectedID := s.ID() + ender.End() + _, ok := r.Query(unexpectedID) + require.False(t, ok) + }) + t.Run("RPCName does not contain rpc_name", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeRPCName, "/xxx.app.server.service/method") + unexpectedID := s.ID() + ender.End() + _, ok := r.Query(unexpectedID) + require.False(t, ok) + }) + t.Run("RPCName contains rpc_name", func(t *testing.T) { + s, ender := r.NewChild("") + s.SetAttribute(rpcz.TRPCAttributeRPCName, "/trpc.app.server.service/method") + expectedID := s.ID() + 
ender.End() + _, ok := r.Query(expectedID) + require.True(t, ok) + }) +} func TestRPCZ_RecordWhen_ErrorCodeAndMinDuration(t *testing.T) { config := &RPCZConfig{} - require.Nil(t, yaml.Unmarshal( - []byte( - `fraction: 1.0 + mustYamlUnmarshal(t, []byte(` +fraction: 1.0 capacity: 10 record_when: - error_codes: [0] # RetOK: 0 - min_duration: 100ms - sampling_fraction: 1 -`), config)) + - AND: + - __error_code: 0 # RetOK = 0 + - __min_duration: 100ms + - __sampling_fraction: 1 +`), config) + r := rpcz.NewRPCZ(config.generate()) var ( expectedIDs []rpcz.SpanID @@ -476,7 +759,7 @@ record_when: s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetUnknown, "")) // mimic some time-consuming operation. time.Sleep(2 * time.Second) - expectedIDs = append(expectedIDs, s.ID()) + unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } { @@ -484,7 +767,7 @@ record_when: s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) // mimic some time-consuming operation. time.Sleep(1 * time.Millisecond) - expectedIDs = append(expectedIDs, s.ID()) + unexpectedIDs = append(unexpectedIDs, s.ID()) ender.End() } { @@ -501,13 +784,20 @@ record_when: s.SetAttribute(rpcz.TRPCAttributeError, errs.NewFrameError(errs.RetOK, "")) unexpectedIDs = append(unexpectedIDs, s.ID()) } - for _, id := range expectedIDs { + for i, id := range expectedIDs { _, ok := r.Query(id) - require.True(t, ok) + require.True(t, ok, i) } - for _, id := range unexpectedIDs { + for i, id := range unexpectedIDs { _, ok := r.Query(id) - require.False(t, ok) + require.False(t, ok, i) + } +} + +func mustYamlUnmarshal(t *testing.T, in []byte, out interface{}) { + t.Helper() + if err := yaml.Unmarshal(in, out); err != nil { + t.Fatal(err) } } func TestRepairServiceIdleTime(t *testing.T) { diff --git a/rpcz/README.md b/rpcz/README.md new file mode 100644 index 0000000..4486820 --- /dev/null +++ b/rpcz/README.md @@ -0,0 +1,646 @@ +# RPCZ [中文](README_CN.md) + +RPCZ is a tool for monitoring RPC, 
logging various events that occur in a single rpc, such as serialization/deserialization, compression/decompression and execution of interceptors. +It allows users to configure the events that need to be logged, and users can view the logged events through the admin tool, which can help them locate problems quickly and accurately. +In addition, since RPCZ records the duration of various events in RPC and the size of packets sent and received, it can help users analyze timeout events and optimize the performance of the service. + +## Explanation of terms + +### Event + +Event (Event) [1, 2, 3] is used to describe that something (`Event.Name`) happened at a particular moment (`Event.Time`). + +```go +type Event struct { + Name string + Time time.Time +} +``` +In a normal RPC call, a series of events will occur. For example, on the client side, where the request is sent, the following series of events generally occurs in chronological order. + +1. start running the pre-interceptor +2. finish running the pre-interceptor +3. start serialization +4. end serialization +5. start compression +6. end compression +7. start encoding protocol header fields +8. end encoding protocol header fields +9. start sending binaries to the network +10. end sending binaries to the network +11. start receiving binary files from the network +12. end receiving binary files from the network +13. start decoding protocol header fields +14. end decoding protocol header fields +15. start decompression +16. end decompression +17. start deserialization +18. end deserialization +19. start running post-interceptor +20. finish running the post-interceptor + +On the server side, where the request is processed, the following sequence of events typically occurs in chronological order. + +1. start decoding the protocol header fields +2. finish decoding the protocol header fields +3. start decompression +4. end decompression +5. start deserialization +6. end deserialization +7. 
start running the pre-interceptor +8. finish running the pre-interceptor +9. start running user-defined handler +10. end running user-defined handler +11. start running post-interceptor +12. end running post-interceptor +13. start serialization +14. end serialization +15. start compression +16. end compression +17. start encoding protocol header fields +18. end encoding protocol header fields +19. start sending binary files to the network +20. end sending binary files to the network + +### Span + +Span[4, 5] is used to describe a single operation for a certain time interval (with a start time and an end time), such as a client sending a remote request and a server processing the request or a function call. +Depending on the size of the divided time interval, a large Span can contain multiple smaller Spans, just as multiple other functions may be called within a single function, creating a tree structured hierarchy. +Thus, a Span may contain many sub-Spans in addition to the name, the internal identifier span-id [6], the start time, the end time, and the set of events (Events) that occurred during this time. + +There are two types of Span in rpcz: + +- client-Span: describes the actions of the client during the interval from the start of the request to the receipt of the reply (covering the series of events on the client side described in the previous section Event). +- server-Span: describes the operation of the server from the time it starts receiving requests to the time it finishes sending replies (covers the series of events on the server side described in the previous section Event). + When server-Span runs a user-defined processing function, it may create a client to call a downstream service, so server-Span will contain several sub-client-Span. + +``` +server-Span + client-Span-1 + client-Span-2 + ...... 
+ client-Span-n +``` + +Span is stored in context, rpcz will automatically call ContextWithSpan to store Span in context, you need to ensure that the Span in context will not be lost during the function call. + +## Life cycle of Span + +When examining the lifecycle of Span objects, most of the operations on Span in rpcz need to consider concurrency safety. +Pool and pre-allocated circular array are used to reduce the performance impact of memory allocation for Span. + +### Span construction + +rpcz initializes a global GlobalRPCZ at startup, which is used to generate and store Span. +There are only two possible locations where a Span can be constructed within the framework. +The first location is when the handle function of the transport layer on the server side first starts processing incoming requests. +The second location is when the Invoke function is called in the client-side stub code to start the rpc request. +Although the two locations create different types of Span, the code logic is similar, both will call `rpcz.NewSpanContext`, which actually performs following three operations successively: + +1. Call the SpanFromContext function to get the span from the context. +2. Call span.NewChild method to create a new child span. +3. Call the ContextWithSpan function to set the newly created child span into context. + +### Span passing in context + +The created span is stored in context until it is committed, and is passed along the link of the rpc call. +Use `rpcz.AddEvent` to add a new event to the span in the current context on the call link. + +### Span commits + +After the request is processed by the handle function at the transport layer on the server side, `ender.End()` is called to commit the Span to GlobalRPCZ. +After that, the Span is still stored in the context, but semantically, the Span is not allowed to be manipulated again after the End function has been called, and its behavior is undefined. 
+ +### Accessing Span in admin + +The admin module calls `rpcz.Query` and `rpcz.BatchQuery` to read the Span from GlobalRPCZ. +One thing to note is that the Span obtained by admin is a read-only Span (ReadOnlySpan), which is exported from a writable Span for the sake of concurrent access security. + +### Delete redundant Span + +When too many Spans are stored in the hash table, it is necessary to remove the redundant Spans according to some elimination rules. +The current implementation removes the oldest Span when the number of Spans in the GlobalRPCZ exceeds the maximum capacity limit. + +## Origin of RPCZ name + +Regarding the origin of the name "RPCZ", the suffix -z has two general meanings in English [7]: it is used in nouns to change the singular to plural, e.g. Boy**z** are always trouble; and it is used in verbs to change the verb form He love**z** me. +In summary, adding -z to a word has the same effect as adding -s. +So "RPCZ" refers to various types of RPCs, and this does hold true from a distributed global call-link perspective, where there is a tree-like parent-child relationship of various RPC calls that combine to form the "RPCZ". + +The term "RPCZ" first came from Google's internal RPC framework Stubby, based on which Google implemented a similar function in the open source grpc channelz [8], which not only includes information about various channels, but also covers trace information. +After that, Baidu's open source brpc implemented a non-distributed trace tool based on the distributed trace system Dapper paper [9] published by google, imitating channelz named brpc-rpcz [10]. +The next step is that users need a tool similar to brpc-rpcz for debugging and optimization in tRPC, so tRPC-Cpp first supports similar functionality [11, 12], still keeping the name RPCZ. + +The last thing is to support similar functionality to "RPCZ" in tRPC-Go. 
During the implementation process, it was found that with the development of distributed tracing systems, open source systems of opentracing [13] and opentelemetry [14] emerged in the community, and the company also made tianji pavilion [15] internally. +tRPC-Go-RPCZ partially borrows the go language implementation of opentelemetry-trace for span and event design, and can be considered as a trace system inside the tRPC-Go framework. +Strictly speaking, tRPC-Go-RPCZ is non-distributed, because there is no communication between the different services at the protocol level. +Now it seems that brpc, tRPC-Cpp and the tRPC-Go implementation of rpcz, named spanz, might be more in line with the original meaning of the suffix "-z". + + +## How to configure rpcz + +The configuration of rpcz includes basic configuration, advanced configuration and code configuration, see `config_test.go` for more configuration examples. + +### Supporting for different tRPC-GO versions + +- v0.15.0: supporting tRPC streaming and tnet. +- v0.14.0: supporting http protocol. +- v0.11.0: supporting tRPC unary call. +- Versions prior to v0.11.0: Not supported. + +### Basic configuration + +Configure admin on the server side, and configure rpcz inside admin: + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 +``` + +- `fraction`: the sampling rate, the range is `[0.0, 1.0]`, the default value is 0.0 which means no sampling, you need to configure it manually. +- `capacity`: the storage capacity of rpcz, the default value is 10000, which means the maximum number of spans can be stored. + +### Advanced configuration + +Advanced configuration allows you to filter the span of interest. Before using advanced configuration, you need to understand the sampling mechanism of rpcz. + +#### Sampling mechanism + +rpcz uses the sampling mechanism to control performance overhead and filter spans that are not of interest to you. 
+Sampling may occur at different stages of a Span's lifecycle, with the earliest sampling occurring before a Span is created and the latest sampling occurring before a Span is committed. + +##### Sampling results table + +Only Spans that are sampled before both creation and commit will eventually be collected in GlobalRPCZ for you to query through the admin interface. + +| Sampled before Span creation? | Sampled before Span commit? | Will the Span eventually be collected? | +|:------------------------------|:---------------------------:|:--------------------------------------:| +| true | true | true | +| true | false | false | +| false | true | false | +| false | false | false | + +##### Sampling before Span creation + +Span is created only when it is sampled, otherwise it is not created, which avoids a series of subsequent operations on Span and thus reduces the performance overhead to a large extent. +The sampling policy with fixed sampling rate [16, 17] has only one configurable floating-point parameter `rpcz.fraction`, for example, `rpcz.fraction` is 0.0001, which means one request is sampled for every 10000 (1/0.0001) requests. +When `rpcz.fraction` is less than 0, it is clamped up to 0. When `rpcz.fraction` is greater than 1, it is clamped down to 1. + +##### Sampling before Span commit + +Spans that have been created will record all kinds of information in the rpc, but you may only care about spans that contain certain information, such as spans with rpc errors, spans that are highly time consuming, and spans that contain certain property information. +In this case, it is necessary to sample only the Spans that you need before the Span is finally committed. +rpcz provides a flexible external interface that allows you to set the `rpcz.record_when` field in the configuration file to customize the sampling logic before the service is started.
+"record_when" provides three common boolean operations: "AND", "OR", and "NOT", +as well as seven basic operations that return boolean values: "__min_request_size", "__min_response_size", "__error_code", "__error_message", "__rpc_name", "__min_duration", and "__has_attribute". It should be noted that "record_when" itself is an "AND" operation. +By combining these operations in any way, you can flexibly filter out the Spans of interest. + +```yaml +server: + admin: + rpcz: + record_when: + error_codes: [0,] + min_duration: 1000ms # ms or s + sampling_fraction: 1 # [0.0, 1.0] +``` + +- `error_codes`: Only sample spans containing any of these error codes, e.g. 0(RetOk), 21(RetServerTimeout). +- `min_duration`: Only sample spans that last longer than `min_duration`, which can be used for time-consuming analysis. +- `sampling_fraction`: The sampling rate, in the range of `[0, 1]`. + +#### Example of configuration + +##### Submitting the span that contains error code 1 (RetServerDecodeFail), the error message contains the string "unknown", and the duration is greater than 1 second. 
+ +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - __error_code: 1 + - __min_duration: 1000ms + - __sampling_fraction: 1 +``` + + +Note: "record_when" itself is an "AND" node, and it can also be written in the following ways: + +style1: + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - AND: + - __error_code: 1 + - __min_duration: 1000ms + - __sampling_fraction: 1 +``` + +style2: + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - AND: + - __error_code: 1 + - AND: + - __min_duration: 1000ms + - __sampling_fraction: 1 +``` + +style3: + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - AND: + - __error_code: 1 + - AND: + - __min_duration: 1000ms + - AND: + - __sampling_fraction: 1 +``` + +##### Submitting the span that contains error code 1 (RetServerDecodeFail) or 21 (RetServerTimeout), or the duration is greater than 2 seconds with a probability of 1/2. + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - OR: + - __error_code: 1 + - __error_code: 21 + - __min_duration: 2s + - __sampling_fraction: 0.5 +``` + +##### Submitting the span that has a duration greater than 10 seconds, contains the string "TDXA/Transfer" in the rpc name, and the error message does not contain the string "pseudo". + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - __min_duration: 10s + - __rpc_name: "TDXA/Transfer" + - NOT: + __error_message: "pseudo" + - __sampling_fraction: 1 +``` + +### Code configuration + +After reading the configuration file and before the service starts, rpcz can be configured with `rpcz.GlobalRPCZ`, where the commit sampling logic needs to implement the `ShouldRecord` function.
+ +```go +// ShouldRecord determines if the Span should be recorded. +type ShouldRecord = func(Span) bool +``` + +##### Committing only Spans that contain the "SpecialAttribute" attribute + +```go +const attributeName = "SpecialAttribute" +rpcz.GlobalRPCZ = rpcz.NewRPCZ(&rpcz.Config{ +Fraction: 1.0, +Capacity: 1000, +ShouldRecord: func(s rpcz.Span) bool { +_, ok := s.Attribute(attributeName) +return ok +}, +}) +``` + +### Query the summary information of the most recently submitted spans + +To query the summary information of the last num spans, you can access the following url: + +```html +http://ip:port/cmds/rpcz/spans?num=xxx +``` + +For example, executing `curl http://ip:port/cmds/rpcz/spans?num=2` will return the summary information for 2 spans as follows. + +```html +1: +span: (client, 65744150616107367) +time: (Dec 1 20:57:43.946627, Dec 1 20:57:43.946947) +duration: (0, 319.792µs, 0) +attributes: (RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall), (Error, ) + 2: + span: (server, 1844470940819923952) + time: (Dec 1 20:57:43.946677, Dec 1 20:57:43.946912) + duration: (0, 235.5µs, 0) + attributes: (RequestSize, 125),(ResponseSize, 18),(RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall),(Error, success) +``` + +The summary information for each span matches the following template. + +```html +span: (name, id) +time: (startTime, endTime) +duration: (preDur, middleDur, postDur) +attributes: (name1, value1) (name2, value2) +``` + +The meaning of each of these fields is explained as follows.
+ +- name: the name of the span +- id: the unique identifier of the span, which can be used to query the details of a specific span +- startTime: the creation time of the span +- endTime: the commit time of the span, when the span is not successfully committed, the value of this field is "unknown" +- duration: contains a time period to describe the duration of currentSpan and parentSpan + - preDur: currentSpan.startTime - parentSpan.startTime + - middleDur: currentSpan.endTime - currentSpan.startTime, when currentSpan.endTime is "unknown", the value of middleDur is also "unknown". + - postDur: parentSpan.endTime - currentSpan.endTime, when parentSpan.endTime or currentSpan.endTime is "unknown", the value of postDur is also "unknown" +- attributes: attributes of the span, each attribute consists of (attribute name, attribute value), usually the following four attributes are displayed + - RequestSize: request packet size (byte) + - ResponseSize: response packet size (byte) + - RPCName: the service name of the counterpart + interface name (/trpc.app.server.service/method) + - Error: error message, according to the framework return code to determine whether the request is successful, success or nil means success + +If you do not specify the number of queries, the following query will default to return a summary of the [^1] 10 most recently submitted successful spans. + +```html +http://ip:port/cmds/rpcz/spans +``` + +[^1]: **The most recently committed span is not sorted strictly by time, there may be multiple goroutines submitting spans at the same time, and they are sorted by the most recently committed span.** + +### Query the details of a span + +To query the details of a span containing an id, you can access the following url. + +```html +http://ip:port/cmds/rpcz/spans/{id} +``` + +For example, execute `curl http://ip:port/cmds/rpcz/spans/6673650005084645130` to query the details of a span with the span id 6673650005084645130.
+ +``` +span: (server, 6673650005084645130) + time: (Dec 2 10:43:55.295935, Dec 2 10:43:55.399262) + duration: (0, 103.326ms, 0) + attributes: (RequestSize, 125),(ResponseSize, 18),(RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall),(Error, success) + span: (DecodeProtocolHead, 6673650005084645130) + time: (Dec 2 10:43:55.295940, Dec 2 10:43:55.295952) + duration: (4.375µs, 12.375µs, 103.30925ms) + span: (Decompress, 6673650005084645130) + time: (Dec 2 10:43:55.295981, Dec 2 10:43:55.295982) + duration: (45.875µs, 791ns, 103.279334ms) + span: (Unmarshal, 6673650005084645130) + time: (Dec 2 10:43:55.295982, Dec 2 10:43:55.295983) + duration: (47.041µs, 334ns, 103.278625ms) + span: (filter1, 6673650005084645130) + time: (Dec 2 10:43:55.296161, Dec 2 10:43:55.399249) + duration: (225.708µs, 103.088ms, 12.292µs) + event: (your annotation at pre-filter, Dec 2 10:43:55.296163) + span: (filter2, 6673650005084645130) + time: (Dec 2 10:43:55.296164, Dec 2 10:43:55.399249) + duration: (2.75µs, 103.085ms, 250ns) + event: (your annotation at pre-filter, Dec 2 10:43:55.296165) + span: (server.WithFilter, 6673650005084645130) + time: (Dec 2 10:43:55.296165, Dec 2 10:43:55.399249) + duration: (1.208µs, 103.083625ms, 167ns) + event: (your annotation at pre-filter, Dec 2 10:43:55.296165) + span: (, 6673650005084645130) + time: (Dec 2 10:43:55.296166, Dec 2 10:43:55.399249) + duration: (792ns, 103.082583ms, 250ns) + span: (HandleFunc, 6673650005084645130) + time: (Dec 2 10:43:55.296177, Dec 2 10:43:55.399249) + duration: (11.583µs, 103.070917ms, 83ns) + event: (handling EmptyCallF, Dec 2 10:43:55.296179) + span: (client, 6673650005084645130) + time: (Dec 2 10:43:55.296187, Dec 2 10:43:55.297871) + duration: (9.125µs, 1.684625ms, 101.377167ms) + attributes: (RPCName, /trpc.testing.end2end.TestTRPC/UnaryCall),(Error, ) + span: (filter1, 6673650005084645130) + time: (Dec 2 10:43:55.296192, Dec 2 10:43:55.297870) + duration: (5.292µs, 1.678542ms, 791ns) + span: (client.WithFilter, 
6673650005084645130) + time: (Dec 2 10:43:55.296192, Dec 2 10:43:55.297870) + duration: (542ns, 1.677875ms, 125ns) + span: (selector, 6673650005084645130) + time: (Dec 2 10:43:55.296193, Dec 2 10:43:55.297870) + duration: (541ns, 1.677209ms, 125ns) + span: (CallFunc, 6673650005084645130) + time: (Dec 2 10:43:55.296200, Dec 2 10:43:55.297869) + duration: (7.459µs, 1.668541ms, 1.209µs) + attributes: (RequestSize, 405),(ResponseSize, 338) + span: (Marshal, 6673650005084645130) + time: (Dec 2 10:43:55.296202, Dec 2 10:43:55.296341) + duration: (1.375µs, 138.875µs, 1.528291ms) + span: (Compress, 6673650005084645130) + time: (Dec 2 10:43:55.296341, Dec 2 10:43:55.296341) + duration: (140.708µs, 333ns, 1.5275ms) + span: (EncodeProtocolHead, 6673650005084645130) + time: (Dec 2 10:43:55.296342, Dec 2 10:43:55.296345) + duration: (141.458µs, 3.333µs, 1.52375ms) + span: (SendMessage, 6673650005084645130) + time: (Dec 2 10:43:55.297540, Dec 2 10:43:55.297555) + duration: (1.339375ms, 15.708µs, 313.458µs) + span: (ReceiveMessage, 6673650005084645130) + time: (Dec 2 10:43:55.297556, Dec 2 10:43:55.297860) + duration: (1.355666ms, 303.75µs, 9.125µs) + span: (DecodeProtocolHead, 6673650005084645130) + time: (Dec 2 10:43:55.297862, Dec 2 10:43:55.297865) + duration: (1.661916ms, 2.5µs, 4.125µs) + span: (Decompress, 6673650005084645130) + time: (Dec 2 10:43:55.297866, Dec 2 10:43:55.297866) + duration: (1.665583ms, 167ns, 2.791µs) + span: (Unmarshal, 6673650005084645130) + time: (Dec 2 10:43:55.297866, Dec 2 10:43:55.297868) + duration: (1.666041ms, 1.709µs, 791ns) + span: (sleep, 6673650005084645130) + time: (Dec 2 10:43:55.297876, unknown) + duration: (1.698709ms, unknown, unknown) + event: (awake, Dec 2 10:43:55.398954) + span: (client, 6673650005084645130) + time: (Dec 2 10:43:55.398979, Dec 2 10:43:55.399244) + duration: (102.80125ms, 265.417µs, 4.25µs) + attributes: (RPCName, /trpc.testing.end2end.TestTRPC/UnaryCall),(Error, ) + span: (filter2, 6673650005084645130) + time: 
(Dec 2 10:43:55.398986, Dec 2 10:43:55.399244) + duration: (6.834µs, 258.25µs, 333ns) + span: (client.WithFilter, 6673650005084645130) + time: (Dec 2 10:43:55.398987, Dec 2 10:43:55.399244) + duration: (1.708µs, 256.458µs, 84ns) + span: (selector, 6673650005084645130) + time: (Dec 2 10:43:55.398988, Dec 2 10:43:55.399244) + duration: (417ns, 255.916µs, 125ns) + span: (CallFunc, 6673650005084645130) + time: (Dec 2 10:43:55.399005, Dec 2 10:43:55.399243) + duration: (16.833µs, 238.375µs, 708ns) + attributes: (RequestSize, 405),(ResponseSize, 338) + span: (Marshal, 6673650005084645130) + time: (Dec 2 10:43:55.399006, Dec 2 10:43:55.399017) + duration: (1.792µs, 10.458µs, 226.125µs) + span: (Compress, 6673650005084645130) + time: (Dec 2 10:43:55.399017, Dec 2 10:43:55.399017) + duration: (12.583µs, 167ns, 225.625µs) + span: (EncodeProtocolHead, 6673650005084645130) + time: (Dec 2 10:43:55.399018, Dec 2 10:43:55.399023) + duration: (12.958µs, 4.917µs, 220.5µs) + span: (SendMessage, 6673650005084645130) + time: (Dec 2 10:43:55.399041, Dec 2 10:43:55.399070) + duration: (36.375µs, 29.083µs, 172.917µs) + span: (ReceiveMessage, 6673650005084645130) + time: (Dec 2 10:43:55.399070, Dec 2 10:43:55.399239) + duration: (65.75µs, 168.25µs, 4.375µs) + span: (DecodeProtocolHead, 6673650005084645130) + time: (Dec 2 10:43:55.399240, Dec 2 10:43:55.399241) + duration: (235.417µs, 1.375µs, 1.583µs) + span: (Decompress, 6673650005084645130) + time: (Dec 2 10:43:55.399242, Dec 2 10:43:55.399242) + duration: (237µs, 125ns, 1.25µs) + span: (Unmarshal, 6673650005084645130) + time: (Dec 2 10:43:55.399242, Dec 2 10:43:55.399243) + duration: (237.292µs, 750ns, 333ns) + event: (your annotation at post-filter, Dec 2 10:43:55.399249) + event: (your annotation at post-filter, Dec 2 10:43:55.399249) + event: (your annotation at post-filter, Dec 2 10:43:55.399249) + span: (Marshal, 6673650005084645130) + time: (Dec 2 10:43:55.399250, Dec 2 10:43:55.399251) + duration: (103.314625ms, 1.208µs, 
10.167µs) + span: (Compress, 6673650005084645130) + time: (Dec 2 10:43:55.399252, Dec 2 10:43:55.399252) + duration: (103.315958ms, 125ns, 9.917µs) + span: (EncodeProtocolHead, 6673650005084645130) + time: (Dec 2 10:43:55.399252, Dec 2 10:43:55.399253) + duration: (103.316208ms, 750ns, 9.042µs) + span: (SendMessage, 6673650005084645130) + time: (Dec 2 10:43:55.399253, Dec 2 10:43:55.399261) + duration: (103.317333ms, 8.333µs, 334ns) +``` + +A new `event` field has been added to the span details, along with an embedded subspan. + +- event: describes what happened at a given moment, similar to a log. + Events that can be inserted by you, such as `Nov 4 14:39:23.594147: your annotation at pre-filter` in the example above. +- span: While the server is processing your custom function, a new client may be created to call the downstream service, and a sub-span will be created + As you can see, all subspans occur within `HandleFunc`. + +Note that the values of middleDur and postDur in endTime, duration may be ``unknown'', for example, the above span contains the following subspan. + +``` +span: (sleep, 6673650005084645130) +time: (Dec 2 10:43:55.297876, unknown) +duration: (1.698709ms, unknown, unknown) +event: (awake, Dec 2 10:43:55.398954) +``` + +## Span Interface + +You can call `rpcz.SpanFromContext`[^2] to get the current `Span` in the `context` and then use the following interface to manipulate Span. + +```go +type Span interface { +// AddEvent adds an event. +AddEvent(name string) + +// SetAttribute sets Attribute with (name, value). +SetAttribute(name string, value interface{}) + +// ID returns SpanID. +ID() SpanID + +// NewChild creates a child span from current span. +// Ender ends this span if related operation is completed. +NewChild(name string) (Span, Ender) +} +``` + +[^2]: Return a `noopSpan` when the `context` does not contain any `span`, any operation on the `noopSpan` is null and will not take effect. 
+ +### Using AddEvent to add events + +```go +// If no Span is currently set in ctx an implementation of a Span that performs no operations is returned. +span := rpcz.SpanFromContext(ctx) + +span.AddEvent("Acquiring lock") +mutex.Lock() + +span.AddEvent("Got lock, doing work...") +// do some stuff ... + +span.AddEvent("Unlocking") +mutex.Unlock() +``` + +### Use SetAttribute to set attributes + +```go +ctx, msg := codec.EnsureMessage(ctx) +span := rpcz.SpanFromContext(ctx) +span.SetAttribute("RPCName", msg.ClientRPCName()) +span.SetAttribute("Error", msg.ClientRspErr()) +``` + +### Create a new child span + +**End() function should be called only once by the caller to end the life cycle of the child span; uncalled End and multiple calls to End are undefined** + +```go +span := rpcz.SpanFromContext(ctx) +cs, end := span.NewChild("Decompress") +reqBodyBuf, err := codec.Decompress(compressType, reqBodyBuf) +end.End() +``` + +## Reference + +- [1] https://en.wikipedia.org/wiki/Event_(UML) +- [2] https://en.wikipedia.org/wiki/Event_(computing) +- [3] https://opentelemetry.io/docs/instrumentation/go/manual/#events +- [4] https://opentelemetry.io/docs/instrumentation/go/api/tracing/#starting-and-ending-a-span +- [5] https://opentelemetry.io/docs/concepts/observability-primer/#spans +- [6] span-id represented as an 8-byte array, satisfying the w3c trace-context specification. https://www.w3.org/TR/trace-context/#parent-id +- [7] https://en.wiktionary.org/wiki/-z#English +- [8] https://github.com/grpc/proposal/blob/master/A14-channelz.md +- [9] Dapper, a Large-Scale Distributed Systems Tracing Infrastructure: http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf +- [10] brpc-rpcz: https://github.com/apache/incubator-brpc/blob/master/docs/cn/rpcz.md +- [11] tRPC-Cpp rpcz wiki. todo +- [12] tRPC-Cpp rpcz proposal.
https://git.woa.com/trpc/trpc-proposal/blob/master/L17-cpp-rpcz.md +- [13] opentracing: https://opentracing.io/ +- [14] opentelemetry: https://opentelemetry.io/ +- [15] https://tpstelemetry.pages.woa.com/ +- [16] open-telemetry 2.0-sdk-go: https://git.woa.com/opentelemetry/opentelemetry-go-ecosystem/blob/master/sdk/trace/dyeing_sampler.go +- [17] open-telemetry-sdk-go- traceIDRatioSampler: https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/trace/sampling.go \ No newline at end of file diff --git a/rpcz/README_CN.md b/rpcz/README_CN.md index 6e899c2..c559308 100644 --- a/rpcz/README_CN.md +++ b/rpcz/README_CN.md @@ -1,17 +1,154 @@ -# RPCZ +# RPCZ [English](README.md) -RPCZ 是一个监控 RPC 过程运行状态的工具,记录了一次 rpc 中发生的各种事情,如序列化/反序列,压缩/解压缩和运行拦截器。 -RPCZ 在调试和性能优化两个方面有非常重要的应用。 -在调试方面,RPCZ 提供给用户记录各种事情发生的接口,通过 admin 管理界面可以查看记录的事件,能帮助用户快速准确的定位问题。 -在性能优化方面,RPCZ 记录了各种事件(序列化/反序列,压缩/解压缩和运行拦截器)的持续时间和收发数据包的大小,能够帮助用户优化耗时和存储,分析超时事件。 +RPCZ 是一个监控 RPC 的工具,记录了一次 rpc 中发生的各种事件,如序列化/反序列,压缩解压缩和执行拦截器。 +RPCZ 可以帮助用户调试服务,它允许用户自行配置需要被记录的事件,用户可以通过 admin 工具可以查看记录的事件,能帮助用户快速准确的定位问题。 +除此之外,由于 RPCZ 记录了 RPC 中各种事件的持续时间和收发数据包的大小,因此可以帮助用户分析超时事件,优化服务的性能。 + +## 术语解释 + +### 事件(Event) + +事件(Event)[1, 2, 3] 用来描述某一特定时刻(`Event.Time`)发生了某件事情(`Event.Name`)。 + +```go +type Event struct { + Name string + Time time.Time +} +``` +在一个普通 RPC 调用中会发生一系列的事件,例如发送请求的 Client 端按照时间先后顺序,一般会发生如下一系列事件: + +1. 开始运行前置拦截器 +2. 结束运行前置拦截器 +3. 开始序列化 +4. 结束序列化 +5. 开始压缩 +6. 结束压缩 +7. 开始编码协议头部字段 +8. 结束编码协议头部字段 +9. 开始发送二进制文件到网络 +10. 结束发送二进制文件到网络 +11. 开始从网络中接收二进制文件 +12. 结束从网络中接收二进制文件 +13. 开始解码协议头部字段 +14. 结束解码协议头部字段 +15. 开始解压缩 +16. 结束解压缩 +17. 开始反序列化 +18. 结束反序列化 +19. 开始运行后置拦截器 +20. 结束运行后置拦截器 + +而处理请求的 server 端,按照时间先后顺序,一般会发生如下一系列事件: + +1. 开始解码协议头部字段 +2. 结束解码协议头部字段 +3. 开始解压缩 +4. 结束解压缩 +5. 开始反序列化 +6. 结束反序列化 +7. 开始运行前置拦截器 +8. 结束运行前置拦截器 +9. 开始运行用户自定义处理函数 +10. 结束运行用户自定义处理函数 +11. 开始运行后置拦截器 +12. 结束运行后置拦截器 +13. 开始序列化 +14. 结束序列化 +15. 开始压缩 +16. 结束压缩 +17. 开始编码协议头部字段 +18. 结束解码协议头部字段 +19. 开始发送二进制文件到网络 +20. 
结束发送二进制文件到网络 + +### Span + +Span[4, 5] 用来描述某段时间间隔(具有开始时间和结束时间)的单个操作,例如客户端发送远程请求,服务端处理请求或函数调用。 +根据划分的时间间隔大小不同,一个大的 Span 可以包含多个小的 Span,就像一个函数中可能调用多个其他函数一样,会形成树结构的层次关系。 +因此一个 Span 除了包含名字、内部标识 span-id[6],开始时间、结束时间和这段时间内发生的一系列事件(Event)外,还可能包含许多子 Span。 + +rpcz 中存在两种类型的 Span。 +1. client-Span:描述 client 从开始发送请求到接收到回复这段时间间隔内的操作(涵盖上一节 Event 中描述的 client 端发生一系列事件)。 + +2. server-Span:描述 server 从开始接收请求到发送完回复这段时间间隔内的操作(涵盖上一节 Event 中描述的 server 端发生一系列事件)。 + server-Span 运行用户自定义处理函数的时候,可能会创建 client 调用下游服务,此时 server-Span 会包含多个子 client-Span。 + +``` +server-Span + client-Span-1 + client-Span-2 + ...... + client-Span-n +``` + +Span 被存储在 context 中,rpcz 会自动调用 ContextWithSpan 往 context 中存 Span,在函数调用过程中需要保证 context 中的 Span 不会丢失。 + +## Span 的生命周期 + +考察 Span 对象的生命周期,rpcz 中对 Span 的绝大多数操作,都需要考虑并发安全。 +除此之外采用了 sync.Pool 和 预先分配的循环数组来降低 Span 的内存分配时对性能的影响。 + +### Span 的构造 + +rpcz 在启动时会初始化一个全局 GlobalRPCZ,用于生成和存储 Span。 +在框架内部 Span 只可能在两个位置被构造, +第一个位置是在 server 端的 transport 层的 handle 函数刚开始处理接收到的请求时; +第二个位置是在 client 端的桩代码中调用 Invoke 函数开始发起 rpc 请求时。 +虽然两个位置创建的 Span 类型是不同,但是代码逻辑是相似的,都会调用 rpczNewSpanContext,该函数实际上执行了三个操作 +1. 调用 SpanFromContext 函数,从 context 中获取 span。 +2. 调用 span.NewChild 方法,创建新的 child span。 +3. 
调用 ContextWithSpan 函数,将新创建的 child span 设置到 context 中。 + +### Span 在 context 中传递 + +被创建 Span 在提交前,会一直在存放在 context 中,沿着 rpc 调用的链路传递。 +在调用链路上使用 `rpcz.AddEvent` 往当前 context 中的 Span 中添加新的事件。 + +### Span 的提交 + +在 server 端的 transport 层的 handle 函数处理完请求后,会调用 `ender.End()` 把 Span 提交到 GlobalRPCZ 当中。 +此后虽然 context 中仍然存放着 Span,但是从语义上来说,已经调用过的 End 函数的 Span 不允许再被继续操作,其行为是未定义的。 + +### 在 admin 中访问 Span + +admin 模块调用 `rpcz.Query` 和 `rpcz.BatchQuery` 从 GlobalRPCZ 中读取 Span。 +有一点需要注意的是,admin 获取的 Span 是只读类型的 Span(ReadOnlySpan),只读类型的 Span 是由可写入的 Span 导出得到的,这样做的原因是保证并发访问安全。 + +### 删除多余的 Span + +当哈希表中存储的 Span 过多时就需要按照某种淘汰规则,删除多余的 Span。 +目前的实现是当 GlobalRPCZ 中的 Span 个数超过最大容量上限时会删除最老的 Span。 + +## RPCZ 名字的由来 + +关于 "RPCZ" 的这个名字的由来,后缀 -z 有在英文中一般有两种含义 [7]: 一是用于名词,实现单数变复数,如 Boy**z** are always trouble;二是用于动词实现动词形态的变化 He love**z** me。 +总的来说,在单词后面加 -z 的效果类似于加 -s。 +所以 "RPCZ" 就是指各种类型的 RPC,从一个分布式全局的调用链路视角来看的确是成立的,各种 RPC 调用存在树状的父子关系,组合成了 "RPCZ"。 + +"RPCZ" 这一术语最早来源于 google 内部的 RPC 框架 Stubby,在此基础上 google 在开源的 grpc 实现了类似功能的 channelz[8],channelz 中除了包括各种 channel 的信息,也涵盖 trace 信息。 +之后,百度开源的 brpc 在 google 发表的分布式追踪系统 Dapper 论文 [9] 的基础上,实现了一个非分布式的 trace 工具,模仿 channelz 取名为 brpc-rpcz[10]。 +接着就是用户在使用 tRPC 中需要类似于 brpc-rpcz 的工具来进行调试和优化,所以 tRPC-Cpp 首先支持类似功能 [11, 12],仍然保留了 RPCZ 这个名字。 + +最后就是在 tRPC-Go 支持类似 "RPCZ" 的功能,在实现过程中发现随着分布式追踪系统的发展,社区中出现了 opentracing[13] 和 opentelemetry[14] 的开源系统,公司内部也做起了天机阁 [15]。 +tRPC-Go-RPCZ 在 span 和 event 设计上部分借鉴了 opentelemetry-trace 的 go 语言实现,可以认为是 tRPC-Go 框架内部的 trace 系统。 +严格来说,tRPC-Go-RPCZ 是非分布式,因为不同服务之间没有在协议层面实现通信。 +现在看来,brpc, tRPC-Cpp 和 tRPC-Go 实现的 rpcz,取名叫 spanz 或许更符合后缀 "-z" 本来的含义。 ## 如何配置 rpcz -rpcz 的配置分为基本配置、进阶配置和代码配置,更多配置例子见 `config_test.go`。 +rpcz 的配置包括基本配置,进阶配置和代码配置,更多配置例子见 `config_test.go`。 + +### 不同 tRPC-GO 版本的支持情况 + +- v0.15.0:支持 tRPC 流式和 tnet。 +- v0.14.0:支持 http 协议。 +- v0.11.0:支持 tRPC 一元调用。 +- v0.11.0 之前的版本:不支持。 ### 基本配置 -在 server 端配置 admin,同时在 admin 下面配置 rpcz : +在 server 端配置 admin,同时在 admin 里面配置 rpcz : ```yaml server: @@ -23,21 +160,21 @@ server: capacity: 10000 ``` -- `fraction` 
字段为采样率,取值范围为`[0.0, 1.0]`,默认值为 0.0,即不进行任何采样,需要手动配置。 -- `capacity` 为 rpcz 的存储容量,默认值为 10000,表示最多能存储的 span 数量。 +- `fraction` : 采样率,其取值范围为`[0.0, 1.0]`,默认值为 0.0 代表不采样,需要手动配置。 +- `capacity`: rpcz 的存储容量,默认值为 10000,表示最多能存储的 span 数量。 ### 进阶配置 -进阶配置允许用户自行过滤感兴趣的 span,在使用进阶配置之前需要先了解 rpcz 的采样机制。 +进阶配置允许你自行过滤感兴趣的 span,在使用进阶配置之前需要先了解 rpcz 的采样机制。 #### 采样机制 -rpcz 使用采样机制来控制性能开销和过滤用户不感兴趣的 Span。 +rpcz 使用采样机制来控制性能开销和过滤你不感兴趣的 Span。 采样可能发生在 Span 的生命周期的不同阶段,最早的采样发生在 Span 创建之前,最晚的采样发生在 Span 提交之前。 ##### 采样结果表 -只有创建和提交之前都被采样到的 Span 才会最终被收集到 GlobalRPCZ 中,供用户通过 admin 接口查询。 +只有创建和提交之前都被采样到的 Span 才会最终被收集到 GlobalRPCZ 中,供你通过 admin 接口查询。 | 在 Span 创建之前采样? | 在 Span 提交之前采样? | Span 最终是否会被收集? | |:-----------------|:--------------:|:--------------:| @@ -54,27 +191,80 @@ rpcz 使用采样机制来控制性能开销和过滤用户不感兴趣的 Span ##### 在 Span 提交之前采样 -对于一个已经创建好的 Span,会记录一次 rpc 中的各种信息,而用户可能只关心包含某些特定信息的 Span,例如出现 rpc 错误的 Span,高耗时的 Span 以及包含特定属性信息的 Span。 -这时,就需要在 Span 最终提交前对上述用户感兴趣的 Span 进行采样。 -rpcz 提供了一个灵活的对外接口,允许用户在服务在启动之前,通过配置文件设置 `rpcz.record` 字段来自定义 Span 提交之前采样逻辑。 +已经创建好的 Span 会记录 rpc 中的各种信息,但是你可能只关心包含某些特定信息的 Span,例如出现 rpc 错误的 Span,高耗时的 Span 以及包含特定属性信息的 Span。 +这时,就需要在 Span 最终提交前只对你需要的 Span 进行采样。 +rpcz 提供了一个灵活的对外接口,允许你在服务在启动之前,通过配置文件设置 `rpcz.record_when` 字段来自定义 Span 提交之前采样逻辑。 +record_when 提供3种常见的布尔操作:`AND`, `OR` 和 `NOT`,7种返回值为布尔值的基本操作,`__min_request_size`, `__min_response_size`, `__error_code`, `__error_message`, `__rpc_name`, `__min_duration` 和 `__has_attribute`。 +**需要注意的是 record_when 本身是一个 `AND` 操作**。 +你可以通过对这些操作进行任意组合,灵活地过滤出感兴趣的 Span。 + +```yaml +server: # server configuration. + admin: + ip: 127.0.0.1 # ip. + port: 9528 # default: 9028. + rpcz: # tool that monitors the running state of RPC, recording various things that happen in a rpc. + fraction: 0.0 # sample rate, 0.0 <= fraction <= 1.0. + record_when: # record_when is actually an AND operation + - AND: + - __min_request_size: 30 # record span whose request_size is greater than__min_request_size in bytes. 
+ - __min_response_size: 40 # record span whose response_size is greater than __min_response_size in bytes. + - OR: + - __error_code: 1 # record span whose error codes is 1. + - __error_code: 2 # record span whose error codes is 2. + - __error_message: "unknown" # record span whose error messages contain "unknown". + - __error_message: "not found" # record span whose error messages contain "not found". + - NOT: {__rpc_name: "/trpc.app.server.service/method1"} # record span whose RPCName doesn't contain __rpc_name. + - NOT: # record span whose RPCName doesn't contain "/trpc.app.server.service/method2, or "/trpc.app.server.service/method3". + OR: + - __rpc_name: "/trpc.app.server.service/method2" + - __rpc_name: "/trpc.app.server.service/method3" + - __min_duration: 1000ms # record span whose duration is greater than __min_duration. + # record span that has the attribute: name1, and name1's value contains "value1" + # valid attribute form: (key, value) only one space character after comma character, and key can't contain comma(',') character. + - __has_attribute: (name1, value1) + # record span that has the attribute: name2, and name2's value contains "value2". 
+ - __has_attribute: (name2, value2) +``` + +#### 配置举例 + +##### 对包含错误码为 1(RetServerDecodeFail),且错误信息中包含 “unknown” 字符串,且持续时间大于 1s 的 span 进行提交 ```yaml server: admin: + ip: 127.0.0.1 + port: 9028 rpcz: + fraction: 1.0 + capacity: 10000 record_when: - error_codes: [0,] - min_duration: 1000ms # ms or s - sampling_fraction: 1 # [0.0, 1.0] + - __error_code: 1 + - __min_duration: 1000ms + - __sampling_fraction: 1 ``` -- `error_codes` 代表多个整数错误码,表示采样包含其中任意一个错误码的 span,例如 0(RetOk), 21(RetServerTimeout)。 -- `min_duration` 代表 span 的最小持续时间,只采集超过 `min_duration` 的 span,可用于耗时分析。 -- `sampling_fraction` 为提交采样比率,取值范围为 `[0, 1]` +注意: record_when 本身是一个 AND 节点,还可以有以下写法:写法1, 写法2 -#### 配置举例 +写法1: -##### 对包含包含错误码为 1(RetServerDecodeFail) 或持续时间大于 1s 的 span 进行提交 +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - AND: + - __error_code: 1 + - __min_duration: 1000ms + - __sampling_fraction: 1 +``` + +写法2: ```yaml server: @@ -85,12 +275,14 @@ server: fraction: 1.0 capacity: 10000 record_when: - error_codes: 0 - min_duration: 1000ms - sampling_fraction: 1 + - AND: + - __error_code: 1 + - AND: + - __min_duration: 1000ms + - __sampling_fraction: 1 ``` -##### 对包含错误码为 1(RetServerDecodeFail) 或 21(RetServerTimeout) 的或持续时间大于 2s 的 span 以二分之一的概率进行提交 +写法3: ```yaml server: @@ -101,14 +293,55 @@ server: fraction: 1.0 capacity: 10000 record_when: - error_codes: [1, 21] - min_duration: 2s - sampling_fraction: 0.5 + - AND: + - __error_code: 1 + - AND: + - __min_duration: 1000ms + - AND: + - __sampling_fraction: 1 +``` + +还有其他写法,你可以自行尝试。 + +##### 对包含错误码为 1(RetServerDecodeFail) 或 21(RetServerTimeout) 的或持续时间大于 2s 的 span 以 1/2 的概率进行提交 + +```yaml +server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - OR: + - error_code: 1 + - error_code: 21 + - min_duration: 2s + - __sampling_fraction: 0.5 +``` + +##### 对持续时间大于 10s, 且 rpc name 包含 "TDXA/Transfer" 字样,且 error message 里不能包含 "pseudo" 字样的 span 进行提交 + +```yaml 
+server: + admin: + ip: 127.0.0.1 + port: 9028 + rpcz: + fraction: 1.0 + capacity: 10000 + record_when: + - __min_duration: 10s + - __rpc_name: "TDXA/Transfer" + - NOT: + __error_message: "pseudo" + - __sampling_fraction: 1 ``` ### 代码配置 -在读取配置文件之后且在服务启动前,可以通过代码设置 `rpcz.GlobalRPCZ` 灵活设置 rpcz,此时的提交采样逻辑需要实现 `ShouldRecord` 函数。 +在读取配置文件之后且在服务启动前,可以通过 `rpcz.GlobalRPCZ` 来配置 rpcz,此时的提交采样逻辑需要实现 `ShouldRecord` 函数。 ```go // ShouldRecord determines if the Span should be recorded. @@ -120,17 +353,15 @@ type ShouldRecord = func(Span) bool ```go const attributeName = "SpecialAttribute" rpcz.GlobalRPCZ = rpcz.NewRPCZ(&rpcz.Config{ - Fraction: 1.0, - Capacity: 1000, - ShouldRecord: func(s rpcz.Span) bool { - _, ok = s.Attribute(attributeName) - return ok - }, +Fraction: 1.0, +Capacity: 1000, +ShouldRecord: func(s rpcz.Span) bool { +_, ok := s.Attribute(attributeName) +return ok +}, }) ``` -## 使用 admin 命令查询 span 信息 - ### 查询最近提交的多条 span 的概要信息 查询最近 num 个 span 的概要信息,可以访问如下的 url: @@ -143,18 +374,18 @@ http://ip:port/cmds/rpcz/spans?num=xxx ```html 1: - span: (client, 65744150616107367) - time: (Dec 1 20:57:43.946627, Dec 1 20:57:43.946947) - duration: (0, 319.792µs, 0) - attributes: (RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall),(Error, ) -2: +span: (client, 65744150616107367) +time: (Dec 1 20:57:43.946627, Dec 1 20:57:43.946947) +duration: (0, 319.792µs, 0) +attributes: (RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall),(Error, ) + 2: span: (server, 1844470940819923952) - time: (Dec 1 20:57:43.946677, Dec 1 20:57:43.946912) - duration: (0, 235.5µs, 0) - attributes: (RequestSize, 125),(ResponseSize, 18),(RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall),(Error, success) + time: (Dec 1 20:57:43.946677, Dec 1 20:57:43.946912) + duration: (0, 235.5µs, 0) + attributes: (RequestSize, 125),(ResponseSize, 18),(RPCName, /trpc.testing.end2end.TestTRPC/EmptyCall),(Error, success) ``` -每个 span 的概要信息和如下的模版匹配: +每个 span 的概要信息和如下的模版匹配: ```html span: (name, id) @@ -179,7 +410,7 @@ attributes:
(name1, value1) (name2, value2) - RPCName:对端的服务名 + 接口名 (/trpc.app.server.service/method) - Error:错误信息,根据框架返回码判断请求是否成功,success 或 nil 表示成功 -如果不指定查询的个数,则下列查询将会默认返回最近 [^1] 10 个 span 的概要信息: +如果不指定查询的个数,则下列查询将会默认返回最近提交成功的 [^1] 10 个 span 的概要信息: ```html http://ip:port/cmds/rpcz/spans @@ -189,7 +420,7 @@ http://ip:port/cmds/rpcz/spans ### 查询某个 span 的详细信息 -查询 span id 为 "xxx" 的 span 的详细信息,可以访问如下的 url: +查询包含 id 的 span 的详细信息,可以访问如下的 url: ```html http://ip:port/cmds/rpcz/spans/{id} @@ -214,15 +445,15 @@ span: (server, 6673650005084645130) span: (filter1, 6673650005084645130) time: (Dec 2 10:43:55.296161, Dec 2 10:43:55.399249) duration: (225.708µs, 103.088ms, 12.292µs) - event: (user's annotation at pre-filter, Dec 2 10:43:55.296163) + event: (your annotation at pre-filter, Dec 2 10:43:55.296163) span: (filter2, 6673650005084645130) time: (Dec 2 10:43:55.296164, Dec 2 10:43:55.399249) duration: (2.75µs, 103.085ms, 250ns) - event: (user's annotation at pre-filter, Dec 2 10:43:55.296165) + event: (your annotation at pre-filter, Dec 2 10:43:55.296165) span: (server.WithFilter, 6673650005084645130) time: (Dec 2 10:43:55.296165, Dec 2 10:43:55.399249) duration: (1.208µs, 103.083625ms, 167ns) - event: (user's annotation at pre-filter, Dec 2 10:43:55.296165) + event: (your annotation at pre-filter, Dec 2 10:43:55.296165) span: (, 6673650005084645130) time: (Dec 2 10:43:55.296166, Dec 2 10:43:55.399249) duration: (792ns, 103.082583ms, 250ns) @@ -316,9 +547,9 @@ span: (server, 6673650005084645130) span: (Unmarshal, 6673650005084645130) time: (Dec 2 10:43:55.399242, Dec 2 10:43:55.399243) duration: (237.292µs, 750ns, 333ns) - event: (user's annotation at post-filter, Dec 2 10:43:55.399249) - event: (user's annotation at post-filter, Dec 2 10:43:55.399249) - event: (user's annotation at post-filter, Dec 2 10:43:55.399249) + event: (your annotation at post-filter, Dec 2 10:43:55.399249) + event: (your annotation at post-filter, Dec 2 10:43:55.399249) + event: (your annotation at 
post-filter, Dec 2 10:43:55.399249) span: (Marshal, 6673650005084645130) time: (Dec 2 10:43:55.399250, Dec 2 10:43:55.399251) duration: (103.314625ms, 1.208µs, 10.167µs) @@ -335,10 +566,10 @@ span: (server, 6673650005084645130) 在 span 的详细信息中新增了 `event` 字段,以及内嵌的子 span。 -- event: 表示描述了在某一时刻发生的事情,相当于打日志 -可以由用户自行插入的事件,如上面例子中的 `Nov 4 14:39:23.594147: user's annotation at pre-filter`。 -- span:在 server 处理用户自定义函数时,可能会以中转模式新起 client 调用下游服务,此时会创建子 span -可以看到,所有的 子 span 都是在 `HandleFunc` 内发生的。 +- event: 描述了在某一时刻发生的事情,类似于日志。 + 可以由你自行插入的事件,如上面例子中的 `Nov 4 14:39:23.594147: your annotation at pre-filter`。 +- span:在 server 处理你自定义函数时,可能会创建新的 client 调用下游服务,此时会创建子 span + 可以看到,所有的 子 span 都是在 `HandleFunc` 内发生的。 需要注意的是,endTime、duration 中的 middleDur 和 postDur 的值可能为 "unknown",例如上面的 span 中包含如下的子 span: @@ -349,30 +580,30 @@ duration: (1.698709ms, unknown, unknown) event: (awake, Dec 2 10:43:55.398954) ``` -## 提供给用户的接口 +## Span 接口 -用户可以先调用 `rpcz.SpanFromContext`[^2] 获取当前 `context` 中的 `Span`,然后 Span interface 提供的接口来操作 Span。 +你可以先调用 `rpcz.SpanFromContext`[^2] 获取当前 `context` 中的 `Span`,然后使用下面的接口来操作 Span。 -```golang +```go type Span interface { - // AddEvent adds a event. - AddEvent(name string) - - // SetAttribute sets Attribute with (name, value). - SetAttribute(name string, value interface{}) - - // ID returns SpanID. - ID() SpanID - - // NewChild creates a child span from current span. - // Ender ends this span if related operation is completed. - NewChild(name string) (Span, Ender) +// AddEvent adds a event. +AddEvent(name string) + +// SetAttribute sets Attribute with (name, value). +SetAttribute(name string, value interface{}) + +// ID returns SpanID. +ID() SpanID + +// NewChild creates a child span from current span. +// Ender ends this span if related operation is completed. 
+NewChild(name string) (Span, Ender) } ``` [^2]: 当 `context` 中不含有任何 `span` 的时候会返回一个 `noopSpan`,对 `noopSpan` 的任何操作都是空操作,不会生效。 -### 使用 AddEvent 添加事件 +### 使用 AddEvent 来添加事件 ```go // If no Span is currently set in ctx an implementation of a Span that performs no operations is returned. @@ -390,7 +621,7 @@ mutex.Unlock() ### 使用 SetAttribute 来设置属性 -```golang +```go ctx, msg := codec.EnsureMessage(ctx) span := SpanFromContext(ctx context.Context) span.SetAttribute("RPCName", msg.ClientRPCName()) @@ -399,145 +630,15 @@ span.SetAttribute("Error", msg.ClientRspErr()) ### 创建新的子 Span -**特别需要注意的是:创建的子 Span 应该由调用者自行调用一次 end.End() 函数来结束子 Span 的生命周期,未调用 End 和 多次调用 End 的行为是未定义的** +**特别需要注意的是:创建的子 Span 应该由调用者只调用一次 end.End() 函数来结束子 Span 的生命周期,未调用 End 和 多次调用 End 的行为是未定义的** -```golang +```go span := SpanFromContext(ctx context.Context) cs, end := span.NewChild("Decompress") reqBodyBuf, err := codec.Decompress(compressType, reqBodyBuf) end.End() ``` -## 术语解释 - -### 事件(Event) - -事件(Event)[1, 2, 3] 用来描述某一特定时刻(`Event.Time`)发生了某件事情(`Event.Name`)。 - -```go -type Event struct { - Name string - Time time.Time -} -``` -在一个普通 RPC 调用中会发生一系列的事件,例如发送请求的 Client 端按照时间先后顺序,一般会发生如下一系列事件: - -1. 开始运行前置拦截器 -2. 结束运行前置拦截器 -3. 开始序列化 -4. 结束序列化 -5. 开始压缩 -6. 结束压缩 -7. 开始编码协议头部字段 -8. 结束编码协议头部字段 -9. 开始发送二进制文件到网络 -10. 结束发送二进制文件到网络 -11. 开始从网络中接收二进制文件 -12. 结束从网络中接收二进制文件 -13. 开始解码协议头部字段 -14. 结束解码协议头部字段 -15. 开始解压缩 -16. 结束解压缩 -17. 开始反序列化 -18. 结束反序列化 -19. 开始运行后置拦截器 -20. 结束运行后置拦截器 - -而处理请求的 server 端,按照时间先后顺序,一般会发生如下一系列事件: - -1. 开始解码协议头部字段 -2. 结束解码协议头部字段 -3. 开始解压缩 -4. 结束解压缩 -5. 开始反序列化 -6. 结束反序列化 -7. 开始运行前置拦截器 -8. 结束运行前置拦截器 -9. 开始运行用户自定义处理函数 -10. 结束运行用户自定义处理函数 -11. 开始运行后置拦截器 -12. 结束运行后置拦截器 -13. 开始序列化 -14. 结束序列化 -15. 开始压缩 -16. 结束压缩 -17. 开始编码协议头部字段 -18. 结束解码协议头部字段 -19. 开始发送二进制文件到网络 -20. 
结束发送二进制文件到网络 - -### Span - -Span[4, 5] 用来描述某段时间间隔(具有开始时间和结束时间)的单个操作,例如客户端发送远程请求,服务端处理请求或函数调用。 -根据划分的时间间隔大小不同,一个大的 Span 可以包含多个小的 Span,就像一个函数中可能调用多个其他函数一样,会形成树结构的层次关系。 -因此一个 Span 除了包含名字、内部标识 span-id[6],开始时间、结束时间和 这段时间内发生的一系列事件(Event)外,还可能包含许多子 Span。 - -rpcz 中存在两种类型的 Span。 -1. client-Span:描述 client 从开始发送请求到接收到回复这段时间间隔内的操作(涵盖上一节 Event 中描述的 client 端发生一系列事件)。 -2. server-Span:描述 server 从开始接收请求到发送完回复这段时间间隔内的操作(涵盖上一节 Event 中描述的 server 端发生一系列事件)。 -server-Span 运行用户自定义处理函数的时候,可能会创建 client 调用下游服务,此时 server-Span 会包含多个子 client-Span。 - -``` -server-Span - client-Span-1 - client-Span-2 - ...... - client-Span-n -``` - -Span 被存储在 context 中,rpcz 会自动调用 ContextWithSpan 往 context 中存 Span,在函数调用过程中需要保证 context 中的 Span 不会丢失。 - -## Span 的生命周期 - -考察 Span 对象的生命周期,rpcz 中对 Span 的绝大多数操作,都需要考虑并发安全。 -除此之外采用了 sync.Pool 和 预先分配的循环数组来降低 Span 的内存分配时对性能的影响。 - -### Span 的构造 - -rpcz 在启动后会初始化一个全局 GlobalRPCZ,用于生成和存储 Span。 -在框架内部 Span 只可能在两个位置被构造, -第一个位置是在 server 端的 transport 层的 handle 函数刚开始处理接收到的请求时; -第二个位置是在 client 端的桩代码中调用 Invoke 函数开始发起 rpc 请求时。 -虽然两个位置创建的 Span 类型是不同,但是代码逻辑是相似的,都会调用 `rpcz.NewSpanContext`,该函数实际上执行了三个操作: -1. 调用 SpanFromContext 函数,从 context 中获取 span。 -2. 调用 span.NewChild 方法,创建新的 child span。 -3. 
调用 ContextWithSpan 函数,将新创建的 child span 设置到 context 中。 - -### Span 在 context 中传递 - -被创建 Span 在提交前,会一直在存放在 context 中,沿着 rpc 调用的链路传递。 -在调用链路上使用 `rpcz.AddEvent` 往当前 context 中的 Span 中添加新的事件。 - -### Span 的提交 - -在 server 端的 transport 层的 handle 函数处理完请求后,会调用 `ender.End()` 把 Span 提交到 GlobalRPCZ 当中。 -此后虽然 context 中仍然存放着 Span,但是从语义上来说,已经调用过的 End 函数的 Span 不允许再被继续操作,其行为是未定义的。 - -### 在 admin 中访问 Span - -admin 模块调用 `rpcz.Query` 和 `rpcz.BatchQuery` 从 GlobalRPCZ 中读取 Span。 -有一点需要注意的是,admin 获取的 Span 是只读类型的 Span(ReadOnlySpan),只读类型的 Span 是由可写入的 Span 导出得到的,这样做的原因是保证并发访问安全。 - -### 删除多余的 Span - -当哈希表中存储的 Span 过多时就需要按照某种淘汰规则,删除多余的 Span。 -目前的实现是当 GlobalRPCZ 中的 Span 个数超过最大容量上限时会删除最老的 Span。 - -## RPCZ 名字的由来 - -关于 "RPCZ" 的这个名字的由来,后缀 -z 有在英文中一般有两种含义 [7]: 一是用于名词,实现单数变复数,如 Boy**z** are always trouble;二是用于动词实现动词形态的变化 He love**z** me。 -总的来说,在单词后面加 -z 的效果类似于加 -s。 -所以 "RPCZ" 就是指各种类型的 RPC,从一个分布式全局的调用链路视角来看的确是成立的,各种 RPC 调用存在树状的父子关系,组合成了 "RPCZ"。 - -"RPCZ" 这一术语最早来源于 google 内部的 RPC 框架 Stubby,在此基础上 google 在开源的 grpc 实现了类似功能的 channelz[8],channelz 中除了包括各种 channel 的信息,也涵盖 trace 信息。 -之后,百度开源的 brpc 在 google 发表的分布式追踪系统 Dapper 论文 [9] 的基础上,实现了一个非分布式的 trace 工具,模仿 channelz 取名为 brpc-rpcz[10]。 -接着就是用户在使用 tRPC 中需要类似于 brpc-rpcz 的工具来进行调试和优化,所以 tRPC-Cpp 首先支持类似功能 [11, 12],仍然保留了 RPCZ 这个名字。 - -最后就是在 tRPC-Go 支持类似 "RPCZ" 的功能,在实现过程中发现随着分布式追踪系统的发展,社区中出现了 opentracing[13] 和 opentelemetry[14] 的开源系统,公司内部也做起了天机阁 [15]。 -tRPC-Go-RPCZ 在 span 和 event 设计上部分借鉴了 opentelemetry-trace 的 go 语言实现,可以认为是 tRPC-Go 框架内部的 trace 系统。 -严格来说,tRPC-Go-RPCZ 是非分布式,因为不同服务之间没有在协议层面实现通信。 -现在看来,brpc, tRPC-Cpp 和 tRPC-Go 实现的 rpcz,取名叫 spanz 或许更符合后缀 "-z" 本来的含义。 - ## 参考 - [1] https://en.wikipedia.org/wiki/Event_(UML) @@ -550,6 +651,10 @@ tRPC-Go-RPCZ 在 span 和 event 设计上部分借鉴了 opentelemetry-trace 的 - [8] https://github.com/grpc/proposal/blob/master/A14-channelz.md - [9] Dapper, a Large-Scale Distributed Systems Tracing Infrastructure: http://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf - [10] brpc-rpcz: 
https://github.com/apache/incubator-brpc/blob/master/docs/cn/rpcz.md +- [11] tRPC-Cpp rpcz wiki. todo +- [12] tRPC-Cpp rpcz proposal. https://git.woa.com/trpc/trpc-proposal/blob/master/L17-cpp-rpcz.md - [13] opentracing: https://opentracing.io/ - [14] opentelemetry: https://opentelemetry.io/ +- [15] https://tpstelemetry.pages.woa.com/ +- [16] 天机阁 2.0-sdk-go:https://git.woa.com/opentelemetry/opentelemetry-go-ecosystem/blob/master/sdk/trace/dyeing_sampler.go - [17] open-telemetry-sdk-go- traceIDRatioSampler: https://github.com/open-telemetry/opentelemetry-go/blob/main/sdk/trace/sampling.go \ No newline at end of file diff --git a/testdata/trpc_go.yaml b/testdata/trpc_go.yaml index 550c298..d4e6e94 100755 --- a/testdata/trpc_go.yaml +++ b/testdata/trpc_go.yaml @@ -15,11 +15,27 @@ server: # server configuration. write_timeout: 60000 # ms. the timeout setting for processing. enable_tls: false # whether to enable TLS, currently not supported. rpcz: # tool that monitors the running state of RPC, recording various things that happen in a rpc. - fraction: 0.0 # sample rate, 0.0 <= fraction <= 1.0. + fraction: 0.0 # sample rate, 0.0 <= fraction <= 1.0. record_when: - error_codes: [0, 1] # only record span that contain any of error_codes. - min_duration: 1000ms # only record span whose duration greater min_duration in milliseconds. - sampling_fraction: 1 # sampling fraction when recording span, 0.0 <= sampling_fraction <= 1.0. + - AND: + - __min_request_size: 30 # record span whose request_size is greater than __min_request_size in bytes. + - __min_response_size: 40 # record span whose response_size is greater than __min_response_size in bytes. + - OR: + - __error_code: 1 # record span whose error code is 1. + - __error_code: 2 # record span whose error code is 2. + - __error_message: "unknown" # record span whose error message contains "unknown". + - __error_message: "not found" # record span whose error message contains "not found".
+ - NOT: { __rpc_name: "/trpc.app.server.service/method1" } # record span whose RPCName doesn't contain __rpc_name. + - NOT: # record span whose RPCName doesn't contain "/trpc.app.server.service/method2", or "/trpc.app.server.service/method3". + OR: + - __rpc_name: "/trpc.app.server.service/method2" + - __rpc_name: "/trpc.app.server.service/method3" + - __min_duration: 1000ms # record span whose duration is greater than __min_duration. + # record span that has the attribute: name1, and name1's value contains "value1" + # valid attribute form: (key, value) only one space character after comma character, and key can't contain comma(',') character. + - __has_attribute: (name1, value1) + # record span that has the attribute: name2, and name2's value contains "value2". + - __has_attribute: (name2, value2) service: # business service configuration,can have multiple. - name: trpc.test.helloworld.Greeter1 # the route name of the service. ip: 127.0.0.1 # the service listening ip address, can use the placeholder ${ip}, choose one of ip and nic, priority ip.