Skip to content

Commit

Permalink
basic boilerplate that connects loader to base log poller
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed Dec 4, 2024
1 parent c3a71b0 commit d60fe71
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 27 deletions.
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ packages:
github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller:
interfaces:
RPCClient:
EventSaver:
58 changes: 31 additions & 27 deletions pkg/solana/logpoller/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,33 +53,7 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) {
require.NoError(t, collector.Close())
})

slot := uint64(42)
sig := solana.Signature{2, 1, 4, 2}
blockHeight := uint64(21)

client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{
RPCContext: rpc.RPCContext{
Context: rpc.Context{
Slot: slot,
},
},
}, nil)

client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool {
return val != nil && *val == slot
}), mock.Anything).Return(rpc.BlocksResult{slot}, nil)

client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{
Transactions: []rpc.TransactionWithMeta{
{
Meta: &rpc.TransactionMeta{
LogMessages: messages,
},
},
},
Signatures: []solana.Signature{sig},
BlockHeight: &blockHeight,
}, nil).Twice()
clientExpectSingleEvent(client)

tests.AssertEventually(t, func() bool {
return parser.Called()
Expand Down Expand Up @@ -364,3 +338,33 @@ func (p *testParser) Called() bool {
func (p *testParser) Count() uint64 {
return p.count.Load()
}

func clientExpectSingleEvent(client *mocks.RPCClient) {
slot := uint64(42)
sig := solana.Signature{2, 1, 4, 2}
blockHeight := uint64(21)

client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{
RPCContext: rpc.RPCContext{
Context: rpc.Context{
Slot: slot,
},
},
}, nil)

client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool {
return val != nil && *val == slot
}), mock.Anything).Return(rpc.BlocksResult{slot}, nil)

client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{
Transactions: []rpc.TransactionWithMeta{
{
Meta: &rpc.TransactionMeta{
LogMessages: messages,
},
},
},
Signatures: []solana.Signature{sig},
BlockHeight: &blockHeight,
}, nil).Twice()
}
81 changes: 81 additions & 0 deletions pkg/solana/logpoller/mocks/event_saver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 121 additions & 0 deletions pkg/solana/logpoller/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package logpoller

import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"strings"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

type EventSaver interface {
SaveEvent(evt ProgramEvent) error
}

type Service struct {
// dependencies
lggr logger.Logger
saver EventSaver

// internal
loader *EncodedLogCollector
mu sync.RWMutex
discriminators map[string]struct{}
chSave chan ProgramEvent

// service state management
services.Service
engine *services.Engine
}

func New(client RPCClient, lggr logger.Logger, saver EventSaver) *Service {
p := &Service{
saver: saver,
discriminators: make(map[string]struct{}),
chSave: make(chan ProgramEvent),
}

p.Service, p.engine = services.Config{
Name: "LogPollerService",
NewSubServices: func(lggr logger.Logger) []services.Service {
p.loader = NewEncodedLogCollector(client, p, lggr)

return []services.Service{p.loader}
},
Start: p.start,
Close: p.close,
}.NewServiceEngine(lggr)
p.lggr = p.engine.SugaredLogger

return p
}

func (p *Service) AddFilter(name string) error {
p.mu.Lock()
defer p.mu.Unlock()

hash := sha256.New()
hash.Write([]byte(fmt.Sprintf("event:%s", name)))

p.discriminators[string(hash.Sum(nil)[:8])] = struct{}{}

return nil
}

func (p *Service) start(_ context.Context) error {
p.engine.Go(p.runSaveProcess)

return nil
}

func (p *Service) close() error {
return nil
}

func (p *Service) Process(evt ProgramEvent) error {
encodedData := strings.TrimSpace(evt.Data)
data, err := base64.StdEncoding.DecodeString(encodedData)
if err != nil {
// don't return an error here, just log it
// returning an error will trigger a retry
p.lggr.Errorw("failed to base64 decode data", "err", err)

return nil
}

// silently discard events that don't match any expected event signatures
if !p.dataMatchesEventSig(data[:8]) {
return nil
}

p.chSave <- evt

return nil
}

func (p *Service) runSaveProcess(ctx context.Context) {
// this process should ensure ordered batches before saving atomically to the database
for {
select {
case <-ctx.Done():
return
case evt := <-p.chSave:
if err := p.saver.SaveEvent(evt); err != nil {
p.lggr.Errorw("failed to save event", "err", err)
}
}
}
}

func (p *Service) dataMatchesEventSig(sig []byte) bool {
p.mu.RLock()
defer p.mu.RUnlock()

_, ok := p.discriminators[string(sig)]

return ok
}
58 changes: 58 additions & 0 deletions pkg/solana/logpoller/poller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package logpoller_test

import (
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks"
)

func TestLogPoller_ProcessAndSave(t *testing.T) {
t.Parallel()

client := new(mocks.RPCClient)
saver := new(testSaver)
poller := logpoller.New(client, logger.Nop(), saver)

require.NoError(t, poller.AddFilter("TestEvent"))

clientExpectSingleEvent(client)

require.NoError(t, poller.Start(tests.Context(t)))

t.Cleanup(func() {
require.NoError(t, poller.Close())
})

tests.AssertEventually(t, func() bool {
return saver.Called()
})

client.AssertExpectations(t)
}

type testSaver struct {
called atomic.Bool
count atomic.Uint64
}

func (s *testSaver) SaveEvent(event logpoller.ProgramEvent) error {
s.called.Store(true)
s.count.Store(s.count.Load() + 1)

return nil
}

func (s *testSaver) Called() bool {
return s.called.Load()
}

func (s *testSaver) Count() uint64 {
return s.count.Load()
}

0 comments on commit d60fe71

Please sign in to comment.