Skip to content

Commit

Permalink
Merge pull request #13 from teambition/refreshLabel
Browse files Browse the repository at this point in the history
Refresh labels with different product active times
  • Loading branch information
zensh authored Nov 11, 2020
2 parents cdbfa84 + 16dcf0e commit 6eeb41d
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 28 deletions.
10 changes: 6 additions & 4 deletions src/bll/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,21 @@ func (b *User) ListCachedLabels(ctx context.Context, uid, product string) *tpl.C
return res
}

activeAt := user.GetCache(product).ActiveAt
// user 上缓存的 labels 过期,则刷新获取最新,RefreshUser 要考虑并发场景
if user.ActiveAt == 0 {
if activeAt == 0 {
if user = b.ms.TryApplyLabelRulesAndRefreshUserLabels(ctx, productID, product, user.ID, now, true); user == nil {
return res
}
} else if conf.Config.IsCacheLabelExpired(now.Unix()-5, user.ActiveAt) { // 提前 5s 异步处理
} else if conf.Config.IsCacheLabelExpired(now.Unix()-5, activeAt) { // 提前 5s 异步处理
util.Go(10*time.Second, func(gctx context.Context) {
b.ms.TryApplyLabelRulesAndRefreshUserLabels(gctx, productID, product, user.ID, now, false)
})
}
userCache := user.GetCache(product)

res.Result = user.GetLabels(product)
res.Timestamp = user.ActiveAt
res.Result = userCache.Labels
res.Timestamp = userCache.ActiveAt
return res
}

Expand Down
46 changes: 46 additions & 0 deletions src/bll/user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,49 @@ func TestChildLabelUserPercent(t *testing.T) {
assert.Equal(labelRes.Name, userLabels[1].Label)
})
}

func TestUserListCachedLabels(t *testing.T) {
user := &User{ms: model.NewModels(service.NewDB())}
product := &Product{ms: model.NewModels(service.NewDB())}

require := require.New(t)
ctx := context.Background()

uid1 := tpl.RandUID()
user.BatchAdd(ctx, []string{uid1})
userObj, err := user.ms.User.Acquire(ctx, uid1)
require.Nil(err)

for i := 0; i < 3; i++ {
productName := tpl.RandName()
productRes, err := product.Create(ctx, productName, productName)

label := &schema.Label{
ProductID: productRes.Result.ID,
Name: tpl.RandName(),
}
err = user.ms.Label.Create(ctx, label)
require.Nil(err)

labelRes, err := user.ms.Label.Acquire(ctx, productRes.Result.ID, label.Name)
require.Nil(err)
labelRule := &schema.LabelRule{
ProductID: productRes.Result.ID,
LabelID: labelRes.ID,
Kind: schema.RuleUserPercent,
Rule: `{"value": 100 }`,
}
require.Equal(100, labelRule.ToPercent())
err = user.ms.LabelRule.Create(ctx, labelRule)
require.Nil(err)

res1 := user.ListCachedLabels(ctx, userObj.UID, productName)
require.Equal(1, len(res1.Result), i)
require.Equal(label.Name, res1.Result[0].Label)
time.Sleep(time.Millisecond * 1100)
// test cache
res2 := user.ListCachedLabels(ctx, userObj.UID, productName)
require.Equal(1, len(res2.Result))
require.Equal(res1.Timestamp, res2.Timestamp)
}
}
6 changes: 3 additions & 3 deletions src/model/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewModels(sql *service.SQL) *Models {

// ApplyLabelRulesAndRefreshUserLabels ...
func (ms *Models) ApplyLabelRulesAndRefreshUserLabels(ctx context.Context, productID int64, product string, userID int64, now time.Time, force bool) (*schema.User, error) {
user, labelIDs, ok, err := ms.User.RefreshLabels(ctx, userID, now.Unix(), force)
user, labelIDs, ok, err := ms.User.RefreshLabels(ctx, userID, now.Unix(), force, product)
userProductLables := user.GetLabels(product)
if ok && len(userProductLables) == 0 {
hit, err := ms.LabelRule.ApplyRules(ctx, productID, userID, labelIDs, schema.RuleUserPercent)
Expand All @@ -78,7 +78,7 @@ func (ms *Models) ApplyLabelRulesAndRefreshUserLabels(ctx context.Context, produ
}
if hit > 0 {
// refresh label again
user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true)
user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true, product)
}
} else if len(userProductLables) > 0 {
pg := tpl.Pagination{PageSize: 200}
Expand All @@ -99,7 +99,7 @@ func (ms *Models) ApplyLabelRulesAndRefreshUserLabels(ctx context.Context, produ
return nil, err
}
if hit > 0 {
user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true)
user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true, product)
}
break
}
Expand Down
30 changes: 23 additions & 7 deletions src/model/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (m *User) Find(ctx context.Context, pg tpl.Pagination) ([]schema.User, int,
}

// RefreshLabels 更新 user 上的 labels 缓存,包括通过 group 关系获得的 labels
func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force bool) (*schema.User, []int64, bool, error) {
func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force bool, product string) (*schema.User, []int64, bool, error) {
user := &schema.User{}
labelIDs := make([]int64, 0)
refreshed := false
Expand All @@ -107,12 +107,25 @@ func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force boo
return gear.ErrNotFound.WithMsgf("user %d not found for RefreshLabels", id)
}

if !force && !conf.Config.IsCacheLabelExpired(now-5, user.ActiveAt) {
if !force && product != "" && !conf.Config.IsCacheLabelExpired(now-5, user.GetCache(product).ActiveAt) {
// 已被其它请求更新
return nil
}

data := make(schema.UserCacheLabels)
data := user.GetCacheMap()
for _, v := range data { // 清空旧标签缓存
v.Labels = make([]schema.UserCacheLabel, 0)
if product == "" {
v.ActiveAt = now //强制刷新缓存,更新所有活跃时间
}
}
if product != "" {
// 放在外面赋值,是为了新产品的第一次更新。
data[product] = &schema.UserCache{
Labels: make([]schema.UserCacheLabel, 0),
ActiveAt: now, // 只更新该产品缓存的刷新时间
}
}

sd = tx.Select(
goqu.I("t1.created_at"),
Expand Down Expand Up @@ -169,11 +182,14 @@ func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force boo
set[myLabelInfo.ID] = struct{}{}

labelIDs = append(labelIDs, myLabelInfo.ID)
arr, ok := data[myLabelInfo.Product]
uclm, ok := data[myLabelInfo.Product]
if !ok {
arr = make([]schema.UserCacheLabel, 0)
uclm = &schema.UserCache{
Labels: make([]schema.UserCacheLabel, 0),
}
data[myLabelInfo.Product] = uclm
}
data[myLabelInfo.Product] = append(arr, schema.UserCacheLabel{
data[myLabelInfo.Product].Labels = append(uclm.Labels, schema.UserCacheLabel{
Label: myLabelInfo.Name,
Clients: tpl.StringToSlice(myLabelInfo.Clients),
Channels: tpl.StringToSlice(myLabelInfo.Channels),
Expand All @@ -189,7 +205,7 @@ func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force boo

refreshed = true
user.ActiveAt = time.Now().UTC().Unix()
_ = user.PutLabels(data)
_ = user.PutCacheMap(data)
_, err = service.DeResult(tx.Update(schema.TableUser).
Where(goqu.C("id").Eq(id)).
Set(goqu.Record{"labels": user.Labels, "active_at": user.ActiveAt}).
Expand Down
47 changes: 33 additions & 14 deletions src/schema/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,54 @@ type MyLabelInfo struct {
Product string `db:"product"`
}

// UserCache 用于在 User 数据上缓存数据
type UserCache struct {
ActiveAt int64 `json:"activeAt"` // 最近活跃时间戳,1970 以来的秒数,但不及时更新
Labels []UserCacheLabel `json:"labels"`
}

// UserCacheLabel 用于在 User 数据上缓存 labels
type UserCacheLabel struct {
Label string `json:"l"`
Clients []string `json:"cls,omitempty"`
Channels []string `json:"chs,omitempty"`
}

// UserCacheLabels 用于在 User 数据上缓存 labels
type UserCacheLabels map[string][]UserCacheLabel
// UserCacheLabelMap 用于在 User 数据上缓存
type UserCacheLabelMap map[string]*UserCache

// GetLabels 从 user 上读取结构化的 labels 数据
func (u *User) GetLabels(product string) []UserCacheLabel {
data := make(UserCacheLabels)
labels := []UserCacheLabel{}
// GetCache 从 user 上读取结构化的缓存数据
func (u *User) GetCache(product string) *UserCache {
userCache := &UserCache{}
if u.Labels == "" {
return labels
return userCache
}

_ = json.Unmarshal([]byte(u.Labels), &data)
for k, arr := range data {
data := u.GetCacheMap()
for k, ucl := range data {
if k == product {
return arr
return ucl
}
}
return labels
return userCache
}

// GetLabels 从 user 上读取结构化的 labels 数据
func (u *User) GetLabels(product string) []UserCacheLabel {
return u.GetCache(product).Labels
}

// GetCacheMap 从 user 上读取结构化的缓存数据
func (u *User) GetCacheMap() UserCacheLabelMap {
data := make(UserCacheLabelMap)
if u.Labels == "" {
return data
}
_ = json.Unmarshal([]byte(u.Labels), &data)
return data
}

// PutLabels 把结构化的 labels 数据转成字符串设置在 user.Labels 上
func (u *User) PutLabels(labels UserCacheLabels) error {
// PutCacheMap 把结构化的 labels 数据转成字符串设置在 user.Labels 上
func (u *User) PutCacheMap(labels UserCacheLabelMap) error {
data, err := json.Marshal(labels)
if err == nil {
u.Labels = string(data)
Expand Down

0 comments on commit 6eeb41d

Please sign in to comment.