Skip to content

Commit

Permalink
Merged branch develop into master
Browse files Browse the repository at this point in the history
  • Loading branch information
rogerzr committed Jan 11, 2017
2 parents c2ad683 + fe0f8b9 commit 5b2cbad
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 61 deletions.
32 changes: 31 additions & 1 deletion API.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
# Versions
## 1.4.x
* Bump version to align numbers with server/clients.
### New:
* `sys.reload` is back to reload a connection's own session without permissions

## 1.2.0
### New:
* `user.getTags`
* `user.setDisabled`

## 1.1.0
### Modified:
* `sys.login` also returns the current tags of the user


## 1.0.0
### New:
* `user.addWhitelist`
Expand Down Expand Up @@ -97,6 +106,7 @@
* [user.list](#userlist)
* [user.setTags](#usersettags)
* [user.delTags](#userdeltags)
* [user.getTags](#usergettags)
* [user.setPass](#usersetpass)
* [user.addTemplate](#useraddtemplate)
* [user.delTemplate](#userdeltemplate)
Expand All @@ -105,6 +115,7 @@
* [user.addBlacklist](#useraddblacklist)
* [user.delBlacklist](#userdelblacklist)
* [user.setMaxSessions](#usersetmaxsessions)
* [user.setDisabled](#usersetdisabled)

# System

Expand Down Expand Up @@ -435,6 +446,15 @@ Remove a tag from an user on a prefix
### Result:
"result": { "ok": true }

## user.getTags
Return the list of tags result from merging the user tags with the tags being inherited by the templates

### Parameters:
* `"user": <String>` - Username of the user to get the tags from

### Result:
"result": { "tags": { "test": { "@task.pull": true, "@task.push": true }}}

## user.setPass
Set the user password for basic auth

Expand Down Expand Up @@ -514,3 +534,13 @@ Set the maximum number of parallel sessions active of an user

### Result:
"result": { "ok": true }

## user.setDisabled
A disabled user cannot login

### Parameters:
* `"user": <String>` - Username of the user
* `"disabled": <Bool>` - Enable/Disable the login

### Result:
"result": { "ok": true }
139 changes: 91 additions & 48 deletions connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,30 +127,81 @@ func (req *JsonRpcReq) Result(result interface{}) {

}

func (nc *NexusConn) pushRes(res *JsonRpcRes) (err error) {
select {
case nc.chRes <- res:
if res.req == nil || res.req.Method != "sys.ping" || LogLevelIs(DebugLevel) {
wf := nc.log.WithFields(logrus.Fields{
"connid": nc.connId,
"id": res.Id,
"type": "response",
"remote": nc.conn.RemoteAddr().String(),
"proto": nc.proto,
})

if res.Error != nil {
wf.WithFields(logrus.Fields{
"code": res.Error.Code,
"message": res.Error.Message,
"data": res.Error.Data,
}).Info("<< error")
} else {
wf.WithFields(logrus.Fields{
func (nc *NexusConn) logRes(res *JsonRpcRes) {
if res.req != nil {
wf := nc.log.WithFields(logrus.Fields{
"connid": nc.connId,
"id": res.Id,
"type": "response",
"remote": nc.conn.RemoteAddr().String(),
"proto": nc.proto,
"method": res.req.Method,
})
switch res.req.Method {
// Do not log verbose actions
case "pipe.read", "pipe.write", "sys.ping":
if !LogLevelIs(DebugLevel) {
return
}
}

if res.Error != nil {
wf.WithFields(logrus.Fields{
"code": res.Error.Code,
"message": res.Error.Message,
"data": res.Error.Data,
}).Info("<< error")

} else {
switch res.req.Method {

// Do not log verbose results
case "user.list", "task.list", "sys.session.list":

default:
wf = wf.WithFields(logrus.Fields{
"result": res.Result,
}).Info("<< result")
})
}
wf.Info("<< result")
}
}
}

func (nc *NexusConn) logReq(req *JsonRpcReq) {
e := nc.log.WithFields(logrus.Fields{
"connid": req.nc.connId,
"id": req.Id,
"method": req.Method,
"remote": req.nc.conn.RemoteAddr().String(),
"proto": nc.proto,
"params": truncateJson(req.Params),
"type": "request",
})

// Fine tuning of logged fields
if opts.IsProduction {
switch req.Method {

// Hide sensible parameters
case "sys.login", "user.setPass":
e = e.WithField("params", make(map[string]interface{}))

// Do not log verbose actions
case "pipe.read", "pipe.write", "sys.ping":
if !LogLevelIs(DebugLevel) {
return
}
}
}

e.Infof(">> %s", req.Method)
}

func (nc *NexusConn) pushRes(res *JsonRpcRes) (err error) {
select {
case nc.chRes <- res:
nc.logRes(res)

case <-nc.context.Done():
err = errors.New("Context cancelled")
Expand All @@ -170,27 +221,8 @@ func (nc *NexusConn) pullRes() (res *JsonRpcRes, err error) {
func (nc *NexusConn) pushReq(req *JsonRpcReq) (err error) {
select {
case nc.chReq <- req:
if req.Method != "sys.ping" || LogLevelIs(DebugLevel) {
e := nc.log.WithFields(logrus.Fields{
"connid": req.nc.connId,
"id": req.Id,
"method": req.Method,
"remote": req.nc.conn.RemoteAddr().String(),
"proto": nc.proto,
"params": truncateJson(req.Params),
"type": "request",
})

// Fine tuning of logged fields
if opts.IsProduction {
switch req.Method {
case "sys.login":
e = e.WithField("params", make(map[string]interface{}))
}
}
nc.logReq(req)

e.Infof(">> %s", req.Method)
}
case <-nc.context.Done():
err = errors.New("Context cancelled")
}
Expand Down Expand Up @@ -276,10 +308,8 @@ func (nc *NexusConn) respWorker() {
case *Task:

if !strings.HasPrefix(res.Path, "@pull.") {
cT, _ := res.CreationTime.(time.Time)
wT, _ := res.WorkingTime.(time.Time)

nc.log.WithFields(logrus.Fields{
i := nc.log.WithFields(logrus.Fields{
"type": "metric",
"kind": "taskCompleted",
"connid": nc.connId,
Expand All @@ -289,9 +319,22 @@ func (nc *NexusConn) respWorker() {
"method": res.Method,
"ttl": res.Ttl,
"targetSession": res.Tses,
"waitingTime": round(wT.Sub(cT).Seconds(), 8),
"workingTime": round(time.Since(wT).Seconds(), 8),
}).Info("Task completed")
})

if cT, ok := res.CreationTime.(time.Time); ok {
if wT, ok := res.WorkingTime.(time.Time); ok {
i = i.WithFields(logrus.Fields{
"waitingTime": round(wT.Sub(cT).Seconds(), 8),
"workingTime": round(time.Since(wT).Seconds(), 8),
})
} else {
i = i.WithFields(logrus.Fields{
"waitingTime": round(time.Now().Sub(cT).Seconds(), 8),
})
}
}

i.Info("Task completed")
}

if res.ErrCode != nil {
Expand Down Expand Up @@ -453,7 +496,7 @@ func (nc *NexusConn) close() {
}

func (nc *NexusConn) reload(fromSameSession bool) (bool, int) {
if nc.user == nil {
if nc.user == nil || nc.user == Nobody {
return false, ErrInvalidRequest
}
ud, err := loadUserData(nc.user.User)
Expand Down
112 changes: 112 additions & 0 deletions nodes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -42,6 +44,10 @@ func nodeTrack() {
}).Errorf("Error, can't insert on nodes table")
return
}

var masterCtx context.Context
var masterCancel context.CancelFunc

// WatchDog loop
tick := time.NewTicker(time.Second * 3)
defer tick.Stop()
Expand Down Expand Up @@ -112,11 +118,15 @@ func nodeTrack() {
if !isMasterNode() {
Log.Printf("I'm the master node now")
setMasterNode(true)

masterCtx, masterCancel = context.WithCancel(context.Background())
go searchOrphaned(masterCtx)
}
} else {
if isMasterNode() {
Log.Printf("I'm NOT the master node anymore")
setMasterNode(false)
masterCancel()
}
}
}
Expand All @@ -126,12 +136,114 @@ func nodeTrack() {
exit = true
}
}

if masterCancel != nil {
masterCancel()
}

r.Table("nodes").
Get(nodeId).
Update(ei.M{"kill": true}).
RunWrite(db)
}

func searchOrphaned(ctx context.Context) {
t := time.After(time.Second)
for {
select {
case <-ctx.Done():
return

case <-t:
t = time.After(time.Minute)
nodes := make([]map[string]interface{}, 0)
tcur, err := r.Table("nodes").Pluck("id").Run(db)
if err != nil {
Log.WithFields(logrus.Fields{
"error": err,
}).Errorf("Error listing nodes")
return
}
tcur.All(&nodes)

var nodesregexp string
// (^node1|^node2|^node3)
if len(nodes) > 0 {
nodesregexp = "("
for k, node := range nodes {
nodesregexp = fmt.Sprintf("%s^%s", nodesregexp, node["id"])

if k < len(nodes)-1 {
nodesregexp = fmt.Sprintf("%s|", nodesregexp)
} else {
nodesregexp = fmt.Sprintf("%s)", nodesregexp)
}
}
} else {
Log.Errorf("Length of nodes list is 0... who am I??")
return
}

searchOrphanedStuff(nodesregexp, "sessions", "nodeId")
searchOrphanedStuff(nodesregexp, "tasks", "id")
searchOrphanedStuff(nodesregexp, "pipes", "id")
searchOrphanedStuff(nodesregexp, "locks", "owner")
}
}
}

func searchOrphanedStuff(regex, what, field string) {

orphaned, err := r.Table(what).Filter(func(ses r.Term) r.Term {
return ses.Field(field).Match(regex).Not()
}).Run(db)
if err != nil {
Log.WithFields(logrus.Fields{
"error": err,
}).Errorf("Error searching orphaned %s", what)
return
}

orphans := make([]interface{}, 0)
err = orphaned.All(&orphans)

switch err {
default:
Log.WithFields(logrus.Fields{
"error": err,
}).Errorf("Error searching orphaned %s", what)

case r.ErrEmptyResult:

case nil:
if len(orphans) <= 0 {
return
}

o := make([]string, 0)
for _, e := range orphans {
if om, ok := e.(map[string]interface{}); ok {
o = append(o, fmt.Sprintf("%s", om[field])[:8])
}
}

Log.WithFields(logrus.Fields{
"orphans": o,
}).Warnf("Found %d orphaned %s", len(o), what)

for _, s := range o {
err := dbClean(s)

if err != nil {
Log.WithFields(logrus.Fields{
"error": err,
what: s,
}).Errorln("Error deleting orphaned %s", what)
}
}
}
}

func cleanNode(node string) {
err := dbClean(node)
if err == nil {
Expand Down
Loading

0 comments on commit 5b2cbad

Please sign in to comment.