Skip to content

Commit

Permalink
dion't check engine on offline node
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang0 committed Dec 30, 2023
1 parent dfa230c commit 5f25049
Show file tree
Hide file tree
Showing 14 changed files with 242 additions and 61 deletions.
7 changes: 6 additions & 1 deletion cluster/calcium/calcium.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/projecteru2/core/source/github"
"github.com/projecteru2/core/source/gitlab"
"github.com/projecteru2/core/store"
storefactory "github.com/projecteru2/core/store/factory"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
"github.com/projecteru2/core/wal"
Expand All @@ -37,7 +38,7 @@ type Calcium struct {
func New(ctx context.Context, config types.Config, t *testing.T) (*Calcium, error) {
logger := log.WithFunc("calcium.New")
// set store
store, err := store.NewStore(config, t)
store, err := storefactory.NewStore(config, t)
if err != nil {
logger.Error(ctx, err)
return nil, err
Expand Down Expand Up @@ -111,3 +112,7 @@ func (c *Calcium) Finalizer() {
func (c *Calcium) GetIdentifier() string {
return c.identifier
}

func (c *Calcium) GetStore() store.Store {
return c.store
}
4 changes: 3 additions & 1 deletion cluster/calcium/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/projecteru2/core/metrics"
"github.com/projecteru2/core/resource/plugins"
resourcetypes "github.com/projecteru2/core/resource/types"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand Down Expand Up @@ -109,7 +110,8 @@ func (c *Calcium) RemoveNode(ctx context.Context, nodename string) error {
// node with resource info
func (c *Calcium) ListPodNodes(ctx context.Context, opts *types.ListNodesOptions) (<-chan *types.Node, error) {
logger := log.WithFunc("calcium.ListPodNodes").WithField("podname", opts.Podname).WithField("labels", opts.Labels).WithField("all", opts.All).WithField("info", opts.CallInfo)
nodes, err := c.store.GetNodesByPod(ctx, &types.NodeFilter{Podname: opts.Podname, Labels: opts.Labels, All: opts.All})
nf := &types.NodeFilter{Podname: opts.Podname, Labels: opts.Labels, All: opts.All}
nodes, err := c.store.GetNodesByPod(ctx, nf, store.WithoutEngineOption())
if err != nil {
logger.Error(ctx, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func TestAddNode(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
factory.InitEngineCache(ctx, c.config)
factory.InitEngineCache(ctx, c.config, nil)

opts := &types.AddNodeOptions{}
// failed by validating
Expand Down
7 changes: 4 additions & 3 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ func serve(c *cli.Context) error {
defer log.SentryDefer()
logger := log.WithFunc("main")

// init engine cache and start engine cache checker
factory.InitEngineCache(c.Context, config)

var t *testing.T
if embeddedStorage {
t = &testing.T{}
Expand All @@ -60,6 +57,10 @@ func serve(c *cli.Context) error {
return err
}
defer cluster.Finalizer()

// init engine cache and start engine cache checker
factory.InitEngineCache(c.Context, config, cluster.GetStore())

cluster.DisasterRecover(c.Context)

stop := make(chan struct{}, 1)
Expand Down
87 changes: 77 additions & 10 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"sync"
"time"
"unsafe"

"github.com/alphadose/haxmap"
"github.com/cockroachdb/errors"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -39,26 +39,33 @@ var (
// EngineCache .
type EngineCache struct {
cache *utils.EngineCache
keysToCheck *haxmap.Map[uintptr, engineParams]
keysToCheck *haxmap.Map[string, engineParams]
pool *ants.PoolWithFunc
config types.Config
sto store.Store
}

// NewEngineCache .
func NewEngineCache(config types.Config) *EngineCache {
func NewEngineCache(config types.Config, sto store.Store) *EngineCache {
pool, _ := utils.NewPool(config.MaxConcurrency)
return &EngineCache{
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
keysToCheck: haxmap.New[uintptr, engineParams](),
keysToCheck: haxmap.New[string, engineParams](),
pool: pool,
config: config,
sto: sto,
}
}

// InitEngineCache init engine cache and start engine cache checker
func InitEngineCache(ctx context.Context, config types.Config) {
engineCache = NewEngineCache(config)
func InitEngineCache(ctx context.Context, config types.Config, sto store.Store) {
engineCache = NewEngineCache(config, sto)
// init the cache, we don't care the return values
_, _ = engineCache.sto.GetNodesByPod(ctx, &types.NodeFilter{
All: true,
})
go engineCache.CheckAlive(ctx)
go engineCache.CheckNodeStatus(ctx)
}

// Get .
Expand All @@ -69,7 +76,7 @@ func (e *EngineCache) Get(key string) engine.API {
// Set .
func (e *EngineCache) Set(params engineParams, client engine.API) {
e.cache.Set(params.getCacheKey(), client)
e.keysToCheck.Set(uintptr(unsafe.Pointer(&params)), params)
e.keysToCheck.Set(params.getCacheKey(), params)
}

// Delete .
Expand All @@ -92,7 +99,7 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {

paramsChan := make(chan engineParams)
go func() {
e.keysToCheck.ForEach(func(k uintptr, v engineParams) bool {
e.keysToCheck.ForEach(func(k string, v engineParams) bool {
paramsChan <- v
return true
})
Expand All @@ -105,16 +112,25 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
params := params
_ = e.pool.Invoke(func() {
defer wg.Done()
nodename := params.nodename
cacheKey := params.getCacheKey()
client := e.cache.Get(cacheKey)
if client == nil {
e.cache.Delete(params.getCacheKey())
e.keysToCheck.Del(uintptr(unsafe.Pointer(&params)))
e.keysToCheck.Del(cacheKey)
return
}
if _, ok := client.(*fake.EngineWithErr); ok {
if _, ok := client.(*fake.EngineWithErr); ok { //nolint:nestif
if newClient, err := newEngine(ctx, e.config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil {
logger.Errorf(ctx, err, "engine %+v is still unavailable", cacheKey)
// check node status
if _, err := e.sto.GetNodeStatus(ctx, nodename); err != nil && errors.Is(err, types.ErrInvaildCount) {
logger.Warnf(ctx, "node %s is offline, the cache will be removed", nodename)
RemoveEngineFromCache(ctx, params.endpoint, params.ca, params.cert, params.key)
} else {
logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced and removed", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
}
} else {
e.cache.Set(cacheKey, newClient)
}
Expand All @@ -131,6 +147,42 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
}
}

func (e *EngineCache) CheckNodeStatus(ctx context.Context) {
logger := log.WithFunc("engine.factory.CheckNodeStatus")
logger.Info(ctx, "check NodeStatus starts")
defer logger.Info(ctx, "check NodeStatus ends")
if e.sto == nil {
logger.Warnf(ctx, "nodeStore is nil")
return
}
for {
select {
case <-ctx.Done():
return
default:
}
ch := e.sto.NodeStatusStream(ctx)

for ns := range ch {
if ns.Alive {
// GetNode will call GetEngine, so GetNode updates the engine cache automatically
if _, err := e.sto.GetNode(ctx, ns.Nodename); err != nil {
logger.Warnf(ctx, "failed to get node %s: %s", ns.Nodename, err)
}
} else {
// a node may have multiple engines, so we need check all key here
e.keysToCheck.ForEach(func(k string, ep engineParams) bool {
if ep.nodename == ns.Nodename {
logger.Infof(ctx, "remove engine %+v from cache", ep.getCacheKey())
RemoveEngineFromCache(ctx, ep.endpoint, ep.ca, ep.cert, ep.key)
}
return true
})
}
}
}
}

// GetEngineFromCache .
func GetEngineFromCache(_ context.Context, endpoint, ca, cert, key string) engine.API {
return engineCache.Get(getEngineCacheKey(endpoint, ca, cert, key))
Expand All @@ -143,6 +195,21 @@ func RemoveEngineFromCache(ctx context.Context, endpoint, ca, cert, key string)
engineCache.Delete(cacheKey)
}

func GetNodeEngine(ctx context.Context, config types.Config, node *types.Node) (client engine.API, err error) {
if err := engineCache.sto.LoadNodeCert(ctx, node); err != nil {
return nil, err
}
return GetEngine(ctx, config, node.Name, node.Endpoint, node.Ca, node.Cert, node.Key)
}

func GetWorkloadEngine(ctx context.Context, config types.Config, wrk *types.Workload) (client engine.API, err error) {
node, err := engineCache.sto.GetNode(ctx, wrk.Nodename)
if err != nil {
return nil, err
}
return GetNodeEngine(ctx, config, node)
}

// GetEngine get engine with cache
func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca, cert, key string) (client engine.API, err error) {
logger := log.WithFunc("engine.factory.GetEngine")
Expand Down
3 changes: 2 additions & 1 deletion selfmon/selfmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
storefactory "github.com/projecteru2/core/store/factory"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand All @@ -31,7 +32,7 @@ type NodeStatusWatcher struct {
func RunNodeStatusWatcher(ctx context.Context, config types.Config, cluster cluster.Cluster, t *testing.T) {
r := rand.New(rand.NewSource(int64(new(maphash.Hash).Sum64()))) //nolint
ID := r.Int63n(10000) //nolint
store, err := store.NewStore(config, t)
store, err := storefactory.NewStore(config, t)
if err != nil {
log.WithFunc("selfmon.RunNodeStatusWatcher").WithField("ID", ID).Error(ctx, err, "failed to create store")
return
Expand Down
2 changes: 1 addition & 1 deletion store/etcdv3/mercury_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewMercury(t *testing.T) *Mercury {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory.InitEngineCache(ctx, config)
factory.InitEngineCache(ctx, config, nil)

m, err := New(config, t)
assert.NoError(t, err)
Expand Down
46 changes: 37 additions & 9 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
)
Expand Down Expand Up @@ -65,19 +66,23 @@ func (m *Mercury) GetNodes(ctx context.Context, nodenames []string) ([]*types.No
if err != nil {
return nil, err
}
return m.doGetNodes(ctx, kvs, nil, true)
return m.doGetNodes(ctx, kvs, nil, true, nil)
}

// GetNodesByPod get all nodes bound to pod
// here we use podname instead of pod instance
func (m *Mercury) GetNodesByPod(ctx context.Context, nodeFilter *types.NodeFilter) ([]*types.Node, error) {
func (m *Mercury) GetNodesByPod(ctx context.Context, nodeFilter *types.NodeFilter, opts ...store.Option) ([]*types.Node, error) {
var op store.Op
for _, opt := range opts {
opt(&op)
}
do := func(podname string) ([]*types.Node, error) {
key := fmt.Sprintf(nodePodKey, podname, "")
resp, err := m.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, err
}
return m.doGetNodes(ctx, resp.Kvs, nodeFilter.Labels, nodeFilter.All)
return m.doGetNodes(ctx, resp.Kvs, nodeFilter.Labels, nodeFilter.All, &op)
}
if nodeFilter.Podname != "" {
return do(nodeFilter.Podname)
Expand Down Expand Up @@ -212,6 +217,24 @@ func (m *Mercury) NodeStatusStream(ctx context.Context) chan *types.NodeStatus {
return ch
}

func (m *Mercury) LoadNodeCert(ctx context.Context, node *types.Node) (err error) {
keyFormats := []string{nodeCaKey, nodeCertKey, nodeKeyKey}
data := []string{"", "", ""}
for i := 0; i < 3; i++ {
ev, err := m.GetOne(ctx, fmt.Sprintf(keyFormats[i], node.Name))
if err != nil {
if !errors.Is(err, types.ErrInvaildCount) {
log.WithFunc("store.etcdv3.LoadNodeCert").Warn(ctx, err, "Get key failed")
return err
}
continue
}
data[i] = string(ev.Value)
}
node.Ca, node.Cert, node.Key = data[0], data[1], data[2]
return nil
}

func (m *Mercury) makeClient(ctx context.Context, node *types.Node) (client engine.API, err error) {
// try to get from cache without ca/cert/key
if client = enginefactory.GetEngineFromCache(ctx, node.Endpoint, "", "", ""); client != nil {
Expand Down Expand Up @@ -298,7 +321,10 @@ func (m *Mercury) doRemoveNode(ctx context.Context, podname, nodename, endpoint
return err
}

func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels map[string]string, all bool) (nodes []*types.Node, err error) {
func (m *Mercury) doGetNodes(
ctx context.Context, kvs []*mvccpb.KeyValue,
labels map[string]string, all bool, op *store.Op,
) (nodes []*types.Node, err error) {
allNodes := []*types.Node{}
for _, ev := range kvs {
node := &types.Node{}
Expand Down Expand Up @@ -332,11 +358,13 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
return
}

// update engine
if client, err := m.makeClient(ctx, node); err != nil {
logger.Errorf(ctx, err, "failed to make client for %+v", node.Name)
} else {
node.Engine = client
if op == nil || (!op.WithoutEngine) {
// update engine
if client, err := m.makeClient(ctx, node); err != nil {
logger.Errorf(ctx, err, "failed to make client for %+v", node.Name)
} else {
node.Engine = client
}
}
nodesCh <- node
})
Expand Down
21 changes: 21 additions & 0 deletions store/factory/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package factory

import (
"testing"

"github.com/projecteru2/core/store"
"github.com/projecteru2/core/store/etcdv3"
"github.com/projecteru2/core/store/redis"
"github.com/projecteru2/core/types"
)

// NewStore creates a store
func NewStore(config types.Config, t *testing.T) (sto store.Store, err error) {
switch config.Store {
case types.Redis:
sto, err = redis.New(config, t)
default:
sto, err = etcdv3.New(config, t)
}
return sto, err
}
Loading

0 comments on commit 5f25049

Please sign in to comment.