Skip to content

Commit

Permalink
Export RegisterSub
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Jan 10, 2025
1 parent 40c5272 commit a012866
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
8 changes: 4 additions & 4 deletions multinode/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (m *Adapter[RPC, HEAD]) LenSubs() int {
return len(m.subs)
}

// registerSub adds the sub to the rpcMultiNodeAdapter list
func (m *Adapter[RPC, HEAD]) registerSub(sub *ManagedSubscription, stopInFLightCh chan struct{}) error {
// RegisterSub adds the sub to the Adaptor list
func (m *Adapter[RPC, HEAD]) RegisterSub(sub *ManagedSubscription, stopInFLightCh chan struct{}) error {
// ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to
// previous `DisconnectAll` call.
select {
Expand Down Expand Up @@ -146,7 +146,7 @@ func (m *Adapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD,
onUnsubscribe: m.removeSub,
}

err := m.registerSub(sub, chStopInFlight)
err := m.RegisterSub(sub, chStopInFlight)
if err != nil {
sub.Unsubscribe()
return nil, nil, err
Expand Down Expand Up @@ -179,7 +179,7 @@ func (m *Adapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-c
onUnsubscribe: m.removeSub,
}

err := m.registerSub(sub, chStopInFlight)
err := m.RegisterSub(sub, chStopInFlight)
if err != nil {
sub.Unsubscribe()
return nil, nil, err
Expand Down
10 changes: 5 additions & 5 deletions multinode/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ func (s *mockSub) Err() <-chan error {
}

func TestMultiNodeClient_RegisterSubs(t *testing.T) {
t.Run("registerSub", func(t *testing.T) {
t.Run("RegisterSub", func(t *testing.T) {
c := newTestClient(t)
mockSub := newMockSub()
sub := &ManagedSubscription{
Subscription: mockSub,
onUnsubscribe: c.removeSub,
}
err := c.registerSub(sub, make(chan struct{}))
err := c.RegisterSub(sub, make(chan struct{}))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
c.UnsubscribeAllExcept()
Expand All @@ -153,7 +153,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) {
Subscription: mockSub,
onUnsubscribe: c.removeSub,
}
err := c.registerSub(sub, chStopInFlight)
err := c.RegisterSub(sub, chStopInFlight)
require.Error(t, err)
require.True(t, mockSub.unsubscribed)
})
Expand All @@ -171,9 +171,9 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) {
Subscription: mockSub2,
onUnsubscribe: c.removeSub,
}
err := c.registerSub(sub1, chStopInFlight)
err := c.RegisterSub(sub1, chStopInFlight)
require.NoError(t, err)
err = c.registerSub(sub2, chStopInFlight)
err = c.RegisterSub(sub2, chStopInFlight)
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())

Expand Down

0 comments on commit a012866

Please sign in to comment.