-
Notifications
You must be signed in to change notification settings - Fork 94
/
trpc.go
181 lines (161 loc) · 5.91 KB
/
trpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//
// Package trpc is the Go implementation of tRPC, which is designed to be high-performance,
// everything-pluggable and easy for testing.
package trpc
import (
"errors"
"fmt"
"trpc.group/trpc-go/trpc-go/admin"
"trpc.group/trpc-go/trpc-go/filter"
"trpc.group/trpc-go/trpc-go/log"
"trpc.group/trpc-go/trpc-go/naming/registry"
"trpc.group/trpc-go/trpc-go/rpcz"
"trpc.group/trpc-go/trpc-go/server"
"trpc.group/trpc-go/trpc-go/transport"
"go.uber.org/automaxprocs/maxprocs"
)
// NewServer parses the yaml config file to quickly start the server with multiple services.
// The config file is ./trpc_go.yaml by default and can be set by the flag -conf.
// This method should be called only once.
func NewServer(opt ...server.Option) *server.Server {
// load and parse config file
cfg, err := LoadConfig(serverConfigPath())
if err != nil {
panic("load config fail: " + err.Error())
}
// set to global config for other plugins' accessing to the config
SetGlobalConfig(cfg)
closePlugins, err := SetupPlugins(cfg.Plugins)
if err != nil {
panic("setup plugin fail: " + err.Error())
}
if err := SetupClients(&cfg.Client); err != nil {
panic("failed to setup client: " + err.Error())
}
// set default GOMAXPROCS for docker
maxprocs.Set(maxprocs.Logger(log.Debugf))
s := NewServerWithConfig(cfg, opt...)
s.RegisterOnShutdown(func() {
if err := closePlugins(); err != nil {
log.Errorf("failed to close plugins, err: %s", err)
}
})
return s
}
// NewServerWithConfig initializes a server with a Config.
// If yaml config file not used, custom Config parsing is needed to pass the Config into this function.
// Plugins' setup is left to do if this method is called.
func NewServerWithConfig(cfg *Config, opt ...server.Option) *server.Server {
// repair config
if err := RepairConfig(cfg); err != nil {
panic("repair config fail: " + err.Error())
}
// set to global Config
SetGlobalConfig(cfg)
s := &server.Server{
MaxCloseWaitTime: getMillisecond(cfg.Server.MaxCloseWaitTime),
}
// setup admin service
setupAdmin(s, cfg)
// init service one by one
for _, c := range cfg.Server.Service {
s.AddService(c.Name, newServiceWithConfig(cfg, c, opt...))
}
return s
}
// GetAdminService gets admin service from server.Server.
func GetAdminService(s *server.Server) (*admin.Server, error) {
adminServer, ok := s.Service(admin.ServiceName).(*admin.Server)
if !ok {
return nil, errors.New("admin server may not be enabled")
}
return adminServer, nil
}
func setupAdmin(s *server.Server, cfg *Config) {
// admin configured, then admin service will be started
opts := []admin.Option{
admin.WithSkipServe(cfg.Server.Admin.Port == 0),
admin.WithVersion(Version()),
admin.WithTLS(cfg.Server.Admin.EnableTLS),
admin.WithConfigPath(ServerConfigPath),
admin.WithReadTimeout(getMillisecond(cfg.Server.Admin.ReadTimeout)),
admin.WithWriteTimeout(getMillisecond(cfg.Server.Admin.WriteTimeout)),
}
if cfg.Server.Admin.Port > 0 {
opts = append(opts, admin.WithAddr(fmt.Sprintf("%s:%d", cfg.Server.Admin.IP, cfg.Server.Admin.Port)))
}
if cfg.Server.Admin.RPCZ != nil {
rpcz.GlobalRPCZ = rpcz.NewRPCZ(cfg.Server.Admin.RPCZ.generate())
}
s.AddService(admin.ServiceName, admin.NewServer(opts...))
}
func newServiceWithConfig(cfg *Config, serviceCfg *ServiceConfig, opt ...server.Option) server.Service {
var (
filters filter.ServerChain
filterNames []string
)
// Global filter is at front and is deduplicated.
for _, name := range deduplicate(cfg.Server.Filter, serviceCfg.Filter) {
f := filter.GetServer(name)
if f == nil {
panic(fmt.Sprintf("filter %s no registered, do not configure", name))
}
filters = append(filters, f)
filterNames = append(filterNames, name)
}
filterNames = append(filterNames, "fixTimeout")
var streamFilter []server.StreamFilter
for _, name := range deduplicate(cfg.Server.StreamFilter, serviceCfg.StreamFilter) {
f := server.GetStreamFilter(name)
if f == nil {
panic(fmt.Sprintf("stream filter %s no registered, do not configure", name))
}
streamFilter = append(streamFilter, f)
}
// get registry by service
reg := registry.Get(serviceCfg.Name)
if serviceCfg.Registry != "" && reg == nil {
log.Warnf("service:%s registry not exist", serviceCfg.Name)
}
opts := []server.Option{
server.WithNamespace(cfg.Global.Namespace),
server.WithEnvName(cfg.Global.EnvName),
server.WithContainer(cfg.Global.ContainerName),
server.WithServiceName(serviceCfg.Name),
server.WithProtocol(serviceCfg.Protocol),
server.WithTransport(transport.GetServerTransport(serviceCfg.Transport)),
server.WithNetwork(serviceCfg.Network),
server.WithAddress(serviceCfg.Address),
server.WithStreamFilters(streamFilter...),
server.WithRegistry(reg),
server.WithTimeout(getMillisecond(serviceCfg.Timeout)),
server.WithDisableRequestTimeout(serviceCfg.DisableRequestTimeout),
server.WithDisableKeepAlives(serviceCfg.DisableKeepAlives),
server.WithCloseWaitTime(getMillisecond(cfg.Server.CloseWaitTime)),
server.WithMaxCloseWaitTime(getMillisecond(cfg.Server.MaxCloseWaitTime)),
server.WithIdleTimeout(getMillisecond(serviceCfg.Idletime)),
server.WithTLS(serviceCfg.TLSCert, serviceCfg.TLSKey, serviceCfg.CACert),
server.WithServerAsync(*serviceCfg.ServerAsync),
server.WithMaxRoutines(serviceCfg.MaxRoutines),
server.WithWritev(*serviceCfg.Writev),
}
for i := range filters {
opts = append(opts, server.WithNamedFilter(filterNames[i], filters[i]))
}
if cfg.Global.EnableSet == "Y" {
opts = append(opts, server.WithSetName(cfg.Global.FullSetName))
}
opts = append(opts, opt...)
return server.New(opts...)
}