Skip to content

Commit

Permalink
Remove intermediate channel and exchange loop
Browse files Browse the repository at this point in the history
For InstanceSetSource, don't rely on consuming the InstanceSetUpdate
channel produced by the ScheduleVIPAddressUpdates and
ScheduleSecureVIPAddressUpdates methods. Instead, push updates
directly into the InstanceSetSource.

Likewise, for AppSource, don't rely on consuming the AppUpdate channel
produced by ScheduleAppUpdates.
  • Loading branch information
seh committed Jan 13, 2017
1 parent 46abb44 commit a7ad7a5
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 87 deletions.
53 changes: 30 additions & 23 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,16 @@ type AppUpdate struct {
Err error
}

func sendAppUpdatesEvery(d time.Duration, produce func() AppUpdate, c chan<- AppUpdate, done <-chan struct{}) {
func exchangeAppEvery(d time.Duration, produce func() (*Application, error), consume func(*Application, error), done <-chan struct{}) {
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-done:
close(c)
return
case <-t.C:
// Drop attempted sends when the consumer hasn't received the last buffered update.
select {
case c <- produce():
default:
}
app, err := produce()
consume(app, err)
}
}
}
Expand All @@ -130,21 +126,31 @@ func sendAppUpdatesEvery(d time.Duration, produce func() AppUpdate, c chan<- App
// If await is true, it sends at least one application update outcome to the
// returned channel before returning.
func (e *EurekaConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan AppUpdate {
produce := func() AppUpdate {
app, err := e.GetApp(name)
return AppUpdate{app, err}
produce := func() (*Application, error) {
return e.GetApp(name)
}
c := make(chan AppUpdate, 1)
if await {
c <- produce()
app, err := produce()
c <- AppUpdate{app, err}
}
consume := func(app *Application, err error) {
// Drop attempted sends when the consumer hasn't received the last buffered update.
select {
case c <- AppUpdate{app, err}:
default:
}
}
go sendAppUpdatesEvery(time.Duration(e.PollInterval)*time.Second, produce, c, done)
go func() {
defer close(c)
exchangeAppEvery(e.PollInterval, produce, consume, done)
}()
return c
}

// An AppSource holds a periodically updated copy of a Eureka application.
type AppSource struct {
m *sync.RWMutex
m sync.RWMutex
app *Application
done chan<- struct{}
}
Expand All @@ -159,22 +165,23 @@ type AppSource struct {
// would return false.
func (e *EurekaConnection) NewAppSource(name string, await bool) *AppSource {
done := make(chan struct{})
updates := e.ScheduleAppUpdates(name, await, done)
s := &AppSource{
done: done,
}
produce := func() (*Application, error) {
return e.GetApp(name)
}
if await {
if u := <-updates; u.Err != nil {
s.app = u.App
if app, err := produce(); err == nil {
s.app = app
}
}
go func() {
for u := range updates {
s.m.Lock()
s.app = u.App
s.m.Unlock()
}
}()
consume := func(app *Application, err error) {
s.m.Lock()
s.app = app
s.m.Unlock()
}
go exchangeAppEvery(e.PollInterval, produce, consume, done)
return s
}

Expand Down
127 changes: 63 additions & 64 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,34 +420,40 @@ type InstanceSetUpdate struct {
Err error
}

func sendVIPAddressUpdatesEvery(d time.Duration, produce func() InstanceSetUpdate, c chan<- InstanceSetUpdate, done <-chan struct{}) {
func exchangeInstancesEvery(d time.Duration, produce func() ([]*Instance, error), consume func([]*Instance, error), done <-chan struct{}) {
t := time.NewTicker(d)
defer t.Stop()
for {
select {
case <-done:
close(c)
return
case <-t.C:
// Drop attempted sends when the consumer hasn't received the last buffered update.
select {
case c <- produce():
default:
}
instances, err := produce()
consume(instances, err)
}
}
}

func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddressKind, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate {
produce := func() InstanceSetUpdate {
instances, err := e.getInstancesByVIPAddress(addr, kind, opts)
return InstanceSetUpdate{instances, err}
produce := func() ([]*Instance, error) {
return e.getInstancesByVIPAddress(addr, kind, opts)
}
c := make(chan InstanceSetUpdate, 1)
if await {
c <- produce()
instances, err := produce()
c <- InstanceSetUpdate{instances, err}
}
consume := func(instances []*Instance, err error) {
// Drop attempted sends when the consumer hasn't received the last buffered update.
select {
case c <- InstanceSetUpdate{instances, err}:
default:
}
}
go sendVIPAddressUpdatesEvery(time.Duration(e.PollInterval)*time.Second, produce, c, done)
go func() {
defer close(c)
exchangeInstancesEvery(time.Duration(e.PollInterval)*time.Second, produce, consume, done)
}()
return c
}

Expand Down Expand Up @@ -492,12 +498,13 @@ func (e *EurekaConnection) ScheduleSecureVIPAddressUpdates(addr string, await bo

// An InstanceSetSource holds a periodically updated set of instances registered with Eureka.
type InstanceSetSource struct {
m *sync.RWMutex
m sync.RWMutex
instances []*Instance
done chan<- struct{}
}

func newInstanceSetSourceFromChannel(await bool, updates <-chan InstanceSetUpdate, done chan<- struct{}) *InstanceSetSource {
func (e *EurekaConnection) newInstanceSetSourceFor(produce func() ([]*Instance, error), await bool) *InstanceSetSource {
done := make(chan struct{})
s := &InstanceSetSource{
done: done,
}
Expand All @@ -507,33 +514,38 @@ func newInstanceSetSourceFromChannel(await bool, updates <-chan InstanceSetUpdat
// getInstancesByVIPAddress (or similar) will be nil. Make it possible to discern when we've
// received at least one update in Latest by never storing a nil value for a successful update.
if await {
if u := <-updates; u.Err != nil {
if proposed := u.Instances; proposed != nil {
s.instances = proposed
if instances, err := produce(); err == nil {
if instances != nil {
s.instances = instances
} else {
s.instances = []*Instance{}
}
}
}
go func() {
for u := range updates {
var latest []*Instance
if u.Err != nil {
if proposed := u.Instances; proposed != nil {
latest = proposed
} else {
latest = []*Instance{}
}
consume := func(instances []*Instance, err error) {
var latest []*Instance
if err == nil {
if instances != nil {
latest = instances
} else {
latest = []*Instance{}
}
s.m.Lock()
s.instances = latest
s.m.Unlock()
}

}()
s.m.Lock()
s.instances = latest
s.m.Unlock()
}
go exchangeInstancesEvery(e.PollInterval, produce, consume, done)
return s
}

func (e *EurekaConnection) newInstanceSetSourceForVIPAddress(addr string, kind vipAddressKind, await bool, opts instanceQueryOptions) *InstanceSetSource {
produce := func() ([]*Instance, error) {
return e.getInstancesByVIPAddress(addr, kind, opts)
}
return e.newInstanceSetSourceFor(produce, await)
}

// NewInstanceSetSourceForVIPAddress returns a new InstantSetSource that offers a periodically
// updated set of instances registered with the given Eureka VIP address, potentially filtered per
// the constraints supplied as options, using the connection's configured polling interval as its
Expand All @@ -546,12 +558,11 @@ func newInstanceSetSourceFromChannel(await bool, updates <-chan InstanceSetUpdat
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
// intended updates.
func (e *EurekaConnection) NewInstanceSetSourceForVIPAddress(addr string, await bool, opts ...InstanceQueryOption) (*InstanceSetSource, error) {
done := make(chan struct{})
updates, err := e.ScheduleVIPAddressUpdates(addr, await, done, opts...)
options, err := collectInstanceQueryOptions(opts)
if err != nil {
return nil, err
}
return newInstanceSetSourceFromChannel(await, updates, done), nil
return e.newInstanceSetSourceForVIPAddress(addr, insecure, await, options), nil
}

// NewInstanceSetSourceForSecureVIPAddress returns a new InstantSetSource that offers a periodically
Expand All @@ -566,12 +577,11 @@ func (e *EurekaConnection) NewInstanceSetSourceForVIPAddress(addr string, await
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
// intended updates.
func (e *EurekaConnection) NewInstanceSetSourceForSecureVIPAddress(addr string, await bool, opts ...InstanceQueryOption) (*InstanceSetSource, error) {
done := make(chan struct{})
updates, err := e.ScheduleSecureVIPAddressUpdates(addr, await, done, opts...)
options, err := collectInstanceQueryOptions(opts)
if err != nil {
return nil, err
}
return newInstanceSetSourceFromChannel(await, updates, done), nil
return e.newInstanceSetSourceForVIPAddress(addr, secure, await, options), nil
}

// NewInstanceSetSourceForApp returns a new InstantSetSource that offers a periodically updated set
Expand All @@ -590,36 +600,25 @@ func (e *EurekaConnection) NewInstanceSetSourceForApp(name string, await bool, o
if err != nil {
return nil, err
}
done := make(chan struct{})
downstream := make(chan InstanceSetUpdate, 1)
go func() {
predicate := options.predicate
intn := options.intn
// This channel will be closed when done is closed via a call to InstanceSetSource.Stop.
upstream := e.ScheduleAppUpdates(name, false, done)
for u := range upstream {
var update InstanceSetUpdate
if err := u.Err; err != nil {
update.Err = err
} else if instances := u.App.Instances; instances != nil {
if predicate != nil {
instances = filterInstances(instances, predicate)
}
if intn != nil {
shuffleInstances(instances, intn)
}
update.Instances = instances
} else {
update.Instances = []*Instance{}
predicate := options.predicate
intn := options.intn
produce := func() ([]*Instance, error) {
app, err := e.GetApp(name)
if err != nil {
return nil, err
}
instances := app.Instances
if instances != nil {
if predicate != nil {
instances = filterInstances(instances, predicate)
}
// Drop attempted sends when the consumer hasn't received the last buffered update.
select {
case downstream <- update:
default:
if intn != nil {
shuffleInstances(instances, intn)
}
}
}()
return newInstanceSetSourceFromChannel(await, downstream, done), nil
return instances, nil
}
return e.newInstanceSetSourceFor(produce, await), nil
}

// Latest returns the most recently acquired set of Eureka instances, if any. If the most recent
Expand Down

0 comments on commit a7ad7a5

Please sign in to comment.