Skip to content

Commit

Permalink
merged
Browse files Browse the repository at this point in the history
  • Loading branch information
wantonsolutions committed Aug 2, 2017
2 parents 9e961b2 + 7be3514 commit 358a3fa
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 28 deletions.
177 changes: 149 additions & 28 deletions dinvRT/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,20 @@ import (

"bitbucket.org/bestchai/dinv/logmerger"
"github.com/arcaneiceman/GoVector/govec"
ls "github.com/wantonsolutions/dara/servers/logserver"
"sort"
"sync"

"net/rpc"
l "log"
)


//enviornment variables
const (
HOSTNAME = "DINV_HOSTNAME" //name of the machine running here
LOGSTORE = "DINV_LOG_STORE" //remote location of log store (ip:port)
PROJECT = "DINV_PROJECT" //project identifier (change if source code changes)
)

var (
Expand All @@ -25,6 +37,7 @@ var (
packageName string // TODO packageName is not used -- can it be removed?
Encoder *json.Encoder // global name value pair point encoder

remotelogging = true
useKV = true
resetKV = true // determines if the KV is emptied after the values were written to the log
varStore map[string]logmerger.NameValuePair // used to store variable name/value pairs between multiple dumps
Expand All @@ -33,6 +46,11 @@ var (
genKVID func([]logmerger.NameValuePair) string

initMutex *sync.Mutex = &sync.Mutex{}

//remote logging data
rid ls.LogId // discriptive log for communicating with a logging server
logStoreLocation string //ip port of log store //specifed as an enviornment var "
rpcClient *rpc.Client
)

//Dump logs the values of variables passed in as a set of varadic
Expand Down Expand Up @@ -178,6 +196,9 @@ func concatStrings(a []string) string {
return id
}




// called from (un)pack functions, so before every network request
// if kv is enabled, all entries in varStore will be logged and the map will be emptied, if resetKV == true
func logVarStore() {
Expand Down Expand Up @@ -207,20 +228,26 @@ func logVarStore() {
//information
func Pack(msg interface{}) []byte {
initDinv("")
logVarStore()
var loggedMsg string
if fast {
return goVecLogger.PrepareSend("Sending from "+id, msg)
} else {
return goVecLogger.PrepareSend("Sending from "+getCallingFunctionID()+" "+id, msg) // slow
}
loggedMsg = "Sending from "+id
} else {
loggedMsg = "Sending from "+getCallingFunctionID()+" "+id
}
buf:= goVecLogger.PrepareSend(loggedMsg, msg)
//log after updating vector clock
go log(msg,ls.SEND,loggedMsg)
return buf
}

//PackM operates identically to Pack, but allows for custom messages
//to be logged
func PackM(msg interface{}, log string) []byte {
func PackM(msg interface{}, info string) []byte {
initDinv("")
logVarStore()
return goVecLogger.PrepareSend(log, msg)
buf:= goVecLogger.PrepareSend(info,msg)
//log after updating vector clock
go log(msg,ls.SEND,info)
return buf
}

//Unpack removes logging information from an array of bytes. The bytes
Expand All @@ -229,26 +256,29 @@ func PackM(msg interface{}, log string) []byte {
//Precondition, the array of bytes was packed before sending
func Unpack(msg []byte, pack interface{}) {
initDinv("")
logVarStore()
var loggedMsg string
if fast {
goVecLogger.UnpackReceive("Received on "+id, msg, pack)
} else {
goVecLogger.UnpackReceive("Received on "+getCallingFunctionID()+" "+id, msg, pack) //Slow
}
loggedMsg = "Received on "+id
} else {
loggedMsg = "Received on "+getCallingFunctionID()+" "+id
}
goVecLogger.UnpackReceive(loggedMsg, msg, pack)
go log(pack,ls.REC,loggedMsg)
return
}

func UnpackM(msg []byte, pack interface{}, log string) {
func UnpackM(msg []byte, pack interface{}, info string) {
initDinv("")
logVarStore()
goVecLogger.UnpackReceive(log, msg, pack)
goVecLogger.UnpackReceive(info, msg, pack)
go log(pack,ls.REC,info)

return
}


func Local(msg string) {
logVarStore()
goVecLogger.LogLocalEvent(msg)
go log(nil,ls.LOCAL,msg) //logVarStore()
}

//Initalize is an optional call for naming hosts uniquely based on a
Expand Down Expand Up @@ -306,23 +336,52 @@ func initDinv(hostName string) {
initMutex.Lock()
defer initMutex.Unlock()
if !initialized {

//get host name
if hostName != "" {
id = hostName
} else if os.Getenv("DINV_HOSTNAME") != "" {
id = os.Getenv("DINV_HOSTNAME")
} else if os.Getenv(HOSTNAME) != "" {
id = os.Getenv(HOSTNAME)
} else {
id = fmt.Sprintf("%d", time.Now().Nanosecond())
}
goVecLogger = govec.InitGoVector(id, id+".log")

encodedLogname := fmt.Sprintf("%sEncoded.txt", id)

logFile, err := os.Create(encodedLogname)
if err != nil {
panic(fmt.Errorf("%s:dinvRT/api.go: Error creating log file '%s': %s", id, encodedLogname, err.Error()))
}
Encoder = json.NewEncoder(logFile)
//Log everything locally to a file
if !remotelogging {
goVecLogger = govec.InitGoVector(id, id+".log")
encodedLogname := fmt.Sprintf("%sEncoded.txt", id)

logFile, err := os.Create(encodedLogname)
if err != nil {
panic(fmt.Errorf("%s:dinvRT/api.go: Error creating log file '%s': %s", id, encodedLogname, err.Error()))
}
Encoder = json.NewEncoder(logFile)
} else {
//set up remote logging with a dinv server
//TODO there are many failure conditions here, try to recover or give good messages
// or set up a name server
//TODO also find a way to not log with GoVec
goVecLogger = govec.InitGoVector(id, id+".log")
rid.Node = id
if os.Getenv(LOGSTORE) != "" {
logStoreLocation = os.Getenv(LOGSTORE)
} else {
l.Fatal("If set to remote logging then a log store location must be specified")
}
if os.Getenv(PROJECT) != "" {
rid.Project = os.Getenv(PROJECT)
} else {
l.Fatal("If set to remote then a project must be specified")
}
//setup RPC client
var err error
rpcClient, err = rpc.DialHTTP("tcp", logStoreLocation)
if err != nil {
l.Fatal(err)
}
}
}

/*TODO in the future only use the kvStore*/
if useKV && varStore == nil {
varStore = make(map[string]logmerger.NameValuePair)
varStoreMx = &sync.Mutex{}
Expand Down Expand Up @@ -407,8 +466,70 @@ func CreatePoint(vars []interface{}, varNames []string, id string, logger *govec
return point
}


type ByName []logmerger.NameValuePair

func (a ByName) Len() int { return len(a) }
func (a ByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByName) Less(i, j int) bool { return a[i].VarName < a[j].VarName }


/***************************************************************************/
/* New logging format for remote logging (Done at Inria (merge with the new govector at some point (clement fab)
/**************************************************************************/


//log is a generalized function for logging message events, their type, and vector clock information. TODO when this works well integrate it everywhere.
func log(msg interface{}, eventType int, info string) {
//Assume that KV is being used
//TODO make sure that each invoked log function completes
varStoreMx.Lock()
defer varStoreMx.Unlock()
state := make([]logmerger.NameValuePair, len(varStore))
itt := 0
for i := range varStore {
state[itt] = varStore[i]
itt++
}
//TODO figure out a better way to do this than sorting is it even nessisary with JSON?
sort.Sort(ByName(state))
sclock := goVecLogger.GetCurrentVC()
sstate, err := json.Marshal(state)
if err != nil {
l.Fatal(err)
}
sevent := msgState(msg)
//turn into NV pair list
log := ls.SElog{Type: eventType, Message: []byte(info), VC: sclock, State: sstate, Event: sevent}
if err != nil {
l.Fatal(err)
}
request := ls.PostReq{Id: rid, Log: log}
resp := ls.PostReply{}
rpcClient.Call("LogStore.Log",request,&resp)
l.Println(resp)
//TODO Handel errors
if resp.Id.Session == "" {
l.Fatal()
}
//update SessionID
rid = resp.Id
fmt.Println("Made it back from RPC!!!")

}

func msgState(msg interface{}) []byte {
var e map[string]*json.RawMessage
buf, _ := json.Marshal(msg)
json.Unmarshal(buf, &e)
state := make([]logmerger.NameValuePair,len(e))

var i int
for k, v := range e {
state[i].VarName = k
state[i].Value = v
}
buf, _ = json.Marshal(state)
return buf

}
6 changes: 6 additions & 0 deletions logmerger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ func (nvp NameValuePair) value() string {
}
}

//returns the value of the Name value pair as a string
//TODO catch and print all possible reflected types
func (nvp NameValuePair) ValueG() string {
return nvp.value()
}

//Point is a representation of a program point. Name value pair is the
//variable values at that program point. LineNumber is the line the
//variables were gathered on. VectorClock is byte valued vector clock
Expand Down

0 comments on commit 358a3fa

Please sign in to comment.