Skip to content

Commit

Permalink
examples: an increment-only counter
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Feb 2, 2016
1 parent 9af1d5f commit 7aa1910
Show file tree
Hide file tree
Showing 7 changed files with 704 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ _testmain.go
Makefile
Dockerfile

examples/increment-only-counter/increment-only-counter

36 changes: 36 additions & 0 deletions examples/increment-only-counter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Simple example

This example implements an in-memory key-value cache.
Keys are strings, and values are increment-only counters.
This is a state-based CRDT, so the write operation is set(key, value).

## Demo

Start several peers on the same host.
Tell the second and subsequent peers to connect to the first one.

```
$ ./simple -hwaddr="00:00:00:00:00:01" -nickname=one -mesh=":6001" -http=":8001" &
$ ./simple -hwaddr="00:00:00:00:00:02" -nickname=two -mesh=":6002" -http=":8002" -peer="127.0.0.1:6001" &
$ ./simple -hwaddr="00:00:00:00:00:03" -nickname=three -mesh=":6003" -http=":8003" -peer="127.0.0.1:6001" &
```

Set a value using the HTTP API of any peer.

```
$ curl -Ss -XPOST "http://localhost:8003/?key=a&value=123"
set(a, 123) => 123
```

Get the value from any other peer.

```
$ curl -Ss -XGET "http://localhost:8001/?key=a"
get(a) => 123
```

## Implementation

- [The state object](/examples/simple/state.go) implements GossipData.
- [The peer object](/examples/simple/peer.go) implements Gossiper.
- [The func main](/examples/simple/main.go) wires the components together.
172 changes: 172 additions & 0 deletions examples/increment-only-counter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package main

import (
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"sort"
"strconv"
"strings"
"syscall"

"github.com/weaveworks/mesh"
)

func main() {
peers := &stringset{}
var (
httpListen = flag.String("http", ":8080", "HTTP listen address")
meshListen = flag.String("mesh", net.JoinHostPort("0.0.0.0", strconv.Itoa(mesh.Port)), "mesh listen address")
hwaddr = flag.String("hwaddr", mustHardwareAddr(), "MAC address, i.e. mesh peer ID")
nickname = flag.String("nickname", mustHostname(), "peer nickname")
password = flag.String("password", "", "password (optional)")
channel = flag.String("channel", "default", "gossip channel name")
)
flag.Var(peers, "peer", "initial peer (may be repeated)")
flag.Parse()

log.SetPrefix(*nickname + "> ")

host, portStr, err := net.SplitHostPort(*meshListen)
if err != nil {
log.Fatalf("mesh address: %s: %v", *meshListen, err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
log.Fatalf("mesh address: %s: %v", *meshListen, err)
}

name, err := mesh.PeerNameFromString(*hwaddr)
if err != nil {
log.Fatalf("%s: %v", *hwaddr, err)
}

router := mesh.NewRouter(mesh.Config{
Host: host,
Port: port,
ProtocolMinVersion: mesh.ProtocolMinVersion,
Password: []byte(*password),
ConnLimit: 64,
PeerDiscovery: true,
TrustedSubnets: []*net.IPNet{},
}, name, *nickname, mesh.NullOverlay{})

peer := newPeer(log.New(os.Stderr, *nickname+"> ", log.LstdFlags))
gossip := router.NewGossip(*channel, peer)
peer.register(gossip)

func() {
log.Printf("mesh router starting (%s)", *meshListen)
router.Start()
}()
defer func() {
log.Printf("mesh router stopping")
router.Stop()
}()

router.ConnectionMaker.InitiateConnections(peers.slice(), true)

errs := make(chan error, 2)

go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()

go func() {
log.Printf("HTTP server starting (%s)", *httpListen)
http.HandleFunc("/", handle(peer))
errs <- http.ListenAndServe(*httpListen, nil)
}()

log.Print(<-errs)
}

type kv interface {
get(key string) (result int, ok bool)
set(key string, value int) (result int)
}

func handle(kv kv) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
key := r.FormValue("key")
if key == "" {
http.Error(w, "no key specified", http.StatusBadRequest)
return
}
result, ok := kv.get(key)
if !ok {
http.Error(w, fmt.Sprintf("%s not found", key), http.StatusNotFound)
return
}
fmt.Fprintf(w, "get(%s) => %d\n", key, result)

case "POST":
key := r.FormValue("key")
if key == "" {
http.Error(w, "no key specified", http.StatusBadRequest)
return
}
valueStr := r.FormValue("value")
if valueStr == "" {
http.Error(w, "no value specified", http.StatusBadRequest)
return
}
value, err := strconv.Atoi(valueStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
result := kv.set(key, value)
fmt.Fprintf(w, "set(%s, %d) => %d\n", key, value, result)
}
}
}

type stringset map[string]struct{}

func (ss stringset) Set(value string) error {
ss[value] = struct{}{}
return nil
}

func (ss stringset) String() string {
return strings.Join(ss.slice(), ",")
}

func (ss stringset) slice() []string {
slice := make([]string, 0, len(ss))
for k := range ss {
slice = append(slice, k)
}
sort.Strings(slice)
return slice
}

func mustHardwareAddr() string {
ifaces, err := net.Interfaces()
if err != nil {
panic(err)
}
for _, iface := range ifaces {
if s := iface.HardwareAddr.String(); s != "" {
return s
}
}
panic("no valid network interfaces")
}

func mustHostname() string {
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
return hostname
}
137 changes: 137 additions & 0 deletions examples/increment-only-counter/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package main

import (
"encoding/json"
"log"

"github.com/weaveworks/mesh"
)

// Peer encapsulates state and implements mesh.Gossiper.
// It should be passed to mesh.Router.NewGossip,
// and the resulting Gossip registered in turn,
// before calling mesh.Router.Start.
type peer struct {
st *state
send mesh.Gossip
actions chan<- func()
quit chan struct{}
logger *log.Logger
}

// peer implements mesh.Gossiper.
var _ mesh.Gossiper = &peer{}

// Construct a peer with empty state.
// Be sure to register a channel, later,
// so we can make outbound communication.
func newPeer(logger *log.Logger) *peer {
actions := make(chan func())
p := &peer{
st: newState(),
send: nil, // must .register() later
actions: actions,
quit: make(chan struct{}),
logger: logger,
}
go p.loop(actions)
return p
}

func (p *peer) loop(actions <-chan func()) {
for {
select {
case f := <-actions:
f()
case <-p.quit:
return
}
}
}

// register the result of a mesh.Router.NewGossip.
func (p *peer) register(send mesh.Gossip) {
p.actions <- func() { p.send = send }
}

// Return the current value of the key.
func (p *peer) get(key string) (result int, ok bool) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
result, ok = p.st.set[key]
}
<-c
return result, ok
}

// Set key to value, and return the new value.
// The returned result may be different from the passed value,
// if the passed value is lower than the existing result.
func (p *peer) set(key string, value int) (result int) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
st := newState().completeMerge(map[string]int{key: value})
data := p.st.Merge(st)
if p.send != nil {
p.send.GossipBroadcast(st)
} else {
log.Printf("no sender configured; not broadcasting update right now")
}
result = data.(*state).set[key]
}
<-c
return result
}

func (p *peer) stop() {
close(p.quit)
}

// Return a copy of our complete state.
func (p *peer) Gossip() (complete mesh.GossipData) {
return p.st.copy()
}

// Merge the gossiped data represented by buf into our state.
// Return the state information that was modified.
func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
return nil, err
}

delta = p.st.deltaMerge(set)
if delta == nil {
p.logger.Printf("OnGossip %v => delta %v", set, delta)
} else {
p.logger.Printf("OnGossip %v => delta %v", set, delta.(*state).set)
}
return delta, nil
}

// Merge the gossiped data represented by buf into our state.
// Return our complete resulting state.
func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (complete mesh.GossipData, err error) {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
return nil, err
}

complete = p.st.completeMerge(set)
p.logger.Printf("OnGossipBroadcast %s %v => complete %v", src, set, complete.(*state).set)
return complete, nil
}

// Merge the gossiped data represented by buf into our state.
func (p *peer) OnGossipUnicast(src mesh.PeerName, buf []byte) error {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
return err
}

complete := p.st.completeMerge(set)
p.logger.Printf("OnGossipUnicast %s %v => complete %v", src, set, complete)
return nil
}
Loading

0 comments on commit 7aa1910

Please sign in to comment.