diff --git a/.tool-versions b/.tool-versions index c4111ddca..04823174d 100644 --- a/.tool-versions +++ b/.tool-versions @@ -2,7 +2,7 @@ nodejs 18.20.2 yarn 1.22.19 rust 1.59.0 golang 1.23.3 -golangci-lint 1.60.1 +golangci-lint 1.61.0 actionlint 1.6.22 shellcheck 0.8.0 helm 3.9.4 diff --git a/go.mod b/go.mod index 4fcaabc0a..04459180c 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/hashicorp/go-plugin v1.6.2 github.com/jackc/pgx/v4 v4.18.3 github.com/jpillora/backoff v1.0.0 - github.com/lib/pq v1.10.9 github.com/pelletier/go-toml/v2 v2.2.0 github.com/prometheus/client_golang v1.17.0 github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51 @@ -76,6 +75,7 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.2.0 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/linkedin/goavro/v2 v2.12.0 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/mailru/easyjson v0.7.7 // indirect diff --git a/integration-tests/smoke/event_loader_test.go b/integration-tests/smoke/event_loader_test.go index b3d001441..a209d3843 100644 --- a/integration-tests/smoke/event_loader_test.go +++ b/integration-tests/smoke/event_loader_test.go @@ -25,6 +25,7 @@ import ( contract "github.com/smartcontractkit/chainlink-solana/contracts/generated/log_read_test" "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" "github.com/smartcontractkit/chainlink-solana/integration-tests/solclient" @@ -49,7 +50,8 @@ func TestEventLoader(t *testing.T) { require.NoError(t, err) rpcURL, wsURL := setupTestValidator(t, privateKey.PublicKey().String()) - rpcClient := rpc.New(rpcURL) + cl, rpcClient, err := client.NewTestClient(rpcURL, config.NewDefault(), 1*time.Second, logger.Nop()) + require.NoError(t, err) wsClient, err := ws.Connect(ctx, wsURL) require.NoError(t, err) @@ -62,7 +64,7 @@ func TestEventLoader(t *testing.T) { parser := &printParser{t: t} sender := newLogSender(t, rpcClient, wsClient) collector := logpoller.NewEncodedLogCollector( - rpcClient, + cl, parser, logger.Nop(), ) diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 4492cb15d..9316b2981 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -28,16 +28,25 @@ import ( mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/internal" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" "github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor" "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm" txmutils "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils" ) +type LogPoller interface { + Start(context.Context) error + Close() error + RegisterFilter(ctx context.Context, filter logpoller.Filter) error + UnregisterFilter(ctx context.Context, name string) error +} + type Chain interface { types.ChainService ID() string Config() config.Config + LogPoller() LogPoller TxManager() TxManager // Reader returns a new Reader from the available list of nodes (if there are multiple, it will randomly select one) Reader() (client.Reader, error) @@ -90,6 +99,7 @@ type chain struct { services.StateMachine id string cfg *config.TOMLConfig + lp LogPoller txm *txm.Txm balanceMonitor 
services.Service lggr logger.Logger @@ -312,6 +322,8 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() }) } + // TODO: import typeProvider function from codec package and pass to constructor + ch.lp = logpoller.New(logger.Sugared(logger.Named(lggr, "LogPoller")), logpoller.NewORM(ch.ID(), ds, lggr), ch.multiClient) ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr) ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc) return &ch, nil @@ -401,6 +413,10 @@ func (c *chain) Config() config.Config { return c.cfg } +func (c *chain) LogPoller() LogPoller { + return c.lp +} + func (c *chain) TxManager() TxManager { return c.txm } diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index 36f706446..e3f170783 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -41,6 +41,7 @@ type Reader interface { GetTransaction(ctx context.Context, txHash solana.Signature) (*rpc.GetTransactionResult, error) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error) GetBlocksWithLimit(ctx context.Context, startSlot uint64, limit uint64) (*rpc.BlocksResult, error) + GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error) GetSignaturesForAddressWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) } @@ -73,10 +74,10 @@ type Client struct { requestGroup *singleflight.Group } -func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { - return &Client{ +// Return both the client and the underlying rpc client for testing +func NewTestClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, *rpc.Client, error) { + rpcClient := Client{ url: endpoint, - rpc: rpc.New(endpoint), skipPreflight: cfg.SkipPreflight(), commitment: cfg.Commitment(), maxRetries: cfg.MaxRetries(), @@ -84,7 +85,14 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, contextDuration: requestTimeout, log: log, requestGroup: &singleflight.Group{}, - }, nil + } + rpcClient.rpc = rpc.New(endpoint) + return &rpcClient, rpcClient.rpc, nil +} + +func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { + rpcClient, _, err := NewTestClient(endpoint, cfg, requestTimeout, log) + return rpcClient, err } func (c *Client) latency(name string) func() { @@ -339,6 +347,15 @@ func (c *Client) GetLatestBlockHeight(ctx context.Context) (uint64, error) { return v.(uint64), err } +func (c *Client) GetBlockWithOpts(ctx context.Context, slot uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + // get block based on slot with custom options set + done := c.latency("get_block_with_opts") + defer done() + ctx, cancel := context.WithTimeout(ctx, c.txTimeout) + defer cancel() + return c.rpc.GetBlockWithOpts(ctx, slot, opts) +} + func (c *Client) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error) { done := c.latency("get_block") defer done() diff --git a/pkg/solana/client/mocks/reader_writer.go b/pkg/solana/client/mocks/reader_writer.go index 25a5a29ea..232320005 100644 --- a/pkg/solana/client/mocks/reader_writer.go +++ 
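The new `GetBlockWithOpts` pass-through on `Client` applies the usual latency metric and request timeout before delegating to `rpc.Client.GetBlockWithOpts`. Below is a minimal sketch of calling that underlying `gagliardetto/solana-go` method directly; the endpoint and the particular option values are illustrative assumptions, not taken from this change:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gagliardetto/solana-go"
	"github.com/gagliardetto/solana-go/rpc"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	cl := rpc.New("https://api.devnet.solana.com") // example endpoint

	// Assumed option set: full transactions, base64 encoding, no rewards,
	// finalized commitment, legacy and v0 transactions.
	maxTxVersion := uint64(0)
	includeRewards := false
	opts := &rpc.GetBlockOpts{
		Encoding:                       solana.EncodingBase64,
		TransactionDetails:             rpc.TransactionDetailsFull,
		Rewards:                        &includeRewards,
		Commitment:                     rpc.CommitmentFinalized,
		MaxSupportedTransactionVersion: &maxTxVersion,
	}

	slot, err := cl.GetSlot(ctx, rpc.CommitmentFinalized)
	if err != nil {
		panic(err)
	}
	block, err := cl.GetBlockWithOpts(ctx, slot, opts)
	if err != nil {
		panic(err)
	}
	fmt.Println("block hash:", block.Blockhash, "txs:", len(block.Transactions))
}
```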
b/pkg/solana/client/mocks/reader_writer.go @@ -257,6 +257,66 @@ func (_c *ReaderWriter_GetBlock_Call) RunAndReturn(run func(context.Context, uin return _c } +// GetBlockWithOpts provides a mock function with given fields: _a0, _a1, _a2 +func (_m *ReaderWriter) GetBlockWithOpts(_a0 context.Context, _a1 uint64, _a2 *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + ret := _m.Called(_a0, _a1, _a2) + + if len(ret) == 0 { + panic("no return value specified for GetBlockWithOpts") + } + + var r0 *rpc.GetBlockResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error)); ok { + return rf(_a0, _a1, _a2) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, *rpc.GetBlockOpts) *rpc.GetBlockResult); ok { + r0 = rf(_a0, _a1, _a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rpc.GetBlockResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, uint64, *rpc.GetBlockOpts) error); ok { + r1 = rf(_a0, _a1, _a2) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ReaderWriter_GetBlockWithOpts_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBlockWithOpts' +type ReaderWriter_GetBlockWithOpts_Call struct { + *mock.Call +} + +// GetBlockWithOpts is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 uint64 +// - _a2 *rpc.GetBlockOpts +func (_e *ReaderWriter_Expecter) GetBlockWithOpts(_a0 interface{}, _a1 interface{}, _a2 interface{}) *ReaderWriter_GetBlockWithOpts_Call { + return &ReaderWriter_GetBlockWithOpts_Call{Call: _e.mock.On("GetBlockWithOpts", _a0, _a1, _a2)} +} + +func (_c *ReaderWriter_GetBlockWithOpts_Call) Run(run func(_a0 context.Context, _a1 uint64, _a2 *rpc.GetBlockOpts)) *ReaderWriter_GetBlockWithOpts_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(uint64), args[2].(*rpc.GetBlockOpts)) + }) + return _c +} + +func (_c *ReaderWriter_GetBlockWithOpts_Call) Return(_a0 *rpc.GetBlockResult, _a1 error) *ReaderWriter_GetBlockWithOpts_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ReaderWriter_GetBlockWithOpts_Call) RunAndReturn(run func(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error)) *ReaderWriter_GetBlockWithOpts_Call { + _c.Call.Return(run) + return _c +} + // GetBlocks provides a mock function with given fields: ctx, startSlot, endSlot func (_m *ReaderWriter) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error) { ret := _m.Called(ctx, startSlot, endSlot) diff --git a/pkg/solana/client/multi_client.go b/pkg/solana/client/multi_client.go index 83cdf45ec..76c6447dc 100644 --- a/pkg/solana/client/multi_client.go +++ b/pkg/solana/client/multi_client.go @@ -166,3 +166,12 @@ func (m *MultiClient) GetSignaturesForAddressWithOpts(ctx context.Context, addr return r.GetSignaturesForAddressWithOpts(ctx, addr, opts) } + +func (m *MultiClient) GetBlockWithOpts(ctx context.Context, slot uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + r, err := m.getClient() + if err != nil { + return nil, err + } + + return r.GetBlockWithOpts(ctx, slot, opts) +} diff --git a/pkg/solana/codec/solana.go b/pkg/solana/codec/solana.go index 08ff964a9..3323ced10 100644 --- a/pkg/solana/codec/solana.go +++ b/pkg/solana/codec/solana.go @@ -491,3 +491,36 @@ func saveDependency(refs *codecRefs, parent, child string) { refs.dependencies[parent] = append(deps, child) } +func NewIDLEventCodec(idl IDL, builder commonencodings.Builder) 
(commontypes.RemoteCodec, error) { + typeCodecs := make(commonencodings.LenientCodecFromTypeCodec) + refs := &codecRefs{ + builder: builder, + codecs: make(map[string]commonencodings.TypeCodec), + typeDefs: idl.Types, + dependencies: make(map[string][]string), + } + + for _, event := range idl.Events { + name, instCodec, err := asStruct(eventFieldsAsStandardFields(event.Fields), refs, event.Name, false, false) + if err != nil { + return nil, err + } + + typeCodecs[name] = instCodec + } + + return typeCodecs, nil +} + +func eventFieldsAsStandardFields(event []IdlEventField) []IdlField { + output := make([]IdlField, len(event)) + + for idx := range output { + output[idx] = IdlField{ + Name: event[idx].Name, + Type: event[idx].Type, + } + } + + return output +} diff --git a/pkg/solana/logpoller/discriminator.go b/pkg/solana/logpoller/discriminator.go new file mode 100644 index 000000000..812057a1c --- /dev/null +++ b/pkg/solana/logpoller/discriminator.go @@ -0,0 +1,14 @@ +package logpoller + +import ( + "crypto/sha256" + "fmt" +) + +const DiscriminatorLength = 8 + +func Discriminator(namespace, name string) [DiscriminatorLength]byte { + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%s:%s", namespace, name))) + return [DiscriminatorLength]byte(h.Sum(nil)[:DiscriminatorLength]) +} diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 4a1496371..990d83432 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -2,36 +2,56 @@ package logpoller import ( "context" + "encoding/base64" "errors" "fmt" "iter" "maps" + "reflect" + "strconv" "sync" "sync/atomic" + "github.com/gagliardetto/solana-go" + "github.com/smartcontractkit/chainlink-common/pkg/codec/encodings/binary" "github.com/smartcontractkit/chainlink-common/pkg/logger" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" ) type filters struct { orm ORM lggr logger.SugaredLogger - filtersByID map[int64]*Filter - filtersByName map[string]int64 - filtersByAddress map[PublicKey]map[EventSignature]map[int64]struct{} - filtersToBackfill map[int64]struct{} - filtersToDelete map[int64]Filter - filtersMutex sync.RWMutex - loadedFilters atomic.Bool + filtersByID map[int64]*Filter + filtersByName map[string]int64 + filtersByAddress map[PublicKey]map[EventSignature]map[int64]struct{} + filtersToBackfill map[int64]struct{} + filtersToDelete map[int64]Filter + filtersMutex sync.RWMutex + loadedFilters atomic.Bool + knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters + knownDiscriminators map[string]uint // fast lookup by first 10 characters (60-bits) of a base64-encoded discriminator + seqNums map[int64]int64 + decoders map[int64]Decoder } func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { return &filters{ - orm: orm, - lggr: lggr, + orm: orm, + lggr: lggr, + decoders: make(map[int64]Decoder), } } +// IncrementSeqNum increments the sequence number for a filterID and returns the new +// number. This means the sequence number assigned to the first log matched after registration will be 1. +// WARNING: not thread safe, should only be called while fl.filtersMutex is locked, and after filters have been loaded. +func (fl *filters) IncrementSeqNum(filterID int64) int64 { + fl.seqNums[filterID]++ + return fl.seqNums[filterID] +} + // PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs. 
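`Discriminator` follows Anchor's convention: an event's 8-byte discriminator is the first 8 bytes of `sha256("event:<EventName>")`. A self-contained sketch of that computation using only the standard library (the event name is hypothetical):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

const discriminatorLength = 8

// discriminator reproduces the logpoller helper: the first 8 bytes of sha256("namespace:name").
func discriminator(namespace, name string) [discriminatorLength]byte {
	h := sha256.Sum256([]byte(namespace + ":" + name))
	var out [discriminatorLength]byte
	copy(out[:], h[:discriminatorLength])
	return out
}

func main() {
	d := discriminator("event", "myEvent") // "myEvent" is a made-up event name
	fmt.Println("discriminator:", hex.EncodeToString(d[:]))
	// Anchor prepends these 8 bytes to the Borsh-encoded event payload,
	// so the same 8 bytes always lead the raw log data for this event.
}
```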
func (fl *filters) PruneFilters(ctx context.Context) error { err := fl.LoadFilters(ctx) @@ -100,6 +120,15 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { filter.ID = filterID + idl := codec.IDL{ + Events: []codec.IdlEvent{filter.EventIdl.IdlEvent}, + Types: filter.EventIdl.IdlTypeDefSlice, + } + fl.decoders[filter.ID], err = codec.NewIDLEventCodec(idl, binary.LittleEndian()) + if err != nil { + return fmt.Errorf("failed to create event decoder: %w", err) + } + fl.filtersByName[filter.Name] = filter.ID fl.filtersByID[filter.ID] = &filter filtersForAddress, ok := fl.filtersByAddress[filter.Address] @@ -118,6 +147,13 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { if !filter.IsBackfilled { fl.filtersToBackfill[filter.ID] = struct{}{} } + + programID := filter.Address.ToSolana().String() + fl.knownPrograms[programID]++ + + discriminatorHead := filter.Discriminator()[:10] + fl.knownDiscriminators[discriminatorHead]++ + return nil } @@ -159,6 +195,7 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) { delete(fl.filtersByName, filter.Name) delete(fl.filtersToBackfill, filter.ID) delete(fl.filtersByID, filter.ID) + delete(fl.seqNums, filter.ID) filtersForAddress, ok := fl.filtersByAddress[filter.Address] if !ok { @@ -180,13 +217,33 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) { if len(filtersForAddress) == 0 { delete(fl.filtersByAddress, filter.Address) } + + programID := filter.Address.ToSolana().String() + if refcount, ok := fl.knownPrograms[programID]; ok { + refcount-- + if refcount > 0 { + fl.knownPrograms[programID] = refcount + } else { + delete(fl.knownPrograms, programID) + } + } + + discriminatorHead := filter.Discriminator()[:10] + if refcount, ok := fl.knownDiscriminators[discriminatorHead]; ok { + refcount-- + if refcount > 0 { + fl.knownDiscriminators[discriminatorHead] = refcount + } else { + delete(fl.knownDiscriminators, discriminatorHead) + } + } } // MatchingFilters - returns iterator to go through all matching filters. // Requires LoadFilters to be called at least once. -func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { +func (fl *filters) matchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { if !fl.loadedFilters.Load() { - fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") + fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to matchingFilters") return nil } return func(yield func(Filter) bool) { @@ -210,6 +267,56 @@ func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature } } +// MatchchingFiltersForEncodedEvent - similar to MatchingFilters but accepts a raw encoded event. Under normal operation, +// this will be called on every new event that happens on the blockchain, so it's important it returns immediately if it +// doesn't match any registered filters. +func (fl *filters) MatchingFiltersForEncodedEvent(event ProgramEvent) iter.Seq[Filter] { + // If this log message corresponds to an anchor event, then it must begin with an 8 byte discriminator, + // which will appear as the first 11 bytes of base64-encoded data. Standard base64 encoding RFC requires + // that any base64-encoded string must be padding with the = char to make its length a multiple of 4, so + // 12 is the minimum length for a valid anchor event. 
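The prefilter described in the comment above (and implemented just below) relies on two base64 facts: the first 10 characters (60 bits) of the encoded data are determined entirely by the 8-byte discriminator, and decoding the first 12 characters yields 9 bytes whose first 8 are the discriminator. A self-contained demonstration, using a stand-in discriminator value:

```go
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
)

func main() {
	// Stand-in 8-byte event discriminator (illustrative values only).
	disc := []byte{1, 2, 3, 4, 5, 6, 7, 8}

	// Two different payloads behind the same discriminator.
	evtA := base64.StdEncoding.EncodeToString(append(append([]byte{}, disc...), []byte("payload A")...))
	evtB := base64.StdEncoding.EncodeToString(append(append([]byte{}, disc...), []byte("another payload")...))

	// The first 10 base64 characters (60 bits) depend only on the discriminator,
	// so they work as a cheap lookup key before any decoding happens.
	fmt.Println(evtA[:10] == evtB[:10]) // true

	// Decoding the first 12 characters yields 9 bytes; the first 8 are the discriminator.
	head, err := base64.StdEncoding.DecodeString(evtA[:12])
	if err != nil {
		panic(err)
	}
	fmt.Println(len(head), bytes.Equal(head[:8], disc)) // 9 true
}
```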
+ if len(event.Data) < 12 { + return nil + } + isKnown := func() (ok bool) { + fl.filtersMutex.RLock() + defer fl.filtersMutex.RUnlock() + + if _, ok = fl.knownPrograms[event.Program]; !ok { + return ok + } + + // The first 64-bits of the event data is the event sig. Because it's base64 encoded, this corresponds to + // the first 10 characters plus 4 bits of the 11th character. We can quickly rule it out as not matching any known + // discriminators if the first 10 characters don't match. If it passes that initial test, we base64-decode the + // first 12 characters, and use the first 8 bytes of that as the event sig to call MatchingFilters. The address + // also needs to be base58-decoded to pass to MatchingFilters + _, ok = fl.knownDiscriminators[event.Data[:10]] + return ok + } + + if !isKnown() { + return nil + } + + addr, err := solana.PublicKeyFromBase58(event.Program) + if err != nil { + fl.lggr.Errorw("failed to parse Program ID for event", "EventProgram", event) + return nil + } + + // Decoding first 12 characters will give us the first 9 bytes of binary data + // The first 8 of those is the discriminator + decoded, err := base64.StdEncoding.DecodeString(event.Data[:12]) + if err != nil || len(decoded) < 8 { + fl.lggr.Errorw("failed to decode event data", "EventProgram", event) + return nil + } + eventSig := EventSignature(decoded[:8]) + + return fl.matchingFilters(PublicKey(addr), eventSig) +} + // GetFiltersToBackfill - returns copy of backfill queue // Requires LoadFilters to be called at least once. func (fl *filters) GetFiltersToBackfill() []Filter { @@ -264,6 +371,8 @@ func (fl *filters) LoadFilters(ctx context.Context) error { fl.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]struct{}) fl.filtersToBackfill = make(map[int64]struct{}) fl.filtersToDelete = make(map[int64]Filter) + fl.knownPrograms = make(map[string]uint) + fl.knownDiscriminators = make(map[string]uint) filters, err := fl.orm.SelectFilters(ctx) if err != nil { @@ -310,7 +419,89 @@ func (fl *filters) LoadFilters(ctx context.Context) error { } } + fl.seqNums, err = fl.orm.SelectSeqNums(ctx) + if err != nil { + return fmt.Errorf("failed to select sequence numbers from db: %w", err) + } + fl.loadedFilters.Store(true) return nil } + +// DecodeSubKey accepts raw Borsh-encoded event data, a filter ID and a subkeyPath. It uses the decoder +// associated with that filter to decode the event and extract the subkey value from the specified subKeyPath. +// WARNING: not thread safe, should only be called while fl.filtersMutex is held and after filters have been loaded. +func (fl *filters) DecodeSubKey(ctx context.Context, raw []byte, ID int64, subKeyPath []string) (any, error) { + filter, ok := fl.filtersByID[ID] + if !ok { + return nil, fmt.Errorf("filter %d not found", ID) + } + decoder, ok := fl.decoders[ID] + if !ok { + return nil, fmt.Errorf("decoder %d not found", ID) + } + decodedEvent, err := decoder.CreateType(filter.EventName, false) + if err != nil || decodedEvent == nil { + return nil, err + } + err = decoder.Decode(ctx, raw, decodedEvent, filter.EventName) + if err != nil { + return nil, err + } + return ExtractField(decodedEvent, subKeyPath) +} + +// ExtractField extracts the value of a field or nested subfield from a composite datatype composed +// of a series of nested structs and maps. Pointers at any level are automatically dereferenced, as long +// as they aren't nil. path is an ordered list of nested subfield names to traverse. For now, slices and +// arrays are not supported. 
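Behind the `isKnown` check above, `knownPrograms` and `knownDiscriminators` are reference-counted sets: `RegisterFilter` increments a key's count and `removeFilterFromIndexes` decrements it, deleting the key once no filter references it, so the hot path stays a single map lookup. A stripped-down sketch of that bookkeeping (the type and method names are illustrative, not the actual ones):

```go
package main

import "fmt"

// refSet counts how many filters reference a key; membership is one map lookup.
type refSet map[string]uint

func (s refSet) add(key string) { s[key]++ }

func (s refSet) remove(key string) {
	if n, ok := s[key]; ok {
		if n > 1 {
			s[key] = n - 1
			return
		}
		delete(s, key)
	}
}

func (s refSet) known(key string) bool {
	_, ok := s[key]
	return ok
}

func main() {
	programs := refSet{}
	programs.add("11111111111111111111111111111111") // two filters on the same program
	programs.add("11111111111111111111111111111111")

	programs.remove("11111111111111111111111111111111")
	fmt.Println(programs.known("11111111111111111111111111111111")) // true: one filter still registered

	programs.remove("11111111111111111111111111111111")
	fmt.Println(programs.known("11111111111111111111111111111111")) // false: last filter removed
}
```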
(If the need arises, we could support them by converting the field to an +// integer to extract a specific element from a slice or array.) +func ExtractField(data any, path []string) (any, error) { + v := reflect.ValueOf(data) + for v.Kind() == reflect.Ptr { + if v.IsNil() { + if len(path) > 0 { + return nil, fmt.Errorf("cannot extract field '%s' from a nil pointer", path[0]) + } + return nil, nil // as long as this is the last field in the path, nil pointer is not a problem + } + v = v.Elem() + } + + if len(path) == 0 { + return v.Interface(), nil + } + field, path := path[0], path[1:] + + switch v.Kind() { + case reflect.Struct: + v = v.FieldByName(field) + if !v.IsValid() { + return nil, fmt.Errorf("field '%s' of struct %v does not exist", field, data) + } + return ExtractField(v.Interface(), path) + case reflect.Map: + var keyVal reflect.Value + if keyType := v.Type().Key(); keyType.Kind() != reflect.String { + // This map does not have string keys, so let's try int (or anything convertible to int) + intKey, err := strconv.Atoi(field) + if err != nil { + return nil, fmt.Errorf("map key '%s' for non-string type '%T' is not convertable to an integer", field, v.Type()) + } + if !keyType.ConvertibleTo(reflect.TypeOf(intKey)) { + return nil, fmt.Errorf("map has type '%T', must be a string or convertable to an integer", v.Type()) + } + keyVal = reflect.ValueOf(intKey) + } else { + keyVal = reflect.ValueOf(field) + } + v = v.MapIndex(keyVal) + if !v.IsValid() { + return nil, fmt.Errorf("key '%s' of map %v does not exist", field, data) + } + return ExtractField(v.Interface(), path) + default: + return nil, fmt.Errorf("extracting a field from a %s type is not supported", v.Kind().String()) + } +} diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 9f8058703..b83b71385 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -4,10 +4,12 @@ import ( "errors" "fmt" "slices" + "strings" "testing" "github.com/gagliardetto/solana-go" "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -39,6 +41,12 @@ func TestFilters_LoadFilters(t *testing.T) { happyPath2, }, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + 3: 0, + }, nil) + err := fs.LoadFilters(ctx) require.EqualError(t, err, "failed to select filters from db: db failed") err = fs.LoadFilters(ctx) @@ -96,12 +104,6 @@ func TestFilters_RegisterFilter(t *testing.T) { f.EventSig = EventSignature{3, 2, 1} }, }, - { - Name: "EventIDL", - ModifyField: func(f *Filter) { - f.EventIDL = uuid.NewString() - }, - }, { Name: "SubkeyPaths", ModifyField: func(f *Filter) { @@ -116,6 +118,7 @@ func TestFilters_RegisterFilter(t *testing.T) { const filterName = "Filter" dbFilter := Filter{Name: filterName} orm.On("SelectFilters", mock.Anything).Return([]Filter{dbFilter}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil) newFilter := dbFilter tc.ModifyField(&newFilter) err := fs.RegisterFilter(tests.Context(t), newFilter) @@ -128,11 +131,13 @@ func TestFilters_RegisterFilter(t *testing.T) { fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() filter := Filter{Name: 
filterName} err := fs.RegisterFilter(tests.Context(t), filter) require.Error(t, err) - // can readd after db issue is resolved + + // can read after db issue is resolved orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() err = fs.RegisterFilter(tests.Context(t), filter) require.NoError(t, err) @@ -144,7 +149,7 @@ func TestFilters_RegisterFilter(t *testing.T) { orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() err = fs.RegisterFilter(tests.Context(t), filter) require.NoError(t, err) - storedFilters := slices.Collect(fs.MatchingFilters(filter.Address, filter.EventSig)) + storedFilters := slices.Collect(fs.matchingFilters(filter.Address, filter.EventSig)) require.Len(t, storedFilters, 1) filter.ID = 1 require.Equal(t, filter, storedFilters[0]) @@ -154,6 +159,7 @@ func TestFilters_RegisterFilter(t *testing.T) { fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() const filterID = int64(10) orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID, nil).Once() err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) @@ -185,6 +191,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) }) @@ -194,6 +201,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(errors.New("db query failed")).Once() err := fs.UnregisterFilter(tests.Context(t), filterName) require.EqualError(t, err, "failed to mark filter deleted: db query failed") @@ -204,6 +212,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(nil).Once() err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) @@ -231,6 +240,9 @@ func TestFilters_PruneFilters(t *testing.T) { Name: "To keep", }, }, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{ + 2: 25, + }, nil).Once() orm.On("DeleteFilters", mock.Anything, map[int64]Filter{toDelete.ID: toDelete}).Return(nil).Once() err := fs.PruneFilters(tests.Context(t)) require.NoError(t, err) @@ -251,6 +263,10 @@ func TestFilters_PruneFilters(t *testing.T) { Name: "To keep", }, }, nil).Once() + orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + }, nil).Once() newToDelete := Filter{ ID: 3, Name: "To delete 2", @@ -267,7 +283,7 @@ func TestFilters_PruneFilters(t *testing.T) { }) } -func TestFilters_MatchingFilters(t *testing.T) { +func TestFilters_matchingFilters(t *testing.T) { orm := newMockORM(t) lggr := logger.Sugared(logger.Test(t)) expectedFilter1 := Filter{ @@ -296,17 +312,23 @@ func TestFilters_MatchingFilters(t *testing.T) { EventSig: expectedFilter1.EventSig, } 
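The `SelectSeqNums` stubs appear throughout these tests because `LoadFilters` now restores the last assigned sequence number per filter, and `IncrementSeqNum` continues from it, keeping numbering monotonic across restarts. A tiny sketch of that behaviour; the map literal stands in for the stubbed ORM result and the values are arbitrary:

```go
package main

import "fmt"

func main() {
	// Restored from the database on LoadFilters (filterID -> last assigned sequence number).
	seqNums := map[int64]int64{1: 18, 2: 25}

	increment := func(filterID int64) int64 {
		seqNums[filterID]++
		return seqNums[filterID]
	}

	fmt.Println(increment(1)) // 19: continues after the persisted value
	fmt.Println(increment(3)) // 1: first log matched by a freshly registered filter
}
```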
orm.On("SelectFilters", mock.Anything).Return([]Filter{expectedFilter1, expectedFilter2, sameAddress, sameEventSig}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + 3: 14, + 4: 0, + }, nil) filters := newFilters(lggr, orm) err := filters.LoadFilters(tests.Context(t)) require.NoError(t, err) - matchingFilters := slices.Collect(filters.MatchingFilters(expectedFilter1.Address, expectedFilter1.EventSig)) + matchingFilters := slices.Collect(filters.matchingFilters(expectedFilter1.Address, expectedFilter1.EventSig)) require.Len(t, matchingFilters, 2) require.Contains(t, matchingFilters, expectedFilter1) require.Contains(t, matchingFilters, expectedFilter2) // if at least one key does not match - returns empty iterator - require.Empty(t, slices.Collect(filters.MatchingFilters(newRandomPublicKey(t), expectedFilter1.EventSig))) - require.Empty(t, slices.Collect(filters.MatchingFilters(expectedFilter1.Address, newRandomEventSignature(t)))) - require.Empty(t, slices.Collect(filters.MatchingFilters(newRandomPublicKey(t), newRandomEventSignature(t)))) + require.Empty(t, slices.Collect(filters.matchingFilters(newRandomPublicKey(t), expectedFilter1.EventSig))) + require.Empty(t, slices.Collect(filters.matchingFilters(expectedFilter1.Address, newRandomEventSignature(t)))) + require.Empty(t, slices.Collect(filters.matchingFilters(newRandomPublicKey(t), newRandomEventSignature(t)))) } func TestFilters_GetFiltersToBackfill(t *testing.T) { @@ -324,6 +346,10 @@ func TestFilters_GetFiltersToBackfill(t *testing.T) { Name: "notBackfilled", } orm.EXPECT().SelectFilters(mock.Anything).Return([]Filter{backfilledFilter, notBackfilled}, nil).Once() + orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + }, nil) filters := newFilters(lggr, orm) err := filters.LoadFilters(tests.Context(t)) require.NoError(t, err) @@ -363,3 +389,65 @@ func TestFilters_GetFiltersToBackfill(t *testing.T) { require.NoError(t, filters.RegisterFilter(tests.Context(t), newFilter)) ensureInQueue(notBackfilled, newFilter) } + +func TestExtractField(t *testing.T) { + type innerInner struct { + P string + Q int + } + type innerStruct struct { + PtrString *string + ByteSlice []byte + DoubleNested innerInner + MapStringInt map[string]int + MapIntString map[int]string + } + myString := "string" + myInt32 := int32(16) + + testStruct := struct { + A int + B string + C *int32 + D innerStruct + }{ + 5, + "hello", + &myInt32, + innerStruct{ + &myString, + []byte("bytes"), + innerInner{"goodbye", 8}, + map[string]int{"key1": 1, "key2": 2}, + map[int]string{1: "val1", 2: "val2"}, + }, + } + + cases := []struct { + Name string + Path string + Result any + }{ + {"int from struct", "A", int(5)}, + {"string from struct", "B", "hello"}, + {"*int32 from struct", "C", myInt32}, + {"*string from nested struct", "D.PtrString", myString}, + {"[]byte from nested struct", "D.ByteSlice", []byte("bytes")}, + {"string from double-nested struct", "D.DoubleNested.P", "goodbye"}, + {"map[string]int from nested struct", "D.MapStringInt.key2", 2}, + {"key in map not found", "D.MapIntString.3", nil}, + {"non-integer key for map[int]string", "D.MapIntString.NotAnInt", nil}, + {"invalid field name in nested struct", "D.NoSuchField", nil}, + } + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + result, err := ExtractField(&testStruct, strings.Split(c.Path, ".")) + if c.Result == nil { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, c.Result, result) + }) 
+ } +} diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index 165c0b5fe..448d800e4 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -36,6 +36,7 @@ type eventDetail struct { slotNumber uint64 blockHeight uint64 blockHash solana.Hash + blockTime solana.UnixTimeSeconds trxIdx int trxSig solana.Signature } @@ -114,9 +115,15 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error { blockHash: block.Blockhash, } - if block.BlockHeight != nil { - detail.blockHeight = *block.BlockHeight + if block.BlockHeight == nil { + return fmt.Errorf("block at slot %d returned from rpc is missing block number", j.slotNumber) } + detail.blockHeight = *block.BlockHeight + + if block.BlockTime == nil { + return fmt.Errorf("received block %d from rpc with missing block time", block.BlockHeight) + } + detail.blockTime = *block.BlockTime if len(block.Transactions) != len(blockSigsOnly.Signatures) { return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures)) @@ -143,6 +150,7 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev event.SlotNumber = detail.slotNumber event.BlockHeight = detail.blockHeight event.BlockHash = detail.blockHash + event.BlockTime = detail.blockTime event.TransactionHash = detail.trxSig event.TransactionIndex = detail.trxIdx event.TransactionLogIndex = logIdx diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index d714f08ad..39a985a98 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -27,8 +27,8 @@ type ProgramEventProcessor interface { } type RPCClient interface { - GetLatestBlockhash(ctx context.Context, commitment rpc.CommitmentType) (out *rpc.GetLatestBlockhashResult, err error) - GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType) (out rpc.BlocksResult, err error) + LatestBlockhash(ctx context.Context) (out *rpc.GetLatestBlockhashResult, err error) + GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (out rpc.BlocksResult, err error) GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) GetSignaturesForAddressWithOpts(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) } @@ -170,7 +170,7 @@ func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) { ctxB, cancel := context.WithTimeout(ctx, c.rpcTimeLimit) // not to be run as a job, but as a blocking call - result, err := c.client.GetLatestBlockhash(ctxB, rpc.CommitmentFinalized) + result, err := c.client.LatestBlockhash(ctxB) if err != nil { c.lggr.Error("failed to get latest blockhash", "err", err) cancel() @@ -276,7 +276,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en rpcCtx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit) defer cancel() - if result, err = c.client.GetBlocks(rpcCtx, start, &end, rpc.CommitmentFinalized); err != nil { + if result, err = c.client.GetBlocks(rpcCtx, start, &end); err != nil { return err } diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index e3cbb7700..9eb2482bd 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -3,7 +3,6 @@ package logpoller_test import ( "context" "crypto/rand" - "reflect" "sync" "sync/atomic" "testing" @@ -19,7 +18,7 @@ import ( 
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" - mocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" ) var ( @@ -63,7 +62,7 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { latest.Store(uint64(40)) client.EXPECT(). - GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + LatestBlockhash(mock.Anything). RunAndReturn(latestBlockhashReturnFunc(&latest)) client.EXPECT(). @@ -71,7 +70,6 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { mock.Anything, mock.MatchedBy(getBlocksStartValMatcher), mock.MatchedBy(getBlocksEndValMatcher(&latest)), - rpc.CommitmentFinalized, ). RunAndReturn(getBlocksReturnFunc(false)) @@ -79,11 +77,13 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { height := slot - 1 + timeStamp := solana.UnixTimeSeconds(time.Now().Unix()) result := rpc.GetBlockResult{ Transactions: []rpc.TransactionWithMeta{}, Signatures: []solana.Signature{}, BlockHeight: &height, + BlockTime: &timeStamp, } _, _ = rand.Read(result.Blockhash[:]) @@ -133,13 +133,15 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { hashes := make([]solana.Hash, len(slots)) scrambler := &slotUnsync{ch: make(chan struct{})} + timeStamp := solana.UnixTimeSeconds(time.Now().Unix()) + for idx := range len(sigs) { _, _ = rand.Read(sigs[idx][:]) _, _ = rand.Read(hashes[idx][:]) } client.EXPECT(). - GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + LatestBlockhash(mock.Anything). RunAndReturn(latestBlockhashReturnFunc(&latest)) client.EXPECT(). @@ -147,7 +149,6 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { mock.Anything, mock.MatchedBy(getBlocksStartValMatcher), mock.MatchedBy(getBlocksEndValMatcher(&latest)), - rpc.CommitmentFinalized, ). 
RunAndReturn(getBlocksReturnFunc(false)) @@ -178,6 +179,7 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { Transactions: []rpc.TransactionWithMeta{}, Signatures: []solana.Signature{}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil } @@ -192,61 +194,68 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { }, Signatures: []solana.Signature{sigs[slotIdx]}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil }) tests.AssertEventually(t, func() bool { - return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ - { - BlockData: logpoller.BlockData{ - SlotNumber: 41, - BlockHeight: 40, - BlockHash: hashes[3], - TransactionHash: sigs[3], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + return len(parser.Events()) >= 4 + }) + + assert.Equal(t, []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 41, + BlockHeight: 40, + BlockTime: timeStamp, + BlockHash: hashes[3], + TransactionHash: sigs[3], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - { - BlockData: logpoller.BlockData{ - SlotNumber: 42, - BlockHeight: 41, - BlockHash: hashes[2], - TransactionHash: sigs[2], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 42, + BlockHeight: 41, + BlockTime: timeStamp, + BlockHash: hashes[2], + TransactionHash: sigs[2], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - { - BlockData: logpoller.BlockData{ - SlotNumber: 43, - BlockHeight: 42, - BlockHash: hashes[1], - TransactionHash: sigs[1], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 43, + BlockHeight: 42, + BlockTime: timeStamp, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - { - BlockData: logpoller.BlockData{ - SlotNumber: 44, - BlockHeight: 43, - BlockHash: hashes[0], - TransactionHash: sigs[0], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 44, + BlockHeight: 43, + BlockTime: timeStamp, + BlockHash: hashes[0], + TransactionHash: sigs[0], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - }) - }) + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + }, parser.Events()) client.AssertExpectations(t) } @@ -298,7 +307,7 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { // GetLatestBlockhash might be called at start-up; make it take some time because the result isn't needed for this test client.EXPECT(). - GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + LatestBlockhash(mock.Anything). RunAndReturn(latestBlockhashReturnFunc(&latest)). After(2 * time.Second). 
Maybe() @@ -308,7 +317,6 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { mock.Anything, mock.MatchedBy(getBlocksStartValMatcher), mock.MatchedBy(getBlocksEndValMatcher(&latest)), - rpc.CommitmentFinalized, ). RunAndReturn(getBlocksReturnFunc(true)) @@ -340,12 +348,14 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { } height := slot - 1 + timeStamp := solana.UnixTimeSeconds(time.Now().Unix()) if idx == -1 { return &rpc.GetBlockResult{ Transactions: []rpc.TransactionWithMeta{}, Signatures: []solana.Signature{}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil } @@ -364,6 +374,7 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { }, Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil }) @@ -455,7 +466,7 @@ func (p *testBlockProducer) Count() uint64 { return p.count } -func (p *testBlockProducer) GetLatestBlockhash(_ context.Context, _ rpc.CommitmentType) (out *rpc.GetLatestBlockhashResult, err error) { +func (p *testBlockProducer) LatestBlockhash(_ context.Context) (out *rpc.GetLatestBlockhashResult, err error) { p.b.Helper() p.mu.Lock() @@ -474,7 +485,7 @@ func (p *testBlockProducer) GetLatestBlockhash(_ context.Context, _ rpc.Commitme }, nil } -func (p *testBlockProducer) GetBlocks(_ context.Context, startSlot uint64, endSlot *uint64, _ rpc.CommitmentType) (out rpc.BlocksResult, err error) { +func (p *testBlockProducer) GetBlocks(_ context.Context, startSlot uint64, endSlot *uint64) (out rpc.BlocksResult, err error) { p.b.Helper() p.mu.Lock() @@ -486,7 +497,7 @@ func (p *testBlockProducer) GetBlocks(_ context.Context, startSlot uint64, endSl blocks[idx] = startSlot + uint64(idx) } - return rpc.BlocksResult(blocks), nil + return blocks, nil } func (p *testBlockProducer) GetBlockWithOpts(_ context.Context, block uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { @@ -589,8 +600,8 @@ func (p *testParser) Events() []logpoller.ProgramEvent { return p.events } -func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { - return func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { +func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context) (*rpc.GetLatestBlockhashResult, error) { + return func(ctx context.Context) (*rpc.GetLatestBlockhashResult, error) { defer func() { latest.Store(latest.Load() + 2) }() @@ -608,8 +619,8 @@ func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context, rpc. } } -func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error) { - return func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { +func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64) (rpc.BlocksResult, error) { + return func(_ context.Context, u1 uint64, u2 *uint64) (rpc.BlocksResult, error) { blocks := []uint64{} if !empty { @@ -619,7 +630,7 @@ func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64, rpc. 
} } - return rpc.BlocksResult(blocks), nil + return blocks, nil } } diff --git a/pkg/solana/logpoller/log_data_parser.go b/pkg/solana/logpoller/log_data_parser.go index 4080a09e2..ca3fb79e9 100644 --- a/pkg/solana/logpoller/log_data_parser.go +++ b/pkg/solana/logpoller/log_data_parser.go @@ -19,6 +19,7 @@ type BlockData struct { SlotNumber uint64 BlockHeight uint64 BlockHash solana.Hash + BlockTime solana.UnixTimeSeconds TransactionHash solana.Signature TransactionIndex int TransactionLogIndex uint @@ -31,9 +32,9 @@ type ProgramLog struct { } type ProgramEvent struct { + Program string BlockData - Prefix string - Data string + Data string } type ProgramOutput struct { @@ -78,8 +79,8 @@ func parseProgramLogs(logs []string) []ProgramOutput { if len(dataMatches) > 1 { instLogs[lastLogIdx].Events = append(instLogs[lastLogIdx].Events, ProgramEvent{ - Prefix: prefixBuilder(depth), - Data: dataMatches[1], + Program: instLogs[lastLogIdx].Program, + Data: dataMatches[1], }) } } else if strings.HasPrefix(log, "Log truncated") { diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 4c386693e..c82e8eb47 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -2,40 +2,53 @@ package logpoller import ( "context" + "encoding/base64" "errors" + "fmt" + "math" "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/utils" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) var ( - ErrFilterNameConflict = errors.New("filter with such name already exists") + ErrFilterNameConflict = errors.New("filter with such name already exists") + ErrMissingDiscriminator = errors.New("Solana log is missing discriminator") ) type ORM interface { + ChainID() string InsertFilter(ctx context.Context, filter Filter) (id int64, err error) SelectFilters(ctx context.Context) ([]Filter, error) DeleteFilters(ctx context.Context, filters map[int64]Filter) error MarkFilterDeleted(ctx context.Context, id int64) (err error) MarkFilterBackfilled(ctx context.Context, id int64) (err error) + InsertLogs(context.Context, []Log) (err error) + SelectSeqNums(ctx context.Context) (map[int64]int64, error) } -type LogPoller struct { +type Service struct { + services.StateMachine services.Service eng *services.Engine - lggr logger.SugaredLogger - orm ORM + lggr logger.SugaredLogger + orm ORM + client client.Reader + collector *EncodedLogCollector filters *filters } -func New(lggr logger.SugaredLogger, orm ORM) *LogPoller { +func New(lggr logger.SugaredLogger, orm ORM, cl client.Reader) *Service { lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) - lp := &LogPoller{ + lp := &Service{ orm: orm, - lggr: lggr, + client: cl, filters: newFilters(lggr, orm), } @@ -44,30 +57,124 @@ func New(lggr logger.SugaredLogger, orm ORM) *LogPoller { Start: lp.start, }.NewServiceEngine(lggr) lp.lggr = lp.eng.SugaredLogger + return lp } -func (lp *LogPoller) start(context.Context) error { +func (lp *Service) start(_ context.Context) error { lp.eng.Go(lp.run) lp.eng.Go(lp.backgroundWorkerRun) return nil } +func makeLogIndex(txIndex int, txLogIndex uint) (int64, error) { + if txIndex > 0 && txIndex < math.MaxInt32 && txLogIndex < math.MaxUint32 { + return int64(txIndex<<32) | int64(txLogIndex), nil + } + return 0, fmt.Errorf("txIndex or txLogIndex out of range: txIndex=%d, txLogIndex=%d", txIndex, txLogIndex) +} + +// Process - process stream of events 
coming from log ingester
+func (lp *Service) Process(programEvent ProgramEvent) (err error) {
+	ctx, cancel := utils.ContextFromChan(lp.eng.StopChan)
+	defer cancel()
+
+	// This should never happen, since the log collector isn't started until after the filters
+	// get loaded. But just in case, return an error if they aren't so the collector knows to retry later.
+	if err = lp.filters.LoadFilters(ctx); err != nil {
+		return err
+	}
+
+	blockData := programEvent.BlockData
+
+	matchingFilters := lp.filters.MatchingFiltersForEncodedEvent(programEvent)
+	if matchingFilters == nil {
+		return nil
+	}
+
+	var logs []Log
+	for filter := range matchingFilters {
+		logIndex, logIndexErr := makeLogIndex(blockData.TransactionIndex, blockData.TransactionLogIndex)
+		if logIndexErr != nil {
+			lp.lggr.Critical(logIndexErr)
+			return logIndexErr
+		}
+		if blockData.SlotNumber == math.MaxInt64 {
+			errSlot := fmt.Errorf("slot number %d out of range", blockData.SlotNumber)
+			lp.lggr.Critical(errSlot.Error())
+			return errSlot
+		}
+		log := Log{
+			FilterID:       filter.ID,
+			ChainID:        lp.orm.ChainID(),
+			LogIndex:       logIndex,
+			BlockHash:      Hash(blockData.BlockHash),
+			BlockNumber:    int64(blockData.SlotNumber), //nolint:gosec
+			BlockTimestamp: blockData.BlockTime.Time().UTC(),
+			Address:        filter.Address,
+			EventSig:       filter.EventSig,
+			TxHash:         Signature(blockData.TransactionHash),
+		}
+
+		eventData, decodeErr := base64.StdEncoding.DecodeString(programEvent.Data)
+		if decodeErr != nil {
+			return decodeErr
+		}
+		if len(eventData) < 8 {
+			err = fmt.Errorf("Assumption violation: %w, log.Data=%s", ErrMissingDiscriminator, log.Data)
+			lp.lggr.Criticalw(err.Error())
+			return err
+		}
+		log.Data = eventData[8:]
+
+		log.SubkeyValues = make([]IndexedValue, 0, len(filter.SubkeyPaths))
+		for _, path := range filter.SubkeyPaths {
+			subKeyVal, decodeSubKeyErr := lp.filters.DecodeSubKey(ctx, log.Data, filter.ID, path)
+			if decodeSubKeyErr != nil {
+				return decodeSubKeyErr
+			}
+			indexedVal, newIndexedValErr := NewIndexedValue(subKeyVal)
+			if newIndexedValErr != nil {
+				return newIndexedValErr
+			}
+			log.SubkeyValues = append(log.SubkeyValues, indexedVal)
+		}
+
+		log.SequenceNum = lp.filters.IncrementSeqNum(filter.ID)
+
+		if filter.Retention > 0 {
+			expiresAt := time.Now().Add(filter.Retention).UTC()
+			log.ExpiresAt = &expiresAt
+		}
+
+		logs = append(logs, log)
+	}
+	if len(logs) == 0 {
+		return nil
+	}
+
+	err = lp.orm.InsertLogs(ctx, logs)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 // RegisterFilter - refer to filters.RegisterFilter for details.
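`makeLogIndex` packs the transaction index into the upper 32 bits of an `int64` and the intra-transaction log index into the lower 32 bits, so sorting by the packed value orders logs by transaction and then by position within the transaction. A quick round-trip sketch (bounds checks elided; the helper is restated here, not imported):

```go
package main

import "fmt"

// packLogIndex mirrors makeLogIndex: txIndex in the high 32 bits, txLogIndex in the low 32 bits.
func packLogIndex(txIndex int, txLogIndex uint) int64 {
	return int64(txIndex)<<32 | int64(txLogIndex)
}

func main() {
	logIndex := packLogIndex(7, 3)
	fmt.Println(logIndex) // 30064771075

	// Unpacking recovers both components.
	fmt.Println(logIndex>>32, logIndex&0xFFFFFFFF) // 7 3
}
```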
-func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error { +func (lp *Service) RegisterFilter(ctx context.Context, filter Filter) error { ctx, cancel := lp.eng.Ctx(ctx) defer cancel() return lp.filters.RegisterFilter(ctx, filter) } // UnregisterFilter refer to filters.UnregisterFilter for details -func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error { +func (lp *Service) UnregisterFilter(ctx context.Context, name string) error { ctx, cancel := lp.eng.Ctx(ctx) defer cancel() return lp.filters.UnregisterFilter(ctx, name) } -func (lp *LogPoller) loadFilters(ctx context.Context) error { +func (lp *Service) retryUntilSuccess(ctx context.Context, failMessage string, fn func(context.Context) error) error { retryTicker := services.TickerConfig{Initial: 0, JitterPct: services.DefaultJitter}.NewTicker(time.Second) defer retryTicker.Stop() @@ -77,22 +184,29 @@ func (lp *LogPoller) loadFilters(ctx context.Context) error { return ctx.Err() case <-retryTicker.C: } - err := lp.filters.LoadFilters(ctx) - if err != nil { - lp.lggr.Errorw("Failed loading filters in init logpoller loop, retrying later", "err", err) - continue + err := fn(ctx) + if err == nil { + return nil } - - return nil + lp.lggr.Errorw(failMessage, "err", err) } + // unreachable } -func (lp *LogPoller) run(ctx context.Context) { - err := lp.loadFilters(ctx) +func (lp *Service) run(ctx context.Context) { + err := lp.retryUntilSuccess(ctx, "failed loading filters in init Service loop, retrying later", lp.filters.LoadFilters) + if err != nil { + lp.lggr.Warnw("never loaded filters before shutdown", "err", err) + return + } + + // safe to start fetching logs, now that filters are loaded + err = lp.retryUntilSuccess(ctx, "failed to start EncodedLogCollector, retrying later", lp.collector.Start) if err != nil { - lp.lggr.Warnw("Failed loading filters", "err", err) + lp.lggr.Warnw("EncodedLogCollector never started before shutdown", "err", err) return } + defer lp.collector.Close() var blocks chan struct { BlockNumber int64 @@ -119,7 +233,7 @@ func (lp *LogPoller) run(ctx context.Context) { } } -func (lp *LogPoller) backgroundWorkerRun(ctx context.Context) { +func (lp *Service) backgroundWorkerRun(ctx context.Context) { pruneFilters := services.NewTicker(time.Minute) defer pruneFilters.Stop() for { @@ -135,7 +249,7 @@ func (lp *LogPoller) backgroundWorkerRun(ctx context.Context) { } } -func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { +func (lp *Service) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { // TODO: NONEVM-916 start backfill lp.lggr.Debugw("Starting filter backfill", "filter", filter) err := lp.filters.MarkFilterBackfilled(ctx, filter.ID) diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go new file mode 100644 index 000000000..e5006ae7b --- /dev/null +++ b/pkg/solana/logpoller/log_poller_test.go @@ -0,0 +1,126 @@ +package logpoller + +import ( + "context" + "encoding/base64" + "encoding/json" + "math/rand" + "testing" + + bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + clientmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/mocks" + 
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" +) + +func TestProcess(t *testing.T) { + ctx := tests.Context(t) + + addr := newRandomPublicKey(t) + eventName := "myEvent" + eventSig := Discriminator("event", eventName) + event := struct { + A int64 + B string + }{55, "hello"} + subkeyValA, err := NewIndexedValue(event.A) + require.NoError(t, err) + subkeyValB, err := NewIndexedValue(event.B) + require.NoError(t, err) + + filterID := rand.Int63() + chainID := uuid.NewString() + + txIndex := int(rand.Int31()) + txLogIndex := uint(rand.Uint32()) + + expectedLog := newRandomLog(t, filterID, chainID, eventName) + expectedLog.Address = addr + expectedLog.LogIndex, err = makeLogIndex(txIndex, txLogIndex) + require.NoError(t, err) + expectedLog.SequenceNum = 1 + expectedLog.SubkeyValues = []IndexedValue{subkeyValA, subkeyValB} + + expectedLog.Data, err = bin.MarshalBorsh(&event) + require.NoError(t, err) + + ev := ProgramEvent{ + Program: addr.ToSolana().String(), + BlockData: BlockData{ + SlotNumber: uint64(expectedLog.BlockNumber), + BlockHeight: 3, + BlockHash: expectedLog.BlockHash.ToSolana(), + BlockTime: solana.UnixTimeSeconds(expectedLog.BlockTimestamp.Unix()), + TransactionHash: expectedLog.TxHash.ToSolana(), + TransactionIndex: txIndex, + TransactionLogIndex: txLogIndex, + }, + Data: base64.StdEncoding.EncodeToString(append(eventSig[:], expectedLog.Data...)), + } + + orm := newMockORM(t) + cl := clientmocks.NewReaderWriter(t) + lggr := logger.Sugared(logger.Test(t)) + lp := New(lggr, orm, cl) + + var idlTypeInt64 codec.IdlType + var idlTypeString codec.IdlType + + err = json.Unmarshal([]byte("\"i64\""), &idlTypeInt64) + require.NoError(t, err) + err = json.Unmarshal([]byte("\"string\""), &idlTypeString) + require.NoError(t, err) + + idl := EventIdl{ + codec.IdlEvent{ + Name: "myEvent", + Fields: []codec.IdlEventField{{ + Name: "A", + Type: idlTypeInt64, + }, { + Name: "B", + Type: idlTypeString, + }}, + }, + []codec.IdlTypeDef{}, + } + + filter := Filter{ + Name: "test filter", + EventName: eventName, + Address: addr, + EventSig: eventSig, + EventIdl: idl, + SubkeyPaths: [][]string{{"A"}, {"B"}}, + } + orm.EXPECT().SelectFilters(mock.Anything).Return([]Filter{filter}, nil).Once() + orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{}, nil).Once() + orm.EXPECT().ChainID().Return(chainID).Once() + orm.EXPECT().InsertFilter(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f Filter) (int64, error) { + require.Equal(t, f, filter) + return filterID, nil + }).Once() + + orm.EXPECT().InsertLogs(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, logs []Log) error { + require.Len(t, logs, 1) + log := logs[0] + assert.Equal(t, log, expectedLog) + return nil + }) + err = lp.RegisterFilter(ctx, filter) + require.NoError(t, err) + + err = lp.Process(ev) + require.NoError(t, err) + + orm.EXPECT().MarkFilterDeleted(mock.Anything, mock.Anything).Return(nil).Once() + err = lp.UnregisterFilter(ctx, filter.Name) + require.NoError(t, err) +} diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go index 0595ea718..1508ba4aa 100644 --- a/pkg/solana/logpoller/mock_orm.go +++ b/pkg/solana/logpoller/mock_orm.go @@ -21,6 +21,51 @@ func (_m *mockORM) EXPECT() *mockORM_Expecter { return &mockORM_Expecter{mock: &_m.Mock} } +// ChainID provides a mock function with given fields: +func (_m *mockORM) ChainID() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ChainID") + } + + var r0 
string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// mockORM_ChainID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChainID' +type mockORM_ChainID_Call struct { + *mock.Call +} + +// ChainID is a helper method to define mock.On call +func (_e *mockORM_Expecter) ChainID() *mockORM_ChainID_Call { + return &mockORM_ChainID_Call{Call: _e.mock.On("ChainID")} +} + +func (_c *mockORM_ChainID_Call) Run(run func()) *mockORM_ChainID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *mockORM_ChainID_Call) Return(_a0 string) *mockORM_ChainID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockORM_ChainID_Call) RunAndReturn(run func() string) *mockORM_ChainID_Call { + _c.Call.Return(run) + return _c +} + // DeleteFilters provides a mock function with given fields: ctx, filters func (_m *mockORM) DeleteFilters(ctx context.Context, filters map[int64]Filter) error { ret := _m.Called(ctx, filters) @@ -125,6 +170,53 @@ func (_c *mockORM_InsertFilter_Call) RunAndReturn(run func(context.Context, Filt return _c } +// InsertLogs provides a mock function with given fields: _a0, _a1 +func (_m *mockORM) InsertLogs(_a0 context.Context, _a1 []Log) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for InsertLogs") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []Log) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_InsertLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertLogs' +type mockORM_InsertLogs_Call struct { + *mock.Call +} + +// InsertLogs is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 []Log +func (_e *mockORM_Expecter) InsertLogs(_a0 interface{}, _a1 interface{}) *mockORM_InsertLogs_Call { + return &mockORM_InsertLogs_Call{Call: _e.mock.On("InsertLogs", _a0, _a1)} +} + +func (_c *mockORM_InsertLogs_Call) Run(run func(_a0 context.Context, _a1 []Log)) *mockORM_InsertLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]Log)) + }) + return _c +} + +func (_c *mockORM_InsertLogs_Call) Return(err error) *mockORM_InsertLogs_Call { + _c.Call.Return(err) + return _c +} + +func (_c *mockORM_InsertLogs_Call) RunAndReturn(run func(context.Context, []Log) error) *mockORM_InsertLogs_Call { + _c.Call.Return(run) + return _c +} + // MarkFilterBackfilled provides a mock function with given fields: ctx, id func (_m *mockORM) MarkFilterBackfilled(ctx context.Context, id int64) error { ret := _m.Called(ctx, id) @@ -277,6 +369,64 @@ func (_c *mockORM_SelectFilters_Call) RunAndReturn(run func(context.Context) ([] return _c } +// SelectSeqNums provides a mock function with given fields: ctx +func (_m *mockORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SelectSeqNums") + } + + var r0 map[int64]int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[int64]int64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[int64]int64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// 
mockORM_SelectSeqNums_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SelectSeqNums' +type mockORM_SelectSeqNums_Call struct { + *mock.Call +} + +// SelectSeqNums is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockORM_Expecter) SelectSeqNums(ctx interface{}) *mockORM_SelectSeqNums_Call { + return &mockORM_SelectSeqNums_Call{Call: _e.mock.On("SelectSeqNums", ctx)} +} + +func (_c *mockORM_SelectSeqNums_Call) Run(run func(ctx context.Context)) *mockORM_SelectSeqNums_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockORM_SelectSeqNums_Call) Return(_a0 map[int64]int64, _a1 error) *mockORM_SelectSeqNums_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockORM_SelectSeqNums_Call) RunAndReturn(run func(context.Context) (map[int64]int64, error)) *mockORM_SelectSeqNums_Call { + _c.Call.Return(run) + return _c +} + // newMockORM creates a new instance of mockORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func newMockORM(t interface { diff --git a/pkg/solana/logpoller/mocks/rpc_client.go b/pkg/solana/logpoller/mocks/rpc_client.go index 851eba9ec..1d112f399 100644 --- a/pkg/solana/logpoller/mocks/rpc_client.go +++ b/pkg/solana/logpoller/mocks/rpc_client.go @@ -85,9 +85,9 @@ func (_c *RPCClient_GetBlockWithOpts_Call) RunAndReturn(run func(context.Context return _c } -// GetBlocks provides a mock function with given fields: ctx, startSlot, endSlot, commitment -func (_m *RPCClient) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType) (rpc.BlocksResult, error) { - ret := _m.Called(ctx, startSlot, endSlot, commitment) +// GetBlocks provides a mock function with given fields: ctx, startSlot, endSlot +func (_m *RPCClient) GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error) { + ret := _m.Called(ctx, startSlot, endSlot) if len(ret) == 0 { panic("no return value specified for GetBlocks") @@ -95,19 +95,19 @@ func (_m *RPCClient) GetBlocks(ctx context.Context, startSlot uint64, endSlot *u var r0 rpc.BlocksResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error)); ok { - return rf(ctx, startSlot, endSlot, commitment) + if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64) (rpc.BlocksResult, error)); ok { + return rf(ctx, startSlot, endSlot) } - if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64, rpc.CommitmentType) rpc.BlocksResult); ok { - r0 = rf(ctx, startSlot, endSlot, commitment) + if rf, ok := ret.Get(0).(func(context.Context, uint64, *uint64) rpc.BlocksResult); ok { + r0 = rf(ctx, startSlot, endSlot) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(rpc.BlocksResult) } } - if rf, ok := ret.Get(1).(func(context.Context, uint64, *uint64, rpc.CommitmentType) error); ok { - r1 = rf(ctx, startSlot, endSlot, commitment) + if rf, ok := ret.Get(1).(func(context.Context, uint64, *uint64) error); ok { + r1 = rf(ctx, startSlot, endSlot) } else { r1 = ret.Error(1) } @@ -124,14 +124,13 @@ type RPCClient_GetBlocks_Call struct { // - ctx context.Context // - startSlot uint64 // - endSlot *uint64 -// - commitment rpc.CommitmentType -func (_e *RPCClient_Expecter) GetBlocks(ctx interface{}, startSlot interface{}, endSlot interface{}, commitment interface{}) 
*RPCClient_GetBlocks_Call { - return &RPCClient_GetBlocks_Call{Call: _e.mock.On("GetBlocks", ctx, startSlot, endSlot, commitment)} +func (_e *RPCClient_Expecter) GetBlocks(ctx interface{}, startSlot interface{}, endSlot interface{}) *RPCClient_GetBlocks_Call { + return &RPCClient_GetBlocks_Call{Call: _e.mock.On("GetBlocks", ctx, startSlot, endSlot)} } -func (_c *RPCClient_GetBlocks_Call) Run(run func(ctx context.Context, startSlot uint64, endSlot *uint64, commitment rpc.CommitmentType)) *RPCClient_GetBlocks_Call { +func (_c *RPCClient_GetBlocks_Call) Run(run func(ctx context.Context, startSlot uint64, endSlot *uint64)) *RPCClient_GetBlocks_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(*uint64), args[3].(rpc.CommitmentType)) + run(args[0].(context.Context), args[1].(uint64), args[2].(*uint64)) }) return _c } @@ -141,34 +140,34 @@ func (_c *RPCClient_GetBlocks_Call) Return(out rpc.BlocksResult, err error) *RPC return _c } -func (_c *RPCClient_GetBlocks_Call) RunAndReturn(run func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error)) *RPCClient_GetBlocks_Call { +func (_c *RPCClient_GetBlocks_Call) RunAndReturn(run func(context.Context, uint64, *uint64) (rpc.BlocksResult, error)) *RPCClient_GetBlocks_Call { _c.Call.Return(run) return _c } -// GetLatestBlockhash provides a mock function with given fields: ctx, commitment -func (_m *RPCClient) GetLatestBlockhash(ctx context.Context, commitment rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { - ret := _m.Called(ctx, commitment) +// GetSignaturesForAddressWithOpts provides a mock function with given fields: _a0, _a1, _a2 +func (_m *RPCClient) GetSignaturesForAddressWithOpts(_a0 context.Context, _a1 solana.PublicKey, _a2 *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { + ret := _m.Called(_a0, _a1, _a2) if len(ret) == 0 { - panic("no return value specified for GetLatestBlockhash") + panic("no return value specified for GetSignaturesForAddressWithOpts") } - var r0 *rpc.GetLatestBlockhashResult + var r0 []*rpc.TransactionSignature var r1 error - if rf, ok := ret.Get(0).(func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error)); ok { - return rf(ctx, commitment) + if rf, ok := ret.Get(0).(func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)); ok { + return rf(_a0, _a1, _a2) } - if rf, ok := ret.Get(0).(func(context.Context, rpc.CommitmentType) *rpc.GetLatestBlockhashResult); ok { - r0 = rf(ctx, commitment) + if rf, ok := ret.Get(0).(func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) []*rpc.TransactionSignature); ok { + r0 = rf(_a0, _a1, _a2) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*rpc.GetLatestBlockhashResult) + r0 = ret.Get(0).([]*rpc.TransactionSignature) } } - if rf, ok := ret.Get(1).(func(context.Context, rpc.CommitmentType) error); ok { - r1 = rf(ctx, commitment) + if rf, ok := ret.Get(1).(func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) error); ok { + r1 = rf(_a0, _a1, _a2) } else { r1 = ret.Error(1) } @@ -176,58 +175,59 @@ func (_m *RPCClient) GetLatestBlockhash(ctx context.Context, commitment rpc.Comm return r0, r1 } -// RPCClient_GetLatestBlockhash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestBlockhash' -type RPCClient_GetLatestBlockhash_Call struct { +// RPCClient_GetSignaturesForAddressWithOpts_Call is a *mock.Call that 
shadows Run/Return methods with type explicit version for method 'GetSignaturesForAddressWithOpts' +type RPCClient_GetSignaturesForAddressWithOpts_Call struct { *mock.Call } -// GetLatestBlockhash is a helper method to define mock.On call -// - ctx context.Context -// - commitment rpc.CommitmentType -func (_e *RPCClient_Expecter) GetLatestBlockhash(ctx interface{}, commitment interface{}) *RPCClient_GetLatestBlockhash_Call { - return &RPCClient_GetLatestBlockhash_Call{Call: _e.mock.On("GetLatestBlockhash", ctx, commitment)} +// GetSignaturesForAddressWithOpts is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 solana.PublicKey +// - _a2 *rpc.GetSignaturesForAddressOpts +func (_e *RPCClient_Expecter) GetSignaturesForAddressWithOpts(_a0 interface{}, _a1 interface{}, _a2 interface{}) *RPCClient_GetSignaturesForAddressWithOpts_Call { + return &RPCClient_GetSignaturesForAddressWithOpts_Call{Call: _e.mock.On("GetSignaturesForAddressWithOpts", _a0, _a1, _a2)} } -func (_c *RPCClient_GetLatestBlockhash_Call) Run(run func(ctx context.Context, commitment rpc.CommitmentType)) *RPCClient_GetLatestBlockhash_Call { +func (_c *RPCClient_GetSignaturesForAddressWithOpts_Call) Run(run func(_a0 context.Context, _a1 solana.PublicKey, _a2 *rpc.GetSignaturesForAddressOpts)) *RPCClient_GetSignaturesForAddressWithOpts_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(rpc.CommitmentType)) + run(args[0].(context.Context), args[1].(solana.PublicKey), args[2].(*rpc.GetSignaturesForAddressOpts)) }) return _c } -func (_c *RPCClient_GetLatestBlockhash_Call) Return(out *rpc.GetLatestBlockhashResult, err error) *RPCClient_GetLatestBlockhash_Call { - _c.Call.Return(out, err) +func (_c *RPCClient_GetSignaturesForAddressWithOpts_Call) Return(_a0 []*rpc.TransactionSignature, _a1 error) *RPCClient_GetSignaturesForAddressWithOpts_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *RPCClient_GetLatestBlockhash_Call) RunAndReturn(run func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error)) *RPCClient_GetLatestBlockhash_Call { +func (_c *RPCClient_GetSignaturesForAddressWithOpts_Call) RunAndReturn(run func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)) *RPCClient_GetSignaturesForAddressWithOpts_Call { _c.Call.Return(run) return _c } -// GetSignaturesForAddressWithOpts provides a mock function with given fields: _a0, _a1, _a2 -func (_m *RPCClient) GetSignaturesForAddressWithOpts(_a0 context.Context, _a1 solana.PublicKey, _a2 *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { - ret := _m.Called(_a0, _a1, _a2) +// LatestBlockhash provides a mock function with given fields: ctx +func (_m *RPCClient) LatestBlockhash(ctx context.Context) (*rpc.GetLatestBlockhashResult, error) { + ret := _m.Called(ctx) if len(ret) == 0 { - panic("no return value specified for GetSignaturesForAddressWithOpts") + panic("no return value specified for LatestBlockhash") } - var r0 []*rpc.TransactionSignature + var r0 *rpc.GetLatestBlockhashResult var r1 error - if rf, ok := ret.Get(0).(func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)); ok { - return rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context) (*rpc.GetLatestBlockhashResult, error)); ok { + return rf(ctx) } - if rf, ok := ret.Get(0).(func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) []*rpc.TransactionSignature); ok { - r0 = 
rf(_a0, _a1, _a2) + if rf, ok := ret.Get(0).(func(context.Context) *rpc.GetLatestBlockhashResult); ok { + r0 = rf(ctx) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*rpc.TransactionSignature) + r0 = ret.Get(0).(*rpc.GetLatestBlockhashResult) } } - if rf, ok := ret.Get(1).(func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) error); ok { - r1 = rf(_a0, _a1, _a2) + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) } else { r1 = ret.Error(1) } @@ -235,32 +235,30 @@ func (_m *RPCClient) GetSignaturesForAddressWithOpts(_a0 context.Context, _a1 so return r0, r1 } -// RPCClient_GetSignaturesForAddressWithOpts_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSignaturesForAddressWithOpts' -type RPCClient_GetSignaturesForAddressWithOpts_Call struct { +// RPCClient_LatestBlockhash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestBlockhash' +type RPCClient_LatestBlockhash_Call struct { *mock.Call } -// GetSignaturesForAddressWithOpts is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 solana.PublicKey -// - _a2 *rpc.GetSignaturesForAddressOpts -func (_e *RPCClient_Expecter) GetSignaturesForAddressWithOpts(_a0 interface{}, _a1 interface{}, _a2 interface{}) *RPCClient_GetSignaturesForAddressWithOpts_Call { - return &RPCClient_GetSignaturesForAddressWithOpts_Call{Call: _e.mock.On("GetSignaturesForAddressWithOpts", _a0, _a1, _a2)} +// LatestBlockhash is a helper method to define mock.On call +// - ctx context.Context +func (_e *RPCClient_Expecter) LatestBlockhash(ctx interface{}) *RPCClient_LatestBlockhash_Call { + return &RPCClient_LatestBlockhash_Call{Call: _e.mock.On("LatestBlockhash", ctx)} } -func (_c *RPCClient_GetSignaturesForAddressWithOpts_Call) Run(run func(_a0 context.Context, _a1 solana.PublicKey, _a2 *rpc.GetSignaturesForAddressOpts)) *RPCClient_GetSignaturesForAddressWithOpts_Call { +func (_c *RPCClient_LatestBlockhash_Call) Run(run func(ctx context.Context)) *RPCClient_LatestBlockhash_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(solana.PublicKey), args[2].(*rpc.GetSignaturesForAddressOpts)) + run(args[0].(context.Context)) }) return _c } -func (_c *RPCClient_GetSignaturesForAddressWithOpts_Call) Return(_a0 []*rpc.TransactionSignature, _a1 error) *RPCClient_GetSignaturesForAddressWithOpts_Call { - _c.Call.Return(_a0, _a1) +func (_c *RPCClient_LatestBlockhash_Call) Return(out *rpc.GetLatestBlockhashResult, err error) *RPCClient_LatestBlockhash_Call { + _c.Call.Return(out, err) return _c } -func (_c *RPCClient_GetSignaturesForAddressWithOpts_Call) RunAndReturn(run func(context.Context, solana.PublicKey, *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)) *RPCClient_GetSignaturesForAddressWithOpts_Call { +func (_c *RPCClient_LatestBlockhash_Call) RunAndReturn(run func(context.Context) (*rpc.GetLatestBlockhashResult, error)) *RPCClient_LatestBlockhash_Call { _c.Call.Return(run) return _c } diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index 0be5b4874..2fe406d74 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -1,9 +1,9 @@ package logpoller import ( + "encoding/base64" + "fmt" "time" - - "github.com/lib/pq" ) type Filter struct { @@ -13,7 +13,7 @@ type Filter struct { EventName string EventSig EventSignature StartingBlock int64 - EventIDL string + EventIdl EventIdl SubkeyPaths SubkeyPaths Retention 
time.Duration MaxLogsKept int64 @@ -22,7 +22,20 @@ type Filter struct { } func (f Filter) MatchSameLogs(other Filter) bool { - return f.Address == other.Address && f.EventSig == other.EventSig && f.EventIDL == other.EventIDL && f.SubkeyPaths.Equal(other.SubkeyPaths) + return f.Address == other.Address && f.EventSig == other.EventSig && + f.EventIdl.Equal(other.EventIdl) && f.SubkeyPaths.Equal(other.SubkeyPaths) +} + +// Discriminator returns a 12 character base64-encoded string +// +// This is the base64 encoding of the [8]byte discriminator returned by utils.Discriminator +func (f Filter) Discriminator() string { + d := Discriminator("event", f.EventName) + b64encoded := base64.StdEncoding.EncodeToString(d[:]) + if len(b64encoded) != 12 { + panic(fmt.Sprintf("Assumption Violation: expected encoding/base64 to return 12 character base64-encoding, got %d characters", len(b64encoded))) + } + return b64encoded } type Log struct { @@ -35,7 +48,7 @@ type Log struct { BlockTimestamp time.Time Address PublicKey EventSig EventSignature - SubkeyValues pq.ByteaArray + SubkeyValues []IndexedValue TxHash Signature Data []byte CreatedAt time.Time diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 2239ed608..ae6e9118e 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -27,6 +27,10 @@ func NewORM(chainID string, ds sqlutil.DataSource, lggr logger.Logger) *DSORM { } } +func (o *DSORM) ChainID() string { + return o.chainID +} + func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) { return sqlutil.Transact(ctx, o.new, o.ds, nil, fn) } @@ -48,7 +52,7 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err withEventName(filter.EventName). withEventSig(filter.EventSig). withStartingBlock(filter.StartingBlock). - withEventIDL(filter.EventIDL). + withEventIDL(filter.EventIdl). withSubkeyPaths(filter.SubkeyPaths). withIsBackfilled(filter.IsBackfilled). 
toArgs() @@ -148,7 +152,7 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D (:filter_id, :chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :subkey_values, :tx_hash, :data, NOW(), :expires_at, :sequence_num) ON CONFLICT DO NOTHING` - _, err := tx.NamedExecContext(ctx, query, logs[start:end]) + res, err := tx.NamedExecContext(ctx, query, logs[start:end]) if err != nil { if errors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 { // In case of DB timeouts, try to insert again with a smaller batch upto a limit @@ -158,6 +162,14 @@ func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.D } return err } + numRows, err := res.RowsAffected() + if err == nil { + if numRows != int64(len(logs)) { + // This probably just means we're trying to insert the same log twice, but could also be an indication + // of other constraint violations + o.lggr.Debugf("attempted to insert %d logs, but could only insert %d", len(logs), numRows) + } + } } return nil } @@ -225,3 +237,20 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim return logs, nil } + +func (o *DSORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) { + results := make([]struct { + FilterID int64 + SequenceNum int64 + }, 0) + query := "SELECT filter_id, MAX(sequence_num) AS sequence_num FROM solana.logs WHERE chain_id=$1 GROUP BY filter_id" + err := o.ds.SelectContext(ctx, &results, query, o.chainID) + if err != nil { + return nil, err + } + seqNums := make(map[int64]int64) + for _, row := range results { + seqNums[row.FilterID] = row.SequenceNum + } + return seqNums, nil +} diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 53512d696..7fab36467 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -3,18 +3,18 @@ package logpoller import ( + "math/rand" "testing" "time" "github.com/gagliardetto/solana-go" "github.com/google/uuid" - "github.com/stretchr/testify/require" _ "github.com/jackc/pgx/v4/stdlib" - "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/stretchr/testify/require" ) // NOTE: at the moment it's not possible to run all db tests at once. 
This issue will be addressed separately @@ -33,7 +33,7 @@ func TestLogPollerFilters(t *testing.T) { EventName: "event", EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, - EventIDL: "{}", + EventIdl: EventIdl{}, SubkeyPaths: SubkeyPaths([][]string{{"a", "b"}, {"c"}}), Retention: 1000, MaxLogsKept: 3, @@ -44,7 +44,6 @@ func TestLogPollerFilters(t *testing.T) { EventName: "event", EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, - EventIDL: "{}", SubkeyPaths: SubkeyPaths([][]string{}), Retention: 1000, MaxLogsKept: 3, @@ -55,7 +54,6 @@ func TestLogPollerFilters(t *testing.T) { EventName: "event", EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, - EventIDL: "{}", SubkeyPaths: nil, Retention: 1000, MaxLogsKept: 3, @@ -191,19 +189,35 @@ func TestLogPollerLogs(t *testing.T) { ctx := tests.Context(t) // create filter as it's required for a log filterID, err := orm.InsertFilter(ctx, newRandomFilter(t)) + filterID2, err := orm.InsertFilter(ctx, newRandomFilter(t)) require.NoError(t, err) log := newRandomLog(t, filterID, chainID) - err = orm.InsertLogs(ctx, []Log{log}) + log2 := newRandomLog(t, filterID2, chainID) + err = orm.InsertLogs(ctx, []Log{log, log2}) require.NoError(t, err) // insert of the same Log should not produce two instances err = orm.InsertLogs(ctx, []Log{log}) require.NoError(t, err) - dbLogs, err := orm.SelectLogs(ctx, 0, 100, log.Address, log.EventSig) + + dbLogs, err := orm.SelectLogs(ctx, 0, 1000000, log.Address, log.EventSig) require.NoError(t, err) require.Len(t, dbLogs, 1) log.ID = dbLogs[0].ID log.CreatedAt = dbLogs[0].CreatedAt require.Equal(t, log, dbLogs[0]) + + dbLogs, err = orm.SelectLogs(ctx, 0, 1000000, log2.Address, log2.EventSig) + require.NoError(t, err) + require.Len(t, dbLogs, 1) + log2.ID = dbLogs[0].ID + log2.CreatedAt = dbLogs[0].CreatedAt + require.Equal(t, log2, dbLogs[0]) + + t.Run("SelectSequenceNums", func(t *testing.T) { + seqNums, err := orm.SelectSeqNums(tests.Context(t)) + require.NoError(t, err) + require.Len(t, seqNums, 2) + }) } func newRandomFilter(t *testing.T) Filter { @@ -213,7 +227,6 @@ func newRandomFilter(t *testing.T) Filter { EventName: "event", EventSig: newRandomEventSignature(t), StartingBlock: 1, - EventIDL: "{}", SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, Retention: 1000, MaxLogsKept: 3, @@ -230,14 +243,15 @@ func newRandomLog(t *testing.T, filterID int64, chainID string) Log { return Log{ FilterID: filterID, ChainID: chainID, - LogIndex: 1, + LogIndex: rand.Int63n(1000), BlockHash: Hash(pubKey), - BlockNumber: 10, + BlockNumber: rand.Int63n(1000000), BlockTimestamp: time.Unix(1731590113, 0), Address: PublicKey(pubKey), EventSig: EventSignature{3, 2, 1}, SubkeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, TxHash: Signature(signature), Data: data, + SequenceNum: rand.Int63n(500), } } diff --git a/pkg/solana/logpoller/query.go b/pkg/solana/logpoller/query.go index ba310c6b5..4c7844183 100644 --- a/pkg/solana/logpoller/query.go +++ b/pkg/solana/logpoller/query.go @@ -86,8 +86,8 @@ func (q *queryArgs) withStartingBlock(startingBlock int64) *queryArgs { } // withEventIDL sets the EventIDL field in queryArgs. -func (q *queryArgs) withEventIDL(eventIDL string) *queryArgs { - return q.withField("event_idl", eventIDL) +func (q *queryArgs) withEventIDL(eventIdl EventIdl) *queryArgs { + return q.withField("event_idl", eventIdl) } // withSubkeyPaths sets the SubkeyPaths field in queryArgs. 
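Reviewer note on the Filter.Discriminator helper added in models.go above: the 12-character invariant it asserts holds because standard base64 emits 4 output characters for every 3 input bytes (padding included), so an 8-byte event discriminator always encodes to 12 characters. A minimal, self-contained sketch of that invariant; the package main wrapper and sample bytes are illustrative and not part of this change.

package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// An 8-byte discriminator, shaped like the output of Discriminator("event", eventName).
	d := [8]byte{1, 2, 3, 4, 5, 6, 7, 8}

	// StdEncoding produces 4 characters per 3 input bytes (with '=' padding),
	// so 8 bytes always encode to 12 characters, the length Filter.Discriminator asserts.
	fmt.Println(base64.StdEncoding.EncodedLen(len(d)))        // 12
	fmt.Println(len(base64.StdEncoding.EncodeToString(d[:]))) // 12
}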
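Reviewer note on the new DSORM.SelectSeqNums above: it returns, per filter_id, the highest sequence_num stored for this chain. A minimal sketch of how such a map could be consumed to continue per-filter sequence numbering after a restart; the nextSeqNum helper and the hard-coded sample data are hypothetical illustrations, not an API added by this change.

package main

import "fmt"

// nextSeqNum shows one way the map returned by SelectSeqNums
// (filter_id -> highest stored sequence_num) could be used to pick the next
// sequence number for a filter. Filters with no stored logs are absent from
// the map, so they start at 1, matching the SequenceNum = 1 expected for the
// first log in TestProcess.
func nextSeqNum(seqNums map[int64]int64, filterID int64) int64 {
	return seqNums[filterID] + 1
}

func main() {
	// Shape of the result of:
	//   SELECT filter_id, MAX(sequence_num) FROM solana.logs WHERE chain_id=$1 GROUP BY filter_id
	seqNums := map[int64]int64{42: 7}

	fmt.Println(nextSeqNum(seqNums, 42)) // 8: continue after the last stored log
	fmt.Println(nextSeqNum(seqNums, 99)) // 1: no logs stored yet for this filter
}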
diff --git a/pkg/solana/logpoller/test_helpers.go b/pkg/solana/logpoller/test_helpers.go new file mode 100644 index 000000000..8511ae1ac --- /dev/null +++ b/pkg/solana/logpoller/test_helpers.go @@ -0,0 +1,45 @@ +package logpoller + +import ( + "math/rand" + "testing" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/require" +) + +func newRandomPublicKey(t *testing.T) PublicKey { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + return PublicKey(pubKey) +} + +func newRandomEventSignature(t *testing.T) EventSignature { + pubKey := newRandomPublicKey(t) + return EventSignature(pubKey[:8]) +} + +func newRandomLog(t *testing.T, filterID int64, chainID string, eventName string) Log { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + data := []byte("solana is fun") + signature, err := privateKey.Sign(data) + require.NoError(t, err) + return Log{ + FilterID: filterID, + ChainID: chainID, + LogIndex: rand.Int63n(1000), + BlockHash: Hash(pubKey), + BlockNumber: rand.Int63n(1000000), + BlockTimestamp: time.Unix(1731590113, 0).UTC(), + Address: PublicKey(pubKey), + EventSig: Discriminator("event", eventName), + SubkeyValues: []IndexedValue{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, + TxHash: Signature(signature), + Data: data, + SequenceNum: rand.Int63n(500), + } +} diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 143c28898..dc8b614bc 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -1,12 +1,18 @@ package logpoller import ( + "context" "database/sql/driver" + "encoding/binary" "encoding/json" "fmt" + "math" + "reflect" "slices" "github.com/gagliardetto/solana-go" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" ) type PublicKey solana.PublicKey @@ -76,6 +82,53 @@ func (p SubkeyPaths) Value() (driver.Value, error) { } func (p *SubkeyPaths) Scan(src interface{}) error { + return scanJSON("SubkeyPaths", p, src) +} + +func (p SubkeyPaths) Equal(o SubkeyPaths) bool { + return slices.EqualFunc(p, o, slices.Equal) +} + +const EventSignatureLength = 8 + +type EventSignature [EventSignatureLength]byte + +// Scan implements Scanner for database/sql. +func (s *EventSignature) Scan(src interface{}) error { + return scanFixedLengthArray("EventSignature", EventSignatureLength, src, s[:]) +} + +// Value implements valuer for database/sql. 
+func (s EventSignature) Value() (driver.Value, error) { + return s[:], nil +} + +type Decoder interface { + CreateType(itemType string, _ bool) (any, error) + Decode(_ context.Context, raw []byte, into any, itemType string) error +} + +type EventIdl struct { + codec.IdlEvent + codec.IdlTypeDefSlice +} + +func (e *EventIdl) Scan(src interface{}) error { + return scanJSON("EventIdl", e, src) +} + +func (e EventIdl) Value() (driver.Value, error) { + return json.Marshal(map[string]any{ + "IdlEvent": e.IdlEvent, + "IdlTypeDefSlice": e.IdlTypeDefSlice, + }) +} + +func (e EventIdl) Equal(o EventIdl) bool { + return reflect.DeepEqual(e, o) +} + +func scanJSON(name string, dest, src interface{}) error { var bSrc []byte switch src := src.(type) { case string: @@ -83,35 +136,73 @@ func (p *SubkeyPaths) Scan(src interface{}) error { case []byte: bSrc = src default: - return fmt.Errorf("can't scan %T into SubkeyPaths", src) + return fmt.Errorf("can't scan %T into %s", src, name) } if len(bSrc) == 0 || string(bSrc) == "null" { return nil } - err := json.Unmarshal(bSrc, p) + err := json.Unmarshal(bSrc, dest) if err != nil { - return fmt.Errorf("failed to scan %v into SubkeyPaths: %w", string(bSrc), err) + return fmt.Errorf("failed to scan %v into %s: %w", string(bSrc), name, err) } return nil } -func (p SubkeyPaths) Equal(o SubkeyPaths) bool { - return slices.EqualFunc(p, o, slices.Equal) -} +// IndexedValue represents a value which can be written to, read from, or compared to an indexed BYTEA +// postgres field. Maps, structs, and slices or arrays (of anything but byte) are not supported. For signed +// or unsigned integer types, strings, or byte arrays, the SQL operators <, =, & > should work in the expected +// way. +type IndexedValue []byte -const EventSignatureLength = 8 +func (v *IndexedValue) FromUint64(u uint64) { + *v = make([]byte, 8) + binary.BigEndian.PutUint64(*v, u) +} -type EventSignature [EventSignatureLength]byte +func (v *IndexedValue) FromInt64(i int64) { + v.FromUint64(uint64(i + math.MaxInt64 + 1)) //nolint gosec passing i=math.MaxInt64 and i=math.MinInt64 are proven safe in TestIndexedValue +} -// Scan implements Scanner for database/sql. -func (s *EventSignature) Scan(src interface{}) error { - return scanFixedLengthArray("EventSignature", EventSignatureLength, src, s[:]) +func (v *IndexedValue) FromFloat64(f float64) { + if f > 0 { + v.FromUint64(math.Float64bits(f) + math.MaxInt64 + 1) + return + } + v.FromUint64(math.MaxInt64 + 1 - math.Float64bits(f)) } -// Value implements valuer for database/sql. 
-func (s EventSignature) Value() (driver.Value, error) { - return s[:], nil +func NewIndexedValue(typedVal any) (iVal IndexedValue, err error) { + // handle 2 simplest cases first + switch t := typedVal.(type) { + case []byte: + return t, nil + case string: + return []byte(t), nil + } + + // handle numeric types + v := reflect.ValueOf(typedVal) + if v.CanUint() { + iVal.FromUint64(v.Uint()) + return iVal, nil + } + if v.CanInt() { + iVal.FromInt64(v.Int()) + return iVal, nil + } + if v.CanFloat() { + iVal.FromFloat64(v.Float()) + return iVal, nil + } + + // any length array is fine as long as the element type is byte + if t := v.Type(); t.Kind() == reflect.Array { + if t.Elem().Kind() == reflect.Uint8 { + return v.Bytes(), nil + } + } + return nil, fmt.Errorf("can't create indexed value from type %T", typedVal) } diff --git a/pkg/solana/logpoller/types_test.go b/pkg/solana/logpoller/types_test.go index 263c22bab..b7ed36c6d 100644 --- a/pkg/solana/logpoller/types_test.go +++ b/pkg/solana/logpoller/types_test.go @@ -1,20 +1,45 @@ package logpoller import ( + "math" "testing" - "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func newRandomPublicKey(t *testing.T) PublicKey { - privateKey, err := solana.NewRandomPrivateKey() - require.NoError(t, err) - pubKey := privateKey.PublicKey() - return PublicKey(pubKey) -} +func TestIndexedValue(t *testing.T) { + cases := []struct { + typeName string + lower any + higher any + }{ -func newRandomEventSignature(t *testing.T) EventSignature { - pubKey := newRandomPublicKey(t) - return EventSignature(pubKey[:8]) + {"uint64", uint64(math.MaxUint32), uint64(math.MaxUint64)}, + {"int32", int32(math.MinInt32), int32(math.MaxInt32)}, + {"int32", int32(-8), int32(-5)}, + {"int32", int32(5), int32(8)}, + {"int64", int64(math.MinInt64), int64(math.MaxInt64)}, + {"int64", int64(-8), int64(-5)}, + {"int64", int64(5), int64(8)}, + {"float32", float32(-5), float32(5)}, + {"float32", float32(-8), float32(-5)}, + {"float32", float32(5), float32(8)}, + {"float64", float64(-5), float64(5)}, + {"float64", float64(-8), float64(-5)}, + {"float64", float64(5), float64(8)}, + {"string", "abcc", "abcd"}, + {"string", "abcd", "abcdef"}, + {"[]byte", []byte("abcc"), []byte("abcd")}, + {"[]byte", []byte("abcd"), []byte("abcdef")}, + } + for _, c := range cases { + t.Run(c.typeName, func(t *testing.T) { + iVal1, err := NewIndexedValue(c.lower) + require.NoError(t, err) + iVal2, err := NewIndexedValue(c.higher) + require.NoError(t, err) + assert.Less(t, iVal1, iVal2) + }) + } }
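Reviewer note on the IndexedValue encoding exercised by TestIndexedValue above: signed integers are shifted by math.MaxInt64 + 1 into the unsigned range and written big-endian, so comparing the encoded bytes lexicographically agrees with numeric order, which is what makes the <, =, and > operators meaningful on the indexed BYTEA column. A minimal sketch of that property; the package main wrapper and sample values are illustrative only, and it assumes the logpoller package is importable at the path used in this repository.

package main

import (
	"bytes"
	"fmt"

	"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
)

func main() {
	lo, err := logpoller.NewIndexedValue(int64(-8))
	if err != nil {
		panic(err)
	}
	hi, err := logpoller.NewIndexedValue(int64(5))
	if err != nil {
		panic(err)
	}

	// -8 < 5 numerically, and the offset big-endian encodings compare the same
	// way byte-wise, mirroring how postgres compares the stored BYTEA values.
	fmt.Println(bytes.Compare(lo, hi) < 0) // true
}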