From 00240aa30cd92e81a61a6fe3e796f7849de16ea5 Mon Sep 17 00:00:00 2001 From: "Steven E. Harris" Date: Fri, 9 Dec 2016 13:45:46 -0500 Subject: [PATCH] Provide ScheduleAppInstanceUpdates method Allow use of a Eureka application as a source of instances just like a VIP address, optionally subjected to the same filtering and shuffling. --- example_appupdate_test.go | 23 ++++++++++++ net.go | 78 +++++++++++++++++++++++++++------------ 2 files changed, 77 insertions(+), 24 deletions(-) diff --git a/example_appupdate_test.go b/example_appupdate_test.go index e03581b..ae29727 100644 --- a/example_appupdate_test.go +++ b/example_appupdate_test.go @@ -27,6 +27,29 @@ func ExampleEurekaConnection_ScheduleAppUpdates() { fmt.Printf("Done monitoring application %q.\n", name) } +func ExampleEurekaConnection_ScheduleAppInstanceUpdates() { + e := makeConnection() + done := make(chan struct{}) + time.AfterFunc(2*time.Minute, func() { + close(done) + }) + name := "my_app" + updates, err := e.ScheduleAppInstanceUpdates(name, true, done, fargo.ThatAreUp, fargo.Shuffled) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("Monitoring instances of application %q.\n", name) + for update := range updates { + if update.Err != nil { + fmt.Printf("Most recent request for application %q failed: %v\n", name, update.Err) + continue + } + fmt.Printf("Application %q has %d instances available.\n", name, len(update.Instances)) + } + fmt.Printf("Done monitoring instances of application %q.\n", name) +} + func ExampleAppSource_Latest() { e := makeConnection() name := "my_app" diff --git a/net.go b/net.go index c94ec8a..e3f48da 100644 --- a/net.go +++ b/net.go @@ -434,10 +434,7 @@ func exchangeInstancesEvery(d time.Duration, produce func() ([]*Instance, error) } } -func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddressKind, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate { - produce := func() ([]*Instance, error) { - return e.getInstancesByVIPAddress(addr, kind, opts) - } +func scheduleInstanceUpdates(d time.Duration, produce func() ([]*Instance, error), await bool, done <-chan struct{}) <-chan InstanceSetUpdate { c := make(chan InstanceSetUpdate, 1) if await { instances, err := produce() @@ -452,11 +449,18 @@ func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddres } go func() { defer close(c) - exchangeInstancesEvery(time.Duration(e.PollInterval)*time.Second, produce, consume, done) + exchangeInstancesEvery(d, produce, consume, done) }() return c } +func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, kind vipAddressKind, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate { + produce := func() ([]*Instance, error) { + return e.getInstancesByVIPAddress(addr, kind, opts) + } + return scheduleInstanceUpdates(e.PollInterval, produce, await, done) +} + // ScheduleVIPAddressUpdates starts polling for updates to the set of instances registered with the // given Eureka VIP address, potentially filtered per the constraints supplied as options, using the // connection's configured polling interval as its period. It sends the outcome of each update @@ -496,6 +500,50 @@ func (e *EurekaConnection) ScheduleSecureVIPAddressUpdates(addr string, await bo return e.scheduleVIPAddressUpdates(addr, secure, await, done, options), nil } +func (e *EurekaConnection) makeInstanceProducerForApp(name string, opts []InstanceQueryOption) (func() ([]*Instance, error), error) { + options, err := collectInstanceQueryOptions(opts) + if err != nil { + return nil, err + } + predicate := options.predicate + intn := options.intn + return func() ([]*Instance, error) { + app, err := e.GetApp(name) + if err != nil { + return nil, err + } + instances := app.Instances + if instances != nil { + if predicate != nil { + instances = filterInstances(instances, predicate) + } + if intn != nil { + shuffleInstances(instances, intn) + } + } + return instances, nil + }, nil +} + +// ScheduleAppInstanceUpdates starts polling for updates to the set of instances from the Eureka +// application with the given name, potentially filtered per the constraints supplied as options, +// using the connection's configured polling interval as its period. It sends the outcome of each +// update attempt to the returned channel, and continues until the supplied done channel is either +// closed or has a value available. Once done sending updates to the returned channel, it closes it. +// +// If await is true, it sends at least one instance set update outcome to the returned channel +// before returning. +// +// It returns an error if any of the supplied options are invalid, precluding it from scheduling the +// intended updates. +func (e *EurekaConnection) ScheduleAppInstanceUpdates(name string, await bool, done <-chan struct{}, opts ...InstanceQueryOption) (<-chan InstanceSetUpdate, error) { + produce, err := e.makeInstanceProducerForApp(name, opts) + if err != nil { + return nil, err + } + return scheduleInstanceUpdates(e.PollInterval, produce, await, done), nil +} + // An InstanceSetSource holds a periodically updated set of instances registered with Eureka. type InstanceSetSource struct { m sync.RWMutex @@ -596,28 +644,10 @@ func (e *EurekaConnection) NewInstanceSetSourceForSecureVIPAddress(addr string, // It returns an error if any of the supplied options are invalid, precluding it from scheduling the // intended updates. func (e *EurekaConnection) NewInstanceSetSourceForApp(name string, await bool, opts ...InstanceQueryOption) (*InstanceSetSource, error) { - options, err := collectInstanceQueryOptions(opts) + produce, err := e.makeInstanceProducerForApp(name, opts) if err != nil { return nil, err } - predicate := options.predicate - intn := options.intn - produce := func() ([]*Instance, error) { - app, err := e.GetApp(name) - if err != nil { - return nil, err - } - instances := app.Instances - if instances != nil { - if predicate != nil { - instances = filterInstances(instances, predicate) - } - if intn != nil { - shuffleInstances(instances, intn) - } - } - return instances, nil - } return e.newInstanceSetSourceFor(produce, await), nil }