Skip to content

Commit

Permalink
Merge pull request #317 from forta-network/caner/json-rpc-redundancy
Browse files Browse the repository at this point in the history
Support a ring of fallback RPCs in Ethereum client
  • Loading branch information
canercidam authored Jan 2, 2025
2 parents 62240fa + 98ddb55 commit 873cb25
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 42 deletions.
108 changes: 72 additions & 36 deletions ethereum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/forta-network/forta-core-go/clients/health"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/ethereum/provider"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-core-go/utils/httpclient"

Expand Down Expand Up @@ -108,11 +109,10 @@ var maxBackoff = 1 * time.Minute

// streamEthClient wraps a go-ethereum client purpose-built for streaming txs (with long retries/timeouts)
type streamEthClient struct {
apiName string
rpcClient RPCClient
subscriber Subscriber
retryInterval time.Duration
isWebsocket bool
apiName string
rpcClientProvider provider.Provider[Subscriber]
retryInterval time.Duration
isWebsocket bool

lastBlockByNumberReq health.TimeTracker
lastBlockByNumberErr health.ErrorTracker
Expand All @@ -130,7 +130,7 @@ type RetryOptions struct {

// Close invokes close on the underlying client
func (e *streamEthClient) Close() {
e.rpcClient.Close()
e.rpcClientProvider.Close()
}

func (e *streamEthClient) SetRetryInterval(d time.Duration) {
Expand All @@ -154,8 +154,9 @@ func isPermanentError(err error) bool {
}

// withBackoff wraps an operation in an exponential backoff logic
func withBackoff(
ctx context.Context, name string, operation func(ctx context.Context) error, options RetryOptions,
func (e *streamEthClient) withBackoff(
ctx context.Context, name string,
operation func(ctx context.Context, rpcClient Subscriber) error, options RetryOptions,
timeTracker *health.TimeTracker, errorTracker *health.ErrorTracker,
) error {
bo := backoff.NewExponentialBackOff()
Expand All @@ -176,14 +177,18 @@ func withBackoff(
}

tCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
err := operation(tCtx)
err := operation(tCtx, e.rpcClientProvider.Provide())
cancel()
if timeTracker != nil {
timeTracker.Set()
}
if errorTracker != nil {
errorTracker.Set(err)
}
if err != nil {
// Move onto the next provider.
e.rpcClientProvider.Next()
}
if err == nil {
//success, returning now avoids failing on context timeouts in certain edge cases
return nil
Expand Down Expand Up @@ -213,8 +218,8 @@ func (e *streamEthClient) BlockByHash(ctx context.Context, hash string) (*domain
name := fmt.Sprintf("%s(%s)", blocksByHash, hash)
log.Debugf(name)
var result domain.Block
err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, blocksByHash, hash, true)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, blocksByHash, hash, true)
if err != nil {
return err
}
Expand All @@ -235,8 +240,8 @@ func (e *streamEthClient) TraceBlock(ctx context.Context, number *big.Int) ([]do
name := fmt.Sprintf("%s(%s)", traceBlock, number)
log.Debugf(name)
var result []domain.Trace
err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, traceBlock, utils.BigIntToHex(number))
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, traceBlock, utils.BigIntToHex(number))
if err != nil {
return err
}
Expand Down Expand Up @@ -270,8 +275,8 @@ func (e *streamEthClient) DebugTraceCall(

args := []interface{}{req, block, traceCallConfig}

err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, debugTraceCall, args...)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, debugTraceCall, args...)
if err != nil {
return err
}
Expand All @@ -296,8 +301,8 @@ func (e *streamEthClient) GetLogs(ctx context.Context, q ethereum.FilterQuery) (
return nil, err
}

err = withBackoff(ctx, name, func(ctx context.Context) error {
return e.rpcClient.CallContext(ctx, &result, getLogs, args)
err = e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
return rpcClient.CallContext(ctx, &result, getLogs, args)
}, RetryOptions{
MinBackoff: pointDur(e.retryInterval),
MaxElapsedTime: pointDur(12 * time.Hour),
Expand All @@ -320,8 +325,8 @@ func (e *streamEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*
name := fmt.Sprintf("%s(%s)", blocksByNumber, numDisplay)
log.Debugf(name)

err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, blocksByNumber, numArg, true)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, blocksByNumber, numArg, true)
if err != nil {
return err
}
Expand All @@ -341,8 +346,8 @@ func (e *streamEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*
func (e *streamEthClient) BlockNumber(ctx context.Context) (*big.Int, error) {
log.Debugf(blockNumber)
var result string
err := withBackoff(ctx, blockNumber, func(ctx context.Context) error {
return e.rpcClient.CallContext(ctx, &result, blockNumber)
err := e.withBackoff(ctx, blockNumber, func(ctx context.Context, rpcClient Subscriber) error {
return rpcClient.CallContext(ctx, &result, blockNumber)
}, RetryOptions{
MaxElapsedTime: pointDur(12 * time.Hour),
}, nil, nil)
Expand All @@ -356,8 +361,8 @@ func (e *streamEthClient) BlockNumber(ctx context.Context) (*big.Int, error) {
func (e *streamEthClient) ChainID(ctx context.Context) (*big.Int, error) {
log.Debugf(chainId)
var result string
err := withBackoff(ctx, chainId, func(ctx context.Context) error {
return e.rpcClient.CallContext(ctx, &result, chainId)
err := e.withBackoff(ctx, chainId, func(ctx context.Context, rpcClient Subscriber) error {
return rpcClient.CallContext(ctx, &result, chainId)
}, RetryOptions{
MaxElapsedTime: pointDur(1 * time.Minute),
}, nil, nil)
Expand All @@ -372,8 +377,8 @@ func (e *streamEthClient) TransactionReceipt(ctx context.Context, txHash string)
name := fmt.Sprintf("%s(%s)", transactionReceipt, txHash)
log.Debugf(name)
var result domain.TransactionReceipt
err := withBackoff(ctx, name, func(ctx context.Context) error {
if err := e.rpcClient.CallContext(ctx, &result, transactionReceipt, txHash); err != nil {
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
if err := rpcClient.CallContext(ctx, &result, transactionReceipt, txHash); err != nil {
return err
}
if result.TransactionHash == nil {
Expand All @@ -399,8 +404,8 @@ func (e *streamEthClient) GetTransactionCount(ctx context.Context, address strin

log.Debugf(name)
var result string
err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, getTransactionCount, address, block)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, getTransactionCount, address, block)
if err != nil {
return err
}
Expand All @@ -427,7 +432,8 @@ func (e *streamEthClient) SubscribeToHead(ctx context.Context) (domain.HeaderCh,
log.Debug("subscribing to blockchain head")
recvCh := make(chan *types.Header)
sendCh := make(chan *types.Header)
sub, err := e.subscriber.Subscribe(ctx, "eth", recvCh, "newHeads")
// We currently do not support multiple RPCs here.
sub, err := e.rpcClientProvider.Provide().Subscribe(ctx, "eth", recvCh, "newHeads")
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %v", err)
}
Expand Down Expand Up @@ -512,25 +518,55 @@ func NewRpcClient(ctx context.Context, url string) (*rpc.Client, error) {
return rpc.DialHTTPWithClient(url, &client)
}

// NewStreamEthClient creates a new ethereum client
func NewStreamEthClient(ctx context.Context, apiName, apiURL string) (*streamEthClient, error) {
func newInternalRPCClient(ctx context.Context, apiURL string) (*rpcClient, error) {
rClient, err := NewRpcClient(ctx, apiURL)
if err != nil {
return nil, err
}
rClient.SetHeader("Content-Type", "application/json")
return &rpcClient{Client: rClient}, nil
}

return NewStreamEthClientWithRPCClient(ctx, apiName, isWebsocket(apiURL), &rpcClient{Client: rClient})
// NewStreamEthClientMulti creates a new ethereum client with multiple RPCs.
func NewStreamEthClientMulti(ctx context.Context, apiName string, apiURLs ...string) (*streamEthClient, error) {
if apiURLs == nil {
return nil, errors.New("no api urls provided")
}
if len(apiURLs) == 1 {
return NewStreamEthClient(ctx, apiName, apiURLs[0])
}
var rpcClients []Subscriber
for _, apiURL := range apiURLs {
c, err := newInternalRPCClient(ctx, apiURL)
if err != nil {
return nil, err
}
rpcClients = append(rpcClients, c)
}
return &streamEthClient{
apiName: apiName,
rpcClientProvider: provider.NewRingProvider(rpcClients...),
retryInterval: defaultRetryInterval,
isWebsocket: false, // TODO: Support multiple websockets later if necessary.
}, nil
}

// NewStreamEthClient creates a new ethereum client.
func NewStreamEthClient(ctx context.Context, apiName, apiURL string) (*streamEthClient, error) {
c, err := newInternalRPCClient(ctx, apiURL)
if err != nil {
return nil, err
}
return NewStreamEthClientWithRPCClient(ctx, apiName, isWebsocket(apiURL), c)
}

// NewStreamEthClientWithRPCClient creates a new ethereum client
func NewStreamEthClientWithRPCClient(ctx context.Context, apiName string, isWs bool, rpcClient Subscriber) (*streamEthClient, error) {
return &streamEthClient{
apiName: apiName,
rpcClient: rpcClient,
subscriber: rpcClient,
retryInterval: defaultRetryInterval,
isWebsocket: isWs,
apiName: apiName,
rpcClientProvider: provider.NewRingProvider(rpcClient),
retryInterval: defaultRetryInterval,
isWebsocket: isWs,
}, nil
}

Expand Down
20 changes: 16 additions & 4 deletions ethereum/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/forta-network/forta-core-go/domain"
mock_domain "github.com/forta-network/forta-core-go/domain/mocks"
mocks "github.com/forta-network/forta-core-go/ethereum/mocks"
"github.com/forta-network/forta-core-go/ethereum/provider"
)

const testBlockHash = "0x4fc0862e76691f5312964883954d5c2db35e2b8f7a4f191775a4f50c69804a8d"
Expand All @@ -38,11 +39,22 @@ func initClient(t *testing.T) (*streamEthClient, *mocks.MockSubscriber, context.
func TestEthClient_BlockByHash(t *testing.T) {
r := require.New(t)

ethClient, client, ctx := initClient(t)
minBackoff = 1 * time.Millisecond
maxBackoff = 1 * time.Millisecond
ctx := context.Background()
ctrl := gomock.NewController(t)
client1 := mocks.NewMockSubscriber(ctrl)
client2 := mocks.NewMockSubscriber(ctrl)

ethClient := &streamEthClient{
rpcClientProvider: provider.NewRingProvider(Subscriber(client1), Subscriber(client2)),
retryInterval: defaultRetryInterval,
}
hash := testBlockHash
// verify retry
client.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr).Times(1)
client.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).DoAndReturn(func(ctx context.Context, result interface{}, method string, args ...interface{}) error {
client1.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr)
client2.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr)
client1.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr)
client2.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).DoAndReturn(func(ctx context.Context, result interface{}, method string, args ...interface{}) error {
b, _ := json.Marshal(domain.Block{Hash: hash})
return json.Unmarshal(b, result)
}).Times(1)
Expand Down
47 changes: 47 additions & 0 deletions ethereum/provider/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package provider

import (
"github.com/forta-network/forta-core-go/utils/slicering"
)

// Element is the type of element provided by the provider.
type Element interface {
Close()
}

// Provider keeps one or many elements provides when requested.
type Provider[T Element] interface {
Provide() T
Next() T
Close()
}

// RingProvider provides elements from a slice-backed thread-safe ring.
type RingProvider[T Element] struct {
*slicering.ThreadSafeRing[T]
}

// NewRingProvider creates a new ring provider.
func NewRingProvider[T Element](elements ...T) Provider[T] {
return &RingProvider[T]{ThreadSafeRing: slicering.NewThreadSafeRing(elements...)}
}

// Provide provides the currently pointed element.
func (rp *RingProvider[T]) Provide() T {
return rp.Current()
}

// Close closes all elements in the ring.
func (rp *RingProvider[T]) Close() {
for _, el := range rp.Elements() {
el.Close()
}
}

// Ensuring type checks below.

var _ Provider[*dummyElement] = &RingProvider[*dummyElement]{}

type dummyElement struct{}

func (dc *dummyElement) Close() {}
38 changes: 38 additions & 0 deletions ethereum/provider/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package provider_test

import (
"testing"

"github.com/forta-network/forta-core-go/ethereum/provider"
"github.com/stretchr/testify/require"
)

type testElement struct {
closed bool
}

func (te *testElement) Close() {
te.closed = true
}

func TestRingProvider(t *testing.T) {
r := require.New(t)

el1 := &testElement{}
el2 := &testElement{}

p := provider.NewRingProvider(el1, el2)
r.Equal(el1, p.Provide())
r.Equal(el1, p.Provide())
r.Equal(el2, p.Next())
r.Equal(el2, p.Provide())
r.Equal(el1, p.Next())

r.False(el1.closed)
r.False(el2.closed)

p.Close()

r.True(el1.closed)
r.True(el2.closed)
}
4 changes: 2 additions & 2 deletions registry/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func testListener(ctx context.Context, filter *ContractFilter, topic string, handlers Handlers) Listener {
l, err := NewListener(ctx, ListenerConfig{
Name: "listener",
JsonRpcURL: "https://rpc.ankr.com/polygon",
JsonRpcURL: "https://polygon-rpc.com",
ENSAddress: defaultEnsAddress,
ContractFilter: filter,
Topics: []string{topic},
Expand All @@ -46,7 +46,7 @@ func testBaseSepoliaListener(ctx context.Context, filter *ContractFilter, topic
l, err := NewListener(
ctx, ListenerConfig{
Name: "listener",
JsonRpcURL: "https://rpc.ankr.com/base_sepolia",
JsonRpcURL: "https://base-sepolia-rpc.publicnode.com",
// ENSAddress: devConfig.ENSAddress,
ENSAddress: "0x650AFCA8545964064b60ad040F9a09F788F714ed",
ContractFilter: filter,
Expand Down
Loading

0 comments on commit 873cb25

Please sign in to comment.