forked from grpc-ecosystem/go-grpc-middleware
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpayload_interceptors.go
150 lines (133 loc) · 5.89 KB
/
payload_interceptors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package kit
import (
"bytes"
"fmt"
"context"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit"
"google.golang.org/grpc"
)
var (
// JsonPbMarshaller is the marshaller used for serializing protobuf messages.
// If needed, this variable can be reassigned with a different marshaller with the same Marshal() signature.
//
// The variable is read each time a payload is marshalled, so a reassignment affects all subsequent payload logs.
// NOTE(review): nothing in this file synchronizes access to it; presumably it should only be reassigned
// during start-up, before any interceptor handles traffic - confirm.
JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
)
// PayloadUnaryServerInterceptor returns a new unary server interceptor that logs the payloads of requests and
// responses.
//
// This *only* works when placed *after* the `kit.UnaryServerInterceptor`. However, the logging can be done to a
// separate instance of the logger.
//
// The decider is consulted on every call; when it returns false the handler is invoked without any payload
// logging. The request is logged before the handler runs, and the response is logged only when the handler
// returns a nil error.
func PayloadUnaryServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !decider(ctx, info.FullMethod, info.Server) {
			return handler(ctx, req)
		}
		// Use the provided log.Logger for logging but use the fields from context.
		// The decorated logger must be a per-call local: assigning it back to the captured `logger` would share
		// it between calls, accumulating fields from earlier requests and racing between concurrent RPCs.
		logEntry := log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(ctx)...)...)
		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
		resp, err := handler(ctx, req)
		if err == nil {
			logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.response.content field")
		}
		return resp, err
	}
}
// PayloadStreamServerInterceptor returns a new streaming server interceptor that logs the payloads of the
// messages received and sent on the stream.
//
// This *only* works when placed *after* the `kit.StreamServerInterceptor`. However, the logging can be done to a
// separate instance of the logger.
//
// When the decider returns false the handler receives the original stream untouched; otherwise it receives a
// wrapper whose SendMsg/RecvMsg log each successfully transferred message.
func PayloadStreamServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		if shouldLog := decider(stream.Context(), info.FullMethod, srv); !shouldLog {
			return handler(srv, stream)
		}
		// Combine the static call fields with the tags carried by the stream's context.
		fields := append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(stream.Context())...)
		wrapped := &loggingServerStream{
			ServerStream: stream,
			logger:       log.With(logger, fields...),
		}
		return handler(srv, wrapped)
	}
}
// PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the payloads of requests and
// responses.
//
// When the decider returns false the call is forwarded to the invoker without logging. Otherwise the request is
// logged before the call, and the reply is logged only if the invoker returns a nil error.
func PayloadUnaryClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		if !decider(ctx, method) {
			return invoker(ctx, method, req, reply, cc, opts...)
		}
		payloadLogger := log.With(logger, newClientLoggerFields(ctx, method)...)
		logProtoMessageAsJson(payloadLogger, req, "grpc.request.content", "client request payload logged as grpc.request.content")
		if callErr := invoker(ctx, method, req, reply, cc, opts...); callErr != nil {
			// A failed call has no reply payload to log.
			return callErr
		}
		logProtoMessageAsJson(payloadLogger, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
		return nil
	}
}
// PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the payloads of requests
// and responses.
//
// When the decider returns false the stream produced by the streamer is returned untouched. Otherwise the stream
// is wrapped so that each successfully sent or received message is logged. If the streamer fails, a nil stream is
// returned together with its error.
func PayloadStreamClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		if !decider(ctx, method) {
			return streamer(ctx, desc, cc, method, opts...)
		}
		logEntry := log.With(logger, newClientLoggerFields(ctx, method)...)
		clientStream, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			// Never hand back a wrapper around a failed stream: it would be a non-nil value whose embedded
			// ClientStream may be nil, so any method call on it would panic.
			return nil, err
		}
		return &loggingClientStream{ClientStream: clientStream, logger: logEntry}, nil
	}
}
// loggingClientStream wraps a grpc.ClientStream and carries the logger used by its SendMsg and RecvMsg
// overrides to log message payloads. All other stream methods are provided by the embedded ClientStream.
type loggingClientStream struct {
// ClientStream is the underlying stream every call is delegated to.
grpc.ClientStream
// logger receives one entry per successfully sent or received message.
logger log.Logger
}
// SendMsg forwards m to the underlying client stream and, if the send succeeds, logs it as the request payload
// under the "grpc.request.content" key. The error from the underlying stream is returned unchanged.
func (l *loggingClientStream) SendMsg(m interface{}) error {
	err := l.ClientStream.SendMsg(m)
	if err == nil {
		// This is the client side of the stream, so the message is labelled as a client request.
		logProtoMessageAsJson(l.logger, m, "grpc.request.content", "client request payload logged as grpc.request.content field")
	}
	return err
}
// RecvMsg receives into m from the underlying client stream and, if the receive succeeds, logs m as the response
// payload under the "grpc.response.content" key. The error from the underlying stream is returned unchanged.
func (l *loggingClientStream) RecvMsg(m interface{}) error {
	err := l.ClientStream.RecvMsg(m)
	if err == nil {
		// This is the client side of the stream, so the message is labelled as a client response.
		logProtoMessageAsJson(l.logger, m, "grpc.response.content", "client response payload logged as grpc.response.content field")
	}
	return err
}
// loggingServerStream wraps a grpc.ServerStream and carries the logger used by its SendMsg and RecvMsg
// overrides to log message payloads. All other stream methods are provided by the embedded ServerStream.
type loggingServerStream struct {
// ServerStream is the underlying stream every call is delegated to.
grpc.ServerStream
// logger receives one entry per successfully sent or received message.
logger log.Logger
}
// SendMsg forwards m to the underlying server stream. A failed send is returned as-is without logging; a
// successful one is logged as the response payload under the "grpc.response.content" key.
func (l *loggingServerStream) SendMsg(m interface{}) error {
	if sendErr := l.ServerStream.SendMsg(m); sendErr != nil {
		return sendErr
	}
	logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
	return nil
}
// RecvMsg receives into m from the underlying server stream. A failed receive is returned as-is without logging;
// a successful one is logged as the request payload under the "grpc.request.content" key.
func (l *loggingServerStream) RecvMsg(m interface{}) error {
	if recvErr := l.ServerStream.RecvMsg(m); recvErr != nil {
		return recvErr
	}
	logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
	return nil
}
// logProtoMessageAsJson logs pbMsg, serialized as JSON, at info level under the given key, together with msg
// under the "msg" key.
//
// Values that do not implement proto.Message are silently skipped. If serialization fails, the error is logged
// under key instead of the payload and nothing else is emitted for this message.
func logProtoMessageAsJson(logger log.Logger, pbMsg interface{}, key string, msg string) {
	p, ok := pbMsg.(proto.Message)
	if !ok {
		return
	}
	payload, err := (&jsonpbObjectMarshaler{pb: p}).marshalJSON()
	if err != nil {
		level.Info(logger).Log(key, err)
		// Stop here: falling through would emit a second entry carrying an empty payload.
		return
	}
	level.Info(logger).Log("msg", msg, key, string(payload))
}
// jsonpbObjectMarshaler holds a protobuf message so that it can be serialized to JSON via marshalJSON.
type jsonpbObjectMarshaler struct {
// pb is the message to serialize.
pb proto.Message
}
// marshalJSON serializes the held protobuf message with the package-level JsonPbMarshaller and returns the
// resulting bytes. On failure it returns a nil slice and an error prefixed with "jsonpb serializer failed".
func (j *jsonpbObjectMarshaler) marshalJSON() ([]byte, error) {
	var buf bytes.Buffer
	err := JsonPbMarshaller.Marshal(&buf, j.pb)
	if err != nil {
		return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
	}
	return buf.Bytes(), nil
}