From 5f4c1e678dd0011e2856d28cc12c9f4ce77889b0 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 12 Nov 2024 16:32:02 -0600 Subject: [PATCH] common/client: convert MultiNode to use *services.Engine --- common/client/multi_node.go | 123 ++++++++---------- common/client/multi_node_test.go | 39 ++---- common/client/node_lifecycle_test.go | 2 +- common/client/transaction_sender_test.go | 66 +++++----- core/chains/evm/client/rpc_client_test.go | 4 +- .../evm/gas/block_history_estimator_test.go | 3 +- .../evm/logpoller/observability_test.go | 6 +- 7 files changed, 101 insertions(+), 142 deletions(-) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index 7d631954c54..5ac595161af 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -2,7 +2,6 @@ package client import ( "context" - "errors" "fmt" "math/big" "sync" @@ -32,7 +31,9 @@ type MultiNode[ CHAIN_ID types.ID, RPC any, ] struct { - services.StateMachine + services.Service + eng *services.Engine + primaryNodes []Node[CHAIN_ID, RPC] sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC] chainID CHAIN_ID @@ -47,9 +48,6 @@ type MultiNode[ activeMu sync.RWMutex activeNode Node[CHAIN_ID, RPC] - - chStop services.StopChan - wg sync.WaitGroup } func NewMultiNode[ @@ -73,15 +71,19 @@ func NewMultiNode[ primaryNodes: primaryNodes, sendOnlyNodes: sendOnlyNodes, chainID: chainID, - lggr: logger.Sugared(lggr).Named("MultiNode").With("chainID", chainID.String()), selectionMode: selectionMode, nodeSelector: nodeSelector, - chStop: make(services.StopChan), leaseDuration: leaseDuration, chainFamily: chainFamily, reportInterval: reportInterval, deathDeclarationDelay: deathDeclarationDelay, } + c.Service, c.eng = services.Config{ + Name: "MultiNode", + Start: c.start, + Close: c.close, + }.NewServiceEngine(logger.With(lggr, "chainID", chainID.String())) + c.lggr = c.eng.SugaredLogger c.lggr.Debugf("The MultiNode is configured to use NodeSelectionMode: %s", selectionMode) @@ -93,16 +95,12 @@ func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID { } func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error { - var err error - ok := c.IfNotStopped(func() { - ctx, _ = c.chStop.Ctx(ctx) - + return c.eng.IfNotStopped(func() error { callsCompleted := 0 for _, n := range c.primaryNodes { select { case <-ctx.Done(): - err = ctx.Err() - return + return ctx.Err() default: if n.State() != nodeStateAlive { continue @@ -111,15 +109,11 @@ func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx contex callsCompleted++ } } - if callsCompleted == 0 { - err = ErroringNodeError - } for _, n := range c.sendOnlyNodes { select { case <-ctx.Done(): - err = ctx.Err() - return + return ctx.Err() default: if n.State() != nodeStateAlive { continue @@ -127,11 +121,11 @@ func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx contex do(ctx, n.RPC(), true) } } + if callsCompleted == 0 { + return ErroringNodeError + } + return nil }) - if !ok { - return errors.New("MultiNode is stopped") - } - return err } func (c *MultiNode[CHAIN_ID, RPC]) NodeStates() map[string]string { @@ -149,53 +143,44 @@ func (c *MultiNode[CHAIN_ID, RPC]) NodeStates() map[string]string { // // Nodes handle their own redialing and runloops, so this function does not // return any error if the nodes aren't available -func (c *MultiNode[CHAIN_ID, RPC]) Start(ctx context.Context) error { - return c.StartOnce("MultiNode", func() (merr error) { - if len(c.primaryNodes) == 0 { - return 
fmt.Errorf("no available nodes for chain %s", c.chainID.String()) +func (c *MultiNode[CHAIN_ID, RPC]) start(ctx context.Context) error { + if len(c.primaryNodes) == 0 { + return fmt.Errorf("no available nodes for chain %s", c.chainID.String()) + } + var ms services.MultiStart + for _, n := range c.primaryNodes { + if n.ConfiguredChainID().String() != c.chainID.String() { + return ms.CloseBecause(fmt.Errorf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", n.String(), n.ConfiguredChainID().String(), c.chainID.String())) } - var ms services.MultiStart - for _, n := range c.primaryNodes { - if n.ConfiguredChainID().String() != c.chainID.String() { - return ms.CloseBecause(fmt.Errorf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", n.String(), n.ConfiguredChainID().String(), c.chainID.String())) - } - n.SetPoolChainInfoProvider(c) - // node will handle its own redialing and automatic recovery - if err := ms.Start(ctx, n); err != nil { - return err - } + n.SetPoolChainInfoProvider(c) + // node will handle its own redialing and automatic recovery + if err := ms.Start(ctx, n); err != nil { + return err } - for _, s := range c.sendOnlyNodes { - if s.ConfiguredChainID().String() != c.chainID.String() { - return ms.CloseBecause(fmt.Errorf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", s.String(), s.ConfiguredChainID().String(), c.chainID.String())) - } - if err := ms.Start(ctx, s); err != nil { - return err - } + } + for _, s := range c.sendOnlyNodes { + if s.ConfiguredChainID().String() != c.chainID.String() { + return ms.CloseBecause(fmt.Errorf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", s.String(), s.ConfiguredChainID().String(), c.chainID.String())) } - c.wg.Add(1) - go c.runLoop() - - if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin { - c.lggr.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String()) - c.wg.Add(1) - go c.checkLeaseLoop() - } else { - c.lggr.Info("Best node switching is disabled") + if err := ms.Start(ctx, s); err != nil { + return err } + } + c.eng.Go(c.runLoop) - return nil - }) + if c.leaseDuration.Seconds() > 0 && c.selectionMode != NodeSelectionModeRoundRobin { + c.lggr.Infof("The MultiNode will switch to best node every %s", c.leaseDuration.String()) + c.eng.Go(c.checkLeaseLoop) + } else { + c.lggr.Info("Best node switching is disabled") + } + + return nil } // Close tears down the MultiNode and closes all nodes -func (c *MultiNode[CHAIN_ID, RPC]) Close() error { - return c.StopOnce("MultiNode", func() error { - close(c.chStop) - c.wg.Wait() - - return services.CloseAll(services.MultiCloser(c.primaryNodes), services.MultiCloser(c.sendOnlyNodes)) - }) +func (c *MultiNode[CHAIN_ID, RPC]) close() error { + return services.CloseAll(services.MultiCloser(c.primaryNodes), services.MultiCloser(c.sendOnlyNodes)) } // SelectRPC returns an RPC of an active node. If there are no active nodes it returns an error. 
@@ -233,8 +218,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode() (node Node[CHAIN_ID, RPC], err e c.activeNode = c.nodeSelector.Select() if c.activeNode == nil { c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name()) - errmsg := fmt.Errorf("no live nodes available for chain %s", c.chainID.String()) - c.SvcErrBuffer.Append(errmsg) + c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String())) return nil, ErroringNodeError } @@ -296,8 +280,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) checkLease() { } } -func (c *MultiNode[CHAIN_ID, RPC]) checkLeaseLoop() { - defer c.wg.Done() +func (c *MultiNode[CHAIN_ID, RPC]) checkLeaseLoop(ctx context.Context) { c.leaseTicker = time.NewTicker(c.leaseDuration) defer c.leaseTicker.Stop() @@ -305,15 +288,13 @@ func (c *MultiNode[CHAIN_ID, RPC]) checkLeaseLoop() { select { case <-c.leaseTicker.C: c.checkLease() - case <-c.chStop: + case <-ctx.Done(): return } } } -func (c *MultiNode[CHAIN_ID, RPC]) runLoop() { - defer c.wg.Done() - +func (c *MultiNode[CHAIN_ID, RPC]) runLoop(ctx context.Context) { nodeStates := make([]nodeWithState, len(c.primaryNodes)) for i, n := range c.primaryNodes { nodeStates[i] = nodeWithState{ @@ -332,7 +313,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) runLoop() { select { case <-monitor.C: c.report(nodeStates) - case <-c.chStop: + case <-ctx.Done(): return } } @@ -376,7 +357,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) report(nodesStateInfo []nodeWithState) { if total == dead { rerr := fmt.Errorf("no primary nodes available: 0/%d nodes are alive", total) c.lggr.Criticalw(rerr.Error(), "nodeStates", nodesStateInfo) - c.SvcErrBuffer.Append(rerr) + c.eng.EmitHealthErr(rerr) } else if dead > 0 { c.lggr.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodesStateInfo) } diff --git a/common/client/multi_node_test.go b/common/client/multi_node_test.go index 57b849a3c0a..f8fdd4261b2 100644 --- a/common/client/multi_node_test.go +++ b/common/client/multi_node_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -76,7 +77,7 @@ func TestMultiNode_Dial(t *testing.T) { chainID: types.RandomID(), }) err := mn.Start(tests.Context(t)) - assert.EqualError(t, err, fmt.Sprintf("no available nodes for chain %s", mn.chainID.String())) + assert.ErrorContains(t, err, fmt.Sprintf("no available nodes for chain %s", mn.chainID)) }) t.Run("Fails with wrong node's chainID", func(t *testing.T) { t.Parallel() @@ -92,7 +93,7 @@ func TestMultiNode_Dial(t *testing.T) { nodes: []Node[types.ID, multiNodeRPCClient]{node}, }) err := mn.Start(tests.Context(t)) - assert.EqualError(t, err, fmt.Sprintf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", nodeName, nodeChainID, mn.chainID)) + assert.ErrorContains(t, err, fmt.Sprintf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", nodeName, nodeChainID, mn.chainID)) }) t.Run("Fails if node fails", func(t *testing.T) { t.Parallel() @@ -108,7 +109,7 @@ func TestMultiNode_Dial(t *testing.T) { nodes: []Node[types.ID, multiNodeRPCClient]{node}, }) err := mn.Start(tests.Context(t)) - assert.EqualError(t, err, expectedError.Error()) + assert.ErrorIs(t, err, expectedError) }) t.Run("Closes started nodes 
on failure", func(t *testing.T) { @@ -127,7 +128,7 @@ func TestMultiNode_Dial(t *testing.T) { nodes: []Node[types.ID, multiNodeRPCClient]{node1, node2}, }) err := mn.Start(tests.Context(t)) - assert.EqualError(t, err, expectedError.Error()) + assert.ErrorIs(t, err, expectedError) }) t.Run("Fails with wrong send only node's chainID", func(t *testing.T) { t.Parallel() @@ -146,7 +147,7 @@ func TestMultiNode_Dial(t *testing.T) { sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{sendOnly}, }) err := mn.Start(tests.Context(t)) - assert.EqualError(t, err, fmt.Sprintf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", sendOnlyName, sendOnlyChainID, mn.chainID)) + assert.ErrorContains(t, err, fmt.Sprintf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", sendOnlyName, sendOnlyChainID, mn.chainID)) }) newHealthySendOnly := func(t *testing.T, chainID types.ID) *mockSendOnlyNode[types.ID, multiNodeRPCClient] { @@ -173,7 +174,7 @@ func TestMultiNode_Dial(t *testing.T) { sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{sendOnly1, sendOnly2}, }) err := mn.Start(tests.Context(t)) - assert.EqualError(t, err, expectedError.Error()) + assert.ErrorIs(t, err, expectedError) }) t.Run("Starts successfully with healthy nodes", func(t *testing.T) { t.Parallel() @@ -185,9 +186,7 @@ func TestMultiNode_Dial(t *testing.T) { nodes: []Node[types.ID, multiNodeRPCClient]{node}, sendonlys: []SendOnlyNode[types.ID, multiNodeRPCClient]{newHealthySendOnly(t, chainID)}, }) - defer func() { assert.NoError(t, mn.Close()) }() - err := mn.Start(tests.Context(t)) - require.NoError(t, err) + servicetest.Run(t, mn) selectedNode, err := mn.selectNode() require.NoError(t, err) assert.Equal(t, node, selectedNode) @@ -210,9 +209,7 @@ func TestMultiNode_Report(t *testing.T) { }) mn.reportInterval = tests.TestInterval mn.deathDeclarationDelay = tests.TestInterval - defer func() { assert.NoError(t, mn.Close()) }() - err := mn.Start(tests.Context(t)) - require.NoError(t, err) + servicetest.Run(t, mn) tests.AssertLogCountEventually(t, observedLogs, "At least one primary node is dead: 1/2 nodes are alive", 2) }) t.Run("Report critical error on all node failure", func(t *testing.T) { @@ -228,11 +225,9 @@ func TestMultiNode_Report(t *testing.T) { }) mn.reportInterval = tests.TestInterval mn.deathDeclarationDelay = tests.TestInterval - defer func() { assert.NoError(t, mn.Close()) }() - err := mn.Start(tests.Context(t)) - require.NoError(t, err) + servicetest.Run(t, mn) tests.AssertLogCountEventually(t, observedLogs, "no primary nodes available: 0/1 nodes are alive", 2) - err = mn.Healthy() + err := mn.HealthReport()["MultiNode"] require.Error(t, err) assert.Contains(t, err.Error(), "no primary nodes available: 0/1 nodes are alive") }) @@ -251,9 +246,7 @@ func TestMultiNode_CheckLease(t *testing.T) { logger: lggr, nodes: []Node[types.ID, multiNodeRPCClient]{node}, }) - defer func() { assert.NoError(t, mn.Close()) }() - err := mn.Start(tests.Context(t)) - require.NoError(t, err) + servicetest.Run(t, mn) tests.RequireLogMessage(t, observedLogs, "Best node switching is disabled") }) t.Run("Misconfigured lease check period won't start", func(t *testing.T) { @@ -268,9 +261,7 @@ func TestMultiNode_CheckLease(t *testing.T) { nodes: []Node[types.ID, multiNodeRPCClient]{node}, leaseDuration: 0, }) - defer func() { assert.NoError(t, mn.Close()) }() - err := mn.Start(tests.Context(t)) - require.NoError(t, err) + servicetest.Run(t, mn) 
tests.RequireLogMessage(t, observedLogs, "Best node switching is disabled") }) t.Run("Lease check updates active node", func(t *testing.T) { @@ -289,10 +280,8 @@ func TestMultiNode_CheckLease(t *testing.T) { nodes: []Node[types.ID, multiNodeRPCClient]{node, bestNode}, leaseDuration: tests.TestInterval, }) - defer func() { assert.NoError(t, mn.Close()) }() mn.nodeSelector = nodeSelector - err := mn.Start(tests.Context(t)) - require.NoError(t, err) + servicetest.Run(t, mn) tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("Switching to best node from %q to %q", node.String(), bestNode.String())) tests.AssertEventually(t, func() bool { mn.activeMu.RLock() diff --git a/common/client/node_lifecycle_test.go b/common/client/node_lifecycle_test.go index 6f9b4653393..e510e0a308a 100644 --- a/common/client/node_lifecycle_test.go +++ b/common/client/node_lifecycle_test.go @@ -395,7 +395,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Once() expectedError := errors.New("failed to subscribe to finalized heads") rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(nil, sub, expectedError).Once() - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + lggr := logger.Test(t) node := newDialedNode(t, testNodeOpts{ config: testNodeConfig{ finalizedBlockPollInterval: tests.TestInterval, diff --git a/common/client/transaction_sender_test.go b/common/client/transaction_sender_test.go index 5517a0c8dda..3844a4536ff 100644 --- a/common/client/transaction_sender_test.go +++ b/common/client/transaction_sender_test.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -38,31 +39,17 @@ func (rpc *sendTxRPC) SendTransaction(ctx context.Context, _ any) error { return rpc.sendTxErr } +// newTestTransactionSender returns a sendTxMultiNode and TransactionSender. +// Only the TransactionSender is run via Start/Close. 
func newTestTransactionSender(t *testing.T, chainID types.ID, lggr logger.Logger, nodes []Node[types.ID, SendTxRPCClient[any]], sendOnlyNodes []SendOnlyNode[types.ID, SendTxRPCClient[any]], ) (*sendTxMultiNode, *TransactionSender[any, types.ID, SendTxRPCClient[any]]) { mn := sendTxMultiNode{NewMultiNode[types.ID, SendTxRPCClient[any]]( lggr, NodeSelectionModeRoundRobin, 0, nodes, sendOnlyNodes, chainID, "chainFamily", 0)} - err := mn.StartOnce("startedTestMultiNode", func() error { return nil }) - require.NoError(t, err) txSender := NewTransactionSender[any, types.ID, SendTxRPCClient[any]](lggr, chainID, mn.chainFamily, mn.MultiNode, classifySendTxError, tests.TestInterval) - err = txSender.Start(tests.Context(t)) - require.NoError(t, err) - - t.Cleanup(func() { - err := mn.Close() - if err != nil { - // Allow MultiNode to be closed early for testing - require.EqualError(t, err, "MultiNode has already been stopped: already stopped") - } - err = txSender.Close() - if err != nil { - // Allow TransactionSender to be closed early for testing - require.EqualError(t, err, "TransactionSender has already been stopped: already stopped") - } - }) + servicetest.Run(t, txSender) return &mn, txSender } @@ -82,7 +69,9 @@ func TestTransactionSender_SendTransaction(t *testing.T) { node.On("String").Return("node name").Maybe() node.On("RPC").Return(rpc).Maybe() node.On("State").Return(state).Maybe() - node.On("Close").Return(nil).Once() + node.On("Start", mock.Anything).Return(nil).Maybe() + node.On("Close").Return(nil).Maybe() + node.On("SetPoolChainInfoProvider", mock.Anything).Return(nil).Maybe() return node } @@ -91,10 +80,10 @@ func TestTransactionSender_SendTransaction(t *testing.T) { } t.Run("Fails if there is no nodes available", func(t *testing.T) { - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, nil, nil) _, err := txSender.SendTransaction(tests.Context(t), nil) - assert.EqualError(t, err, ErroringNodeError.Error()) + assert.ErrorIs(t, err, ErroringNodeError) }) t.Run("Transaction failure happy path", func(t *testing.T) { @@ -137,7 +126,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { <-testContext.Done() }) - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, types.RandomID(), lggr, []Node[types.ID, SendTxRPCClient[any]]{mainNode}, nil) @@ -161,11 +150,11 @@ func TestTransactionSender_SendTransaction(t *testing.T) { <-testContext.Done() }) - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, nil) _, sendErr := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, sendErr, expectedError.Error()) + require.ErrorIs(t, sendErr, expectedError) }) t.Run("Returns success without waiting for the rest of the nodes", func(t *testing.T) { chainID := types.RandomID() @@ -181,19 +170,19 @@ func TestTransactionSender_SendTransaction(t *testing.T) { // block caller til end of the test <-testContext.Done() }) - lggr, _ := logger.TestObserved(t, zap.WarnLevel) - mn, txSender := newTestTransactionSender(t, chainID, lggr, + lggr := logger.Test(t) + _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) rtnCode, err := txSender.SendTransaction(tests.Context(t), 
nil) require.NoError(t, err) require.Equal(t, Successful, rtnCode) - require.NoError(t, mn.Close()) }) t.Run("Fails when multinode is closed", func(t *testing.T) { chainID := types.RandomID() fastNode := newNode(t, nil, nil) + fastNode.On("ConfiguredChainID").Return(chainID).Maybe() // hold reply from the node till end of the test testContext, testCancel := context.WithCancel(tests.Context(t)) defer testCancel() @@ -201,20 +190,21 @@ func TestTransactionSender_SendTransaction(t *testing.T) { // block caller til end of the test <-testContext.Done() }) + slowNode.On("ConfiguredChainID").Return(chainID).Maybe() slowSendOnly := newNode(t, errors.New("send only failed"), func(_ mock.Arguments) { // block caller til end of the test <-testContext.Done() }) + slowSendOnly.On("ConfiguredChainID").Return(chainID).Maybe() - lggr, _ := logger.TestObserved(t, zap.DebugLevel) - - mn, txSender := newTestTransactionSender(t, chainID, lggr, + mn, txSender := newTestTransactionSender(t, chainID, logger.Test(t), []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) + require.NoError(t, mn.Start(tests.Context(t))) require.NoError(t, mn.Close()) _, err := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, err, "MultiNode is stopped") + require.ErrorContains(t, err, "service is stopped") }) t.Run("Fails when closed", func(t *testing.T) { chainID := types.RandomID() @@ -231,22 +221,24 @@ func TestTransactionSender_SendTransaction(t *testing.T) { <-testContext.Done() }) - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + var txSender *TransactionSender[any, types.ID, SendTxRPCClient[any]] - _, txSender := newTestTransactionSender(t, chainID, lggr, + t.Cleanup(func() { // after txSender.Close() + _, err := txSender.SendTransaction(tests.Context(t), nil) + assert.EqualError(t, err, "TransactionSender not started") + }) + + _, txSender = newTestTransactionSender(t, chainID, logger.Test(t), []Node[types.ID, SendTxRPCClient[any]]{fastNode, slowNode}, []SendOnlyNode[types.ID, SendTxRPCClient[any]]{slowSendOnly}) - require.NoError(t, txSender.Close()) - _, err := txSender.SendTransaction(tests.Context(t), nil) - require.EqualError(t, err, "TransactionSender not started") }) t.Run("Returns error if there is no healthy primary nodes", func(t *testing.T) { chainID := types.RandomID() primary := newNodeWithState(t, nodeStateUnreachable, nil, nil) sendOnly := newNodeWithState(t, nodeStateUnreachable, nil, nil) - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any]]{primary}, @@ -265,7 +257,7 @@ func TestTransactionSender_SendTransaction(t *testing.T) { unhealthyNode := newNodeWithState(t, nodeStateUnreachable, nil, unexpectedCall) unhealthySendOnlyNode := newNodeWithState(t, nodeStateUnreachable, nil, unexpectedCall) - lggr, _ := logger.TestObserved(t, zap.DebugLevel) + lggr := logger.Test(t) _, txSender := newTestTransactionSender(t, chainID, lggr, []Node[types.ID, SendTxRPCClient[any]]{mainNode, unhealthyNode}, diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 51ef65c5348..c8edc7c557c 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -86,7 +86,7 @@ func TestRPCClient_SubscribeToHeads(t *testing.T) { t.Run("WS and HTTP URL cannot be both empty", func(t *testing.T) { // ws is optional when LogBroadcaster is 
disabled, however SubscribeFilterLogs will return error if ws is missing - observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) + observedLggr := logger.Test(t) rpcClient := client.NewRPCClient(nodePoolCfgHeadPolling, observedLggr, nil, nil, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.Equal(t, errors.New("cannot dial rpc client when both ws and http info are missing"), rpcClient.Dial(ctx)) }) @@ -339,7 +339,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { defer cancel() t.Run("Failed SubscribeFilterLogs when WSURL is empty", func(t *testing.T) { // ws is optional when LogBroadcaster is disabled, however SubscribeFilterLogs will return error if ws is missing - observedLggr, _ := logger.TestObserved(t, zap.DebugLevel) + observedLggr := logger.Test(t) rpcClient := client.NewRPCClient(nodePoolCfg, observedLggr, nil, &url.URL{}, "rpc", 1, chainId, commonclient.Primary, commonclient.QueryTimeout, commonclient.QueryTimeout, "") require.Nil(t, rpcClient.Dial(ctx)) diff --git a/core/chains/evm/gas/block_history_estimator_test.go b/core/chains/evm/gas/block_history_estimator_test.go index bf4c0eb4eef..e3df261f2cf 100644 --- a/core/chains/evm/gas/block_history_estimator_test.go +++ b/core/chains/evm/gas/block_history_estimator_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" @@ -2179,7 +2178,7 @@ func TestBlockHistoryEstimator_HaltBumping(t *testing.T) { bhCfg := newBlockHistoryConfig() bhCfg.CheckInclusionBlocksF = uint16(4) bhCfg.CheckInclusionPercentileF = uint16(90) - lggr, _ := logger.TestObserved(t, zapcore.DebugLevel) + lggr := logger.Test(t) geCfg := &gas.MockGasEstimatorConfig{} geCfg.EIP1559DynamicFeesF = false geCfg.PriceMinF = assets.NewWeiI(1) diff --git a/core/chains/evm/logpoller/observability_test.go b/core/chains/evm/logpoller/observability_test.go index b34c16c0a98..6ebc5b0cce0 100644 --- a/core/chains/evm/logpoller/observability_test.go +++ b/core/chains/evm/logpoller/observability_test.go @@ -8,12 +8,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" - - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -171,7 +169,7 @@ func generateRandomLogs(chainId, count int) []Log { } func createObservedORM(t *testing.T, chainId int64) *ObservedORM { - lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel) + lggr := logger.Test(t) db := pgtest.NewSqlxDB(t) return NewObservedORM(big.NewInt(chainId), db, lggr) }
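---

The heart of this patch is the *services.Engine helper from chainlink-common, which subsumes the services.StateMachine, the chStop channel, and the sync.WaitGroup that multi_node.go deletes above. Below is a minimal sketch of the lifecycle pattern using only the Engine calls that appear in the diff (services.Config.NewServiceEngine, Engine.Go, Engine.EmitHealthErr); the Pinger service, its name, and its tick interval are hypothetical illustrations, not part of this change.

package example

import (
	"context"
	"errors"
	"time"

	"github.com/smartcontractkit/chainlink-common/pkg/logger"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
)

// Pinger is a hypothetical service showing the wiring MultiNode now uses:
// the Engine owns the goroutine and health bookkeeping.
type Pinger struct {
	services.Service // Start, Close, Ready, and HealthReport are provided by the Engine
	eng *services.Engine
}

func NewPinger(lggr logger.Logger) *Pinger {
	p := &Pinger{}
	p.Service, p.eng = services.Config{
		Name:  "Pinger",
		Start: p.start, // invoked once by Service.Start
		Close: p.close, // invoked once by Service.Close, after Engine goroutines exit
	}.NewServiceEngine(lggr)
	return p
}

func (p *Pinger) start(_ context.Context) error {
	// Engine.Go tracks the goroutine and cancels its ctx on Close, replacing
	// the manual wg.Add(1) / go ... / defer wg.Done() bookkeeping.
	p.eng.Go(p.runLoop)
	return nil
}

func (p *Pinger) close() error { return nil }

func (p *Pinger) runLoop(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// EmitHealthErr replaces SvcErrBuffer.Append; the error is
			// surfaced through HealthReport() under the service name.
			p.eng.EmitHealthErr(errors.New("example failure"))
		case <-ctx.Done(): // replaces the old <-c.chStop case
			return
		}
	}
}

Close() cancels the context handed to every Engine.Go goroutine and waits for them to return before running the close callback, which is why the patch can drop MultiNode's close(c.chStop) / c.wg.Wait() sequence along with its StartOnce/StopOnce wrappers.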
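On the test side, each Start-plus-deferred-Close pair collapses into servicetest.Run, and health assertions move from mn.Healthy() to mn.HealthReport(). Roughly, and judging only from the call sites above, servicetest.Run is equivalent to this helper (a sketch of the equivalence, not the library source):

package example

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/smartcontractkit/chainlink-common/pkg/services"
	"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

// runForTest mirrors the boilerplate that servicetest.Run replaces at each
// call site in this patch: start the service, fail fast on error, and close
// it in test cleanup rather than with a manual defer.
func runForTest(t *testing.T, s services.Service) {
	require.NoError(t, s.Start(tests.Context(t)))
	t.Cleanup(func() { require.NoError(t, s.Close()) })
}

Accordingly, the health check in TestMultiNode_Report reads the error back as mn.HealthReport()["MultiNode"], which is where Engine.EmitHealthErr records it.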