Skip to content

Commit

Permalink
Merged develop into master
Browse files Browse the repository at this point in the history
  • Loading branch information
rogerzr committed Sep 12, 2016
2 parents b951b53 + 62fa267 commit ecd2ea1
Show file tree
Hide file tree
Showing 29 changed files with 1,471 additions and 267 deletions.
511 changes: 511 additions & 0 deletions API.md

Large diffs are not rendered by default.

44 changes: 36 additions & 8 deletions connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ func (nc *NexusConn) close() {
nc.cancelFun()
nc.conn.Close()
if mainContext.Err() == nil {
Log.Printf("Closing [%s] session", nc.connId)
if nc.proto != "internal" || LogLevelIs(DebugLevel) {
Log.Printf("Closing [%s] session", nc.connId)
}
dbClean(nc.connId)
}
}
Expand All @@ -356,7 +358,14 @@ func (nc *NexusConn) reload(fromSameSession bool) (bool, int) {
return false, ErrInternal
}

atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&nc.user)), unsafe.Pointer(&UserData{User: ud.User, Mask: ud.Mask, Tags: maskTags(ud.Tags, ud.Mask)}))
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&nc.user)), unsafe.Pointer(&UserData{
User: ud.User,
Mask: nc.user.Mask,
Tags: maskTags(ud.Tags, nc.user.Mask),
MaxSessions: ud.MaxSessions,
Whitelist: ud.Whitelist,
Blacklist: ud.Blacklist,
}))

if !fromSameSession {
wres, err := r.Table("sessions").
Expand Down Expand Up @@ -395,6 +404,14 @@ func (nc *NexusConn) updateSession() {

var numconn int64

type JsonRpcReqLog struct {
ConnID string `json:"connid"`
Method string `json:"method"`
Params interface{} `json:"params"`
Remote string `json:"remoteAddr"`
ID interface{} `json:"id"`
}

func (nc *NexusConn) handle() {

if nc.proto != "internal" {
Expand All @@ -416,16 +433,27 @@ func (nc *NexusConn) handle() {
Log.Debugf("Error on [%s] connection handler: %s", nc.connId, err)
break
}
params, err := json.Marshal(req.Params)
if err != nil {
Log.Printf("[%s@%s] %s: %#v - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, req.Params, req.Id)
} else {
Log.Printf("[%s@%s] %s: %s - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, params, req.Id)
}

if (req.Jsonrpc != "2.0" && req.Jsonrpc != "") || req.Method == "" { //"jsonrpc":"2.0" is optional
req.Error(ErrInvalidRequest, "", nil)
continue
}

if (req.Method != "sys.ping" && nc.proto != "internal") || LogLevelIs(DebugLevel) {
if opts.IsProduction {
d := JsonRpcReqLog{ID: req.Id, ConnID: req.nc.connId, Method: req.Method, Params: req.Params, Remote: req.nc.conn.RemoteAddr().String()}
marshalled, _ := json.Marshal(d)
Log.Printf(fmt.Sprintf("%s", marshalled))
} else {
marshalled, err := json.Marshal(req.Params)
if err != nil {
Log.Printf("[%s@%s] %s: %#v - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, req.Params, req.Id)
} else {
Log.Printf("[%s@%s] %s: %s - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, marshalled, req.Id)
}
}
}

go nc.handleReq(req)
}
}
38 changes: 30 additions & 8 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
r "github.com/dancannon/gorethink"
"github.com/jaracil/ei"
. "github.com/jaracil/nexus/log"
"strings"
"time"
)

var db *r.Session

func dbOpen() (err error) {
db, err = r.Connect(r.ConnectOpts{
Address: opts.Rethink.Host,
Database: opts.Rethink.Database,
MaxIdle: opts.Rethink.MaxIdle,
MaxOpen: opts.Rethink.MaxOpen,
Addresses: opts.Rethink.Hosts,
Database: opts.Rethink.Database,
MaxIdle: opts.Rethink.MaxIdle,
MaxOpen: opts.Rethink.MaxOpen,
})
if err != nil {
return
Expand Down Expand Up @@ -197,25 +199,45 @@ func dbBootstrap() error {

func dbClean(prefix string) (err error) {
// Delete all tasks from this prefix
_, err = r.Table("tasks").
wres, err := r.Table("tasks").
Between(prefix, prefix+"\uffff").
Filter(r.Row.Field("detach").Not()).
Delete().
Delete(r.DeleteOpts{ReturnChanges: true}).
RunWrite(db, r.RunOpts{Durability: "soft"})
if err != nil {
return
}
for _, change := range wres.Changes {
task := ei.N(change.OldValue)
if path := task.M("path").StringZ(); !strings.HasPrefix(path, "@pull.") {
hook("task", path+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{
"action": "pusherDisconnect",
"id": task.M("id").StringZ(),
"timestamp": time.Now().UTC(),
})
}
}

// Recover all tasks whose target session is this prefix
_, err = r.Table("tasks").
wres, err = r.Table("tasks").
Between(prefix, prefix+"\uffff", r.BetweenOpts{Index: "tses"}).
Update(r.Branch(r.Row.Field("stat").Eq("working"),
map[string]interface{}{"stat": "waiting", "tses": nil, "ttl": r.Row.Field("ttl").Add(-1)},
map[string]interface{}{}),
r.UpdateOpts{ReturnChanges: false}).
r.UpdateOpts{ReturnChanges: true}).
RunWrite(db, r.RunOpts{Durability: "soft"})
if err != nil {
return
}
for _, change := range wres.Changes {
task := ei.N(change.OldValue)
hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{
"action": "pullerDisconnect",
"id": task.M("id").StringZ(),
"timestamp": time.Now().UTC(),
})
}

// Delete all pipes from this prefix
_, err = r.Table("pipes").
Between(prefix, prefix+"\uffff").
Expand Down
20 changes: 11 additions & 9 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package main
const (
ErrParse = -32700
ErrInvalidRequest = -32600
ErrMethodNotFound = -32601
ErrInvalidParams = -32602
ErrInternal = -32603
ErrTimeout = -32000
ErrCancel = -32001
ErrInvalidTask = -32002
ErrInvalidPipe = -32003
ErrInvalidUser = -32004
ErrUserExists = -32005
ErrPermissionDenied = -32010
ErrInvalidParams = -32602
ErrMethodNotFound = -32601
ErrTtlExpired = -32011
ErrPermissionDenied = -32010
ErrLockNotOwned = -32006
ErrUserExists = -32005
ErrInvalidUser = -32004
ErrInvalidPipe = -32003
ErrInvalidTask = -32002
ErrCancel = -32001
ErrTimeout = -32000
ErrNoError = 0
)

Expand All @@ -31,4 +32,5 @@ var ErrStr = map[int]string{
ErrUserExists: "User already exists",
ErrPermissionDenied: "Permission denied",
ErrTtlExpired: "TTL expired",
ErrLockNotOwned: "Lock not owned",
}
Loading

0 comments on commit ecd2ea1

Please sign in to comment.