Skip to content

Commit

Permalink
fix: barry 2024-07-24 22:24:43
Browse files Browse the repository at this point in the history
  • Loading branch information
kooksee committed Jul 24, 2024
1 parent 3c8796f commit 1f9e55b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 26 deletions.
38 changes: 25 additions & 13 deletions pkg/gateway/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func CompressorOption(contentEncoding string, c Compressor) MuxOption {
var _ Gateway = (*Mux)(nil)

type Mux struct {
cc *inprocgrpc.Channel
opts *muxOptions
routerTree *routertree.RouteTree
localClient *inprocgrpc.Channel
opts *muxOptions
routerTree *routertree.RouteTree
}

func (m *Mux) GetRouteMethods() []RouteOperation { return m.routerTree.List() }
Expand Down Expand Up @@ -221,11 +221,23 @@ func (m *Mux) Handler(ctx *fiber.Ctx) error {
}

func (m *Mux) Invoke(ctx context.Context, method string, args, reply any, opts ...grpc.CallOption) error {
return m.cc.Invoke(ctx, method, args, reply, opts...)
if mth := m.opts.handlers[method]; mth != nil {
if mth.srv.remoteProxyCli != nil {
return mth.srv.remoteProxyCli.Invoke(ctx, method, args, reply, opts...)
}
}

return m.localClient.Invoke(ctx, method, args, reply, opts...)
}

func (m *Mux) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return m.cc.NewStream(ctx, desc, method, opts...)
if mth := m.opts.handlers[method]; mth != nil {
if mth.srv.remoteProxyCli != nil {
return mth.srv.remoteProxyCli.NewStream(ctx, desc, method, opts...)
}
}

return m.localClient.NewStream(ctx, desc, method, opts...)
}

func (m *Mux) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
Expand Down Expand Up @@ -276,24 +288,24 @@ func NewMux(opts ...MuxOption) *Mux {
sort.Strings(muxOpts.encodingTypeOffers)

mux := &Mux{
opts: &muxOpts,
cc: new(inprocgrpc.Channel),
routerTree: routertree.NewRouteTree(),
opts: &muxOpts,
localClient: new(inprocgrpc.Channel),
routerTree: routertree.NewRouteTree(),
}

return mux
}

func (m *Mux) SetUnaryInterceptor(interceptor grpc.UnaryServerInterceptor) {
m.opts.unaryInterceptor = interceptor
m.cc.WithServerUnaryInterceptor(interceptor)
m.localClient.WithServerUnaryInterceptor(interceptor)
}

// SetStreamInterceptor configures the in-process channel to use the
// given server interceptor for streaming RPCs when dispatching.
func (m *Mux) SetStreamInterceptor(interceptor grpc.StreamServerInterceptor) {
m.opts.streamInterceptor = interceptor
m.cc.WithServerStreamInterceptor(interceptor)
m.localClient.WithServerStreamInterceptor(interceptor)
}

func (m *Mux) RegisterProxy(sd *grpc.ServiceDesc, proxy lava.GrpcProxy) {
Expand All @@ -306,7 +318,7 @@ func (m *Mux) RegisterProxy(sd *grpc.ServiceDesc, proxy lava.GrpcProxy) {
func (m *Mux) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
assert.If(generic.IsNil(ss), "ss params is nil")

m.cc.RegisterService(sd, ss)
m.localClient.RegisterService(sd, ss)

ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
Expand All @@ -328,7 +340,7 @@ func (m *Mux) registerRouter(rule *methodWrapper) {
rule.inputType = assert.Must1(protoregistry.GlobalTypes.FindMessageByName(rule.grpcMethodProtoDesc.Input().FullName()))
rule.outputType = assert.Must1(protoregistry.GlobalTypes.FindMessageByName(rule.grpcMethodProtoDesc.Output().FullName()))

if rule.srv.grpcProxyCli != nil {
if rule.srv.remoteProxyCli != nil {
if rule.grpcMethodDesc != nil {
rule.grpcMethodDesc.Handler = grpcMethodHandlerWrapper(rule)
}
Expand Down Expand Up @@ -365,7 +377,7 @@ func (m *Mux) registerService(gsd *grpc.ServiceDesc, ss interface{}) error {
}

if p, ok := ss.(lava.GrpcProxy); ok && p != nil {
srv.grpcProxyCli = p.Proxy()
srv.remoteProxyCli = p.Proxy()
}

findMethodDesc := func(methodName string) protoreflect.MethodDescriptor {
Expand Down
12 changes: 6 additions & 6 deletions pkg/gateway/stream_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (s *streamHTTP) SendMsg(m interface{}) error {
msg := cur.Interface()

reqName := msg.ProtoReflect().Descriptor().FullName()
handler := s.method.srv.opts.responseInterceptors[reqName]
if handler != nil {
return errors.Wrapf(handler(s.handler, msg), "failed to handler response data by %s", reqName)
rspInterceptor := s.method.srv.opts.responseInterceptors[reqName]
if rspInterceptor != nil {
return errors.Wrapf(rspInterceptor(s.handler, msg), "failed to do rsp interceptor response data by %s", reqName)
}

b, err := protojson.Marshal(msg)
Expand Down Expand Up @@ -119,9 +119,9 @@ func (s *streamHTTP) RecvMsg(m interface{}) error {
msg := cur.Interface()

reqName := msg.ProtoReflect().Descriptor().FullName()
handler := s.method.srv.opts.requestInterceptors[reqName]
if handler != nil {
return errors.Wrapf(handler(s.handler, msg), "failed to handler request data by %s", reqName)
reqInterceptor := s.method.srv.opts.requestInterceptors[reqName]
if reqInterceptor != nil {
return errors.Wrapf(reqInterceptor(s.handler, msg), "failed to go req interceptor request data by %s", reqName)
}

if method == http.MethodPut ||
Expand Down
14 changes: 7 additions & 7 deletions pkg/gateway/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
)

type serviceWrapper struct {
opts *muxOptions
srv any
serviceDesc *grpc.ServiceDesc
servicePbDesc protoreflect.ServiceDescriptor
grpcProxyCli grpc.ClientConnInterface
opts *muxOptions
srv any
serviceDesc *grpc.ServiceDesc
servicePbDesc protoreflect.ServiceDescriptor
remoteProxyCli grpc.ClientConnInterface
}

type GrpcMethod struct {
Expand Down Expand Up @@ -79,7 +79,7 @@ func grpcMethodHandlerWrapper(mth *methodWrapper, opts ...grpc.CallOption) GrpcM

var h = func(ctx context.Context, req any) (any, error) {
var out = mth.outputType.New().Interface()
err := mth.srv.grpcProxyCli.Invoke(ctx, mth.grpcFullMethod, in, out, opts...)
err := mth.srv.remoteProxyCli.Invoke(ctx, mth.grpcFullMethod, in, out, opts...)
if err != nil {
return nil, err
}
Expand All @@ -97,6 +97,6 @@ func grpcMethodHandlerWrapper(mth *methodWrapper, opts ...grpc.CallOption) GrpcM

func grpcMethodStreamWrapper(mth *methodWrapper, opts ...grpc.CallOption) GrpcStreamHandler {
return TransparentHandler(func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error) {
return ctx, mth.srv.grpcProxyCli, nil
return ctx, mth.srv.remoteProxyCli, nil
})
}

0 comments on commit 1f9e55b

Please sign in to comment.