Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change increment-only-counter to G-counter #22

Merged
merged 1 commit into from
Mar 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 7 additions & 33 deletions examples/increment-only-counter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func main() {
TrustedSubnets: []*net.IPNet{},
}, name, *nickname, mesh.NullOverlay{})

peer := newPeer(log.New(os.Stderr, *nickname+"> ", log.LstdFlags))
peer := newPeer(name, log.New(os.Stderr, *nickname+"> ", log.LstdFlags))
gossip := router.NewGossip(*channel, peer)
peer.register(gossip)

Expand Down Expand Up @@ -87,45 +87,19 @@ func main() {
log.Print(<-errs)
}

type kv interface {
get(key string) (result int, ok bool)
set(key string, value int) (result int)
type counter interface {
get() int
incr() int
}

func handle(kv kv) http.HandlerFunc {
func handle(c counter) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
key := r.FormValue("key")
if key == "" {
http.Error(w, "no key specified", http.StatusBadRequest)
return
}
result, ok := kv.get(key)
if !ok {
http.Error(w, fmt.Sprintf("%s not found", key), http.StatusNotFound)
return
}
fmt.Fprintf(w, "get(%s) => %d\n", key, result)
fmt.Fprintf(w, "get => %d\n", c.get())

case "POST":
key := r.FormValue("key")
if key == "" {
http.Error(w, "no key specified", http.StatusBadRequest)
return
}
valueStr := r.FormValue("value")
if valueStr == "" {
http.Error(w, "no value specified", http.StatusBadRequest)
return
}
value, err := strconv.Atoi(valueStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
result := kv.set(key, value)
fmt.Fprintf(w, "set(%s, %d) => %d\n", key, value, result)
fmt.Fprintf(w, "incr => %d\n", c.incr())
}
}
}
Expand Down
43 changes: 18 additions & 25 deletions examples/increment-only-counter/peer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"encoding/json"
"log"

"bytes"
"encoding/gob"

"github.com/weaveworks/mesh"
)

Expand All @@ -25,10 +27,10 @@ var _ mesh.Gossiper = &peer{}
// Construct a peer with empty state.
// Be sure to register a channel, later,
// so we can make outbound communication.
func newPeer(logger *log.Logger) *peer {
func newPeer(self mesh.PeerName, logger *log.Logger) *peer {
actions := make(chan func())
p := &peer{
st: newState(),
st: newState(self),
send: nil, // must .register() later
actions: actions,
quit: make(chan struct{}),
Expand All @@ -54,32 +56,23 @@ func (p *peer) register(send mesh.Gossip) {
p.actions <- func() { p.send = send }
}

// Return the current value of the key.
func (p *peer) get(key string) (result int, ok bool) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
result, ok = p.st.set[key]
}
<-c
return result, ok
// Return the current value of the counter.
func (p *peer) get() int {
return p.st.get()
}

// Set key to value, and return the new value.
// The returned result may be different from the passed value,
// if the passed value is lower than the existing result.
func (p *peer) set(key string, value int) (result int) {
// Increment the counter by one.
func (p *peer) incr() (result int) {
c := make(chan struct{})
p.actions <- func() {
defer close(c)
st := newState().mergeComplete(map[string]int{key: value})
data := p.st.Merge(st)
st := p.st.incr()
if p.send != nil {
p.send.GossipBroadcast(st)
} else {
log.Printf("no sender configured; not broadcasting update right now")
}
result = data.(*state).set[key]
result = st.get()
}
<-c
return result
Expand All @@ -99,8 +92,8 @@ func (p *peer) Gossip() (complete mesh.GossipData) {
// Merge the gossiped data represented by buf into our state.
// Return the state information that was modified.
func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
var set map[mesh.PeerName]int
if err := gob.NewDecoder(bytes.NewReader(buf)).Decode(&set); err != nil {
return nil, err
}

Expand All @@ -116,8 +109,8 @@ func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
// Merge the gossiped data represented by buf into our state.
// Return the state information that was modified.
func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (received mesh.GossipData, err error) {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
var set map[mesh.PeerName]int
if err := gob.NewDecoder(bytes.NewReader(buf)).Decode(&set); err != nil {
return nil, err
}

Expand All @@ -132,8 +125,8 @@ func (p *peer) OnGossipBroadcast(src mesh.PeerName, buf []byte) (received mesh.G

// Merge the gossiped data represented by buf into our state.
func (p *peer) OnGossipUnicast(src mesh.PeerName, buf []byte) error {
var set map[string]int
if err := json.Unmarshal(buf, &set); err != nil {
var set map[mesh.PeerName]int
if err := gob.NewDecoder(bytes.NewReader(buf)).Decode(&set); err != nil {
return err
}

Expand Down
100 changes: 55 additions & 45 deletions examples/increment-only-counter/peer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package main

import (
"encoding/json"
"bytes"
"encoding/gob"
"io/ioutil"
"log"
"reflect"
Expand All @@ -12,30 +13,33 @@ import (

func TestPeerOnGossip(t *testing.T) {
for _, testcase := range []struct {
initial map[string]int
msg map[string]int
want map[string]int
initial map[mesh.PeerName]int
msg map[mesh.PeerName]int
want map[mesh.PeerName]int
}{
{
map[string]int{},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
map[mesh.PeerName]int{},
map[mesh.PeerName]int{123: 1, 456: 2},
map[mesh.PeerName]int{123: 1, 456: 2},
},
{
map[string]int{"a": 1},
map[string]int{"a": 0, "b": 2},
map[string]int{"b": 2},
map[mesh.PeerName]int{123: 1},
map[mesh.PeerName]int{123: 0, 456: 2},
map[mesh.PeerName]int{456: 2},
},
{
map[string]int{"a": 9},
map[string]int{"a": 8},
map[mesh.PeerName]int{123: 9},
map[mesh.PeerName]int{123: 8},
nil,
},
} {
p := newPeer(log.New(ioutil.Discard, "", 0))
p := newPeer(mesh.PeerName(999), log.New(ioutil.Discard, "", 0))
p.st.mergeComplete(testcase.initial)
buf, _ := json.Marshal(testcase.msg)
delta, err := p.OnGossip(buf)
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(testcase.msg); err != nil {
t.Fatal(err)
}
delta, err := p.OnGossip(buf.Bytes())
if err != nil {
t.Errorf("%v OnGossip %v: %v", testcase.initial, testcase.msg, err)
continue
Expand All @@ -54,30 +58,33 @@ func TestPeerOnGossip(t *testing.T) {

func TestPeerOnGossipBroadcast(t *testing.T) {
for _, testcase := range []struct {
initial map[string]int
msg map[string]int
want map[string]int
initial map[mesh.PeerName]int
msg map[mesh.PeerName]int
want map[mesh.PeerName]int
}{
{
map[string]int{},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
map[mesh.PeerName]int{},
map[mesh.PeerName]int{123: 1, 456: 2},
map[mesh.PeerName]int{123: 1, 456: 2},
},
{
map[string]int{"a": 1},
map[string]int{"a": 0, "b": 2},
map[string]int{"b": 2},
map[mesh.PeerName]int{123: 1},
map[mesh.PeerName]int{123: 0, 456: 2},
map[mesh.PeerName]int{456: 2},
},
{
map[string]int{"a": 9},
map[string]int{"a": 8},
map[string]int{}, // OnGossipBroadcast returns received, which should never be nil
map[mesh.PeerName]int{123: 9},
map[mesh.PeerName]int{123: 8},
map[mesh.PeerName]int{}, // OnGossipBroadcast returns received, which should never be nil
},
} {
p := newPeer(log.New(ioutil.Discard, "", 0))
p := newPeer(999, log.New(ioutil.Discard, "", 0))
p.st.mergeComplete(testcase.initial)
buf, _ := json.Marshal(testcase.msg)
delta, err := p.OnGossipBroadcast(mesh.UnknownPeerName, buf)
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(testcase.msg); err != nil {
t.Fatal(err)
}
delta, err := p.OnGossipBroadcast(mesh.UnknownPeerName, buf.Bytes())
if err != nil {
t.Errorf("%v OnGossipBroadcast %v: %v", testcase.initial, testcase.msg, err)
continue
Expand All @@ -90,30 +97,33 @@ func TestPeerOnGossipBroadcast(t *testing.T) {

func TestPeerOnGossipUnicast(t *testing.T) {
for _, testcase := range []struct {
initial map[string]int
msg map[string]int
want map[string]int
initial map[mesh.PeerName]int
msg map[mesh.PeerName]int
want map[mesh.PeerName]int
}{
{
map[string]int{},
map[string]int{"a": 1, "b": 2},
map[string]int{"a": 1, "b": 2},
map[mesh.PeerName]int{},
map[mesh.PeerName]int{123: 1, 456: 2},
map[mesh.PeerName]int{123: 1, 456: 2},
},
{
map[string]int{"a": 1},
map[string]int{"a": 0, "b": 2},
map[string]int{"a": 1, "b": 2},
map[mesh.PeerName]int{123: 1},
map[mesh.PeerName]int{123: 0, 456: 2},
map[mesh.PeerName]int{123: 1, 456: 2},
},
{
map[string]int{"a": 9},
map[string]int{"a": 8},
map[string]int{"a": 9},
map[mesh.PeerName]int{123: 9},
map[mesh.PeerName]int{123: 8},
map[mesh.PeerName]int{123: 9},
},
} {
p := newPeer(log.New(ioutil.Discard, "", 0))
p := newPeer(999, log.New(ioutil.Discard, "", 0))
p.st.mergeComplete(testcase.initial)
buf, _ := json.Marshal(testcase.msg)
if err := p.OnGossipUnicast(mesh.UnknownPeerName, buf); err != nil {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(testcase.msg); err != nil {
t.Fatal(err)
}
if err := p.OnGossipUnicast(mesh.UnknownPeerName, buf.Bytes()); err != nil {
t.Errorf("%v OnGossipBroadcast %v: %v", testcase.initial, testcase.msg, err)
continue
}
Expand Down
Loading