Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: support to query whether pd has loaded region #8749

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,21 @@ func NewRegionStorageWithLevelDBBackend(

// TODO: support other KV storage backends like BadgerDB in the future.

type regionSource int

const (
unloaded regionSource = iota
fromEtcd
fromLeveldb
)

type coreStorage struct {
Storage
regionStorage endpoint.RegionStorage

useRegionStorage int32
regionLoaded bool
mu syncutil.Mutex
useRegionStorage atomic.Bool
regionLoaded regionSource
mu syncutil.RWMutex
}

// NewCoreStorage creates a new core storage with the given default and region storage.
Expand All @@ -90,6 +98,7 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage
return &coreStorage{
Storage: defaultStorage,
regionStorage: regionStorage,
regionLoaded: unloaded,
}
}

Expand All @@ -116,12 +125,12 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi
if useLocalRegionStorage {
// Switch the region storage to regionStorage, all region info will be read/saved by the internal
// regionStorage, and in most cases it's LevelDB-backend.
atomic.StoreInt32(&ps.useRegionStorage, 1)
ps.useRegionStorage.Store(true)
return ps.regionStorage
}
// Switch the region storage to defaultStorage, all region info will be read/saved by the internal
// defaultStorage, and in most cases it's etcd-backend.
atomic.StoreInt32(&ps.useRegionStorage, 0)
ps.useRegionStorage.Store(false)
return ps.Storage
}

Expand All @@ -133,48 +142,53 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi
return s.LoadRegions(ctx, f)
}

if atomic.LoadInt32(&ps.useRegionStorage) == 0 {
return ps.Storage.LoadRegions(ctx, f)
}

ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.regionLoaded {

if !ps.useRegionStorage.Load() {
err := ps.Storage.LoadRegions(ctx, f)
if err == nil {
ps.regionLoaded = fromEtcd
}
return err
}

if ps.regionLoaded == unloaded {
if err := ps.regionStorage.LoadRegions(ctx, f); err != nil {
return err
}
ps.regionLoaded = true
ps.regionLoaded = fromLeveldb
}
return nil
}

// LoadRegion loads one region from storage.
func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.LoadRegion(regionID, region)
}
return ps.Storage.LoadRegion(regionID, region)
}

// LoadRegions loads all regions from storage to RegionsInfo.
func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.LoadRegions(ctx, f)
}
return ps.Storage.LoadRegions(ctx, f)
}

// SaveRegion saves one region to storage.
func (ps *coreStorage) SaveRegion(region *metapb.Region) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.SaveRegion(region)
}
return ps.Storage.SaveRegion(region)
}

// DeleteRegion deletes one region from storage.
func (ps *coreStorage) DeleteRegion(region *metapb.Region) error {
if atomic.LoadInt32(&ps.useRegionStorage) > 0 {
if ps.useRegionStorage.Load() {
return ps.regionStorage.DeleteRegion(region)
}
return ps.Storage.DeleteRegion(region)
Expand All @@ -197,3 +211,14 @@ func (ps *coreStorage) Close() error {
}
return nil
}

// AreRegionsLoaded returns whether the regions are loaded.
func AreRegionsLoaded(s Storage) bool {
ps := s.(*coreStorage)
ps.mu.RLock()
defer ps.mu.RUnlock()
if ps.useRegionStorage.Load() {
return ps.regionLoaded == fromLeveldb
}
return ps.regionLoaded == fromEtcd
}
1 change: 1 addition & 0 deletions pkg/versioninfo/versioninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Status struct {
Version string `json:"version"`
GitHash string `json:"git_hash"`
StartTimestamp int64 `json:"start_timestamp"`
RegionLoaded bool `json:"region_loaded"`
}

const (
Expand Down
2 changes: 2 additions & 0 deletions server/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package api
import (
"net/http"

"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
Expand Down Expand Up @@ -44,6 +45,7 @@ func (h *statusHandler) GetPDStatus(w http.ResponseWriter, _ *http.Request) {
GitHash: versioninfo.PDGitHash,
Version: versioninfo.PDReleaseVersion,
StartTimestamp: h.svr.StartTimestamp(),
RegionLoaded: storage.AreRegionsLoaded(h.svr.GetStorage()),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a more general field here like "Ready" which we can extend in the future if we find more conditions under which PD can't serve?

Does PDStatus API exposed by all components of disaggregated PD?

Any chance we introduce a new API /ready which we can use across all TiDB components in the future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Tema, thanks for your advice
Q1: A general field will be helpful to expand different scenarios, but we found that currently there is only RegionLoaded is needed. If there are other situations in the future, we tend to add new fields with more accurate meanings.
Q2: Standalone PD or Scheduling Service will expose this API(especially the RegionLoaded field), others such as TSO Service don't.
Q3: IMO, it's good we can use the same interface to query service status, but the fields may be different. The benefits of unification may not be obvious, but it is indeed a better way.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally you want to decouple data plane (pd) from control plane (tidb-operator, tiUp), so that if you make changes to one, you don't have to upgrade other and it can start leveraging any improvements right away. That is why usually the common convention is to use /ready across all components. This way the control-plane can have a common logic for rolling restart of all components, without need to necessarily overcustomize each of them. This is not just about PD microservices but many other components of TiDB cluster (tidb, dm, cdc, tikv). Some of them still require additional customization, but it would be nice to keep it to the minimum. Did you talk to tidb-operator team? Don't they want to have a /ready across all components. I've herd there is a work on tidb-operator v2 or something like that. This might be a good opportunity to standartize protocol between control plane and data place as once it is out there, it is hard to change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Tema, for TiDB Operator, it's better to use a general API (like /ready) to check the status in most cases. If this API can't meet some other special cases in the future, then we can consider to add/use some other APIs.

For this RegionLoaded case, in fact, TiDB Operator just wants to know when it can restart the next instance safely.

}

h.rd.JSON(w, http.StatusOK, version)
Expand Down
25 changes: 19 additions & 6 deletions server/api/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (
"github.com/tikv/pd/pkg/versioninfo"
)

func checkStatusResponse(re *require.Assertions, body []byte) {
got := versioninfo.Status{}
re.NoError(json.Unmarshal(body, &got))
re.Equal(versioninfo.PDBuildTS, got.BuildTS)
re.Equal(versioninfo.PDGitHash, got.GitHash)
re.Equal(versioninfo.PDReleaseVersion, got.Version)
type oldStatus struct {
BuildTS string `json:"build_ts"`
Version string `json:"version"`
GitHash string `json:"git_hash"`
StartTimestamp int64 `json:"start_timestamp"`
}

func TestStatus(t *testing.T) {
Expand All @@ -46,3 +45,17 @@ func TestStatus(t *testing.T) {
resp.Body.Close()
}
}

func checkStatusResponse(re *require.Assertions, body []byte) {
got := versioninfo.Status{}
re.NoError(json.Unmarshal(body, &got))
re.Equal(versioninfo.PDBuildTS, got.BuildTS)
re.Equal(versioninfo.PDGitHash, got.GitHash)
re.Equal(versioninfo.PDReleaseVersion, got.Version)
re.False(got.RegionLoaded)
gotWithOldStatus := oldStatus{}
re.NoError(json.Unmarshal(body, &gotWithOldStatus))
re.Equal(versioninfo.PDBuildTS, gotWithOldStatus.BuildTS)
re.Equal(versioninfo.PDGitHash, gotWithOldStatus.GitHash)
re.Equal(versioninfo.PDReleaseVersion, gotWithOldStatus.Version)
}
Loading