From 1277c384f691db020c3ae4f5e802526f6882a983 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 17 Mar 2021 15:10:18 -0500 Subject: [PATCH 1/2] notify on new block data --- pkg/eth/api.go | 14 ++++- pkg/eth/api_test.go | 3 +- pkg/eth/eth_state_test.go | 6 ++- pkg/eth/helpers.go | 3 +- pkg/eth/subscription_config.go | 2 +- pkg/eth/subscriptions.go | 70 +++++++++++++++++++++++++ pkg/events/notifier.go | 96 ++++++++++++++++++++++++++++++++++ pkg/events/notifier_test.go | 1 + pkg/serve/config.go | 15 +++--- pkg/serve/service.go | 9 +++- 10 files changed, 202 insertions(+), 17 deletions(-) create mode 100644 pkg/eth/subscriptions.go create mode 100644 pkg/events/notifier.go create mode 100644 pkg/events/notifier_test.go diff --git a/pkg/eth/api.go b/pkg/eth/api.go index 9b462e3a2..a87299140 100644 --- a/pkg/eth/api.go +++ b/pkg/eth/api.go @@ -25,6 +25,8 @@ import ( "math/big" "time" + "github.com/vulcanize/ipld-eth-server/pkg/events" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -52,6 +54,9 @@ type PublicEthAPI struct { // Local db backend B *Backend + // Event subscription backend + events *events.Notifier + // Proxy node for forwarding cache misses supportsStateDiff bool // Whether or not the remote node supports the statediff_writeStateDiffAt endpoint, if it does we can fill the local cache when we hit a miss rpc *rpc.Client @@ -59,17 +64,22 @@ type PublicEthAPI struct { } // NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend -func NewPublicEthAPI(b *Backend, client *rpc.Client, supportsStateDiff bool) *PublicEthAPI { +func NewPublicEthAPI(b *Backend, client *rpc.Client, supportsStateDiff bool) (*PublicEthAPI, error) { var ethClient *ethclient.Client if client != nil { ethClient = ethclient.NewClient(client) } + notifier, err := events.NewNotifier(b.DB.Config.DbConnectionString(), channelName) + if err != nil { + return nil, err + } return &PublicEthAPI{ B: b, + events: notifier, supportsStateDiff: supportsStateDiff, rpc: client, ethClient: ethClient, - } + }, nil } /* diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index 24af52149..290031b6d 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -189,7 +189,8 @@ var _ = Describe("API", func() { indexAndPublisher := eth2.NewIPLDPublisher(db) backend, err := eth.NewEthBackend(db, ð.Config{}) Expect(err).ToNot(HaveOccurred()) - api = eth.NewPublicEthAPI(backend, nil, false) + api, err = eth.NewPublicEthAPI(backend, nil, false) + Expect(err).ToNot(HaveOccurred()) err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) diff --git a/pkg/eth/eth_state_test.go b/pkg/eth/eth_state_test.go index 79784951f..67c83c310 100644 --- a/pkg/eth/eth_state_test.go +++ b/pkg/eth/eth_state_test.go @@ -83,7 +83,8 @@ var _ = Describe("eth state reading tests", func() { RPCGasCap: big.NewInt(10000000000), }) Expect(err).ToNot(HaveOccurred()) - api = eth.NewPublicEthAPI(backend, nil, false) + api, err = eth.NewPublicEthAPI(backend, nil, false) + Expect(err).ToNot(HaveOccurred()) // make the test blockchain (and state) blocks, receipts, chain = test_helpers.MakeChain(5, test_helpers.Genesis, test_helpers.TestChainGen) @@ -153,7 +154,8 @@ var _ = Describe("eth state reading tests", func() { // Insert some non-canonical data into the database so that we test our ability to discern canonicity indexAndPublisher := eth2.NewIPLDPublisher(db) - api = eth.NewPublicEthAPI(backend, nil, false) + api, err = eth.NewPublicEthAPI(backend, nil, false) + Expect(err).ToNot(HaveOccurred()) err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) // The non-canonical header has a child diff --git a/pkg/eth/helpers.go b/pkg/eth/helpers.go index fc09d13c8..b4d4f679a 100644 --- a/pkg/eth/helpers.go +++ b/pkg/eth/helpers.go @@ -19,9 +19,10 @@ package eth import ( "encoding/json" "fmt" + "os" + "github.com/ethereum/go-ethereum/cmd/utils" log "github.com/sirupsen/logrus" - "os" sdtypes "github.com/ethereum/go-ethereum/statediff/types" diff --git a/pkg/eth/subscription_config.go b/pkg/eth/subscription_config.go index d74ad3fd9..bd4cdface 100644 --- a/pkg/eth/subscription_config.go +++ b/pkg/eth/subscription_config.go @@ -72,7 +72,7 @@ type StorageFilter struct { IntermediateNodes bool } -// Init is used to initialize a EthSubscription struct with env variables +// NewEthSubscriptionConfig initializes and returns an EthSubscription struct with env variables func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { sc := new(SubscriptionSettings) // Below default to false, which means we do not backfill by default diff --git a/pkg/eth/subscriptions.go b/pkg/eth/subscriptions.go new file mode 100644 index 000000000..685268d30 --- /dev/null +++ b/pkg/eth/subscriptions.go @@ -0,0 +1,70 @@ +package eth + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/filters" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/rpc" +) + +const ( + channelName = "channelName" +) + +func (api *PublicEthAPI) NewStateChanges(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + stateChanges := make(chan Payload) + stateChangeSub := api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges) + + for { + select { + case s := <-stateChanges: + notifier.Notify(rpcSub.ID, s) + case <-rpcSub.Err(): + stateChangeSub.Unsubscribe() + return + case <-notifier.Closed(): + stateChangeSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// Payload packages the data to send to statediff subscriptions +type Payload struct { + StateDiffRlp []byte `json:"stateDiff" gencodec:"required"` +} + +// StateDiff is the final output structure from the builder +type StateDiff struct { + BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` + UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"` +} + +// AccountDiff holds the data for a single state diff node +type AccountDiff struct { + Key []byte `json:"key" gencodec:"required"` + Value []byte `json:"value" gencodec:"required"` + Storage []StorageDiff `json:"storage" gencodec:"required"` +} + +// StorageDiff holds the data for a single storage diff node +type StorageDiff struct { + Key []byte `json:"key" gencodec:"required"` + Value []byte `json:"value" gencodec:"required"` +} diff --git a/pkg/events/notifier.go b/pkg/events/notifier.go new file mode 100644 index 000000000..ee72a9f5a --- /dev/null +++ b/pkg/events/notifier.go @@ -0,0 +1,96 @@ +// VulcanizeDB +// Copyright © 2021 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package events + +import ( + "sync" + "time" + + "github.com/lib/pq" + log "github.com/sirupsen/logrus" +) + +// Notifier listens to inserts on Postgres tables and forwards the data +type Notifier struct { + listener *pq.Listener + failed chan error +} + +// NewNotifier creates a new notifier for given PostgreSQL credentials. +func NewNotifier(connectStr, channelName string) (*Notifier, error) { + n := &Notifier{failed: make(chan error, 2)} + + listener := pq.NewListener( + connectStr, + 10*time.Second, time.Minute, + n.callBack) + + if err := listener.Listen(channelName); err != nil { + listener.Close() + log.Println("ERROR!:", err) + return nil, err + } + + n.listener = listener + return n, nil +} + +// Notify is the main loop of the notifier to receive data from +// the database in JSON-FORMAT and send it down the provided channel. +func (n *Notifier) Notify(wg *sync.WaitGroup, outChan chan []byte, errChan chan error) { + wg.Wait() + go func() { + defer wg.Done() + for { + select { + case e := <-n.listener.Notify: + if e == nil { + continue + } + outChan <- []byte(e.Extra) + case err := <-n.failed: + if err != nil { + errChan <- err + } + return + case <-time.After(time.Minute): + if err := n.listener.Ping(); err != nil { + errChan <- err + return + } + } + } + }() +} + +// callBack +func (n *Notifier) callBack(event pq.ListenerEventType, err error) { + if err != nil { + log.Errorf("listener error: %s\n", err) + } + if event == pq.ListenerEventConnectionAttemptFailed { + n.failed <- err + } + if event == pq.ListenerEventDisconnected { + n.failed <- err + } +} + +// close closes the notifier. +func (n *Notifier) close() error { + return n.listener.Close() +} diff --git a/pkg/events/notifier_test.go b/pkg/events/notifier_test.go new file mode 100644 index 000000000..b3adf695c --- /dev/null +++ b/pkg/events/notifier_test.go @@ -0,0 +1 @@ +package events diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 4578fddfe..d0b4d8357 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -22,14 +22,12 @@ import ( "os" "path/filepath" - "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" "github.com/spf13/viper" "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" + "github.com/vulcanize/ipld-eth-indexer/pkg/shared" "github.com/vulcanize/ipld-eth-indexer/utils" "github.com/vulcanize/ipld-eth-server/pkg/prom" @@ -55,7 +53,6 @@ const ( // Config struct type Config struct { DB *postgres.DB - DBConfig postgres.Config WSEndpoint string HTTPEndpoint string IPCEndpoint string @@ -80,7 +77,7 @@ func NewConfig() (*Config, error) { viper.BindEnv("ethereum.chainConfig", ETH_CHAIN_CONFIG) viper.BindEnv("ethereum.supportsStateDiff", ETH_SUPPORTS_STATEDIFF) - c.DBConfig.Init() + dbConfig := postgres.NewConfig() ethHTTP := viper.GetString("ethereum.httpPath") nodeInfo, cli, err := shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP)) @@ -109,9 +106,9 @@ func NewConfig() (*Config, error) { httpPath = "127.0.0.1:8081" } c.HTTPEndpoint = httpPath - overrideDBConnConfig(&c.DBConfig) - serveDB := utils.LoadPostgres(c.DBConfig, nodeInfo, false) - prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) + overrideDBConnConfig(dbConfig) + serveDB := utils.LoadPostgres(dbConfig, nodeInfo, false) + prom.RegisterDBCollector(dbConfig.Name, serveDB.DB) c.DB = &serveDB defaultSenderStr := viper.GetString("ethereum.defaultSender") diff --git a/pkg/serve/service.go b/pkg/serve/service.go index 0ca5b2c51..ab3d4ae89 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -76,6 +76,8 @@ type Service struct { SubscriptionTypes map[common.Hash]eth.SubscriptionSettings // Underlying db db *postgres.DB + // db config + dbConfig *postgres.Config // wg for syncing serve processes serveWg *sync.WaitGroup // rpc client for forwarding cache misses @@ -142,10 +144,15 @@ func (sap *Service) APIs() []rpc.API { Public: true, }, } + pea, err := eth.NewPublicEthAPI(sap.backend, sap.client, sap.supportsStateDiffing) + if err != nil { + log.Errorf("failed to create a new PublicEthAPI %v", err) + return apis + } return append(apis, rpc.API{ Namespace: eth.APIName, Version: eth.APIVersion, - Service: eth.NewPublicEthAPI(sap.backend, sap.client, sap.supportsStateDiffing), + Service: pea, Public: true, }) } From c0b1b36ef57b0eca56b2cae564e1e35641d2dbc5 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 19 Mar 2021 11:00:16 -0500 Subject: [PATCH 2/2] support SubscribeStateChanges --- pkg/eth/api.go | 2 +- pkg/eth/subscriptions.go | 46 +++-------- pkg/events/notifier.go | 157 +++++++++++++++++++++++++++++++++--- pkg/events/notifier_test.go | 1 - pkg/events/types.go | 53 ++++++++++++ 5 files changed, 212 insertions(+), 47 deletions(-) delete mode 100644 pkg/events/notifier_test.go create mode 100644 pkg/events/types.go diff --git a/pkg/eth/api.go b/pkg/eth/api.go index a87299140..1bb0cb205 100644 --- a/pkg/eth/api.go +++ b/pkg/eth/api.go @@ -69,7 +69,7 @@ func NewPublicEthAPI(b *Backend, client *rpc.Client, supportsStateDiff bool) (*P if client != nil { ethClient = ethclient.NewClient(client) } - notifier, err := events.NewNotifier(b.DB.Config.DbConnectionString(), channelName) + notifier, err := events.NewNotifier(b.DB, channelName) if err != nil { return nil, err } diff --git a/pkg/eth/subscriptions.go b/pkg/eth/subscriptions.go index 685268d30..52ea7d883 100644 --- a/pkg/eth/subscriptions.go +++ b/pkg/eth/subscriptions.go @@ -2,17 +2,16 @@ package eth import ( "context" - "math/big" - - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/filters" + "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-server/pkg/events" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/rpc" ) const ( - channelName = "channelName" + channelName = "postgraphile:header_cids" ) func (api *PublicEthAPI) NewStateChanges(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) { @@ -20,22 +19,24 @@ func (api *PublicEthAPI) NewStateChanges(ctx context.Context, crit filters.Filte if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - rpcSub := notifier.CreateSubscription() - go func() { - stateChanges := make(chan Payload) - stateChangeSub := api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges) + errChan := make(chan error) + stateChanges := make(chan events.Payload) + api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges, errChan) for { select { case s := <-stateChanges: notifier.Notify(rpcSub.ID, s) case <-rpcSub.Err(): - stateChangeSub.Unsubscribe() + api.events.Close() return case <-notifier.Closed(): - stateChangeSub.Unsubscribe() + api.events.Close() + return + case err := <- errChan: + logrus.Errorf("error from NewStateChanges notifier: %v", err) return } } @@ -43,28 +44,3 @@ func (api *PublicEthAPI) NewStateChanges(ctx context.Context, crit filters.Filte return rpcSub, nil } - -// Payload packages the data to send to statediff subscriptions -type Payload struct { - StateDiffRlp []byte `json:"stateDiff" gencodec:"required"` -} - -// StateDiff is the final output structure from the builder -type StateDiff struct { - BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` - BlockHash common.Hash `json:"blockHash" gencodec:"required"` - UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"` -} - -// AccountDiff holds the data for a single state diff node -type AccountDiff struct { - Key []byte `json:"key" gencodec:"required"` - Value []byte `json:"value" gencodec:"required"` - Storage []StorageDiff `json:"storage" gencodec:"required"` -} - -// StorageDiff holds the data for a single storage diff node -type StorageDiff struct { - Key []byte `json:"key" gencodec:"required"` - Value []byte `json:"value" gencodec:"required"` -} diff --git a/pkg/events/notifier.go b/pkg/events/notifier.go index ee72a9f5a..3b87f8502 100644 --- a/pkg/events/notifier.go +++ b/pkg/events/notifier.go @@ -17,25 +17,46 @@ package events import ( - "sync" + "encoding/json" + "math/big" "time" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" + + "github.com/vulcanize/ipld-eth-server/pkg/shared" +) + +var ( + getBlockInfoPgStr = `SELECT block_number, block_hash FROM eth.header_cids WHERE id = $1` + getStateLeafsPgStr = `SELECT state_cids.id, state_leaf_key, data FROM eth.state_cids + INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key) + WHERE header_cids.id = $1 AND state_leaf_key IS NOT NULL` + getStorageLeafsPgStr = `SELECT storage_leaf_key, data FROM eth.storage_cids + INNER JOIN eth.state_cids ON (storage_cids.state_id = state_cids.id) + INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key) + WHERE state_cids.id = $1 AND storage_leaf_key IS NOT NULL` ) // Notifier listens to inserts on Postgres tables and forwards the data type Notifier struct { + db *postgres.DB listener *pq.Listener failed chan error } // NewNotifier creates a new notifier for given PostgreSQL credentials. -func NewNotifier(connectStr, channelName string) (*Notifier, error) { - n := &Notifier{failed: make(chan error, 2)} +func NewNotifier(db *postgres.DB, channelName string) (*Notifier, error) { + n := &Notifier{db: db, failed: make(chan error)} listener := pq.NewListener( - connectStr, + db.Config.DbConnectionString(), 10*time.Second, time.Minute, n.callBack) @@ -49,12 +70,128 @@ func NewNotifier(connectStr, channelName string) (*Notifier, error) { return n, nil } -// Notify is the main loop of the notifier to receive data from +func (n *Notifier) SubscribeStateChanges(query ethereum.FilterQuery, payloadChan chan Payload, errChan chan error) { + outChan := make(chan []byte) + doneChan := make(chan struct{}) + n.notify(doneChan, outChan, errChan) + go func() { + for { + select { + case out := <-outChan: + jsonPayload := new(JSONPayload) + json.Unmarshal(out, jsonPayload) + if len(jsonPayload.Node) < 2 { + log.Warn("pushed JSON payload does not contain expected number of entrie in __node__ array") + continue + } + payload, err := n.getStateChanges(jsonPayload.Node[1]) + if err != nil { + errChan <- err + continue + } + payloadChan <- *payload + case <-doneChan: + return + } + } + }() +} + +func (n *Notifier) getStateChanges(headerID string) (*Payload, error) { + tx, err := n.db.Beginx() + if err != nil { + return nil, err + } + blockInfo := new(BlockInfoPayload) + if err := tx.Select(blockInfo, getBlockInfoPgStr, headerID); err != nil { + shared.Rollback(tx) + return nil, err + } + blockNum := new(big.Int) + blockNum.SetString(blockInfo.BlockNumber, 10) + stateLeafPayloads, err := n.getStateLeafs(tx, headerID) + if err != nil { + shared.Rollback(tx) + return nil, err + } + stateAccounts := make([]AccountDiff, len(stateLeafPayloads)) + for i, slp := range stateLeafPayloads { + storageLeafPayloads, err := n.getStorageLeafs(tx, slp.ID) + if err != nil { + shared.Rollback(tx) + return nil, err + } + stateAccounts[i] = AccountDiff{ + Key: common.Hex2Bytes(slp.StateLeafKey), + Value: slp.RLPData, + Storage: storageLeafPayloads, + } + } + stateChangePayload := new(StateDiff) + stateChangePayload.BlockHash = common.HexToHash(blockInfo.BlockHash) + stateChangePayload.BlockNumber = blockNum + stateChangePayload.UpdatedAccounts = stateAccounts + if err := tx.Commit(); err != nil { + return nil, err + } + + by, err := rlp.EncodeToBytes(stateChangePayload) + if err != nil { + return nil, err + } + return &Payload{ + StateDiffRlp: by, + }, nil +} + +func (n *Notifier) getStateLeafs(tx *sqlx.Tx, headerID string) ([]StateLeafPayload, error) { + rows, err := tx.Queryx(getStateLeafsPgStr, headerID) + if err != nil { + return nil, err + } + stateLeafPayloads := make([]StateLeafPayload, 0) + defer rows.Close() + for rows.Next() { + stateLeafPayload := new(StateLeafPayload) + if err := rows.StructScan(stateLeafPayload); err != nil { + return nil, err + } + stateLeafPayloads = append(stateLeafPayloads, *stateLeafPayload) + } + if rows.Err() != nil { + return nil, err + } + return stateLeafPayloads, err +} + +func (n *Notifier) getStorageLeafs(tx *sqlx.Tx, stateID int64) ([]StorageDiff, error) { + rows, err := tx.Queryx(getStorageLeafsPgStr, stateID) + if err != nil { + return nil, err + } + storageLeafPayloads := make([]StorageDiff, 0) + defer rows.Close() + for rows.Next() { + storageLeafPayload := new(StorageLeafPayload) + if err := rows.StructScan(storageLeafPayload); err != nil { + return nil, err + } + storageLeafPayloads = append(storageLeafPayloads, StorageDiff{ + Key: common.Hex2Bytes(storageLeafPayload.StorageLeaf), + Value: storageLeafPayload.RLPData, + }) + } + if rows.Err() != nil { + return nil, err + } + return storageLeafPayloads, err +} + +// notify is the main loop of the notifier to receive data from // the database in JSON-FORMAT and send it down the provided channel. -func (n *Notifier) Notify(wg *sync.WaitGroup, outChan chan []byte, errChan chan error) { - wg.Wait() +func (n *Notifier) notify(doneChan chan struct{}, outChan chan []byte, errChan chan error) { go func() { - defer wg.Done() + defer close(doneChan) for { select { case e := <-n.listener.Notify: @@ -90,7 +227,7 @@ func (n *Notifier) callBack(event pq.ListenerEventType, err error) { } } -// close closes the notifier. -func (n *Notifier) close() error { +// Close closes the notifier. +func (n *Notifier) Close() error { return n.listener.Close() } diff --git a/pkg/events/notifier_test.go b/pkg/events/notifier_test.go deleted file mode 100644 index b3adf695c..000000000 --- a/pkg/events/notifier_test.go +++ /dev/null @@ -1 +0,0 @@ -package events diff --git a/pkg/events/types.go b/pkg/events/types.go new file mode 100644 index 000000000..ae2953eeb --- /dev/null +++ b/pkg/events/types.go @@ -0,0 +1,53 @@ +package events + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" +) + +// Payload packages the data to send to statediff subscriptions +type Payload struct { + StateDiffRlp []byte `json:"stateDiff" gencodec:"required"` +} + +// StateDiff is the final output structure from the builder +type StateDiff struct { + BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` + UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"` +} + +// AccountDiff holds the data for a single state diff node +type AccountDiff struct { + Key []byte `json:"key" gencodec:"required"` + Value []byte `json:"value" gencodec:"required"` + Storage []StorageDiff `json:"storage" gencodec:"required"` +} + +// StorageDiff holds the data for a single storage diff node +type StorageDiff struct { + Key []byte `json:"key" gencodec:"required"` + Value []byte `json:"value" gencodec:"required"` +} + +// JSONPayload notify payload +type JSONPayload struct { + Node []string `json:"__node__"` +} + +type BlockInfoPayload struct { + BlockNumber string `db:"block_number"` + BlockHash string `db:"block_hash"` +} + +type StateLeafPayload struct { + StateLeafKey string `db:"state_leaf_key"` + ID int64 `db:"id"` + RLPData []byte `db:"data"` +} + +type StorageLeafPayload struct { + StorageLeaf string `db:"storage_leaf_key"` + RLPData []byte `db:"data"` +} \ No newline at end of file