Skip to content
This repository has been archived by the owner on Mar 5, 2022. It is now read-only.

Commit

Permalink
does your proxy make TOO MUCH NOISE?!
Browse files Browse the repository at this point in the history
Clean up logging a bit, having 3 loggers (info, debug, warn) that
allow the user to control the logging detail. Now most of the noise is
hidden behind a -debug flag or a BOLT_PROXY_DEBUG environment
variable.

Uncovered some areas that need some cleanup. Joy.
  • Loading branch information
voutilad committed Dec 16, 2020
1 parent d6cd147 commit b953a88
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 91 deletions.
30 changes: 27 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ Usage of ./bolt-proxy:
host:port to bind to (default "localhost:8888")
-cert string
x509 certificate
-debug
enable debug logging
-key string
x509 private key
-pass string
Expand All @@ -124,12 +126,28 @@ configuration easier in the "cloud":
- `BOLT_PROXY_PASSWORD` -- password for the backend neo4j user for use
by the monitor
- `BOLT_PROXY_CERT` -- path to the x509 certificate (.pem) file
- `BOLE_PROXY_KEY` -- path to the x509 private key file
- `BOLT_PROXY_KEY` -- path to the x509 private key file
- `BOLT_PROXY_DEBUG` -- set to any value to enable debug mode/logging

### Lifecycle
When you start the proxy, it'll immediately try to connect to the
target backend using the provided bolt uri, username, and password. It
will then begin monitoring the routing table.
target backend using the provided bolt uri, username, and
password. The server version is extracted and it will then begin
monitoring the routing table.

When clients connect, the following occurs:

1. The proxy determines the connection type (direct vs. websocket)
2. The bolt handshake occurs, negotiating a version the client and
server can both speak.
3. The proxy brokers authentication with one of the backend servers.
4. If auth succeeds, the proxy then authenticates the client with all
other servers in the cluster.
5. The main client event loop kicks in, dealing with mapping bolt
messages from the client to the appropriate backend server based on
the target database and transaction type.
6. If all parties enjoy themselves, they say goodbye and everyone
thinks fondly of their experience.

### Connecting
You then tell your client application (e.g. cypher-shell, Browser) to
Expand Down Expand Up @@ -170,5 +188,11 @@ A bad request takes 2 forms and each has a different result:
2. A request to `/health` that's not a valid HTTP request will result
in an `HTTP/1.1 400 Bad Request` response.

### Logging
Some very verbose logging is available behind the `-debug` flag or the
`BOLT_PROXY_DEBUG` environment variable. It will log most Bolt
chatter, truncating messages, and will provide details on the state
changes of the event loops. Enjoy paying your log vendor!

# License
Provided under MIT. See [LICENSE](./LICENSE).
21 changes: 10 additions & 11 deletions backend/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package backend
import (
"crypto/tls"
"errors"
"log"
"fmt"
"net"

"github.com/voutilad/bolt-proxy/bolt"
Expand Down Expand Up @@ -45,34 +45,34 @@ func authClient(hello, version []byte, network, address string, useTls bool) (ne
0x00, 0x00, 0x00, 0x00}...)
_, err = conn.Write(handshake)
if err != nil {
log.Println("couldn't send handshake to auth server", address)
msg := fmt.Sprintf("couldn't send handshake to auth server %s: %s", address, err)
conn.Close()
return nil, err
return nil, errors.New(msg)
}

// Server should pick a version and provide as 4-byte array
// TODO: we eventually need version handling...for now ignore :-/
buf := make([]byte, 256)
n, err := conn.Read(buf)
if err != nil || n != 4 {
log.Println("didn't get valid handshake response from auth server", address)
msg := fmt.Sprintf("didn't get valid handshake response from auth server %s: %s", address, err)
conn.Close()
return nil, err
return nil, errors.New(msg)
}

// Try performing the bolt auth the given hello message
_, err = conn.Write(hello)
if err != nil {
log.Println("failed to send hello buffer to server", address)
msg := fmt.Sprintf("failed to send hello buffer to server %s: %s", address, err)
conn.Close()
return nil, err
return nil, errors.New(msg)
}

n, err = conn.Read(buf)
if err != nil {
log.Println("failed to get auth response from auth server", address)
msg := fmt.Sprintf("failed to get auth response from auth server %s: %s", address, err)
conn.Close()
return nil, err
return nil, errors.New(msg)
}

msg := bolt.IdentifyType(buf)
Expand All @@ -92,9 +92,8 @@ func authClient(hello, version []byte, network, address string, useTls bool) (ne
return nil, errors.New(failmsg)
}
}
log.Printf("!!! auth failure, but could not parse response: %v\n", r)
conn.Close()
return nil, errors.New("unknown auth failure")
return nil, errors.New("could not parse auth server response")
} else if msg == bolt.SuccessMsg {
// The only happy outcome! Keep conn open.
return conn, nil
Expand Down
22 changes: 12 additions & 10 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ type Backend struct {
monitor *Monitor
routingTable *RoutingTable
tls bool
log *log.Logger
// map of principals -> hosts -> connections
connectionPool map[string]map[string]bolt.BoltConn
}

func NewBackend(username, password string, uri string, hosts ...string) (*Backend, error) {
func NewBackend(logger *log.Logger, username, password string, uri string, hosts ...string) (*Backend, error) {
monitor, err := NewMonitor(username, password, uri, hosts...)
if err != nil {
return nil, err
Expand All @@ -35,6 +36,7 @@ func NewBackend(username, password string, uri string, hosts ...string) (*Backen
monitor: monitor,
routingTable: routingTable,
tls: tls,
log: logger,
}, nil
}

Expand All @@ -47,17 +49,17 @@ func (b *Backend) RoutingTable() *RoutingTable {
panic("attempting to use uninitialized BackendClient")
}

log.Println("checking routing table...")
b.log.Println("checking routing table...")
if b.routingTable.Expired() {
select {
case rt := <-b.monitor.C:
b.routingTable = rt
case <-time.After(60 * time.Second):
log.Fatal("timeout waiting for new routing table!")
b.log.Fatal("timeout waiting for new routing table!")
}
}

log.Println("using routing table")
b.log.Println("using routing table")
return b.routingTable
}

Expand All @@ -75,14 +77,14 @@ func (b *Backend) Authenticate(hello *bolt.Message) (map[string]bolt.BoltConn, e
// TODO: clean up this api...push the dirt into Bolt package
msg, pos, err := bolt.ParseTinyMap(hello.Data[4:])
if err != nil {
log.Printf("XXX pos: %d, hello map: %#v\n", pos, msg)
b.log.Printf("XXX pos: %d, hello map: %#v\n", pos, msg)
panic(err)
}
principal, ok := msg["principal"].(string)
if !ok {
panic("principal in Hello message was not a string")
}
log.Println("found principal:", principal)
b.log.Println("found principal:", principal)

// refresh routing table
// TODO: this api seems backwards...push down into table?
Expand All @@ -93,7 +95,7 @@ func (b *Backend) Authenticate(hello *bolt.Message) (map[string]bolt.BoltConn, e
writers, _ := rt.WritersFor(rt.DefaultDb)
defaultWriter := writers[0]

log.Printf("trying to auth %s to host %s\n", principal, defaultWriter)
b.log.Printf("trying to auth %s to host %s\n", principal, defaultWriter)
conn, err := authClient(hello.Data, b.Version().Bytes(),
"tcp", defaultWriter, b.tls)
if err != nil {
Expand All @@ -120,10 +122,10 @@ func (b *Backend) Authenticate(hello *bolt.Message) (map[string]bolt.BoltConn, e
defer wg.Done()
conn, err := authClient(hello.Data, b.Version().Bytes(), "tcp", h, b.tls)
if err != nil {
log.Printf("failed to auth %s to %s!?\n", principal, h)
b.log.Printf("failed to auth %s to %s!?\n", principal, h)
return
}
log.Printf("auth'd %s to host %s\n", principal, h)
b.log.Printf("auth'd %s to host %s\n", principal, h)
c <- pair{bolt.NewDirectConn(conn), h}
}(host)
}
Expand All @@ -136,6 +138,6 @@ func (b *Backend) Authenticate(hello *bolt.Message) (map[string]bolt.BoltConn, e
conns[p.host] = p.conn
}

log.Printf("auth'd principal to %d hosts\n", len(conns))
b.log.Printf("auth'd principal to %d hosts\n", len(conns))
return conns, err
}
42 changes: 27 additions & 15 deletions backend/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package backend
import (
"errors"
"fmt"
"log"
"strings"
"time"

Expand Down Expand Up @@ -31,8 +30,20 @@ type Version struct {
Major, Minor, Patch uint8
}

func ParseVersion(buf []byte) (Version, error) {
if len(buf) < 4 {
return Version{}, errors.New("buffer too short (< 4)")
}

version := Version{}
version.Major = uint8(buf[3])
version.Minor = uint8(buf[2])
version.Patch = uint8(buf[1])
return version, nil
}

func (v Version) String() string {
return fmt.Sprintf("{ major: %d, minor: %d, patch: %d }",
return fmt.Sprintf("Bolt{major: %d, minor: %d, patch: %d}",
v.Major,
v.Minor,
v.Patch)
Expand Down Expand Up @@ -133,9 +144,9 @@ func NewMonitor(user, password, uri string, hosts ...string) (*Monitor, error) {

version, err := getVersion(&driver)
if err != nil {
log.Fatal(err)
panic(err)
}
log.Printf("found neo4j version %v\n", version)
// log.Printf("found neo4j version %v\n", version)

// TODO: check if in SINGLE, CORE, or READ_REPLICA mode
// We can run `CALL dbms.listConfig('dbms.mode') YIELD value` and
Expand All @@ -146,7 +157,7 @@ func NewMonitor(user, password, uri string, hosts ...string) (*Monitor, error) {
// Get the first routing table and ttl details
rt, err := getNewRoutingTable(&driver)
if err != nil {
log.Fatal(err)
panic(err)
}
c <- rt

Expand All @@ -159,7 +170,7 @@ func NewMonitor(user, password, uri string, hosts ...string) (*Monitor, error) {
case <-ticker.C:
rt, err := getNewRoutingTable(monitor.driver)
if err != nil {
log.Fatal(err)
panic(err)
}
ticker.Reset(rt.Ttl)

Expand All @@ -179,9 +190,10 @@ func NewMonitor(user, password, uri string, hosts ...string) (*Monitor, error) {
}
case <-h:
ticker.Stop()
log.Println("monitor stopped")
case <-time.After(5 * rt.Ttl):
log.Fatalf("monitor timeout reached of 5 x %v\n", rt.Ttl)
// log.Println("monitor stopped")
case <-time.After(10 * rt.Ttl):
msg := fmt.Sprintf("monitor timeout of 10*%v reached\n", rt.Ttl)
panic(msg)
}
}
}()
Expand Down Expand Up @@ -381,14 +393,14 @@ func routingTableTx(tx neo4j.Transaction, names []string) (interface{}, error) {
func getNewRoutingTable(driver *neo4j.Driver) (*RoutingTable, error) {
names, err := queryDbNames(driver)
if err != nil {
log.Printf("error getting database names: %v\n", err)
return nil, err
msg := fmt.Sprintf("error getting database names: %v\n", err)
return nil, errors.New(msg)
}

tableMap, err := queryRoutingTable(driver, names)
if err != nil {
log.Printf("error getting routing table: %v\n", err)
return nil, err
msg := fmt.Sprintf("error getting routing table: %v\n", err)
return nil, errors.New(msg)
}

// build the new routing table instance
Expand Down Expand Up @@ -422,8 +434,8 @@ func getNewRoutingTable(driver *neo4j.Driver) (*RoutingTable, error) {
}
}

log.Printf("updated routing table: %s\n", &rt)
log.Printf("known hosts look like: %v\n", rt.Hosts)
// log.Printf("updated routing table: %s\n", &rt)
// log.Printf("known hosts look like: %v\n", rt.Hosts)

return &rt, nil
}
20 changes: 19 additions & 1 deletion bolt/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"errors"
"fmt"
"io"
// "log"
"net"

"github.com/gobwas/ws"
)
Expand Down Expand Up @@ -82,6 +82,15 @@ func NewDirectConn(c io.ReadWriteCloser) DirectConn {
return dc
}

func (c DirectConn) String() string {
switch c.conn.(type) {
case net.Conn:
return fmt.Sprintf("Direct[%s]", c.conn.(net.Conn).RemoteAddr())
default:
return fmt.Sprintf("Direct[%s]", &c.conn)
}
}

func (c DirectConn) R() <-chan *Message {
return c.r
}
Expand Down Expand Up @@ -210,6 +219,15 @@ func (c WsConn) R() <-chan *Message {
return c.r
}

func (c WsConn) String() string {
switch c.conn.(type) {
case net.Conn:
return fmt.Sprintf("WebSocket[%s]", c.conn.(net.Conn).RemoteAddr())
default:
return fmt.Sprintf("WebSocket[%s]", &c.conn)
}
}

// Read 0 or many Bolt Messages from a WebSocket frame since, apparently,
// small Bolt Messages sometimes get packed into a single Frame(?!).
//
Expand Down
Loading

0 comments on commit b953a88

Please sign in to comment.