Skip to content

Commit

Permalink
Provide ScheduleAppInstanceUpdates method
Browse files Browse the repository at this point in the history
Allow use of a Eureka application as a source of instances just like a
VIP address, optionally subjected to the same filtering and shuffling.
  • Loading branch information
seh committed Jan 13, 2017
1 parent a7ad7a5 commit 00240aa
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 24 deletions.
23 changes: 23 additions & 0 deletions example_appupdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ func ExampleEurekaConnection_ScheduleAppUpdates() {
fmt.Printf("Done monitoring application %q.\n", name)
}

func ExampleEurekaConnection_ScheduleAppInstanceUpdates() {
e := makeConnection()
done := make(chan struct{})
time.AfterFunc(2*time.Minute, func() {
close(done)
})
name := "my_app"
updates, err := e.ScheduleAppInstanceUpdates(name, true, done, fargo.ThatAreUp, fargo.Shuffled)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Monitoring instances of application %q.\n", name)
for update := range updates {
if update.Err != nil {
fmt.Printf("Most recent request for application %q failed: %v\n", name, update.Err)
continue
}
fmt.Printf("Application %q has %d instances available.\n", name, len(update.Instances))
}
fmt.Printf("Done monitoring instances of application %q.\n", name)
}

func ExampleAppSource_Latest() {
e := makeConnection()
name := "my_app"
Expand Down
78 changes: 54 additions & 24 deletions net.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,7 @@ func exchangeInstancesEvery(d time.Duration, produce func() ([]*Instance, error)
}
}

func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddressKind, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate {
produce := func() ([]*Instance, error) {
return e.getInstancesByVIPAddress(addr, kind, opts)
}
func scheduleInstanceUpdates(d time.Duration, produce func() ([]*Instance, error), await bool, done <-chan struct{}) <-chan InstanceSetUpdate {
c := make(chan InstanceSetUpdate, 1)
if await {
instances, err := produce()
Expand All @@ -452,11 +449,18 @@ func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddres
}
go func() {
defer close(c)
exchangeInstancesEvery(time.Duration(e.PollInterval)*time.Second, produce, consume, done)
exchangeInstancesEvery(d, produce, consume, done)
}()
return c
}

func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddressKind, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate {
produce := func() ([]*Instance, error) {
return e.getInstancesByVIPAddress(addr, kind, opts)
}
return scheduleInstanceUpdates(e.PollInterval, produce, await, done)
}

// ScheduleVIPAddressUpdates starts polling for updates to the set of instances registered with the
// given Eureka VIP address, potentially filtered per the constraints supplied as options, using the
// connection's configured polling interval as its period. It sends the outcome of each update
Expand Down Expand Up @@ -496,6 +500,50 @@ func (e *EurekaConnection) ScheduleSecureVIPAddressUpdates(addr string, await bo
return e.scheduleVIPAddressUpdates(addr, secure, await, done, options), nil
}

func (e *EurekaConnection) makeInstanceProducerForApp(name string, opts []InstanceQueryOption) (func() ([]*Instance, error), error) {
options, err := collectInstanceQueryOptions(opts)
if err != nil {
return nil, err
}
predicate := options.predicate
intn := options.intn
return func() ([]*Instance, error) {
app, err := e.GetApp(name)
if err != nil {
return nil, err
}
instances := app.Instances
if instances != nil {
if predicate != nil {
instances = filterInstances(instances, predicate)
}
if intn != nil {
shuffleInstances(instances, intn)
}
}
return instances, nil
}, nil
}

// ScheduleAppInstanceUpdates starts polling for updates to the set of instances from the Eureka
// application with the given name, potentially filtered per the constraints supplied as options,
// using the connection's configured polling interval as its period. It sends the outcome of each
// update attempt to the returned channel, and continues until the supplied done channel is either
// closed or has a value available. Once done sending updates to the returned channel, it closes it.
//
// If await is true, it sends at least one instance set update outcome to the returned channel
// before returning.
//
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
// intended updates.
func (e *EurekaConnection) ScheduleAppInstanceUpdates(name string, await bool, done <-chan struct{}, opts ...InstanceQueryOption) (<-chan InstanceSetUpdate, error) {
produce, err := e.makeInstanceProducerForApp(name, opts)
if err != nil {
return nil, err
}
return scheduleInstanceUpdates(e.PollInterval, produce, await, done), nil
}

// An InstanceSetSource holds a periodically updated set of instances registered with Eureka.
type InstanceSetSource struct {
m sync.RWMutex
Expand Down Expand Up @@ -596,28 +644,10 @@ func (e *EurekaConnection) NewInstanceSetSourceForSecureVIPAddress(addr string,
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
// intended updates.
func (e *EurekaConnection) NewInstanceSetSourceForApp(name string, await bool, opts ...InstanceQueryOption) (*InstanceSetSource, error) {
options, err := collectInstanceQueryOptions(opts)
produce, err := e.makeInstanceProducerForApp(name, opts)
if err != nil {
return nil, err
}
predicate := options.predicate
intn := options.intn
produce := func() ([]*Instance, error) {
app, err := e.GetApp(name)
if err != nil {
return nil, err
}
instances := app.Instances
if instances != nil {
if predicate != nil {
instances = filterInstances(instances, predicate)
}
if intn != nil {
shuffleInstances(instances, intn)
}
}
return instances, nil
}
return e.newInstanceSetSourceFor(produce, await), nil
}

Expand Down

0 comments on commit 00240aa

Please sign in to comment.