Skip to content

Commit

Permalink
*: independent the service discovery package (#8825)
Browse files Browse the repository at this point in the history
ref #8690

Make the service discovery module of the client a separate package.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato authored Nov 25, 2024
1 parent 20c4157 commit ec77762
Show file tree
Hide file tree
Showing 22 changed files with 396 additions and 317 deletions.
57 changes: 10 additions & 47 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,15 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/caller"
"github.com/tikv/pd/client/clients/metastorage"
"github.com/tikv/pd/client/constants"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
sd "github.com/tikv/pd/client/servicediscovery"
"github.com/tikv/pd/client/utils/tlsutil"
"go.uber.org/zap"
)

const (
// defaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized
// when PD bootstrap and reserved for users who haven't been assigned keyspace.
defaultKeyspaceID = uint32(0)
maxKeyspaceID = uint32(0xFFFFFF)
// nullKeyspaceID is used for api v1 or legacy path where is keyspace agnostic.
nullKeyspaceID = uint32(0xFFFFFFFF)
// defaultKeySpaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
defaultKeySpaceGroupID = uint32(0)
defaultKeyspaceName = "DEFAULT"
)

// Region contains information of a region's meta and its peers.
type Region struct {
Meta *metapb.Region
Expand Down Expand Up @@ -175,7 +162,7 @@ type Client interface {
// syncing leader from server.
GetLeaderURL() string
// GetServiceDiscovery returns ServiceDiscovery
GetServiceDiscovery() ServiceDiscovery
GetServiceDiscovery() sd.ServiceDiscovery

// UpdateOption updates the client option.
UpdateOption(option opt.DynamicOption, value any) error
Expand All @@ -184,19 +171,6 @@ type Client interface {
Close()
}

var (
// errUnmatchedClusterID is returned when found a PD with a different cluster ID.
errUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
// errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses.
errFailInitClusterID = errors.New("[pd] failed to get cluster id")
// errClosing is returned when request is canceled when client is closing.
errClosing = errors.New("[pd] closing")
// errTSOLength is returned when the number of response timestamps is inconsistent with request.
errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// errInvalidRespHeader is returned when the response doesn't contain service mode info unexpectedly.
errNoServiceModeReturned = errors.New("[pd] no service mode returned")
)

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand All @@ -206,7 +180,7 @@ type serviceModeKeeper struct {
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient *tsoClient
tsoSvcDiscovery ServiceDiscovery
tsoSvcDiscovery sd.ServiceDiscovery
}

func (k *serviceModeKeeper) close() {
Expand Down Expand Up @@ -289,7 +263,7 @@ func NewClientWithContext(
security SecurityOption, opts ...opt.ClientOption,
) (Client, error) {
return createClientWithKeyspace(ctx, callerComponent,
nullKeyspaceID, svrAddrs, security, opts...)
constants.NullKeyspaceID, svrAddrs, security, opts...)
}

// NewClientWithKeyspace creates a client with context and the specified keyspace id.
Expand All @@ -300,9 +274,9 @@ func NewClientWithKeyspace(
keyspaceID uint32, svrAddrs []string,
security SecurityOption, opts ...opt.ClientOption,
) (Client, error) {
if keyspaceID < defaultKeyspaceID || keyspaceID > maxKeyspaceID {
if keyspaceID < constants.DefaultKeyspaceID || keyspaceID > constants.MaxKeyspaceID {
return nil, errors.Errorf("invalid keyspace id %d. It must be in the range of [%d, %d]",
keyspaceID, defaultKeyspaceID, maxKeyspaceID)
keyspaceID, constants.DefaultKeyspaceID, constants.MaxKeyspaceID)
}
return createClientWithKeyspace(ctx, callerComponent, keyspaceID,
svrAddrs, security, opts...)
Expand Down Expand Up @@ -392,7 +366,7 @@ type apiContextV2 struct {
// NewAPIContextV2 creates a API context with the specified keyspace name for V2.
func NewAPIContextV2(keyspaceName string) APIContext {
if len(keyspaceName) == 0 {
keyspaceName = defaultKeyspaceName
keyspaceName = constants.DefaultKeyspaceName
}
return &apiContextV2{keyspaceName: keyspaceName}
}
Expand Down Expand Up @@ -452,7 +426,7 @@ func newClientWithKeyspaceName(
inner: &innerClient{
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
keyspaceID: nullKeyspaceID,
keyspaceID: constants.NullKeyspaceID,
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
cancel: clientCancel,
Expand Down Expand Up @@ -511,7 +485,7 @@ func (c *client) GetLeaderURL() string {
}

// GetServiceDiscovery returns the client-side service discovery object
func (c *client) GetServiceDiscovery() ServiceDiscovery {
func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
return c.inner.pdSvcDiscovery
}

Expand Down Expand Up @@ -1277,17 +1251,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
)

func trimHTTPPrefix(str string) string {
str = strings.TrimPrefix(str, httpSchemePrefix)
str = strings.TrimPrefix(str, httpsSchemePrefix)
return str
}

// LoadGlobalConfig implements the RPCClient interface.
func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) {
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
Expand Down
51 changes: 0 additions & 51 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/caller"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/utils/testutil"
"github.com/tikv/pd/client/utils/tsoutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)

func TestMain(m *testing.M) {
Expand All @@ -43,36 +41,6 @@ func TestTSLessEqual(t *testing.T) {
re.True(tsoutil.TSLessEqual(9, 6, 9, 8))
}

func TestUpdateURLs(t *testing.T) {
re := require.New(t)
members := []*pdpb.Member{
{Name: "pd4", ClientUrls: []string{"tmp://pd4"}},
{Name: "pd1", ClientUrls: []string{"tmp://pd1"}},
{Name: "pd3", ClientUrls: []string{"tmp://pd3"}},
{Name: "pd2", ClientUrls: []string{"tmp://pd2"}},
}
getURLs := func(ms []*pdpb.Member) (urls []string) {
for _, m := range ms {
urls = append(urls, m.GetClientUrls()[0])
}
return
}
cli := &pdServiceDiscovery{option: opt.NewOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[2:])
re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[3:])
re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"

func TestClientCtx(t *testing.T) {
Expand All @@ -95,25 +63,6 @@ func TestClientWithRetry(t *testing.T) {
re.Less(time.Since(start), time.Second*10)
}

func TestGRPCDialOption(t *testing.T) {
re := require.New(t)
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
tlsCfg: nil,
option: opt.NewOption(),
}
cli.urls.Store([]string{testClientURL})
cli.option.GRPCDialOptions = []grpc.DialOption{grpc.WithBlock()}
err := cli.updateMember()
re.Error(err)
re.Greater(time.Since(start), 500*time.Millisecond)
}

func TestTsoRequestWait(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
32 changes: 32 additions & 0 deletions client/constants/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package constants

const (
// DefaultKeyspaceID is the default keyspace ID.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized
// when PD bootstrap and reserved for users who haven't been assigned keyspace.
DefaultKeyspaceID = uint32(0)
// MaxKeyspaceID is the maximum keyspace ID.
MaxKeyspaceID = uint32(0xFFFFFF)
// NullKeyspaceID is used for API v1 or legacy path where is keyspace agnostic.
NullKeyspaceID = uint32(0xFFFFFFFF)
// DefaultKeyspaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)
// DefaultKeyspaceName is the default keyspace name.
DefaultKeyspaceName = "DEFAULT"
)
14 changes: 14 additions & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ const (
NotPrimaryErr = "not primary"
)

// internal errors
var (
// ErrUnmatchedClusterID is returned when found a PD with a different cluster ID.
ErrUnmatchedClusterID = errors.New("[pd] unmatched cluster id")
// ErrFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses.
ErrFailInitClusterID = errors.New("[pd] failed to get cluster id")
// ErrClosing is returned when request is canceled when client is closing.
ErrClosing = errors.New("[pd] closing")
// ErrTSOLength is returned when the number of response timestamps is inconsistent with request.
ErrTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// ErrNoServiceModeReturned is returned when the response doesn't contain service mode info unexpectedly.
ErrNoServiceModeReturned = errors.New("[pd] no service mode returned")
)

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
Expand Down
6 changes: 6 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
)

// IsLeaderChange will determine whether there is a leader/primary change.
Expand All @@ -38,6 +39,11 @@ func IsLeaderChange(err error) bool {
strings.Contains(errMsg, NotPrimaryErr)
}

// IsNetworkError returns true if the error is a network error.
func IsNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
12 changes: 6 additions & 6 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,7 +56,7 @@ type clientInner struct {
ctx context.Context
cancel context.CancelFunc

sd pd.ServiceDiscovery
sd sd.ServiceDiscovery

// source is used to mark the source of the client creation,
// it will also be used in the caller ID of the inner client.
Expand All @@ -74,7 +74,7 @@ func newClientInner(ctx context.Context, cancel context.CancelFunc, source strin
return &clientInner{ctx: ctx, cancel: cancel, source: source}
}

func (ci *clientInner) init(sd pd.ServiceDiscovery) {
func (ci *clientInner) init(sd sd.ServiceDiscovery) {
// Init the HTTP client if it's not configured.
if ci.cli == nil {
ci.cli = &http.Client{Timeout: defaultTimeout}
Expand Down Expand Up @@ -305,7 +305,7 @@ func WithMetrics(
// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
func NewClientWithServiceDiscovery(
source string,
sd pd.ServiceDiscovery,
sd sd.ServiceDiscovery,
opts ...ClientOption,
) Client {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -330,7 +330,7 @@ func NewClient(
for _, opt := range opts {
opt(c)
}
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
sd := sd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down Expand Up @@ -430,7 +430,7 @@ func newClientWithMockServiceDiscovery(
for _, opt := range opts {
opt(c)
}
sd := pd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
sd := sd.NewMockPDServiceDiscovery(pdAddrs, c.inner.tlsConf)
if err := sd.Init(); err != nil {
log.Error("[pd] init mock service discovery failed",
zap.String("source", source), zap.Strings("pd-addrs", pdAddrs), zap.Error(err))
Expand Down
Loading

0 comments on commit ec77762

Please sign in to comment.