diff --git a/fsm/fsm.go b/fsm/fsm.go index e2fe6c56..b787a199 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -60,13 +60,13 @@ type SingleLeaderFSM interface { RefreshLeader() error ForceLeader(leader Leader) error DeleteLeader(leader Leader) error - Leader(leader Leader) (bool, error) + Leader() ([]byte, bool, error) MemberObserver() (MemberObserver, error) SetMember(member Member) error RefreshMember(id string) error DeleteMember(id string) error - Member(id string, member Member) (bool, error) + Member(id string) ([]byte, bool, error) Members() ([][]byte, error) CompletedRestore() bool @@ -294,19 +294,15 @@ func (f *fsm) DeleteLeader(leader Leader) error { return errors.Wrap(f.proposeDeleteLeader(leader), "Error proposing delete leader") } -func (f *fsm) Leader(leader Leader) (bool, error) { +func (f *fsm) Leader() ([]byte, bool, error) { f.Lock() defer f.Unlock() if f.leader == nil { f.leader = nil - return false, nil - } - - if err := leader.UnmarshalFSM(f.leader.Data); err != nil { - return false, errors.Wrap(err, "Error unmarshaling leader") + return []byte{}, false, nil } - return true, nil + return f.leader.Data, true, nil } func (f *fsm) MemberObserver() (MemberObserver, error) { @@ -364,17 +360,13 @@ func (f *fsm) DeleteMember(id string) error { return errors.Wrap(f.proposeDeleteMember(id), "Error proposing delete member") } -func (f *fsm) Member(id string, member Member) (bool, error) { +func (f *fsm) Member(id string) ([]byte, bool, error) { f.Lock() defer f.Unlock() - if data, ok := f.members[id]; ok { - err := member.UnmarshalFSM(data.Data) - if err != nil { - return false, errors.Wrap(err, "Error unmarshaling member") - } - return true, nil + if member, ok := f.members[id]; ok { + return member.Data, true, nil } - return false, nil + return []byte{}, false, nil } // Members gives all the members of the cluster diff --git a/fsm/fsm_test.go b/fsm/fsm_test.go index 805663e6..b8c4edae 100644 --- a/fsm/fsm_test.go +++ b/fsm/fsm_test.go @@ -146,9 +146,12 @@ 
func TestRaceForLeader(t *testing.T) { time.Sleep(time.Duration(fsm.leaderTTL / 2)) tl = &testLeader{} - ok, err := fsm.Leader(tl) + leaderData, ok, err := fsm.Leader() assert.True(t, ok, "There should be a leader set at this point") assert.NoError(t, err, "There should be no error querying the leader id") + + err = tl.UnmarshalFSM(leaderData) + assert.NoError(t, err, "There should be no error unmarshalling") assert.Equal(t, "1234", tl.UID) tl = &testLeader{UID: "5678"} @@ -159,9 +162,12 @@ func TestRaceForLeader(t *testing.T) { time.Sleep(time.Duration(fsm.leaderTTL / 2)) tl = &testLeader{} - ok, err = fsm.Leader(tl) + leaderData, ok, err = fsm.Leader() assert.NoError(t, err, "There should be no error querying for leader") assert.True(t, ok, "There should still be a leader present") + + err = tl.UnmarshalFSM(leaderData) + assert.NoError(t, err, "There should be no error unmarshalling") assert.Equal(t, "1234", tl.UID, "The leader should remain what it was before") } @@ -275,9 +281,12 @@ func TestForceLeader(t *testing.T) { time.Sleep(time.Duration(time.Duration(tlb.TTL / 2))) tl = &testLeader{} - ok, err := fsm.Leader(tl) + leaderData, ok, err := fsm.Leader() assert.True(t, ok, "Should be a leader set right now") assert.NoError(t, err, "Should be no error fetching current leader") + + err = tl.UnmarshalFSM(leaderData) + assert.NoError(t, err, "Should be no error unmarshalling leader") assert.Equal(t, "5678", tl.UID) }) @@ -289,9 +298,11 @@ func TestForceLeader(t *testing.T) { time.Sleep(500 * time.Millisecond) tl = &testLeader{} - ok, err := fsm.Leader(tl) + leaderData, ok, err := fsm.Leader() assert.True(t, ok, "Should be a leader set right now") assert.NoError(t, err, "Should be no error fetching current leader") + + assert.NoError(t, tl.UnmarshalFSM(leaderData), "Should be no error unmarshalling leader") assert.Equal(t, "5678", tl.UID) }) } @@ -371,9 +382,12 @@ func TestLeader(t *testing.T) { fsm.Unlock() tl = &testLeader{} - ok, err := fsm.Leader(tl) + leaderData, ok, err := fsm.Leader() assert.True(t, ok, "There should 
be a leader since we just set it") assert.NoError(t, err, "There should be no error retreiving the leader") + + err = tl.UnmarshalFSM(leaderData) + assert.NoError(t, err, "Should be no error unmarshaling leader data") assert.Equal(t, "1234", tl.UID) fsm.Lock() @@ -382,10 +396,10 @@ func TestLeader(t *testing.T) { }) t.Run("Should not get leader back when not set", func(t *testing.T) { - tl := &testLeader{} - ok, err := fsm.Leader(tl) + _, ok, err := fsm.Leader() assert.False(t, ok, "There should not be a leader since we just set it") assert.NoError(t, err, "There should be no error retreiving the leader") + }) } @@ -586,9 +600,12 @@ func TestMember(t *testing.T) { fsm.Unlock() tm = &testMember{} - ok, err := fsm.Member(tmb.ID, tm) + memberData, ok, err := fsm.Member(tmb.ID) assert.True(t, ok, "There should be a member since we just set it") assert.NoError(t, err, "There should be no error retreiving the leader") + + err = tm.UnmarshalFSM(memberData) + assert.NoError(t, err, "Should be no error unmarshaling member") assert.Equal(t, "1234", tm.UID) fsm.Lock() @@ -597,8 +614,7 @@ func TestMember(t *testing.T) { }) t.Run("Should not get member back when not set", func(t *testing.T) { - tm := &testMember{} - ok, err := fsm.Member("1234", tm) + _, ok, err := fsm.Member("1234") assert.False(t, ok, "There should not be a leader since we just set it") assert.NoError(t, err, "There should be no error retreiving the leader") }) diff --git a/ha/ha.go b/ha/ha.go index ea8cb2fc..1822b09d 100644 --- a/ha/ha.go +++ b/ha/ha.go @@ -210,9 +210,8 @@ func (ha *SingleLeaderHA) raceRetryForInit(eb *backoff.ExponentialBackOff) (bool } func (ha *SingleLeaderHA) waitForLeader(eb *backoff.ExponentialBackOff) (fsm.Leader, error) { - leader := ha.service.FSMLeaderTemplate() for { - exists, err := ha.fsm.Leader(leader) + leaderData, exists, err := ha.fsm.Leader() if err != nil { return nil, err } @@ -226,6 +225,11 @@ func (ha *SingleLeaderHA) waitForLeader(eb *backoff.ExponentialBackOff) 
(fsm.Lea continue } + leader, err := ha.service.FSMLeaderFromBytes(leaderData) + if err != nil { + return nil, errors.Wrap(err, "Error converting bytes to leader") + } + return leader, nil } } @@ -289,8 +293,7 @@ func (ha *SingleLeaderHA) RunCycle() error { return err } - tempMember := ha.service.FSMMemberTemplate() - exists, err := ha.fsm.Member(member.ID(), tempMember) + _, exists, err := ha.fsm.Member(member.ID()) if err != nil { return errors.Wrap(err, "Error checking for member existance") } diff --git a/service/postgresql.go b/service/postgresql.go index 1367b963..8a2a1341 100644 --- a/service/postgresql.go +++ b/service/postgresql.go @@ -210,14 +210,6 @@ func (p *postgresql) FSMMemberFromBytes(data []byte) (fsm.Member, error) { return member, nil } -func (p *postgresql) FSMLeaderTemplate() fsm.Leader { - return &clusterMember{} -} - -func (p *postgresql) FSMMemberTemplate() fsm.Member { - return &clusterMember{} -} - // TODO: Change interface to (bool, error)??? // TODO: Have HA ask FSM to give update of WAL and pass those in // That way we have consensus in the FSM of which node should be elected diff --git a/service/service.go b/service/service.go index fc0852d4..ab1c675e 100644 --- a/service/service.go +++ b/service/service.go @@ -53,9 +53,6 @@ type SingleLeaderService interface { FSMMemberFromBytes(data []byte) (fsm.Member, error) FSMLeaderFromBytes(data []byte) (fsm.Leader, error) - FSMMemberTemplate() fsm.Member - FSMLeaderTemplate() fsm.Leader - IsHealthy() bool IsRunning() bool Ping() error // Check