From 3b01167b3b172642888f49969d8619134c164c31 Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Tue, 12 Sep 2023 14:02:37 +0800 Subject: [PATCH 1/2] feat: support registry retry --- README.md | 63 ++++++++++++++ etcd_registry.go | 123 +++++++++++++++++++++++++--- example/server/retry/main.go | 56 +++++++++++++ example/server/{ => simple}/main.go | 0 retry/option.go | 29 +++++++ retry/retry.go | 32 ++++++++ retry/retry_test.go | 26 ++++++ 7 files changed, 319 insertions(+), 10 deletions(-) create mode 100644 example/server/retry/main.go rename example/server/{ => simple}/main.go (100%) create mode 100644 retry/option.go create mode 100644 retry/retry.go create mode 100644 retry/retry_test.go diff --git a/README.md b/README.md index 7b10931..f05efff 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,69 @@ func main() { } ``` +## Retry + +After the service is registered to ETCD, it will regularly check the status of the service. If any abnormal status is found, it will try to register the service again. `ObserveDelay` is the delay time for checking the service status under normal conditions, and `RetryDelay` is the delay time for attempting to register the service after disconnecting. + +### Default Retry Config + +| Config Name | Default Value | Description | +|:--------------------|:-----------------|:------------------------------------------------------------------------------------------| +| WithMaxAttemptTimes | 5 | Used to set the maximum number of attempts, if 0, it means infinite attempts | +| WithObserveDelay | 30 * time.Second | Used to set the delay time for checking service status under normal connection conditions | +| WithRetryDelay | 10 * time.Second | Used to set the retry delay time after disconnecting | + +### Example + +If you do not need to customize the retry configuration, use `etcd. NewEtcdRegistry()`. + +If you need to customize the retry configuration, use the following code: + +```go +package main + +import ( + "context" + "log" + "time" + + "github.com/cloudwego/kitex-examples/hello/kitex_gen/api" + "github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello" + "github.com/cloudwego/kitex/pkg/rpcinfo" + "github.com/cloudwego/kitex/server" + etcd "github.com/kitex-contrib/registry-etcd" + "github.com/kitex-contrib/registry-etcd/retry" +) + +type HelloImpl struct{} + +func (h *HelloImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) { + resp = &api.Response{ + Message: req.Message, + } + return +} + +func main() { + retryConfig := retry.NewRetryConfig( + retry.WithMaxAttemptTimes(10), + retry.WithObserveDelay(20*time.Second), + retry.WithRetryDelay(5*time.Second), + ) + r, err := etcd.NewEtcdRegistryWithRetry([]string{"127.0.0.1:2379"}, retryConfig) + if err != nil { + log.Fatal(err) + } + server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ + ServiceName: "Hello", + })) + err = server.Run() + if err != nil { + log.Fatal(err) + } +} +``` + ## How to Dynamically specify ip and port To dynamically specify an IP and port, one should first set the environment variables KITEX_IP_TO_REGISTRY and KITEX_PORT_TO_REGISTRY. If these variables are not set, the system defaults to using the service's listening IP and port. Notably, if the service's listening IP is either not set or set to "::", the system will automatically retrieve and use the machine's IPV4 address. diff --git a/etcd_registry.go b/etcd_registry.go index 71007f5..03431b7 100644 --- a/etcd_registry.go +++ b/etcd_registry.go @@ -27,6 +27,7 @@ import ( "github.com/cloudwego/kitex/pkg/klog" "github.com/cloudwego/kitex/pkg/registry" + "github.com/kitex-contrib/registry-etcd/retry" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -38,9 +39,11 @@ const ( ) type etcdRegistry struct { - etcdClient *clientv3.Client - leaseTTL int64 - meta *registerMeta + etcdClient *clientv3.Client + leaseTTL int64 + meta *registerMeta + retryConfig *retry.Config + stop chan struct{} } type registerMeta struct { @@ -61,13 +64,36 @@ func NewEtcdRegistry(endpoints []string, opts ...Option) (registry.Registry, err if err != nil { return nil, err } + retryConfig := retry.NewRetryConfig() return &etcdRegistry{ - etcdClient: etcdClient, - leaseTTL: getTTL(), + etcdClient: etcdClient, + leaseTTL: getTTL(), + retryConfig: retryConfig, + stop: make(chan struct{}, 1), }, nil } -// NewEtcdRegistryWithAuth creates a etcd based registry with given username and password. +// NewEtcdRegistryWithRetry creates an etcd based registry with given custom retry configs +func NewEtcdRegistryWithRetry(endpoints []string, retryConfig *retry.Config, opts ...Option) (registry.Registry, error) { + cfg := clientv3.Config{ + Endpoints: endpoints, + } + for _, opt := range opts { + opt(&cfg) + } + etcdClient, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + return &etcdRegistry{ + etcdClient: etcdClient, + leaseTTL: getTTL(), + retryConfig: retryConfig, + stop: make(chan struct{}, 1), + }, nil +} + +// NewEtcdRegistryWithAuth creates an etcd based registry with given username and password. // Deprecated: Use WithAuthOpt instead. func NewEtcdRegistryWithAuth(endpoints []string, username, password string) (registry.Registry, error) { etcdClient, err := clientv3.New(clientv3.Config{ @@ -78,9 +104,12 @@ func NewEtcdRegistryWithAuth(endpoints []string, username, password string) (reg if err != nil { return nil, err } + retryConfig := retry.NewRetryConfig() return &etcdRegistry{ - etcdClient: etcdClient, - leaseTTL: getTTL(), + etcdClient: etcdClient, + leaseTTL: getTTL(), + retryConfig: retryConfig, + stop: make(chan struct{}, 1), }, nil } @@ -137,7 +166,77 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() _, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID)) - return err + if err != nil { + return err + } + + go func(key, val string) { + e.keepRegister(key, val, e.retryConfig) + }(serviceKey(info.ServiceName, addr), string(val)) + + return nil +} + +// keepRegister keep service registered status +// maxRetry == 0 means retry forever +func (e *etcdRegistry) keepRegister(key, val string, retryConfig *retry.Config) { + var failedTimes uint + delay := retryConfig.ObserveDelay + for retryConfig.MaxAttemptTimes == 0 || failedTimes < retryConfig.MaxAttemptTimes { + select { + case _, ok := <-e.stop: + if !ok { + close(e.stop) + } + klog.Infof("stop keep register service %s", key) + return + case <-time.After(delay): + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + resp, err := e.etcdClient.Get(ctx, key) + if err != nil { + klog.Warnf("keep register get %s failed with err: %v", key, err) + delay = retryConfig.RetryDelay + failedTimes++ + continue + } + + if len(resp.Kvs) == 0 { + klog.Infof("keep register service %s", key) + delay = retryConfig.RetryDelay + leaseID, err := e.grantLease() + if err != nil { + klog.Warnf("keep register grant lease %s failed with err: %v", key, err) + failedTimes++ + continue + } + + _, err = e.etcdClient.Put(ctx, key, val, clientv3.WithLease(leaseID)) + if err != nil { + klog.Warnf("keep register put %s failed with err: %v", key, err) + failedTimes++ + continue + } + + meta := registerMeta{ + leaseID: leaseID, + } + meta.ctx, meta.cancel = context.WithCancel(context.Background()) + if err := e.keepalive(&meta); err != nil { + klog.Warnf("keep register keepalive %s failed with err: %v", key, err) + failedTimes++ + continue + } + e.meta.cancel() + e.meta = &meta + delay = retryConfig.ObserveDelay + } + + failedTimes = 0 + } + klog.Errorf("keep register service %s failed times:%d", key, failedTimes) } func (e *etcdRegistry) deregister(info *registry.Info) error { @@ -148,7 +247,11 @@ func (e *etcdRegistry) deregister(info *registry.Info) error { return err } _, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr)) - return err + if err != nil { + return err + } + e.stop <- struct{}{} + return nil } func (e *etcdRegistry) grantLease() (clientv3.LeaseID, error) { diff --git a/example/server/retry/main.go b/example/server/retry/main.go new file mode 100644 index 0000000..592919e --- /dev/null +++ b/example/server/retry/main.go @@ -0,0 +1,56 @@ +// Copyright 2021 CloudWeGo Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "log" + "time" + + "github.com/cloudwego/kitex-examples/hello/kitex_gen/api" + "github.com/cloudwego/kitex-examples/hello/kitex_gen/api/hello" + "github.com/cloudwego/kitex/pkg/rpcinfo" + "github.com/cloudwego/kitex/server" + etcd "github.com/kitex-contrib/registry-etcd" + "github.com/kitex-contrib/registry-etcd/retry" +) + +type HelloImpl struct{} + +func (h *HelloImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) { + resp = &api.Response{ + Message: req.Message, + } + return +} + +func main() { + retryConfig := retry.NewRetryConfig( + retry.WithMaxAttemptTimes(10), + retry.WithObserveDelay(20*time.Second), + retry.WithRetryDelay(5*time.Second), + ) + r, err := etcd.NewEtcdRegistryWithRetry([]string{"127.0.0.1:2379"}, retryConfig) + if err != nil { + log.Fatal(err) + } + server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ + ServiceName: "Hello", + })) + err = server.Run() + if err != nil { + log.Fatal(err) + } +} diff --git a/example/server/main.go b/example/server/simple/main.go similarity index 100% rename from example/server/main.go rename to example/server/simple/main.go diff --git a/retry/option.go b/retry/option.go new file mode 100644 index 0000000..df810cd --- /dev/null +++ b/retry/option.go @@ -0,0 +1,29 @@ +package retry + +import "time" + +// Option is the only struct that can be used to set Retry Config. +type Option struct { + F func(o *Config) +} + +// WithMaxAttemptTimes sets MaxAttemptTimes +func WithMaxAttemptTimes(maxAttemptTimes uint) Option { + return Option{F: func(o *Config) { + o.MaxAttemptTimes = maxAttemptTimes + }} +} + +// WithObserveDelay sets ObserveDelay +func WithObserveDelay(observeDelay time.Duration) Option { + return Option{F: func(o *Config) { + o.ObserveDelay = observeDelay + }} +} + +// WithRetryDelay sets RetryDelay +func WithRetryDelay(retryDelay time.Duration) Option { + return Option{F: func(o *Config) { + o.RetryDelay = retryDelay + }} +} diff --git a/retry/retry.go b/retry/retry.go new file mode 100644 index 0000000..8f35aa6 --- /dev/null +++ b/retry/retry.go @@ -0,0 +1,32 @@ +package retry + +import "time" + +type Config struct { + // The maximum number of call attempt times, including the initial call + MaxAttemptTimes uint + + // The delay time of observing etcd key + ObserveDelay time.Duration + + // The retry delay time + RetryDelay time.Duration +} + +func (o *Config) Apply(opts []Option) { + for _, op := range opts { + op.F(o) + } +} + +func NewRetryConfig(opts ...Option) *Config { + retryConfig := &Config{ + MaxAttemptTimes: 5, + ObserveDelay: 30 * time.Second, + RetryDelay: 10 * time.Second, + } + + retryConfig.Apply(opts) + + return retryConfig +} diff --git a/retry/retry_test.go b/retry/retry_test.go new file mode 100644 index 0000000..37d2ab6 --- /dev/null +++ b/retry/retry_test.go @@ -0,0 +1,26 @@ +package retry + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRetryConfig(t *testing.T) { + retryConfig := NewRetryConfig() + assert.Equal(t, uint(5), retryConfig.MaxAttemptTimes) + assert.Equal(t, 30*time.Second, retryConfig.ObserveDelay) + assert.Equal(t, 10*time.Second, retryConfig.RetryDelay) +} + +func TestRetryCustomConfig(t *testing.T) { + retryConfig := NewRetryConfig( + WithMaxAttemptTimes(10), + WithObserveDelay(20*time.Second), + WithRetryDelay(5*time.Second), + ) + assert.Equal(t, uint(10), retryConfig.MaxAttemptTimes) + assert.Equal(t, 20*time.Second, retryConfig.ObserveDelay) + assert.Equal(t, 5*time.Second, retryConfig.RetryDelay) +} From 289720a81dc20a2ef91b4a795124e70a0f179dea Mon Sep 17 00:00:00 2001 From: chaoranz758 <2715584135@qq.com> Date: Tue, 12 Sep 2023 14:05:30 +0800 Subject: [PATCH 2/2] add description --- retry/option.go | 14 ++++++++++++++ retry/retry.go | 14 ++++++++++++++ retry/retry_test.go | 14 ++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/retry/option.go b/retry/option.go index df810cd..bf00280 100644 --- a/retry/option.go +++ b/retry/option.go @@ -1,3 +1,17 @@ +// Copyright 2021 CloudWeGo Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package retry import "time" diff --git a/retry/retry.go b/retry/retry.go index 8f35aa6..9bff8d2 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -1,3 +1,17 @@ +// Copyright 2021 CloudWeGo Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package retry import "time" diff --git a/retry/retry_test.go b/retry/retry_test.go index 37d2ab6..f1ef156 100644 --- a/retry/retry_test.go +++ b/retry/retry_test.go @@ -1,3 +1,17 @@ +// Copyright 2021 CloudWeGo Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package retry import (