Skip to content

Commit

Permalink
Merge pull request #81 from Comcast/feature/device-status
Browse files Browse the repository at this point in the history
Feature/device status
  • Loading branch information
johnabass authored Feb 8, 2019
2 parents d5ac056 + 3f3cc89 commit a3a16ca
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 28 deletions.
11 changes: 6 additions & 5 deletions src/glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ package: .
homepage: https://github.com/Comcast/talaria
import:
- package: github.com/Comcast/webpa-common
version: 11a5e3024661b9f1ed0df38eeddb001f0be94202
version: a80e41190e054263e92efdae84423d12c0272a55
71 changes: 71 additions & 0 deletions src/talaria/deviceStatus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"fmt"
"time"

"github.com/Comcast/webpa-common/device"
"github.com/Comcast/webpa-common/wrp"
)

func statusEventType(id device.ID, subtype string) string {
return fmt.Sprintf("device-status/%s/%s", id, subtype)
}

func onlinePayload(t time.Time, d device.Interface) []byte {
return []byte(fmt.Sprintf(`{
"id": "%s",
"ts": "%s",
}`, d.ID(), t.Format(time.RFC3339Nano)))
}

func newOnlineMessage(source string, d device.Interface) (string, *wrp.Message) {
eventType := statusEventType(d.ID(), "online")

return eventType, &wrp.Message{
Type: wrp.SimpleEventMessageType,
Source: source,
Destination: "event:" + eventType,
ContentType: "json",
PartnerIDs: d.PartnerIDs(),
Payload: onlinePayload(time.Now(), d),
}
}

func offlinePayload(t time.Time, closeReason string, d device.Interface) []byte {
statistics := d.Statistics()

return []byte(fmt.Sprintf(`{
"id": "%s",
"ts": "%s",
"bytes-sent": %d,
"messages-sent": %d,
"bytes-received": %d,
"messages-received": %d,
"connected-at": "%s",
"up-time": "%s",
"reason-for-closure": "%s",
}`, d.ID(),
t.Format(time.RFC3339Nano),
statistics.BytesSent(),
statistics.MessagesSent(),
statistics.BytesReceived(),
statistics.MessagesReceived(),
statistics.ConnectedAt().Format(time.RFC3339Nano),
statistics.UpTime(),
closeReason,
))
}

func newOfflineMessage(source string, closeReason string, d device.Interface) (string, *wrp.Message) {
eventType := statusEventType(d.ID(), "offline")

return eventType, &wrp.Message{
Type: wrp.SimpleEventMessageType,
Source: source,
Destination: "event:" + eventType,
ContentType: "json",
PartnerIDs: d.PartnerIDs(),
Payload: offlinePayload(time.Now(), closeReason, d),
}
}
68 changes: 49 additions & 19 deletions src/talaria/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type dispatcher struct {
method string
timeout time.Duration
authorizationKeys []string
source string
eventMap event.MultiMap
queueSize metrics.Gauge
droppedMessages metrics.Counter
Expand Down Expand Up @@ -163,6 +164,23 @@ func (d *dispatcher) dispatchEvent(eventType, contentType string, contents []byt
return nil
}

func (d *dispatcher) encodeAndDispatchEvent(eventType string, format wrp.Format, message *wrp.Message) error {
var (
contents []byte
encoder = wrp.NewEncoderBytes(&contents, format)
)

if err := encoder.Encode(message); err != nil {
return err
}

if err := d.dispatchEvent(eventType, format.ContentType(), contents); err != nil {
return err
}

return nil
}

func (d *dispatcher) dispatchTo(unfiltered string, contentType string, contents []byte) error {
url, err := d.urlFilter.Filter(unfiltered)
if err != nil {
Expand All @@ -181,28 +199,40 @@ func (d *dispatcher) dispatchTo(unfiltered string, contentType string, contents
}

func (d *dispatcher) OnDeviceEvent(event *device.Event) {
if event.Type != device.MessageReceived {
return
}
switch event.Type {
case device.Connect:
eventType, message := newOnlineMessage(d.source, event.Device)
if err := d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching online event", "eventType", eventType, "destination", message.Destination, logging.ErrorKey(), err)
}

if routable, ok := event.Message.(wrp.Routable); ok {
var (
destination = routable.To()
contentType = event.Format.ContentType()
)
case device.Disconnect:
// TODO: FIgure out how to get the reason for closure
eventType, message := newOfflineMessage(d.source, "unknown", event.Device)
if err := d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching offline event", "eventType", eventType, "destination", message.Destination, logging.ErrorKey(), err)
}

if strings.HasPrefix(destination, EventPrefix) {
eventType := destination[len(EventPrefix):]
if err := d.dispatchEvent(eventType, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching event", "destination", destination, logging.ErrorKey(), err)
}
} else if strings.HasPrefix(destination, DNSPrefix) {
unfilteredURL := destination[len(DNSPrefix):]
if err := d.dispatchTo(unfilteredURL, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching to endpoint", "destination", destination, logging.ErrorKey(), err)
case device.MessageReceived:
if routable, ok := event.Message.(wrp.Routable); ok {
var (
destination = routable.To()
contentType = event.Format.ContentType()
)

if strings.HasPrefix(destination, EventPrefix) {
eventType := destination[len(EventPrefix):]
if err := d.dispatchEvent(eventType, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching event", "eventType", eventType, "destination", destination, logging.ErrorKey(), err)
}
} else if strings.HasPrefix(destination, DNSPrefix) {
unfilteredURL := destination[len(DNSPrefix):]
if err := d.dispatchTo(unfilteredURL, contentType, event.Contents); err != nil {
d.errorLog.Log(logging.MessageKey(), "Error dispatching to endpoint", "destination", destination, logging.ErrorKey(), err)
}
} else {
d.errorLog.Log(logging.MessageKey(), "Unroutable destination", "destination", destination)
}
} else {
d.errorLog.Log(logging.MessageKey(), "Unroutable destination", "destination", destination)
}
}
}
11 changes: 8 additions & 3 deletions src/talaria/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@ import (
"github.com/stretchr/testify/require"
)

func testDispatcherIgnoredEvent(t *testing.T) {
func testDispatcherConnectEvent(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
d = new(device.MockDevice)
dispatcher, outbounds, err = NewDispatcher(NewTestOutboundMeasures(), nil, nil)
)

require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)

dispatcher.OnDeviceEvent(&device.Event{Type: device.Connect})
d.On("ID").Return(device.ID("mac:123412341234"))
d.On("PartnerIDs").Return([]string{"partner-1"})

dispatcher.OnDeviceEvent(&device.Event{Type: device.Connect, Device: d})
assert.Equal(0, len(outbounds))
d.AssertExpectations(t)
}

func testDispatcherUnroutable(t *testing.T) {
Expand Down Expand Up @@ -339,7 +344,7 @@ func testDispatcherOnDeviceEventDispatchTo(t *testing.T) {
}

func TestDispatcher(t *testing.T) {
t.Run("IgnoredEvent", testDispatcherIgnoredEvent)
t.Run("ConnectEvent", testDispatcherConnectEvent)
t.Run("Unroutable", testDispatcherUnroutable)
t.Run("BadURLFilter", testDispatcherBadURLFilter)
t.Run("OnDeviceEvent", func(t *testing.T) {
Expand Down

0 comments on commit a3a16ca

Please sign in to comment.