Skip to content

Commit

Permalink
NVSHAS-8502 add TTL to installation ID cache
Browse files Browse the repository at this point in the history
In this commit, a TTL is added to installation ID.

In the default fresh-install scenario, the 3 controllers start by
determining which of them should be the bootstrap controller.
"neuvector-svc-controller" service will be resolved during the process,
and the controller that gets only one IP address and owns the IP
address will be the winner/leader.

The winner will pass "--bootstrap" as an argument to its consul process.

Check cluster.StartCluster() for detail.

However, depending on the CNI implementation, there is no guarantee that
only one controller will meet that criterion. Therefore, two consul
instances with the "--bootstrap" flag could be created.

Unfortunately, consul doesn't allow two instances with "--bootstrap"
flag to join together.

In that case, a split brain will occur and the installation ID can be
inconsistent across these nodes.

While this split brain is fixable via an upgrade, the installation ID will
not be consistent until all consul nodes have joined.

While we can wait for some time, it's hard to determine when
installation ID has become consistent.

By giving the cache a TTL, we give it a chance to recover
from this condition.
  • Loading branch information
holyspectral committed Dec 5, 2023
1 parent 2c45dc5 commit 659c57d
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 29 deletions.
4 changes: 1 addition & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,10 +534,8 @@ func main() {

// Initialize installation ID. Ignore if ID is already set.
clusHelper := kv.GetClusterHelper()
if id, err := clusHelper.GetInstallationID(); err != nil {
if _, err := clusHelper.GetInstallationID(); err != nil {
log.WithError(err).Warn("installation id is not readable. Will retry later.")
} else {
log.WithField("installation-id", id).Info("Installation id is created")
}

if Ctrler.Leader {
Expand Down
49 changes: 38 additions & 11 deletions controller/kv/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/url"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -26,6 +27,10 @@ import (
"github.com/neuvector/neuvector/share/utils"
)

const (
// InstallationCacheTTL is how long a cached installation ID is served by
// GetInstallationID before it is looked up again through
// GetOrCreateInstallationID.
InstallationCacheTTL = time.Minute * 30
)

type MockKvConfigUpdateFunc func(nType cluster.ClusterNotifyType, key string, value []byte)

type LogEventFunc func(share.TLogEvent, time.Time, int, string)
Expand Down Expand Up @@ -298,11 +303,16 @@ type ClusterHelper interface {
SetCacheMockCallback(keyStore string, mockFunc MockKvConfigUpdateFunc)
}

// Package-level cache for the installation ID, used by GetInstallationID.
var (
// installationID is the last ID obtained; empty means nothing is cached yet.
installationID string
// installationIDLastUpdate is set to time.Now() after each successful lookup
// and is compared against InstallationCacheTTL to decide whether the cache expired.
installationIDLastUpdate time.Time
// installationIDLock guards installationID and installationIDLastUpdate.
installationIDLock sync.RWMutex
)

type clusterHelper struct {
id string
version string
persist bool
installationID string
id string
version string
persist bool
}

var clusHelperImpl *clusterHelper
Expand Down Expand Up @@ -502,7 +512,7 @@ func (m clusterHelper) GetOrCreateInstallationID() (string, error) {
}
id = string(value)
if id != "" {
// Already have an installation ID. Do nothing
// Already have an installation ID stored in "id". Do nothing
return nil
}

Expand All @@ -525,14 +535,31 @@ func (m clusterHelper) GetOrCreateInstallationID() (string, error) {
return id, nil
}

func (m clusterHelper) GetInstallationID() (string, error) {
// Get from cache if exists
if m.installationID != "" {
return m.installationID, nil
// GetInstallationID returns the installation ID, serving it from a package-level
// cache while the cached value is non-empty and younger than InstallationCacheTTL.
// Installation ID will be cached for the given TTL.
// This is to correct data inconsistency that could happen during fresh install.
func (m *clusterHelper) GetInstallationID() (string, error) {
// Get from cache if it exists and has not expired.
installationIDLock.RLock()
if installationID != "" && time.Now().Before(installationIDLastUpdate.Add(InstallationCacheTTL)) {
installationIDLock.RUnlock()
return installationID, nil
}
installationIDLock.RUnlock()

// Cache miss or expired entry: take the write lock for the refresh.
// NOTE(review): the cache is not re-checked after the write lock is acquired,
// so callers that queued here while another caller refreshed will each call
// GetOrCreateInstallationID again.
installationIDLock.Lock()
defer installationIDLock.Unlock()

// Otherwise try to create one.
return m.GetOrCreateInstallationID()
// Otherwise try to get/create one.
id, err := m.GetOrCreateInstallationID()
if err != nil {
// On failure the cached ID and its timestamp are left untouched.
return "", err
}
if installationID != id {
log.WithFields(log.Fields{"id": id}).Info("installation ID is updated")
installationID = id
}
// Restart the TTL window after every successful lookup, even when the ID is unchanged.
installationIDLastUpdate = time.Now()
// err is always nil at this point.
return id, err
}

func (m clusterHelper) GetAllEnforcers() []*share.CLUSAgent {
Expand Down
29 changes: 17 additions & 12 deletions controller/rest/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/rsa"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math"
Expand All @@ -14,6 +13,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/dgrijalva/jwt-go"
"github.com/julienschmidt/httprouter"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -167,14 +168,6 @@ func GetJWTSigningKey() JWTCertificateState {
return jwtCertState
}

// GetInstallationID returns the installation ID, memoized in the package-level
// installID pointer. The first call stores whatever clusHelper.GetInstallationID
// returns; later calls return that stored value without asking clusHelper again.
func GetInstallationID() string {
if installID == nil {
// The error is discarded, so the id returned by a failed lookup is cached as-is.
id, _ := clusHelper.GetInstallationID()
installID = &id
}
return *installID
}

// With userMutex locked when calling this because it does loginSession lookup first
func newLoginSessionFromToken(token string, claims *tokenClaim, now time.Time) (*loginSession, int) {
s := &loginSession{
Expand Down Expand Up @@ -707,7 +700,11 @@ func restReq2User(r *http.Request) (*loginSession, int, string) {
if !ok {
if claims.MainSessionID == "" || strings.HasPrefix(claims.MainSessionID, _rancherSessionPrefix) { // meaning it's not a master token issued by master cluster
// Check if the token is from the same "installation"
installID := GetInstallationID()
installID, err := clusHelper.GetInstallationID()
if err != nil {
log.WithError(err).Error("failed to get installation ID")
return nil, userTimeout, rsessToken
}
if installID != claims.Subject {
log.Debug("Token from different installation")
return nil, userTimeout, rsessToken
Expand Down Expand Up @@ -1231,7 +1228,11 @@ func jwtValidateToken(encryptedToken, secret string, rsaPublicKey *rsa.PublicKey
var alternativeKey *rsa.PublicKey

jwtCert := GetJWTSigningKey()
installID := GetInstallationID()
installID, err := clusHelper.GetInstallationID()
if err != nil {
log.WithError(err).Error("failed to get installation ID")
return nil, errors.Wrap(err, "failed to get installation ID")
}

if secret == "" {
tokenString = utils.DecryptUserToken(encryptedToken, []byte(installID))
Expand Down Expand Up @@ -1329,7 +1330,11 @@ func jwtValidateFedJoinTicket(encryptedTicket, secret string) error {

func jwtGenerateToken(user *share.CLUSUser, roles access.DomainRole, remote, mainSessionID, mainSessionUser string, sso *SsoSession) (string, string, *tokenClaim) {
id := utils.GetRandomID(idLength, "")
installID := GetInstallationID()
installID, err := clusHelper.GetInstallationID()
if err != nil {
log.WithError(err).Error("failed to get installation ID")
return "", "", &tokenClaim{}
}
now := time.Now()
c := tokenClaim{
Remote: remote,
Expand Down
6 changes: 3 additions & 3 deletions share/cluster/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ func compareStoreKeys(cache map[string]uint64, kvs api.KVPairs) []string {
}

var ret []string
for key, _ := range cache {
for key := range cache {
if _, ok := m[key]; ok {
continue
}
Expand Down Expand Up @@ -1334,15 +1334,15 @@ func (m *consulMethod) RegisterExistingWatchers() {
go registerNodeUpdate()
}
keyWatcherMutex.RLock()
for key, _ := range keyWatchers {
for key := range keyWatchers {
go registerKeyUpdate(key)
}
keyWatcherMutex.RUnlock()
if len(stateWatchers) > 0 {
go registerStateUpdate()
}
storeWatcherMutex.RLock()
for store, _ := range storeWatchers {
for store := range storeWatchers {
go registerStoreUpdate(store, storeWatchersCongestCtl[store])
}
storeWatcherMutex.RUnlock()
Expand Down

0 comments on commit 659c57d

Please sign in to comment.