-
Notifications
You must be signed in to change notification settings - Fork 0
/
resolver.go
106 lines (97 loc) · 3.28 KB
/
resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package loadbalance
import (
	"context"
	"fmt"
	"sync"
	"time"

	api "github.com/anshulsood11/loghouse/api/v1"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/attributes"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
)
// Resolver discovers the servers behind a target and reports them to gRPC.
// The same type acts as both the resolver.Builder and the resolver.Resolver:
// Build fills in the fields below on the receiver and returns it.
type Resolver struct {
	// clientConn is the connection handed to Build by gRPC; ResolveNow
	// pushes the discovered addresses and the service config into it.
	clientConn resolver.ClientConn

	// resolverConn is the resolver's own connection, dialed in Build and
	// used by ResolveNow to call GetServers. Close shuts it down.
	resolverConn *grpc.ClientConn

	// serviceConfig is the parsed service config produced in Build and
	// attached to every state update.
	serviceConfig *serviceconfig.ParseResult

	// logger is the named zap logger set up in Build.
	logger *zap.Logger

	// mu is held for the whole of ResolveNow, so resolutions run one at a time.
	mu sync.Mutex
}
// Compile-time assertions that *Resolver implements both gRPC resolver
// interfaces.
var (
	_ resolver.Builder  = (*Resolver)(nil)
	_ resolver.Resolver = (*Resolver)(nil)
)

// Name is the identifier this package uses for the resolver: Scheme returns
// it, and Build uses it as the load-balancing policy key in the service config.
const Name = "loghouse"
// Build prepares the receiver to resolve target on behalf of cc and returns
// the receiver itself as the resolver.
//
// It stores cc, parses a service config that selects the load-balancing
// policy named Name, dials the target endpoint with the resolver's own
// connection, and then runs one ResolveNow so that cc receives an initial
// state. The only error returned is the one reported by grpc.Dial; a failed
// initial resolution is logged by ResolveNow, not returned.
func (r *Resolver) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	opts resolver.BuildOptions,
) (resolver.Resolver, error) {
	r.logger = zap.L().Named("resolver")

	// cc is the user's client connection; the resolver updates it with the
	// servers it discovers.
	r.clientConn = cc

	// The service config tells gRPC which load-balancing policy to use for
	// calls made through cc.
	lbConfig := fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, Name)
	r.serviceConfig = cc.ParseServiceConfig(lbConfig)

	// Reuse the caller's transport credentials, when provided, for the
	// resolver's own connection.
	var dialOpts []grpc.DialOption
	if creds := opts.DialCreds; creds != nil {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
	}

	// resolverConn is separate from cc: it exists only so the resolver can
	// call GetServers on the target.
	var err error
	if r.resolverConn, err = grpc.Dial(target.Endpoint(), dialOpts...); err != nil {
		return nil, err
	}

	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}
// Scheme reports the URI scheme handled by this builder, which is Name.
// gRPC matches the scheme of a dial target against its registered builders,
// so targets of the form loghouse://our-service-address are routed to this
// resolver.
func (r *Resolver) Scheme() string {
	return Name
}
// ResolveNow asks the target for its current server list and pushes the
// result to the client connection.
//
// It calls GetServers over the resolver's own connection, converts every
// returned server into a resolver.Address (carrying an "is_leader" attribute
// taken from the server's IsLeader field), and updates the client connection
// with those addresses together with the service config prepared in Build.
//
// Calls are serialized by r.mu. Failures are logged rather than returned,
// because the resolver.Resolver interface gives ResolveNow no return value.
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
	// Upper bound for one discovery round trip. Without a deadline, a server
	// that never answers would keep this call, and r.mu, blocked forever and
	// stall every later ResolveNow behind it.
	const resolveTimeout = 10 * time.Second

	r.mu.Lock()
	defer r.mu.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
	defer cancel()

	client := api.NewLogClient(r.resolverConn)
	res, err := client.GetServers(ctx, &api.GetServersRequest{})
	if err != nil {
		r.logger.Error("failed to resolve server", zap.Error(err))
		return
	}

	var addrs []resolver.Address
	for _, server := range res.Servers {
		addrs = append(addrs, resolver.Address{
			Addr:       server.RpcAddr,
			Attributes: attributes.New("is_leader", server.IsLeader),
		})
	}

	state := resolver.State{
		Addresses:     addrs,
		ServiceConfig: r.serviceConfig,
	}
	if err := r.clientConn.UpdateState(state); err != nil {
		r.logger.Error("failed to update state", zap.Error(err))
	}
}
// Close shuts down the resolver's own connection that was dialed in Build.
// A failure to close is logged; there is no return value to report it through.
func (r *Resolver) Close() {
	err := r.resolverConn.Close()
	if err == nil {
		return
	}
	r.logger.Error("failed to close conn", zap.Error(err))
}
// init registers a zero-value Resolver as a builder with gRPC, making the
// scheme returned by Scheme available to dial targets.
func init() {
	resolver.Register(new(Resolver))
}