From c20a4f910cc87958cb91a063aba59f5025ffcd1d Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 28 Oct 2024 16:40:44 +0800 Subject: [PATCH 1/7] api: support to query whether pd has loaded region Signed-off-by: lhy1024 --- pkg/storage/storage.go | 33 +++++++++++++++++++++++++-------- pkg/versioninfo/versioninfo.go | 1 + server/api/status.go | 11 +++++++---- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index dce2f1712b8..0bb5f7c76ba 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -77,9 +77,10 @@ type coreStorage struct { Storage regionStorage endpoint.RegionStorage - useRegionStorage int32 - regionLoaded bool - mu syncutil.Mutex + useRegionStorage int32 + regionLoadedFromDefault bool + regionLoadedFromStorage bool + mu syncutil.RWMutex } // NewCoreStorage creates a new core storage with the given default and region storage. @@ -133,17 +134,22 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi return s.LoadRegions(ctx, f) } + ps.mu.Lock() + defer ps.mu.Unlock() + if atomic.LoadInt32(&ps.useRegionStorage) == 0 { - return ps.Storage.LoadRegions(ctx, f) + err := ps.Storage.LoadRegions(ctx, f) + if err == nil { + ps.regionLoadedFromDefault = true + } + return err } - ps.mu.Lock() - defer ps.mu.Unlock() - if !ps.regionLoaded { + if !ps.regionLoadedFromStorage { if err := ps.regionStorage.LoadRegions(ctx, f); err != nil { return err } - ps.regionLoaded = true + ps.regionLoadedFromStorage = true } return nil } @@ -197,3 +203,14 @@ func (ps *coreStorage) Close() error { } return nil } + +// AreRegionsLoaded returns whether the regions are loaded. +func AreRegionsLoaded(s Storage) bool { + ps := s.(*coreStorage) + ps.mu.RLock() + defer ps.mu.RUnlock() + if atomic.LoadInt32(&ps.useRegionStorage) == 0 { + return ps.regionLoadedFromDefault + } + return ps.regionLoadedFromStorage +} diff --git a/pkg/versioninfo/versioninfo.go b/pkg/versioninfo/versioninfo.go index d6f4738d786..3479a894631 100644 --- a/pkg/versioninfo/versioninfo.go +++ b/pkg/versioninfo/versioninfo.go @@ -31,6 +31,7 @@ type Status struct { Version string `json:"version"` GitHash string `json:"git_hash"` StartTimestamp int64 `json:"start_timestamp"` + AreRegionsLoaded bool `json:"are_regions_loaded"` } const ( diff --git a/server/api/status.go b/server/api/status.go index e25f5da5287..19d1a3a0684 100644 --- a/server/api/status.go +++ b/server/api/status.go @@ -17,6 +17,7 @@ package api import ( "net/http" + "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/server" "github.com/unrolled/render" @@ -39,11 +40,13 @@ func newStatusHandler(svr *server.Server, rd *render.Render) *statusHandler { // @Success 200 {object} versioninfo.Status // @Router /status [get] func (h *statusHandler) GetPDStatus(w http.ResponseWriter, _ *http.Request) { + areRegionsLoaded := storage.AreRegionsLoaded(h.svr.GetStorage()) version := versioninfo.Status{ - BuildTS: versioninfo.PDBuildTS, - GitHash: versioninfo.PDGitHash, - Version: versioninfo.PDReleaseVersion, - StartTimestamp: h.svr.StartTimestamp(), + BuildTS: versioninfo.PDBuildTS, + GitHash: versioninfo.PDGitHash, + Version: versioninfo.PDReleaseVersion, + StartTimestamp: h.svr.StartTimestamp(), + AreRegionsLoaded: areRegionsLoaded, } h.rd.JSON(w, http.StatusOK, version) From db574ec78b19b47c07afd29fc9a2d05042a9617e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 30 Oct 2024 12:23:40 +0800 Subject: [PATCH 2/7] fix lint Signed-off-by: lhy1024 --- pkg/versioninfo/versioninfo.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/versioninfo/versioninfo.go b/pkg/versioninfo/versioninfo.go index 3479a894631..2e61a6f93bc 100644 --- a/pkg/versioninfo/versioninfo.go +++ b/pkg/versioninfo/versioninfo.go @@ -27,11 +27,11 @@ import ( // Status is the status of PD server. // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type Status struct { - BuildTS string `json:"build_ts"` - Version string `json:"version"` - GitHash string `json:"git_hash"` - StartTimestamp int64 `json:"start_timestamp"` - AreRegionsLoaded bool `json:"are_regions_loaded"` + BuildTS string `json:"build_ts"` + Version string `json:"version"` + GitHash string `json:"git_hash"` + StartTimestamp int64 `json:"start_timestamp"` + AreRegionsLoaded bool `json:"are_regions_loaded"` } const ( From 64f12cc17e5ea7280a56979c0c3f44a56f7e64d2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 1 Nov 2024 11:10:02 +0800 Subject: [PATCH 3/7] address comments Signed-off-by: lhy1024 --- pkg/versioninfo/versioninfo.go | 2 +- server/api/status.go | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/versioninfo/versioninfo.go b/pkg/versioninfo/versioninfo.go index 3479a894631..af93338fddd 100644 --- a/pkg/versioninfo/versioninfo.go +++ b/pkg/versioninfo/versioninfo.go @@ -31,7 +31,7 @@ type Status struct { Version string `json:"version"` GitHash string `json:"git_hash"` StartTimestamp int64 `json:"start_timestamp"` - AreRegionsLoaded bool `json:"are_regions_loaded"` + Loaded bool `json:"loaded"` } const ( diff --git a/server/api/status.go b/server/api/status.go index 19d1a3a0684..6548ba0f161 100644 --- a/server/api/status.go +++ b/server/api/status.go @@ -40,13 +40,12 @@ func newStatusHandler(svr *server.Server, rd *render.Render) *statusHandler { // @Success 200 {object} versioninfo.Status // @Router /status [get] func (h *statusHandler) GetPDStatus(w http.ResponseWriter, _ *http.Request) { - areRegionsLoaded := storage.AreRegionsLoaded(h.svr.GetStorage()) version := versioninfo.Status{ - BuildTS: versioninfo.PDBuildTS, - GitHash: versioninfo.PDGitHash, - Version: versioninfo.PDReleaseVersion, - StartTimestamp: h.svr.StartTimestamp(), - AreRegionsLoaded: areRegionsLoaded, + BuildTS: versioninfo.PDBuildTS, + GitHash: versioninfo.PDGitHash, + Version: versioninfo.PDReleaseVersion, + StartTimestamp: h.svr.StartTimestamp(), + Loaded: storage.AreRegionsLoaded(h.svr.GetStorage()), } h.rd.JSON(w, http.StatusOK, version) From 9c9e9314ee0fab6dc71d890e645f36aead1203f1 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 11 Nov 2024 19:28:29 +0800 Subject: [PATCH 4/7] test compatibility Signed-off-by: lhy1024 --- server/api/status_test.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/server/api/status_test.go b/server/api/status_test.go index 5444fda77b4..c3db51b71df 100644 --- a/server/api/status_test.go +++ b/server/api/status_test.go @@ -23,12 +23,11 @@ import ( "github.com/tikv/pd/pkg/versioninfo" ) -func checkStatusResponse(re *require.Assertions, body []byte) { - got := versioninfo.Status{} - re.NoError(json.Unmarshal(body, &got)) - re.Equal(versioninfo.PDBuildTS, got.BuildTS) - re.Equal(versioninfo.PDGitHash, got.GitHash) - re.Equal(versioninfo.PDReleaseVersion, got.Version) +type oldStatus struct { + BuildTS string `json:"build_ts"` + Version string `json:"version"` + GitHash string `json:"git_hash"` + StartTimestamp int64 `json:"start_timestamp"` } func TestStatus(t *testing.T) { @@ -46,3 +45,17 @@ func TestStatus(t *testing.T) { resp.Body.Close() } } + +func checkStatusResponse(re *require.Assertions, body []byte) { + got := versioninfo.Status{} + re.NoError(json.Unmarshal(body, &got)) + re.Equal(versioninfo.PDBuildTS, got.BuildTS) + re.Equal(versioninfo.PDGitHash, got.GitHash) + re.Equal(versioninfo.PDReleaseVersion, got.Version) + re.Equal(false, got.Loaded) + gotWithOldStatus := oldStatus{} + re.NoError(json.Unmarshal(body, &gotWithOldStatus)) + re.Equal(versioninfo.PDBuildTS, gotWithOldStatus.BuildTS) + re.Equal(versioninfo.PDGitHash, gotWithOldStatus.GitHash) + re.Equal(versioninfo.PDReleaseVersion, gotWithOldStatus.Version) +} From bc7428b326a059a4358d94d42418e1e9cf1ae7b3 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 12 Nov 2024 11:18:46 +0800 Subject: [PATCH 5/7] address comments Signed-off-by: lhy1024 --- pkg/versioninfo/versioninfo.go | 2 +- server/api/status.go | 2 +- server/api/status_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/versioninfo/versioninfo.go b/pkg/versioninfo/versioninfo.go index af93338fddd..5fca7e1698b 100644 --- a/pkg/versioninfo/versioninfo.go +++ b/pkg/versioninfo/versioninfo.go @@ -31,7 +31,7 @@ type Status struct { Version string `json:"version"` GitHash string `json:"git_hash"` StartTimestamp int64 `json:"start_timestamp"` - Loaded bool `json:"loaded"` + RegionLoaded bool `json:"region_loaded"` } const ( diff --git a/server/api/status.go b/server/api/status.go index 6548ba0f161..5b9e877aa1a 100644 --- a/server/api/status.go +++ b/server/api/status.go @@ -45,7 +45,7 @@ func (h *statusHandler) GetPDStatus(w http.ResponseWriter, _ *http.Request) { GitHash: versioninfo.PDGitHash, Version: versioninfo.PDReleaseVersion, StartTimestamp: h.svr.StartTimestamp(), - Loaded: storage.AreRegionsLoaded(h.svr.GetStorage()), + RegionLoaded: storage.AreRegionsLoaded(h.svr.GetStorage()), } h.rd.JSON(w, http.StatusOK, version) diff --git a/server/api/status_test.go b/server/api/status_test.go index c3db51b71df..221bce6160b 100644 --- a/server/api/status_test.go +++ b/server/api/status_test.go @@ -52,7 +52,7 @@ func checkStatusResponse(re *require.Assertions, body []byte) { re.Equal(versioninfo.PDBuildTS, got.BuildTS) re.Equal(versioninfo.PDGitHash, got.GitHash) re.Equal(versioninfo.PDReleaseVersion, got.Version) - re.Equal(false, got.Loaded) + re.False(got.RegionLoaded) gotWithOldStatus := oldStatus{} re.NoError(json.Unmarshal(body, &gotWithOldStatus)) re.Equal(versioninfo.PDBuildTS, gotWithOldStatus.BuildTS) From df4ac32e4757a37263c43aeb4440ed4c200fd26e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 18 Nov 2024 12:00:50 +0800 Subject: [PATCH 6/7] refactor Signed-off-by: lhy1024 --- pkg/storage/storage.go | 46 ++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 0bb5f7c76ba..9679732af17 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -73,14 +73,21 @@ func NewRegionStorageWithLevelDBBackend( // TODO: support other KV storage backends like BadgerDB in the future. +type regionSource int + +const ( + unloaded regionSource = iota + fromEtcd + fromLeveldb +) + type coreStorage struct { Storage regionStorage endpoint.RegionStorage - useRegionStorage int32 - regionLoadedFromDefault bool - regionLoadedFromStorage bool - mu syncutil.RWMutex + useRegionStorageFlag int32 + regionLoaded regionSource + mu syncutil.RWMutex } // NewCoreStorage creates a new core storage with the given default and region storage. @@ -91,6 +98,7 @@ func NewCoreStorage(defaultStorage Storage, regionStorage endpoint.RegionStorage return &coreStorage{ Storage: defaultStorage, regionStorage: regionStorage, + regionLoaded: unloaded, } } @@ -117,12 +125,12 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi if useLocalRegionStorage { // Switch the region storage to regionStorage, all region info will be read/saved by the internal // regionStorage, and in most cases it's LevelDB-backend. - atomic.StoreInt32(&ps.useRegionStorage, 1) + atomic.StoreInt32(&ps.useRegionStorageFlag, 1) return ps.regionStorage } // Switch the region storage to defaultStorage, all region info will be read/saved by the internal // defaultStorage, and in most cases it's etcd-backend. - atomic.StoreInt32(&ps.useRegionStorage, 0) + atomic.StoreInt32(&ps.useRegionStorageFlag, 0) return ps.Storage } @@ -137,26 +145,26 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi ps.mu.Lock() defer ps.mu.Unlock() - if atomic.LoadInt32(&ps.useRegionStorage) == 0 { + if !ps.useRegionStorage() { err := ps.Storage.LoadRegions(ctx, f) if err == nil { - ps.regionLoadedFromDefault = true + ps.regionLoaded = fromEtcd } return err } - if !ps.regionLoadedFromStorage { + if ps.regionLoaded == unloaded { if err := ps.regionStorage.LoadRegions(ctx, f); err != nil { return err } - ps.regionLoadedFromStorage = true + ps.regionLoaded = fromLeveldb } return nil } // LoadRegion loads one region from storage. func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) { - if atomic.LoadInt32(&ps.useRegionStorage) > 0 { + if ps.useRegionStorage() { return ps.regionStorage.LoadRegion(regionID, region) } return ps.Storage.LoadRegion(regionID, region) @@ -164,7 +172,7 @@ func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bo // LoadRegions loads all regions from storage to RegionsInfo. func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { - if atomic.LoadInt32(&ps.useRegionStorage) > 0 { + if ps.useRegionStorage() { return ps.regionStorage.LoadRegions(ctx, f) } return ps.Storage.LoadRegions(ctx, f) @@ -172,7 +180,7 @@ func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.Regi // SaveRegion saves one region to storage. func (ps *coreStorage) SaveRegion(region *metapb.Region) error { - if atomic.LoadInt32(&ps.useRegionStorage) > 0 { + if ps.useRegionStorage() { return ps.regionStorage.SaveRegion(region) } return ps.Storage.SaveRegion(region) @@ -180,7 +188,7 @@ func (ps *coreStorage) SaveRegion(region *metapb.Region) error { // DeleteRegion deletes one region from storage. func (ps *coreStorage) DeleteRegion(region *metapb.Region) error { - if atomic.LoadInt32(&ps.useRegionStorage) > 0 { + if ps.useRegionStorage() { return ps.regionStorage.DeleteRegion(region) } return ps.Storage.DeleteRegion(region) @@ -204,13 +212,17 @@ func (ps *coreStorage) Close() error { return nil } +func (ps *coreStorage) useRegionStorage() bool { + return atomic.LoadInt32(&ps.useRegionStorageFlag) > 0 +} + // AreRegionsLoaded returns whether the regions are loaded. func AreRegionsLoaded(s Storage) bool { ps := s.(*coreStorage) ps.mu.RLock() defer ps.mu.RUnlock() - if atomic.LoadInt32(&ps.useRegionStorage) == 0 { - return ps.regionLoadedFromDefault + if ps.useRegionStorage() { + return ps.regionLoaded == fromLeveldb } - return ps.regionLoadedFromStorage + return ps.regionLoaded == fromEtcd } From d0d687565f0454c71cc4c448a0626a9ee7fc4e02 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 18 Nov 2024 13:44:32 +0800 Subject: [PATCH 7/7] address comments Signed-off-by: lhy1024 --- pkg/storage/storage.go | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 9679732af17..72df0fd79d3 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -85,9 +85,9 @@ type coreStorage struct { Storage regionStorage endpoint.RegionStorage - useRegionStorageFlag int32 - regionLoaded regionSource - mu syncutil.RWMutex + useRegionStorage atomic.Bool + regionLoaded regionSource + mu syncutil.RWMutex } // NewCoreStorage creates a new core storage with the given default and region storage. @@ -125,12 +125,12 @@ func TrySwitchRegionStorage(s Storage, useLocalRegionStorage bool) endpoint.Regi if useLocalRegionStorage { // Switch the region storage to regionStorage, all region info will be read/saved by the internal // regionStorage, and in most cases it's LevelDB-backend. - atomic.StoreInt32(&ps.useRegionStorageFlag, 1) + ps.useRegionStorage.Store(true) return ps.regionStorage } // Switch the region storage to defaultStorage, all region info will be read/saved by the internal // defaultStorage, and in most cases it's etcd-backend. - atomic.StoreInt32(&ps.useRegionStorageFlag, 0) + ps.useRegionStorage.Store(false) return ps.Storage } @@ -145,7 +145,7 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi ps.mu.Lock() defer ps.mu.Unlock() - if !ps.useRegionStorage() { + if !ps.useRegionStorage.Load() { err := ps.Storage.LoadRegions(ctx, f) if err == nil { ps.regionLoaded = fromEtcd @@ -164,7 +164,7 @@ func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(region *core.Regi // LoadRegion loads one region from storage. func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bool, err error) { - if ps.useRegionStorage() { + if ps.useRegionStorage.Load() { return ps.regionStorage.LoadRegion(regionID, region) } return ps.Storage.LoadRegion(regionID, region) @@ -172,7 +172,7 @@ func (ps *coreStorage) LoadRegion(regionID uint64, region *metapb.Region) (ok bo // LoadRegions loads all regions from storage to RegionsInfo. func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.RegionInfo) []*core.RegionInfo) error { - if ps.useRegionStorage() { + if ps.useRegionStorage.Load() { return ps.regionStorage.LoadRegions(ctx, f) } return ps.Storage.LoadRegions(ctx, f) @@ -180,7 +180,7 @@ func (ps *coreStorage) LoadRegions(ctx context.Context, f func(region *core.Regi // SaveRegion saves one region to storage. func (ps *coreStorage) SaveRegion(region *metapb.Region) error { - if ps.useRegionStorage() { + if ps.useRegionStorage.Load() { return ps.regionStorage.SaveRegion(region) } return ps.Storage.SaveRegion(region) @@ -188,7 +188,7 @@ func (ps *coreStorage) SaveRegion(region *metapb.Region) error { // DeleteRegion deletes one region from storage. func (ps *coreStorage) DeleteRegion(region *metapb.Region) error { - if ps.useRegionStorage() { + if ps.useRegionStorage.Load() { return ps.regionStorage.DeleteRegion(region) } return ps.Storage.DeleteRegion(region) @@ -212,16 +212,12 @@ func (ps *coreStorage) Close() error { return nil } -func (ps *coreStorage) useRegionStorage() bool { - return atomic.LoadInt32(&ps.useRegionStorageFlag) > 0 -} - // AreRegionsLoaded returns whether the regions are loaded. func AreRegionsLoaded(s Storage) bool { ps := s.(*coreStorage) ps.mu.RLock() defer ps.mu.RUnlock() - if ps.useRegionStorage() { + if ps.useRegionStorage.Load() { return ps.regionLoaded == fromLeveldb } return ps.regionLoaded == fromEtcd