Skip to content

Commit

Permalink
support SubscribeStateChanges
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Mar 19, 2021
1 parent 1277c38 commit bed13b9
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 45 deletions.
46 changes: 11 additions & 35 deletions pkg/eth/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,45 @@ package eth

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/events"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
)

const (
channelName = "channelName"
channelName = "postgraphile:header_cids"
)

func (api *PublicEthAPI) NewStateChanges(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
stateChanges := make(chan Payload)
stateChangeSub := api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges)
errChan := make(chan error)
stateChanges := make(chan events.Payload)
api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges, errChan)

for {
select {
case s := <-stateChanges:
notifier.Notify(rpcSub.ID, s)
case <-rpcSub.Err():
stateChangeSub.Unsubscribe()
api.events.Close()
return
case <-notifier.Closed():
stateChangeSub.Unsubscribe()
api.events.Close()
return
case err := <- errChan:
logrus.Errorf("error from NewStateChanges notifier: %v", err)
return
}
}
}()

return rpcSub, nil
}

// Payload packages the data to send to statediff subscriptions
type Payload struct {
StateDiffRlp []byte `json:"stateDiff" gencodec:"required"`
}

// StateDiff is the final output structure from the builder
type StateDiff struct {
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"`
}

// AccountDiff holds the data for a single state diff node
type AccountDiff struct {
Key []byte `json:"key" gencodec:"required"`
Value []byte `json:"value" gencodec:"required"`
Storage []StorageDiff `json:"storage" gencodec:"required"`
}

// StorageDiff holds the data for a single storage diff node
type StorageDiff struct {
Key []byte `json:"key" gencodec:"required"`
Value []byte `json:"value" gencodec:"required"`
}
68 changes: 58 additions & 10 deletions pkg/events/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,37 @@
package events

import (
"sync"
"encoding/json"
"time"

"github.com/ethereum/go-ethereum"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
)

var (
getBlockInfoPgStr = `SELECT block_number, block_hash FROM eth.header_cids WHERE id = $1`
getStateLeafsPgStr = `SELECT state_cids.id, state_leaf_key, data FROM eth.state_cids
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
INNER JOIN public.blocks ON (state_cids.mh_key = public.blocks.key)
WHERE header_cids.id = $1 AND state_leaf_key IS NOT NULL`
getStorageLeafsPgStr = `SELECT storage_cids.id, storage_leaf_key, data FROM eth.storage_cids
INNER JOIN eth.state_cids ON (storage_cids.state_id = state_cids.id)
INNER JOIN public.blocks ON (storage_cids.mh_key = public.blocks.key)
WHERE state_cids.id = $1 AND storage_leaf_key IS NOT NULL`
)

// Notifier listens to inserts on Postgres tables and forwards the data
type Notifier struct {
db *postgres.DB
listener *pq.Listener
failed chan error
}

// NewNotifier creates a new notifier for given PostgreSQL credentials.
func NewNotifier(connectStr, channelName string) (*Notifier, error) {
n := &Notifier{failed: make(chan error, 2)}
func NewNotifier(db *postgres.DB, connectStr, channelName string) (*Notifier, error) {
n := &Notifier{db: db, failed: make(chan error, 2)}

listener := pq.NewListener(
connectStr,
Expand All @@ -49,12 +64,46 @@ func NewNotifier(connectStr, channelName string) (*Notifier, error) {
return n, nil
}

// Notify is the main loop of the notifier to receive data from
type JSONPayload struct {
Node []string `json:"__node__"`
}

func (n *Notifier) SubscribeStateChanges(query ethereum.FilterQuery, payloadChan chan Payload, errChan chan error) {
outChan := make(chan []byte)
doneChan := make(chan struct{})
n.notify(doneChan, outChan, errChan)
go func() {
for {
select {
case out := <-outChan:
jsonPayload := new(JSONPayload)
json.Unmarshal(out, jsonPayload)
if len(jsonPayload.Node) < 2 {
log.Warn("pushed JSON payload does not contain expected number of entrie in __node__ array")
continue
}
payload, err := n.getStateChanges(jsonPayload.Node[1])
if err != nil {
errChan <- err
continue
}
payloadChan <- payload
case <-doneChan:
return
}
}
}()
}

func (n *Notifier) getStateChanges(headerID string) (Payload, error) {
return Payload{}, nil
}

// notify is the main loop of the notifier to receive data from
// the database in JSON-FORMAT and send it down the provided channel.
func (n *Notifier) Notify(wg *sync.WaitGroup, outChan chan []byte, errChan chan error) {
wg.Wait()
func (n *Notifier) notify(doneChan chan struct{}, outChan chan []byte, errChan chan error) {
go func() {
defer wg.Done()
defer close(doneChan)
for {
select {
case e := <-n.listener.Notify:
Expand All @@ -76,7 +125,6 @@ func (n *Notifier) Notify(wg *sync.WaitGroup, outChan chan []byte, errChan chan
}
}()
}

// callBack
func (n *Notifier) callBack(event pq.ListenerEventType, err error) {
if err != nil {
Expand All @@ -90,7 +138,7 @@ func (n *Notifier) callBack(event pq.ListenerEventType, err error) {
}
}

// close closes the notifier.
func (n *Notifier) close() error {
// Close closes the notifier.
func (n *Notifier) Close() error {
return n.listener.Close()
}
38 changes: 38 additions & 0 deletions pkg/events/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package events

import (
"math/big"

"github.com/ethereum/go-ethereum/common"
)

type Subscription interface {
Err() <-chan error
Unsubscribe()

}

// Payload packages the data to send to statediff subscriptions
type Payload struct {
StateDiffRlp []byte `json:"stateDiff" gencodec:"required"`
}

// StateDiff is the final output structure from the builder
type StateDiff struct {
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"`
}

// AccountDiff holds the data for a single state diff node
type AccountDiff struct {
Key []byte `json:"key" gencodec:"required"`
Value []byte `json:"value" gencodec:"required"`
Storage []StorageDiff `json:"storage" gencodec:"required"`
}

// StorageDiff holds the data for a single storage diff node
type StorageDiff struct {
Key []byte `json:"key" gencodec:"required"`
Value []byte `json:"value" gencodec:"required"`
}

0 comments on commit bed13b9

Please sign in to comment.