From 8bce6e8f4dcb9f680288d78fce399570730d35e6 Mon Sep 17 00:00:00 2001 From: RockChinQ <1010553892@qq.com> Date: Fri, 26 Apr 2024 14:49:36 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=B6=E5=88=B0=E6=96=B0=E7=A8=BF?= =?UTF-8?q?=E4=BB=B6=E6=97=B6=E5=8F=91=E9=80=81=E5=88=B0mq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/config/config.go | 1 + backend/core/app.go | 2 +- backend/mq/redis.go | 17 +++++++++++++++++ backend/service/post.go | 12 +++++++++++- 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/backend/config/config.go b/backend/config/config.go index df1f3d2..d1ba1ce 100644 --- a/backend/config/config.go +++ b/backend/config/config.go @@ -34,6 +34,7 @@ func SetDefault() { viper.SetDefault("mq.redis.password", "") viper.SetDefault("mq.redis.db", 0) viper.SetDefault("mq.redis.stream.publish_post", "campux_publish_post") + viper.SetDefault("mq.redis.stream.new_post", "campux_new_post") } diff --git a/backend/core/app.go b/backend/core/app.go index d2371fb..d373792 100644 --- a/backend/core/app.go +++ b/backend/core/app.go @@ -23,7 +23,7 @@ func NewApplication() *Application { msq := mq.NewRedisStreamMQ() as := service.NewAccountService(*db) - ps := service.NewPostService(*db, *fs) + ps := service.NewPostService(*db, *fs, *msq) ms := service.NewMiscService(*db) err := ScheduleRoutines(*db, *msq) diff --git a/backend/mq/redis.go b/backend/mq/redis.go index e5b6293..87a9d3e 100644 --- a/backend/mq/redis.go +++ b/backend/mq/redis.go @@ -10,6 +10,7 @@ import ( type RedisStreamMQ struct { Client *redis.Client PublishPostStream string + NewPostStream string } func NewRedisStreamMQ() *RedisStreamMQ { @@ -18,9 +19,15 @@ func NewRedisStreamMQ() *RedisStreamMQ { Password: viper.GetString("mq.redis.password"), DB: viper.GetInt("mq.redis.db"), }) + + // 检查流是否存在 + client.XGroupCreateMkStream(context.Background(), viper.GetString("mq.redis.stream.publish_post"), "campux", "0") + client.XGroupCreateMkStream(context.Background(), viper.GetString("mq.redis.stream.new_post"), "campux", "0") + return &RedisStreamMQ{ Client: client, PublishPostStream: viper.GetString("mq.redis.stream.publish_post"), + NewPostStream: viper.GetString("mq.redis.stream.new_post"), } } @@ -33,3 +40,13 @@ func (r *RedisStreamMQ) PublishPost(postID int) error { }).Result() return err } + +func (r *RedisStreamMQ) NewPost(postID int) error { + _, err := r.Client.XAdd(context.Background(), &redis.XAddArgs{ + Stream: r.NewPostStream, + Values: map[string]interface{}{ + "post_id": postID, + }, + }).Result() + return err +} diff --git a/backend/service/post.go b/backend/service/post.go index 19ec712..e89e127 100644 --- a/backend/service/post.go +++ b/backend/service/post.go @@ -5,6 +5,7 @@ import ( "io" "github.com/RockChinQ/Campux/backend/database" + "github.com/RockChinQ/Campux/backend/mq" "github.com/RockChinQ/Campux/backend/oss" "github.com/RockChinQ/Campux/backend/util" ) @@ -12,12 +13,14 @@ import ( type PostService struct { DB database.MongoDBManager OSS oss.MinioClient + MQ mq.RedisStreamMQ } -func NewPostService(db database.MongoDBManager, oss oss.MinioClient) *PostService { +func NewPostService(db database.MongoDBManager, oss oss.MinioClient, mq mq.RedisStreamMQ) *PostService { return &PostService{ DB: db, OSS: oss, + MQ: mq, } } @@ -78,6 +81,13 @@ func (ps *PostService) PostNew(uuid string, uin int64, text string, images []str return -1, err } + // 通知到mq + err = ps.MQ.NewPost(id) + + if err != nil { + return -1, err + } + return id, nil }