Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create MultiNode RPCClientBase #7

Open
wants to merge 43 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
e196d79
Create adaptor
DylanTinianov Jan 10, 2025
d3fcab1
Add managed subscriptions
DylanTinianov Jan 10, 2025
3542ebf
lint
DylanTinianov Jan 10, 2025
3cb0f86
Rename Adaptor
DylanTinianov Jan 10, 2025
b2897eb
Update adaptor.go
DylanTinianov Jan 10, 2025
4d0e51e
Update adaptor_test.go
DylanTinianov Jan 10, 2025
99351ea
Add AdaptorConfig
DylanTinianov Jan 10, 2025
40c5272
Fix config
DylanTinianov Jan 10, 2025
a012866
Export RegisterSub
DylanTinianov Jan 10, 2025
25fe8d0
Export RemoveSub
DylanTinianov Jan 10, 2025
cac62e7
Create NewManagedSub
DylanTinianov Jan 10, 2025
5e53423
Remove cfg
DylanTinianov Jan 10, 2025
cfbc9d2
fix registersub
DylanTinianov Jan 10, 2025
50d459d
Update adaptor.go
DylanTinianov Jan 10, 2025
5ae4b1f
Add Subscriptions
DylanTinianov Jan 14, 2025
45b9ace
Export functions
DylanTinianov Jan 14, 2025
0334e1a
Update components
DylanTinianov Jan 14, 2025
4e9116d
Add ResetLatestChainInfo
DylanTinianov Jan 14, 2025
c278fa8
Add total difficulty
DylanTinianov Jan 14, 2025
f9ca2ef
Export StateMu
DylanTinianov Jan 15, 2025
a3f6d57
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 16, 2025
c9963c1
Update adaptor.go
DylanTinianov Jan 16, 2025
0540085
Merge branch 'BCFR-1071-multinode-adaptor' of https://github.com/smar…
DylanTinianov Jan 16, 2025
321938c
Use TotalDifficulty
DylanTinianov Jan 16, 2025
705a698
GetTotalDifficulty
DylanTinianov Jan 16, 2025
f643a5b
Unexport chStopInFlight
DylanTinianov Jan 16, 2025
b2fec74
Update adaptor_test.go
DylanTinianov Jan 16, 2025
7d2049f
Generate mocks
DylanTinianov Jan 21, 2025
34e81e3
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 21, 2025
de533c7
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 21, 2025
e428185
Fix naming
DylanTinianov Jan 21, 2025
8b3951a
Remove generic RPC
DylanTinianov Jan 28, 2025
0aeab85
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 28, 2025
addef94
Update adapter.go
DylanTinianov Jan 28, 2025
3f623d3
Add test coverage
DylanTinianov Jan 28, 2025
5860d61
Fix mutex naming
DylanTinianov Jan 29, 2025
7f11d38
Update comment
DylanTinianov Jan 29, 2025
ce429b0
Update adapter.go
DylanTinianov Jan 29, 2025
957a767
Update comments
DylanTinianov Jan 29, 2025
1e50e0b
Update naming
DylanTinianov Jan 29, 2025
28c0ebb
Update comment
DylanTinianov Jan 29, 2025
9bf17e5
Update naming
DylanTinianov Jan 30, 2025
656181e
Rename file
DylanTinianov Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion multinode/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ func CtxAddHealthCheckFlag(ctx context.Context) context.Context {
return context.WithValue(ctx, contextKeyHeathCheckRequest, struct{}{})
}

func CtxIsHeathCheckRequest(ctx context.Context) bool {
func CtxIsHealthCheckRequest(ctx context.Context) bool {
return ctx.Value(contextKeyHeathCheckRequest) != nil
}
4 changes: 2 additions & 2 deletions multinode/ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestContext(t *testing.T) {
ctx := tests.Context(t)
assert.False(t, CtxIsHeathCheckRequest(ctx), "expected false for test context")
assert.False(t, CtxIsHealthCheckRequest(ctx), "expected false for test context")
ctx = CtxAddHealthCheckFlag(ctx)
assert.True(t, CtxIsHeathCheckRequest(ctx), "expected context to contain the healthcheck flag")
assert.True(t, CtxIsHealthCheckRequest(ctx), "expected context to contain the healthcheck flag")
}
47 changes: 47 additions & 0 deletions multinode/mock_head_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions multinode/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ func (h head) ToMockHead(t *testing.T) *mockHead {
m := newMockHead(t)
m.On("BlockNumber").Return(h.BlockNumber).Maybe()
m.On("BlockDifficulty").Return(h.BlockDifficulty).Maybe()
m.On("GetTotalDifficulty").Return(h.BlockDifficulty).Maybe()
m.On("IsValid").Return(true).Maybe()
return m
}
Expand Down
298 changes: 298 additions & 0 deletions multinode/rpc_client_base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,298 @@
package multinode

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// RPCClientBaseConfig supplies the polling intervals the RPCClientBase uses
// for its head and finalized-head subscriptions. A zero interval disables the
// corresponding subscription (SubscribeToHeads/SubscribeToFinalizedHeads
// return an error).
type RPCClientBaseConfig interface {
	// NewHeadsPollInterval is how often SubscribeToHeads polls for the latest block.
	NewHeadsPollInterval() time.Duration
	// FinalizedBlockPollInterval is how often SubscribeToFinalizedHeads polls
	// for the latest finalized block.
	FinalizedBlockPollInterval() time.Duration
}

// RPCClientBase is used to integrate multinode into chain-specific clients.
// For new MultiNode integrations, we wrap the RPC client and inherit from the RPCClientBase
// to get the required RPCClient methods and enable the use of MultiNode.
//
// The RPCClientBase provides chain-agnostic functionality such as head and finalized head
// subscriptions, which are required in each Node lifecycle to execute various
// health checks.
type RPCClientBase[HEAD Head] struct {
	cfg RPCClientBaseConfig
	log logger.Logger
	// ctxTimeout is the per-request timeout applied by AcquireQueryCtx.
	ctxTimeout time.Duration
	// subsMu guards subs, the set of live managed subscriptions.
	subsMu sync.RWMutex
	subs   map[Subscription]struct{}

	// latestBlock and latestFinalizedBlock are the chain-specific fetchers
	// injected by the wrapping client via NewRPCClientBase.
	latestBlock          func(ctx context.Context) (HEAD, error)
	latestFinalizedBlock func(ctx context.Context) (HEAD, error)

	// lifeCycleCh can be closed to immediately cancel all in-flight requests on
	// this RPC. Closing and replacing should be serialized through
	// lifeCycleMu since it can happen on state transitions as well as RPCClientBase Close.
	// Also closed when RPC is declared unhealthy.
	lifeCycleMu sync.RWMutex
	lifeCycleCh chan struct{}

	// chainInfoLock protects highestUserObservations and latestChainInfo
	chainInfoLock sync.RWMutex
	// intercepted values seen by callers of the RPCClientBase excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee
	highestUserObservations ChainInfo
	// most recent chain info observed during current lifecycle
	latestChainInfo ChainInfo
}

// NewRPCClientBase constructs an RPCClientBase. latestBlock and
// latestFinalizedBlock are the chain-specific fetchers used by the head and
// finalized-head pollers; ctxTimeout bounds every query issued through
// AcquireQueryCtx.
func NewRPCClientBase[HEAD Head](
	cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger,
	latestBlock func(ctx context.Context) (HEAD, error),
	latestFinalizedBlock func(ctx context.Context) (HEAD, error),
) *RPCClientBase[HEAD] {
	base := &RPCClientBase[HEAD]{
		cfg:                  cfg,
		log:                  log,
		ctxTimeout:           ctxTimeout,
		latestBlock:          latestBlock,
		latestFinalizedBlock: latestFinalizedBlock,
	}
	base.subs = make(map[Subscription]struct{})
	base.lifeCycleCh = make(chan struct{})
	return base
}

// LenSubs returns the number of managed subscriptions currently tracked.
func (m *RPCClientBase[HEAD]) LenSubs() int {
	m.subsMu.RLock()
	n := len(m.subs)
	m.subsMu.RUnlock()
	return n
}

// RegisterSub adds the sub to the RPCClientBase list and returns a managed sub
// which removes itself from the list on unsubscribe. Registration is refused
// when lifeCycleCh is already closed (all in-flight requests were canceled by
// a previous life-cycle transition), in which case sub is unsubscribed before
// returning the error.
func (m *RPCClientBase[HEAD]) RegisterSub(sub Subscription, lifeCycleCh chan struct{}) (*ManagedSubscription, error) {
	// A sub created against an already-canceled life cycle would never be
	// cleaned up by CancelLifeCycle, so reject it up front.
	select {
	case <-lifeCycleCh:
		sub.Unsubscribe()
		return nil, fmt.Errorf("failed to register subscription - all in-flight requests were canceled")
	default:
	}
	managed := &ManagedSubscription{
		sub,
		m.removeSub,
	}
	m.subsMu.Lock()
	m.subs[managed] = struct{}{}
	m.subsMu.Unlock()
	return managed, nil
}

// removeSub drops sub from the tracked set; invoked by a ManagedSubscription
// when it is unsubscribed.
func (m *RPCClientBase[HEAD]) removeSub(sub Subscription) {
	m.subsMu.Lock()
	delete(m.subs, sub)
	m.subsMu.Unlock()
}

// SubscribeToHeads starts a poller that periodically fetches the latest block
// and delivers it on the returned channel. The returned Subscription must be
// unsubscribed to stop the poller. Returns an error if the configured poll
// interval is zero, the poller fails to start, or this RPC's life cycle has
// already been canceled.
func (m *RPCClientBase[HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
	ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
	defer cancel()

	pollInterval := m.cfg.NewHeadsPollInterval()
	if pollInterval == 0 {
		// Name the actual config method, matching SubscribeToFinalizedHeads'
		// error style (was the ambiguous "PollInterval is 0").
		return nil, nil, errors.New("NewHeadsPollInterval is 0")
	}
	timeout := pollInterval
	poller, channel := NewPoller[HEAD](pollInterval, func(pollRequestCtx context.Context) (HEAD, error) {
		// Propagate the health-check flag from the subscriber's context so
		// health-check polls do not pollute highestUserObservations.
		if CtxIsHealthCheckRequest(ctx) {
			pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx)
		}
		return m.LatestBlock(pollRequestCtx)
	}, timeout, m.log)

	if err := poller.Start(ctx); err != nil {
		return nil, nil, err
	}

	sub, err := m.RegisterSub(&poller, lifeCycleCh)
	if err != nil {
		// Fix: on registration failure sub is nil, so calling sub.Unsubscribe()
		// panicked. Unsubscribe the poller itself instead, as
		// SubscribeToFinalizedHeads already does.
		poller.Unsubscribe()
		return nil, nil, err
	}
	return channel, sub, nil
}

// SubscribeToFinalizedHeads starts a poller that periodically fetches the
// latest finalized block and delivers it on the returned channel. The returned
// Subscription must be unsubscribed to stop the poller.
func (m *RPCClientBase[HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
	ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
	defer cancel()

	interval := m.cfg.FinalizedBlockPollInterval()
	if interval == 0 {
		return nil, nil, errors.New("FinalizedBlockPollInterval is 0")
	}
	// The per-poll timeout equals the poll interval.
	poller, heads := NewPoller[HEAD](interval, func(pollCtx context.Context) (HEAD, error) {
		// Carry the health-check flag through to each poll request.
		if CtxIsHealthCheckRequest(ctx) {
			pollCtx = CtxAddHealthCheckFlag(pollCtx)
		}
		return m.LatestFinalizedBlock(pollCtx)
	}, interval, m.log)
	if err := poller.Start(ctx); err != nil {
		return nil, nil, err
	}

	managed, err := m.RegisterSub(&poller, lifeCycleCh)
	if err != nil {
		poller.Unsubscribe()
		return nil, nil, err
	}
	return heads, managed, nil
}

// LatestBlock fetches the most recent head via the injected fetcher, rejects
// invalid heads, and records the observation into the intercepted chain info.
func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
	// Capture lifeCycleCh so observations tied to a previous life cycle are
	// not written into latestChainInfo.
	ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
	defer cancel()

	head, err := m.latestBlock(ctx)
	switch {
	case err != nil:
		return head, err
	case !head.IsValid():
		return head, errors.New("invalid head")
	}

	m.OnNewHead(ctx, lifeCycleCh, head)
	return head, nil
}

// LatestFinalizedBlock fetches the most recent finalized head via the injected
// fetcher, rejects invalid heads, and records the observation into the
// intercepted chain info.
func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
	// Capture lifeCycleCh so stale-life-cycle observations are discarded.
	ctx, cancel, lifeCycleCh := m.AcquireQueryCtx(ctx, m.ctxTimeout)
	defer cancel()

	head, err := m.latestFinalizedBlock(ctx)
	switch {
	case err != nil:
		return head, err
	case !head.IsValid():
		return head, errors.New("invalid head")
	}

	m.OnNewFinalizedHead(ctx, lifeCycleCh, head)
	return head, nil
}

// OnNewHead records chain info observed from head. Health-check requests
// (flagged on ctx) update only latestChainInfo, never highestUserObservations,
// preserving the repeatable-read guarantee for user calls. requestCh is the
// life-cycle channel captured when the request started; if it is already
// closed, a new life cycle has begun and latestChainInfo is left untouched.
func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
	if !head.IsValid() {
		return
	}

	m.chainInfoLock.Lock()
	defer m.chainInfoLock.Unlock()
	blockNumber := head.BlockNumber()
	totalDifficulty := head.GetTotalDifficulty()
	if !CtxIsHealthCheckRequest(ctx) {
		// User-visible observations only ever increase.
		m.highestUserObservations.BlockNumber = max(m.highestUserObservations.BlockNumber, blockNumber)
		m.highestUserObservations.TotalDifficulty = MaxTotalDifficulty(m.highestUserObservations.TotalDifficulty, totalDifficulty)
	}
	select {
	case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle
		return
	default:
		m.latestChainInfo.BlockNumber = blockNumber
		m.latestChainInfo.TotalDifficulty = totalDifficulty
	}
}

// OnNewFinalizedHead records the finalized block number observed from head.
// As with OnNewHead, health-check requests skip highestUserObservations, and
// a closed requestCh means the observation belongs to a previous life cycle
// so latestChainInfo is not updated.
func (m *RPCClientBase[HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
	if !head.IsValid() {
		return
	}

	m.chainInfoLock.Lock()
	defer m.chainInfoLock.Unlock()
	if !CtxIsHealthCheckRequest(ctx) {
		m.highestUserObservations.FinalizedBlockNumber = max(m.highestUserObservations.FinalizedBlockNumber, head.BlockNumber())
	}
	select {
	case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle
		return
	default:
		m.latestChainInfo.FinalizedBlockNumber = head.BlockNumber()
	}
}

// makeQueryCtx derives a context that is canceled when any of the following
// happens first: the passed-in ctx is canceled, ch is closed, or timeout
// elapses. The returned cancel releases both underlying contexts.
func makeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancelCh := ch.Ctx(ctx)
	ctx, cancelTimeout := context.WithTimeout(ctx, timeout)
	return ctx, func() {
		cancelCh()
		cancelTimeout()
	}
}

// AcquireQueryCtx snapshots the current lifeCycleCh and returns a context
// bound to it (see makeQueryCtx), plus the channel itself so callers can later
// detect whether the life cycle they observed is still the current one.
func (m *RPCClientBase[HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (context.Context, context.CancelFunc, chan struct{}) {
	// lifeCycleCh is closed and replaced on state transitions; read it under
	// the lock so we never pair a context with an already-swapped channel.
	m.lifeCycleMu.RLock()
	lifeCycleCh := m.lifeCycleCh
	m.lifeCycleMu.RUnlock()
	ctx, cancel := makeQueryCtx(parentCtx, lifeCycleCh, timeout)
	return ctx, cancel, lifeCycleCh
}

// UnsubscribeAllExcept unsubscribes every tracked subscription except those
// passed in. Unsubscribe is invoked after releasing subsMu because managed
// subscriptions re-enter removeSub, which takes the same lock.
func (m *RPCClientBase[HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
	keep := make(map[Subscription]struct{}, len(subs))
	for _, s := range subs {
		keep[s] = struct{}{}
	}

	m.subsMu.Lock()
	toCancel := make([]Subscription, 0, len(m.subs))
	for s := range m.subs {
		if _, ok := keep[s]; !ok {
			toCancel = append(toCancel, s)
		}
	}
	m.subsMu.Unlock()

	for _, s := range toCancel {
		s.Unsubscribe()
	}
}

// CancelLifeCycle closes and replaces the lifeCycleCh, immediately canceling
// every context derived from it via AcquireQueryCtx. Close and replacement are
// serialized through lifeCycleMu since they can happen on state transitions as
// well as on RPCClientBase Close.
func (m *RPCClientBase[HEAD]) CancelLifeCycle() {
	m.lifeCycleMu.Lock()
	defer m.lifeCycleMu.Unlock()
	close(m.lifeCycleCh)
	m.lifeCycleCh = make(chan struct{})
}

// resetLatestChainInfo clears the per-life-cycle chain info.
// highestUserObservations is intentionally preserved, as it spans life cycles.
func (m *RPCClientBase[HEAD]) resetLatestChainInfo() {
	m.chainInfoLock.Lock()
	defer m.chainInfoLock.Unlock()
	m.latestChainInfo = ChainInfo{}
}

// Close tears down the RPCClientBase: it cancels the current life cycle
// (closing all in-flight request contexts), unsubscribes every managed
// subscription, and clears the per-life-cycle chain info.
// highestUserObservations is retained.
func (m *RPCClientBase[HEAD]) Close() {
	m.CancelLifeCycle()
	m.UnsubscribeAllExcept()
	m.resetLatestChainInfo()
}

// GetInterceptedChainInfo returns the chain info observed during the current
// life cycle and the highest values ever observed from user (non-health-check)
// requests.
func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
	m.chainInfoLock.RLock()
	latest = m.latestChainInfo
	highestUserObservations = m.highestUserObservations
	m.chainInfoLock.RUnlock()
	return latest, highestUserObservations
}
Loading
Loading