diff --git a/lib/sentinel/README.md b/lib/sentinel/README.md deleted file mode 100644 index d63ced760..000000000 --- a/lib/sentinel/README.md +++ /dev/null @@ -1,153 +0,0 @@ -# **Sentinel** - -## **Overview** - -The **Sentinel** is a modular and extensible system for monitoring blockchain events and managing event subscriptions. Sentinel orchestrates multiple `ChainPollerService` instances, each tied to a specific blockchain network, to fetch logs, process events, and notify subscribers. - ---- - -## **Key Features** - -- **Multi-Chain Support**: Manage multiple blockchain networks (e.g., Ethereum, Polygon, Arbitrum) concurrently. -- **Event Broadcasting**: Relay blockchain events to subscribers via a thread-safe subscription system. -- **Flexible Subscriptions**: Dynamically subscribe and unsubscribe to events based on addresses and topics. -- **Graceful Lifecycle Management**: Start, stop, and clean up resources across services effortlessly. - ---- - -## **System Architecture** - -### **How Components Interact** - -``` - +----------------+ - | Sentinel | - | (Coordinator) | - +----------------+ - | - +---------------------------------+--------------------------------+ - | | | -+------------+ +-------------------+ +-------------------+ -| Chain ID 1 | | Chain ID 2 | | Chain ID 3 | -| (Ethereum) | | (Polygon) | | (Arbitrum) | -+------------+ +-------------------+ +-------------------+ - | | | -+----------------+ +-------------------+ +-------------------+ -| ChainPollerSvc | | ChainPollerSvc | | ChainPollerSvc | -| (Service 1) | | (Service 2) | | (Service 3) | -+----------------+ +-------------------+ +-------------------+ - | | | -+----------------+ +-------------------+ +-------------------+ -| ChainPoller | | ChainPoller | | ChainPoller | -| (Log Fetching) | | (Log Fetching) | | (Log Fetching) | -+----------------+ +-------------------+ +-------------------+ - | | | -+----------------+ +-------------------+ +-------------------+ -| Subscription | | Subscription | | Subscription | -| Manager | | Manager | | Manager | -+----------------+ +-------------------+ +-------------------+ - | | | -+----------------+ +-------------------+ +-------------------+ -| Blockchain | | Blockchain | | Blockchain | -| (Ethereum) | | (Polygon) | | (Arbitrum) | -+----------------+ +-------------------+ +-------------------+ - -``` - -### **Core Components** -1. **Sentinel**: - - Central coordinator managing multiple `ChainPollerService` instances. - - Handles multi-chain subscriptions, lifecycle management, and configuration. - -2. **ChainPollerService**: - - Polls blockchain logs and broadcasts events to subscribers. - - Integrates `ChainPoller` and `SubscriptionManager`. - -3. **ChainPoller**: - - Fetches logs from blockchain networks based on filter queries. - -4. **SubscriptionManager**: - - Tracks subscriptions to blockchain events. - - Broadcasts logs to subscribers. - ---- - -## **Usage** - -### **Initialize Sentinel** -Set up a `Sentinel` instance: -```go -import ( - "github.com/smartcontractkit/chainlink-testing-framework/sentinel" - "github.com/smartcontractkit/chainlink-testing-framework/sentinel/chain_poller_service" -) - -logger := internal.NewDefaultLogger() -sentinelInstance := sentinel.NewSentinel(sentinel.SentinelConfig{ - Logger: logger, -}) -``` - -### **Add a Chain** -Add a blockchain to monitor: -```go -config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: chainPoller, - Logger: logger, - BlockchainClient: client, - ChainID: 1, -} - -err := sentinelInstance.AddChain(config) -if err != nil { - panic("Failed to add chain: " + err.Error()) -} -``` - -### **Subscribe to Events** -Subscribe to blockchain events: -```go -address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") -topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - -logCh, err := sentinelInstance.Subscribe(1, address, topic) -if err != nil { - panic("Failed to subscribe: " + err.Error()) -} - -// Listen for logs -go func() { - for log := range logCh { - fmt.Println("Received log:", log) - } -}() -``` - -### **Unsubscribe** -Unsubscribe from events: -```go -err = sentinelInstance.Unsubscribe(1, address, topic, logCh) -if err != nil { - panic("Failed to unsubscribe: " + err.Error()) -} -``` - -### **Remove a Chain** -Remove a blockchain from monitoring: -```go -err = sentinelInstance.RemoveChain(1) -if err != nil { - panic("Failed to remove chain: " + err.Error()) -} -``` - ---- - -## **Testing** - -### **Run Tests** -Run tests for Sentinel and its components: -```bash -go test ./sentinel -``` \ No newline at end of file diff --git a/lib/sentinel/chain_poller/README.md b/lib/sentinel/chain_poller/README.md deleted file mode 100644 index b3d6943bc..000000000 --- a/lib/sentinel/chain_poller/README.md +++ /dev/null @@ -1,73 +0,0 @@ -# **Chain Poller** - -## **Overview** - -The **Chain Poller** is a lightweight utility in the [Sentinel](https://github.com/smartcontractkit/sentinel/lib) framework designed to fetch blockchain logs based on filter queries. It serves as a bridge between the blockchain client and higher-level services like the `ChainPollerService`. - ---- - -## **Features** - -- **Flexible Queries**: Fetch logs based on block ranges, addresses, and topics. -- **Error Logging**: Captures errors during polling for troubleshooting. - ---- - -## **Usage** - -### **Initialization** -Create a new `ChainPoller` with the required configuration: -```go -config := chain_poller.ChainPollerConfig{ - BlockchainClient: client, - Logger: logger, - ChainID: 1, -} - -chainPoller, err := chain_poller.NewChainPoller(config) -if err != nil { - panic("Failed to initialize Chain Poller: " + err.Error()) -} -``` - -### **Polling** -Fetch logs using filter queries: -```go -queries := []internal.FilterQuery{ - { - FromBlock: 100, - ToBlock: 200, - Addresses: []common.Address{ - common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678"), - }, - Topics: [][]common.Hash{ - {common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef")}, - }, - }, -} - -logs, err := chainPoller.Poll(context.Background(), queries) -if err != nil { - logger.Error("Failed to fetch logs", "error", err) -} -fmt.Println("Fetched logs:", logs) -``` - ---- - -## **API Reference** - -### **`NewChainPoller(config ChainPollerConfig) (*ChainPoller, error)`** -- Initializes a new Chain Poller with the specified blockchain client and logger. - -### **`Poll(ctx context.Context, filterQueries []FilterQuery) ([]Log, error)`** -- Fetches logs from the blockchain based on the given filter queries. - ---- - -## **Testing** - -Run the tests: -```bash -go test ./chain_poller -``` \ No newline at end of file diff --git a/lib/sentinel/chain_poller/chain_poller.go b/lib/sentinel/chain_poller/chain_poller.go deleted file mode 100644 index 116d8afa2..000000000 --- a/lib/sentinel/chain_poller/chain_poller.go +++ /dev/null @@ -1,53 +0,0 @@ -// File: chain_poller/chain_poller.go -package chain_poller - -import ( - "context" - "errors" - - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" -) - -// ChainPollerConfig holds the configuration for the ChainPoller. -type ChainPollerConfig struct { - BlockchainClient internal.BlockchainClient - Logger internal.Logger - ChainID int64 -} - -// ChainPoller is responsible for polling logs from the blockchain. -type ChainPoller struct { - Config ChainPollerConfig -} - -// NewChainPoller initializes a new ChainPoller. -func NewChainPoller(cfg ChainPollerConfig) (*ChainPoller, error) { - if cfg.BlockchainClient == nil { - return nil, errors.New("blockchain client cannot be nil") - } - if cfg.Logger == nil { - return nil, errors.New("logger cannot be nil") - } - - return &ChainPoller{ - Config: cfg, - }, nil -} - -// Poll fetches logs from the blockchain based on the provided filter queries. -func (cp *ChainPoller) Poll(ctx context.Context, filterQueries []internal.FilterQuery) ([]internal.Log, error) { - var allLogs []internal.Log - - for _, query := range filterQueries { - logs, err := cp.Config.BlockchainClient.FilterLogs(ctx, query) - if err != nil { - cp.Config.Logger.Error("Failed to filter logs", "error", err, "query", query) - continue - } - allLogs = append(allLogs, logs...) - } - - return allLogs, nil -} - -var _ ChainPollerInterface = (*ChainPoller)(nil) diff --git a/lib/sentinel/chain_poller/chain_poller_test.go b/lib/sentinel/chain_poller/chain_poller_test.go deleted file mode 100644 index 4906e9fdf..000000000 --- a/lib/sentinel/chain_poller/chain_poller_test.go +++ /dev/null @@ -1,287 +0,0 @@ -// File: chain_poller/chain_poller_test.go -package chain_poller_test - -import ( - "context" - "errors" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" -) - -func TestNewChainPoller_Success(t *testing.T) { - mockClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: mockClient, - Logger: mockLogger, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.NoError(t, err) - require.NotNil(t, chainPoller) -} - -func TestNewChainPoller_NilBlockchainClient(t *testing.T) { - mockLogger := internal.NewMockLogger() - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: nil, - Logger: mockLogger, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.Error(t, err) - assert.Nil(t, chainPoller) - assert.Equal(t, "blockchain client cannot be nil", err.Error()) -} - -func TestNewChainPoller_NilLogger(t *testing.T) { - mockClient := new(internal.MockBlockchainClient) - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: mockClient, - Logger: nil, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.Error(t, err) - assert.Nil(t, chainPoller) - assert.Equal(t, "logger cannot be nil", err.Error()) -} - -func TestChainPoller_Poll_SingleFilterQueryWithLogs(t *testing.T) { - mockClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: mockClient, - Logger: mockLogger, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.NoError(t, err) - require.NotNil(t, chainPoller) - - // Define a filter query - filterQuery := internal.FilterQuery{ - FromBlock: 101, - ToBlock: 110, - Addresses: []common.Address{ - common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - }, - Topics: [][]common.Hash{ - { - common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"), - }, - }, - } - - // Define mock logs to return - testLogs := []internal.Log{ - { - BlockNumber: 105, - TxHash: common.HexToHash("0x1234"), - Address: common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - Topics: []common.Hash{common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")}, - Data: []byte("test data 1"), - Index: 0, - }, - { - BlockNumber: 107, - TxHash: common.HexToHash("0x5678"), - Address: common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - Topics: []common.Hash{common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")}, - Data: []byte("test data 2"), - Index: 1, - }, - } - - mockClient.On("FilterLogs", mock.Anything, filterQuery).Return(testLogs, nil) - - // Perform polling - logs, err := chainPoller.Poll(context.Background(), []internal.FilterQuery{filterQuery}) - require.NoError(t, err) - require.Len(t, logs, 2) - - assert.Equal(t, testLogs, logs) - - // Verify that FilterLogs was called with expected query - mockClient.AssertCalled(t, "FilterLogs", mock.Anything, filterQuery) -} - -func TestChainPoller_Poll_MultipleFilterQueries(t *testing.T) { - mockClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: mockClient, - Logger: mockLogger, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.NoError(t, err) - require.NotNil(t, chainPoller) - - // Define multiple filter queries - filterQuery1 := internal.FilterQuery{ - FromBlock: 101, - ToBlock: 110, - Addresses: []common.Address{ - common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - }, - Topics: [][]common.Hash{ - { - common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"), - }, - }, - } - - filterQuery2 := internal.FilterQuery{ - FromBlock: 101, - ToBlock: 110, - Addresses: []common.Address{ - common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), - }, - Topics: [][]common.Hash{ - { - common.HexToHash("0xfeedfacefeedfacefeedfacefeedfacefeedfacefeedfacefeedfacefeedface"), - }, - }, - } - - // Define mock logs for filterQuery1 - testLogs1 := []internal.Log{ - { - BlockNumber: 103, - TxHash: common.HexToHash("0x1111"), - Address: common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - Topics: []common.Hash{common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")}, - Data: []byte("test data 1"), - Index: 0, - }, - } - - // Define mock logs for filterQuery2 - testLogs2 := []internal.Log{ - { - BlockNumber: 104, - TxHash: common.HexToHash("0x2222"), - Address: common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), - Topics: []common.Hash{common.HexToHash("0xfeedfacefeedfacefeedfacefeedfacefeedfacefeedfacefeedfacefeedface")}, - Data: []byte("test data 2"), - Index: 1, - }, - } - - mockClient.On("FilterLogs", mock.Anything, filterQuery1).Return(testLogs1, nil) - mockClient.On("FilterLogs", mock.Anything, filterQuery2).Return(testLogs2, nil) - - // Perform polling - logs, err := chainPoller.Poll(context.Background(), []internal.FilterQuery{filterQuery1, filterQuery2}) - require.NoError(t, err) - require.Len(t, logs, 2) - - expectedLogs := append(testLogs1, testLogs2...) - assert.Equal(t, expectedLogs, logs) - - // Verify that FilterLogs was called with both queries - mockClient.AssertCalled(t, "FilterLogs", mock.Anything, filterQuery1) - mockClient.AssertCalled(t, "FilterLogs", mock.Anything, filterQuery2) -} - -func TestChainPoller_Poll_NoLogs(t *testing.T) { - mockClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: mockClient, - Logger: mockLogger, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.NoError(t, err) - require.NotNil(t, chainPoller) - - // Define a filter query with no matching logs - filterQuery := internal.FilterQuery{ - FromBlock: 101, - ToBlock: 110, - Addresses: []common.Address{ - common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - }, - Topics: [][]common.Hash{ - { - common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000"), - }, - }, - } - - // Mock FilterLogs to return no logs - mockClient.On("FilterLogs", mock.Anything, filterQuery).Return([]internal.Log{}, nil) - - // Perform polling - logs, err := chainPoller.Poll(context.Background(), []internal.FilterQuery{filterQuery}) - require.NoError(t, err) - require.Len(t, logs, 0) - - // Verify that FilterLogs was called with expected query - mockClient.AssertCalled(t, "FilterLogs", mock.Anything, filterQuery) -} - -func TestChainPoller_Poll_FilterLogsError(t *testing.T) { - mockClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - config := chain_poller.ChainPollerConfig{ - BlockchainClient: mockClient, - Logger: mockLogger, - ChainID: 1, - } - - chainPoller, err := chain_poller.NewChainPoller(config) - require.NoError(t, err) - require.NotNil(t, chainPoller) - - // Define a filter query - filterQuery := internal.FilterQuery{ - FromBlock: 101, - ToBlock: 110, - Addresses: []common.Address{ - common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd"), - }, - Topics: [][]common.Hash{ - { - common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"), - }, - }, - } - - // Mock FilterLogs to return an error - mockClient.On("FilterLogs", mock.Anything, filterQuery).Return([]internal.Log{}, errors.New("FilterLogs error")) - - // Perform polling - logs, err := chainPoller.Poll(context.Background(), []internal.FilterQuery{filterQuery}) - require.NoError(t, err) - require.Len(t, logs, 0) - - // Verify that FilterLogs was called with expected query - mockClient.AssertCalled(t, "FilterLogs", mock.Anything, filterQuery) - - // Verify that logger.Error was called - assert.True(t, mockLogger.ContainsError("Failed to filter logs")) - -} diff --git a/lib/sentinel/chain_poller/interface.go b/lib/sentinel/chain_poller/interface.go deleted file mode 100644 index 018212f31..000000000 --- a/lib/sentinel/chain_poller/interface.go +++ /dev/null @@ -1,13 +0,0 @@ -// File: chain_poller/interface.go -package chain_poller - -import ( - "context" - - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" -) - -// ChainPollerInterface defines the methods that ChainPoller must implement. -type ChainPollerInterface interface { - Poll(ctx context.Context, filterQueries []internal.FilterQuery) ([]internal.Log, error) -} diff --git a/lib/sentinel/chain_poller_service/README.md b/lib/sentinel/chain_poller_service/README.md deleted file mode 100644 index b2479d925..000000000 --- a/lib/sentinel/chain_poller_service/README.md +++ /dev/null @@ -1,100 +0,0 @@ -# **Chain Poller Service** - -## **Overview** - -The **Chain Poller Service** is a higher-level abstraction in the [Sentinel](https://github.com/smartcontractkit/lib/sentinel) framework that manages periodic blockchain log polling and event broadcasting. It integrates with the `ChainPoller` and `SubscriptionManager` to provide a complete solution for monitoring blockchain activity and notifying subscribers in real time. - ---- - -## **Features** - -- **Automated Polling**: Periodically fetches logs based on active subscriptions. -- **Real-Time Broadcasting**: Sends logs to subscribers through the `SubscriptionManager`. -- **Multi-Chain Support**: Handles subscriptions and polling for multiple blockchain networks. -- **Graceful Start/Stop**: Ensures clean resource management when starting or stopping the service. - ---- - -## **Usage** - -### **Initialization** -Create a `ChainPollerService` with the necessary configuration: -```go -config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: chainPoller, // Instance of ChainPoller - Logger: logger, // Logger instance - BlockchainClient: client, // Blockchain client - ChainID: 1, // Chain ID -} - -pollerService, err := chain_poller_service.NewChainPollerService(config) -if err != nil { - panic("Failed to initialize Chain Poller Service: " + err.Error()) -} -``` - -### **Start and Stop** -Start and stop the polling service: -```go -pollerService.Start() -defer pollerService.Stop() -``` - -### **Subscriptions** -Manage subscriptions through the integrated `SubscriptionManager`: -```go -address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") -topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - -logCh, err := pollerService.SubscriptionMgr.Subscribe(address, topic) -if err != nil { - panic("Failed to subscribe: " + err.Error()) -} - -// Handle incoming logs -go func() { - for log := range logCh { - fmt.Println("Received log:", log) - } -}() - -// Unsubscribe when done -err = pollerService.SubscriptionMgr.Unsubscribe(address, topic, logCh) -if err != nil { - panic("Failed to unsubscribe: " + err.Error()) -} -``` - ---- - -## **API Reference** - -### **`NewChainPollerService(config ChainPollerServiceConfig) (*ChainPollerService, error)`** -- Initializes a new Chain Poller Service with the specified configuration. - -### **`Start()`** -- Begins the periodic polling process. - -### **`Stop()`** -- Gracefully stops the polling process and releases resources. - -### **`SubscriptionMgr`** -- Access the `SubscriptionManager` to manage subscriptions. - ---- - -## **Testing** - -### **Test Coverage** -The `ChainPollerService` includes tests for: -1. **Initialization**: Ensures valid configurations are required. -2. **Polling**: Verifies that logs are fetched and broadcasted correctly. -3. **Lifecycle Management**: Confirms proper start/stop behavior. -4. **Subscriptions**: Tests integration with the `SubscriptionManager`. - -### **How to Run Tests** -To execute tests: -```bash -go test ./chain_poller_service -``` \ No newline at end of file diff --git a/lib/sentinel/chain_poller_service/chain_poller_service.go b/lib/sentinel/chain_poller_service/chain_poller_service.go deleted file mode 100644 index 523ca0b2e..000000000 --- a/lib/sentinel/chain_poller_service/chain_poller_service.go +++ /dev/null @@ -1,208 +0,0 @@ -// File: chain_poller_service/chain_poller_service.go -package chain_poller_service - -import ( - "context" - "errors" - "fmt" - "math/big" - "sync" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/subscription_manager" -) - -// ChainPollerServiceConfig holds the configuration for the ChainPollerService. -type ChainPollerServiceConfig struct { - PollInterval time.Duration - ChainPoller chain_poller.ChainPollerInterface - Logger internal.Logger - BlockchainClient internal.BlockchainClient - ChainID int64 -} - -// ChainPollerService orchestrates the polling process and log broadcasting. -type ChainPollerService struct { - config ChainPollerServiceConfig - SubscriptionMgr *subscription_manager.SubscriptionManager - ChainID int64 - LastBlock *big.Int - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - started bool - mu sync.Mutex -} - -func (eps *ChainPollerService) SubscriptionManager() *subscription_manager.SubscriptionManager { - return eps.SubscriptionMgr -} - -// NewChainPollerService initializes a new ChainPollerService. -func NewChainPollerService(cfg ChainPollerServiceConfig) (*ChainPollerService, error) { - if cfg.ChainPoller == nil { - return nil, fmt.Errorf("chain poller cannot be nil") - } - if cfg.Logger == nil { - return nil, fmt.Errorf("logger cannot be nil") - } - if cfg.PollInterval <= 0 { - return nil, fmt.Errorf("poll interval must be positive") - } - if cfg.BlockchainClient == nil { - return nil, fmt.Errorf("blockchain client cannot be nil") - } - // Create a subscrition manager - subscription_manager := subscription_manager.NewSubscriptionManager(cfg.Logger, cfg.ChainID) - - // Initialize lastBlock as the latest block at startup - latestBlock, err := cfg.BlockchainClient.BlockNumber(context.Background()) - if err != nil { - return nil, fmt.Errorf("failed to get latest block: %w", err) - } - - if latestBlock == 0 { - return nil, errors.New("blockchain has no blocks") - } - lastBlock := new(big.Int).Sub(new(big.Int).SetUint64(latestBlock), big.NewInt(1)) - - return &ChainPollerService{ - config: cfg, - SubscriptionMgr: subscription_manager, - ChainID: cfg.ChainID, - LastBlock: lastBlock, - }, nil -} - -// Start begins the polling loop. -func (eps *ChainPollerService) Start() { - eps.mu.Lock() - defer eps.mu.Unlock() - - if eps.started { - eps.config.Logger.Warn("ChainPollerService already started") - return - } - - eps.ctx, eps.cancel = context.WithCancel(context.Background()) - eps.started = true - eps.wg.Add(1) - go eps.pollingLoop() - - eps.config.Logger.Info(fmt.Sprintf("ChainPollerService started with poll interval: %s", eps.config.PollInterval.String())) -} - -// Stop gracefully stops the polling loop. -func (eps *ChainPollerService) Stop() { - eps.mu.Lock() - defer eps.mu.Unlock() - - if !eps.started { - return - } - - eps.cancel() - eps.wg.Wait() - eps.started = false - - eps.config.Logger.Info("ChainPollerService stopped") -} - -// pollingLoop runs the periodic polling process. -func (eps *ChainPollerService) pollingLoop() { - defer eps.wg.Done() - - ticker := time.NewTicker(eps.config.PollInterval) - defer ticker.Stop() - - for { - select { - case <-eps.ctx.Done(): - eps.config.Logger.Info("Polling loop terminating") - return - case <-ticker.C: - eps.pollCycle() - } - } -} - -// pollCycle performs a single polling cycle: fetching logs and broadcasting them. -func (eps *ChainPollerService) pollCycle() { - startTime := time.Now() - eps.config.Logger.Debug("Starting polling cycle") - - // Fetch the latest block number - latestBlock, err := eps.config.BlockchainClient.BlockNumber(eps.ctx) - if err != nil { - eps.config.Logger.Error("Failed to get latest block", "error", err) - return - } - - toBlock := latestBlock - fromBlock := new(big.Int).Add(eps.LastBlock, big.NewInt(1)) - - // Ensure fromBlock is not greater than toBlock - if fromBlock.Cmp(new(big.Int).SetUint64(toBlock)) > 0 { - eps.config.Logger.Warn(fmt.Sprintf("fromBlock (%s) is greater than toBlock (%s), skipping poll", fromBlock.String(), new(big.Int).SetUint64(toBlock).String())) - return - } - - // Get current subscriptions - subscriptions := eps.SubscriptionMgr.GetAddressesAndTopics() - - if len(subscriptions) == 0 { - // Update the last processed block to toBlock - eps.LastBlock = new(big.Int).SetUint64(toBlock) - eps.config.Logger.Debug("No active subscriptions, skipping polling cycle") - return - } - - // Construct filter queries with the same fromBlock and toBlock - var filterQueries []internal.FilterQuery - for address, topics := range subscriptions { // 'topics' is []common.Hash - for _, topic := range topics { // Iterate over each topic - filterQueries = append(filterQueries, internal.FilterQuery{ - FromBlock: fromBlock.Uint64(), - ToBlock: toBlock, - Addresses: []common.Address{address}, - Topics: [][]common.Hash{{topic}}, // Separate query per topic - }) - } - } - - // Fetch logs using the Chain Poller - ctx, cancel := context.WithTimeout(eps.ctx, 10*time.Second) - defer cancel() - - logs, err := eps.config.ChainPoller.Poll(ctx, filterQueries) - if err != nil { - eps.config.Logger.Error("Error during po lling", "error", err) - return - } - - eps.config.Logger.Debug(fmt.Sprintf("Fetched %d logs from blockchain", len(logs))) - - // Broadcast each log to subscribers - for _, log := range logs { - if len(log.Topics) == 0 { - continue // Skip logs without topics - } - - for _, topic := range log.Topics { - eventKey := internal.EventKey{ - Address: log.Address, - Topic: topic, - } - eps.SubscriptionMgr.BroadcastLog(eventKey, log) - } - } - - // Update the last processed block to toBlock - eps.LastBlock = new(big.Int).SetUint64(toBlock) - - duration := time.Since(startTime) - eps.config.Logger.Debug(fmt.Sprintf("Completed polling cycle in %s", duration.String())) -} diff --git a/lib/sentinel/chain_poller_service/chain_poller_service_test.go b/lib/sentinel/chain_poller_service/chain_poller_service_test.go deleted file mode 100644 index 721e7e4a0..000000000 --- a/lib/sentinel/chain_poller_service/chain_poller_service_test.go +++ /dev/null @@ -1,430 +0,0 @@ -// File: chain_poller_service/chain_poller_service_test.go -package chain_poller_service_test - -import ( - "context" - "math/big" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller_service" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" -) - -// MockChainPoller implements the ChainPollerInterface for testing. -type MockChainPoller struct { - mock.Mock -} - -func (m *MockChainPoller) Poll(ctx context.Context, filterQueries []internal.FilterQuery) ([]internal.Log, error) { - args := m.Called(ctx, filterQueries) - if logs, ok := args.Get(0).([]internal.Log); ok { - return logs, args.Error(1) - } - return nil, args.Error(1) -} - -// Ensure MockChainPoller implements ChainPollerInterface -var _ chain_poller.ChainPollerInterface = (*MockChainPoller)(nil) - -func TestChainPollerService_Initialization(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Verify initial LastBlock is set correctly - assert.Equal(t, big.NewInt(99), chainPollerService.LastBlock) - - // Assert that BlockNumber was called once - mockBlockchainClient.AssertCalled(t, "BlockNumber", mock.Anything) -} - -func TestChainPollerService_Initialization_InvalidConfig(t *testing.T) { - mockLogger := internal.NewMockLogger() - - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: nil, // Invalid - Logger: mockLogger, - ChainID: 1, - // BlockchainClient is missing - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.Error(t, err) - assert.Nil(t, chainPollerService) - assert.Equal(t, "chain poller cannot be nil", err.Error()) -} - -func TestChainPollerService_Initialization_InvalidBlockchainClient(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockLogger := internal.NewMockLogger() - - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: nil, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.Error(t, err) - assert.Nil(t, chainPollerService) - assert.Equal(t, "blockchain client cannot be nil", err.Error()) -} - -func TestChainPollerService_StartAndStop(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Start the service - chainPollerService.Start() - - // Allow some time for polling loop to start - time.Sleep(10 * time.Millisecond) - - // Stop the service - chainPollerService.Stop() - - assert.True(t, mockLogger.ContainsLog("ChainPollerService started with poll interval: 100ms")) - assert.True(t, mockLogger.ContainsLog("Polling loop terminating")) - assert.True(t, mockLogger.ContainsLog("ChainPollerService stopped")) -} - -func TestChainPollerService_PollCycle_FetchAndBroadcast(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - // Initialize ChainPollerService - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Setup a subscriber - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - logCh, err := chainPollerService.SubscriptionMgr.Subscribe(address, topic) - require.NoError(t, err) - defer chainPollerService.SubscriptionMgr.Unsubscribe(address, topic, logCh) - - // Define the expected toBlock - toBlock := uint64(110) - - // Define the expected filter query - filterQuery := internal.FilterQuery{ - FromBlock: chainPollerService.LastBlock.Uint64() + 1, - ToBlock: toBlock, - Addresses: []common.Address{address}, - Topics: [][]common.Hash{{topic}}, - } - - // Define fetched logs - fetchedLogs := []internal.Log{ - { - BlockNumber: 105, - TxHash: common.HexToHash("0xdeadbeef"), - Address: address, - Topics: []common.Hash{topic}, - Data: []byte("test log data"), - Index: 0, - }, - } - - mockChainPoller.On("Poll", mock.Anything, []internal.FilterQuery{filterQuery}).Return(fetchedLogs, nil).Once() - - // Mock BlockchainClient.BlockNumber for the next poll - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(toBlock, nil).Once() - - // Start the polling service - chainPollerService.Start() - - // Allow some time for polling cycle to execute - time.Sleep(150 * time.Millisecond) - - // Stop the polling service - chainPollerService.Stop() - - // Assert that the fetched log was broadcasted to the subscriber - select { - case receivedLog := <-logCh: - assert.Equal(t, fetchedLogs[0], receivedLog, "Received log should match the fetched log") - default: - t.Fatal("Did not receive the expected log") - } - - assert.True(t, mockLogger.ContainsLog("ChainPollerService started with poll interval: 100ms")) - assert.True(t, mockLogger.ContainsLog("Starting polling cycle")) - assert.True(t, mockLogger.ContainsLog("Fetched 1 logs from blockchain")) - assert.True(t, mockLogger.ContainsLog("Completed polling cycle in")) -} - -func TestChainPollerService_PollCycle_NoSubscriptions(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - // Initialize ChainPollerService - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Mock BlockchainClient.BlockNumber for the next poll - toBlock := uint64(110) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(toBlock, nil).Once() - - // Start the polling service - chainPollerService.Start() - - // Allow some time for polling cycle to execute - time.Sleep(150 * time.Millisecond) - - // Stop the polling service - chainPollerService.Stop() - - // Assert that Poll was not called - mockChainPoller.AssertNotCalled(t, "Poll", mock.Anything, mock.Anything) - - // Assert that the logger logged the no active subscriptions message - assert.True(t, mockLogger.ContainsLog("No active subscriptions, skipping polling cycle")) -} - -func TestChainPollerService_PollCycle_MultipleLogs(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - // Initialize ChainPollerService - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Verify initial LastBlock is set correctly - assert.Equal(t, big.NewInt(99), chainPollerService.LastBlock) - - // Setup subscribers - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic1 := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - topic2 := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") - logCh1, err := chainPollerService.SubscriptionMgr.Subscribe(address, topic1) - require.NoError(t, err) - defer chainPollerService.SubscriptionMgr.Unsubscribe(address, topic1, logCh1) - - logCh2, err := chainPollerService.SubscriptionMgr.Subscribe(address, topic2) - require.NoError(t, err) - defer chainPollerService.SubscriptionMgr.Unsubscribe(address, topic2, logCh2) - - // Define the expected toBlock - toBlock := uint64(110) - - // Define the expected filter queries (same fromBlock and toBlock) - filterQueries := []internal.FilterQuery{ - { - FromBlock: 100, - ToBlock: toBlock, - Addresses: []common.Address{address}, - Topics: [][]common.Hash{{topic1}}, - }, - { - FromBlock: 100, - ToBlock: toBlock, - Addresses: []common.Address{address}, - Topics: [][]common.Hash{{topic2}}, - }, - } - - // Define fetched logs - fetchedLogs := []internal.Log{ - { - BlockNumber: 105, - TxHash: common.HexToHash("0xdeadbeef"), - Address: address, - Topics: []common.Hash{topic1}, - Data: []byte("test log data 1"), - Index: 0, - }, - { - BlockNumber: 106, - TxHash: common.HexToHash("0xfeedface"), - Address: address, - Topics: []common.Hash{topic2}, - Data: []byte("test log data 2"), - Index: 1, - }, - } - - // Mock ChainPoller.Poll to return fetchedLogs - mockChainPoller.On("Poll", mock.Anything, filterQueries).Return(fetchedLogs, nil).Once() - - // Mock BlockchainClient.BlockNumber for the next poll - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(toBlock, nil).Once() - - // Start the polling service - chainPollerService.Start() - - // Allow some time for polling cycle to execute - time.Sleep(150 * time.Millisecond) - chainPollerService.Stop() - - // Assert that the fetched logs were broadcasted to the subscribers - select { - case receivedLog := <-logCh1: - assert.Equal(t, fetchedLogs[0], receivedLog, "Received log1 should match the fetched log1") - default: - t.Fatal("Did not receive the expected log1") - } - - select { - case receivedLog := <-logCh2: - assert.Equal(t, fetchedLogs[1], receivedLog, "Received log2 should match the fetched log2") - default: - t.Fatal("Did not receive the expected log2") - } - - // Assert that the mocks were called as expected - mockChainPoller.AssertCalled(t, "Poll", mock.Anything, filterQueries) - mockBlockchainClient.AssertCalled(t, "BlockNumber", mock.Anything) - assert.True(t, mockLogger.ContainsLog("Starting polling cycle")) - assert.True(t, mockLogger.ContainsLog("Fetched 2 logs from blockchain")) - assert.True(t, mockLogger.ContainsLog("Completed polling cycle in")) -} - -func TestChainPollerService_StopWithoutStart(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - // Initialize ChainPollerService - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Attempt to stop without starting - chainPollerService.Stop() - // Verify that no logger calls were made - assert.Equal(t, 0, mockLogger.NumLogs()) -} - -func TestChainPollerService_MultipleStartCalls(t *testing.T) { - mockChainPoller := new(MockChainPoller) - mockBlockchainClient := new(internal.MockBlockchainClient) - mockLogger := internal.NewMockLogger() - - // Mock BlockchainClient.BlockNumber during initialization - initialLastBlockNum := uint64(100) - mockBlockchainClient.On("BlockNumber", mock.Anything).Return(initialLastBlockNum, nil).Once() - - config := chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: mockLogger, - ChainID: 1, - BlockchainClient: mockBlockchainClient, - } - - chainPollerService, err := chain_poller_service.NewChainPollerService(config) - require.NoError(t, err) - require.NotNil(t, chainPollerService) - - // Start the service first time - chainPollerService.Start() - - // Start the service second time - chainPollerService.Start() - - // Stop the service - chainPollerService.Stop() - - assert.True(t, mockLogger.ContainsLog("ChainPollerService started with poll interval: 100ms")) - assert.True(t, mockLogger.ContainsLog("ChainPollerService already started")) - assert.True(t, mockLogger.ContainsLog("Polling loop terminating")) - assert.True(t, mockLogger.ContainsLog("ChainPollerService stopped")) -} diff --git a/lib/sentinel/go.mod b/lib/sentinel/go.mod deleted file mode 100644 index befd3e594..000000000 --- a/lib/sentinel/go.mod +++ /dev/null @@ -1,24 +0,0 @@ -module github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel - -go 1.22.5 - -toolchain go1.22.9 - -require ( - github.com/ethereum/go-ethereum v1.14.12 - github.com/rs/zerolog v1.30.0 - github.com/stretchr/testify v1.9.0 -) - -require ( - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/holiman/uint256 v1.3.1 // indirect - github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.20 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/objx v0.5.2 // indirect - golang.org/x/crypto v0.25.0 // indirect - golang.org/x/sys v0.22.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) diff --git a/lib/sentinel/go.sum b/lib/sentinel/go.sum deleted file mode 100644 index d62da9e36..000000000 --- a/lib/sentinel/go.sum +++ /dev/null @@ -1,47 +0,0 @@ -github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/ethereum/go-ethereum v1.14.12 h1:8hl57x77HSUo+cXExrURjU/w1VhL+ShCTJrTwcCQSe4= -github.com/ethereum/go-ethereum v1.14.12/go.mod h1:RAC2gVMWJ6FkxSPESfbshrcKpIokgQKsVKmAuqdekDY= -github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs= -github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= -github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= -github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= -github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= -github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c= -github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lib/sentinel/internal/default_logger.go b/lib/sentinel/internal/default_logger.go deleted file mode 100644 index f65c2cb8b..000000000 --- a/lib/sentinel/internal/default_logger.go +++ /dev/null @@ -1,79 +0,0 @@ -// File: internal/default_logger.go -package internal - -import ( - "fmt" - "os" - "strings" - - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" -) - -// DefaultLogger is the default implementation of the Logger interface using zerolog. -type DefaultLogger struct { - logger zerolog.Logger -} - -// NewDefaultLogger initializes and returns a DefaultLogger. -func NewDefaultLogger() *DefaultLogger { - lvlStr := os.Getenv("SENTINEL_LOG_LEVEL") - if lvlStr == "" { - lvlStr = "info" - } - lvl, err := zerolog.ParseLevel(strings.ToLower(lvlStr)) - if err != nil { - panic(fmt.Sprintf("failed to parse log level: %v", err)) - } - - // Configure zerolog to output to the console with a human-friendly format. - logger := log.Output(zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: "15:04:05"}). - Level(lvl). - With(). - Timestamp(). - Logger() - - return &DefaultLogger{ - logger: logger, - } -} - -// Debug logs a message at debug level. -func (dl *DefaultLogger) Debug(args ...interface{}) { - dl.logger.Debug().Msg(fmt.Sprint(args...)) -} - -// Info logs a message at info level. -func (dl *DefaultLogger) Info(args ...interface{}) { - dl.logger.Info().Msg(fmt.Sprint(args...)) -} - -// Warn logs a message at warn level. -func (dl *DefaultLogger) Warn(args ...interface{}) { - dl.logger.Warn().Msg(fmt.Sprint(args...)) -} - -// Error logs a message at error level. -func (dl *DefaultLogger) Error(args ...interface{}) { - dl.logger.Error().Msg(fmt.Sprint(args...)) -} - -// Debugf logs a formatted message at debug level. -func (dl *DefaultLogger) Debugf(format string, args ...interface{}) { - dl.logger.Debug().Msgf(format, args...) -} - -// Infof logs a formatted message at info level. -func (dl *DefaultLogger) Infof(format string, args ...interface{}) { - dl.logger.Info().Msgf(format, args...) -} - -// Warnf logs a formatted message at warn level. -func (dl *DefaultLogger) Warnf(format string, args ...interface{}) { - dl.logger.Warn().Msgf(format, args...) -} - -// Errorf logs a formatted message at error level. -func (dl *DefaultLogger) Errorf(format string, args ...interface{}) { - dl.logger.Error().Msgf(format, args...) -} diff --git a/lib/sentinel/internal/interface.go b/lib/sentinel/internal/interface.go deleted file mode 100644 index 03fba45d3..000000000 --- a/lib/sentinel/internal/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -// File: internal/interface.go -package internal - -import ( - "context" -) - -// BlockchainClient defines the required methods for interacting with a blockchain. -type BlockchainClient interface { - BlockNumber(ctx context.Context) (uint64, error) - FilterLogs(ctx context.Context, query FilterQuery) ([]Log, error) -} diff --git a/lib/sentinel/internal/logger.go b/lib/sentinel/internal/logger.go deleted file mode 100644 index 624b9f8d8..000000000 --- a/lib/sentinel/internal/logger.go +++ /dev/null @@ -1,14 +0,0 @@ -// File: internal/logger.go -package internal - -// Logger defines a generic logging interface. -type Logger interface { - Debug(args ...interface{}) - Info(args ...interface{}) - Warn(args ...interface{}) - Error(args ...interface{}) - Debugf(format string, args ...interface{}) - Infof(format string, args ...interface{}) - Warnf(format string, args ...interface{}) - Errorf(format string, args ...interface{}) -} diff --git a/lib/sentinel/internal/mock_blockchain_client.go b/lib/sentinel/internal/mock_blockchain_client.go deleted file mode 100644 index cbd6e1bae..000000000 --- a/lib/sentinel/internal/mock_blockchain_client.go +++ /dev/null @@ -1,26 +0,0 @@ -// File: internal/mock_blockchain_client.go -package internal - -import ( - "context" - - "github.com/stretchr/testify/mock" -) - -// MockBlockchainClient implements the internal.BlockchainClient interface for testing. -type MockBlockchainClient struct { - mock.Mock -} - -func (m *MockBlockchainClient) BlockNumber(ctx context.Context) (uint64, error) { - args := m.Called(ctx) - return args.Get(0).(uint64), args.Error(1) -} - -func (m *MockBlockchainClient) FilterLogs(ctx context.Context, query FilterQuery) ([]Log, error) { - args := m.Called(ctx, query) - return args.Get(0).([]Log), args.Error(1) -} - -// Ensure MockBlockchainClient implements BlockchainClient interface -var _ BlockchainClient = (*MockBlockchainClient)(nil) diff --git a/lib/sentinel/internal/mock_logger.go b/lib/sentinel/internal/mock_logger.go deleted file mode 100644 index 805477e82..000000000 --- a/lib/sentinel/internal/mock_logger.go +++ /dev/null @@ -1,117 +0,0 @@ -// File: internal/mock_logger.go -package internal - -import ( - "fmt" - "strings" - "sync" -) - -// MockLogger captures logs for testing purposes. -type MockLogger struct { - mu sync.Mutex - Logs []string - Errors []string -} - -// NewMockLogger initializes a new MockLogger. -func NewMockLogger() *MockLogger { - return &MockLogger{ - Logs: []string{}, - Errors: []string{}, - } -} - -func (ml *MockLogger) Reset() { - ml.mu.Lock() - defer ml.mu.Unlock() - ml.Logs = []string{} - ml.Errors = []string{} -} - -func (ml *MockLogger) Debug(args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - ml.Logs = append(ml.Logs, fmt.Sprint(args...)) -} - -func (ml *MockLogger) Info(args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - ml.Logs = append(ml.Logs, fmt.Sprint(args...)) -} - -func (ml *MockLogger) Warn(args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - ml.Logs = append(ml.Logs, fmt.Sprint(args...)) -} - -func (ml *MockLogger) Error(args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - ml.Errors = append(ml.Errors, fmt.Sprint(args...)) -} - -// Debugf logs a formatted debug message. -func (ml *MockLogger) Debugf(format string, args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - logMsg := fmt.Sprintf(format, args...) - ml.Logs = append(ml.Logs, logMsg) -} - -// Infof logs a formatted info message. -func (ml *MockLogger) Infof(format string, args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - logMsg := fmt.Sprintf(format, args...) - ml.Logs = append(ml.Logs, logMsg) -} - -// Warnf logs a formatted warning message. -func (ml *MockLogger) Warnf(format string, args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - logMsg := fmt.Sprintf(format, args...) - ml.Logs = append(ml.Logs, logMsg) -} - -// Errorf logs a formatted error message. -func (ml *MockLogger) Errorf(format string, args ...interface{}) { - ml.mu.Lock() - defer ml.mu.Unlock() - logMsg := fmt.Sprintf(format, args...) - ml.Errors = append(ml.Errors, logMsg) -} - -// ContainsLog checks if any log in Logs contains the specified substring. -func (ml *MockLogger) ContainsLog(substring string) bool { - ml.mu.Lock() - defer ml.mu.Unlock() - for _, log := range ml.Logs { - if strings.Contains(log, substring) { - return true - } - } - return false -} - -// ContainsLog checks if any log in Logs contains the specified substring. -func (ml *MockLogger) ContainsError(substring string) bool { - ml.mu.Lock() - defer ml.mu.Unlock() - for _, error := range ml.Errors { - if strings.Contains(error, substring) { - return true - } - } - return false -} - -// NumLogs returns the total number of logs. -func (ml *MockLogger) NumLogs() int { - ml.mu.Lock() - defer ml.mu.Unlock() - return len(ml.Logs) -} diff --git a/lib/sentinel/internal/types.go b/lib/sentinel/internal/types.go deleted file mode 100644 index 88728a4e6..000000000 --- a/lib/sentinel/internal/types.go +++ /dev/null @@ -1,28 +0,0 @@ -// File: internal/tools.go -package internal - -import "github.com/ethereum/go-ethereum/common" - -// FilterQuery represents the parameters to filter logs/events. -type FilterQuery struct { - FromBlock uint64 - ToBlock uint64 - Topics [][]common.Hash - Addresses []common.Address -} - -// Log represents a single log event fetched from the blockchain. -type Log struct { - Address common.Address - Topics []common.Hash - Data []byte - BlockNumber uint64 - TxHash common.Hash - Index uint -} - -// EventKey uniquely identifies an event subscription based on address and topic. -type EventKey struct { - Address common.Address - Topic common.Hash -} diff --git a/lib/sentinel/sentinel.go b/lib/sentinel/sentinel.go deleted file mode 100644 index 2d05b0de4..000000000 --- a/lib/sentinel/sentinel.go +++ /dev/null @@ -1,119 +0,0 @@ -// File: sentinel.go -package sentinel - -import ( - "fmt" - "sync" - - "github.com/ethereum/go-ethereum/common" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller_service" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" -) - -// SentinelConfig holds configuration for the Sentinel. -type SentinelConfig struct { - Logger internal.Logger -} - -type Sentinel struct { - config SentinelConfig - mu sync.RWMutex - services map[int64]*chain_poller_service.ChainPollerService // Map of chainID to ChianPollerService -} - -// NewSentinel initializes and returns a new Sentinel instance. -func NewSentinel(cfg SentinelConfig) *Sentinel { - return &Sentinel{ - config: cfg, - services: make(map[int64]*chain_poller_service.ChainPollerService), - } -} - -// AddChain adds a new chain to Sentinel. -func (s *Sentinel) AddChain(cfg chain_poller_service.ChainPollerServiceConfig) error { - s.mu.Lock() - defer s.mu.Unlock() - - if _, exists := s.services[cfg.ChainID]; exists { - return fmt.Errorf("chain with ID %d already exists", cfg.ChainID) - } - - eps, err := chain_poller_service.NewChainPollerService(cfg) - if err != nil { - return fmt.Errorf("failed to initialize ChainPollerService: %w", err) - } - - s.services[cfg.ChainID] = eps - eps.Start() - return nil -} - -// RemoveChain removes a chain from Sentinel. -func (s *Sentinel) RemoveChain(chainID int64) error { - s.mu.Lock() - defer s.mu.Unlock() - - eps, exists := s.services[chainID] - if !exists { - return fmt.Errorf("chain with ID %d does not exist", chainID) - } - - eps.Stop() - delete(s.services, chainID) - s.config.Logger.Info(fmt.Sprintf("Removed chain with ID %d", chainID)) - return nil -} - -// Subscribe subscribes to events for a specific chain. -func (s *Sentinel) Subscribe(chainID int64, address common.Address, topic common.Hash) (chan internal.Log, error) { - s.mu.RLock() - eps, exists := s.services[chainID] - s.mu.RUnlock() - - if !exists { - return nil, fmt.Errorf("chain with ID %d does not exist", chainID) - } - - return eps.SubscriptionManager().Subscribe(address, topic) -} - -// Unsubscribe unsubscribes from events for a specific chain. -func (s *Sentinel) Unsubscribe(chainID int64, address common.Address, topic common.Hash, ch chan internal.Log) error { - s.mu.RLock() - eps, exists := s.services[chainID] - s.mu.RUnlock() - - if !exists { - return fmt.Errorf("chain with ID %d does not exist", chainID) - } - - return eps.SubscriptionManager().Unsubscribe(address, topic, ch) -} - -// GetService gets the chain poller service for a chain id. -func (s *Sentinel) GetService(chainID int64) (*chain_poller_service.ChainPollerService, bool) { - s.mu.RLock() - eps, exists := s.services[chainID] - s.mu.RUnlock() - - return eps, exists -} - -// HasServices returns true if there is at least 1 service running. -func (s *Sentinel) HasServices() bool { - s.mu.RLock() - defer s.mu.RUnlock() - - return len(s.services) > 0 -} - -// Close shuts down all chains and the global registry. -func (s *Sentinel) Close() { - s.mu.Lock() - defer s.mu.Unlock() - - for _, eps := range s.services { - eps.Stop() - delete(s.services, eps.ChainID) - } -} diff --git a/lib/sentinel/sentinel_test.go b/lib/sentinel/sentinel_test.go deleted file mode 100644 index 26710daea..000000000 --- a/lib/sentinel/sentinel_test.go +++ /dev/null @@ -1,229 +0,0 @@ -// File: sentinel_test.go -package sentinel_test - -import ( - "context" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/chain_poller_service" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" -) - -// MockChainPoller implements the ChainPollerInterface for testing. -type MockChainPoller struct { - mock.Mock -} - -func (m *MockChainPoller) Poll(ctx context.Context, filterQueries []internal.FilterQuery) ([]internal.Log, error) { - args := m.Called(ctx, filterQueries) - if logs, ok := args.Get(0).([]internal.Log); ok { - return logs, args.Error(1) - } - return nil, args.Error(1) -} - -// Ensure MockChainPoller implements ChainPollerInterface -var _ chain_poller.ChainPollerInterface = (*MockChainPoller)(nil) - -func setupSentinel() (*sentinel.Sentinel, *internal.MockLogger) { - logger := internal.NewMockLogger() - s := sentinel.NewSentinel(sentinel.SentinelConfig{ - Logger: logger, - }) - - return s, logger -} - -func setupChainPollerServiceConfig(l *internal.MockLogger, chainID int64) (*chain_poller_service.ChainPollerServiceConfig, *internal.MockBlockchainClient) { - mockBlockchainClient := new(internal.MockBlockchainClient) - - mockChainPoller := new(MockChainPoller) - - return &chain_poller_service.ChainPollerServiceConfig{ - PollInterval: 100 * time.Millisecond, - ChainPoller: mockChainPoller, - Logger: l, - ChainID: chainID, - BlockchainClient: mockBlockchainClient, - }, mockBlockchainClient -} - -func TestNewSentinel_NoErrors(t *testing.T) { - s, _ := setupSentinel() - defer s.Close() - require.NotNil(t, s, "Sentinel should not be nil") -} - -func TestAddRemoveChain(t *testing.T) { - s, logger := setupSentinel() - defer s.Close() - - config1, mockClient1 := setupChainPollerServiceConfig(logger, 1) - mockClient1.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - config2, mockClient2 := setupChainPollerServiceConfig(logger, 1) - mockClient2.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - // Add and remove a chain - require.NoError(t, s.AddChain(*config1), "Should add chain without error") - require.NoError(t, s.RemoveChain(1), "Should remove chain without error") - - // Add another chain - require.NoError(t, s.AddChain(*config2), "Should add another chain without error") -} - -func TestAddChain_SubscribeUnsubscribeEvent(t *testing.T) { - s, logger := setupSentinel() - defer s.Close() - - config, mockClient1 := setupChainPollerServiceConfig(logger, 1) - mockClient1.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - require.NoError(t, s.AddChain(*config), "Should add chain without error") - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - - ch, err := s.Subscribe(1, address, topic) - require.NoError(t, err, "Should subscribe without error") - defer s.Unsubscribe(1, address, topic, ch) - - // Wait to ensure subscription is ready - time.Sleep(50 * time.Millisecond) - - // Simulate log broadcast - eventKey := internal.EventKey{Address: address, Topic: topic} - log := internal.Log{Address: address, Topics: []common.Hash{topic}, Data: []byte("event data")} - - chainService, exists := s.GetService(1) // Add a helper to retrieve the service. - require.True(t, exists, "Chain service should exist") - - chainService.SubscriptionMgr.BroadcastLog(eventKey, log) - - select { - case receivedLog := <-ch: - assert.Equal(t, log, receivedLog, "Received log should match the broadcasted log") - default: - t.Fatal("Log not received") - } - - require.NoError(t, s.Unsubscribe(1, address, topic, ch), "Should unsubscribe without error") -} - -func TestAddChains_MultipleConsumers(t *testing.T) { - s, logger := setupSentinel() - defer s.Close() - - config1, mockClient1 := setupChainPollerServiceConfig(logger, 1) - mockClient1.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - config2, mockClient2 := setupChainPollerServiceConfig(logger, 2) - mockClient2.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - require.NoError(t, s.AddChain(*config1), "Should add chain 1 without error") - require.NoError(t, s.AddChain(*config2), "Should add chain 2 without error") - - // Chain 1 subscribers - address1 := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic1 := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - - ch1, err := s.Subscribe(1, address1, topic1) - require.NoError(t, err, "Subscriber 1 should subscribe without error") - defer s.Unsubscribe(1, address1, topic1, ch1) - - address2 := common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd") - topic2 := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") - - ch2, err := s.Subscribe(1, address2, topic2) - require.NoError(t, err, "Subscriber 2 should subscribe without error") - defer s.Unsubscribe(1, address2, topic2, ch2) - - // Chain 2 subscriber - address3 := common.HexToAddress("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") - topic3 := common.HexToHash("0xcafebabecafebabecafebabecafebabecafebabe") - - ch3, err := s.Subscribe(2, address3, topic3) - require.NoError(t, err, "Subscriber 3 should subscribe without error") - defer s.Unsubscribe(2, address3, topic3, ch3) - - // Broadcast events - eventKey1 := internal.EventKey{Address: address1, Topic: topic1} - log1 := internal.Log{Address: address1, Topics: []common.Hash{topic1}, Data: []byte("log1")} - chainService, exists := s.GetService(1) - require.True(t, exists, "Chain service should exist") - chainService.SubscriptionMgr.BroadcastLog(eventKey1, log1) - - eventKey3 := internal.EventKey{Address: address3, Topic: topic3} - log3 := internal.Log{Address: address3, Topics: []common.Hash{topic3}, Data: []byte("log3")} - chainService2, exists := s.GetService(2) - require.True(t, exists, "Chain service should exist") - chainService2.SubscriptionMgr.BroadcastLog(eventKey3, log3) - - select { - case receivedLog := <-ch1: - assert.Equal(t, log1, receivedLog, "Subscriber 1 should receive the correct log") - default: - t.Fatal("Subscriber 1 did not receive the log") - } - - select { - case receivedLog := <-ch3: - assert.Equal(t, log3, receivedLog, "Subscriber 3 should receive the correct log") - default: - t.Fatal("Subscriber 3 did not receive the log") - } -} - -func TestAddChains_RemoveAndValidate(t *testing.T) { - s, logger := setupSentinel() - defer s.Close() - - config1, mockClient1 := setupChainPollerServiceConfig(logger, 1) - mockClient1.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - config2, mockClient2 := setupChainPollerServiceConfig(logger, 2) - mockClient2.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - require.NoError(t, s.AddChain(*config1), "Should add chain 1 without error") - require.NoError(t, s.AddChain(*config2), "Should add chain 2 without error") - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - - ch, err := s.Subscribe(1, address, topic) - require.NoError(t, err, "Should subscribe without error") - defer s.Unsubscribe(1, address, topic, ch) - - require.NoError(t, s.RemoveChain(1), "Should remove chain 1 without error") - - select { - case <-ch: - t.Fatal("Channel should be closed after chain removal") - default: - } -} - -func TestAddMultipleChains_CloseSentinel(t *testing.T) { - s, logger := setupSentinel() - - config1, mockClient1 := setupChainPollerServiceConfig(logger, 1) - mockClient1.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - config2, mockClient2 := setupChainPollerServiceConfig(logger, 2) - mockClient2.On("BlockNumber", mock.Anything).Return(uint64(100), nil).Once() - - require.NoError(t, s.AddChain(*config1), "Should add chain 1 without error") - require.NoError(t, s.AddChain(*config2), "Should add chain 2 without error") - - s.Close() - - assert.False(t, s.HasServices(), "All chains should be cleaned up after Close") -} diff --git a/lib/sentinel/subscription_manager/README.md b/lib/sentinel/subscription_manager/README.md deleted file mode 100644 index 72b718e53..000000000 --- a/lib/sentinel/subscription_manager/README.md +++ /dev/null @@ -1,114 +0,0 @@ -# **Subscription Manager** - -## **Overview** - -The **Subscription Manager** is a utility in the [Sentinel](https://github.com/smartcontractkit/lib/sentinel) module of the Chainlink Testing Framework. It efficiently manages blockchain event subscriptions, broadcasts event logs, and maintains an optimized cache of active subscriptions. - ---- - -## **Features** - -- **Dynamic Subscriptions**: Subscribe and unsubscribe from blockchain events. -- **Log Broadcasting**: Relay logs to relevant subscribers. -- **Cache Optimization**: Maintains a mapping of active subscriptions for efficient filtering. -- **Thread Safety**: Ensures concurrency safety with mutex locks. -- **Integration**: Seamlessly works with `ChainPollerService` and `Sentinel`. - ---- - -## **How It Works** - -1. **Subscribe**: - - Add a subscription by providing an address and topic. - - Returns a channel for receiving logs. - -2. **Unsubscribe**: - - Remove a subscription and safely close the associated channel. - -3. **Broadcast Logs**: - - Sends logs to all subscribers of a specific address and topic. - -4. **Cache Management**: - - Maintains a cache for quick access to active subscriptions. - - Automatically invalidates the cache when subscriptions change. - ---- - -## **API Reference** - -### **`Subscribe(address common.Address, topic common.Hash) (chan Log, error)`** -Adds a subscription for a blockchain address and topic. Returns a channel for logs. - -### **`Unsubscribe(address common.Address, topic common.Hash, ch chan Log) error`** -Removes a subscription and closes the associated channel. - -### **`BroadcastLog(eventKey EventKey, log Log)`** -Broadcasts a log to all subscribers for a specific event. - -### **`GetAddressesAndTopics() map[common.Address][]common.Hash`** -Retrieves the current mapping of addresses and topics in the subscription cache. - -### **`Close()`** -Gracefully shuts down the manager, unsubscribes all listeners, and clears the registry. - ---- - -## **Usage Example** - -```go -package main - -import ( - "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/smartcontractkit/chainlink-testing-framework/sentinel/subscription_manager" -) - -func main() { - manager := subscription_manager.NewSubscriptionManager(/* logger, chainID */) - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - - // Subscribe to an event - ch, err := manager.Subscribe(address, topic) - if err != nil { - fmt.Println("Subscription failed:", err) - return - } - - // Simulate log broadcast - go func() { - log := internal.Log{Address: address, Topics: []common.Hash{topic}, Data: []byte("event data")} - manager.BroadcastLog(internal.EventKey{Address: address, Topic: topic}, log) - }() - - // Receive logs - go func() { - for log := range ch { - fmt.Println("Received log:", log) - } - }() - - // Unsubscribe - if err := manager.Unsubscribe(address, topic, ch); err != nil { - fmt.Println("Unsubscribe failed:", err) - } -} -``` - ---- - -## **Testing** - -### **Run Tests** -Execute unit tests with race detection: -```bash -go test -race ./subscription_manager -``` - -### **Test Coverage** -- Thread-safe operations. -- Subscription and unsubscription behavior. -- Log broadcasting accuracy. -- Cache invalidation and consistency. diff --git a/lib/sentinel/subscription_manager/subscription_manager.go b/lib/sentinel/subscription_manager/subscription_manager.go deleted file mode 100644 index 18f5bcaeb..000000000 --- a/lib/sentinel/subscription_manager/subscription_manager.go +++ /dev/null @@ -1,245 +0,0 @@ -// File: subscription_manager/subscription_manager.go -package subscription_manager - -import ( - "errors" - "sync" - "time" - - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" - - "github.com/ethereum/go-ethereum/common" -) - -// SubscriptionManager manages subscriptions for a specific chain. -type SubscriptionManager struct { - registry map[internal.EventKey][]chan internal.Log - registryMutex sync.RWMutex - logger internal.Logger - chainID int64 - addressTopicCache map[common.Address][]common.Hash - cacheInitialized bool - cacheMutex sync.RWMutex - channelBufferSize int - - closing bool // Indicates if the manager is shutting down - activeSends int // Tracks active sends in BroadcastLog - cond *sync.Cond // Used to coordinate between BroadcastLog and Close -} - -// NewSubscriptionManager initializes a new SubscriptionManager. -func NewSubscriptionManager(logger internal.Logger, chainID int64) *SubscriptionManager { - mu := &sync.Mutex{} - - return &SubscriptionManager{ - registry: make(map[internal.EventKey][]chan internal.Log), - logger: logger, - chainID: chainID, - channelBufferSize: 3, - cond: sync.NewCond(mu), - } -} - -// Subscribe registers a new subscription and returns a channel for events. -func (sm *SubscriptionManager) Subscribe(address common.Address, topic common.Hash) (chan internal.Log, error) { - if address == (common.Address{}) { - sm.logger.Warn("Attempted to subscribe with an empty address") - return nil, errors.New("address cannot be empty") - } - if topic == (common.Hash{}) { - sm.logger.Warn("Attempted to subscribe with an empty topic") - return nil, errors.New("topic cannot be empty") - } - - sm.registryMutex.Lock() - defer sm.registryMutex.Unlock() - - eventKey := internal.EventKey{Address: address, Topic: topic} - newChan := make(chan internal.Log, sm.channelBufferSize) - sm.registry[eventKey] = append(sm.registry[eventKey], newChan) - - sm.invalidateCache() - - sm.logger.Infof("ChainID=%d Address=%s Topic=%s SubscriberCount=%d New subscription added", - sm.chainID, address.Hex(), topic.Hex(), len(sm.registry[eventKey])) - - return newChan, nil -} - -// Unsubscribe removes a subscription and closes the channel. -func (sm *SubscriptionManager) Unsubscribe(address common.Address, topic common.Hash, ch chan internal.Log) error { - sm.registryMutex.Lock() - defer sm.registryMutex.Unlock() - - eventKey := internal.EventKey{Address: address, Topic: topic} - subscribers, exists := sm.registry[eventKey] - if !exists { - sm.logger.Warnf("ChainID=%d Address=%s Topic=%s Attempted to unsubscribe from a non-existent EventKey", - sm.chainID, address.Hex(), topic.Hex()) - return errors.New("event key does not exist") - } - - found := false // Flag to track if the subscriber was found - - for i, subscriber := range subscribers { - if subscriber == ch { - // Remove the subscriber from the list - sm.registry[eventKey] = append(subscribers[:i], subscribers[i+1:]...) - sm.logger.Infof("ChainID=%d Address=%s Topic=%s RemainingSubscribers=%d Subscription removed", - sm.chainID, address.Hex(), topic.Hex(), len(sm.registry[eventKey])) - found = true - break - } - } - - if !found { - // Subscriber channel was not found in the registry - sm.logger.Warnf("ChainID=%d Address=%s Topic=%s Attempted to unsubscribe a non-existent subscriber", - sm.chainID, address.Hex(), topic.Hex()) - return errors.New("subscriber channel not found") - } - - if len(sm.registry[eventKey]) == 0 { - // Clean up the map if there are no more subscribers - delete(sm.registry, eventKey) - sm.logger.Debugf("ChainID=%d Address=%s Topic=%s No remaining subscribers, removing EventKey from registry", - sm.chainID, address.Hex(), topic.Hex()) - } - - sm.cond.L.Lock() - for sm.activeSends > 0 { - sm.cond.Wait() // Wait for active broadcasts to complete - } - sm.cond.L.Unlock() - - close(ch) // Safely close the channel - sm.logger.Infof("ChainID=%d Address=%s Topic=%s Subscription removed", sm.chainID, address.Hex(), topic.Hex()) - - sm.invalidateCache() - return nil -} - -// BroadcastLog sends the log event to all relevant subscribers. -func (sm *SubscriptionManager) BroadcastLog(eventKey internal.EventKey, log internal.Log) { - sm.registryMutex.RLock() - subscribers, exists := sm.registry[eventKey] - sm.registryMutex.RUnlock() - - if !exists { - return - } - - var wg sync.WaitGroup - for _, ch := range subscribers { - sm.cond.L.Lock() - if sm.closing { - // If the manager is closing, skip sending logs - sm.cond.L.Unlock() - return - } - sm.activeSends++ - sm.cond.L.Unlock() - wg.Add(1) - go func(ch chan internal.Log) { - defer func() { - sm.cond.L.Lock() - sm.activeSends-- - sm.cond.Broadcast() // Notify Close() when all sends are done - sm.cond.L.Unlock() - wg.Done() - }() - select { - case ch <- log: - case <-time.After(100 * time.Millisecond): // Prevent blocking forever - sm.logger.Warnf("ChainID=%d Log broadcast to channel timed out", sm.chainID) - } - }(ch) - } - wg.Wait() // Wait for all sends to complete before returning - sm.logger.Debugf("ChainID=%d Address=%s Topic=%s Log broadcasted to all subscribers", - sm.chainID, eventKey.Address.Hex(), eventKey.Topic.Hex()) -} - -// GetAddressesAndTopics retrieves all unique addresses and their associated topics. -// Implements caching: caches the result after the first call and invalidates it upon subscription changes. -// Returns a map where each key is an address and the value is a slice of topics. -func (sm *SubscriptionManager) GetAddressesAndTopics() map[common.Address][]common.Hash { - sm.cacheMutex.RLock() - if sm.cacheInitialized { - defer sm.cacheMutex.RUnlock() - return sm.addressTopicCache - } - sm.cacheMutex.RUnlock() - - sm.registryMutex.RLock() - defer sm.registryMutex.RUnlock() - - addressTopicMap := make(map[common.Address]map[common.Hash]struct{}) - - for eventKey := range sm.registry { - topicSet, exists := addressTopicMap[eventKey.Address] - if !exists { - topicSet = make(map[common.Hash]struct{}) - addressTopicMap[eventKey.Address] = topicSet - } - topicSet[eventKey.Topic] = struct{}{} - } - - result := make(map[common.Address][]common.Hash) - for addr, topics := range addressTopicMap { - topicList := make([]common.Hash, 0, len(topics)) - for topic := range topics { - topicList = append(topicList, topic) - } - result[addr] = topicList - } - - // Update cache - sm.cacheMutex.Lock() - sm.addressTopicCache = result - sm.cacheInitialized = true - sm.cacheMutex.Unlock() - - sm.logger.Debugf("ChainID=%d UniqueAddresses=%d Cached address-topic pairs", - sm.chainID, len(sm.addressTopicCache)) - - return sm.addressTopicCache -} - -// invalidateCache invalidates the cached addresses and topics. -func (sm *SubscriptionManager) invalidateCache() { - sm.cacheMutex.Lock() - sm.cacheInitialized = false - sm.addressTopicCache = nil - sm.cacheMutex.Unlock() - - sm.logger.Debugf("ChainID=%d Cache invalidated due to subscription change", sm.chainID) -} - -// Close gracefully shuts down the SubscriptionManager by closing all subscriber channels. -func (sm *SubscriptionManager) Close() { - sm.registryMutex.Lock() - sm.closing = true // Signal that the manager is closing - sm.registryMutex.Unlock() - - // Wait for all active sends to complete - sm.cond.L.Lock() - for sm.activeSends > 0 { - sm.cond.Wait() - } - sm.cond.L.Unlock() - - sm.registryMutex.Lock() - defer sm.registryMutex.Unlock() - - for eventKey, subscribers := range sm.registry { - for _, ch := range subscribers { - close(ch) - } - delete(sm.registry, eventKey) - } - - sm.invalidateCache() - - sm.logger.Infof("ChainID=%d SubscriptionManager closed, all subscriber channels have been closed", sm.chainID) -} diff --git a/lib/sentinel/subscription_manager/subscription_manager_test.go b/lib/sentinel/subscription_manager/subscription_manager_test.go deleted file mode 100644 index 78aad2b52..000000000 --- a/lib/sentinel/subscription_manager/subscription_manager_test.go +++ /dev/null @@ -1,587 +0,0 @@ -// File: subscription_manager/subscription_manager_test.go -package subscription_manager - -import ( - "fmt" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/smartcontractkit/chainlink-testing-framework/lib/sentinel/internal" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// setupSubscriptionManager initializes a SubscriptionManager with a MockLogger for testing. -func setupSubscriptionManager() *SubscriptionManager { - mockLogger := internal.NewMockLogger() - return NewSubscriptionManager(mockLogger, 1) -} - -func TestSubscriptionManager_Subscribe(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - - // Reset logs before Subscribe operation - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - // Valid subscription - ch, err := manager.Subscribe(address, topic) - require.NoError(t, err) - assert.NotNil(t, ch) - - // Assert the expected log message after Subscribe - expectedCacheLog := "ChainID=1 Cache invalidated due to subscription change" - expectedSubscriptionLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedCacheLog), "Expected cache invalidation log") - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog), "Expected log message for valid subscription") - - // Reset logs before invalid subscription attempts - mockLogger.Reset() - - // Invalid subscription with empty address - _, err = manager.Subscribe(common.Address{}, topic) - assert.Error(t, err) - expectedWarn1 := "Attempted to subscribe with an empty address" - assert.True(t, mockLogger.ContainsLog(expectedWarn1), "Expected warning for empty address") - - // Reset logs before next invalid subscription - mockLogger.Reset() - - // Invalid subscription with empty topic - _, err = manager.Subscribe(address, common.Hash{}) - assert.Error(t, err) - expectedWarn2 := "Attempted to subscribe with an empty topic" - assert.True(t, mockLogger.ContainsLog(expectedWarn2), "Expected warning for empty topic") - - // Check registry state - manager.registryMutex.RLock() - defer manager.registryMutex.RUnlock() - assert.Len(t, manager.registry, 1, "Registry should contain one event key") - assert.Len(t, manager.registry[internal.EventKey{Address: address, Topic: topic}], 1, "EventKey should have one subscriber") -} - -func TestSubscriptionManager_MultipleSubscribers(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - eventKey := internal.EventKey{Address: address, Topic: topic} - - // Reset logs before subscriptions - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - // Subscribe first consumer - ch1, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for first subscription - expectedSubscriptionLog1 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog1), "Expected log message for first subscription") - - // Reset logs before second subscription - mockLogger.Reset() - - // Subscribe second consumer - ch2, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for second subscription - expectedSubscriptionLog2 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=2 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog2), "Expected log message for second subscription") - - // Reset logs before broadcasting - mockLogger.Reset() - - // Verify that the list of channels grows - manager.registryMutex.RLock() - subscribers := manager.registry[eventKey] - manager.registryMutex.RUnlock() - assert.Len(t, subscribers, 2, "There should be two channels subscribed to the EventKey") - - // Broadcast a log and ensure both channels receive it - logEvent := internal.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1234"), - Data: []byte("log data"), - Address: address, - Topics: []common.Hash{topic}, - Index: 0, - } - - manager.BroadcastLog(eventKey, logEvent) - - receivedLog1 := <-ch1 - receivedLog2 := <-ch2 - - assert.Equal(t, logEvent, receivedLog1, "Subscriber 1 should receive the log") - assert.Equal(t, logEvent, receivedLog2, "Subscriber 2 should receive the log") - - // Assert broadcast log message - expectedBroadcastLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s Log broadcasted to all subscribers", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedBroadcastLog), "Expected broadcast log message") -} - -func TestSubscriptionManager_Unsubscribe(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - - // Subscribe to an event - ch, err := manager.Subscribe(address, topic) - require.NoError(t, err) - assert.NotNil(t, ch) - - // Reset logs before Unsubscribe operation - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - // Unsubscribe existing channel - err = manager.Unsubscribe(address, topic, ch) - assert.NoError(t, err) - - // Assert the expected log message after Unsubscribe - expectedRemoveLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s RemainingSubscribers=0 Subscription removed", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedRemoveLog), "Expected log message for unsubscribing") - - // Reset logs before attempting to unsubscribe again - mockLogger.Reset() - - // Try unsubscribing again (should fail) - err = manager.Unsubscribe(address, topic, ch) - assert.Error(t, err) - - // Assert the expected warning log message - expectedWarn := "Attempted to unsubscribe from a non-existent EventKey" - assert.True(t, mockLogger.ContainsLog(expectedWarn), "Expected warning for unsubscribing a non-existent subscriber") - - // Reset logs before unsubscribing a non-existent EventKey - mockLogger.Reset() - - // Unsubscribe non-existent event key - otherCh := make(chan internal.Log) - err = manager.Unsubscribe(address, topic, otherCh) - assert.Error(t, err) - - // Assert the expected warning log message - assert.True(t, mockLogger.ContainsLog(expectedWarn), "Expected warning for unsubscribing a non-existent subscriber") - - // Check registry state - manager.registryMutex.RLock() - defer manager.registryMutex.RUnlock() - assert.Len(t, manager.registry, 0, "Registry should be empty after unsubscribing") -} - -func TestSubscriptionManager_UnsubscribeSelective(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - eventKey := internal.EventKey{Address: address, Topic: topic} - - // Subscribe multiple consumers to the same EventKey - mockLogger := manager.logger.(*internal.MockLogger) - - // Reset logs before subscriptions - mockLogger.Reset() - - ch1, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for first subscription - expectedSubscriptionLog1 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog1), "Expected log message for first subscription") - - // Reset logs before second subscription - mockLogger.Reset() - - ch2, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for second subscription - expectedSubscriptionLog2 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=2 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog2), "Expected log message for second subscription") - - // Reset logs before Unsubscribe operation - mockLogger.Reset() - - // Unsubscribe one consumer and ensure the other remains - err = manager.Unsubscribe(address, topic, ch1) - require.NoError(t, err) - - // Assert log for selective unsubscription - expectedSelectiveRemoveLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s RemainingSubscribers=1 Subscription removed", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSelectiveRemoveLog), "Expected log message for selective unsubscription") - - // Check registry state - manager.registryMutex.RLock() - subscribers := manager.registry[eventKey] - manager.registryMutex.RUnlock() - - assert.Len(t, subscribers, 1, "There should be one remaining channel after unsubscription") - assert.Equal(t, ch2, subscribers[0], "The remaining channel should be the second subscriber") - - // Reset logs before final Unsubscribe operation - mockLogger.Reset() - - // Unsubscribe the last consumer and ensure the registry is cleaned up - err = manager.Unsubscribe(address, topic, ch2) - require.NoError(t, err) - - // Assert log for final unsubscription - expectedFinalRemoveLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s RemainingSubscribers=0 Subscription removed", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedFinalRemoveLog), "Expected log message for final unsubscription") - - // Check registry state - manager.registryMutex.RLock() - _, exists := manager.registry[eventKey] - manager.registryMutex.RUnlock() - - assert.False(t, exists, "The EventKey should no longer exist in the registry after the last subscriber unsubscribes") -} - -func TestSubscriptionManager_BroadcastLog(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - eventKey := internal.EventKey{Address: address, Topic: topic} - - // Subscribe to an event - ch, err := manager.Subscribe(address, topic) - require.NoError(t, err) - assert.NotNil(t, ch) - - // Reset logs before BroadcastLog operation - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - logEvent := internal.Log{ - BlockNumber: 1, - TxHash: common.HexToHash("0x1234"), - Data: []byte("log data"), - Address: address, - Topics: []common.Hash{topic}, - Index: 0, - } - - // Broadcast log event - manager.BroadcastLog(eventKey, logEvent) - - // Verify the channel received the event - receivedLog := <-ch - assert.Equal(t, logEvent, receivedLog, "Subscriber should receive the broadcasted log") - - // Assert broadcast log message - expectedBroadcastLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s Log broadcasted to all subscribers", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedBroadcastLog), "Expected log message for broadcasting") -} - -func TestSubscriptionManager_BroadcastToAllSubscribers(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - eventKey := internal.EventKey{Address: address, Topic: topic} - - // Subscribe multiple consumers to the same EventKey - mockLogger := manager.logger.(*internal.MockLogger) - - // Reset logs before subscriptions - mockLogger.Reset() - - ch1, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for first subscription - expectedSubscriptionLog1 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog1), "Expected log message for first subscription") - - // Reset logs before second subscription - mockLogger.Reset() - - ch2, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for second subscription - expectedSubscriptionLog2 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=2 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog2), "Expected log message for second subscription") - - // Reset logs before third subscription - mockLogger.Reset() - - ch3, err := manager.Subscribe(address, topic) - require.NoError(t, err) - - // Assert log for third subscription - expectedSubscriptionLog3 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=3 New subscription added", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog3), "Expected log message for third subscription") - - // Reset logs before broadcasting - mockLogger.Reset() - - // Broadcast a log and ensure all channels receive it - logEvent := internal.Log{ - BlockNumber: 2, - TxHash: common.HexToHash("0x5678"), - Data: []byte("another log data"), - Address: address, - Topics: []common.Hash{topic}, - Index: 0, - } - - manager.BroadcastLog(eventKey, logEvent) - - receivedLog1 := <-ch1 - receivedLog2 := <-ch2 - receivedLog3 := <-ch3 - - assert.Equal(t, logEvent, receivedLog1, "Subscriber 1 should receive the log") - assert.Equal(t, logEvent, receivedLog2, "Subscriber 2 should receive the log") - assert.Equal(t, logEvent, receivedLog3, "Subscriber 3 should receive the log") - - // Assert broadcast log message - expectedBroadcastLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s Log broadcasted to all subscribers", - address.Hex(), - topic.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedBroadcastLog), "Expected log message for broadcasting to all subscribers") -} - -func TestSubscriptionManager_GetAddressesAndTopics(t *testing.T) { - manager := setupSubscriptionManager() - - address1 := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic1 := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - - address2 := common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - topic2 := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") - - // Reset logs before subscriptions - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - _, err := manager.Subscribe(address1, topic1) - require.NoError(t, err) - - // Assert log for first subscription - expectedSubscriptionLog1 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address1.Hex(), - topic1.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog1), "Expected log message for first subscription") - - // Reset logs before second subscription - mockLogger.Reset() - - _, err = manager.Subscribe(address2, topic2) - require.NoError(t, err) - - // Assert log for second subscription - expectedSubscriptionLog2 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address2.Hex(), - topic2.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog2), "Expected log message for second subscription") - - // Fetch addresses and topics - result := manager.GetAddressesAndTopics() - - // Verify addresses and topics - assert.Contains(t, result, address1, "Address1 should be in the cache") - assert.Contains(t, result, address2, "Address2 should be in the cache") - assert.ElementsMatch(t, result[address1], []common.Hash{topic1}, "Cache should contain topic1 for address1") - assert.ElementsMatch(t, result[address2], []common.Hash{topic2}, "Cache should contain topic2 for address2") - - // Assert cache log message - expectedCacheLog := fmt.Sprintf( - "ChainID=1 UniqueAddresses=%d Cached address-topic pairs", - len(result), - ) - assert.True(t, mockLogger.ContainsLog(expectedCacheLog), "Expected cache initialization log message") -} - -func TestSubscriptionManager_CacheInvalidation(t *testing.T) { - manager := setupSubscriptionManager() - - address1 := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic1 := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - - address2 := common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef") - topic2 := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") - - // Reset logs before first subscription - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - // Subscribe to an initial event - _, err := manager.Subscribe(address1, topic1) - require.NoError(t, err) - - // Assert log for first subscription - expectedSubscriptionLog1 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address1.Hex(), - topic1.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog1), "Expected log message for first subscription") - - // Assert cache initialization log - expectedCacheLog := "ChainID=1 Cache invalidated due to subscription change" - assert.True(t, mockLogger.ContainsLog(expectedCacheLog), "Expected cache invalidation log after first subscription") - - // Reset logs before second subscription - mockLogger.Reset() - - // Add another subscription and ensure cache invalidation - ch, err := manager.Subscribe(address2, topic2) - require.NoError(t, err) - - // Assert log for second subscription - expectedSubscriptionLog2 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=1 New subscription added", - address2.Hex(), - topic2.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog2), "Expected log message for second subscription") - - // Assert cache invalidation log - assert.True(t, mockLogger.ContainsLog(expectedCacheLog), "Expected cache invalidation log after second subscription") - - // Check updated cache - updatedCache := manager.GetAddressesAndTopics() - require.Contains(t, updatedCache, address1, "Address1 should still be in the cache") - require.Contains(t, updatedCache, address2, "Address2 should now be in the cache") - assert.ElementsMatch(t, updatedCache[address1], []common.Hash{topic1}, "Cache should still contain topic1 for address1") - assert.ElementsMatch(t, updatedCache[address2], []common.Hash{topic2}, "Cache should contain topic2 for address2") - - // Reset logs before adding an extra subscription - mockLogger.Reset() - - // Add an extra subscription for address1/topic1 - _, err = manager.Subscribe(address1, topic1) - require.NoError(t, err) - - // Assert log for extra subscription - expectedSubscriptionLog3 := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s SubscriberCount=2 New subscription added", - address1.Hex(), - topic1.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedSubscriptionLog3), "Expected log message for extra subscription") - - // Reset logs before Unsubscribe operation - mockLogger.Reset() - - // Unsubscribe from address2/topic2 - err = manager.Unsubscribe(address2, topic2, ch) - require.NoError(t, err) - - // Assert log for unsubscription - expectedRemoveLog := fmt.Sprintf( - "ChainID=1 Address=%s Topic=%s RemainingSubscribers=0 Subscription removed", - address2.Hex(), - topic2.Hex(), - ) - assert.True(t, mockLogger.ContainsLog(expectedRemoveLog), "Expected log message for unsubscribing address2/topic2") - - // Assert cache invalidation log - assert.True(t, mockLogger.ContainsLog(expectedCacheLog), "Expected cache invalidation log after unsubscription") - - // Check final cache - finalCache := manager.GetAddressesAndTopics() - require.Contains(t, finalCache, address1, "Address1 should still be in the cache") - assert.ElementsMatch(t, finalCache[address1], []common.Hash{topic1}, "Cache should still contain topic1 for address1") - require.NotContains(t, finalCache[address2], topic2, "Topic2 should be removed for address2 after unsubscription") -} - -func TestSubscriptionManager_Close(t *testing.T) { - manager := setupSubscriptionManager() - - address := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678") - topic := common.HexToHash("0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd") - - // Subscribe to an event - ch, err := manager.Subscribe(address, topic) - require.NoError(t, err) - assert.NotNil(t, ch) - - // Reset logs before Close operation - mockLogger := manager.logger.(*internal.MockLogger) - mockLogger.Reset() - - // Close the SubscriptionManager - manager.Close() - - // Verify channel is closed - _, open := <-ch - assert.False(t, open, "Channel should be closed after Close()") - - // Assert close log message - expectedCloseLog := "ChainID=1 SubscriptionManager closed, all subscriber channels have been closed" - assert.True(t, mockLogger.ContainsLog(expectedCloseLog), "Expected close log message") - - // Verify registry is empty - manager.registryMutex.RLock() - defer manager.registryMutex.RUnlock() - assert.Len(t, manager.registry, 0, "Registry should be empty after Close()") -}