Skip to content

Commit

Permalink
Ultilize Module Event Listener (#133)
Browse files Browse the repository at this point in the history
* finish unit test

* fix grammar error

* add a new error named ErrIgnoreChange of updateEvent to tell the caller not to notify listener

* add doc and example

Co-authored-by: Donghong Huang <[email protected]>
  • Loading branch information
EastMacro2020 and Donghong Huang authored Jan 6, 2021
1 parent fb897c6 commit a0ffe95
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 14 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,11 @@ Complete [example](https://github.com/go-chassis/go-archaius/tree/master/example

Complete [example](https://github.com/go-chassis/go-archaius/tree/master/examples/kie)

### Example: Manage module change events

Sometimes, we may want to handle multiple key value changes as a module, which means that
the different key in the module has some relation with each other.
Once such keys have changed, we expect to handle the changes as a whole instead of one by one.
Module events help us to handle this case.

Complete [example](https://github.com/go-chassis/go-archaius/tree/master/examples/module_event)
5 changes: 5 additions & 0 deletions archaius.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func Init(opts ...Option) error {
//CustomInit accept a list of config source, add it into archaius runtime.
//it almost like Init(), but you can fully control config sources you inject to archaius
func CustomInit(sources ...source.ConfigSource) error {
if running {
openlog.Warn("can not init archaius again, call Clean first")
return nil
}
var err error
manager = source.NewManager()
for _, s := range sources {
Expand All @@ -120,6 +124,7 @@ func CustomInit(sources ...source.ConfigSource) error {
return err
}
}
running = true
return err
}

Expand Down
92 changes: 88 additions & 4 deletions event/event_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,81 @@ const (
InvalidAction = "INVALID-ACTION"
)

type PrefixIndex struct {
Prefix string
NextParts map[string]*PrefixIndex
}

func (pre *PrefixIndex) AddPrefix(prefix string) {
parts := strings.Split(prefix, ".")
cur := pre
for _, part := range parts {
if cur.NextParts == nil {
cur.NextParts = map[string]*PrefixIndex{}
}
next, ok := cur.NextParts[part]
if !ok {
next = &PrefixIndex{}
cur.NextParts[part] = next
}
cur = next
}
cur.Prefix = prefix
}

func (pre *PrefixIndex) RemovePrefix(prefix string) {
parts := strings.Split(prefix, ".")
cur := pre
var path []*PrefixIndex
path = append(path, cur)
for _, part := range parts {
if cur.NextParts == nil {
return
}
next, ok := cur.NextParts[part]
if !ok {
return
}
cur = next
path = append(path, cur)
}
cur.Prefix = ""
remove := ""
for i:=len(path); i>0; i-- {
cur = path[i-1]
if remove != "" {
delete(cur.NextParts, remove)
}
if len(cur.NextParts) > 0 {
break
}
if cur.Prefix != "" {
break
}
if i > 1 {
remove = parts[i-2]
} else {
cur.NextParts = nil
}
}
}

func (pre *PrefixIndex) FindPrefix(key string) string {
parts := strings.Split(key, ".")
cur := pre
for _, part := range parts {
if cur.Prefix != "" {
return cur.Prefix
}
next, ok := cur.NextParts[part]
if !ok {
return ""
}
cur = next
}
return cur.Prefix
}

// Event generated when any config changes
type Event struct {
EventSource string
Expand All @@ -64,6 +139,7 @@ type ModuleListener interface {
type Dispatcher struct {
listeners map[string][]Listener
moduleListeners map[string][]ModuleListener
modulePrefixIndex PrefixIndex
}

// NewDispatcher is a new Dispatcher for listeners
Expand Down Expand Up @@ -166,6 +242,7 @@ func (dis *Dispatcher) RegisterModuleListener(listenerObj ModuleListener, module
moduleListeners, ok := dis.moduleListeners[prefix]
if !ok {
moduleListeners = make([]ModuleListener, 0)
dis.modulePrefixIndex.AddPrefix(prefix)
}

// for duplicate registration
Expand Down Expand Up @@ -207,6 +284,9 @@ func (dis *Dispatcher) UnRegisterModuleListener(listenerObj ModuleListener, modu

// assign latest moduleListener list
dis.moduleListeners[prefix] = newListenerList
if len(newListenerList) == 0 {
dis.modulePrefixIndex.RemovePrefix(prefix)
}
}
return nil
}
Expand Down Expand Up @@ -238,14 +318,18 @@ func (dis *Dispatcher) parseEvents(events []*Event) map[string][]*Event {
var eventList = make(map[string][]*Event)
for _, event := range events {
// find first prefix from event.key
registerKey := dis.findFirstRegisterPrefix(event.Key)
if module, ok := eventList[registerKey]; ok {
//registerKey := dis.findFirstRegisterPrefix(event.Key)
prefix := dis.modulePrefixIndex.FindPrefix(event.Key)
if prefix == "" {
continue
}
if module, ok := eventList[prefix]; ok {
events := module
events = append(events, event)
eventList[registerKey] = events
eventList[prefix] = events
} else {
newModule := append([]*Event{}, event)
eventList[registerKey] = newModule
eventList[prefix] = newModule
}
}

Expand Down
89 changes: 89 additions & 0 deletions event/event_system_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package event_test

import (
"github.com/stretchr/testify/assert"
"sync"
"testing"

"github.com/go-chassis/go-archaius/event"
Expand Down Expand Up @@ -81,3 +83,90 @@ func TestDispatchEvent(t *testing.T) {
dispatcher.UnRegisterListener(eventListener3, "Key1")

}

type MListener struct {
eventKeys []string
wg sync.WaitGroup
}

func (m *MListener) Event(events []*event.Event) {
for _, ev := range events {
m.eventKeys = append(m.eventKeys, ev.Key)
m.wg.Done()
}
m.wg.Done()
}

func TestDispatcher_DispatchModuleEvent(t *testing.T) {
t.Run("RegisterModuleEvent", func(t *testing.T) {
dispatcher := event.NewDispatcher()
lis := &MListener{}
dispatcher.RegisterModuleListener(lis, "aaa.bbb")
lis.wg.Add(3)
dispatcher.DispatchModuleEvent([]*event.Event{
{
Key:"aaa.bbb.ccc",
},
{
Key:"aaa",
},
{
Key:"aaa.bbb",
},
})
lis.wg.Wait()
if assert.Len(t, lis.eventKeys, 2) {
assert.Equal(t, "aaa.bbb.ccc", lis.eventKeys[0])
assert.Equal(t, "aaa.bbb", lis.eventKeys[1])
}
})
t.Run("RegisterModuleEventCovered", func(t *testing.T) {
dispatcher := event.NewDispatcher()
lis1 := &MListener{}
dispatcher.RegisterModuleListener(lis1, "aaa.bbb")
lis2 := &MListener{}
dispatcher.RegisterModuleListener(lis2, "aaa.bbb.ccc")
lis1.wg.Add(3)
dispatcher.DispatchModuleEvent([]*event.Event{
{
Key:"aaa.bbb.ccc",
},
{
Key:"aaa",
},
{
Key:"aaa.bbb",
},
})
lis1.wg.Wait()
if assert.Len(t, lis1.eventKeys, 2) {
assert.Equal(t, "aaa.bbb.ccc", lis1.eventKeys[0])
assert.Equal(t, "aaa.bbb", lis1.eventKeys[1])
}
assert.Len(t, lis2.eventKeys, 0)
})
t.Run("UnRegisterModuleEventCovered", func(t *testing.T) {
dispatcher := event.NewDispatcher()
lis1 := &MListener{}
dispatcher.RegisterModuleListener(lis1, "aaa.bbb")
lis2 := &MListener{}
dispatcher.RegisterModuleListener(lis2, "aaa.bbb.ccc")
dispatcher.UnRegisterModuleListener(lis1, "aaa.bbb")
lis2.wg.Add(2)
dispatcher.DispatchModuleEvent([]*event.Event{
{
Key:"aaa.bbb.ccc",
},
{
Key:"aaa",
},
{
Key:"aaa.bbb",
},
})
lis2.wg.Wait()
if assert.Len(t, lis2.eventKeys, 1) {
assert.Equal(t, "aaa.bbb.ccc", lis2.eventKeys[0])
}
})
}
12 changes: 12 additions & 0 deletions examples/module_event/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module_event.go keep a file under archaius's management, and watch module changes,
so that if there are multiple changes under the module **test.person**,
the events will be triggered once, and listener will receive the event list

```
go build module_event.go
./module_event
```

change configs under **test.person** in the module_event.yaml

check the stdout to see events
45 changes: 45 additions & 0 deletions examples/module_event/module_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"fmt"
"log"
"time"

"github.com/go-chassis/go-archaius"
"github.com/go-chassis/go-archaius/event"
"github.com/go-chassis/openlog"
)

//Listener is a struct used for Event listener
type Person struct {
Name string `yaml:"name"`
Age int `yaml:"age"`
Favorites map[string]string `yaml:"favorites"`
}
type Listener struct {
Person Person `yaml:"test.person"`
}

//Event is a method for QPS event listening
func (e *Listener) Event(events []*event.Event) {
for i, ev := range events {
openlog.GetLogger().Info(fmt.Sprintf("%dth event:%+v", i, ev))
}
archaius.UnmarshalConfig(e)
}

func main() {
err := archaius.Init(archaius.WithRequiredFiles([]string{
"./module_event.yaml",
}))
if err != nil {
openlog.Error("Error:" + err.Error())
return
}
lis := &Listener{}
archaius.RegisterModuleListener(lis, "test.person")
for {
log.Printf("%+v\n", lis)
time.Sleep(5 * time.Second)
}
}
8 changes: 8 additions & 0 deletions examples/module_event/module_event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
test:
person:
name: tom
age: 24
favorites:
food: dumplins
color: red
8 changes: 5 additions & 3 deletions source/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,12 @@ func (wth *watch) watchFile() {
openlog.Debug(fmt.Sprintf("new config: %v", newConf))
events := wth.fileSource.compareUpdate(newConf, event.Name)
openlog.Debug(fmt.Sprintf("generated events %v", events))
for _, e := range events {
wth.callback.OnEvent(e)
if len(events) > 0 { //avoid OnModuleEvent empty events error
for _, e := range events {
wth.callback.OnEvent(e)
}
wth.callback.OnModuleEvent(events)
}
wth.callback.OnModuleEvent(events)

case err := <-wth.watcher.Errors:
openlog.Debug(fmt.Sprintf("watch file error: %s", err))
Expand Down
Loading

0 comments on commit a0ffe95

Please sign in to comment.