Skip to content

Commit

Permalink
fix backend sync stop bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
jolestar committed Aug 26, 2016
1 parent b8cc2f5 commit 492d856
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 32 deletions.
32 changes: 16 additions & 16 deletions backends/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,32 +198,31 @@ func (c *Client) internalGet(prefix, nodePath string) (string, error) {

func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bool) {

defer func() {
if r := recover(); r != nil {
log.Error("Sync Recover: %v, try restart.", r)
time.Sleep(time.Duration(1000) * time.Millisecond)
c.internalSync(prefix, store, stopChan)
}
}()

var waitIndex uint64 = 0
inited := false
init := false
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

for {
select {
case <-cancelRoutine:
return
default:
}

watcher := c.client.Watcher(prefix, &client.WatcherOptions{AfterIndex: waitIndex, Recursive: true})
ctx, cancel := context.WithCancel(context.Background())
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

go func() {
select {
case <-stopChan:
log.Info("Sync %s stop.", prefix)
cancel()
case <-cancelRoutine:
cancelRoutine <- true
return
}
}()

for !inited {
for !init {
val, err := c.internalGets(prefix, "/")
if err != nil {
log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error())
Expand All @@ -241,13 +240,14 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo
}
}
}
log.Info("Init store for prefix %s fail, retry.", prefix)
time.Sleep(time.Duration(1000) * time.Millisecond)
continue
}
store.PutBulk("/", val)
inited = true
log.Info("Init store for prefix %s success.", prefix)
init = true
}

resp, err := watcher.Next(ctx)
if err != nil {
log.Error("Watch etcd error: %s", err.Error())
Expand Down
36 changes: 36 additions & 0 deletions backends/etcd/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package etcd

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/yunify/metadata-proxy/log"
"github.com/yunify/metadata-proxy/store"
"math/rand"
"testing"
"time"
)

func init() {
log.SetLevel("debug")
rand.Seed(int64(time.Now().Nanosecond()))
}

func TestClientSyncStop(t *testing.T) {

prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000))

stopChan := make(chan bool)

nodes := []string{"http://127.0.0.1:2379"}
storeClient, err := NewEtcdClient(prefix, nodes, "", "", "", false, "", "")
assert.NoError(t, err)

time.Sleep(3000 * time.Millisecond)
go func() {
stopChan <- true
}()

metastore := store.New()
// expect internalSync not block after stopChan has signal
storeClient.internalSync(prefix, metastore, stopChan)
}
32 changes: 16 additions & 16 deletions backends/etcdv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,41 +173,41 @@ func handleGetResp(prefix string, resp *client.GetResponse, vars map[string]stri

func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bool) {

defer func() {
if r := recover(); r != nil {
log.Error("Sync Recover: %v, try restart.", r)
time.Sleep(time.Duration(1000) * time.Millisecond)
c.internalSync(prefix, store, stopChan)
}
}()

var rev int64 = 0
inited := false
init := false
cancelRoutine := make(chan bool)
defer close(cancelRoutine)

for {
select {
case <-cancelRoutine:
return
default:
}

ctx, cancel := context.WithCancel(context.Background())
watchChan := c.client.Watch(ctx, prefix, client.WithPrefix(), client.WithRev(rev))

cancelRoutine := make(chan bool)
defer close(cancelRoutine)

go func() {
select {
case <-stopChan:
log.Info("Sync %s stop.", prefix)
cancel()
case <-cancelRoutine:
return
cancelRoutine <- true
}
}()

for !inited {
for !init {
val, err := c.internalGets(prefix, "/")
if err != nil {
log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error())
time.Sleep(time.Duration(1000) * time.Millisecond)
log.Info("Init store for prefix %s fail, retry.", prefix)
continue
}
store.PutBulk("/", val)
inited = true
log.Info("Init store for prefix %s success.", prefix)
init = true
}
for resp := range watchChan {
processSyncChange(prefix, store, &resp)
Expand Down
36 changes: 36 additions & 0 deletions backends/etcdv3/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package etcdv3

import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/yunify/metadata-proxy/log"
"github.com/yunify/metadata-proxy/store"
"math/rand"
"testing"
"time"
)

func init() {
log.SetLevel("debug")
rand.Seed(int64(time.Now().Nanosecond()))
}

func TestClientSyncStop(t *testing.T) {

prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000))

stopChan := make(chan bool)

nodes := []string{"http://127.0.0.1:2379"}
storeClient, err := NewEtcdClient(prefix, nodes, "", "", "", false, "", "")
assert.NoError(t, err)

time.Sleep(3000 * time.Millisecond)
go func() {
stopChan <- true
}()

metastore := store.New()
// expect internalSync not block after stopChan has signal
storeClient.internalSync(prefix, metastore, stopChan)
}

0 comments on commit 492d856

Please sign in to comment.