From 492d856c626a21f41c93573af7e50c31511b9467 Mon Sep 17 00:00:00 2001 From: jolestar Date: Fri, 26 Aug 2016 17:58:44 +0800 Subject: [PATCH] fix backend sync stop bug. --- backends/etcd/client.go | 32 +++++++++++++++--------------- backends/etcd/client_test.go | 36 ++++++++++++++++++++++++++++++++++ backends/etcdv3/client.go | 32 +++++++++++++++--------------- backends/etcdv3/client_test.go | 36 ++++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 32 deletions(-) create mode 100644 backends/etcd/client_test.go create mode 100644 backends/etcdv3/client_test.go diff --git a/backends/etcd/client.go b/backends/etcd/client.go index 5bfd296..f1bdee4 100644 --- a/backends/etcd/client.go +++ b/backends/etcd/client.go @@ -198,32 +198,31 @@ func (c *Client) internalGet(prefix, nodePath string) (string, error) { func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bool) { - defer func() { - if r := recover(); r != nil { - log.Error("Sync Recover: %v, try restart.", r) - time.Sleep(time.Duration(1000) * time.Millisecond) - c.internalSync(prefix, store, stopChan) - } - }() - var waitIndex uint64 = 0 - inited := false + init := false + cancelRoutine := make(chan bool) + defer close(cancelRoutine) + for { + select { + case <-cancelRoutine: + return + default: + } + watcher := c.client.Watcher(prefix, &client.WatcherOptions{AfterIndex: waitIndex, Recursive: true}) ctx, cancel := context.WithCancel(context.Background()) - cancelRoutine := make(chan bool) - defer close(cancelRoutine) - go func() { select { case <-stopChan: + log.Info("Sync %s stop.", prefix) cancel() - case <-cancelRoutine: + cancelRoutine <- true return } }() - for !inited { + for !init { val, err := c.internalGets(prefix, "/") if err != nil { log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) @@ -241,13 +240,14 @@ func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bo } } } + log.Info("Init store for prefix %s fail, retry.", prefix) time.Sleep(time.Duration(1000) * time.Millisecond) continue } store.PutBulk("/", val) - inited = true + log.Info("Init store for prefix %s success.", prefix) + init = true } - resp, err := watcher.Next(ctx) if err != nil { log.Error("Watch etcd error: %s", err.Error()) diff --git a/backends/etcd/client_test.go b/backends/etcd/client_test.go new file mode 100644 index 0000000..eb9a162 --- /dev/null +++ b/backends/etcd/client_test.go @@ -0,0 +1,36 @@ +package etcd + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "github.com/yunify/metadata-proxy/log" + "github.com/yunify/metadata-proxy/store" + "math/rand" + "testing" + "time" +) + +func init() { + log.SetLevel("debug") + rand.Seed(int64(time.Now().Nanosecond())) +} + +func TestClientSyncStop(t *testing.T) { + + prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) + + stopChan := make(chan bool) + + nodes := []string{"http://127.0.0.1:2379"} + storeClient, err := NewEtcdClient(prefix, nodes, "", "", "", false, "", "") + assert.NoError(t, err) + + time.Sleep(3000 * time.Millisecond) + go func() { + stopChan <- true + }() + + metastore := store.New() + // expect internalSync not block after stopChan has signal + storeClient.internalSync(prefix, metastore, stopChan) +} diff --git a/backends/etcdv3/client.go b/backends/etcdv3/client.go index 59044ea..ee3021b 100644 --- a/backends/etcdv3/client.go +++ b/backends/etcdv3/client.go @@ -173,41 +173,41 @@ func handleGetResp(prefix string, resp *client.GetResponse, vars map[string]stri func (c *Client) internalSync(prefix string, store store.Store, stopChan chan bool) { - defer func() { - if r := recover(); r != nil { - log.Error("Sync Recover: %v, try restart.", r) - time.Sleep(time.Duration(1000) * time.Millisecond) - c.internalSync(prefix, store, stopChan) - } - }() - var rev int64 = 0 - inited := false + init := false + cancelRoutine := make(chan bool) + defer close(cancelRoutine) + for { + select { + case <-cancelRoutine: + return + default: + } + ctx, cancel := context.WithCancel(context.Background()) watchChan := c.client.Watch(ctx, prefix, client.WithPrefix(), client.WithRev(rev)) - cancelRoutine := make(chan bool) - defer close(cancelRoutine) - go func() { select { case <-stopChan: + log.Info("Sync %s stop.", prefix) cancel() - case <-cancelRoutine: - return + cancelRoutine <- true } }() - for !inited { + for !init { val, err := c.internalGets(prefix, "/") if err != nil { log.Error("GetValue from etcd nodePath:%s, error-type: %s, error: %s", prefix, reflect.TypeOf(err), err.Error()) time.Sleep(time.Duration(1000) * time.Millisecond) + log.Info("Init store for prefix %s fail, retry.", prefix) continue } store.PutBulk("/", val) - inited = true + log.Info("Init store for prefix %s success.", prefix) + init = true } for resp := range watchChan { processSyncChange(prefix, store, &resp) diff --git a/backends/etcdv3/client_test.go b/backends/etcdv3/client_test.go new file mode 100644 index 0000000..783fb98 --- /dev/null +++ b/backends/etcdv3/client_test.go @@ -0,0 +1,36 @@ +package etcdv3 + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "github.com/yunify/metadata-proxy/log" + "github.com/yunify/metadata-proxy/store" + "math/rand" + "testing" + "time" +) + +func init() { + log.SetLevel("debug") + rand.Seed(int64(time.Now().Nanosecond())) +} + +func TestClientSyncStop(t *testing.T) { + + prefix := fmt.Sprintf("/prefix%v", rand.Intn(1000)) + + stopChan := make(chan bool) + + nodes := []string{"http://127.0.0.1:2379"} + storeClient, err := NewEtcdClient(prefix, nodes, "", "", "", false, "", "") + assert.NoError(t, err) + + time.Sleep(3000 * time.Millisecond) + go func() { + stopChan <- true + }() + + metastore := store.New() + // expect internalSync not block after stopChan has signal + storeClient.internalSync(prefix, metastore, stopChan) +}