diff --git a/API.md b/API.md new file mode 100644 index 0000000..658ffb6 --- /dev/null +++ b/API.md @@ -0,0 +1,511 @@ +# Versions +## 1.0.0 +### New: + * `user.addWhitelist` + * `user.delWhitelist` + * `user.addBlacklist` + * `user.delBlacklist` + * `user.setMaxSessions` + * `sys.version` + * New error code `-32006 - ErrLockNotOwned` available for the Sync operations + + +### Modified: + * `sys.node.list` return value now includes the api version (check [sys.node.list](#sysnodelist)) + * `task.list` returns value improved (check [task.list](#tasklist)) + * `sys.session.list` field `id` renamed to `connid` + * `sys.session.kick` field `connId` renamed to `connid` + * `sys.session.reload` field `connId` renamed to `connid` + * `sys.watchdog` parameter made optional, and put into a map + * `sync.lock` and `sync.unlock` return error `-32006` on unsuccessful lock/unlock instead of `"ok":false` + * `sys.ping` returns an `"ok": true` on success + + +### Deprecates: + * `user.listTemplate` + * `sys.reload` + +## 0.1.0 + * Initial specification + +------ + +# Nexus JSONRPC 2.0 API +[JSONRPC 2.0 specification](http://www.jsonrpc.org/specification) + +> Any Nexus response which has no error but its result would be empty, has an `{ "ok": true }` instead + +# Errors + + ErrParse = -32700 + ErrInvalidRequest = -32600 + ErrInternal = -32603 + ErrInvalidParams = -32602 + ErrMethodNotFound = -32601 + ErrTtlExpired = -32011 + ErrPermissionDenied = -32010 + ErrLockNotOwned = -32006 + ErrUserExists = -32005 + ErrInvalidUser = -32004 + ErrInvalidPipe = -32003 + ErrInvalidTask = -32002 + ErrCancel = -32001 + ErrTimeout = -32000 + ErrNoError = 0 + +> Any API call can fail and return with an error instead of the documented result value, but these have been ommited below since error codes are self-explanatory. + + +# API Table of Contents + * [System](#system) + * [sys.ping](#sysping) + * [sys.version](#sysversion) + * [sys.watchdog](#syswatchdog) + * [sys.login](#syslogin) + * [sys.node.list](#sysnodelist) + * [sys.session.list](#syssessionlist) + * [sys.session.kick](#syssessionkick) + * [sys.session.reload](#syssessionreload) + * [Pipes](#pipes) + * [pipe.create](#pipecreate) + * [pipe.close](#pipeclose) + * [pipe.write](#pipewrite) + * [pipe.read](#piperead) + * [Sync](#sync) + * [sync.lock](#synclock) + * [sync.unlock](#syncunlock) + * [Tasks](#tasks) + * [task.push](#taskpush) + * [task.pull](#taskpull) + * [task.result](#taskresult) + * [task.error](#taskerror) + * [task.reject](#taskreject) + * [task.cancel](#taskcancel) + * [task.list](#tasklist) + * [Topics](#topics) + * [topic.sub](#topicsub) + * [topic.unsub](#topicunsub) + * [topic.pub](#topicpub) + * [Users](#users) + * [user.create](#usercreate) + * [user.delete](#userdelete) + * [user.list](#userlist) + * [user.setTags](#usersettags) + * [user.delTags](#userdeltags) + * [user.setPass](#usersetpass) + * [user.addTemplate](#useraddtemplate) + * [user.delTemplate](#userdeltemplate) + * [user.addWhitelist](#useraddwhitelist) + * [user.delWhitelist](#userdelwhitelist) + * [user.addBlacklist](#useraddblacklist) + * [user.delBlacklist](#userdelblacklist) + * [user.setMaxSessions](#usersetmaxsessions) + +# System + +## sys.ping +Test the connection or generate some traffic to keep the connection alive. + +### Parameter: + * `null` + +### Result: + "result": { "ok": true } + +## sys.version +Returns the semantic version of the node. + +### Parameter: + * `null` + +### Result: + "result": { "version": "0.2.0" } + +## sys.watchdog +Configure the time the connection will be considered alive without traffic. + +### Parameters: + * `"watchdog": ` - *Optional* - Sets the number of seconds the watchdog will hold. If not set, the result will show the current value. + +### Result: + "result": { "watchdog": 10 } + +## sys.login +Switches the user working with the current connection. + +### Parameters: + + * `"method": ` - *Optional* - Specifies the login method. If omitted, defaults to "basic". + +If auth method is basic: + + * `"user": ` - User to login as + * `"pass": ` - User's password + +Else, the specified method should document which fields its expecting + + +### Result: + "result": { "ok": true, "connid": , "user": } + +## sys.node.list +List the nexus nodes connected to the cluster. Includes some info about connected clients, CPU load and nexus version for each node. + +### Parameters: +* `"limit": ` - *Optional* - Limit the number of results. Defaults to 100 +* `"skip": ` - *Optional* - Skips a number of results. Defaults to 0 + +### Result: + "result": [ {"id": , "version": , "clients": , "load": {"load1": , "load5": , "load15": }}, ... ] + +## sys.session.list +List the active sessions for an user prefix on the cluster. + +### Parameters: +* `"prefix": ` - Username prefix to list from +* `"limit": ` - *Optional* - Limit the number of results. Defaults to 100 +* `"skip": ` - *Optional* - Skips a number of results. Defaults to 0 + +### Result: + "result": [{"sessions":[{"creationTime":"2016-08-30T12:39:16.39Z","connid":"687c3b7baf4b9471","nodeid":"687c3b7b","protocol":"tcp","remoteAddress":"172.17.0.1:51398"},{"creationTime":"2016-08-30T12:39:21.283Z","id":"687c3b7b407bcce2","nodeid":"687c3b7b","protocol":"tcp","remoteAddress":"172.17.0.1:51402"}],"user":"root","n":2}, ...] + +## sys.session.kick +Terminates any connection which session id matches the prefix + +### Parameters: +* `"connid": ` - Connection ID prefix + +### Result: + "result": { "kicked": 7 } + +## sys.session.reload +Reloads user data for any connection which connection id matches the prefix + +### Parameters: +* `"connid": ` - Connection ID prefix + +### Result: + "result": { "reloaded": 2 } + +# Pipes + +## pipe.create +Creates a new pipe. + +### Parameters: +* `"len": ` - *Optional* - Maximum capacity of the pipe. Defaults to 1000 + +### Result: + "result": { "pipeid": } + +## pipe.close +Closes a pipe + +### Parameters: +* `"pipeid": ` - PipeID of the pipe to close + +### Result: + "result": { "ok": true } + +## pipe.write +Writes any JSON object into a pipe. + +### Parameters: +* `"pipeid": ` - PipeID of the pipe to write to +* `"msg": ` - Data to write to the pipe + +### Result: + "result": { "ok": true } + +## pipe.read +Reads a JSON object from a pipe. Blocks until an element is available on the pipe or exceeds the timeout + +### Parameters: +* `"pipeid": ` - PipeID of the pipe to write to +* `"max": ` - Maximum number of elements to read from the pipe +* `"timeout": ` - Maximum number of second to wait for a read to happen. Defaults to blocking forever + +### Result: + { "waiting": , "drops": , "msgs": [{ "msg": , "count": }, ...] } +* `waiting`: Number of messages still in the pipe +* `drops`: Number of messages which could not be read on time, did not fit on the pipe and were lost. +* `msgs`: Array of objects containing the data written to the pipe and a secuential identifier + + +# Sync + +## sync.lock +Grabs a lock, cluster-wide. + +### Parameters: +* `"lock": ` - Name of the lock to grab + +### Result: + "result": { "ok": true } + +## sync.unlock +Frees a lock, cluster-wide. + +### Parameters: +* `"lock": ` - Name of the lock to grab + +### Result: + "result": { "ok": true } + +# Tasks + +## task.push +Calls a method which will be resolved by the system, and will return a result or an error (examples on the result section) + +### Parameters: + * `"method": ` - Which method is this task invoking + * `"params": ` - Method parameters + * `"detached": ` - The task will eventually be processed but we do not care about the result + * `"prio": ` - Sets the priority of this task among other pushes on the same method + * `"ttl": ` - How many times this task can be requeued (by a failed worker/node or a task reject) + * `"timeout": ` - How much time should a task be on any state other than "done" before the task is considered failed. + +### Result: +If "detached" is true, it will immediately receive: + + "result": { "ok": true } + +Otherwise, it will get an answer defined by the worker who pulls the task: + + "result": { "answer": 42 } + +or + + "error": {"code":123,"message":"asdf","data":""} + +## task.pull +Pulls a task from a path to work on + +### Parameters: + * `"prefix": ` - Prefix to pull tasks from + * `"timeout": ` - How much time should we wait for a task to get pulled + +### Result: + "result": {detach":false,"method":"test","params":{},"path":"asdf.","prio":0,"tags":{"@admin":true},"taskid":"687c3b7b966f55e92d376e4b6a6da37f9c8d","user":"root"} + +## task.result +Mark a task as finished successfully, and set the task result parameter + +### Parameters: + * `"taskid": ` - Task being resolved + * `"result": ` - Data delivered to the pusher as "result" + +### Result: + "result": { "ok": true } + +## task.error +Mark a task as finished with an error, and set the error fields + +### Parameters: +* `"taskid": ` - Task being resolved with an error +* `"code": ` - *Optional* - Error code +* `"message": ` - *Optional* - Error message +* `"data": ` - *Optional* - Error data + +### Result: + "result": { "ok": true } + +## task.reject +Reject a pulled task. It will be marked as waiting, and available to be pulled again. +Decrements the task's TTL + +### Parameters: +* `"taskid": ` - Task being rejected + +### Result: + "result": { "ok": true } + +## task.cancel +Cancel a task, which will mark it as cancelled and wake up whoever was waiting for its completion + +### Parameters: +* `"taskid": ` - Task being cancelled + +### Result: + "result": { "ok": true } + +## task.list +List tasks happening inside a prefix and its properties + +### Parameters: +* `"prefix": ` - Path prefix +* `"limit": ` - *Optional* - Limit the number of results. Defaults to 100 +* `"skip": ` - *Optional* - Skips a number of results. Defaults to 0 + +### Result: + "result": [{"id":"687c3b7bfbcdae7cb774d215cf923252f3fb","state":"waiting","path":"test.","priority":0,"ttl":0,"detached":false,"user":"root","method":"","params":null,"targetSession":"","result":null,"errCode":null,"errString":"","errObject":null,"tags":null,"creationTime":"2016-08-31T09:44:16.316Z","deadline":"2016-08-31T09:45:16.316Z"}, ...] + + +# Topics + +## topic.sub +Subscribe a pipe to a topic. Everything published on the topic will be written on the pipe + +### Parameters: +* `"pipeid": ` - PipeID to subscribe +* `"topic": ` - Topic to subscribe the pipe to + +### Result: + "result": { "ok": true } + + +## topic.unsub +Unsubscribe a pipe from a topic. + +### Parameters: +* `"pipeid": ` - PipeID to subscribe +* `"topic": ` - Topic to unsubscribe the pipe from + +### Result: + "result": { "ok": true } + +## topic.pub +Publish data to a topic. + +### Parameters: +* `"topic": ` - Topic to send the data to +* `"msg": ` - Data to send + +### Result: + "result": { "ok": true } + + +# Users + +## user.create +Create a new user which will be able to authenticate by basic auth + +### Parameters: +* `"user": ` - Username of the new user +* `"pass": ` - Password of the new user + +### Result: + "result": { "ok": true } + +## user.delete +Delete an existent user + +### Parameters: +* `"user": ` - Username of the user to delete + +### Result: + "result": { "ok": true } + +## user.list +Lists users which username starts with a prefix + +### Parameters: +* `"prefix": ` - Prefix where the tags will take effect +* `"limit": ` - *Optional* - Limit the number of results. Defaults to 100 +* `"skip": ` - *Optional* - Skips a number of results. Defaults to 0 + +### Result: + "result": [{"blacklist":["172.17.*"],"maxsessions":42,"tags":{test":{"@admin":true}},"templates":["template1","auth-token"],"user":"test","whitelist":["172.17.0.1"]},] + + +## user.setTags +Set a tag on an user on a prefix + +### Parameters: +* `"user": ` - Username of the user to set tags on +* `"prefix": ` - Prefix where the tags will take effect +* `"tags": ` - Tags to be set + +### Result: + "result": { "ok": true } + +## user.delTags +Remove a tag from an user on a prefix + +### Parameters: +* `"user": ` - Username of the user to remove tags from +* `"prefix": ` - Prefix where the tags will take effect +* `"tags": ` - Tags to be deleted + +### Result: + "result": { "ok": true } + +## user.setPass +Set the user password for basic auth + +### Parameters: +* `"user": ` - Username of the user +* `"pass": ` - New password + +### Result: + "result": { "ok": true } + +## user.addTemplate +Add a template to an user + +### Parameters: +* `"user": ` - Username +* `"template": ` - Template to add + +### Result: + "result": { "ok": true } + +## user.delTemplate +Remove a template from an user + +### Parameters: +* `"user": ` - Username +* `"template": ` - Template to remove + +### Result: + "result": { "ok": true } + +## user.addWhitelist +Add an address to an user whitelist + +### Parameters: +* `"user": ` - Username +* `"ip": ` - IP address. Accepts regular expressions (192.168.\*) + +### Result: + "result": { "ok": true } + +## user.delWhitelist +Remove an address to an user whitelist + +### Parameters: +* `"user": ` - Username of the user +* `"ip": ` - IP address. Accepts regular expressions (192.168.\*) + +### Result: + "result": { "ok": true } + +## user.addBlacklist +Add an address to an user blacklist + +### Parameters: +* `"user": ` - Username of the user +* `"ip": ` - IP address. Accepts regular expressions (192.168.\*) + +### Result: + "result": { "ok": true } + +## user.delBlacklist +Remove an address to an user blacklist + +### Parameters: +* `"user": ` - Username of the user +* `"ip": ` - IP address. Accepts regular expressions (192.168.\*) + +### Result: + "result": { "ok": true } + +## user.setMaxSessions +Set the maximum number of parallel sessions active of an user + +### Parameters: +* `"user": ` - Username of the user +* `"maxsessions": ` - Number of maximum sessions + +### Result: + "result": { "ok": true } diff --git a/connections.go b/connections.go index a6bcf94..1ba7842 100644 --- a/connections.go +++ b/connections.go @@ -341,7 +341,9 @@ func (nc *NexusConn) close() { nc.cancelFun() nc.conn.Close() if mainContext.Err() == nil { - Log.Printf("Closing [%s] session", nc.connId) + if nc.proto != "internal" || LogLevelIs(DebugLevel) { + Log.Printf("Closing [%s] session", nc.connId) + } dbClean(nc.connId) } } @@ -356,7 +358,14 @@ func (nc *NexusConn) reload(fromSameSession bool) (bool, int) { return false, ErrInternal } - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&nc.user)), unsafe.Pointer(&UserData{User: ud.User, Mask: ud.Mask, Tags: maskTags(ud.Tags, ud.Mask)})) + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&nc.user)), unsafe.Pointer(&UserData{ + User: ud.User, + Mask: nc.user.Mask, + Tags: maskTags(ud.Tags, nc.user.Mask), + MaxSessions: ud.MaxSessions, + Whitelist: ud.Whitelist, + Blacklist: ud.Blacklist, + })) if !fromSameSession { wres, err := r.Table("sessions"). @@ -395,6 +404,14 @@ func (nc *NexusConn) updateSession() { var numconn int64 +type JsonRpcReqLog struct { + ConnID string `json:"connid"` + Method string `json:"method"` + Params interface{} `json:"params"` + Remote string `json:"remoteAddr"` + ID interface{} `json:"id"` +} + func (nc *NexusConn) handle() { if nc.proto != "internal" { @@ -416,16 +433,27 @@ func (nc *NexusConn) handle() { Log.Debugf("Error on [%s] connection handler: %s", nc.connId, err) break } - params, err := json.Marshal(req.Params) - if err != nil { - Log.Printf("[%s@%s] %s: %#v - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, req.Params, req.Id) - } else { - Log.Printf("[%s@%s] %s: %s - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, params, req.Id) - } + if (req.Jsonrpc != "2.0" && req.Jsonrpc != "") || req.Method == "" { //"jsonrpc":"2.0" is optional req.Error(ErrInvalidRequest, "", nil) continue } + + if (req.Method != "sys.ping" && nc.proto != "internal") || LogLevelIs(DebugLevel) { + if opts.IsProduction { + d := JsonRpcReqLog{ID: req.Id, ConnID: req.nc.connId, Method: req.Method, Params: req.Params, Remote: req.nc.conn.RemoteAddr().String()} + marshalled, _ := json.Marshal(d) + Log.Printf(fmt.Sprintf("%s", marshalled)) + } else { + marshalled, err := json.Marshal(req.Params) + if err != nil { + Log.Printf("[%s@%s] %s: %#v - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, req.Params, req.Id) + } else { + Log.Printf("[%s@%s] %s: %s - id: %.0f", req.nc.connId, req.nc.conn.RemoteAddr(), req.Method, marshalled, req.Id) + } + } + } + go nc.handleReq(req) } } diff --git a/database.go b/database.go index 28888ea..13d7e1a 100644 --- a/database.go +++ b/database.go @@ -4,16 +4,18 @@ import ( r "github.com/dancannon/gorethink" "github.com/jaracil/ei" . "github.com/jaracil/nexus/log" + "strings" + "time" ) var db *r.Session func dbOpen() (err error) { db, err = r.Connect(r.ConnectOpts{ - Address: opts.Rethink.Host, - Database: opts.Rethink.Database, - MaxIdle: opts.Rethink.MaxIdle, - MaxOpen: opts.Rethink.MaxOpen, + Addresses: opts.Rethink.Hosts, + Database: opts.Rethink.Database, + MaxIdle: opts.Rethink.MaxIdle, + MaxOpen: opts.Rethink.MaxOpen, }) if err != nil { return @@ -197,25 +199,45 @@ func dbBootstrap() error { func dbClean(prefix string) (err error) { // Delete all tasks from this prefix - _, err = r.Table("tasks"). + wres, err := r.Table("tasks"). Between(prefix, prefix+"\uffff"). Filter(r.Row.Field("detach").Not()). - Delete(). + Delete(r.DeleteOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { return } + for _, change := range wres.Changes { + task := ei.N(change.OldValue) + if path := task.M("path").StringZ(); !strings.HasPrefix(path, "@pull.") { + hook("task", path+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "pusherDisconnect", + "id": task.M("id").StringZ(), + "timestamp": time.Now().UTC(), + }) + } + } + // Recover all tasks whose target session is this prefix - _, err = r.Table("tasks"). + wres, err = r.Table("tasks"). Between(prefix, prefix+"\uffff", r.BetweenOpts{Index: "tses"}). Update(r.Branch(r.Row.Field("stat").Eq("working"), map[string]interface{}{"stat": "waiting", "tses": nil, "ttl": r.Row.Field("ttl").Add(-1)}, map[string]interface{}{}), - r.UpdateOpts{ReturnChanges: false}). + r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { return } + for _, change := range wres.Changes { + task := ei.N(change.OldValue) + hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "pullerDisconnect", + "id": task.M("id").StringZ(), + "timestamp": time.Now().UTC(), + }) + } + // Delete all pipes from this prefix _, err = r.Table("pipes"). Between(prefix, prefix+"\uffff"). diff --git a/errors.go b/errors.go index 6a66d80..7b04f00 100644 --- a/errors.go +++ b/errors.go @@ -3,17 +3,18 @@ package main const ( ErrParse = -32700 ErrInvalidRequest = -32600 - ErrMethodNotFound = -32601 - ErrInvalidParams = -32602 ErrInternal = -32603 - ErrTimeout = -32000 - ErrCancel = -32001 - ErrInvalidTask = -32002 - ErrInvalidPipe = -32003 - ErrInvalidUser = -32004 - ErrUserExists = -32005 - ErrPermissionDenied = -32010 + ErrInvalidParams = -32602 + ErrMethodNotFound = -32601 ErrTtlExpired = -32011 + ErrPermissionDenied = -32010 + ErrLockNotOwned = -32006 + ErrUserExists = -32005 + ErrInvalidUser = -32004 + ErrInvalidPipe = -32003 + ErrInvalidTask = -32002 + ErrCancel = -32001 + ErrTimeout = -32000 ErrNoError = 0 ) @@ -31,4 +32,5 @@ var ErrStr = map[int]string{ ErrUserExists: "User already exists", ErrPermissionDenied: "Permission denied", ErrTtlExpired: "TTL expired", + ErrLockNotOwned: "Lock not owned", } diff --git a/hooks.go b/hooks.go new file mode 100644 index 0000000..dbe8b00 --- /dev/null +++ b/hooks.go @@ -0,0 +1,223 @@ +package main + +import ( + "fmt" + "strings" + "sync" + "time" + + r "github.com/dancannon/gorethink" + "github.com/jaracil/ei" + . "github.com/jaracil/nexus/log" +) + +type HookBans struct { + *sync.RWMutex + Map map[string]time.Time +} + +type HookCache struct { + *sync.Mutex + Map map[string]*HookCacheItem +} + +type HookCacheItem struct { + List []interface{} + Expire time.Time +} + +func (c *HookCache) Get(p string) []interface{} { + c.Lock() + if res, ok := c.Map[p]; ok { + res.Expire = time.Now().Add(_hookCacheTime) + c.Unlock() + return res.List + } + c.Unlock() + return nil +} + +func (c *HookCache) Set(p string, list []interface{}) { + c.Lock() + c.Map[p] = &HookCacheItem{list, time.Now().Add(_hookCacheTime)} + c.Unlock() +} + +var _validHookTypes = []string{"task", "user"} + +var hookBans = &HookBans{ + &sync.RWMutex{}, + map[string]time.Time{}, +} +var _hookBanTime = time.Minute * 5 + +var hookCache = &HookCache{ + &sync.Mutex{}, + map[string]*HookCacheItem{}, +} +var _hookCacheTime = time.Hour +var _hookCacheExpirePeriod = time.Minute * 30 + +func hookList(ty string, path string, user string) (res []interface{}) { + p := fmt.Sprintf("%s|%s|%s", ty, path, user) + if res = hookCache.Get(p); res != nil { + return res + } + res = append(res, "hook.*", "hook."+ty+".*") + for _, ps := range topicList(path) { + res = append(res, fmt.Sprintf("hook.%s|%s", ty, ps)) + for _, us := range topicList(user) { + res = append(res, fmt.Sprintf("hook.%s|%s|%s", ty, ps, us)) + } + } + hookCache.Set(p, res) + return +} + +func hookPublish(ty string, path string, user string, message interface{}) (int, error) { + msg := ei.M{"topic": fmt.Sprintf("hook.%s|%s|%s", ty, path, user), "msg": message} + hookTopics := hookList(ty, path, user) + res, err := r.Table("pipes"). + GetAllByIndex("subs", hookTopics...). + Update(map[string]interface{}{"msg": r.Literal(msg), "count": r.Row.Field("count").Add(1), "ismsg": true}). + RunWrite(db, r.RunOpts{Durability: "soft"}) + return res.Replaced, err +} + +func hook(ty string, path string, user string, data interface{}) { + if hookIsBanned(ty, path, user) { + return + } + switch ty { + case "task", "user": + n, _ := hookPublish(ty, path, user, data) + if n == 0 { + hookBan(ty, path, user) + } + } +} + +func hookBan(ty string, path string, user string) { + hookBans.Lock() + hookBans.Map[ty+"|"+path+"|"+user] = time.Now().Add(_hookBanTime) + hookBans.Unlock() +} + +func hookUnban(ty string, path string, user string) { + hookBans.Lock() + typ, _ := normalizeHookPath(ty) + if typ == "" { // All + hookBans.Map = map[string]time.Time{} + } else { + pth, rec := normalizeHookPath(path) + if pth == "" && rec { // All paths of one type + for k, _ := range hookBans.Map { + if strings.HasPrefix(k, ty+"|") { + delete(hookBans.Map, k) + } + } + } else if rec { // Some paths of one type + for k, _ := range hookBans.Map { + if strings.HasPrefix(k, ty+"|"+pth+".") || strings.HasPrefix(k, ty+"|"+pth+"|") { + delete(hookBans.Map, k) + } + } + } else { + usr, rec := normalizeHookPath(user) + if usr == "" && rec { // All users of one path + for k, _ := range hookBans.Map { + if strings.HasPrefix(k, ty+"|"+pth+"|") { + delete(hookBans.Map, k) + } + } + } else if rec { // Some users of one path + for k, _ := range hookBans.Map { + if strings.HasPrefix(k, ty+"|"+pth+"|"+usr+".") || k == ty+"|"+pth+"|"+usr { + delete(hookBans.Map, k) + } + } + } else { // One user of one path + delete(hookBans.Map, ty+"|"+pth+"|"+usr) + } + } + } + hookBans.Unlock() +} + +func normalizeHookPath(s string) (string, bool) { + if s == "" || s == "*" { + return "", true + } + recursive := strings.HasSuffix(s, ".*") + return strings.TrimRight(s, "*."), recursive +} + +func hookIsBanned(ty string, path string, user string) bool { + hookBans.RLock() + if t, ok := hookBans.Map[ty+"|"+path+"|"+user]; ok { + if time.Since(t) <= 0 { + hookBans.RUnlock() + return true + } + hookBans.RUnlock() + hookUnban(ty, path, user) + return false + } + hookBans.RUnlock() + return false +} + +func hooksTrack() { + go hookCacheExpire() + defer exit("hooks topic-listen error") + nic := NewInternalClient() + defer nic.Close() +HookLoop: + for retry := 0; retry < 10; retry++ { + pipe, err := nic.PipeCreate() + if err != nil { + Log.Errorln("Error creating pipe on hooks topic-listen:", err.Error()) + time.Sleep(time.Second) + continue + } + _, err = nic.TopicSubscribe(pipe, "hook.listen") + if err != nil { + Log.Errorln("Error subscribing to topic on hooks topic-listen:", err.Error()) + time.Sleep(time.Second) + continue + } + retry = 0 + for { + topicData, err := pipe.TopicRead(10, time.Minute) + if err != nil { + Log.Errorln("Error reading from pipe on hooks topic-listen") + time.Sleep(time.Second) + continue HookLoop + } + if topicData.Drops != 0 { + Log.Warnf("Got %d drops reading from pipe on hooks topic-listen", topicData.Drops) + } + for _, msg := range topicData.Msgs { + m := ei.N(msg.Msg) + ty := m.M("type").StringZ() + path := m.M("path").StringZ() + user := m.M("user").StringZ() + hookUnban(ty, path, user) + } + } + } +} + +func hookCacheExpire() { + for { + time.Sleep(_hookCacheExpirePeriod) + hookCache.Lock() + now := time.Now() + for key, ci := range hookCache.Map { + if ci.Expire.Before(now) { + delete(hookCache.Map, key) + } + } + hookCache.Unlock() + } +} diff --git a/inputcheck.go b/inputcheck.go index efa73b4..6290d32 100644 --- a/inputcheck.go +++ b/inputcheck.go @@ -1,15 +1,15 @@ package main import ( - "regexp" - "github.com/jaracil/ei" "errors" + "github.com/jaracil/ei" + "regexp" ) const ( - _userRegexp = "^[a-zA-Z][a-zA-Z0-9-_.]*" - _userMinLen = 3 - _userMaxLen = 500 + _userRegexp = "^[a-zA-Z][a-zA-Z0-9-_.]*" + _userMinLen = 3 + _userMaxLen = 500 _passwordMinLen = 4 _passwordMaxLen = 500 ) @@ -44,4 +44,3 @@ func checkLen(i ei.Ei, p ...interface{}) ei.Ei { } return i } - diff --git a/listeners.go b/listeners.go index e9c7948..398c6b6 100644 --- a/listeners.go +++ b/listeners.go @@ -21,9 +21,13 @@ func listeners(ctx context.Context) { switch u.Scheme { case "tcp": - go tcpListener(u, ctx) + go tcpListener(u, ctx, false) + case "tcp+proxy": + go tcpListener(u, ctx, true) case "ssl": - go sslListener(u, ctx) + go sslListener(u, ctx, false) + case "ssl+proxy": + go sslListener(u, ctx, true) case "http": go httpListener(u, ctx) case "https": diff --git a/log/formatters.go b/log/formatters.go new file mode 100644 index 0000000..844247a --- /dev/null +++ b/log/formatters.go @@ -0,0 +1,60 @@ +package log + +import ( + "encoding/json" + "fmt" + + "github.com/Sirupsen/logrus" +) + +// ProductionFormatter is a copy of the code from the JSONFormatter but slightly modified to always include some custom fields +type ProductionFormatter struct { + // TimestampFormat sets the format used for marshaling timestamps. + TimestampFormat string + NodeID string +} + +func prefixFieldClashes(data logrus.Fields) { + if t, ok := data["time"]; ok { + data["fields.time"] = t + } + + if m, ok := data["msg"]; ok { + data["fields.msg"] = m + } + + if l, ok := data["level"]; ok { + data["fields.level"] = l + } +} + +func (f ProductionFormatter) Format(entry *logrus.Entry) ([]byte, error) { + data := make(logrus.Fields, len(entry.Data)+3) + for k, v := range entry.Data { + switch v := v.(type) { + case error: + // Otherwise errors are ignored by `encoding/json` + // https://github.com/Sirupsen/logrus/issues/137 + data[k] = v.Error() + default: + data[k] = v + } + } + prefixFieldClashes(data) + + timestampFormat := f.TimestampFormat + if timestampFormat == "" { + timestampFormat = logrus.DefaultTimestampFormat + } + + data["time"] = entry.Time.Format(timestampFormat) + data["msg"] = entry.Message + data["level"] = entry.Level.String() + data["node"] = f.NodeID + + serialized, err := json.Marshal(data) + if err != nil { + return nil, fmt.Errorf("Failed to marshal fields to JSON, %v", err) + } + return append(serialized, '\n'), nil +} diff --git a/log/log.go b/log/log.go index 69b1fd1..95f9d1d 100644 --- a/log/log.go +++ b/log/log.go @@ -1,11 +1,7 @@ // Package log package log -import ( - "strings" - - "github.com/Sirupsen/logrus" -) +import "github.com/Sirupsen/logrus" // Singleton logrus logger object with custom format. // Verbosity can be changed through SetLogLevel. @@ -25,26 +21,51 @@ func init() { customFormatter := new(logrus.TextFormatter) customFormatter.FullTimestamp = true customFormatter.TimestampFormat = "2006/01/02 15:04:05" - customFormatter.ForceColors = true Log.Formatter = customFormatter Log.Level = logrus.DebugLevel } // Sets log level to one of (debug, info, warn, error, fatal, panic) -func SetLogLevel(l string) error { - switch strings.ToLower(l) { - case "debug": +func SetLogLevel(l uint8) { + switch l { + case DebugLevel: Log.Level = logrus.DebugLevel - case "info": + case InfoLevel: Log.Level = logrus.InfoLevel - case "warn": + case WarnLevel: Log.Level = logrus.WarnLevel - case "error": + case ErrorLevel: Log.Level = logrus.ErrorLevel - case "fatal": + case FatalLevel: Log.Level = logrus.FatalLevel - case "panic": + case PanicLevel: Log.Level = logrus.PanicLevel + + default: + Log.Level = logrus.DebugLevel + } +} + +func GetLogLevel() uint8 { + switch Log.Level { + case logrus.DebugLevel: + return DebugLevel + case logrus.InfoLevel: + return InfoLevel + case logrus.WarnLevel: + return WarnLevel + case logrus.ErrorLevel: + return ErrorLevel + case logrus.FatalLevel: + return FatalLevel + case logrus.PanicLevel: + return PanicLevel + + default: + return DebugLevel } - return nil +} + +func LogLevelIs(l uint8) bool { + return GetLogLevel() == l } diff --git a/nexus.go b/nexus.go index 5a3feea..ad53da5 100644 --- a/nexus.go +++ b/nexus.go @@ -60,7 +60,18 @@ func exit(cause string) { func main() { parseOptions() + + if opts.Verbose { + SetLogLevel(DebugLevel) + } else { + SetLogLevel(InfoLevel) + } + nodeId = safeId(4) + if opts.IsProduction { + Log.Formatter = ProductionFormatter{NodeID: nodeId} + } + signal.Notify(sigChan) go signalManager() @@ -76,6 +87,7 @@ func main() { go pipeTrack() go sessionTrack() go taskPurge() + go hooksTrack() listen() diff --git a/nodes.go b/nodes.go index cba6572..5e93d5c 100644 --- a/nodes.go +++ b/nodes.go @@ -32,6 +32,7 @@ func nodeTrack() { "id": nodeId, "deadline": r.Now().Add(10), "kill": false, + "version": Version.String(), } _, err := r.Table("nodes").Insert(ndata).RunWrite(db) if err != nil { @@ -150,13 +151,13 @@ func (nc *NexusConn) handleNodesReq(req *JsonRpcReq) { return } - term := r.Table("nodes").Pluck("id", "clients", "load") + term := r.Table("nodes").Pluck("id", "clients", "load", "version") if skip >= 0 { term = term.Skip(skip) } - if limit >= 0 { + if limit > 0 { term = term.Limit(limit) } diff --git a/options.go b/options.go index 7a820ec..78d05f1 100644 --- a/options.go +++ b/options.go @@ -7,19 +7,20 @@ import ( ) var opts struct { - Verbose []bool `short:"v" long:"verbose" description:"Show verbose debug information"` - Listeners []string `short:"l" long:"listen" description:"Listen on (tcp|ssl|http|https)://addr:port" default:"tcp://0.0.0.0:1717"` - Rethink RethinkOptions `group:"RethinkDB Options"` - SSL SSLOptions `group:"SSL Options"` + Verbose bool `short:"v" long:"verbose" description:"Show debug information"` + Listeners []string `short:"l" long:"listen" description:"Listen on (tcp|tcp+proxy|ssl|ssl+proxy|http|https)://addr:port" default:"tcp://0.0.0.0:1717"` + IsProduction bool `long:"production" description:"Enables Production mode"` + Rethink RethinkOptions `group:"RethinkDB Options"` + SSL SSLOptions `group:"SSL Options"` } type RethinkOptions struct { - Host string `short:"r" long:"rethinkdb" description:"RethinkDB host[:port]" default:"localhost:28015"` - Database string `short:"d" long:"database" description:"RethinkDB database" default:"nexus"` - MaxIdle int `long:"maxidle" description:"Max RethinkDB idle connections" default:"50"` - MaxOpen int `long:"maxopen" description:"Max RethinkDB open connections" default:"200"` - DefPipeLen int `long:"defpipelen" description:"Default pipe length" default:"1000"` - MaxPipeLen int `long:"maxpipelen" description:"Max pipe length" default:"100000"` + Hosts []string `short:"r" long:"rethinkdb" description:"RethinkDB host[:port]" default:"localhost:28015"` + Database string `short:"d" long:"database" description:"RethinkDB database" default:"nexus"` + MaxIdle int `long:"maxidle" description:"Max RethinkDB idle connections" default:"50"` + MaxOpen int `long:"maxopen" description:"Max RethinkDB open connections" default:"200"` + DefPipeLen int `long:"defpipelen" description:"Default pipe length" default:"1000"` + MaxPipeLen int `long:"maxpipelen" description:"Max pipe length" default:"100000"` } type SSLOptions struct { diff --git a/sessions.go b/sessions.go index 497fe29..e92a8f1 100644 --- a/sessions.go +++ b/sessions.go @@ -26,8 +26,8 @@ func sessionTrack() { Between(nodeId, nodeId+"\uffff"). Changes(r.ChangesOpts{IncludeInitial: true, Squash: false}). Pluck(map[string]interface{}{ - "new_val": []string{"id", "kick", "reload"}, - "old_val": []string{"id"}}). + "new_val": []string{"id", "kick", "reload"}, + "old_val": []string{"id"}}). Run(db) if err != nil { Log.Errorf("Error opening sessionTrack iterator:%s", err.Error()) @@ -70,22 +70,32 @@ func (nc *NexusConn) handleSessionReq(req *JsonRpcReq) { } term := r.Table("sessions"). Between(prefix, prefix+"\uffff", r.BetweenOpts{Index: "users"}). + Map(func(row r.Term) interface{} { + return ei.M{"user": row.Field("user"), + "connid": row.Field("id"), + "nodeid": row.Field("nodeId"), + "remoteAddress": row.Field("remoteAddress"), + "creationTime": row.Field("creationTime"), + "protocol": row.Field("protocol")} + }). Group("user"). - Pluck("id", "nodeId", "remoteAddress", "creationTime", "protocol"). + Pluck("connid", "nodeid", "remoteAddress", "creationTime", "protocol"). Filter(r.Row.Field("protocol").Ne("internal")) if skip >= 0 { term = term.Skip(skip) } - if limit >= 0 { + if limit > 0 { term = term.Limit(limit) } cur, err := term.Ungroup(). Map(func(row r.Term) interface{} { - return ei.M{"user": row.Field("group"), "sessions": row.Field("reduction"), "n": row.Field("reduction").Count()} - }).Run(db) + return ei.M{"user": row.Field("group"), + "sessions": row.Field("reduction"), + "n": row.Field("reduction").Count()} + }).Run(db) if err != nil { req.Error(ErrInternal, "", nil) return @@ -101,13 +111,22 @@ func (nc *NexusConn) handleSessionReq(req *JsonRpcReq) { fallthrough case "sys.session.reload": action := req.Method[12:] - prefix := ei.N(req.Params).M("connId").StringZ() + prefix := ei.N(req.Params).M("connid").StringZ() if len(prefix) < 16 { req.Error(ErrInvalidParams, "", nil) return } + if action == "reload" && prefix == nc.connId { + if done, errcode := nc.reload(true); !done { + req.Error(errcode, "", nil) + } else { + req.Result(ei.M{"reloaded": 1}) + } + return + } + connuser, err := r.Table("sessions"). Between(prefix, prefix+"\uffff"). Pluck("user"). diff --git a/sync.go b/sync.go index f410bd1..f4fe2a8 100644 --- a/sync.go +++ b/sync.go @@ -23,13 +23,17 @@ func (nc *NexusConn) handleSyncReq(req *JsonRpcReq) { RunWrite(db, r.RunOpts{Durability: "hard"}) if err != nil { if r.IsConflictErr(err) { - req.Result(ei.M{"ok": false}) + req.Error(ErrLockNotOwned, "", nil) } else { req.Error(ErrInternal, "", nil) } return } - req.Result(ei.M{"ok": res.Inserted > 0}) + if res.Inserted <= 0 { + req.Error(ErrLockNotOwned, "", nil) + return + } + req.Result(ei.M{"ok": true}) case "sync.unlock": lock, err := ei.N(req.Params).M("lock").String() if err != nil { @@ -51,7 +55,11 @@ func (nc *NexusConn) handleSyncReq(req *JsonRpcReq) { req.Error(ErrInternal, err.Error(), nil) return } - req.Result(ei.M{"ok": res.Deleted > 0}) + if res.Deleted <= 0 { + req.Error(ErrLockNotOwned, "", nil) + return + } + req.Result(ei.M{"ok": true}) default: req.Error(ErrMethodNotFound, "", nil) } diff --git a/sys.go b/sys.go index 93cc8c4..e10a55e 100644 --- a/sys.go +++ b/sys.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "reflect" + "regexp" "strings" "sync/atomic" "time" @@ -11,6 +12,7 @@ import ( r "github.com/dancannon/gorethink" "github.com/jaracil/ei" + . "github.com/jaracil/nexus/log" "github.com/jaracil/nxcli/nxcore" ) @@ -22,15 +24,20 @@ type LoginResponse struct { func (nc *NexusConn) handleSysReq(req *JsonRpcReq) { switch req.Method { case "sys.ping": - req.Result("pong") + req.Result(ei.M{"ok": true}) + + case "sys.version": + req.Result(ei.M{"version": Version.String()}) case "sys.watchdog": - wdt := ei.N(req.Params).Int64Z() - if wdt < 10 { - wdt = 10 + wdt, err := ei.N(req.Params).M("watchdog").Lower().Int64() + if err == nil { + if wdt < 10 { + wdt = 10 + } + atomic.StoreInt64(&nc.wdog, wdt) } - atomic.StoreInt64(&nc.wdog, wdt) - req.Result(ei.M{"ok": true, "watchdog": wdt}) + req.Result(ei.M{"watchdog": nc.wdog}) case "sys.login": var user string @@ -41,7 +48,7 @@ func (nc *NexusConn) handleSysReq(req *JsonRpcReq) { switch method { case "", "basic": var err int - user, mask, err = nc.BasicAuth(req.Params) + user, _, err = nc.BasicAuth(req.Params) if err != ErrNoError { req.Error(err, "", nil) return @@ -71,22 +78,40 @@ func (nc *NexusConn) handleSysReq(req *JsonRpcReq) { mask = loginResponse.Tags } + // Check user limits + ud, err := loadUserData(user) if err != ErrNoError { req.Error(err, "", nil) return } - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&nc.user)), unsafe.Pointer(&UserData{User: ud.User, Mask: mask, Tags: maskTags(ud.Tags, mask)})) + if !nc.checkUserLimits(ud) { + req.Error(ErrPermissionDenied, "", nil) + return + } + // LOGGED IN! + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&nc.user)), unsafe.Pointer(&UserData{ + User: ud.User, + Mask: mask, + Tags: maskTags(ud.Tags, mask), + MaxSessions: ud.MaxSessions, + Whitelist: ud.Whitelist, + Blacklist: ud.Blacklist, + })) nc.updateSession() - req.Result(ei.M{"ok": true, "user": nc.user.User, "connId": nc.connId}) - case "sys.reload": - if done, errcode := nc.reload(true); !done { - req.Error(errcode, "", nil) - } else { - req.Result(ei.M{"ok": true}) - } + hook("user", ud.User, ud.User, ei.M{ + "action": "login", + "user": nc.user.User, + "mask": nc.user.Mask, + "tags": nc.user.Tags, + "maxSessions": nc.user.MaxSessions, + "whitelist": nc.user.Whitelist, + "blacklist": nc.user.Blacklist, + }) + req.Result(ei.M{"ok": true, "user": nc.user.User, "connid": nc.connId}) + default: req.Error(ErrMethodNotFound, "", nil) } @@ -165,6 +190,46 @@ func mergeTags(src, dst *UserData) { } } +func (nc *NexusConn) checkUserLimits(ud *UserData) bool { + nci := NewInternalClient() + defer nci.Close() + + // Max Sessions opened? + // soft limit because race condition checking sessions + sessions, err := nci.SessionList(ud.User, -1, 0) + seslen := 0 + for _, u := range sessions { + if u.User == ud.User { + seslen = len(u.Sessions) + break + } + } + if err != nil || (ud.MaxSessions > 0 && seslen+1 > ud.MaxSessions) { + Log.Warnf("User %s has too many sessions opened: %d/%d", ud.User, seslen, ud.MaxSessions) + return false + } + + remoteaddr := nc.conn.RemoteAddr().String() + + // Whitelisted? + for _, wr := range ud.Whitelist { + if match, err := regexp.MatchString(wr, remoteaddr); err == nil && match { + Log.Warnf("User %s from %s whitelisted by %s", ud.User, remoteaddr, wr) + return true + } + } + + // Blacklisted? + for _, br := range ud.Blacklist { + if match, err := regexp.MatchString(br, remoteaddr); err == nil && match { + Log.Warnf("User %s from %s blacklisted by %s", ud.User, remoteaddr, br) + return false + } + } + + return true +} + func (nc *NexusConn) BasicAuth(params interface{}) (string, map[string]map[string]interface{}, int) { user, err := ei.N(params).M("user").Lower().String() if err != nil { diff --git a/tasks.go b/tasks.go index cc3e11d..ab00276 100644 --- a/tasks.go +++ b/tasks.go @@ -10,24 +10,24 @@ import ( ) type Task struct { - Id string `gorethink:"id"` - Stat string `gorethink:"stat"` - Path string `gorethink:"path"` - Prio int `gorethink:"prio"` - Ttl int `gorethink:"ttl"` - Detach bool `gorethink:"detach"` - User string `gorethink:"user"` - Method string `gorethink:"method"` - Params interface{} `gorethink:"params"` - LocalId interface{} `gorethink:"localId"` - Tses string `gorethink:"tses"` - Result interface{} `gorethink:"result,omitempty"` - ErrCode *int `gorethink:"errCode,omitempty"` - ErrStr string `gorethink:"errStr,omitempty"` - ErrObj interface{} `gorethink:"errObj,omitempty"` - Tags interface{} `gorethink:"tags,omitempty"` - CreationTime interface{} `gorethink:"creationTime,omitempty"` - DeadLine interface{} `gorethink:"deadLine,omitempty"` + Id string `gorethink:"id" json:"id"` + Stat string `gorethink:"stat" json:"state""` + Path string `gorethink:"path" json:"path"` + Prio int `gorethink:"prio" json:"priority"` + Ttl int `gorethink:"ttl" json:"ttl"` + Detach bool `gorethink:"detach" json:"detached"` + User string `gorethink:"user" json:"user"` + Method string `gorethink:"method" json:"method"` + Params interface{} `gorethink:"params" json:"params"` + LocalId interface{} `gorethink:"localId" json:"-"` + Tses string `gorethink:"tses" json:"targetSession"` + Result interface{} `gorethink:"result,omitempty" json:"result"` + ErrCode *int `gorethink:"errCode,omitempty" json:"errCode"` + ErrStr string `gorethink:"errStr,omitempty" json:"errString"` + ErrObj interface{} `gorethink:"errObj,omitempty" json:"errObject"` + Tags interface{} `gorethink:"tags,omitempty" json:"tags"` + CreationTime interface{} `gorethink:"creationTime,omitempty" json:"creationTime"` + DeadLine interface{} `gorethink:"deadLine,omitempty" json:"deadline"` } type TaskFeed struct { @@ -43,13 +43,26 @@ func taskPurge() { select { case <-tick.C: if isMasterNode() { - r.Table("tasks"). + wres, err := r.Table("tasks"). Between(r.MinVal, r.Now(), r.BetweenOpts{Index: "deadLine"}). Update(r.Branch(r.Row.Field("stat").Ne("done"), - ei.M{"stat": "done", "errCode": ErrTimeout, "errStr": ErrStr[ErrTimeout], "deadLine": r.Now().Add(600)}, - ei.M{}), - r.UpdateOpts{ReturnChanges: false}). + ei.M{"stat": "done", "errCode": ErrTimeout, "errStr": ErrStr[ErrTimeout], "deadLine": r.Now().Add(600)}, + ei.M{}), + r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) + if err == nil { + for _, change := range wres.Changes { + task := ei.N(change.OldValue) + if path := task.M("path").StringZ(); !strings.HasPrefix(path, "@pull.") { + hook("task", path+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "timeout", + "id": task.M("id").StringZ(), + "timestamp": time.Now().UTC(), + }) + } + } + } + r.Table("tasks"). Between(r.MinVal, r.Now(), r.BetweenOpts{Index: "deadLine"}). Filter(r.Row.Field("stat").Eq("done")). @@ -119,9 +132,9 @@ func taskPull(task *Task) bool { Between(ei.S{prefix, "waiting", r.MinVal, r.MinVal}, ei.S{prefix, "waiting", r.MaxVal, r.MaxVal}, r.BetweenOpts{RightBound: "closed", Index: "pspc"}). Limit(1). Update(r.Branch(r.Row.Field("stat").Eq("waiting"), - ei.M{"stat": "working", "tses": task.Id[0:16]}, - ei.M{}), - r.UpdateOpts{ReturnChanges: true}). + ei.M{"stat": "working", "tses": task.Id[0:16]}, + ei.M{}), + r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { break @@ -140,8 +153,8 @@ func taskPull(task *Task) bool { pres, err := r.Table("tasks"). Get(task.Id). Update(r.Branch(r.Row.Field("stat").Eq("working"), - ei.M{"stat": "done", "result": result, "deadLine": r.Now().Add(600)}, - ei.M{})). + ei.M{"stat": "done", "result": result, "deadLine": r.Now().Add(600)}, + ei.M{})). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil || pres.Replaced != 1 { r.Table("tasks"). @@ -150,6 +163,14 @@ func taskPull(task *Task) bool { RunWrite(db, r.RunOpts{Durability: "soft"}) break } + hook("task", newTask.M("path").StringZ()+newTask.M("method").StringZ(), newTask.M("user").StringZ(), ei.M{ + "action": "pull", + "id": result["taskid"], + "connid": task.Id[0:16], + "user": task.User, + "ttl": newTask.M("ttl").IntZ(), + "timestamp": time.Now().UTC(), + }) return true } if wres.Unchanged > 0 { @@ -160,8 +181,8 @@ func taskPull(task *Task) bool { r.Table("tasks"). Get(task.Id). Update(r.Branch(r.Row.Field("stat").Eq("working"), - ei.M{"stat": "waiting"}, - ei.M{})). + ei.M{"stat": "waiting"}, + ei.M{})). RunWrite(db, r.RunOpts{Durability: "soft"}) return false } @@ -169,11 +190,13 @@ func taskPull(task *Task) bool { func taskWakeup(task *Task) bool { for { wres, err := r.Table("tasks"). - Between(ei.S{"@pull." + task.Path, "waiting", r.MinVal, r.MinVal}, ei.S{"@pull." + task.Path, "waiting", r.MaxVal, r.MaxVal}, r.BetweenOpts{RightBound: "closed", Index: "pspc"}). + Between(ei.S{"@pull." + task.Path, "waiting", r.MinVal, r.MinVal}, + ei.S{"@pull." + task.Path, "waiting", r.MaxVal, r.MaxVal}, + r.BetweenOpts{RightBound: "closed", Index: "pspc"}). Sample(1). Update(r.Branch(r.Row.Field("stat").Eq("waiting"), - ei.M{"stat": "working"}, - ei.M{})). + ei.M{"stat": "working"}, + ei.M{})). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { return false @@ -194,10 +217,20 @@ func deleteTask(id string) { } func taskExpireTtl(taskid string) { - r.Table("tasks"). + wres, err := r.Table("tasks"). Get(taskid). - Update(ei.M{"stat": "done", "errCode": ErrTtlExpired, "errStr": ErrStr[ErrTtlExpired], "deadLine": r.Now().Add(600)}). + Update(ei.M{"stat": "done", "errCode": ErrTtlExpired, "errStr": ErrStr[ErrTtlExpired], "deadLine": r.Now().Add(600)}, r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) + if err == nil { + for _, change := range wres.Changes { + task := ei.N(change.OldValue) + hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "ttlExpired", + "id": task.M("id").StringZ(), + "timestamp": time.Now().UTC(), + }) + } + } } func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { @@ -245,11 +278,26 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { CreationTime: r.Now(), DeadLine: r.Now().Add(timeout), } - _, err = r.Table("tasks").Insert(task).RunWrite(db, r.RunOpts{Durability: "soft"}) + _, err = r.Table("tasks").Insert(task, r.InsertOpts{}).RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { req.Error(ErrInternal, "", nil) return } + hook("task", task.Path+task.Method, task.User, ei.M{ + "action": "push", + "id": task.Id, + "connid": nc.connId, + "user": nc.user.User, + "tags": nc.user.Tags, + "path": path, + "method": met, + "params": params, + "detach": detach, + "ttl": ttl, + "prio": prio, + "timestamp": time.Now().UTC(), + "timeout": timeout, + }) if detach { req.Result(ei.M{"ok": true}) } @@ -283,6 +331,7 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { LocalId: req.Id, CreationTime: r.Now(), DeadLine: r.Now().Add(timeout), + User: nc.user.User, } _, err := r.Table("tasks").Insert(task).RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { @@ -295,13 +344,20 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { result := ei.N(req.Params).M("result").RawZ() res, err := r.Table("tasks"). Get(taskid). - Update(ei.M{"stat": "done", "result": result, "deadLine": r.Now().Add(600)}). + Update(ei.M{"stat": "done", "result": result, "deadLine": r.Now().Add(600)}, r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { req.Error(ErrInternal, "", nil) return } if res.Replaced > 0 { + task := ei.N(res.Changes[0].OldValue) + hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "result", + "id": taskid, + "result": result, + "timestamp": time.Now().UTC(), + }) req.Result(ei.M{"ok": true}) } else { req.Error(ErrInvalidTask, "", nil) @@ -314,13 +370,22 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { data := ei.N(req.Params).M("data").RawZ() res, err := r.Table("tasks"). Get(taskid). - Update(ei.M{"stat": "done", "errCode": code, "errStr": message, "errObj": data, "deadLine": r.Now().Add(600)}). + Update(ei.M{"stat": "done", "errCode": code, "errStr": message, "errObj": data, "deadLine": r.Now().Add(600)}, r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { req.Error(ErrInternal, "", nil) return } if res.Replaced > 0 { + task := ei.N(res.Changes[0].OldValue) + hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "error", + "id": taskid, + "code": code, + "message": message, + "data": data, + "timestamp": time.Now().UTC(), + }) req.Result(ei.M{"ok": true}) } else { req.Error(ErrInvalidTask, "", nil) @@ -330,13 +395,19 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { taskid := ei.N(req.Params).M("taskid").StringZ() res, err := r.Table("tasks"). Get(taskid). - Update(ei.M{"stat": "waiting", "tses": nil, "ttl": r.Row.Field("ttl").Add(-1)}). + Update(ei.M{"stat": "waiting", "tses": nil, "ttl": r.Row.Field("ttl").Add(-1)}, r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { req.Error(ErrInternal, "", nil) return } if res.Replaced > 0 { + task := ei.N(res.Changes[0].OldValue) + hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "reject", + "id": taskid, + "timestamp": time.Now().UTC(), + }) req.Result(ei.M{"ok": true}) } else { req.Error(ErrInvalidTask, "", nil) @@ -344,20 +415,26 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { case "task.cancel": id := ei.N(req.Params).M("id").RawZ() - res, err := r.Table("tasks"). + wres, err := r.Table("tasks"). Between(nc.connId, nc.connId+"\uffff"). Filter(r.Row.Field("localId").Eq(id)). Update(r.Branch(r.Row.Field("stat").Ne("done"), - ei.M{"stat": "done", "errCode": ErrCancel, "errStr": ErrStr[ErrCancel], "deadLine": r.Now().Add(600)}, - ei.M{}), - r.UpdateOpts{ReturnChanges: false}). + ei.M{"stat": "done", "errCode": ErrCancel, "errStr": ErrStr[ErrCancel], "deadLine": r.Now().Add(600)}, + ei.M{}), + r.UpdateOpts{ReturnChanges: true}). RunWrite(db, r.RunOpts{Durability: "soft"}) if err != nil { req.Error(ErrInternal, "", nil) return } - if res.Replaced > 0 { + if wres.Replaced > 0 { + task := ei.N(wres.Changes[0].NewValue) + hook("task", task.M("path").StringZ()+task.M("method").StringZ(), task.M("user").StringZ(), ei.M{ + "action": "cancel", + "id": task.M("taskid").StringZ(), + "timestamp": time.Now().UTC(), + }) req.Result(ei.M{"ok": true}) } else { req.Error(ErrInvalidTask, "", nil) @@ -382,13 +459,13 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { req.Error(ErrPermissionDenied, "", nil) return } - term := r.Table("tasks").Pluck("path") + term := r.Table("tasks") if skip >= 0 { term = term.Skip(skip) } - if limit >= 0 { + if limit > 0 { term = term.Limit(limit) } @@ -397,20 +474,15 @@ func (nc *NexusConn) handleTaskReq(req *JsonRpcReq) { req.Error(ErrInternal, "", nil) return } - pulls := make(map[string]int) - pushs := make(map[string]int) - var task Task - for cur.Next(&task) { - if strings.HasPrefix(task.Path, "@pull."+prefix) { - p := strings.TrimPrefix(task.Path, "@pull.") - pulls[p]++ - } else if strings.HasPrefix(task.Path, prefix) { - pushs[task.Path]++ - } + ret := make([]*Task, 0) + cur.All(&ret) + + for _, task := range ret { + task.Path = strings.TrimPrefix(task.Path, "@pull.") + task.Params = truncateJson(task.Params) + task.ErrObj = truncateJson(task.ErrObj) } - ret := make(map[string]interface{}) - ret["pulls"] = pulls - ret["pushes"] = pushs + req.Result(ret) default: req.Error(ErrMethodNotFound, "", nil) diff --git a/tcplistener.go b/tcplistener.go index 809ab2e..009ab89 100644 --- a/tcplistener.go +++ b/tcplistener.go @@ -5,11 +5,12 @@ import ( "net" "net/url" + "github.com/armon/go-proxyproto" . "github.com/jaracil/nexus/log" "golang.org/x/net/context" ) -func tcpListener(u *url.URL, ctx context.Context) { +func tcpListener(u *url.URL, ctx context.Context, proxyed bool) { defer Log.Println("Listener", u, "finished") addr, err := net.ResolveTCPAddr("tcp", u.Host) @@ -19,13 +20,19 @@ func tcpListener(u *url.URL, ctx context.Context) { return } - listen, err := net.ListenTCP("tcp", addr) + var listen net.Listener + + listen, err = net.ListenTCP("tcp", addr) if err != nil { Log.Println("Cannot open tcpListener:", err) exit("tcpListener goroutine error") return } + if proxyed { + listen = &proxyproto.Listener{Listener: listen} + } + Log.Println("Listening on", u) go func() { @@ -37,6 +44,7 @@ func tcpListener(u *url.URL, ctx context.Context) { for { conn, err := listen.Accept() + if ctx.Err() == nil { if err != nil { Log.Errorln("Error accepting tcp socket:", err) @@ -44,7 +52,7 @@ func tcpListener(u *url.URL, ctx context.Context) { return } else { Log.Warnf("Unencrypted connection from %s!", conn.RemoteAddr()) - Log.Print("TCP connection from:", conn.RemoteAddr()) + Log.Printf("TCP connection from: %s", conn.RemoteAddr()) nc := NewNexusConn(conn) nc.proto = "tcp" go nc.handle() @@ -52,11 +60,10 @@ func tcpListener(u *url.URL, ctx context.Context) { } else { return } - } } -func sslListener(u *url.URL, ctx context.Context) { +func sslListener(u *url.URL, ctx context.Context, proxyed bool) { defer Log.Println("Listener", u, "finished") Log.Debugln("Loading SSL cert/key") @@ -70,11 +77,32 @@ func sslListener(u *url.URL, ctx context.Context) { tlsConfig := &tls.Config{} tlsConfig.Certificates = []tls.Certificate{cert} - listen, err := tls.Listen("tcp", u.Host, tlsConfig) - if err != nil && ctx.Err() == nil { - Log.Errorln("Cannot open sslListener:", err) - exit("sslListener goroutine error") - return + var listen net.Listener + + if proxyed { + addr, err := net.ResolveTCPAddr("tcp", u.Host) + if err != nil { + Log.Errorln("Cannot resolve the address: ", err) + exit("ssl+proxy Listener goroutine error") + return + } + + l, err := net.ListenTCP("tcp", addr) + if err != nil { + Log.Println("Cannot open ssl+proxy Listener:", err) + exit("ssl+proxy Listener goroutine error") + return + } + + proxyListen := &proxyproto.Listener{Listener: l} + listen = tls.NewListener(proxyListen, tlsConfig) + } else { + listen, err = tls.Listen("tcp", u.Host, tlsConfig) + if err != nil && ctx.Err() == nil { + Log.Errorln("Cannot open sslListener:", err) + exit("sslListener goroutine error") + return + } } Log.Println("Listening on", u) @@ -87,14 +115,16 @@ func sslListener(u *url.URL, ctx context.Context) { }() for { + conn, err := listen.Accept() + if ctx.Err() == nil { if err != nil { Log.Errorln("Error accepting ssl socket:", err) exit("sslListener goroutine error") return } else { - Log.Println("SSL connection from:", conn.RemoteAddr()) + Log.Printf("SSL connection from: %s", conn.RemoteAddr()) nc := NewNexusConn(conn) nc.proto = "ssl" go nc.handle() diff --git a/test/main_test.go b/test/main_test.go index d997c9d..f1bc874 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" - nxcli "github.com/jaracil/nxcli" - nexus "github.com/jaracil/nxcli/nxcore" + nxcli "github.com/nayarsystems/nxgo" + nexus "github.com/nayarsystems/nxgo/nxcore" ) var NexusServer = "localhost:1717" diff --git a/test/pipe_test.go b/test/pipe_test.go index abbee42..c164cb5 100644 --- a/test/pipe_test.go +++ b/test/pipe_test.go @@ -1,9 +1,10 @@ package test import ( - nexus "github.com/jaracil/nxcli/nxcore" "testing" "time" + + nexus "github.com/nayarsystems/nxgo/nxcore" ) func TestPipeUnexisting(t *testing.T) { diff --git a/test/sync_test.go b/test/sync_test.go index 5c844f8..9ab10af 100644 --- a/test/sync_test.go +++ b/test/sync_test.go @@ -3,6 +3,8 @@ package test import ( "testing" "time" + + "github.com/nayarsystems/nxgo/nxcore" ) func TestSyncUnlockNotLocked(t *testing.T) { @@ -11,9 +13,9 @@ func TestSyncUnlockNotLocked(t *testing.T) { t.Fatalf("login with UserA: %s", err.Error()) } defer ses.Close() - if done, err := ses.Unlock(Prefix3); err != nil { + if _, err := ses.Unlock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.unlock not locked: %s", err.Error()) - } else if done { + } else if err == nil { t.Errorf("sync.unlock not locked: expecting not done") } } @@ -25,14 +27,14 @@ func TestSyncRelock(t *testing.T) { } defer ses.Close() - if done, err := ses.Lock(Prefix3); err != nil { + if _, err := ses.Lock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.lock: %s", err.Error()) - } else if !done { + } else if err != nil { t.Errorf("sync.lock: expecting done") } - if done, err := ses.Lock(Prefix3); err != nil { + if _, err := ses.Lock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.lock: %s", err.Error()) - } else if done { + } else if err == nil { t.Errorf("sync.lock: expecting not done") } ses.Unlock(Prefix3) @@ -51,18 +53,18 @@ func TestSyncLockFail(t *testing.T) { defer sesb.Close() // Lock - if done, err := sesa.Lock(Prefix3); err != nil { + if _, err := sesa.Lock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.lock: %s", err.Error()) - } else if !done { + } else if err != nil { t.Errorf("sync.lock: expecting done") } time.Sleep(time.Millisecond * 100) - + // Fail to lock from another session - if done, err := sesb.Lock(Prefix3); err != nil { + if _, err := sesb.Lock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.lock: %s", err.Error()) - } else if done { + } else if err == nil { t.Errorf("sync.lock: expecting not done") } sesa.Unlock(Prefix3) @@ -82,9 +84,9 @@ func TestSyncUnlockSesClose(t *testing.T) { defer sesb.Close() // Lock - if done, err := sesb.Lock(Prefix3); err != nil { + if _, err := sesb.Lock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.lock: %s", err.Error()) - } else if !done { + } else if err != nil { t.Errorf("sync.lock: expecting done") } @@ -94,9 +96,9 @@ func TestSyncUnlockSesClose(t *testing.T) { time.Sleep(time.Second * 1) // Lock - if done, err := sesa.Lock(Prefix3); err != nil { + if _, err := sesa.Lock(Prefix3); err != nil && !IsNexusErrCode(err, nxcore.ErrLockNotOwned) { t.Errorf("sync.lock: %s", err.Error()) - } else if !done { + } else if err != nil { t.Errorf("sync.lock: expecting done") } sesa.Unlock(Prefix3) diff --git a/test/sys_test.go b/test/sys_test.go index 43a2a9d..e73aabb 100644 --- a/test/sys_test.go +++ b/test/sys_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - nxcli "github.com/jaracil/nxcli" - nexus "github.com/jaracil/nxcli/nxcore" + nxcli "github.com/nayarsystems/nxgo" + nexus "github.com/nayarsystems/nxgo/nxcore" ) func TestPing(t *testing.T) { diff --git a/test/task_test.go b/test/task_test.go index 9d064ee..483e47c 100644 --- a/test/task_test.go +++ b/test/task_test.go @@ -1,10 +1,11 @@ package test import ( - nexus "github.com/jaracil/nxcli/nxcore" "sync" "testing" "time" + + nexus "github.com/nayarsystems/nxgo/nxcore" ) func TestTaskTimeout(t *testing.T) { diff --git a/test/test.sh b/test/test.sh index 19b713e..3dab6e7 100755 --- a/test/test.sh +++ b/test/test.sh @@ -52,4 +52,4 @@ x docker stop $CONTAINER_ID x docker rm $CONTAINER_ID # Exit with build error status -exit $EXIT_STATUS \ No newline at end of file +exit $EXIT_STATUS diff --git a/test/topic_test.go b/test/topic_test.go index 0841790..785831c 100644 --- a/test/topic_test.go +++ b/test/topic_test.go @@ -1,10 +1,11 @@ package test import ( - "github.com/jaracil/ei" - nexus "github.com/jaracil/nxcli/nxcore" "testing" "time" + + "github.com/jaracil/ei" + nexus "github.com/nayarsystems/nxgo/nxcore" ) func TestTopicBadPipe(t *testing.T) { diff --git a/test/users_test.go b/test/users_test.go index b7fc160..a1d39e7 100644 --- a/test/users_test.go +++ b/test/users_test.go @@ -1,9 +1,10 @@ package test import ( - nexus "github.com/jaracil/nxcli/nxcore" "testing" "time" + + nexus "github.com/nayarsystems/nxgo/nxcore" ) func TestUserCreateFail(t *testing.T) { @@ -81,7 +82,7 @@ func TestUserTags(t *testing.T) { if err != nil { t.Errorf("user.login: %s", err.Error()) } - + _, err = RootSes.UserSetTags(UserA, Prefix1, map[string]interface{}{ "test": 1, "prueba": []string{"vaya", "vaya"}, @@ -97,9 +98,9 @@ func TestUserTags(t *testing.T) { if err != nil { t.Errorf("session.reload: %s", err.Error()) } - + time.Sleep(time.Second) - + _, _, err = sesA.ExecNoWait("task.push", map[string]interface{}{ "method": Prefix1 + ".method", "params": "hello", @@ -134,9 +135,9 @@ func TestUserTags(t *testing.T) { t.Errorf("user.delTags: %s", err.Error()) } - _, err = sesA.Exec("sys.reload", nil) + _, err = sesA.Exec("sys.session.reload", map[string]interface{}{"connid": sesA.Id()}) if err != nil { - t.Errorf("sys.reload: %s", err.Error()) + t.Errorf("sys.session.reload: %s", err.Error()) } _, _, err = sesA.ExecNoWait("task.push", map[string]interface{}{ diff --git a/topics.go b/topics.go index d1062e2..8b0d16a 100644 --- a/topics.go +++ b/topics.go @@ -20,7 +20,7 @@ func topicList(s string) (res []interface{}) { return } -func (nc *NexusConn) topicPublish(topic string, message interface{}) (int, error) { +func topicPublish(topic string, message interface{}) (int, error) { msg := ei.M{"topic": topic, "msg": message} res, err := r.Table("pipes"). GetAllByIndex("subs", topicList(topic)...). @@ -116,7 +116,7 @@ func (nc *NexusConn) handleTopicReq(req *JsonRpcReq) { return } - sent, err := nc.topicPublish(topic, msg) + sent, err := topicPublish(topic, msg) if err != nil { req.Error(ErrInternal, "", nil) return diff --git a/users.go b/users.go index d982aea..f67a542 100644 --- a/users.go +++ b/users.go @@ -1,6 +1,8 @@ package main import ( + "strings" + r "github.com/dancannon/gorethink" "github.com/jaracil/ei" ) @@ -10,11 +12,18 @@ type UserData struct { Pass string `gorethink:"pass,omitempty"` Salt string `gorethink:"salt,omitempty"` Tags map[string]map[string]interface{} `gorethink:"tags,omitempty"` - Mask map[string]map[string]interface{} `gorethink:"mask,omitempty"` Templates []string `gorethink:"templates,omitempty"` + + // Limits + Mask map[string]map[string]interface{} `gorethink:"mask,omitempty"` + MaxSessions int `gorethink:"maxsessions,omitempty"` + Whitelist []string `gorethink:"whitelist,omitempty"` + Blacklist []string `gorethink:"blacklist,omitempty"` } -var Nobody *UserData = &UserData{User: "nobody", Tags: map[string]map[string]interface{}{}} +var Nobody *UserData = &UserData{User: "nobody", Tags: map[string]map[string]interface{}{}, MaxSessions: 100000} + +const DEFAULT_MAX_SESSIONS = 50 func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { switch req.Method { @@ -34,7 +43,7 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Error(ErrPermissionDenied, "", nil) return } - ud := UserData{User: user, Salt: safeId(16), Tags: map[string]map[string]interface{}{}, Templates: []string{}} + ud := UserData{User: user, Salt: safeId(16), Tags: map[string]map[string]interface{}{}, Templates: []string{}, MaxSessions: DEFAULT_MAX_SESSIONS} ud.Pass, err = HashPass(pass, ud.Salt) if err != nil { req.Error(ErrInternal, "", nil) @@ -49,7 +58,13 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { } return } + hook("user", user, nc.user.User, ei.M{ + "action": "create", + "user": user, + "pass": pass, + }) req.Result(map[string]interface{}{"ok": true}) + case "user.delete": user, err := ei.N(req.Params).M("user").Lower().String() if err != nil { @@ -67,6 +82,10 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { return } if res.Deleted > 0 { + hook("user", user, nc.user.User, ei.M{ + "action": "delete", + "user": user, + }) req.Result(map[string]interface{}{"ok": true}) } else { req.Error(ErrInvalidUser, "", nil) @@ -93,7 +112,7 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Error(ErrPermissionDenied, "", nil) return } - res, err := r.Table("users").Get(user).Update(map[string]interface{}{"tags": map[string]interface{}{prefix: tgs}}).RunWrite(db, r.RunOpts{Durability: "hard"}) + res, err := r.Table("users").Get(user).Update(map[string]interface{}{"tags": map[string]interface{}{prefix: tgs}}, r.UpdateOpts{ReturnChanges: true}).RunWrite(db, r.RunOpts{Durability: "hard"}) if err != nil { req.Error(ErrInternal, "", nil) return @@ -102,6 +121,13 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Error(ErrInvalidUser, "", nil) return } + hook("user", user, nc.user.User, ei.M{ + "action": "setTags", + "user": user, + "prefix": prefix, + "addTags": tgs, + "tags": ei.N(res.Changes[0].NewValue).M("tags").MapStrZ(), + }) req.Result(map[string]interface{}{"ok": true}) case "user.delTags": user, err := ei.N(req.Params).M("user").Lower().String() @@ -124,7 +150,7 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Error(ErrPermissionDenied, "", nil) return } - res, err := r.Table("users").Get(user).Update(map[string]interface{}{"tags": map[string]interface{}{prefix: r.Literal(r.Row.Field("tags").Field(prefix).Without(tgs))}}).RunWrite(db, r.RunOpts{Durability: "hard"}) + res, err := r.Table("users").Get(user).Update(map[string]interface{}{"tags": map[string]interface{}{prefix: r.Literal(r.Row.Field("tags").Field(prefix).Without(tgs))}}, r.UpdateOpts{ReturnChanges: true}).RunWrite(db, r.RunOpts{Durability: "hard"}) if err != nil { req.Error(ErrInternal, "", nil) return @@ -133,7 +159,15 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Error(ErrInvalidUser, "", nil) return } + hook("user", user, nc.user.User, ei.M{ + "action": "delTags", + "user": user, + "prefix": prefix, + "delTags": tgs, + "tags": ei.N(res.Changes[0].NewValue).M("tags").MapStrZ(), + }) req.Result(map[string]interface{}{"ok": true}) + case "user.setPass": user, err := ei.N(req.Params).M("user").Lower().String() if err != nil { @@ -165,7 +199,13 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Error(ErrInvalidUser, "", nil) return } + hook("user", user, nc.user.User, ei.M{ + "action": "setPass", + "user": user, + "pass": pass, + }) req.Result(map[string]interface{}{"ok": true}) + case "user.list": prefix := ei.N(req.Params).M("prefix").Lower().StringZ() limit, err := ei.N(req.Params).M("limit").Int() @@ -183,18 +223,25 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { } term := r.Table("users"). Between(prefix, prefix+"\uffff"). - Pluck("id", "tags") + Pluck("id", "tags", "templates", "whitelist", "blacklist", "maxsessions") if skip >= 0 { term = term.Skip(skip) } - if limit >= 0 { + if limit > 0 { term = term.Limit(limit) } cur, err := term.Map(func(row r.Term) interface{} { - return ei.M{"user": row.Field("id"), "tags": row.Field("tags").Default(ei.M{})} + return ei.M{ + "user": row.Field("id"), + "tags": row.Field("tags").Default(ei.M{}), + "templates": row.Field("templates").Default(ei.S{}), + "whitelist": row.Field("whitelist").Default(ei.S{}), + "blacklist": row.Field("blacklist").Default(ei.S{}), + "maxsessions": row.Field("maxsessions").Default(DEFAULT_MAX_SESSIONS), + } }).Run(db) if err != nil { req.Error(ErrInternal, err.Error(), nil) @@ -205,112 +252,117 @@ func (nc *NexusConn) handleUserReq(req *JsonRpcReq) { req.Result(all) case "user.addTemplate": - user, err := ei.N(req.Params).M("user").Lower().String() + param, err := ei.N(req.Params).M("template").String() if err != nil { - req.Error(ErrInvalidParams, "user", nil) + req.Error(ErrInvalidParams, "template", nil) return } + nc.userAddParam(req, param, "templates") - template, err := ei.N(req.Params).M("template").Lower().String() + case "user.delTemplate": + param, err := ei.N(req.Params).M("template").String() if err != nil { req.Error(ErrInvalidParams, "template", nil) return } + nc.userDelParam(req, param, "templates") - userTags := ei.N(nc.getTags(user)) - if !(userTags.M("@"+req.Method).BoolZ() || userTags.M("@admin").BoolZ()) { - req.Error(ErrPermissionDenied, "", nil) - return - } - - templateTags := ei.N(nc.getTags(template)) - if !(templateTags.M("@"+req.Method).BoolZ() || templateTags.M("@admin").BoolZ()) { - req.Error(ErrPermissionDenied, "", nil) - return - } - - res, err := r.Table("users").Get(user).Update(map[string]interface{}{ - "templates": r.Row.Field("templates").Default([]string{}).SetInsert(template), - }).RunWrite(db, r.RunOpts{Durability: "hard"}) + case "user.addWhitelist": + param, err := ei.N(req.Params).M("ip").String() if err != nil { - req.Error(ErrInternal, "", nil) + req.Error(ErrInvalidParams, "ip", nil) return } - if res.Unchanged == 0 && res.Replaced == 0 { - req.Error(ErrInvalidUser, "", nil) - return - } - req.Result(map[string]interface{}{"ok": true}) + nc.userAddParam(req, param, "whitelist") - case "user.delTemplate": - user, err := ei.N(req.Params).M("user").Lower().String() + case "user.delWhitelist": + param, err := ei.N(req.Params).M("ip").String() if err != nil { - req.Error(ErrInvalidParams, "user", nil) + req.Error(ErrInvalidParams, "ip", nil) return } + nc.userDelParam(req, param, "whitelist") - template, err := ei.N(req.Params).M("template").Lower().String() + case "user.addBlacklist": + param, err := ei.N(req.Params).M("ip").String() if err != nil { - req.Error(ErrInvalidParams, "template", nil) - return - } - - userTags := ei.N(nc.getTags(user)) - if !(userTags.M("@"+req.Method).BoolZ() || userTags.M("@admin").BoolZ()) { - req.Error(ErrPermissionDenied, "", nil) + req.Error(ErrInvalidParams, "ip", nil) return } + nc.userAddParam(req, param, "blacklist") - templateTags := ei.N(nc.getTags(template)) - if !(templateTags.M("@"+req.Method).BoolZ() || templateTags.M("@admin").BoolZ()) { - req.Error(ErrPermissionDenied, "", nil) + case "user.delBlacklist": + param, err := ei.N(req.Params).M("ip").String() + if err != nil { + req.Error(ErrInvalidParams, "ip", nil) return } + nc.userDelParam(req, param, "blacklist") - res, err := r.Table("users").Get(user).Update(map[string]interface{}{ - "templates": r.Row.Field("templates").Default(ei.S{}).SetDifference([]string{template}), - }).RunWrite(db, r.RunOpts{Durability: "hard"}) + case "user.setMaxSessions": + param, err := ei.N(req.Params).M("maxsessions").Int() if err != nil { - req.Error(ErrInternal, "", nil) + req.Error(ErrInvalidParams, "maxsessions", nil) return } - if res.Unchanged == 0 && res.Replaced == 0 { - req.Error(ErrInvalidUser, "", nil) - return - } - req.Result(map[string]interface{}{"ok": true}) + nc.userSetParam(req, param, "maxsessions") - case "user.listTemplate": - user, err := ei.N(req.Params).M("user").Lower().String() + default: + req.Error(ErrMethodNotFound, "", nil) + } +} - userTags := ei.N(nc.getTags(user)) - if !(userTags.M("@"+req.Method).BoolZ() || userTags.M("@admin").BoolZ()) { - req.Error(ErrPermissionDenied, "", nil) - return - } +func (nc *NexusConn) userAddParam(req *JsonRpcReq, param interface{}, field string) { + nc.userChangeParam(req, param, field, "add") +} - type udt struct { - Templates []string `gorethink:"templates"` - } +func (nc *NexusConn) userDelParam(req *JsonRpcReq, param interface{}, field string) { + nc.userChangeParam(req, param, field, "del") +} - res, err := r.Table("users").Get(user).Pluck("templates").Run(db) - if err != nil { - req.Error(ErrInvalidUser, "", nil) - return - } +func (nc *NexusConn) userSetParam(req *JsonRpcReq, param interface{}, field string) { + nc.userChangeParam(req, param, field, "set") +} - ret := udt{Templates: []string{}} - if err := res.One(&ret); err != nil && err != r.ErrEmptyResult { - req.Error(ErrInternal, "", nil) - return - } +func (nc *NexusConn) userChangeParam(req *JsonRpcReq, param interface{}, field, action string) { + user, err := ei.N(req.Params).M("user").Lower().String() + if err != nil { + req.Error(ErrInvalidParams, "user", nil) + return + } - if len(ret.Templates) == 0 { - ret.Templates = []string{} - } - req.Result(ret.Templates) + userTags := ei.N(nc.getTags(user)) + if !(userTags.M("@"+req.Method).BoolZ() || userTags.M("@admin").BoolZ()) { + req.Error(ErrPermissionDenied, "", nil) + return + } - default: - req.Error(ErrMethodNotFound, "", nil) + term := r.Table("users").Get(user) + switch action { + case "add": + term = term.Update(map[string]interface{}{ + field: r.Row.Field(field).Default(ei.S{}).SetInsert(param), + }, r.UpdateOpts{ReturnChanges: true}) + case "del": + term = term.Update(map[string]interface{}{ + field: r.Row.Field(field).Default(ei.S{}).SetDifference([]interface{}{param}), + }, r.UpdateOpts{ReturnChanges: true}) + case "set": + term = term.Update(map[string]interface{}{field: param}, r.UpdateOpts{ReturnChanges: true}) + } + res, err := term.RunWrite(db, r.RunOpts{Durability: "hard"}) + if err != nil { + req.Error(ErrInternal, "", nil) + return + } + if res.Unchanged == 0 && res.Replaced == 0 { + req.Error(ErrInvalidUser, "", nil) + return } + hook("user", user, nc.user.User, ei.M{ + "action": strings.TrimPrefix(req.Method, "user."), + action: param, + field: ei.N(res.Changes[0].NewValue).M(field), + }) + req.Result(map[string]interface{}{"ok": true}) } diff --git a/utils.go b/utils.go index e0bffbf..e7ddc75 100644 --- a/utils.go +++ b/utils.go @@ -3,7 +3,10 @@ package main import ( "crypto/rand" "encoding/hex" + "encoding/json" "errors" + "fmt" + "reflect" "strings" "golang.org/x/crypto/scrypt" @@ -72,3 +75,47 @@ func HashPass(pass, salt string) (string, error) { } return hex.EncodeToString(bdk), nil } + +func truncateJson(j interface{}) interface{} { + switch t := j.(type) { + //Number + case float64: + return j + + // Null + case nil: + return j + + // Bool + case bool: + return j + + // String + case string: + maxlen := 1024 * 10 + + if len(t) > maxlen { + return t[:maxlen] + "..." + } + return t + + // Object + case map[string]interface{}: + a := make(map[string]interface{}) + for k, v := range t { + a[k] = truncateJson(v) + } + return a + + // Array? + default: + slice := make([]interface{}, 0) + if b, e := json.Marshal(j); e == nil && json.Unmarshal(b, &slice) == nil { + for k, v := range slice { + slice[k] = truncateJson(v) + } + return slice + } + } + return fmt.Sprintf("Unknown JSON type: %s", reflect.TypeOf(j)) +} diff --git a/version.go b/version.go new file mode 100644 index 0000000..43b96ef --- /dev/null +++ b/version.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" +) + +var Version = &NxVersion{ + Major: 1, + Minor: 0, + Patch: 0, +} + +type NxVersion struct { + Major int + Minor int + Patch int +} + +func (v *NxVersion) String() string { + return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch) +}