Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1673 from endocode/dongsu/grpc-fix-build-clientconn
Browse files Browse the repository at this point in the history
registry/rpc: use simpleBalancer instead of ClientConn.State()
  • Loading branch information
Dongsu Park committed Nov 24, 2016
2 parents 774ac31 + ecb121a commit eb6872f
Show file tree
Hide file tree
Showing 66 changed files with 13,025 additions and 3,148 deletions.
7 changes: 6 additions & 1 deletion engine/rpcengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ func rpcAcquireLeadership(reg registry.Registry, lManager lease.Manager, machID
return l
}

if existing != nil && existing.Version() >= ver {
// If reg is not ready, we have to give it an opportunity to steal lease
// below. Otherwise it could be blocked forever by the existing engine leader,
// which could cause gRPC registry to always fail when a leader already exists.
// Thus we return the existing leader, only if reg.IsRegistryReady() == true.
// TODO(dpark): refactor the entire function for better readability. - 20160908
if (existing != nil && existing.Version() >= ver) && reg.IsRegistryReady() {
log.Debugf("Lease already held by Machine(%s) operating at acceptable version %d", existing.MachineID(), existing.Version())
return existing
}
Expand Down
7 changes: 4 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ import:
- package: google.golang.org/api
subpackages:
- googleapi
- package: github.com/spf13/viper
2,247 changes: 699 additions & 1,548 deletions protobuf/fleet.pb.go

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions registry/rpc/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2016 The fleet Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rpc

import (
"sync"
"sync/atomic"

"golang.org/x/net/context"
"google.golang.org/grpc"
)

// simpleBalancer implements grpc.Balancer interface, being as simple as possible.
// to be used only for fleet.
//
// In principle grpc.Balancer is meant to be handling load balancer across
// multiple connections via addresses for RPCs.
// * Start() does initialization work to bootstrap a Balancer.
// * Up() informs the Balancer that gRPC has a connection to the server at addr.
// It returns Down() which is called once the connection to addr gets lost
// or closed.
// * Get() gets the address of a server for the RPC corresponding to ctx.
// * Notify() returns a channel that is used by gRPC internals to watch the
// addresses gRPC needs to connect.
// * Close() shuts down the balancer.
//
// However, as fleet needs to care only about a single connection, simpleBalancer
// in fleet should be kept as simple as possible. Most crucially simpleBalancer
// provides a simple channel, readyc, to notify the rpcRegistry of the connection
// being available. readyc gets closed in Up(), which will cause, for example,
// IsRegistryReady() to recognize that the connection is available. We don't need
// to care about which value the readyc has.
type simpleBalancer struct {
addrs []string
numGets uint32

// readyc closes once the first connection is up
readyc chan struct{}
readyOnce sync.Once
}

func newSimpleBalancer(eps []string) *simpleBalancer {
return &simpleBalancer{
addrs: eps,
readyc: make(chan struct{}),
}
}

func (b *simpleBalancer) Start(target string) error { return nil }

func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
b.readyOnce.Do(func() { close(b.readyc) })
return func(error) {}
}

func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
v := atomic.AddUint32(&b.numGets, 1)
addr := b.addrs[v%uint32(len(b.addrs))]

return grpc.Address{Addr: addr}, func() {}, nil
}

func (b *simpleBalancer) Notify() <-chan []grpc.Address { return nil }

func (b *simpleBalancer) Close() error {
b.readyc = make(chan struct{})
return nil
}
60 changes: 16 additions & 44 deletions registry/rpc/rpcregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,14 @@ import (
"github.com/coreos/fleet/unit"
)

const (
grpcConnectionTimeout = 5000 * time.Millisecond

grpcConnectionStateReady = "READY"
grpcConnectionStateConnecting = "CONNECTING"
grpcConnectionStateShutdown = "SHUTDOWN"
grpcConnectionStateFailure = "TRANSIENT_FAILURE"
)

var DebugRPCRegistry bool = false

type RPCRegistry struct {
dialer func(addr string, timeout time.Duration) (net.Conn, error)
mu *sync.Mutex
registryClient pb.RegistryClient
registryConn *grpc.ClientConn
balancer *simpleBalancer
}

func NewRPCRegistry(dialer func(string, time.Duration) (net.Conn, error)) *RPCRegistry {
Expand All @@ -63,35 +55,17 @@ func (r *RPCRegistry) ctx() context.Context {
}

func (r *RPCRegistry) getClient() pb.RegistryClient {
for {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
state := st.String()
if state == grpcConnectionStateReady {
break
} else if state == grpcConnectionStateConnecting {
if DebugRPCRegistry {
log.Infof("gRPC connection state: %s", state)
}
continue
} else if state == grpcConnectionStateFailure || state == grpcConnectionStateShutdown {
log.Infof("gRPC connection state '%s' reports an error in the connection", state)
log.Info("Reconnecting gRPC peer to fleet-engine...")
r.Connect()
}

time.Sleep(grpcConnectionTimeout)
}

return r.registryClient
}

func (r *RPCRegistry) Connect() {
// We want the connection operation to block and constantly reconnect using grpc backoff
log.Info("Starting gRPC connection to fleet-engine...")
connection, err := grpc.Dial(":fleet-engine:", grpc.WithTimeout(12*time.Second), grpc.WithInsecure(), grpc.WithDialer(r.dialer), grpc.WithBlock())
ep_engines := []string{":fleet-engine:"}
r.balancer = newSimpleBalancer(ep_engines)
connection, err := grpc.Dial(ep_engines[0],
grpc.WithTimeout(12*time.Second), grpc.WithInsecure(),
grpc.WithDialer(r.dialer), grpc.WithBlock(), grpc.WithBalancer(r.balancer))
if err != nil {
log.Fatalf("Unable to dial to registry: %s", err)
}
Expand All @@ -106,24 +80,22 @@ func (r *RPCRegistry) Close() {

func (r *RPCRegistry) IsRegistryReady() bool {
if r.registryConn != nil {
st, err := r.registryConn.State()
if err != nil {
log.Fatalf("Unable to get the state of rpc connection: %v", err)
}
connState := st.String()
log.Infof("Registry connection state: %s", connState)
if connState != grpcConnectionStateReady {
log.Errorf("unable to connect to registry connection state: %s", connState)
return false
hasConn := false
if r.balancer != nil {
select {
case <-r.balancer.readyc:
hasConn = true
}
}
log.Infof("Getting server status...")

status, err := r.Status()
if err != nil {
log.Errorf("unable to get the status of the registry service %v", err)
return false
}
log.Infof("Status of rpc service: %d, connection state: %s", status, connState)
return status == pb.HealthCheckResponse_SERVING && err == nil
log.Infof("Status of rpc service: %d, balancer has a connection: %t", status, hasConn)

return hasConn && status == pb.HealthCheckResponse_SERVING && err == nil
}
return false
}
Expand Down
4 changes: 3 additions & 1 deletion registry/rpc/rpcregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func TestRPCRegistryClientCreation(t *testing.T) {
t.Fatalf("failed to parse listener address: %v", err)
}
addr := "localhost:" + port
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second), grpc.WithBlock())
b := newSimpleBalancer([]string{addr})
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second),
grpc.WithBlock(), grpc.WithBalancer(b))
if err != nil {
t.Fatalf("failed to dial to the server %q: %v", addr, err)
}
Expand Down
2 changes: 0 additions & 2 deletions vendor/github.com/coreos/go-systemd/activation/listeners.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit eb6872f

Please sign in to comment.