Skip to content

Commit

Permalink
Version 1.2.0
Browse files Browse the repository at this point in the history
1. add redis protocol support.
  • Loading branch information
felixhao authored Jul 31, 2018
2 parents ae10d9d + 559ebd1 commit ddd4638
Show file tree
Hide file tree
Showing 54 changed files with 2,505 additions and 378 deletions.
4 changes: 4 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
coverage:
status:
project: on
patch: off
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
.glide/

cmd/proxy/proxy
coverage.txt
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: go

go:
- 1.9.x
- 1.10.x

go_import_path: overlord
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Overlord

## Version 1.2.0
1. add redis protocol support.

## Version 1.1.0
1. add memcache binary protocol support.
2. add conf file check
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,24 @@ go build
#### Test

```shell
# test memcache
echo -e "set a_11 0 0 5\r\nhello\r\n" | nc 127.0.0.1 21211
# STORED
echo -e "get a_11\r\n" | nc 127.0.0.1 21211
# VALUE a_11 0 5
# hello
# END

# test redis
python ./scripts/validate_redis_features.py # require fakeredis==0.11.0 redis==2.10.6 gevent==1.3.5
```

Congratulations! You've just run the overlord proxy.

## Features

- [x] support memcache protocol
- [ ] support redis protocol
- [x] support redis protocol
- [x] connection pool for reduce number to backend caching servers
- [x] keepalive & failover
- [x] hash tag: specify the part of the key used for hashing
Expand Down
2 changes: 2 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func main() {
go http.ListenAndServe(c.Pprof, nil)
if c.Proxy.UseMetrics {
prom.Init()
} else {
prom.On = false
}
}
// new proxy
Expand Down
42 changes: 38 additions & 4 deletions cmd/proxy/proxy-cluster-example.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[[clusters]]
# This is used to specify the name of the cache cluster.
name = "test-cluster"
name = "test-mc"
# The name of the hash function. Possible values are: sha1, fnv1a_64.
hash_method = "sha1"
hash_method = "fnv1a_64"
# The key distribution mode. Possible values are: ketama.
hash_distribution = "ketama"
# A two character string that specifies the part of the key used for hashing. Eg "{}".
hash_tag = ""
# cache type: memcache | redis
# cache type: memcache | memcache_binary | redis
cache_type = "memcache"
# proxy listen proto: tcp | unix
listen_proto = "tcp"
Expand All @@ -29,5 +29,39 @@ ping_fail_limit = 3
ping_auto_eject = true
# A list of server address, port and weight (name:port:weight or ip:port:weight) for this server pool. Also you can use alias name like: ip:port:weight alias.
servers = [
"127.0.0.1:11211:10",
"127.0.0.1:11211:1",
]

[[clusters]]
# This is used to specify the name of the cache cluster.
name = "test-redis"
# The name of the hash function. Possible values are: sha1, fnv1a_64.
hash_method = "fnv1a_64"
# The key distribution mode. Possible values are: ketama.
hash_distribution = "ketama"
# A two character string that specifies the part of the key used for hashing. Eg "{}".
hash_tag = ""
# cache type: memcache | redis
cache_type = "redis"
# proxy listen proto: tcp | unix
listen_proto = "tcp"
# proxy listen addr: tcp addr | unix sock path
listen_addr = "0.0.0.0:26379"
# Authenticate to the Redis server on connect.
redis_auth = ""
# The dial timeout value in msec that we wait for to establish a connection to the server. By default, we wait indefinitely.
dial_timeout = 1000
# The read timeout value in msec that we wait for to receive a response from a server. By default, we wait indefinitely.
read_timeout = 1000
# The write timeout value in msec that we wait for to write a response to a server. By default, we wait indefinitely.
write_timeout = 1000
# The number of connections that can be opened to each server. By default, we open at most 1 server connection.
node_connections = 2
# The number of consecutive failures on a server that would lead to it being temporarily ejected when auto_eject is set to true. Defaults to 3.
ping_fail_limit = 3
# A boolean value that controls if server should be ejected temporarily when it fails consecutively ping_fail_limit times.
ping_auto_eject = false
# A list of server address, port and weight (name:port:weight or ip:port:weight) for this server pool. Also you can use alias name like: ip:port:weight alias.
servers = [
"127.0.0.1:6379:1",
]
3 changes: 2 additions & 1 deletion codecov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ set -e
echo "" > coverage.txt

for d in $(go list ./... | grep -v vendor | grep -v cmd); do
echo "testing for $d ..."
go test -coverprofile=profile.out -covermode=atomic $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
fi
done
done
12 changes: 7 additions & 5 deletions lib/bufio/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ func init() {
func initBufPool(idx int) {
pools[idx] = &sync.Pool{
New: func() interface{} {
return &Buffer{
buf: make([]byte, sizes[idx]),
}
return NewBuffer(sizes[idx])
},
}
}
Expand All @@ -49,6 +47,11 @@ type Buffer struct {
r, w int
}

// NewBuffer allocates and returns a Buffer whose backing byte slice has
// the given size, with both cursors at zero.
func NewBuffer(size int) *Buffer {
	backing := make([]byte, size)
	return &Buffer{buf: backing}
}

// Bytes returns the bytes that have been filled into the buffer but not
// yet consumed: the window between the read cursor and the write cursor.
// The returned slice aliases the internal buffer and stops being valid
// after the next mutation of the Buffer.
func (b *Buffer) Bytes() []byte {
return b.buf[b.r:b.w]
}
Expand Down Expand Up @@ -97,8 +100,7 @@ func Get(size int) *Buffer {
}
i := sort.SearchInts(sizes, size)
if i >= len(pools) {
b := &Buffer{buf: make([]byte, size)}
return b
return NewBuffer(size)
}
b := pools[i].Get().(*Buffer)
b.Reset()
Expand Down
130 changes: 43 additions & 87 deletions lib/bufio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
libnet "overlord/lib/net"
)

const (
maxBuffered = 64
var (
// ErrBufferFull err buffer full
ErrBufferFull = bufio.ErrBufferFull
)

// ErrProxy
var (
ErrBufferFull = bufio.ErrBufferFull
crlfBytes = []byte("\r\n")
)

// Reader implements buffering for an io.Reader object.
Expand Down Expand Up @@ -47,6 +47,16 @@ func (r *Reader) Advance(n int) {
r.b.Advance(n)
}

// Mark returns the current read offset of the underlying buffer so the
// caller can later rewind to it via AdvanceTo.
func (r *Reader) Mark() int {
	pos := r.b.r
	return pos
}

// AdvanceTo moves the buffer's read position to a mark previously obtained
// from Mark, rewinding or advancing as needed.
func (r *Reader) AdvanceTo(mark int) {
	delta := mark - r.b.r
	r.Advance(delta)
}

// Buffer will return the reference of local buffer
func (r *Reader) Buffer() *Buffer {
return r.b
Expand All @@ -57,21 +67,31 @@ func (r *Reader) Read() error {
if r.err != nil {
return r.err
}

if r.b.buffered() == r.b.len() {
r.b.grow()
}

if r.b.w == r.b.len() {
r.b.shrink()
}

if err := r.fill(); err != io.EOF {
return err
}
return nil
}

// ReadLine scans the currently buffered bytes for the first CRLF and
// returns the line including its trailing "\r\n", advancing the read
// cursor past it. If no CRLF is buffered yet, it returns ErrBufferFull
// so the caller can fill the buffer and retry. No I/O is performed.
func (r *Reader) ReadLine() (line []byte, err error) {
	window := r.b.buf[r.b.r:r.b.w]
	idx := bytes.Index(window, crlfBytes)
	if idx < 0 {
		return nil, ErrBufferFull
	}
	end := idx + 2 // include the CRLF terminator
	line = r.b.buf[r.b.r : r.b.r+end]
	r.b.r += end
	return line, nil
}

// ReadSlice will read until the delim or return ErrBufferFull.
// It never contains any I/O operation
func (r *Reader) ReadSlice(delim byte) (data []byte, err error) {
Expand Down Expand Up @@ -121,73 +141,9 @@ func (r *Reader) ResetBuffer(b *Buffer) {
r.b.w = b.w
}

// ReadUntil reads until the first occurrence of delim in the input,
// returning a slice pointing at the bytes in the buffer.
// The bytes stop being valid at the next read.
// If ReadUntil encounters an error before finding a delimiter,
// it returns all the data in the buffer and the error itself (often io.EOF).
// ReadUntil returns err != nil if and only if line does not end in delim.
func (r *Reader) ReadUntil(delim byte) ([]byte, error) {
if r.err != nil {
return nil, r.err
}
for {
var index = bytes.IndexByte(r.b.buf[r.b.r:r.b.w], delim)
if index >= 0 {
limit := r.b.r + index + 1
slice := r.b.buf[r.b.r:limit]
r.b.r = limit
return slice, nil
}
if r.b.w >= r.b.len() {
r.b.grow()
}
err := r.fill()
if err == io.EOF && r.b.buffered() > 0 {
data := r.b.buf[r.b.r:r.b.w]
r.b.r = r.b.w
return data, nil
} else if err != nil {
r.err = err
return nil, err
}
}
}

// ReadFull reads exactly n bytes from r into buf.
// It returns the number of bytes copied and an error if fewer bytes were read.
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
// On return, n == len(buf) if and only if err == nil.
func (r *Reader) ReadFull(n int) ([]byte, error) {
if n <= 0 {
return nil, nil
}
if r.err != nil {
return nil, r.err
}
for {
if r.b.buffered() >= n {
bs := r.b.buf[r.b.r : r.b.r+n]
r.b.r += n
return bs, nil
}
maxCanRead := r.b.len() - r.b.w + r.b.buffered()
if maxCanRead < n {
r.b.grow()
}
err := r.fill()
if err == io.EOF && r.b.buffered() > 0 {
data := r.b.buf[r.b.r:r.b.w]
r.b.r = r.b.w
return data, nil
} else if err != nil {
r.err = err
return nil, err
}
}
}
const (
maxWritevSize = 1024
)

// Writer implements buffering for an io.Writer object.
// If an error occurs writing to a Writer, no more data will be
Expand All @@ -196,17 +152,17 @@ func (r *Reader) ReadFull(n int) ([]byte, error) {
// Flush method to guarantee all data has been forwarded to
// the underlying io.Writer.
type Writer struct {
wr *libnet.Conn
bufsp net.Buffers
bufs [][]byte
cursor int
wr *libnet.Conn
bufsp net.Buffers
bufs [][]byte
cnt int

err error
}

// NewWriter returns a new Writer whose buffer has the default size.
func NewWriter(wr *libnet.Conn) *Writer {
return &Writer{wr: wr, bufs: make([][]byte, maxBuffered)}
return &Writer{wr: wr, bufs: make([][]byte, 0, maxWritevSize)}
}

// Flush writes any buffered data to the underlying io.Writer.
Expand All @@ -217,12 +173,13 @@ func (w *Writer) Flush() error {
if len(w.bufs) == 0 {
return nil
}
w.bufsp = net.Buffers(w.bufs[:w.cursor])
w.bufsp = net.Buffers(w.bufs[:w.cnt])
_, err := w.wr.Writev(&w.bufsp)
if err != nil {
w.err = err
}
w.cursor = 0
w.bufs = w.bufs[:0]
w.cnt = 0
return w.err
}

Expand All @@ -237,11 +194,10 @@ func (w *Writer) Write(p []byte) (err error) {
if p == nil {
return nil
}

if w.cursor+1 == maxBuffered {
w.Flush()
w.bufs = append(w.bufs, p)
w.cnt++
if len(w.bufs) == maxWritevSize {
err = w.Flush()
}
w.bufs[w.cursor] = p
w.cursor = (w.cursor + 1) % maxBuffered
return nil
return
}
Loading

0 comments on commit ddd4638

Please sign in to comment.