Skip to content

Commit

Permalink
Merged in vaastav_anand/dinv/add_catpure_lib (pull request #1)
Browse files Browse the repository at this point in the history
Move capture library from GoVector to DInv repo
  • Loading branch information
bestchai committed Jun 1, 2018
2 parents 4f68cf4 + a2f2f3a commit c7def08
Show file tree
Hide file tree
Showing 16 changed files with 799 additions and 13 deletions.
54 changes: 54 additions & 0 deletions capture/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#Capture networking calls for automatic instrumentation!

GoVector's capture library automatically injects vector clocks onto network reads and writes.
The capture functionality works on the standard [go net libary](https://golang.org/pkg/net/).

#Installing

To install GoVector's capture tool run
```
go install
```
In GoVector's main directory

#Dependencies

GoVector depends on [Dinv](https://bitbucket.org/bestchai/dinv) for static analysis.

#How it works

GoVector capture takes either a go file or directory containing a go package as a command line argument.
The package or file is then translated into its [AST](https://golang.org/pkg/go/ast/) representation.
GoVector traverses the AST searching for read and write calls to go's networking library. When matching calls are identified Capture
performs an AST rotation and injects wrapper code for the networking call. The wrapper code appends and truncates vector clocks from
network payloads. The set of library wrapping function are in [capture.go](https://github.com/DistributedClocks/GoVector/blob/master/capture/capture.go)
The result of running Capture is an augmented file, or directory. Case specific auto instrumentation's of both Go's RPC, and HTTP are
also implemented by Capture. Below is an example of a captured write.

###Before
```n, err := conn.Write(buf)``

###After
```n, err := capture.Write(conn.Write,buf)``

#Running


To run Capture on a single file run
`GoVector -f=filename.go`

To run Capture on a directory containing a single package run
`GoVector -dir=directory`

#Limitations

##Scale
Building an AST in go requires that all referenced files are read in while building the AST. Capture will fail if a single go file
with references to another local file is passed in. In this case the -dir option is suggests. The AST loading library only works on a
single package at a time. Therefore Capture cannot instrument more than one package at a time.

##Aliasing
Only calls to Go's standard networking library are completely supported. The net.Conn type implements io.ReadWriter, therefore it
can be passed to an encoder or decoder directly. Capture makes no attempt to reason about which encoders or decoders could be
attached to network connections. Encoder wrapped connections will not be instrumented.

281 changes: 281 additions & 0 deletions capture/capture.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package capture

import (
"bitbucket.org/bestchai/dinv/dinvRT"
"bufio"

"encoding/gob"
"fmt"
"io"
"net"
"net/rpc"
)

/*
Functions for appending vector clocks to the standard net package
*/
func Read(read func([]byte) (int, error), b []byte) (int, error) {
buf := make([]byte, len(b))
n, err := read(buf)
dinvRT.Unpack(buf, &b)
return n, err
}

func ReadFrom(readFrom func([]byte) (int, net.Addr, error), b []byte) (int, net.Addr, error) {
buf := make([]byte, len(b))
n, addr, err := readFrom(buf)
dinvRT.Unpack(buf, &b)
return n, addr, err
}

func Write(write func(b []byte) (int, error), b []byte) (int, error) {
buf := dinvRT.Pack(b)
n, err := write(buf)
return n, err
}

func WriteTo(writeTo func([]byte, net.Addr) (int, error), b []byte, addr net.Addr) (int, error) {
buf := dinvRT.Pack(b)
n, err := writeTo(buf, addr)
return n, err
}

//Protocol specific funcions
func ReadFromIP(readfromip func([]byte) (int, *net.IPAddr, error), b []byte) (int, *net.IPAddr, error) {
buf := make([]byte, len(b))
n, addr, err := readfromip(buf)
dinvRT.Unpack(buf, &b)
return n, addr, err
}

func ReadMsgIP(readmsgip func([]byte, []byte) (int, int, int, *net.IPAddr, error), b, oob []byte) (int, int, int, *net.IPAddr, error) {
buf := make([]byte, len(b))
n, oobn, flags, addr, err := readmsgip(buf, oob)
dinvRT.Unpack(buf, &b)
return n, oobn, flags, addr, err
}

func WriteMsgIP(writemsgip func([]byte, []byte, *net.IPAddr) (int, int, error), b, oob []byte, addr *net.IPAddr) (int, int, error) {
buf := dinvRT.Pack(b)
n, oobn, err := writemsgip(buf, oob, addr)
return n, oobn, err
}

func WriteToIP(writetoip func([]byte, *net.IPAddr) (int, error), b []byte, addr *net.IPAddr) (int, error) {
buf := dinvRT.Pack(b)
n, err := writetoip(buf, addr)
return n, err
}

func ReadFromUDP(readfromudp func([]byte) (int, *net.UDPAddr, error), b []byte) (int, *net.UDPAddr, error) {
buf := make([]byte, len(b))
n, addr, err := readfromudp(buf)
dinvRT.Unpack(buf, &b)
return n, addr, err
}

func ReadMsgUDP(readmsgudp func([]byte, []byte) (int, int, int, *net.UDPAddr, error), b, oob []byte) (int, int, int, *net.UDPAddr, error) {
buf := make([]byte, len(b))
n, oobn, flags, addr, err := readmsgudp(b, oob)
dinvRT.Unpack(buf, &b)
return n, oobn, flags, addr, err
}

func WriteMsgUDP(writemsgudp func([]byte, []byte, *net.UDPAddr) (int, int, error), b, oob []byte, addr *net.UDPAddr) (int, int, error) {
buf := dinvRT.Pack(b)
n, oobn, err := writemsgudp(buf, oob, addr)
return n, oobn, err
}

func WriteToUDP(writetoudp func([]byte, *net.UDPAddr) (int, error), b []byte, addr *net.UDPAddr) (int, error) {
buf := dinvRT.Pack(b)
n, err := writetoudp(buf, addr)
return n, err
}

func ReadFromUnix(readfromunix func([]byte) (int, *net.UnixAddr, error), b []byte) (int, *net.UnixAddr, error) {
buf := make([]byte, len(b))
n, addr, err := readfromunix(buf)
dinvRT.Unpack(buf, &b)
return n, addr, err
}

func ReadMsgUnix(readmsgunix func([]byte, []byte) (int, int, int, *net.UnixAddr, error), b, oob []byte) (int, int, int, *net.UnixAddr, error) {
buf := make([]byte, len(b))
n, oobn, flags, addr, err := readmsgunix(b, oob)
dinvRT.Unpack(buf, &b)
return n, oobn, flags, addr, err
}

func WriteMsgUnix(writemsgunix func([]byte, []byte, *net.UnixAddr) (int, int, error), b, oob []byte, addr *net.UnixAddr) (int, int, error) {
buf := dinvRT.Pack(b)
n, oobn, err := writemsgunix(buf, oob, addr)
return n, oobn, err
}

func WriteToUnix(writetounix func([]byte, *net.UnixAddr) (int, error), b []byte, addr *net.UnixAddr) (int, error) {
buf := dinvRT.Pack(b)
n, err := writetounix(buf, addr)
return n, err
}

/* Functions to add Codecs to Client RPC calls */
func Dial(dial func(string, string) (*rpc.Client, error), network, address string) (*rpc.Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return rpc.NewClientWithCodec(NewClientCodec(conn)), err
}

//TODO
func DialHTTP(dialHttp func(string, string) (*rpc.Client, error), network, address string) (*rpc.Client, error) {
return nil, fmt.Errorf("Dinv has yet to implement an rpc.DialHTTP wrapper\n")
}

//TODO
func DialHTTPPath(dialHttpPath func(string, string, string) (*rpc.Client, error), network, address, path string) (*rpc.Client, error) {
return nil, fmt.Errorf("Dinv has yet to implement an rpc.DialHTTP wrapper\n")
}

func NewClient(newClient func(io.ReadWriteCloser) *rpc.Client, conn io.ReadWriteCloser) *rpc.Client {
return rpc.NewClientWithCodec(NewClientCodec(conn))
}

//TODO
func NewClientWithCodec(newClientWithCodec func(rpc.ClientCodec) *rpc.Client, codec rpc.ClientCodec) *rpc.Client {
return nil
}

/* Functions for adding codecs to Server RPC */

//TODO
func ServeCodec(serveCodec func(rpc.ServerCodec), codec rpc.ServerCodec) {
return
}

func ServeConn(serveConn func(io.ReadWriteCloser), conn io.ReadWriteCloser) {
rpc.DefaultServer.ServeCodec(NewServerCodec(conn))
}

//TODO
func ServeRequest(serveRequest func(rpc.ServerCodec) error, codec rpc.ServerCodec) error {
return fmt.Errorf("Dinv has yet to implement an rpc.ServeRequest wrapper\n")
}

//TODO the interalfunction for the rpc.Server have yet to be
//implemented

/*
Client Codec for instrumenting RPC calls with vector clocks
*/

type ClientClockCodec struct {
C io.Closer
Dec *gob.Decoder
Enc *gob.Encoder
EncBuf *bufio.Writer
}

func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
encBuf := bufio.NewWriter(conn)
return &ClientClockCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
}

func (c *ClientClockCodec) WriteRequest(req *rpc.Request, param interface{}) (err error) {
fmt.Println("WriteRequest")
if err = c.Enc.Encode(req); err != nil {
return
}
buf := dinvRT.Pack(param)
if err = c.Enc.Encode(buf); err != nil {
return
}

return c.EncBuf.Flush()
}

func (c *ClientClockCodec) ReadResponseHeader(resp *rpc.Response) error {
fmt.Println("ReadResponseHeader")
return c.Dec.Decode(resp)
}

func (c *ClientClockCodec) ReadResponseBody(body interface{}) (err error) {
fmt.Println("ReadResponseBody")
var buf []byte
if err = c.Dec.Decode(&buf); err != nil {
return
}
dinvRT.Unpack(buf, body)
return nil
}

func (c *ClientClockCodec) Close() error {
return c.C.Close()
}

/*
Client Codec for instrumenting RPC calls with vector clocks
*/

type ServerClockCodec struct {
Rwc io.ReadWriteCloser
Dec *gob.Decoder
Enc *gob.Encoder
EncBuf *bufio.Writer
Closed bool
}

func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
buf := bufio.NewWriter(conn)
srv := &ServerClockCodec{
Rwc: conn,
Dec: gob.NewDecoder(conn),
Enc: gob.NewEncoder(buf),
EncBuf: buf,
}
return srv
}

func (c *ServerClockCodec) ReadRequestHeader(r *rpc.Request) error {
fmt.Println("ReadRequestHeader")
return c.Dec.Decode(r)
}
func (c *ServerClockCodec) ReadRequestBody(body interface{}) (err error) {
fmt.Println("ReadResponseBody")
var buf []byte
if err = c.Dec.Decode(&buf); err != nil {
return
}
dinvRT.Unpack(buf, body)
return nil

}
func (c *ServerClockCodec) WriteResponse(r *rpc.Response, body interface{}) (err error) {
if err = c.Enc.Encode(r); err != nil {
if c.EncBuf.Flush() == nil {
//Gob Encoding Error
fmt.Println("RPC Error encoding response:", err)
c.Close()
}
return
}
buf := dinvRT.Pack(body)
if err = c.Enc.Encode(buf); err != nil {
if c.EncBuf.Flush() == nil {
//Gob Encoding Error
fmt.Println("RPC Error encoding body (This is likely Dinv's fault):", err)
c.Close()
}
return
}
return c.EncBuf.Flush()
}

func (c *ServerClockCodec) Close() error {
if c.Closed {
return nil
}
c.Closed = true
return c.Rwc.Close()
}
Loading

0 comments on commit c7def08

Please sign in to comment.