Skip to content

Commit

Permalink
add engine params to engine API (#641)
Browse files Browse the repository at this point in the history
* add engine params to engine API

* replace go-cache with haxmap

* remove cache utility

* fix review issue
  • Loading branch information
yuyang0 authored Jul 8, 2024
1 parent 08e4fbc commit 606f643
Show file tree
Hide file tree
Showing 16 changed files with 331 additions and 147 deletions.
18 changes: 17 additions & 1 deletion engine/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
type Engine struct {
client dockerapi.APIClient
config coretypes.Config
ep *enginetypes.Params
}

// MakeClient make docker cli
Expand All @@ -45,7 +46,18 @@ func MakeClient(ctx context.Context, config coretypes.Config, nodename, endpoint
}

logger.Debugf(ctx, "Create new http.Client for %s, %s", endpoint, config.Docker.APIVersion)
return makeDockerClient(ctx, config, client, endpoint)
e, err := makeDockerClient(ctx, config, client, endpoint)
if err != nil {
return nil, err
}
e.ep = &enginetypes.Params{
Nodename: nodename,
Endpoint: endpoint,
CA: ca,
Cert: cert,
Key: key,
}
return e, nil
}

// Info show node info
Expand All @@ -59,6 +71,10 @@ func (e *Engine) Info(ctx context.Context) (*enginetypes.Info, error) {
return &enginetypes.Info{Type: Type, ID: r.ID, NCPU: r.NCPU, MemTotal: r.MemTotal}, nil
}

func (e *Engine) GetParams() *enginetypes.Params {
return e.ep
}

// Ping test connection
func (e *Engine) Ping(ctx context.Context) error {
_, err := e.client.Ping(ctx)
Expand Down
4 changes: 2 additions & 2 deletions engine/docker/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ func GetIP(ctx context.Context, daemonHost string) string {
return u.Hostname()
}

func makeDockerClient(_ context.Context, config coretypes.Config, client *http.Client, endpoint string) (engine.API, error) {
func makeDockerClient(_ context.Context, config coretypes.Config, client *http.Client, endpoint string) (*Engine, error) {
cli, err := dockerapi.NewClientWithOpts(
dockerapi.WithHost(endpoint),
dockerapi.WithVersion(config.Docker.APIVersion),
dockerapi.WithHTTPClient(client))
if err != nil {
return nil, err
}
return &Engine{cli, config}, nil
return &Engine{client: cli, config: config}, nil
}

func useCNI(labels map[string]string) bool {
Expand Down
1 change: 1 addition & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type API interface {
Info(ctx context.Context) (*enginetypes.Info, error)
Ping(ctx context.Context) error
CloseConn() error
GetParams() *enginetypes.Params

Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (execID string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error)
ExecResize(ctx context.Context, execID string, height, width uint) (err error)
Expand Down
126 changes: 55 additions & 71 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package factory

import (
"context"
"fmt"
"strings"
"sync"
"time"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/engine/virt"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/store"
Expand All @@ -38,22 +38,20 @@ var (

// EngineCache .
type EngineCache struct {
cache *utils.EngineCache
keysToCheck *haxmap.Map[string, engineParams]
pool *ants.PoolWithFunc
config types.Config
stor store.Store
cache *haxmap.Map[string, engine.API]
pool *ants.PoolWithFunc
config types.Config
stor store.Store
}

// NewEngineCache .
func NewEngineCache(config types.Config, stor store.Store) *EngineCache {
pool, _ := utils.NewPool(config.MaxConcurrency)
return &EngineCache{
cache: utils.NewEngineCache(12*time.Hour, 10*time.Minute),
keysToCheck: haxmap.New[string, engineParams](),
pool: pool,
config: config,
stor: stor,
cache: haxmap.New[string, engine.API](),
pool: pool,
config: config,
stor: stor,
}
}

Expand All @@ -66,28 +64,28 @@ func InitEngineCache(ctx context.Context, config types.Config, stor store.Store)
All: true,
})
}
go engineCache.CheckAlive(ctx)
go engineCache.CheckNodeStatus(ctx)
go engineCache.checkAlive(ctx)
go engineCache.checkNodeStatus(ctx)
}

// Get .
func (e *EngineCache) Get(key string) engine.API {
return e.cache.Get(key)
api, _ := e.cache.Get(key)
return api
}

// Set .
func (e *EngineCache) Set(params engineParams, client engine.API) {
e.cache.Set(params.getCacheKey(), client)
e.keysToCheck.Set(params.getCacheKey(), params)
func (e *EngineCache) Set(key string, client engine.API) {
e.cache.Set(key, client)
}

// Delete .
func (e *EngineCache) Delete(key string) {
e.cache.Delete(key)
e.cache.Del(key)
}

// CheckAlive checks if the engine in cache is available
func (e *EngineCache) CheckAlive(ctx context.Context) {
// checkAlive checks if the engine in cache is available
func (e *EngineCache) checkAlive(ctx context.Context) {
logger := log.WithFunc("engine.factory.CheckAlive")
logger.Info(ctx, "check alive starts")
defer logger.Info(ctx, "check alive ends")
Expand All @@ -99,52 +97,43 @@ func (e *EngineCache) CheckAlive(ctx context.Context) {
default:
}

paramsChan := make(chan engineParams)
go func() {
e.keysToCheck.ForEach(func(_ string, v engineParams) bool {
paramsChan <- v
return true
})
close(paramsChan)
}()

wg := &sync.WaitGroup{}
for params := range paramsChan {
e.cache.ForEach(func(_ string, v engine.API) bool {
wg.Add(1)
params := params
params := v.GetParams()
_ = e.pool.Invoke(func() {
defer wg.Done()
cacheKey := params.getCacheKey()
client := e.cache.Get(cacheKey)
cacheKey := params.CacheKey()
client := e.Get(cacheKey)
if client == nil {
e.cache.Delete(params.getCacheKey())
e.keysToCheck.Del(cacheKey)
e.Delete(cacheKey)
return
}
if _, ok := client.(*fake.EngineWithErr); ok {
if newClient, err := newEngine(ctx, e.config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil {
if newClient, err := newEngine(ctx, e.config, params.Nodename, params.Endpoint, params.CA, params.Key, params.Cert); err != nil {
logger.Errorf(ctx, err, "engine %+v is still unavailable", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
e.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params})
// check node status
e.checkOneNodeStatus(ctx, &params)
e.checkOneNodeStatus(ctx, params)
} else {
e.cache.Set(cacheKey, newClient)
e.Set(cacheKey, newClient)
}
return
}
if err := validateEngine(ctx, client, e.config.ConnectionTimeout); err != nil {
logger.Errorf(ctx, err, "engine %+v is unavailable, will be replaced and removed", cacheKey)
e.cache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err})
e.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params})
}
logger.Debugf(ctx, "engine %+v is available", cacheKey)
})
}
return true
})
wg.Wait()
time.Sleep(e.config.ConnectionTimeout)
}
}

func (e *EngineCache) CheckNodeStatus(ctx context.Context) {
func (e *EngineCache) checkNodeStatus(ctx context.Context) {
logger := log.WithFunc("engine.factory.CheckNodeStatus")
logger.Info(ctx, "check NodeStatus starts")
defer logger.Info(ctx, "check NodeStatus ends")
Expand All @@ -169,10 +158,11 @@ func (e *EngineCache) CheckNodeStatus(ctx context.Context) {
continue
}
// a node may have multiple engines, so we need check all key here
e.keysToCheck.ForEach(func(_ string, ep engineParams) bool {
if ep.nodename == ns.Nodename {
logger.Infof(ctx, "remove engine %+v from cache", ep.getCacheKey())
RemoveEngineFromCache(ctx, ep.endpoint, ep.ca, ep.cert, ep.key)
e.cache.ForEach(func(_ string, v engine.API) bool {
ep := v.GetParams()
if ep.Nodename == ns.Nodename {
logger.Infof(ctx, "remove engine %+v from cache", ep.CacheKey())
RemoveEngineFromCache(ctx, ep.Endpoint, ep.CA, ep.Cert, ep.Key)
}
return true
})
Expand Down Expand Up @@ -200,38 +190,26 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
}

defer func() {
params := engineParams{
nodename: nodename,
endpoint: endpoint,
ca: ca,
cert: cert,
key: key,
params := &enginetypes.Params{
Nodename: nodename,
Endpoint: endpoint,
CA: ca,
Cert: cert,
Key: key,
}
cacheKey := params.getCacheKey()
cacheKey := params.CacheKey()
if err == nil {
engineCache.Set(params, client)
engineCache.Set(cacheKey, client)
logger.Infof(ctx, "store engine %+v in cache", cacheKey)
} else {
engineCache.Set(params, &fake.EngineWithErr{DefaultErr: err})
engineCache.Set(cacheKey, &fake.EngineWithErr{DefaultErr: err, EP: params})
logger.Infof(ctx, "store fake engine %+v in cache", cacheKey)
}
}()

return newEngine(ctx, config, nodename, endpoint, ca, cert, key)
}

type engineParams struct {
nodename string
endpoint string
ca string
cert string
key string
}

func (ep engineParams) getCacheKey() string {
return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key)
}

func validateEngine(ctx context.Context, engine engine.API, timeout time.Duration) (err error) {
utils.WithTimeout(ctx, timeout, func(ctx context.Context) {
err = engine.Ping(ctx)
Expand All @@ -249,7 +227,13 @@ func getEnginePrefix(endpoint string) (string, error) {
}

func getEngineCacheKey(endpoint, ca, cert, key string) string {
return fmt.Sprintf("%+v-%+v", endpoint, utils.SHA256(fmt.Sprintf(":%+v:%+v:%+v", ca, cert, key))[:8])
p := enginetypes.Params{
Endpoint: endpoint,
CA: ca,
Cert: cert,
Key: key,
}
return p.CacheKey()
}

// newEngine get engine
Expand All @@ -275,13 +259,13 @@ func newEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
return client, nil
}

func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *engineParams) {
func (e *EngineCache) checkOneNodeStatus(ctx context.Context, params *enginetypes.Params) {
if e.stor == nil {
return
}
logger := log.WithFunc("engine.factory.checkOneNodeStatus")
nodename := params.nodename
cacheKey := params.getCacheKey()
nodename := params.Nodename
cacheKey := params.CacheKey()
if ns, err := e.stor.GetNodeStatus(ctx, nodename); (err != nil && errors.Is(err, types.ErrInvaildCount)) || (!ns.Alive) {
logger.Warnf(ctx, "node %s is offline, the cache will be removed", nodename)
e.Delete(cacheKey)
Expand Down
5 changes: 5 additions & 0 deletions engine/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ import (
// EngineWithErr use to mock the nil engine
type EngineWithErr struct {
DefaultErr error
EP *enginetypes.Params
}

// Info .
func (f *EngineWithErr) Info(_ context.Context) (*enginetypes.Info, error) {
return nil, f.DefaultErr
}

func (f *EngineWithErr) GetParams() *enginetypes.Params {
return f.EP
}

// Ping .
func (f *EngineWithErr) Ping(_ context.Context) error {
return f.DefaultErr
Expand Down
Loading

0 comments on commit 606f643

Please sign in to comment.