Skip to content
This repository has been archived by the owner on Nov 18, 2017. It is now read-only.

Commit

Permalink
change to use []byte for retreiving from FSM
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua Deare committed Sep 20, 2016
1 parent a6b6f1f commit 426748f
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 42 deletions.
26 changes: 9 additions & 17 deletions fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ type SingleLeaderFSM interface {
RefreshLeader() error
ForceLeader(leader Leader) error
DeleteLeader(leader Leader) error
Leader(leader Leader) (bool, error)
Leader() ([]byte, bool, error)

MemberObserver() (MemberObserver, error)
SetMember(member Member) error
RefreshMember(id string) error
DeleteMember(id string) error
Member(id string, member Member) (bool, error)
Member(id string) ([]byte, bool, error)
Members() ([][]byte, error)

CompletedRestore() bool
Expand Down Expand Up @@ -294,19 +294,15 @@ func (f *fsm) DeleteLeader(leader Leader) error {
return errors.Wrap(f.proposeDeleteLeader(leader), "Error proposing delete leader")
}

func (f *fsm) Leader(leader Leader) (bool, error) {
func (f *fsm) Leader() ([]byte, bool, error) {
f.Lock()
defer f.Unlock()
if f.leader == nil {
f.leader = nil
return false, nil
}

if err := leader.UnmarshalFSM(f.leader.Data); err != nil {
return false, errors.Wrap(err, "Error unmarshaling leader")
return []byte{}, false, nil
}

return true, nil
return f.leader.Data, true, nil
}

func (f *fsm) MemberObserver() (MemberObserver, error) {
Expand Down Expand Up @@ -364,17 +360,13 @@ func (f *fsm) DeleteMember(id string) error {
return errors.Wrap(f.proposeDeleteMember(id), "Error proposing delete member")
}

func (f *fsm) Member(id string, member Member) (bool, error) {
func (f *fsm) Member(id string) ([]byte, bool, error) {
f.Lock()
defer f.Unlock()
if data, ok := f.members[id]; ok {
err := member.UnmarshalFSM(data.Data)
if err != nil {
return false, errors.Wrap(err, "Error unmarshaling member")
}
return true, nil
if member, ok := f.members[id]; ok {
return member.Data, true, nil
}
return false, nil
return []byte{}, false, nil
}

// Members gives all the members of the cluster
Expand Down
36 changes: 26 additions & 10 deletions fsm/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,12 @@ func TestRaceForLeader(t *testing.T) {
time.Sleep(time.Duration(fsm.leaderTTL / 2))

tl = &testLeader{}
ok, err := fsm.Leader(tl)
leaderData, ok, err := fsm.Leader()
assert.True(t, ok, "There should be a leader set at this point")
assert.NoError(t, err, "There should be no error querying the leader id")

tl.UnmarshalFSM(leaderData)
assert.NoError(t, err, "There should be no error unmarshalling")
assert.Equal(t, "1234", tl.UID)

tl = &testLeader{UID: "5678"}
Expand All @@ -159,9 +162,12 @@ func TestRaceForLeader(t *testing.T) {
time.Sleep(time.Duration(fsm.leaderTTL / 2))

tl = &testLeader{}
ok, err = fsm.Leader(tl)
leaderData, ok, err = fsm.Leader()
assert.NoError(t, err, "There should be no error querying for leader")
assert.True(t, ok, "There should still be a leader present")

err = tl.UnmarshalFSM(leaderData)
assert.NoError(t, err, "There should be no error unmarshalling")
assert.Equal(t, "1234", tl.UID, "The leader should remain what it was before")
}

Expand Down Expand Up @@ -275,9 +281,12 @@ func TestForceLeader(t *testing.T) {

time.Sleep(time.Duration(time.Duration(tlb.TTL / 2)))
tl = &testLeader{}
ok, err := fsm.Leader(tl)
leaderData, ok, err := fsm.Leader()
assert.True(t, ok, "Should be a leader set right now")
assert.NoError(t, err, "Should be no error fetching current leader")

err = tl.UnmarshalFSM(leaderData)
assert.NoError(t, err, "Should be no error unmarshalling leader")
assert.Equal(t, "5678", tl.UID)
})

Expand All @@ -289,9 +298,11 @@ func TestForceLeader(t *testing.T) {

time.Sleep(500 * time.Millisecond)
tl = &testLeader{}
ok, err := fsm.Leader(tl)
leaderData, ok, err := fsm.Leader()
assert.True(t, ok, "Should be a leader set right now")
assert.NoError(t, err, "Should be no error fetching current leader")

err = tl.UnmarshalFSM(leaderData)
assert.Equal(t, "5678", tl.UID)
})
}
Expand Down Expand Up @@ -371,9 +382,12 @@ func TestLeader(t *testing.T) {
fsm.Unlock()

tl = &testLeader{}
ok, err := fsm.Leader(tl)
leaderData, ok, err := fsm.Leader()
assert.True(t, ok, "There should be a leader since we just set it")
assert.NoError(t, err, "There should be no error retreiving the leader")

err = tl.UnmarshalFSM(leaderData)
assert.NoError(t, err, "Should be no error unmarshaling leader data")
assert.Equal(t, "1234", tl.UID)

fsm.Lock()
Expand All @@ -382,10 +396,10 @@ func TestLeader(t *testing.T) {
})

t.Run("Should not get leader back when not set", func(t *testing.T) {
tl := &testLeader{}
ok, err := fsm.Leader(tl)
_, ok, err := fsm.Leader()
assert.False(t, ok, "There should not be a leader since we just set it")
assert.NoError(t, err, "There should be no error retreiving the leader")

})
}

Expand Down Expand Up @@ -586,9 +600,12 @@ func TestMember(t *testing.T) {
fsm.Unlock()

tm = &testMember{}
ok, err := fsm.Member(tmb.ID, tm)
memberData, ok, err := fsm.Member(tmb.ID)
assert.True(t, ok, "There should be a member since we just set it")
assert.NoError(t, err, "There should be no error retreiving the leader")

err = tm.UnmarshalFSM(memberData)
assert.NoError(t, err, "Should be no error unmarshaling member")
assert.Equal(t, "1234", tm.UID)

fsm.Lock()
Expand All @@ -597,8 +614,7 @@ func TestMember(t *testing.T) {
})

t.Run("Should not get member back when not set", func(t *testing.T) {
tm := &testMember{}
ok, err := fsm.Member("1234", tm)
_, ok, err := fsm.Member("1234")
assert.False(t, ok, "There should not be a leader since we just set it")
assert.NoError(t, err, "There should be no error retreiving the leader")
})
Expand Down
11 changes: 7 additions & 4 deletions ha/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,8 @@ func (ha *SingleLeaderHA) raceRetryForInit(eb *backoff.ExponentialBackOff) (bool
}

func (ha *SingleLeaderHA) waitForLeader(eb *backoff.ExponentialBackOff) (fsm.Leader, error) {
leader := ha.service.FSMLeaderTemplate()
for {
exists, err := ha.fsm.Leader(leader)
leaderData, exists, err := ha.fsm.Leader()
if err != nil {
return nil, err
}
Expand All @@ -226,6 +225,11 @@ func (ha *SingleLeaderHA) waitForLeader(eb *backoff.ExponentialBackOff) (fsm.Lea
continue
}

leader, err := ha.service.FSMLeaderFromBytes(leaderData)
if err != nil {
return nil, errors.Wrap(err, "Error converting bytes to leader")
}

return leader, nil
}
}
Expand Down Expand Up @@ -289,8 +293,7 @@ func (ha *SingleLeaderHA) RunCycle() error {
return err
}

tempMember := ha.service.FSMMemberTemplate()
exists, err := ha.fsm.Member(member.ID(), tempMember)
_, exists, err := ha.fsm.Member(member.ID())
if err != nil {
return errors.Wrap(err, "Error checking for member existance")
}
Expand Down
8 changes: 0 additions & 8 deletions service/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,6 @@ func (p *postgresql) FSMMemberFromBytes(data []byte) (fsm.Member, error) {
return member, nil
}

func (p *postgresql) FSMLeaderTemplate() fsm.Leader {
return &clusterMember{}
}

func (p *postgresql) FSMMemberTemplate() fsm.Member {
return &clusterMember{}
}

// TODO: Change interface to (bool, error)???
// TODO: Have HA ask FSM to give update of WAL and pass those in
// That way we have consensus in the FSM of which node should be elected
Expand Down
3 changes: 0 additions & 3 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ type SingleLeaderService interface {
FSMMemberFromBytes(data []byte) (fsm.Member, error)
FSMLeaderFromBytes(data []byte) (fsm.Leader, error)

FSMMemberTemplate() fsm.Member
FSMLeaderTemplate() fsm.Leader

IsHealthy() bool
IsRunning() bool
Ping() error // Check
Expand Down

0 comments on commit 426748f

Please sign in to comment.