Skip to content

Commit

Permalink
SafuNode init. Full node RPC for private transactions.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Melnikov committed Aug 10, 2021
0 parents commit 7a80115
Show file tree
Hide file tree
Showing 9 changed files with 1,074 additions and 0 deletions.
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.PHONY: all test clean

GOBASE := $(shell pwd)
GOBIN := $(GOBASE)/bin
GOPATH := $(if $(GOPATH),$(GOPATH),~/go)

all:
mkdir -p $(GOBIN)
go build -v -o $(GOBIN)

clean:
rm -rf $(GOBIN)/*
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module safunode

go 1.16

require github.com/ethereum/go-ethereum v1.10.6 // indirect
529 changes: 529 additions & 0 deletions go.sum

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"flag"

"safunode/server"
)

// ChainID: 1, Infura:
// ./bin/safunode --listen 0.0.0.0:9000 --proxy https://mainnet.infura.io/v3/dce4f913d749454d94daa2c87f01ceb2 --relayer http://0.0.0.0:9545 --subscribe ws://127.0.0.1:8546

// ChainID: 1, LocalGeth:
// ./bin/safunode --listen 0.0.0.0:9000 --proxy http://127.0.0.1:8545 --relayer http://0.0.0.0:9545 --subscribe ws://127.0.0.1:8546

// ChainID: 1337, LocalMevGeth (--dev):
// ./bin/safunode --listen 0.0.0.0:9999 --proxy http://0.0.0.0:9545 --relayer http://0.0.0.0:9545 --subscribe ws://0.0.0.0:8546

const (
defaultListenAddress = "0.0.0.0:9000"
// MiningDAO Infura endpoint for public examples, please don't abuse
defaultProxyUrl = "https://mainnet.infura.io/v3/dce4f913d749454d94daa2c87f01ceb2"
defaultRelayerUrl = "https://relay.flashbots.net"
defaultSubscribeWsUrl = "ws://127.0.0.1:8546"
)

var listenAddress = flag.String("listen", defaultListenAddress, "Listen address")
var proxyUrl = flag.String("proxy", defaultProxyUrl, "URL for proxy eth_call-like request")
var relayerUrl = flag.String("relayer", defaultRelayerUrl, "URL for eth_sendRawTransaction relay")
var subscribeWsUrl = flag.String("subscribe", defaultSubscribeWsUrl, "URL for blockchain subscriptions (must be WebSocket)")

func main() {
flag.Parse()
bm := server.NewBlockchainManager(*subscribeWsUrl)
relayer := server.NewPrivateTxRelayer(*relayerUrl, bm)
s := server.NewSafuNodeServer(*listenAddress, *proxyUrl, relayer)
s.Start()
}
93 changes: 93 additions & 0 deletions server/blockchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package server

import (
"context"
"log"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)

type BlockchainManager struct {
client *ethclient.Client
headerCh chan *types.Header
headerSub ethereum.Subscription

latestBlockNumber uint64
blockSubs []chan *types.Block
blockSubsMux sync.Mutex
}

func NewBlockchainManager(wsUrl string) *BlockchainManager {
client, err := ethclient.Dial(wsUrl)
if err != nil {
log.Fatalf("ERROR: failed to connect to ws: %v", err)
}

latestBlockNumber, err := client.BlockNumber(context.Background())
if err != nil {
log.Fatalf("ERROR: failed to get latest block: %v", err)
}

headerCh := make(chan *types.Header)
headerSub, err := client.SubscribeNewHead(context.Background(), headerCh)
if err != nil {
log.Fatalf("ERROR: failed to connect to ws: %v", err)
}

bm := &BlockchainManager{
client: client,
headerCh: headerCh,
headerSub: headerSub,
latestBlockNumber: latestBlockNumber,
blockSubs: make([]chan *types.Block, 0),
}
go bm.loop()
return bm
}

func (bm *BlockchainManager) GetLatestBlockNumber() uint64 {
return atomic.LoadUint64(&bm.latestBlockNumber)
}

func (bm *BlockchainManager) SubscribeNewBlock(ch chan *types.Block) {
bm.blockSubsMux.Lock()
bm.blockSubs = append(bm.blockSubs, ch)
bm.blockSubsMux.Unlock()
}

func (bm *BlockchainManager) loop() {
for {
select {
case err := <-bm.headerSub.Err():
log.Printf("ERROR: head subscription error: %v", err)
case header := <-bm.headerCh:
bm.processNewHeader(header)
}
}
}

func (bm *BlockchainManager) processNewHeader(header *types.Header) {
log.Printf("New header at height %d", header.Number)
atomic.StoreUint64(&bm.latestBlockNumber, header.Number.Uint64())
block, err := bm.client.BlockByNumber(context.Background(), header.Number)
if err != nil {
log.Printf("ERROR: failed to get block by number: %v", err)
return
}
bm.broadcastNewBlock(block)
}

func (bm *BlockchainManager) broadcastNewBlock(block *types.Block) {
bm.blockSubsMux.Lock()
for _, ch := range bm.blockSubs {
select {
case ch <- block:
default:
}
}
bm.blockSubsMux.Unlock()
}
207 changes: 207 additions & 0 deletions server/relayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package server

import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"strconv"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
)

type PendingTransaction struct {
Tx *types.Transaction
BlockNumber uint64
}

type PrivateTxRelayer struct {
Url string
id uint64
blockchainManager *BlockchainManager
blockCh chan *types.Block
txByFromAndNonce map[string]map[uint64]*PendingTransaction
txByFromAndNonceMux sync.RWMutex
}

func NewPrivateTxRelayer(url string, bm *BlockchainManager) *PrivateTxRelayer {
relayer := &PrivateTxRelayer{
Url: url,
id: uint64(1e9),
blockchainManager: bm,
blockCh: make(chan *types.Block, 10),
txByFromAndNonce: make(map[string]map[uint64]*PendingTransaction),
}
go relayer.txMonitorLoop()
relayer.blockchainManager.SubscribeNewBlock(relayer.blockCh)
return relayer
}

func (r *PrivateTxRelayer) SendRawTransaction(rawJsonReq *JsonRpcRequest) (*JsonRpcResponse, error) {
// Validate JSON RPC parameters:
if len(rawJsonReq.Params) == 0 {
return nil, errors.New("invalid params")
}
rawTxHex, ok := rawJsonReq.Params[0].(string)
if !ok || len(rawTxHex) < 2 {
return nil, errors.New("invalid raw transaction")
}
rawTxBytes, err := hex.DecodeString(rawTxHex[2:])
if err != nil {
return nil, errors.New("invalid raw transaction")
}
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(rawTxBytes); err != nil {
return nil, err
}

// Send bundle:
blockNumber := r.blockchainManager.GetLatestBlockNumber() + 1
if err := r.SendBundle([]string{rawTxHex}, blockNumber); err != nil {
return nil, err
}

// Save transaction:
if err := r.addPendingTransaction(tx, blockNumber); err != nil {
return nil, err
}

// eth_sendRawTransaction response:
jsonResp := &JsonRpcResponse{
Id: rawJsonReq.Id,
Result: tx.Hash().Hex(),
Version: "2.0",
}
return jsonResp, nil
}

func (r *PrivateTxRelayer) SendBundle(rawTxs []string, blockNumber uint64) error {
// Convert eth_sendRawTransaction-style into eth_sendBundle:
sendBundleArgs := SendBundleArgs{
Txs: rawTxs,
BlockNumber: "0x" + strconv.FormatUint(blockNumber, 16),
}
bundleJsonReq := &JsonRpcRequest{
Id: atomic.AddUint64(&r.id, 1),
Method: "eth_sendBundle",
Params: []interface{}{sendBundleArgs},
Version: "2.0",
}
data, err := json.Marshal(bundleJsonReq)
if err != nil {
return err
}

// Make eth_sendBundle request:
log.Printf("[DEBUG] eth_sendBundle request: %s", string(data))
resp, err := MakeRequest(r.Url, data, true)
if err != nil {
return err
}

// Read response body:
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()
log.Printf("[DEBUG] eth_sendBundle response: %s", string(respBody))

// Parse response:
var bundleJsonResp *JsonRpcResponse
if err := json.Unmarshal(respBody, &bundleJsonResp); err != nil {
return err
}
if bundleJsonResp.Error != nil {
return fmt.Errorf("eth_sendBundle returned error: +v", bundleJsonResp.Error)
}
return nil
}

func (r *PrivateTxRelayer) addPendingTransaction(tx *types.Transaction, blockNumber uint64) error {
log.Printf("Adding pending tx with hash %s at block %d", tx.Hash().Hex(), blockNumber)
from, err := From(tx)
if err != nil {
return err
}
r.txByFromAndNonceMux.Lock()
if r.txByFromAndNonce[from] == nil {
r.txByFromAndNonce[from] = make(map[uint64]*PendingTransaction)
}
r.txByFromAndNonce[from][tx.Nonce()] = &PendingTransaction{tx, blockNumber}
r.txByFromAndNonceMux.Unlock()
return nil
}

func (r *PrivateTxRelayer) removePendingTransaction(tx *types.Transaction) error {
from, err := From(tx)
if err != nil {
return err
}
//r.txByFromAndNonceMux.Lock()
if r.txByFromAndNonce[from] == nil {
return nil
}
log.Printf("Removing pending tx with hash %s", tx.Hash().Hex())
delete(r.txByFromAndNonce[from], tx.Nonce())
//r.txByFromAndNonceMux.Unlock()
return nil
}

func (r *PrivateTxRelayer) txMonitorLoop() {
for {
select {
case block := <-r.blockCh:
log.Printf("Relayer got new block header: %v", block.NumberU64())
// 1. Remove pending transactions with outdated <from, nonce>
newTxs := block.Transactions()
for i := range newTxs {
if err := r.removePendingTransaction(newTxs[i]); err != nil {
log.Printf("ERROR: failed to remove transaction %+v: %v", newTxs[i], err)
continue
}
}
// 2. Re-send transactions
blockNumber := block.NumberU64()
r.txByFromAndNonceMux.Lock()
for from, txInfoByNonce := range r.txByFromAndNonce {
for nonce, txInfo := range txInfoByNonce {
if txInfo.BlockNumber > blockNumber {
continue
}
txInfo.BlockNumber = blockNumber + 1

tx := txInfo.Tx
rawTxBytes, err := tx.MarshalBinary()
if err != nil {
log.Printf("ERROR: failed to serialize tx: %v", err)
continue
}
rawTxHex := hexutil.Encode(rawTxBytes)

//go func() {
log.Printf("Re-sending: from=%v nonce=%v hash=%s", from, nonce, tx.Hash().Hex())
if err := r.SendBundle([]string{rawTxHex}, txInfo.BlockNumber); err != nil {
log.Printf("ERROR: failed to resend eth_sendBundle: %v", err)
}
//}()
}
}
r.txByFromAndNonceMux.Unlock()
}
}
}

func From(tx *types.Transaction) (string, error) {
signer := types.LatestSignerForChainID(tx.ChainId())
sender, err := types.Sender(signer, tx)
if err != nil {
return "", err
}
return sender.Hex(), nil
}
Loading

0 comments on commit 7a80115

Please sign in to comment.