Skip to content

Commit

Permalink
Merge pull request #4 from ffdo/multi-interface
Browse files Browse the repository at this point in the history
Multi interface
  • Loading branch information
dereulenspiegel committed Dec 17, 2015
2 parents c159640 + 5c118e5 commit ba080f7
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 36 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Switch | Description | Default | Mandatory

gluon-collector should run in the background. It queries in regular intervals all nodes
listening on the default announced multicast group and stores the received information.
The received information is the available via a REST API or data prepared for meshviewer.
The received information is available via a REST API or data prepared for meshviewer.

## Command line switches

Expand All @@ -48,15 +48,17 @@ collected data.
## Example config

```yaml
announced: # Block describing the behavior of the annoucned requester
interval:
statistics: 300 # The interval in seconds to fetch fast changing data like statistics and neighbours
nodeinfo: 1800 # The interval in seconds to request more static data and discover new nodes
expire: 3 # This is a multiplicator for the statistics interval. A node is considered offline if
# statistics interval multiplied by expire seconds have passed since the last response
receiver: # List of receiver receiving informantion from nodes.
- type: announced # Type of the receiver. Currently only announced is supported
interface: "bat0" # The interface to use for announced
port: 21444 # The port to use as a source port announced requests and to listen for responses on

interval:
statistics: 300 # The interval in seconds to fetch fast changing data like statistics and neighbours
nodeinfo: 1800 # The interval in seconds to request more static data and discover new nodes
expire: 3 # This is a multiplicator for the statistics interval. A node is considered offline if
# statistics interval multiplied by expire seconds have passed since the last response

logger:
level: "warn" # The log level, see logrus for valid values
file: /var/log/gluon-collector.log # If the log file is specified the log is written there. If not everything is send to stdout.
Expand Down
10 changes: 6 additions & 4 deletions example-config.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
announced:
interval:
statistics: 300
nodeinfo: 1800
receiver:
- type: announced
interface: "bat0"
port: 21444

interval:
statistics: 300
nodeinfo: 1800

logger:
level: "debug"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net"
"runtime"

"io"

log "github.com/Sirupsen/logrus"
)

Expand All @@ -25,9 +27,12 @@ var announcedAddr = &net.UDPAddr{IP: net.ParseIP(MultiCastGroup), Port: Port}
// AnnouncedPacketReceiver abstracts the receiption of packets on the network side
// away so we can mock this easily in tests.
type AnnouncedPacketReceiver interface {
io.Closer
// Receive registers a callback method called every time packet is delivered
// Normally this method jusz enqueues the Repsonse in a channel for further processing.
Receive(rFunc func(Response))
Query(queryString string)
QueryUnicast(addr *net.UDPAddr, queryString string)
}

// Query represents who and what to query. If TargetAddr is null the default
Expand Down Expand Up @@ -150,10 +155,11 @@ func (r *Requester) readLoop() {
}

// Close closes the Requester instance and frees all allocated resources
func (r *Requester) Close() {
func (r *Requester) Close() error {
r.unicastConn.Close()
close(r.ReceiveChan)
close(r.queryChan)
return nil
}

// QueryUnicast sends an UDP query to a host directly via unicast. The IPv6 address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ func NewBoltStore(path string) (*BoltStore, error) {
}

// Close closes the underlying bolt database.
func (b *BoltStore) Close() {
func (b *BoltStore) Close() error {
b.onlineStatusJob.Stop()
b.db.Close()
return nil
}

// executeHandlersOnNodeIdList is a helper method to simply invoke a list of handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"flag"
"fmt"
"io"
"os"
"os/signal"
"syscall"
Expand All @@ -27,7 +28,7 @@ var importPath = flag.String("import", "", "Import data from this path")
var importType = flag.String("importType", "ffmap-backend", "The data format to import from, i.e ffmap-backend")

var DataStore data.Nodeinfostore
var Closeables []pipeline.Closeable
var Closeables []io.Closer

type LogPipe struct {
logFile *bufio.Writer
Expand Down Expand Up @@ -67,9 +68,9 @@ func getProcessPipes(store data.Nodeinfostore) []pipeline.ProcessPipe {
return pipes
}

func BuildPipelines(store data.Nodeinfostore, receiver announced.AnnouncedPacketReceiver, pipeEnd func(response data.ParsedResponse)) ([]pipeline.Closeable, error) {
func BuildPipelines(store data.Nodeinfostore, receiver announced.AnnouncedPacketReceiver, pipeEnd func(response data.ParsedResponse)) ([]io.Closer, error) {

closeables := make([]pipeline.Closeable, 0, 2)
closeables := make([]io.Closer, 0, 2)

receivePipeline := pipeline.NewReceivePipeline(&pipeline.JsonParsePipe{}, &pipeline.DeflatePipe{})
processPipe := pipeline.NewProcessPipeline(getProcessPipes(store)...)
Expand All @@ -94,16 +95,8 @@ func BuildPipelines(store data.Nodeinfostore, receiver announced.AnnouncedPacket
return closeables, nil
}

func Assemble() ([]pipeline.Closeable, error) {
iface, err := conf.Global.String("announced.interface")
if err != nil {
return nil, err
}
requester, err := announced.NewRequester(iface, conf.Global.UInt("announced.port", 12444))
if err != nil {
log.Fatalf("Can't create requester: %v", err)
return nil, err
}
func Assemble() ([]io.Closer, error) {
requester := buildReceiver()
closeables, err := BuildPipelines(DataStore, requester, func(response data.ParsedResponse) {
//Do nothing. This is the last step and we do not need to do anything here,
// just pull the chan clean
Expand Down Expand Up @@ -225,7 +218,7 @@ func ImportData() {
}

func main() {
Closeables = make([]pipeline.Closeable, 0, 5)
Closeables = make([]io.Closer, 0, 5)
flag.Parse()
conf.InitConfig()
if conf.Global == nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"net"
"os"
"testing"
"time"
Expand All @@ -18,12 +19,25 @@ type TestDataReceiver struct {
TestData []announced.Response
}

func (t *TestDataReceiver) Query(queryString string) {
//Nothing just here for interface compatibility
}

func (t *TestDataReceiver) QueryUnicast(addr *net.UDPAddr, queryString string) {
//Nothing just here for interface compatibility
}

func (t *TestDataReceiver) Receive(rFunc func(announced.Response)) {
for _, data := range t.TestData {
rFunc(data)
}
}

func (t *TestDataReceiver) Close() error {
// Only here to satisfy the announced.AnnouncedPacketReceiver interface
return nil
}

func executeCompletePipe(t *testing.T, store data.Nodeinfostore) {
log.SetLevel(log.ErrorLevel)
assert := assert.New(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ import (
"github.com/dereulenspiegel/node-informant/gluon-collector/data"
)

// Closeable should be implemented by all types which need to free resources
// on shutdown.
type Closeable interface {
Close()
}

// ReceivePipe needs to implemented by all types which want to participate in the
// process of receiving a response from announced and transforming it into something
// usable. This can be deflating and parsing the date for example.
Expand Down Expand Up @@ -62,9 +56,10 @@ type ParsePipe interface {
Process(in chan announced.Response) chan data.ParsedResponse
}

func (pipeline *ReceivePipeline) Close() {
func (pipeline *ReceivePipeline) Close() error {
close(pipeline.head)
close(pipeline.tail)
return nil
}

// ProcessPipe needs to be implemented by all types which want to participate in
Expand All @@ -79,9 +74,10 @@ type ProcessPipeline struct {
tail chan data.ParsedResponse
}

func (pipeline *ProcessPipeline) Close() {
func (pipeline *ProcessPipeline) Close() error {
close(pipeline.head)
close(pipeline.tail)
return nil
}

// Enqueue gives a common interface to push ParsedResponses into the ProcessPipeline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"fmt"
"net"

log "github.com/Sirupsen/logrus"
conf "github.com/dereulenspiegel/node-informant/gluon-collector/config"
cfg "github.com/olebedev/config"

"github.com/dereulenspiegel/node-informant/announced"
)

type MultiReceiver struct {
packetChan chan announced.Response
childReceiver []announced.AnnouncedPacketReceiver
}

func NewMultiReceiver(receivers ...announced.AnnouncedPacketReceiver) *MultiReceiver {
mr := &MultiReceiver{make(chan announced.Response, 100), make([]announced.AnnouncedPacketReceiver, 0, 2)}
mr.childReceiver = append(mr.childReceiver, receivers...)
for _, receiver := range receivers {
go mr.singleReceive(receiver)
}
return mr
}

func (m *MultiReceiver) Query(queryString string) {
for _, receiver := range m.childReceiver {
receiver.Query(queryString)
}
}

func (m *MultiReceiver) QueryUnicast(addr *net.UDPAddr, queryString string) {
for _, receiver := range m.childReceiver {
receiver.QueryUnicast(addr, queryString)
}
}

func (m *MultiReceiver) singleReceive(receiver announced.AnnouncedPacketReceiver) {
receiver.Receive(func(response announced.Response) {
m.packetChan <- response
})
}

func (m *MultiReceiver) Receive(rFunc func(announced.Response)) {
for packet := range m.packetChan {
rFunc(packet)
}
}

func (m *MultiReceiver) Close() error {
for _, receiver := range m.childReceiver {
receiver.Close()
}
return nil
}

func buildReceiver() announced.AnnouncedPacketReceiver {
receiverConfigList, err := conf.Global.List("receiver")
if err != nil {
log.Fatalf("Receiver don't seem to be configured: %v", err)
}
receiverCount := len(receiverConfigList)
receiverSlice := make([]announced.AnnouncedPacketReceiver, 0, receiverCount)
for i := 0; i < receiverCount; i++ {
receiverConfig, err := conf.Global.Get(fmt.Sprintf("receiver.%d", i))
if err != nil {
log.Fatalf("Error retrieving config for %dth receiver: %v", i, err)
}
receiver := receiverFactory(receiverConfig)
receiverSlice = append(receiverSlice, receiver)
}

return NewMultiReceiver(receiverSlice...)
}

func receiverFactory(receiverConfig *cfg.Config) announced.AnnouncedPacketReceiver {
receiverType, err := receiverConfig.String("type")
if err != nil {
log.Fatalf("Can't retrieve type of receiver: %v", err)
}

switch receiverType {
case "announced":
return buildAnnouncedReceiver(receiverConfig)
default:
log.Fatalf("Unknown receiver type %s", receiverType)
return nil
}
}

func buildAnnouncedReceiver(announcedConfig *cfg.Config) announced.AnnouncedPacketReceiver {
iface, err := announcedConfig.String("interface")
if err != nil {
log.Fatalf("Can't determine interface for announced receiver")
}

port, err := announcedConfig.Int("port")
if err != nil {
log.Fatalf("Can't determine port for announced receiver")
}
requester, err := announced.NewRequester(iface, port)
if err != nil {
log.Fatalf("Error creating requester")
}
return requester
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"net"
"testing"
"time"

"github.com/dereulenspiegel/node-informant/announced"
"github.com/dereulenspiegel/node-informant/gluon-collector/test"
"github.com/stretchr/testify/assert"
)

var (
additionalData = []announced.Response{
announced.Response{
ClientAddr: &net.UDPAddr{
IP: net.ParseIP("fe80::1001:bcad"),
Port: 4242,
},
Payload: []byte("Additional payload"),
},
}
)

func TestMultiReceiver(t *testing.T) {
assert := assert.New(t)

testReceiver1 := &TestDataReceiver{test.TestData}
testReceiver2 := &TestDataReceiver{additionalData}

multiReceiver := NewMultiReceiver(testReceiver1, testReceiver2)

totalPacketCount := len(test.TestData) + len(additionalData)
packetFound := false
i := 0
go multiReceiver.Receive(func(packet announced.Response) {
i = i + 1
payloadString := string(packet.Payload)
if payloadString == "Additional payload" {
packetFound = true
}
})
testReceiver1.Close()
testReceiver2.Close()
for i < totalPacketCount {
time.Sleep(time.Millisecond * 1)
}
assert.Equal(totalPacketCount, i, "Received less packets than we fed through 2 receiver")
assert.True(packetFound, "Didn't found the additional payload")
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// This makes more sense if data in the data store can expire (this is a TODO for
// the BoltStore).
type MissingUpdater struct {
Requester *announced.Requester
Requester announced.AnnouncedPacketReceiver
Store data.Nodeinfostore
}

Expand Down

0 comments on commit ba080f7

Please sign in to comment.