Skip to content

Commit

Permalink
Apply cosmos#15041, #16151
Browse files Browse the repository at this point in the history
  • Loading branch information
tkxkd0159 committed Jun 13, 2024
1 parent f688899 commit 27502b1
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 239 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/tendermint/tm-db v0.6.7
golang.org/x/crypto v0.24.0
golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8
golang.org/x/sync v0.7.0
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
Expand Down
46 changes: 37 additions & 9 deletions server/api/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"net"
"net/http"
Expand All @@ -27,9 +28,9 @@ type Server struct {
Router *mux.Router
GRPCGatewayRouter *runtime.ServeMux
ClientCtx client.Context
logger log.Logger
metrics *telemetry.Metrics

logger log.Logger
metrics *telemetry.Metrics
// Start() is blocking and generally called from a separate goroutine.
// Close() can be called asynchronously and access shared memory
// via the listener. Therefore, we sync access to Start and Close with
Expand Down Expand Up @@ -85,7 +86,12 @@ func New(clientCtx client.Context, logger log.Logger) *Server {
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the Tendermint JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
// and are delegated to the CometBFT JSON RPC server.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
s.mtx.Lock()

tmCfg := tmrpcserver.DefaultConfig()
Expand All @@ -107,13 +113,35 @@ func (s *Server) Start(cfg config.Config) error {
// register grpc-gateway routes
s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter)

s.logger.Info("starting API server...")
if cfg.API.EnableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
return tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg)
}
errCh := make(chan error)

// Start the API in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func(enableUnsafeCORS bool) {
s.logger.Info("starting API server...", "address", cfg.API.Address)

return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
if enableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg)
} else {
errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
}
}(cfg.API.EnableUnsafeCORS)

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process canceled or closed the provided context, so we must
// gracefully stop the API server.
s.logger.Info("stopping API server...", "address", cfg.API.Address)
return s.Close()

case err := <-errCh:
s.logger.Error("failed to start API server", "err", err)
return err
}
}

// Close closes the API server.
Expand Down
53 changes: 38 additions & 15 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package grpc

import (
"context"
"fmt"
sdk "github.com/Finschia/finschia-sdk/types"
"net"
"time"

"github.com/tendermint/tendermint/libs/log"
"google.golang.org/grpc"

"github.com/Finschia/finschia-sdk/client"
Expand All @@ -13,11 +15,11 @@ import (
"github.com/Finschia/finschia-sdk/server/grpc/gogoreflection"
reflection "github.com/Finschia/finschia-sdk/server/grpc/reflection/v2"
"github.com/Finschia/finschia-sdk/server/types"
sdk "github.com/Finschia/finschia-sdk/types"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand All @@ -29,10 +31,11 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
}

grpcSrv := grpc.NewServer(
grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxSendMsgSize(maxSendMsgSize),
grpc.MaxRecvMsgSize(maxRecvMsgSize),
grpc.ForceServerCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
)

app.RegisterGRPCServer(grpcSrv)

// Reflection allows consumers to build dynamic clients that can write to any
Expand All @@ -51,30 +54,50 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to register reflection service: %w", err)
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return grpcSrv, nil
}

// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
logger.Info("starting gRPC server...", "address", cfg.Address)
errCh <- grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process canceled or closed the provided context, so we must
// gracefully stop the gRPC server.
logger.Info("stopping gRPC server...", "address", cfg.Address)
grpcSrv.GracefulStop()

return nil

case err := <-errCh:
return nil, err
case <-time.After(types.ServerStartTime):
// assume server started successfully
return grpcSrv, nil
logger.Error("failed to start gRPC server", "err", err)
return err
}
}
Loading

0 comments on commit 27502b1

Please sign in to comment.