Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
fengbeihong committed Aug 14, 2021
1 parent 2e3392f commit e2a61e4
Show file tree
Hide file tree
Showing 17 changed files with 195 additions and 226 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
.idea/
demo/main
demo/demo
demo/demo
25 changes: 14 additions & 11 deletions demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ func EchoWithHttpWrapper(w http.ResponseWriter, req *http.Request) {
return
}
var reqData pb.EchoRequest
err = json.Unmarshal(data, &reqData)
if err != nil {
w.Write([]byte(err.Error()))
return
if len(data) != 0 {
err = json.Unmarshal(data, &reqData)
if err != nil {
w.Write([]byte(err.Error()))
return
}
}
var es echoServer
respData, err := es.Echo(context.Background(), &reqData)
Expand All @@ -57,11 +59,11 @@ func EchoWithHttpWrapper(w http.ResponseWriter, req *http.Request) {
type MyLogger struct {
}

func (m *MyLogger) Infof(format string, args ...interface{}) {
func (m *MyLogger) Info(format string, args ...interface{}) {
log.Printf(format, args...)
}

func (m *MyLogger) Errorf(format string, args ...interface{}) {
func (m *MyLogger) Error(format string, args ...interface{}) {
log.Printf(format, args...)
}

Expand All @@ -72,15 +74,16 @@ func getCurrentFilePath() string {
return filePath
}
func main() {
s := rpc.InitRpc(path.Join(path.Dir(getCurrentFilePath()), "rpc.toml"), rpc.WithLogger(&MyLogger{}))
cfgPath := path.Join(path.Dir(getCurrentFilePath()), "rpc.toml")
s, _ := rpc.InitRpc(cfgPath, rpc.WithLogger(&MyLogger{}))
// 也可以使用默认logger
// s := rpc.InitRpc("./rpc.toml")

// register rpc
pb.RegisterEchoServiceServer(s.GrpcServer(), &echoServer{})

http.HandleFunc("/echo/", EchoWithHttpWrapper)
go http.ListenAndServe(s.HttpServerAddr(), nil)
http.HandleFunc("/echo", EchoWithHttpWrapper)
go http.ListenAndServe(s.HttpAddr(), nil)

// 调用client的例子
go clientExample()
Expand All @@ -92,7 +95,7 @@ func main() {

func clientExample() {
time.Sleep(time.Duration(2) * time.Second)
clientExampleRpcConsul()
//clientExampleRpcConsul()
clientExampleRpcLocal()
clientExampleHttpLocal()
}
Expand Down Expand Up @@ -140,7 +143,7 @@ func clientExampleHttpLocal() {
}
bb, _ := json.Marshal(data)
body := bytes.NewReader(bb)
b, err := rpc.HttpPost(context.Background(), "rpcservername_http", "/v1/example/echo", nil, body)
b, err := rpc.HttpPost(context.Background(), "rpcservername_http", "/echo", nil, body)
if err != nil {
log.Println("clientExampleHttpLocal error: ", err)
return
Expand Down
2 changes: 1 addition & 1 deletion demo/rpc.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ http_port = 9901
port=6060

[consul]
enabled=true
enabled=false
host="127.0.0.1"

[metrics]
Expand Down
6 changes: 3 additions & 3 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ func init() {
clientConfigMap = make(map[string]*clientConfig)
}

func initRpcClient(cfg *Config) {
for _, item := range cfg.RpcClients {
func initRpcClient(s *Server) {
for _, item := range s.cfg.RpcClients {
if item.CallType == callTypeLocal {
if err := item.checkEndpoints(); err != nil {
cfg.Log.Errorf("%v", err)
s.Log.Error(err.Error())
continue
}
}
Expand Down
11 changes: 4 additions & 7 deletions rpc/client_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ func httpDo(opt *httpclientOption) ([]byte, error) {
}
req, err := http.NewRequest(opt.method, url, opt.body)
if err != nil {
GlobalConf.Log.Errorf("failed to execute http request, service_name: %s, url: %s, error: %s", opt.serviceName, url, err.Error())
return nil, err
return nil, fmt.Errorf("failed to execute http request, service_name: %s, url: %s, error: %s", opt.serviceName, url, err.Error())
}

req.Header.Set("Content-Type", "application/json")
Expand All @@ -118,17 +117,15 @@ func httpDo(opt *httpclientOption) ([]byte, error) {

resp, err := c.Do(req)
if err != nil {
GlobalConf.Log.Errorf("http request failed, service name: %s, url: %s, error: %s", opt.serviceName, url, err.Error())
return nil, err
return nil, fmt.Errorf("http request failed, service name: %s, url: %s, error: %s", opt.serviceName, url, err.Error())
}
if resp.StatusCode >= http.StatusBadRequest {
GlobalConf.Log.Errorf("http request failed, service name: %s, url: %s, code: %d, status: %s", opt.serviceName, url, resp.StatusCode, resp.Status)
return nil, fmt.Errorf("%d:%s", resp.StatusCode, resp.Status)
return nil, fmt.Errorf("http request failed, service name: %s, url: %s, code: %d, status: %s", opt.serviceName, url, resp.StatusCode, resp.Status)
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
GlobalConf.Log.Errorf("http request read body failed, service name: %s, url: %s, error: %s", opt.serviceName, url, err.Error())
return nil, fmt.Errorf("http request read body failed, service name: %s, url: %s, error: %s", opt.serviceName, url, err.Error())
}

return b, nil
Expand Down
6 changes: 2 additions & 4 deletions rpc/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func dialWithConsul(ctx context.Context, cfg *clientConfig, opts ...grpc.DialOpt

conn, err := grpc.DialContext(ctx, fmt.Sprintf("consul://%s:8500/%s", GlobalConf.Consul.Host, cfg.ServiceName), opts...)
if err != nil {
GlobalConf.Log.Errorf("dialWithConsul, dial with context failed: %s", err.Error())
return nil, err
return nil, fmt.Errorf("dialWithConsul, dial with context failed: %s", err.Error())
}

return conn, nil
Expand All @@ -100,8 +99,7 @@ func dialWithLocal(ctx context.Context, cfg *clientConfig, opts ...grpc.DialOpti

conn, err := grpc.DialContext(ctx, cfg.endpointByBalancer(), opts...)
if err != nil {
GlobalConf.Log.Errorf("dialWithLocal, dial with context failed: %s", err.Error())
return nil, err
return nil, fmt.Errorf("dialWithLocal, dial with context failed: %s", err.Error())
}

return conn, nil
Expand Down
1 change: 0 additions & 1 deletion rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const (
)

type Config struct {
Log Logger
Pprof pprofConfig `toml:"pprof"`
Server serverConfig
Consul consulConfig
Expand Down
6 changes: 3 additions & 3 deletions rpc/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func registerConsul(cfg *Config) error {
if cfg.Consul.Host == "" {
cfg.Consul.Host = defaultConsulHost
}
consulConfig := api.DefaultConfig()
consulConfig.Address = fmt.Sprintf("%s:8500", cfg.Consul.Host)
client, err := api.NewClient(consulConfig)
cc := api.DefaultConfig()
cc.Address = fmt.Sprintf("%s:8500", cfg.Consul.Host)
client, err := api.NewClient(cc)
if err != nil {
return fmt.Errorf("waring: create consul client error: %v", err)
}
Expand Down
40 changes: 27 additions & 13 deletions rpc/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,42 @@ package rpc
import "os"

func init() {
// https://developer.aliyun.com/article/238940
// http://tbg.github.io/golang-static-linking-bug?spm=a2c6h.12873639.0.0.52933341DDCBdG
os.Setenv("GODEBUG", "netdns=go")
}

func InitRpc(filePath string, opts ...InitOption) *Server {
cfg := initConfig(filePath)
func InitRpc(filePath string, opts ...InitOption) (*Server, error) {
s := &Server{
cfg: initConfig(filePath),
Log: defaultLogger(),
}

initLogger(cfg, opts...)
for _, opt := range opts {
opt.f(s)
}

initRpcClient(cfg)
setGLogger(s.Log)

initRedisClient(cfg)
initRpcClient(s)

initDBClient(cfg)
initRedisClient(s)

// init rpc server
return initServer(cfg)
}
initDBClient(s)

// init grpc server
s.server = initGrpcServer(s.cfg)

func InitRpcSimple(filePath string, opts ...InitOption) {
cfg := initConfig(filePath)
return s, s.Err
}

initLogger(cfg, opts...)
type InitOption struct {
f func(*Server)
}

initRpcClient(cfg)
// WithLogger init logger
func WithLogger(l Logger) InitOption {
return InitOption{func(s *Server) {
s.Log = l
}}
}
36 changes: 17 additions & 19 deletions rpc/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,35 @@ import (
"os"
)

var gLogger Logger

type Logger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
Info(format string, args ...interface{})
Error(format string, args ...interface{})
}

func init() {
setGLogger(defaultLogger())
}

func newLogger() Logger {
return &logger{
func setGLogger(l Logger) {
gLogger = l
}

func defaultLogger() Logger {
return &myLogger{
log: log.New(os.Stdout, "[rpc] ", log.LstdFlags),
}
}

type logger struct {
type myLogger struct {
log *log.Logger
}

func (l *logger) Infof(format string, args ...interface{}) {
func (l *myLogger) Info(format string, args ...interface{}) {
l.log.Printf("[INFO] "+format, args...)
}

func (l *logger) Errorf(format string, args ...interface{}) {
func (l *myLogger) Error(format string, args ...interface{}) {
l.log.Printf("[ERROR] "+format, args...)
}

func initLogger(cfg *Config, opts ...InitOption) {
options := initOptions{
logger: newLogger(),
}

for _, opt := range opts {
opt.f(&options)
}

cfg.Log = options.logger
}
9 changes: 4 additions & 5 deletions rpc/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rpc
import (
"context"
"fmt"
"log"
"sync"

"gorm.io/driver/mysql"
Expand Down Expand Up @@ -35,13 +34,13 @@ func loadDB(key string) *DBInfo {
return nil
}

func initDBClient(globalCfg *Config) {
for _, dbCfg := range globalCfg.DBClients {
func initDBClient(s *Server) {
for _, dbCfg := range s.cfg.DBClients {
cfg := dbCfg
info, err := initDB(&cfg)
if err != nil {
// 服务启动阶段链接不上db就直接fatal
log.Fatalf(err.Error())
s.Log.Error(err.Error())
continue
}
globalDBMap.Store(cfg.ServiceName, info)
}
Expand Down
21 changes: 12 additions & 9 deletions rpc/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ import (
"testing"
)

var mysqlTestConfig = &Config{
DBClients: []dbConfig{
{
ServiceName: "test",
Host: "127.0.0.1",
Port: 3306,
Username: "root",
Password: "aaaaaaaa",
Database: "test",
var mysqlTestConfig = &Server{
cfg: &Config{
DBClients: []dbConfig{
{
ServiceName: "test",
Host: "127.0.0.1",
Port: 3306,
Username: "root",
Password: "aaaaaaaa",
Database: "test",
},
},
},
Log: defaultLogger(),
}

func TestMysqlConn(t *testing.T) {
Expand Down
24 changes: 0 additions & 24 deletions rpc/option.go

This file was deleted.

Loading

0 comments on commit e2a61e4

Please sign in to comment.