Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create MultiNode RPCClientBase #7

Merged
merged 44 commits into from
Jan 30, 2025
Merged
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e196d79
Create adaptor
DylanTinianov Jan 10, 2025
d3fcab1
Add managed subscriptions
DylanTinianov Jan 10, 2025
3542ebf
lint
DylanTinianov Jan 10, 2025
3cb0f86
Rename Adaptor
DylanTinianov Jan 10, 2025
b2897eb
Update adaptor.go
DylanTinianov Jan 10, 2025
4d0e51e
Update adaptor_test.go
DylanTinianov Jan 10, 2025
99351ea
Add AdaptorConfig
DylanTinianov Jan 10, 2025
40c5272
Fix config
DylanTinianov Jan 10, 2025
a012866
Export RegisterSub
DylanTinianov Jan 10, 2025
25fe8d0
Export RemoveSub
DylanTinianov Jan 10, 2025
cac62e7
Create NewManagedSub
DylanTinianov Jan 10, 2025
5e53423
Remove cfg
DylanTinianov Jan 10, 2025
cfbc9d2
fix registersub
DylanTinianov Jan 10, 2025
50d459d
Update adaptor.go
DylanTinianov Jan 10, 2025
5ae4b1f
Add Subscriptions
DylanTinianov Jan 14, 2025
45b9ace
Export functions
DylanTinianov Jan 14, 2025
0334e1a
Update components
DylanTinianov Jan 14, 2025
4e9116d
Add ResetLatestChainInfo
DylanTinianov Jan 14, 2025
c278fa8
Add total difficulty
DylanTinianov Jan 14, 2025
f9ca2ef
Export StateMu
DylanTinianov Jan 15, 2025
a3f6d57
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 16, 2025
c9963c1
Update adaptor.go
DylanTinianov Jan 16, 2025
0540085
Merge branch 'BCFR-1071-multinode-adaptor' of https://github.com/smar…
DylanTinianov Jan 16, 2025
321938c
Use TotalDifficulty
DylanTinianov Jan 16, 2025
705a698
GetTotalDifficulty
DylanTinianov Jan 16, 2025
f643a5b
Unexport chStopInFlight
DylanTinianov Jan 16, 2025
b2fec74
Update adaptor_test.go
DylanTinianov Jan 16, 2025
7d2049f
Generate mocks
DylanTinianov Jan 21, 2025
34e81e3
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 21, 2025
de533c7
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 21, 2025
e428185
Fix naming
DylanTinianov Jan 21, 2025
8b3951a
Remove generic RPC
DylanTinianov Jan 28, 2025
0aeab85
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 28, 2025
addef94
Update adapter.go
DylanTinianov Jan 28, 2025
3f623d3
Add test coverage
DylanTinianov Jan 28, 2025
5860d61
Fix mutex naming
DylanTinianov Jan 29, 2025
7f11d38
Update comment
DylanTinianov Jan 29, 2025
ce429b0
Update adapter.go
DylanTinianov Jan 29, 2025
957a767
Update comments
DylanTinianov Jan 29, 2025
1e50e0b
Update naming
DylanTinianov Jan 29, 2025
28c0ebb
Update comment
DylanTinianov Jan 29, 2025
9bf17e5
Update naming
DylanTinianov Jan 30, 2025
656181e
Rename file
DylanTinianov Jan 30, 2025
d1adf37
Fix naming
DylanTinianov Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove cfg
DylanTinianov committed Jan 10, 2025
commit 5e53423461ab4533af56da82a3d48491b3ba661b
84 changes: 11 additions & 73 deletions multinode/adaptor.go
Original file line number Diff line number Diff line change
@@ -11,14 +11,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

type AdaptorConfig interface {
NewHeadsPollInterval() time.Duration
FinalizedBlockPollInterval() time.Duration
}

// Adapter is used to integrate multinode into chain-specific clients
type Adapter[RPC any, HEAD Head] struct {
cfg AdaptorConfig
log logger.Logger
rpc *RPC
ctxTimeout time.Duration
@@ -42,12 +36,11 @@ type Adapter[RPC any, HEAD Head] struct {
}

func NewAdapter[RPC any, HEAD Head](
cfg AdaptorConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger,
rpc *RPC, ctxTimeout time.Duration, log logger.Logger,
latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
) *Adapter[RPC, HEAD] {
return &Adapter[RPC, HEAD]{
cfg: cfg,
rpc: rpc,
log: log,
ctxTimeout: ctxTimeout,
@@ -64,23 +57,27 @@ func (m *Adapter[RPC, HEAD]) LenSubs() int {
return len(m.subs)
}

// RegisterSub adds the sub to the Adaptor list
func (m *Adapter[RPC, HEAD]) RegisterSub(sub *ManagedSubscription, stopInFLightCh chan struct{}) error {
// RegisterSub adds the sub to the Adaptor list and returns a sub which is managed on unsubscribe
func (m *Adapter[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightCh chan struct{}) (*ManagedSubscription, error) {
// ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to
// previous `DisconnectAll` call.
select {
case <-stopInFLightCh:
sub.Unsubscribe()
return fmt.Errorf("failed to register subscription - all in-flight requests were canceled")
return nil, fmt.Errorf("failed to register subscription - all in-flight requests were canceled")
default:
}
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
m.subs[sub] = struct{}{}
return nil
managedSub := &ManagedSubscription{
sub,
m.removeSub,
}
m.subs[managedSub] = struct{}{}
return managedSub, nil
}

func (m *Adapter[RPC, HEAD]) RemoveSub(sub Subscription) {
func (m *Adapter[RPC, HEAD]) removeSub(sub Subscription) {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
delete(m.subs, sub)
@@ -121,65 +118,6 @@ func (m *Adapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, er
return head, nil
}

func (m *Adapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

pollInterval := m.cfg.NewHeadsPollInterval()
if pollInterval == 0 {
return nil, nil, errors.New("PollInterval is 0")
}
timeout := pollInterval
poller, channel := NewPoller[HEAD](pollInterval, func(pollRequestCtx context.Context) (HEAD, error) {
if CtxIsHeathCheckRequest(ctx) {
pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx)
}
return m.LatestBlock(pollRequestCtx)
}, timeout, m.log)

if err := poller.Start(ctx); err != nil {
return nil, nil, err
}

sub := NewManagedSubscription(&poller, m.RemoveSub)
err := m.RegisterSub(sub, chStopInFlight)
if err != nil {
sub.Unsubscribe()
return nil, nil, err
}

return channel, sub, nil
}

func (m *Adapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

finalizedBlockPollInterval := m.cfg.FinalizedBlockPollInterval()
if finalizedBlockPollInterval == 0 {
return nil, nil, errors.New("FinalizedBlockPollInterval is 0")
}
timeout := finalizedBlockPollInterval
poller, channel := NewPoller[HEAD](finalizedBlockPollInterval, func(pollRequestCtx context.Context) (HEAD, error) {
if CtxIsHeathCheckRequest(ctx) {
pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx)
}
return m.LatestFinalizedBlock(pollRequestCtx)
}, timeout, m.log)
if err := poller.Start(ctx); err != nil {
return nil, nil, err
}

sub := NewManagedSubscription(&poller, m.RemoveSub)
err := m.RegisterSub(sub, chStopInFlight)
if err != nil {
sub.Unsubscribe()
return nil, nil, err
}

return channel, sub, nil
}

func (m *Adapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
100 changes: 6 additions & 94 deletions multinode/adaptor_test.go
Original file line number Diff line number Diff line change
@@ -8,10 +8,8 @@

"github.com/stretchr/testify/require"

common "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-framework/multinode/config"
)

type testRPC struct {
@@ -31,34 +29,14 @@
return &testHead{rpc.latestBlock}, nil
}

func ptr[T any](t T) *T {

Check failure on line 32 in multinode/adaptor_test.go

GitHub Actions / golangci-lint

func `ptr` is unused (unused)
return &t
}

func newTestClient(t *testing.T) *Adapter[testRPC, *testHead] {
requestTimeout := 5 * time.Second
lggr := logger.Test(t)
cfg := &config.MultiNodeConfig{
MultiNode: config.MultiNode{
Enabled: ptr(true),
PollFailureThreshold: ptr(uint32(5)),
PollInterval: common.MustNewDuration(15 * time.Second),
SelectionMode: ptr(NodeSelectionModePriorityLevel),
SyncThreshold: ptr(uint32(10)),
LeaseDuration: common.MustNewDuration(time.Minute),
NodeIsSyncingEnabled: ptr(false),
NewHeadsPollInterval: common.MustNewDuration(5 * time.Second),
FinalizedBlockPollInterval: common.MustNewDuration(5 * time.Second),
EnforceRepeatableRead: ptr(true),
DeathDeclarationDelay: common.MustNewDuration(20 * time.Second),
NodeNoNewHeadsThreshold: common.MustNewDuration(20 * time.Second),
NoNewFinalizedHeadsThreshold: common.MustNewDuration(20 * time.Second),
FinalityTagEnabled: ptr(true),
FinalityDepth: ptr(uint32(0)),
FinalizedBlockOffset: ptr(uint32(50)),
},
}
c := NewAdapter[testRPC, *testHead](cfg, &testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock)
c := NewAdapter[testRPC, *testHead](&testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock)
t.Cleanup(c.Close)
return c
}
@@ -79,42 +57,6 @@
})
}

func TestMultiNodeClient_HeadSubscriptions(t *testing.T) {
t.Run("SubscribeToHeads", func(t *testing.T) {
c := newTestClient(t)
ch, sub, err := c.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
defer sub.Unsubscribe()

ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute)
defer cancel()
select {
case head := <-ch:
latest, _ := c.GetInterceptedChainInfo()
require.Equal(t, head.BlockNumber(), latest.BlockNumber)
case <-ctx.Done():
t.Fatal("failed to receive head: ", ctx.Err())
}
})

t.Run("SubscribeToFinalizedHeads", func(t *testing.T) {
c := newTestClient(t)
finalizedCh, finalizedSub, err := c.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
defer finalizedSub.Unsubscribe()

ctx, cancel := context.WithTimeout(tests.Context(t), time.Minute)
defer cancel()
select {
case finalizedHead := <-finalizedCh:
latest, _ := c.GetInterceptedChainInfo()
require.Equal(t, finalizedHead.BlockNumber(), latest.FinalizedBlockNumber)
case <-ctx.Done():
t.Fatal("failed to receive finalized head: ", ctx.Err())
}
})
}

type mockSub struct {
unsubscribed bool
}
@@ -134,9 +76,9 @@
t.Run("RegisterSub", func(t *testing.T) {
c := newTestClient(t)
mockSub := newMockSub()
sub := NewManagedSubscription(mockSub, c.RemoveSub)
err := c.RegisterSub(sub, make(chan struct{}))
sub, err := c.RegisterSub(mockSub, make(chan struct{}))
require.NoError(t, err)
require.NotNil(t, sub)
require.Equal(t, 1, c.LenSubs())
c.UnsubscribeAllExcept()
})
@@ -146,8 +88,7 @@
chStopInFlight := make(chan struct{})
close(chStopInFlight)
mockSub := newMockSub()
sub := NewManagedSubscription(mockSub, c.RemoveSub)
err := c.RegisterSub(sub, chStopInFlight)
_, err := c.RegisterSub(mockSub, chStopInFlight)
require.Error(t, err)
require.True(t, mockSub.unsubscribed)
})
@@ -156,12 +97,10 @@
c := newTestClient(t)
chStopInFlight := make(chan struct{})
mockSub1 := newMockSub()
sub1 := NewManagedSubscription(mockSub1, c.RemoveSub)
mockSub2 := newMockSub()
sub2 := NewManagedSubscription(mockSub2, c.RemoveSub)
err := c.RegisterSub(sub1, chStopInFlight)
sub1, err := c.RegisterSub(mockSub1, chStopInFlight)
require.NoError(t, err)
err = c.RegisterSub(sub2, chStopInFlight)
_, err = c.RegisterSub(mockSub2, chStopInFlight)
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())

@@ -175,31 +114,4 @@
require.Equal(t, 0, c.LenSubs())
require.True(t, mockSub1.unsubscribed)
})

t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) {
c := newTestClient(t)
_, sub1, err := c.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
_, sub2, err := c.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())

sub1.Unsubscribe()
require.Equal(t, 1, c.LenSubs())
sub2.Unsubscribe()
require.Equal(t, 0, c.LenSubs())
})

t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) {
c := newTestClient(t)
_, _, err := c.SubscribeToHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 1, c.LenSubs())
_, _, err = c.SubscribeToFinalizedHeads(tests.Context(t))
require.NoError(t, err)
require.Equal(t, 2, c.LenSubs())
c.UnsubscribeAllExcept()
require.Equal(t, 0, c.LenSubs())
})
}
7 changes: 0 additions & 7 deletions multinode/types.go
Original file line number Diff line number Diff line change
@@ -45,13 +45,6 @@ func (w *ManagedSubscription) Unsubscribe() {
}
}

func NewManagedSubscription(sub Subscription, onUnsubscribe func(sub Subscription)) *ManagedSubscription {
return &ManagedSubscription{
sub,
onUnsubscribe,
}
}

// RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks
type RPCClient[
CHAIN_ID ID,