Skip to content

Commit

Permalink
Version 1.5.0
Browse files Browse the repository at this point in the history
1. refactor message pipeline.
2. non-persistent connection for cluster redirect.
  • Loading branch information
felixhao authored Dec 5, 2018
2 parents 3d1fde6 + 341b173 commit 0cfee0f
Show file tree
Hide file tree
Showing 55 changed files with 1,459 additions and 1,567 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Overlord

## Version 1.5.0
1. refactor message pipeline.
2. non-persistent connection for cluster redirect.

## Version 1.4.0
1. add redis cluster support.

Expand Down
16 changes: 8 additions & 8 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const (
// VERSION version
VERSION = "1.4.0"
VERSION = "1.5.0"
)

var (
Expand Down Expand Up @@ -77,6 +77,13 @@ func main() {
if initLog(c) {
defer log.Close()
}
// new proxy
p, err := proxy.New(c)
if err != nil {
panic(err)
}
defer p.Close()
p.Serve(ccs)
// pprof
if c.Pprof != "" {
go http.ListenAndServe(c.Pprof, nil)
Expand All @@ -86,13 +93,6 @@ func main() {
prom.On = false
}
}
// new proxy
p, err := proxy.New(c)
if err != nil {
panic(err)
}
defer p.Close()
go p.Serve(ccs)
// hanlde signal
signalHandler()
}
Expand Down
6 changes: 2 additions & 4 deletions lib/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package backoff_test
package backoff

import (
"testing"
"time"

"overlord/lib/backoff"
)

func TestBackoff(t *testing.T) {
for i, j := 0, 0; i < 120; i++ {
if i == 10 {
j = 0
}
tm := backoff.Backoff(j)
tm := Backoff(j)
j++
t.Logf("backoff second: %d", tm/time.Second)
}
Expand Down
23 changes: 0 additions & 23 deletions lib/bufio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,6 @@ func (r *Reader) ReadExact(n int) (data []byte, err error) {
return
}

// ResetBuffer reset buf.
func (r *Reader) ResetBuffer(b *Buffer) {
if b == nil {
r.b = b
return
}
b.Reset()
var n int
if r.b != nil {
if r.b.buffered() > 0 {
for b.len() < r.b.buffered() {
b.grow()
}
n = copy(b.buf[b.w:], r.b.buf[r.b.r:r.b.w])
b.w += n
}
}
r.err = nil
r.b = b
r.b.r = b.r
r.b.w = b.w
}

const (
maxWritevSize = 1024
)
Expand Down
27 changes: 0 additions & 27 deletions lib/bufio/io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,6 @@ func TestReaderReadExact(t *testing.T) {
assert.Equal(t, ErrBufferFull, err)
}

func TestReaderResetBuffer(t *testing.T) {
bts := _genData()
b := NewReader(bytes.NewBuffer(bts), Get(defaultBufferSize))

err := b.Read()
assert.NoError(t, err)

_, err = b.ReadExact(512)
assert.NoError(t, err)

b.ResetBuffer(Get(defaultBufferSize))
err = b.Read()
assert.NoError(t, err)

data, err := b.ReadExact(300)
assert.NoError(t, err)
assert.Len(t, data, 300)

_, err = b.ReadExact(300)
assert.Error(t, err)
assert.Equal(t, ErrBufferFull, err)

b.ResetBuffer(nil)
buf := b.Buffer()
assert.Nil(t, buf)
}

type mockAddr string

func (m mockAddr) Network() string {
Expand Down
27 changes: 27 additions & 0 deletions lib/conv/conv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package conv

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBtoi(t *testing.T) {
i := int64(123)
is := []byte("123")
ni, err := Btoi(is)
assert.NoError(t, err)
assert.Equal(t, i, ni)
}

func TestUpdateToLower(t *testing.T) {
bs := []byte{'A', 'B', 'c'}
UpdateToLower(bs)
assert.Equal(t, []byte{'a', 'b', 'c'}, bs)
}

func TestUpdateToUpper(t *testing.T) {
bs := []byte{'a', 'b', 'C'}
UpdateToUpper(bs)
assert.Equal(t, []byte{'A', 'B', 'C'}, bs)
}
24 changes: 4 additions & 20 deletions lib/hashkit/fnv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,12 @@ package hashkit

import (
"hash"
"sync"
)

// Fnv define fnv1 hash.
type Fnv struct {
pool sync.Pool
}

// NewFnv1a64 return fnv with fnv1a64.
func NewFnv1a64() *Fnv {
h := &Fnv{}
h.pool.New = func() interface{} { return New64a() }
return h
}

func (f *Fnv) fnv1a64(key []byte) (value uint) {
hs := f.pool.Get().(hash.Hash64)
hs.Write(key)
value = uint(hs.Sum64())
hs.Reset()
f.pool.Put(hs)
return
func fnv1a64(key []byte) (value uint) {
var s sum64a = offset64
s.Write(key)
return uint(s.Sum64())
}

type (
Expand Down
24 changes: 24 additions & 0 deletions lib/hashkit/fnv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package hashkit

import (
"testing"
)

func TestFnv(t *testing.T) {
f := New64a()
bs := []byte("abc")
fbs := f.Sum(bs)
t.Logf("sum:%x", fbs)

f.Reset()
r := f.Sum64()
if r != offset64 {
t.Errorf("not equal:%d", r)
}
if f.BlockSize() != 1 {
t.Errorf("blocksize:%d", f.BlockSize())
}
if f.Size() != 8 {
t.Errorf("size:%d", f.Size())
}
}
2 changes: 1 addition & 1 deletion lib/hashkit/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func NewRing(des, method string) *HashRing {
case HashMethodFnv1a:
fallthrough
default:
hash = NewFnv1a64().fnv1a64
hash = fnv1a64
}
return newRingWithHash(hash)
}
2 changes: 1 addition & 1 deletion lib/hashkit/ketama.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type HashRing struct {
// Default hash: fnv1a64
func Ketama() (h *HashRing) {
h = new(HashRing)
h.hash = NewFnv1a64().fnv1a64
h.hash = fnv1a64
return
}

Expand Down
16 changes: 11 additions & 5 deletions lib/hashkit/ketama_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package hashkit_test
package hashkit

import (
"bytes"
"strconv"
"testing"

"overlord/lib/hashkit"
)

var (
ring = hashkit.Ketama()
ring = Ketama()
nodes = []string{
"test1.server.com",
"test2.server.com",
Expand Down Expand Up @@ -42,7 +40,7 @@ func TestGetInfo(t *testing.T) {

ring.DelNode("wocao")
testHash(t)
t.Log("----del exist node test ok:expect 0 1 2 1 5----\n")
t.Log("----del not exist node test ok:expect 0 1 2 1 5----\n")

for _, node := range nodes {
ring.DelNode(node)
Expand Down Expand Up @@ -74,3 +72,11 @@ func testHash(t *testing.T) {
}
t.Log(node5, m[node5])
}

func BenchmarkHash(b *testing.B) {
ring.Init(nodes, sis)
for i := 0; i < b.N; i++ {
s := "test value" + strconv.FormatUint(uint64(i), 10)
ring.GetNode([]byte(s))
}
}
32 changes: 0 additions & 32 deletions lib/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
const (
statConns = "overlord_proxy_conns"
statErr = "overlord_proxy_err"
statHit = "overlord_proxy_hit"
statMiss = "overlord_proxy_miss"

statProxyTimer = "overlord_proxy_timer"
statHandlerTimer = "overlord_proxy_handler_timer"
Expand All @@ -20,8 +18,6 @@ const (
var (
conns *prometheus.GaugeVec
gerr *prometheus.GaugeVec
hit *prometheus.CounterVec
miss *prometheus.CounterVec
proxyTimer *prometheus.HistogramVec
handlerTimer *prometheus.HistogramVec

Expand All @@ -48,18 +44,6 @@ func Init() {
Help: statErr,
}, clusterNodeErrLabels)
prometheus.MustRegister(gerr)
hit = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: statHit,
Help: statHit,
}, clusterNodeLabels)
prometheus.MustRegister(hit)
miss = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: statMiss,
Help: statMiss,
}, clusterNodeLabels)
prometheus.MustRegister(miss)
proxyTimer = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: statProxyTimer,
Expand Down Expand Up @@ -124,19 +108,3 @@ func ConnDecr(cluster string) {
}
conns.WithLabelValues(cluster).Dec()
}

// Hit increments one stat hit counter.
func Hit(cluster, node string) {
if hit == nil {
return
}
hit.WithLabelValues(cluster, node).Inc()
}

// Miss decrements one stat miss counter.
func Miss(cluster, node string) {
if miss == nil {
return
}
miss.WithLabelValues(cluster, node).Inc()
}
Loading

0 comments on commit 0cfee0f

Please sign in to comment.