Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create MultiNode RPCClientBase #7

Merged
merged 44 commits into from
Jan 30, 2025
Merged
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e196d79
Create adaptor
DylanTinianov Jan 10, 2025
d3fcab1
Add managed subscriptions
DylanTinianov Jan 10, 2025
3542ebf
lint
DylanTinianov Jan 10, 2025
3cb0f86
Rename Adaptor
DylanTinianov Jan 10, 2025
b2897eb
Update adaptor.go
DylanTinianov Jan 10, 2025
4d0e51e
Update adaptor_test.go
DylanTinianov Jan 10, 2025
99351ea
Add AdaptorConfig
DylanTinianov Jan 10, 2025
40c5272
Fix config
DylanTinianov Jan 10, 2025
a012866
Export RegisterSub
DylanTinianov Jan 10, 2025
25fe8d0
Export RemoveSub
DylanTinianov Jan 10, 2025
cac62e7
Create NewManagedSub
DylanTinianov Jan 10, 2025
5e53423
Remove cfg
DylanTinianov Jan 10, 2025
cfbc9d2
fix registersub
DylanTinianov Jan 10, 2025
50d459d
Update adaptor.go
DylanTinianov Jan 10, 2025
5ae4b1f
Add Subscriptions
DylanTinianov Jan 14, 2025
45b9ace
Export functions
DylanTinianov Jan 14, 2025
0334e1a
Update components
DylanTinianov Jan 14, 2025
4e9116d
Add ResetLatestChainInfo
DylanTinianov Jan 14, 2025
c278fa8
Add total difficulty
DylanTinianov Jan 14, 2025
f9ca2ef
Export StateMu
DylanTinianov Jan 15, 2025
a3f6d57
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 16, 2025
c9963c1
Update adaptor.go
DylanTinianov Jan 16, 2025
0540085
Merge branch 'BCFR-1071-multinode-adaptor' of https://github.com/smar…
DylanTinianov Jan 16, 2025
321938c
Use TotalDifficulty
DylanTinianov Jan 16, 2025
705a698
GetTotalDifficulty
DylanTinianov Jan 16, 2025
f643a5b
Unexport chStopInFlight
DylanTinianov Jan 16, 2025
b2fec74
Update adaptor_test.go
DylanTinianov Jan 16, 2025
7d2049f
Generate mocks
DylanTinianov Jan 21, 2025
34e81e3
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 21, 2025
de533c7
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 21, 2025
e428185
Fix naming
DylanTinianov Jan 21, 2025
8b3951a
Remove generic RPC
DylanTinianov Jan 28, 2025
0aeab85
Merge branch 'main' into BCFR-1071-multinode-adaptor
DylanTinianov Jan 28, 2025
addef94
Update adapter.go
DylanTinianov Jan 28, 2025
3f623d3
Add test coverage
DylanTinianov Jan 28, 2025
5860d61
Fix mutex naming
DylanTinianov Jan 29, 2025
7f11d38
Update comment
DylanTinianov Jan 29, 2025
ce429b0
Update adapter.go
DylanTinianov Jan 29, 2025
957a767
Update comments
DylanTinianov Jan 29, 2025
1e50e0b
Update naming
DylanTinianov Jan 29, 2025
28c0ebb
Update comment
DylanTinianov Jan 29, 2025
9bf17e5
Update naming
DylanTinianov Jan 30, 2025
656181e
Rename file
DylanTinianov Jan 30, 2025
d1adf37
Fix naming
DylanTinianov Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Create adaptor
DylanTinianov committed Jan 10, 2025
commit e196d79795fafff9632ebd84fa70a6e52de95e59
273 changes: 273 additions & 0 deletions multinode/adaptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
package multinode

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-framework/multinode/config"
)

// MultiNodeAdapter is used to integrate multinode into chain-specific clients
type MultiNodeAdapter[RPC any, HEAD Head] struct {

Check warning on line 16 in multinode/adaptor.go

GitHub Actions / golangci-lint

exported: type name will be used as multinode.MultiNodeAdapter by other packages, and that stutters; consider calling this Adapter (revive)
cfg *config.MultiNodeConfig
log logger.Logger
rpc *RPC
ctxTimeout time.Duration
stateMu sync.RWMutex // protects state* fields
subsSliceMu sync.RWMutex
subs map[Subscription]struct{}

latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error)
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error)

// chStopInFlight can be closed to immediately cancel all in-flight requests on
// this RpcMultiNodeAdapter. Closing and replacing should be serialized through
// stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close.
chStopInFlight chan struct{}

chainInfoLock sync.RWMutex
// intercepted values seen by callers of the rpcMultiNodeAdapter excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee
highestUserObservations ChainInfo
// most recent chain info observed during current lifecycle (reseted on DisconnectAll)
latestChainInfo ChainInfo
}

func NewMultiNodeAdapter[RPC any, HEAD Head](
cfg *config.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger,
latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
) (*MultiNodeAdapter[RPC, HEAD], error) {
return &MultiNodeAdapter[RPC, HEAD]{
cfg: cfg,
rpc: rpc,
log: log,
ctxTimeout: ctxTimeout,
latestBlock: latestBlock,
latestFinalizedBlock: latestFinalizedBlock,
subs: make(map[Subscription]struct{}),
chStopInFlight: make(chan struct{}),
}, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) LenSubs() int {
m.subsSliceMu.RLock()
defer m.subsSliceMu.RUnlock()
return len(m.subs)
}

// registerSub adds the sub to the rpcMultiNodeAdapter list
func (m *MultiNodeAdapter[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
// ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to
// previous `DisconnectAll` call.
select {
case <-stopInFLightCh:
sub.Unsubscribe()
return fmt.Errorf("failed to register subscription - all in-flight requests were canceled")
default:
}
// TODO: BCI-3358 - delete sub when caller unsubscribes.
m.subs[sub] = struct{}{}
return nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

head, err := m.latestBlock(ctx, rpc)
if err != nil {
return head, err
}

if !head.IsValid() {
return head, errors.New("invalid head")
}

m.onNewHead(ctx, chStopInFlight, head)
return head, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

head, err := m.latestFinalizedBlock(ctx, rpc)
if err != nil {
return head, err
}

if !head.IsValid() {
return head, errors.New("invalid head")
}

m.OnNewFinalizedHead(ctx, chStopInFlight, head)
return head, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

// TODO: BCFR-1070 - Add BlockPollInterval
pollInterval := m.cfg.FinalizedBlockPollInterval() // Use same interval as finalized polling
if pollInterval == 0 {
return nil, nil, errors.New("PollInterval is 0")
}
timeout := pollInterval
poller, channel := NewPoller[HEAD](pollInterval, func(pollRequestCtx context.Context) (HEAD, error) {
if CtxIsHeathCheckRequest(ctx) {
pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx)
}
return m.LatestBlock(pollRequestCtx)
}, timeout, m.log)

if err := poller.Start(ctx); err != nil {
return nil, nil, err
}

err := m.registerSub(&poller, chStopInFlight)
if err != nil {
poller.Unsubscribe()
return nil, nil, err
}

return channel, &poller, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

finalizedBlockPollInterval := m.cfg.FinalizedBlockPollInterval()
if finalizedBlockPollInterval == 0 {
return nil, nil, errors.New("FinalizedBlockPollInterval is 0")
}
timeout := finalizedBlockPollInterval
poller, channel := NewPoller[HEAD](finalizedBlockPollInterval, func(pollRequestCtx context.Context) (HEAD, error) {
if CtxIsHeathCheckRequest(ctx) {
pollRequestCtx = CtxAddHealthCheckFlag(pollRequestCtx)
}
return m.LatestFinalizedBlock(pollRequestCtx)
}, timeout, m.log)
if err := poller.Start(ctx); err != nil {
return nil, nil, err
}

err := m.registerSub(&poller, chStopInFlight)
if err != nil {
poller.Unsubscribe()
return nil, nil, err
}

return channel, &poller, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
}

m.chainInfoLock.Lock()
defer m.chainInfoLock.Unlock()
if !CtxIsHeathCheckRequest(ctx) {
m.highestUserObservations.BlockNumber = max(m.highestUserObservations.BlockNumber, head.BlockNumber())
}
select {
case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle
return
default:
m.latestChainInfo.BlockNumber = head.BlockNumber()
}
}

func (m *MultiNodeAdapter[RPC, HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
}

m.chainInfoLock.Lock()
defer m.chainInfoLock.Unlock()
if !CtxIsHeathCheckRequest(ctx) {
m.highestUserObservations.FinalizedBlockNumber = max(m.highestUserObservations.FinalizedBlockNumber, head.BlockNumber())
}
select {
case <-requestCh: // no need to update latestChainInfo, as rpcMultiNodeAdapter already started new life cycle
return
default:
m.latestChainInfo.FinalizedBlockNumber = head.BlockNumber()
}
}

// MakeQueryCtx returns a context that cancels if:
// 1. Passed in ctx cancels
// 2. Passed in channel is closed
// 3. Default timeout is reached (queryTimeout)
func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) {
var chCancel, timeoutCancel context.CancelFunc
ctx, chCancel = ch.Ctx(ctx)
ctx, timeoutCancel = context.WithTimeout(ctx, timeout)
cancel := func() {
chCancel()
timeoutCancel()
}
return ctx, cancel
}

func (m *MultiNodeAdapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, raw *RPC) {
// Need to wrap in mutex because state transition can cancel and replace context
m.stateMu.RLock()
chStopInFlight = m.chStopInFlight
cp := *m.rpc
raw = &cp
m.stateMu.RUnlock()
ctx, cancel = MakeQueryCtx(parentCtx, chStopInFlight, timeout)
return
}

func (m *MultiNodeAdapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()

keepSubs := map[Subscription]struct{}{}
for _, sub := range subs {
keepSubs[sub] = struct{}{}
}

for sub := range m.subs {
if _, keep := keepSubs[sub]; !keep {
sub.Unsubscribe()
delete(m.subs, sub)
}
}
}

// cancelInflightRequests closes and replaces the chStopInFlight
func (m *MultiNodeAdapter[RPC, HEAD]) cancelInflightRequests() {
m.stateMu.Lock()
defer m.stateMu.Unlock()
close(m.chStopInFlight)
m.chStopInFlight = make(chan struct{})
}

func (m *MultiNodeAdapter[RPC, HEAD]) Close() {
m.cancelInflightRequests()
m.UnsubscribeAllExcept()
m.chainInfoLock.Lock()
m.latestChainInfo = ChainInfo{}
m.chainInfoLock.Unlock()
}

func (m *MultiNodeAdapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
m.chainInfoLock.Lock()
defer m.chainInfoLock.Unlock()
return m.latestChainInfo, m.highestUserObservations
}
Loading