Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
Handle RPC errors gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Nov 1, 2017
1 parent a36f61a commit c4df7c1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 25 deletions.
24 changes: 21 additions & 3 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ func (app *App) Subscribe(conn *Conn, msg *Message) {
return
}

res := rpc.Subscribe(conn.identifiers, msg.Identifier)
res, err := rpc.Subscribe(conn.identifiers, msg.Identifier)

if err != nil {
log.Errorf("RPC Subscribe Error: %v", err)
// TODO: need a way to tell client to retry later
return
}

if res.Status.String() == "SUCCESS" {
conn.subscriptions[msg.Identifier] = true
Expand All @@ -69,7 +75,13 @@ func (app *App) Unsubscribe(conn *Conn, msg *Message) {
return
}

res := rpc.Unsubscribe(conn.identifiers, msg.Identifier)
res, err := rpc.Unsubscribe(conn.identifiers, msg.Identifier)

if err != nil {
log.Errorf("RPC Unsubscribe Error: %v", err)
// TODO: need a way to tell client to retry later
return
}

if res.Status.String() == "SUCCESS" {
delete(conn.subscriptions, msg.Identifier)
Expand All @@ -84,7 +96,13 @@ func (app *App) Perform(conn *Conn, msg *Message) {
return
}

res := rpc.Perform(conn.identifiers, msg.Identifier, msg.Data)
res, err := rpc.Perform(conn.identifiers, msg.Identifier, msg.Data)

if err != nil {
log.Errorf("RPC Perform Error: %v", err)
// TODO: need a way to tell client to retry later
return
}

log.Debugf("Perform %s", res)

Expand Down
5 changes: 4 additions & 1 deletion disconnect_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ func (d *DisconnectNotifier) run() {
case conn := <-d.disconnect:
<-throttle
log.Debugf("Commit disconnect %s %s %v", conn.identifiers, conn.path, conn.headers)
rpc.Disconnect(conn.identifiers, SubscriptionsList(conn.subscriptions), conn.path, conn.headers)
_, err := rpc.Disconnect(conn.identifiers, SubscriptionsList(conn.subscriptions), conn.path, conn.headers)
if err != nil {
log.Errorf("RPC Disconnect Error: %v", err)
}
}
}
}
Expand Down
34 changes: 16 additions & 18 deletions remote.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"time"

"github.com/anycable/anycable-go/pool"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (rpc *Remote) Close() {
rpc.pool.Close()
}

func (rpc *Remote) VerifyConnection(path string, headers map[string]string) *pb.ConnectionResponse {
func (rpc *Remote) VerifyConnection(path string, headers map[string]string) (*pb.ConnectionResponse, error) {
conn := rpc.GetConn()
defer conn.Close()
client := pb.NewRPCClient(conn.Conn)
Expand All @@ -61,18 +62,17 @@ func (rpc *Remote) VerifyConnection(path string, headers map[string]string) *pb.
response, err := retry(op)

if err != nil {
log.Errorf("RPC Error: %v", err)
return nil
return nil, err
}

if r, ok := response.(*pb.ConnectionResponse); ok {
return r
return r, nil
} else {
return nil
return nil, errors.New("Failed to deserialize connection response")
}
}

func (rpc *Remote) Subscribe(connId string, channelId string) *pb.CommandResponse {
func (rpc *Remote) Subscribe(connId string, channelId string) (*pb.CommandResponse, error) {
conn := rpc.GetConn()
defer conn.Close()
client := pb.NewRPCClient(conn.Conn)
Expand All @@ -86,7 +86,7 @@ func (rpc *Remote) Subscribe(connId string, channelId string) *pb.CommandRespons
return ParseCommandResponse(response, err)
}

func (rpc *Remote) Unsubscribe(connId string, channelId string) *pb.CommandResponse {
func (rpc *Remote) Unsubscribe(connId string, channelId string) (*pb.CommandResponse, error) {
conn := rpc.GetConn()
defer conn.Close()
client := pb.NewRPCClient(conn.Conn)
Expand All @@ -100,7 +100,7 @@ func (rpc *Remote) Unsubscribe(connId string, channelId string) *pb.CommandRespo
return ParseCommandResponse(response, err)
}

func (rpc *Remote) Perform(connId string, channelId string, data string) *pb.CommandResponse {
func (rpc *Remote) Perform(connId string, channelId string, data string) (*pb.CommandResponse, error) {
conn := rpc.GetConn()
defer conn.Close()
client := pb.NewRPCClient(conn.Conn)
Expand All @@ -114,7 +114,7 @@ func (rpc *Remote) Perform(connId string, channelId string, data string) *pb.Com
return ParseCommandResponse(response, err)
}

func (rpc *Remote) Disconnect(connId string, subscriptions []string, path string, headers map[string]string) *pb.DisconnectResponse {
func (rpc *Remote) Disconnect(connId string, subscriptions []string, path string, headers map[string]string) (*pb.DisconnectResponse, error) {
conn := rpc.GetConn()
defer conn.Close()
client := pb.NewRPCClient(conn.Conn)
Expand All @@ -126,14 +126,13 @@ func (rpc *Remote) Disconnect(connId string, subscriptions []string, path string
response, err := retry(op)

if err != nil {
log.Errorf("RPC Error: %v", err)
return nil
return nil, err
}

if r, ok := response.(*pb.DisconnectResponse); ok {
return r
return r, nil
} else {
return nil
return nil, errors.New("Failed to deserialize disconnect response")
}
}

Expand All @@ -155,15 +154,14 @@ func retry(callback func() (interface{}, error)) (res interface{}, err error) {
}
}

func ParseCommandResponse(response interface{}, err error) *pb.CommandResponse {
func ParseCommandResponse(response interface{}, err error) (*pb.CommandResponse, error) {
if err != nil {
log.Errorf("RPC Error: %v", err)
return nil
return nil, err
}

if r, ok := response.(*pb.CommandResponse); ok {
return r
return r, nil
} else {
return nil
return nil, errors.New("Failed to deserialize command response")
}
}
16 changes: 13 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,13 @@ func serveWs(w http.ResponseWriter, r *http.Request) {
path := r.URL.String()
headers := GetHeaders(r, config.headers)

response := rpc.VerifyConnection(path, headers)
response, err := rpc.VerifyConnection(path, headers)

if err != nil {
log.Errorf("RPC Connect Error: %v", err)
CloseWS(ws, "RPC Error")
return
}

log.Debugf("Auth %s", response)

Expand Down Expand Up @@ -239,7 +245,11 @@ func main() {
app.Disconnector = &DisconnectNotifier{rate: *disconnectRate, disconnect: make(chan *Conn)}
go app.Disconnector.run()

log.Infof("Running AnyCable websocket server v%s on %s at %s", version, *addr, *wspath)
http.HandleFunc(*wspath, serveWs)
http.ListenAndServe(*addr, nil)

log.Infof("Running AnyCable websocket server v%s on %s at %s", version, *addr, *wspath)
err := http.ListenAndServe(*addr, nil)
if err != nil {
log.Fatal("HTTP Server Error: ", err)
}
}

0 comments on commit c4df7c1

Please sign in to comment.