diff --git a/dinvRT/api.go b/dinvRT/api.go index 9037ed8..61c9425 100755 --- a/dinvRT/api.go +++ b/dinvRT/api.go @@ -13,8 +13,20 @@ import ( "bitbucket.org/bestchai/dinv/logmerger" "github.com/arcaneiceman/GoVector/govec" + ls "github.com/wantonsolutions/dara/servers/logserver" "sort" "sync" + + "net/rpc" + l "log" +) + + +//enviornment variables +const ( + HOSTNAME = "DINV_HOSTNAME" //name of the machine running here + LOGSTORE = "DINV_LOG_STORE" //remote location of log store (ip:port) + PROJECT = "DINV_PROJECT" //project identifier (change if source code changes) ) var ( @@ -25,6 +37,7 @@ var ( packageName string // TODO packageName is not used -- can it be removed? Encoder *json.Encoder // global name value pair point encoder + remotelogging = true useKV = true resetKV = true // determines if the KV is emptied after the values were written to the log varStore map[string]logmerger.NameValuePair // used to store variable name/value pairs between multiple dumps @@ -33,6 +46,11 @@ var ( genKVID func([]logmerger.NameValuePair) string initMutex *sync.Mutex = &sync.Mutex{} + + //remote logging data + rid ls.LogId // discriptive log for communicating with a logging server + logStoreLocation string //ip port of log store //specifed as an enviornment var " + rpcClient *rpc.Client ) //Dump logs the values of variables passed in as a set of varadic @@ -178,6 +196,9 @@ func concatStrings(a []string) string { return id } + + + // called from (un)pack functions, so before every network request // if kv is enabled, all entries in varStore will be logged and the map will be emptied, if resetKV == true func logVarStore() { @@ -207,20 +228,26 @@ func logVarStore() { //information func Pack(msg interface{}) []byte { initDinv("") - logVarStore() + var loggedMsg string if fast { - return goVecLogger.PrepareSend("Sending from "+id, msg) - } else { - return goVecLogger.PrepareSend("Sending from "+getCallingFunctionID()+" "+id, msg) // slow - } + loggedMsg = "Sending from "+id + } else { + loggedMsg = "Sending from "+getCallingFunctionID()+" "+id + } + buf:= goVecLogger.PrepareSend(loggedMsg, msg) + //log after updating vector clock + go log(msg,ls.SEND,loggedMsg) + return buf } //PackM operates identically to Pack, but allows for custom messages //to be logged -func PackM(msg interface{}, log string) []byte { +func PackM(msg interface{}, info string) []byte { initDinv("") - logVarStore() - return goVecLogger.PrepareSend(log, msg) + buf:= goVecLogger.PrepareSend(info,msg) + //log after updating vector clock + go log(msg,ls.SEND,info) + return buf } //Unpack removes logging information from an array of bytes. The bytes @@ -229,26 +256,29 @@ func PackM(msg interface{}, log string) []byte { //Precondition, the array of bytes was packed before sending func Unpack(msg []byte, pack interface{}) { initDinv("") - logVarStore() + var loggedMsg string if fast { - goVecLogger.UnpackReceive("Received on "+id, msg, pack) - } else { - goVecLogger.UnpackReceive("Received on "+getCallingFunctionID()+" "+id, msg, pack) //Slow - } + loggedMsg = "Received on "+id + } else { + loggedMsg = "Received on "+getCallingFunctionID()+" "+id + } + goVecLogger.UnpackReceive(loggedMsg, msg, pack) + go log(pack,ls.REC,loggedMsg) return } -func UnpackM(msg []byte, pack interface{}, log string) { +func UnpackM(msg []byte, pack interface{}, info string) { initDinv("") - logVarStore() - goVecLogger.UnpackReceive(log, msg, pack) + goVecLogger.UnpackReceive(info, msg, pack) + go log(pack,ls.REC,info) + return } func Local(msg string) { - logVarStore() goVecLogger.LogLocalEvent(msg) + go log(nil,ls.LOCAL,msg) //logVarStore() } //Initalize is an optional call for naming hosts uniquely based on a @@ -306,23 +336,52 @@ func initDinv(hostName string) { initMutex.Lock() defer initMutex.Unlock() if !initialized { + + //get host name if hostName != "" { id = hostName - } else if os.Getenv("DINV_HOSTNAME") != "" { - id = os.Getenv("DINV_HOSTNAME") + } else if os.Getenv(HOSTNAME) != "" { + id = os.Getenv(HOSTNAME) } else { id = fmt.Sprintf("%d", time.Now().Nanosecond()) } - goVecLogger = govec.InitGoVector(id, id+".log") - - encodedLogname := fmt.Sprintf("%sEncoded.txt", id) - - logFile, err := os.Create(encodedLogname) - if err != nil { - panic(fmt.Errorf("%s:dinvRT/api.go: Error creating log file '%s': %s", id, encodedLogname, err.Error())) - } - Encoder = json.NewEncoder(logFile) + //Log everything locally to a file + if !remotelogging { + goVecLogger = govec.InitGoVector(id, id+".log") + encodedLogname := fmt.Sprintf("%sEncoded.txt", id) + + logFile, err := os.Create(encodedLogname) + if err != nil { + panic(fmt.Errorf("%s:dinvRT/api.go: Error creating log file '%s': %s", id, encodedLogname, err.Error())) + } + Encoder = json.NewEncoder(logFile) + } else { + //set up remote logging with a dinv server + //TODO there are many failure conditions here, try to recover or give good messages + // or set up a name server + //TODO also find a way to not log with GoVec + goVecLogger = govec.InitGoVector(id, id+".log") + rid.Node = id + if os.Getenv(LOGSTORE) != "" { + logStoreLocation = os.Getenv(LOGSTORE) + } else { + l.Fatal("If set to remote logging then a log store location must be specified") + } + if os.Getenv(PROJECT) != "" { + rid.Project = os.Getenv(PROJECT) + } else { + l.Fatal("If set to remote then a project must be specified") + } + //setup RPC client + var err error + rpcClient, err = rpc.DialHTTP("tcp", logStoreLocation) + if err != nil { + l.Fatal(err) + } + } } + + /*TODO in the future only use the kvStore*/ if useKV && varStore == nil { varStore = make(map[string]logmerger.NameValuePair) varStoreMx = &sync.Mutex{} @@ -407,8 +466,70 @@ func CreatePoint(vars []interface{}, varNames []string, id string, logger *govec return point } + type ByName []logmerger.NameValuePair func (a ByName) Len() int { return len(a) } func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a ByName) Less(i, j int) bool { return a[i].VarName < a[j].VarName } + + +/***************************************************************************/ +/* New logging format for remote logging (Done at Inria (merge with the new govector at some point (clement fab) +/**************************************************************************/ + + +//log is a generalized function for logging message events, their type, and vector clock information. TODO when this works well integrate it everywhere. +func log(msg interface{}, eventType int, info string) { + //Assume that KV is being used + //TODO make sure that each invoked log function completes + varStoreMx.Lock() + defer varStoreMx.Unlock() + state := make([]logmerger.NameValuePair, len(varStore)) + itt := 0 + for i := range varStore { + state[itt] = varStore[i] + itt++ + } + //TODO figure out a better way to do this than sorting is it even nessisary with JSON? + sort.Sort(ByName(state)) + sclock := goVecLogger.GetCurrentVC() + sstate, err := json.Marshal(state) + if err != nil { + l.Fatal(err) + } + sevent := msgState(msg) + //turn into NV pair list + log := ls.SElog{Type: eventType, Message: []byte(info), VC: sclock, State: sstate, Event: sevent} + if err != nil { + l.Fatal(err) + } + request := ls.PostReq{Id: rid, Log: log} + resp := ls.PostReply{} + rpcClient.Call("LogStore.Log",request,&resp) + l.Println(resp) + //TODO Handel errors + if resp.Id.Session == "" { + l.Fatal() + } + //update SessionID + rid = resp.Id + fmt.Println("Made it back from RPC!!!") + +} + +func msgState(msg interface{}) []byte { + var e map[string]*json.RawMessage + buf, _ := json.Marshal(msg) + json.Unmarshal(buf, &e) + state := make([]logmerger.NameValuePair,len(e)) + + var i int + for k, v := range e { + state[i].VarName = k + state[i].Value = v + } + buf, _ = json.Marshal(state) + return buf + +} diff --git a/logmerger/logs.go b/logmerger/logs.go index 11ea2f8..9543ce3 100755 --- a/logmerger/logs.go +++ b/logmerger/logs.go @@ -365,6 +365,12 @@ func (nvp NameValuePair) value() string { } } +//returns the value of the Name value pair as a string +//TODO catch and print all possible reflected types +func (nvp NameValuePair) ValueG() string { + return nvp.value() +} + //Point is a representation of a program point. Name value pair is the //variable values at that program point. LineNumber is the line the //variables were gathered on. VectorClock is byte valued vector clock