From 16dcf0e620e6a66ecc460ec9a87895b056a58d24 Mon Sep 17 00:00:00 2001 From: mushroomsir Date: Tue, 10 Nov 2020 16:17:48 +0800 Subject: [PATCH] Refresh labels with different product active times --- src/bll/user.go | 10 ++++++---- src/bll/user_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++ src/model/common.go | 6 +++--- src/model/user.go | 30 +++++++++++++++++++++------- src/schema/user.go | 47 +++++++++++++++++++++++++++++++------------- 5 files changed, 111 insertions(+), 28 deletions(-) diff --git a/src/bll/user.go b/src/bll/user.go index 9d74eff..dc488de 100644 --- a/src/bll/user.go +++ b/src/bll/user.go @@ -65,19 +65,21 @@ func (b *User) ListCachedLabels(ctx context.Context, uid, product string) *tpl.C return res } + activeAt := user.GetCache(product).ActiveAt // user 上缓存的 labels 过期,则刷新获取最新,RefreshUser 要考虑并发场景 - if user.ActiveAt == 0 { + if activeAt == 0 { if user = b.ms.TryApplyLabelRulesAndRefreshUserLabels(ctx, productID, product, user.ID, now, true); user == nil { return res } - } else if conf.Config.IsCacheLabelExpired(now.Unix()-5, user.ActiveAt) { // 提前 5s 异步处理 + } else if conf.Config.IsCacheLabelExpired(now.Unix()-5, activeAt) { // 提前 5s 异步处理 util.Go(10*time.Second, func(gctx context.Context) { b.ms.TryApplyLabelRulesAndRefreshUserLabels(gctx, productID, product, user.ID, now, false) }) } + userCache := user.GetCache(product) - res.Result = user.GetLabels(product) - res.Timestamp = user.ActiveAt + res.Result = userCache.Labels + res.Timestamp = userCache.ActiveAt return res } diff --git a/src/bll/user_test.go b/src/bll/user_test.go index 2a547ed..cca1984 100644 --- a/src/bll/user_test.go +++ b/src/bll/user_test.go @@ -177,3 +177,49 @@ func TestChildLabelUserPercent(t *testing.T) { assert.Equal(labelRes.Name, userLabels[1].Label) }) } + +func TestUserListCachedLabels(t *testing.T) { + user := &User{ms: model.NewModels(service.NewDB())} + product := &Product{ms: model.NewModels(service.NewDB())} + + require := require.New(t) + ctx := context.Background() + + uid1 := tpl.RandUID() + user.BatchAdd(ctx, []string{uid1}) + userObj, err := user.ms.User.Acquire(ctx, uid1) + require.Nil(err) + + for i := 0; i < 3; i++ { + productName := tpl.RandName() + productRes, err := product.Create(ctx, productName, productName) + + label := &schema.Label{ + ProductID: productRes.Result.ID, + Name: tpl.RandName(), + } + err = user.ms.Label.Create(ctx, label) + require.Nil(err) + + labelRes, err := user.ms.Label.Acquire(ctx, productRes.Result.ID, label.Name) + require.Nil(err) + labelRule := &schema.LabelRule{ + ProductID: productRes.Result.ID, + LabelID: labelRes.ID, + Kind: schema.RuleUserPercent, + Rule: `{"value": 100 }`, + } + require.Equal(100, labelRule.ToPercent()) + err = user.ms.LabelRule.Create(ctx, labelRule) + require.Nil(err) + + res1 := user.ListCachedLabels(ctx, userObj.UID, productName) + require.Equal(1, len(res1.Result), i) + require.Equal(label.Name, res1.Result[0].Label) + time.Sleep(time.Millisecond * 1100) + // test cache + res2 := user.ListCachedLabels(ctx, userObj.UID, productName) + require.Equal(1, len(res2.Result)) + require.Equal(res1.Timestamp, res2.Timestamp) + } +} diff --git a/src/model/common.go b/src/model/common.go index 444d397..ff6ecd3 100644 --- a/src/model/common.go +++ b/src/model/common.go @@ -69,7 +69,7 @@ func NewModels(sql *service.SQL) *Models { // ApplyLabelRulesAndRefreshUserLabels ... func (ms *Models) ApplyLabelRulesAndRefreshUserLabels(ctx context.Context, productID int64, product string, userID int64, now time.Time, force bool) (*schema.User, error) { - user, labelIDs, ok, err := ms.User.RefreshLabels(ctx, userID, now.Unix(), force) + user, labelIDs, ok, err := ms.User.RefreshLabels(ctx, userID, now.Unix(), force, product) userProductLables := user.GetLabels(product) if ok && len(userProductLables) == 0 { hit, err := ms.LabelRule.ApplyRules(ctx, productID, userID, labelIDs, schema.RuleUserPercent) @@ -78,7 +78,7 @@ func (ms *Models) ApplyLabelRulesAndRefreshUserLabels(ctx context.Context, produ } if hit > 0 { // refresh label again - user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true) + user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true, product) } } else if len(userProductLables) > 0 { pg := tpl.Pagination{PageSize: 200} @@ -99,7 +99,7 @@ func (ms *Models) ApplyLabelRulesAndRefreshUserLabels(ctx context.Context, produ return nil, err } if hit > 0 { - user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true) + user, labelIDs, ok, err = ms.User.RefreshLabels(ctx, userID, now.Unix(), true, product) } break } diff --git a/src/model/user.go b/src/model/user.go index 24b5def..255c6d5 100644 --- a/src/model/user.go +++ b/src/model/user.go @@ -85,7 +85,7 @@ func (m *User) Find(ctx context.Context, pg tpl.Pagination) ([]schema.User, int, } // RefreshLabels 更新 user 上的 labels 缓存,包括通过 group 关系获得的 labels -func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force bool) (*schema.User, []int64, bool, error) { +func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force bool, product string) (*schema.User, []int64, bool, error) { user := &schema.User{} labelIDs := make([]int64, 0) refreshed := false @@ -107,12 +107,25 @@ func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force boo return gear.ErrNotFound.WithMsgf("user %d not found for RefreshLabels", id) } - if !force && !conf.Config.IsCacheLabelExpired(now-5, user.ActiveAt) { + if !force && product != "" && !conf.Config.IsCacheLabelExpired(now-5, user.GetCache(product).ActiveAt) { // 已被其它请求更新 return nil } - data := make(schema.UserCacheLabels) + data := user.GetCacheMap() + for _, v := range data { // 清空旧标签缓存 + v.Labels = make([]schema.UserCacheLabel, 0) + if product == "" { + v.ActiveAt = now //强制刷新缓存,更新所有活跃时间 + } + } + if product != "" { + // 放在外面赋值,是为了新产品的第一次更新。 + data[product] = &schema.UserCache{ + Labels: make([]schema.UserCacheLabel, 0), + ActiveAt: now, // 只更新该产品缓存的刷新时间 + } + } sd = tx.Select( goqu.I("t1.created_at"), @@ -169,11 +182,14 @@ func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force boo set[myLabelInfo.ID] = struct{}{} labelIDs = append(labelIDs, myLabelInfo.ID) - arr, ok := data[myLabelInfo.Product] + uclm, ok := data[myLabelInfo.Product] if !ok { - arr = make([]schema.UserCacheLabel, 0) + uclm = &schema.UserCache{ + Labels: make([]schema.UserCacheLabel, 0), + } + data[myLabelInfo.Product] = uclm } - data[myLabelInfo.Product] = append(arr, schema.UserCacheLabel{ + data[myLabelInfo.Product].Labels = append(uclm.Labels, schema.UserCacheLabel{ Label: myLabelInfo.Name, Clients: tpl.StringToSlice(myLabelInfo.Clients), Channels: tpl.StringToSlice(myLabelInfo.Channels), @@ -189,7 +205,7 @@ func (m *User) RefreshLabels(ctx context.Context, id int64, now int64, force boo refreshed = true user.ActiveAt = time.Now().UTC().Unix() - _ = user.PutLabels(data) + _ = user.PutCacheMap(data) _, err = service.DeResult(tx.Update(schema.TableUser). Where(goqu.C("id").Eq(id)). Set(goqu.Record{"labels": user.Labels, "active_at": user.ActiveAt}). diff --git a/src/schema/user.go b/src/schema/user.go index f8df36b..cfa79eb 100644 --- a/src/schema/user.go +++ b/src/schema/user.go @@ -45,6 +45,12 @@ type MyLabelInfo struct { Product string `db:"product"` } +// UserCache 用于在 User 数据上缓存数据 +type UserCache struct { + ActiveAt int64 `json:"activeAt"` // 最近活跃时间戳,1970 以来的秒数,但不及时更新 + Labels []UserCacheLabel `json:"labels"` +} + // UserCacheLabel 用于在 User 数据上缓存 labels type UserCacheLabel struct { Label string `json:"l"` @@ -52,28 +58,41 @@ type UserCacheLabel struct { Channels []string `json:"chs,omitempty"` } -// UserCacheLabels 用于在 User 数据上缓存 labels -type UserCacheLabels map[string][]UserCacheLabel +// UserCacheLabelMap 用于在 User 数据上缓存 +type UserCacheLabelMap map[string]*UserCache -// GetLabels 从 user 上读取结构化的 labels 数据 -func (u *User) GetLabels(product string) []UserCacheLabel { - data := make(UserCacheLabels) - labels := []UserCacheLabel{} +// GetCache 从 user 上读取结构化的缓存数据 +func (u *User) GetCache(product string) *UserCache { + userCache := &UserCache{} if u.Labels == "" { - return labels + return userCache } - - _ = json.Unmarshal([]byte(u.Labels), &data) - for k, arr := range data { + data := u.GetCacheMap() + for k, ucl := range data { if k == product { - return arr + return ucl } } - return labels + return userCache +} + +// GetLabels 从 user 上读取结构化的 labels 数据 +func (u *User) GetLabels(product string) []UserCacheLabel { + return u.GetCache(product).Labels +} + +// GetCacheMap 从 user 上读取结构化的缓存数据 +func (u *User) GetCacheMap() UserCacheLabelMap { + data := make(UserCacheLabelMap) + if u.Labels == "" { + return data + } + _ = json.Unmarshal([]byte(u.Labels), &data) + return data } -// PutLabels 把结构化的 labels 数据转成字符串设置在 user.Labels 上 -func (u *User) PutLabels(labels UserCacheLabels) error { +// PutCacheMap 把结构化的 labels 数据转成字符串设置在 user.Labels 上 +func (u *User) PutCacheMap(labels UserCacheLabelMap) error { data, err := json.Marshal(labels) if err == nil { u.Labels = string(data)