Skip to content

Commit

Permalink
Merge pull request #37 from themue/feature/example-#30
Browse files Browse the repository at this point in the history
Merge base of cells example
  • Loading branch information
Frank Mueller authored Sep 10, 2017
2 parents 7b945b7 + d4aaebe commit afb66bb
Show file tree
Hide file tree
Showing 15 changed files with 378 additions and 28 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ _testmain.go
*.swp

# Editors
.vscode
.vscode
example/coinalyzer/coinalyzer
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Description

The *Tideland Go Cells* provide a package for the creation of event based
The **Tideland Go Cells** provide a package for the creation of event based
applications with networked concurrently working cells. The way how they
process the recevied events is defined by behaviors. During the processing
of an event a cell can emit multiple events to its subscribers.
Expand Down Expand Up @@ -72,10 +72,18 @@ still growing.
Those have to emit it when receiving the topic "status" with a status
cell ID as payload.
- **Ticker** emits tick events in a defined interval.
- **Topic/Payloads** collects payloads per topic, processes them and emits
the result payload.

### Example

An example application using the **Tideland Go Cells** to analyze a stream
of crypto coin information. It's called *coinalyzer*. Later extensions may
correlate it with news streams.

## Contributors

- Frank Mueller (https://github.com/TheMue / https://github.com/tideland)
- Frank Mueller (https://github.com/themue / https://github.com/tideland)
- Jonathan Camp (https://github.com/kung-foo)

## License
Expand Down
3 changes: 1 addition & 2 deletions behaviors/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,13 @@ func TestLimitedEvaluatorBehavior(t *testing.T) {
env.EmitNew("evaluator", topic, nil)
}
time.Sleep(time.Second)

env.EmitNew("collector", cells.TopicProcess, cells.PayloadClear)
assert.WaitTested(sigc, func(value interface{}) error {
event, ok := value.(cells.Event)
assert.True(ok)
var evaluation behaviors.Evaluation
err := event.Payload().Unmarshal(&evaluation)
assert.Equal(evaluation.Count, 10)
assert.Equal(evaluation.Count, 5)
assert.Equal(evaluation.MinRating, 1.0)
assert.Equal(evaluation.MaxRating, 3.0)
assert.Equal(evaluation.AvgRating, 2.2)
Expand Down
14 changes: 8 additions & 6 deletions behaviors/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,23 @@ import (
// ROUTER BEHAVIOR
//--------------------

// RouterFunc is a function type determining which subscribed
// Router is a function type determining which subscribed
// cells shall receive the event.
type RouterFunc func(emitterID, subscriberID string, event cells.Event) (bool, error)
type Router func(emitterID, subscriberID string, event cells.Event) (bool, error)

// routerBehavior check for each received event which subscriber will
// get it based on the router function.
type routerBehavior struct {
cell cells.Cell
routerFunc RouterFunc
shallRoute Router
}

// NewRouterBehavior creates a router behavior using the passed function
// to determine to which subscriber the received event will be emitted.
func NewRouterBehavior(rf RouterFunc) cells.Behavior {
return &routerBehavior{nil, rf}
func NewRouterBehavior(router Router) cells.Behavior {
return &routerBehavior{
shallRoute: router,
}
}

// Init the behavior.
Expand All @@ -51,7 +53,7 @@ func (b *routerBehavior) Terminate() error {
// function.
func (b *routerBehavior) ProcessEvent(event cells.Event) error {
return b.cell.SubscribersDo(func(s cells.Subscriber) error {
ok, err := b.routerFunc(b.cell.ID(), s.ID(), event)
ok, err := b.shallRoute(b.cell.ID(), s.ID(), event)
if err != nil {
return err
}
Expand Down
20 changes: 11 additions & 9 deletions behaviors/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,29 @@ import (
// SIMPLE BEHAVIOR
//--------------------

// SimpleProcessorFunc is a function type doing the event processing.
type SimpleProcessorFunc func(cell cells.Cell, event cells.Event) error
// SimpleProcessor is a function type doing the event processing.
type SimpleProcessor func(cell cells.Cell, event cells.Event) error

// simpleBehavior is a simple event processor using the processor
// function for its own logic.
type simpleBehavior struct {
cell cells.Cell
processorFunc SimpleProcessorFunc
cell cells.Cell
process SimpleProcessor
}

// NewSimpleProcessorBehavior creates a behavior based on the passed function.
// Instead of an own logic and an own state it uses the passed simple processor
// function for the event processing.
func NewSimpleProcessorBehavior(spf SimpleProcessorFunc) cells.Behavior {
if spf == nil {
spf = func(cell cells.Cell, event cells.Event) error {
func NewSimpleProcessorBehavior(processor SimpleProcessor) cells.Behavior {
if processor == nil {
processor = func(cell cells.Cell, event cells.Event) error {
logger.Errorf("simple processor %q used without function to handle event %v", cell.ID(), event)
return nil
}
}
return &simpleBehavior{nil, spf}
return &simpleBehavior{
process: processor,
}
}

// Init the behavior.
Expand All @@ -57,7 +59,7 @@ func (b *simpleBehavior) Terminate() error {

// ProcessEvent calls the simple processor function.
func (b *simpleBehavior) ProcessEvent(event cells.Event) error {
return b.processorFunc(b.cell, event)
return b.process(b.cell, event)
}

// Recover from an error.
Expand Down
1 change: 1 addition & 0 deletions cells/cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (c *cell) stop() error {
return nil
})
// Stop own backend.
c.queue.Close()
err := c.loop.Stop()
if err != nil {
logger.Errorf("cell '%s' stopped with error: %v", c.id, err)
Expand Down
59 changes: 51 additions & 8 deletions cells/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,82 @@ package cells
// IMPORTS
//--------------------

import ()
import (
"github.com/tideland/golib/loop"
)

//--------------------
// CONSTANTS
//--------------------

// TODO(mue) maxPending will later limit the queue size.
const maxPending = 65536

//--------------------
// IN-MEMORY QUEUE
//--------------------

// inMemoryQueue implements Queue based on a simple channel.
type inMemoryQueue struct {
queuec chan Event
inc chan Event
outc chan Event
loop loop.Loop
}

// newInMemoryQueue creates the in-memory queue.
func newInMemoryQueue() Queue {
return &inMemoryQueue{
queuec: make(chan Event, 1),
q := &inMemoryQueue{
inc: make(chan Event),
outc: make(chan Event),
}
q.loop = loop.Go(q.backendLoop)
return q
}

// Emit implements the Queue interface.
func (q *inMemoryQueue) Emit(event Event) error {
q.queuec <- event
q.inc <- event
return nil
}

// Events implements the Queue interface.
func (q *inMemoryQueue) Events() <-chan Event {
return q.queuec
return q.outc
}

// Close implements the Queue interface.
func (q *inMemoryQueue) Close() error {
close(q.queuec)
return nil
return q.loop.Stop()
}

// backendLoop runs the queue goroutine.
func (q *inMemoryQueue) backendLoop(l loop.Loop) error {
defer close(q.outc)
defer close(q.inc)

var pending []Event

for {
var first Event
var outc chan Event

if len(pending) > 0 {
first = pending[0]
outc = q.outc
}

select {
case <-l.ShallStop():
return nil
case event := <-q.inc:
// TODO(mue) Limit queue size, have to think about strategy.
if len(pending) < maxPending {
pending = append(pending, event)
}
case outc <- first:
pending = pending[1:]
}
}
}

// EOF
12 changes: 12 additions & 0 deletions example/behaviors/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Tideland Go Cells - Example - Behaviors - Documentation
//
// Copyright (C) 2010-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

// Package behaviors contains the behaviors and the specializd
// factory functions for the Tideland Go Cells example.
package behaviors

//EOF
44 changes: 44 additions & 0 deletions example/behaviors/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Tideland Go Cells - Example - Behaviors - Global usable ones
//
// Copyright (C) 2010-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package behaviors

//--------------------
// IMPORTS
//--------------------

import (
"sync"
"time"

"github.com/tideland/gocells/behaviors"
"github.com/tideland/gocells/cells"
)

//--------------------
// BEHAVIORS
//--------------------

// MakeLogger returns a logger behavior.
func MakeLogger() cells.Behavior {
return behaviors.NewLoggerBehavior()
}

// MakeTicker returns a ticker for the clocking of the
// emitting of the data.
func MakeTicker() cells.Behavior {
return behaviors.NewTickerBehavior(100 * time.Millisecond)
}

// MakeEndOFData returns a behavior reacting when all
// data is done.
func MakeEndOfData(wg sync.WaitGroup) cells.Behavior {
wg.Add(1)
return nil
}

// EOF
30 changes: 30 additions & 0 deletions example/behaviors/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Tideland Go Cells - Example - Behaviors - Model
//
// Copyright (C) 2010-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package behaviors

//--------------------
// IMPORTS
//--------------------

//--------------------
// POPULATION
//--------------------

// Population contains the population of a country or region
// in one year.
type Population struct {
CountryName string
CountryCode string
Year string
Value int
}

// Populations is the set of all populations.
type Popoulations []Population

// EOF
47 changes: 47 additions & 0 deletions example/worldchange/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Tideland Go Cells - Example - Configuration
//
// Copyright (C) 2010-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package main

//--------------------
// IMPORTS
//--------------------

import (
"context"
)

//--------------------
// CONFIGURATION
//--------------------

// Configuration contains the sewttings for the example application.
type Configuration struct {
}

//--------------------
// CONTEXT
//--------------------

// contextKey is used to type keys for context values.
type contextKey int

// configurationKey addresses a configuration inside a context.
const configurationKey contextKey = 1

// NewContext returns a new context that carries a configuration.
func NewContext(ctx context.Context, cfg Configuration) context.Context {
return context.WithValue(ctx, configurationKey, cfg)
}

// FromContext returns the configuration stored in ctx, if any.
func FromContext(ctx context.Context) (Configuration, bool) {
cfg, ok := ctx.Value(configurationKey).(Configuration)
return cfg, ok
}

// EOS
12 changes: 12 additions & 0 deletions example/worldchange/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Tideland Go Cells - Example - Documentation
//
// Copyright (C) 2010-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

// Package worldchange implements the main program of the
// Tideland Go Cells example.
package main

//EOF
Loading

0 comments on commit afb66bb

Please sign in to comment.