Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support a ring of fallback RPCs in Ethereum client #317

Merged
merged 3 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 72 additions & 36 deletions ethereum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/forta-network/forta-core-go/clients/health"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/ethereum/provider"
"github.com/forta-network/forta-core-go/utils"
"github.com/forta-network/forta-core-go/utils/httpclient"

Expand Down Expand Up @@ -108,11 +109,10 @@ var maxBackoff = 1 * time.Minute

// streamEthClient wraps a go-ethereum client purpose-built for streaming txs (with long retries/timeouts)
type streamEthClient struct {
apiName string
rpcClient RPCClient
subscriber Subscriber
retryInterval time.Duration
isWebsocket bool
apiName string
rpcClientProvider provider.Provider[Subscriber]
retryInterval time.Duration
isWebsocket bool

lastBlockByNumberReq health.TimeTracker
lastBlockByNumberErr health.ErrorTracker
Expand All @@ -130,7 +130,7 @@ type RetryOptions struct {

// Close invokes close on the underlying client
func (e *streamEthClient) Close() {
e.rpcClient.Close()
e.rpcClientProvider.Close()
}

func (e *streamEthClient) SetRetryInterval(d time.Duration) {
Expand All @@ -154,8 +154,9 @@ func isPermanentError(err error) bool {
}

// withBackoff wraps an operation in an exponential backoff logic
func withBackoff(
ctx context.Context, name string, operation func(ctx context.Context) error, options RetryOptions,
func (e *streamEthClient) withBackoff(
ctx context.Context, name string,
operation func(ctx context.Context, rpcClient Subscriber) error, options RetryOptions,
timeTracker *health.TimeTracker, errorTracker *health.ErrorTracker,
) error {
bo := backoff.NewExponentialBackOff()
Expand All @@ -176,14 +177,18 @@ func withBackoff(
}

tCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
err := operation(tCtx)
err := operation(tCtx, e.rpcClientProvider.Provide())
cancel()
if timeTracker != nil {
timeTracker.Set()
}
if errorTracker != nil {
errorTracker.Set(err)
}
if err != nil {
// Move onto the next provider.
e.rpcClientProvider.Next()
}
if err == nil {
//success, returning now avoids failing on context timeouts in certain edge cases
return nil
Expand Down Expand Up @@ -213,8 +218,8 @@ func (e *streamEthClient) BlockByHash(ctx context.Context, hash string) (*domain
name := fmt.Sprintf("%s(%s)", blocksByHash, hash)
log.Debugf(name)
var result domain.Block
err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, blocksByHash, hash, true)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, blocksByHash, hash, true)
if err != nil {
return err
}
Expand All @@ -235,8 +240,8 @@ func (e *streamEthClient) TraceBlock(ctx context.Context, number *big.Int) ([]do
name := fmt.Sprintf("%s(%s)", traceBlock, number)
log.Debugf(name)
var result []domain.Trace
err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, traceBlock, utils.BigIntToHex(number))
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, traceBlock, utils.BigIntToHex(number))
if err != nil {
return err
}
Expand Down Expand Up @@ -270,8 +275,8 @@ func (e *streamEthClient) DebugTraceCall(

args := []interface{}{req, block, traceCallConfig}

err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, debugTraceCall, args...)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, debugTraceCall, args...)
if err != nil {
return err
}
Expand All @@ -296,8 +301,8 @@ func (e *streamEthClient) GetLogs(ctx context.Context, q ethereum.FilterQuery) (
return nil, err
}

err = withBackoff(ctx, name, func(ctx context.Context) error {
return e.rpcClient.CallContext(ctx, &result, getLogs, args)
err = e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
return rpcClient.CallContext(ctx, &result, getLogs, args)
}, RetryOptions{
MinBackoff: pointDur(e.retryInterval),
MaxElapsedTime: pointDur(12 * time.Hour),
Expand All @@ -320,8 +325,8 @@ func (e *streamEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*
name := fmt.Sprintf("%s(%s)", blocksByNumber, numDisplay)
log.Debugf(name)

err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, blocksByNumber, numArg, true)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, blocksByNumber, numArg, true)
if err != nil {
return err
}
Expand All @@ -341,8 +346,8 @@ func (e *streamEthClient) BlockByNumber(ctx context.Context, number *big.Int) (*
func (e *streamEthClient) BlockNumber(ctx context.Context) (*big.Int, error) {
log.Debugf(blockNumber)
var result string
err := withBackoff(ctx, blockNumber, func(ctx context.Context) error {
return e.rpcClient.CallContext(ctx, &result, blockNumber)
err := e.withBackoff(ctx, blockNumber, func(ctx context.Context, rpcClient Subscriber) error {
return rpcClient.CallContext(ctx, &result, blockNumber)
}, RetryOptions{
MaxElapsedTime: pointDur(12 * time.Hour),
}, nil, nil)
Expand All @@ -356,8 +361,8 @@ func (e *streamEthClient) BlockNumber(ctx context.Context) (*big.Int, error) {
func (e *streamEthClient) ChainID(ctx context.Context) (*big.Int, error) {
log.Debugf(chainId)
var result string
err := withBackoff(ctx, chainId, func(ctx context.Context) error {
return e.rpcClient.CallContext(ctx, &result, chainId)
err := e.withBackoff(ctx, chainId, func(ctx context.Context, rpcClient Subscriber) error {
return rpcClient.CallContext(ctx, &result, chainId)
}, RetryOptions{
MaxElapsedTime: pointDur(1 * time.Minute),
}, nil, nil)
Expand All @@ -372,8 +377,8 @@ func (e *streamEthClient) TransactionReceipt(ctx context.Context, txHash string)
name := fmt.Sprintf("%s(%s)", transactionReceipt, txHash)
log.Debugf(name)
var result domain.TransactionReceipt
err := withBackoff(ctx, name, func(ctx context.Context) error {
if err := e.rpcClient.CallContext(ctx, &result, transactionReceipt, txHash); err != nil {
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
if err := rpcClient.CallContext(ctx, &result, transactionReceipt, txHash); err != nil {
return err
}
if result.TransactionHash == nil {
Expand All @@ -399,8 +404,8 @@ func (e *streamEthClient) GetTransactionCount(ctx context.Context, address strin

log.Debugf(name)
var result string
err := withBackoff(ctx, name, func(ctx context.Context) error {
err := e.rpcClient.CallContext(ctx, &result, getTransactionCount, address, block)
err := e.withBackoff(ctx, name, func(ctx context.Context, rpcClient Subscriber) error {
err := rpcClient.CallContext(ctx, &result, getTransactionCount, address, block)
if err != nil {
return err
}
Expand All @@ -427,7 +432,8 @@ func (e *streamEthClient) SubscribeToHead(ctx context.Context) (domain.HeaderCh,
log.Debug("subscribing to blockchain head")
recvCh := make(chan *types.Header)
sendCh := make(chan *types.Header)
sub, err := e.subscriber.Subscribe(ctx, "eth", recvCh, "newHeads")
// We currently do not support multiple RPCs here.
sub, err := e.rpcClientProvider.Provide().Subscribe(ctx, "eth", recvCh, "newHeads")
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %v", err)
}
Expand Down Expand Up @@ -512,25 +518,55 @@ func NewRpcClient(ctx context.Context, url string) (*rpc.Client, error) {
return rpc.DialHTTPWithClient(url, &client)
}

// NewStreamEthClient creates a new ethereum client
func NewStreamEthClient(ctx context.Context, apiName, apiURL string) (*streamEthClient, error) {
func newInternalRPCClient(ctx context.Context, apiURL string) (*rpcClient, error) {
rClient, err := NewRpcClient(ctx, apiURL)
if err != nil {
return nil, err
}
rClient.SetHeader("Content-Type", "application/json")
return &rpcClient{Client: rClient}, nil
}

return NewStreamEthClientWithRPCClient(ctx, apiName, isWebsocket(apiURL), &rpcClient{Client: rClient})
// NewStreamEthClientMulti creates a new ethereum client with multiple RPCs.
func NewStreamEthClientMulti(ctx context.Context, apiName string, apiURLs ...string) (*streamEthClient, error) {
if apiURLs == nil {
return nil, errors.New("no api urls provided")
}
if len(apiURLs) == 1 {
return NewStreamEthClient(ctx, apiName, apiURLs[0])
}
var rpcClients []Subscriber
for _, apiURL := range apiURLs {
c, err := newInternalRPCClient(ctx, apiURL)
if err != nil {
return nil, err
}
rpcClients = append(rpcClients, c)
}
return &streamEthClient{
apiName: apiName,
rpcClientProvider: provider.NewRingProvider(rpcClients...),
retryInterval: defaultRetryInterval,
isWebsocket: false, // TODO: Support multiple websockets later if necessary.
}, nil
}

// NewStreamEthClient creates a new ethereum client.
func NewStreamEthClient(ctx context.Context, apiName, apiURL string) (*streamEthClient, error) {
c, err := newInternalRPCClient(ctx, apiURL)
if err != nil {
return nil, err
}
return NewStreamEthClientWithRPCClient(ctx, apiName, isWebsocket(apiURL), c)
}

// NewStreamEthClientWithRPCClient creates a new ethereum client
func NewStreamEthClientWithRPCClient(ctx context.Context, apiName string, isWs bool, rpcClient Subscriber) (*streamEthClient, error) {
return &streamEthClient{
apiName: apiName,
rpcClient: rpcClient,
subscriber: rpcClient,
retryInterval: defaultRetryInterval,
isWebsocket: isWs,
apiName: apiName,
rpcClientProvider: provider.NewRingProvider(rpcClient),
retryInterval: defaultRetryInterval,
isWebsocket: isWs,
}, nil
}

Expand Down
20 changes: 16 additions & 4 deletions ethereum/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/forta-network/forta-core-go/domain"
mock_domain "github.com/forta-network/forta-core-go/domain/mocks"
mocks "github.com/forta-network/forta-core-go/ethereum/mocks"
"github.com/forta-network/forta-core-go/ethereum/provider"
)

const testBlockHash = "0x4fc0862e76691f5312964883954d5c2db35e2b8f7a4f191775a4f50c69804a8d"
Expand All @@ -38,11 +39,22 @@ func initClient(t *testing.T) (*streamEthClient, *mocks.MockSubscriber, context.
func TestEthClient_BlockByHash(t *testing.T) {
r := require.New(t)

ethClient, client, ctx := initClient(t)
minBackoff = 1 * time.Millisecond
maxBackoff = 1 * time.Millisecond
ctx := context.Background()
ctrl := gomock.NewController(t)
client1 := mocks.NewMockSubscriber(ctrl)
client2 := mocks.NewMockSubscriber(ctrl)

ethClient := &streamEthClient{
rpcClientProvider: provider.NewRingProvider(Subscriber(client1), Subscriber(client2)),
retryInterval: defaultRetryInterval,
}
hash := testBlockHash
// verify retry
client.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr).Times(1)
client.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).DoAndReturn(func(ctx context.Context, result interface{}, method string, args ...interface{}) error {
client1.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr)
client2.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr)
client1.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).Return(testErr)
client2.EXPECT().CallContext(gomock.Any(), gomock.Any(), blocksByHash, testBlockHash).DoAndReturn(func(ctx context.Context, result interface{}, method string, args ...interface{}) error {
b, _ := json.Marshal(domain.Block{Hash: hash})
return json.Unmarshal(b, result)
}).Times(1)
Expand Down
47 changes: 47 additions & 0 deletions ethereum/provider/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package provider

import (
"github.com/forta-network/forta-core-go/utils/slicering"
)

// Element is the type of element provided by the provider.
type Element interface {
Close()
}

// Provider keeps one or many elements provides when requested.
type Provider[T Element] interface {
Provide() T
Next() T
Close()
}

// RingProvider provides elements from a slice-backed thread-safe ring.
type RingProvider[T Element] struct {
*slicering.ThreadSafeRing[T]
}

// NewRingProvider creates a new ring provider.
func NewRingProvider[T Element](elements ...T) Provider[T] {
return &RingProvider[T]{ThreadSafeRing: slicering.NewThreadSafeRing(elements...)}
}

// Provide provides the currently pointed element.
func (rp *RingProvider[T]) Provide() T {
return rp.Current()
}

// Close closes all elements in the ring.
func (rp *RingProvider[T]) Close() {
for _, el := range rp.Elements() {
el.Close()
}
}

// Ensuring type checks below.

var _ Provider[*dummyElement] = &RingProvider[*dummyElement]{}

type dummyElement struct{}

func (dc *dummyElement) Close() {}
38 changes: 38 additions & 0 deletions ethereum/provider/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package provider_test

import (
"testing"

"github.com/forta-network/forta-core-go/ethereum/provider"
"github.com/stretchr/testify/require"
)

type testElement struct {
closed bool
}

func (te *testElement) Close() {
te.closed = true
}

func TestRingProvider(t *testing.T) {
r := require.New(t)

el1 := &testElement{}
el2 := &testElement{}

p := provider.NewRingProvider(el1, el2)
r.Equal(el1, p.Provide())
r.Equal(el1, p.Provide())
r.Equal(el2, p.Next())
r.Equal(el2, p.Provide())
r.Equal(el1, p.Next())

r.False(el1.closed)
r.False(el2.closed)

p.Close()

r.True(el1.closed)
r.True(el2.closed)
}
4 changes: 2 additions & 2 deletions registry/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func testListener(ctx context.Context, filter *ContractFilter, topic string, handlers Handlers) Listener {
l, err := NewListener(ctx, ListenerConfig{
Name: "listener",
JsonRpcURL: "https://rpc.ankr.com/polygon",
JsonRpcURL: "https://polygon-rpc.com",
ENSAddress: defaultEnsAddress,
ContractFilter: filter,
Topics: []string{topic},
Expand All @@ -46,7 +46,7 @@ func testBaseSepoliaListener(ctx context.Context, filter *ContractFilter, topic
l, err := NewListener(
ctx, ListenerConfig{
Name: "listener",
JsonRpcURL: "https://rpc.ankr.com/base_sepolia",
JsonRpcURL: "https://base-sepolia-rpc.publicnode.com",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just nuke this test?

// ENSAddress: devConfig.ENSAddress,
ENSAddress: "0x650AFCA8545964064b60ad040F9a09F788F714ed",
ContractFilter: filter,
Expand Down
Loading
Loading