Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ethereum websocket module #102

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dapp/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
sendEthTxMod "github.com/Bit-Nation/panthalassa/dapp/module/sendEthTx"
uuidv4Mod "github.com/Bit-Nation/panthalassa/dapp/module/uuidv4"
db "github.com/Bit-Nation/panthalassa/db"
ethws "github.com/Bit-Nation/panthalassa/ethws"
keyManager "github.com/Bit-Nation/panthalassa/keyManager"
storm "github.com/asdine/storm"
log "github.com/ipfs/go-log"
Expand Down Expand Up @@ -55,6 +56,7 @@ type Registry struct {
host host.Host
closeChan chan *dapp.Data
conf Config
ethWS *ethws.EthereumWS
api *api.API
km *keyManager.KeyManager
dAppDB dapp.Storage
Expand Down Expand Up @@ -85,6 +87,10 @@ func NewDAppRegistry(h host.Host, conf Config, api *api.API, km *keyManager.KeyM
fetchDAppChan: make(chan fetchDAppChanStr),
addDevStreamChan: make(chan addDevStreamChanStr),
fetchDevStreamChan: make(chan fetchDAppStreamStr),
ethWS: ethws.New(ethws.Config{
Retry: time.Second,
WSUrl: conf.EthWSEndpoint,
}),
}

// load all default DApps
Expand Down
221 changes: 221 additions & 0 deletions ethws/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package ethws

import (
"encoding/json"
"fmt"
"time"

wsg "github.com/gorilla/websocket"
log "github.com/ipfs/go-log"
)

const rpcVersion = "2.0"

var logger = log.Logger("ethws")

type Config struct {
Retry time.Duration
WSUrl string
}

type Request struct {
ID int64 `json:"id"`
Method string `json:"method"`
Params []interface{} `json:"params"`
JsonRPC string `json:"jsonrpc"`
}

func (r *Request) Marshal() ([]byte, error) {
return json.Marshal(r)
}

type Error struct {
Code int `json:"code"`
Message string `json:"message"`
}

type Response struct {
JsonRPC string `json:"jsonrpc"`
RPCError *Error `json:"error,omitempty"`
Result interface{} `json:"result"`
ID int64 `json:"id"`
error error
}

func (r *Response) Error() error {
return r.error
}

type EthereumWS struct {
state State
// requests that need to be send
requestQueue chan Request
requests map[int64]chan<- Response
conn *wsg.Conn
}

type State struct {
addRequestIfNotExist chan StateObject
getRequest chan StateObject
deleteRequest chan StateObject
response chan StateObject
}

type StateObject struct {
id int64
c chan<- Response
}

// start state machine
func (ws *EthereumWS) State() {
for {
select {
case getRequest := <-ws.state.getRequest:
// if the request exists, respond with the id and the response channel
if respChan, exist := ws.requests[getRequest.id]; exist {
ws.state.response <- StateObject{getRequest.id, respChan}
break
}
// if the request doesn't exist, respond with 0 and nil
ws.state.response <- StateObject{0, nil}
case addRequest := <-ws.state.addRequestIfNotExist:
// if the request exists, don't add it to the map, respond with the id and the response channel
if respChan, exist := ws.requests[addRequest.id]; exist {
ws.state.response <- StateObject{addRequest.id, respChan}
break
}
// if the request doesn't exist, add it to the map, respond with 0 and nil
ws.requests[addRequest.id] = addRequest.c
ws.state.response <- StateObject{0, nil}
case deleteRequest := <-ws.state.deleteRequest:
delete(ws.requests, deleteRequest.id)
ws.state.response <- StateObject{0, nil}
} // select
} // infinite for
} // func (ws *EthereumWS) State()

// send an request to ethereum network
func (ws *EthereumWS) SendRequest(r Request) (<-chan Response, error) {

c := make(chan Response)

// add request to stack
for {
id := time.Now().UnixNano()
ws.state.addRequestIfNotExist <- StateObject{id, c}
stateResponse := <-ws.state.response
// if the id didn't exist
if stateResponse.id == 0 {
r.ID = id
break
}
}

// send request to queue
ws.requestQueue <- r
return c, nil

}

// create new ethereum websocket
func New(conf Config) *EthereumWS {

startSendWorker := make(chan bool)
startReadWorker := make(chan bool)

etws := &EthereumWS{
state: State{
addRequestIfNotExist: make(chan StateObject),
getRequest: make(chan StateObject),
deleteRequest: make(chan StateObject),
response: make(chan StateObject),
},
requestQueue: make(chan Request, 1000),
requests: map[int64]chan<- Response{},
}
go etws.State()
// worker that sends the requests
go func() {

// wait for connection
<-startSendWorker
// send requests
for {
select {
case req := <-etws.requestQueue:

// send request
if err := etws.conn.WriteJSON(req); err != nil {
logger.Error(err)

etws.state.getRequest <- StateObject{req.ID, nil}
stateResponse := <-etws.state.response
respChan := stateResponse.c

respChan <- Response{error: err}
}
}
}

}()

// worker that read response from websocket
go func() {

// wait to start worker
<-startReadWorker
for {

// read message
_, response, err := etws.conn.ReadMessage()
if err != nil {
logger.Error(err)
continue
}
// unmarshal
var resp Response
if err := json.Unmarshal(response, &resp); err != nil {
logger.Error(err)
continue
}

// get response channel
etws.state.getRequest <- StateObject{resp.ID, nil}
stateResponse := <-etws.state.response
if stateResponse.id == 0 {
logger.Error(fmt.Sprintf("failed to get response channel for ID: %d", stateResponse.id))
continue
}
etws.state.deleteRequest <- StateObject{resp.ID, nil}
_ = <-etws.state.response
// send response
respChan := stateResponse.c
respChan <- resp

}

}()

// connect to ethereum node
go func() {

// try to connect till success
for {
co, _, err := wsg.DefaultDialer.Dial(conf.WSUrl, nil)
if err == nil {
etws.conn = co
break
}
logger.Error(err)
// wait a bit. We don't want to stress the endpoint
time.Sleep(conf.Retry)
}

// signal the workers to start
startReadWorker <- true
startSendWorker <- true

}()

return etws
}
30 changes: 30 additions & 0 deletions ethws/ws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ethws

import (
"testing"
"time"

require "github.com/stretchr/testify/require"
)

func TestEthereumWebSocket(t *testing.T) {

ws := New(Config{
Retry: time.Second,
WSUrl: "wss://mainnet.infura.io/_ws",
})

respChan, err := ws.SendRequest(Request{
Method: `eth_protocolVersion`,
JsonRPC: rpcVersion,
})

require.Nil(t, err)

resp := <-respChan
require.Nil(t, resp.Error())

require.Equal(t, "2.0", resp.JsonRPC)
require.Equal(t, "0x3f", resp.Result)

}