diff --git a/apps/gateway/cmd/main.go b/apps/gateway/cmd/main.go index a3f54f9..7c92b17 100644 --- a/apps/gateway/cmd/main.go +++ b/apps/gateway/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "github.com/gin-gonic/gin" @@ -9,6 +10,7 @@ import ( "github.com/yusank/goim/apps/gateway/internal/app" "github.com/yusank/goim/apps/gateway/internal/router" "github.com/yusank/goim/apps/gateway/internal/service" + "github.com/yusank/goim/pkg/graceful" "github.com/go-kratos/kratos/v2/log" ) @@ -39,5 +41,8 @@ func main() { log.Info(err) } - application.Stop() + graceful.Register(application.Shutdown) + if err = graceful.Shutdown(context.TODO()); err != nil { + log.Infof("graceful shutdown error: %v", err) + } } diff --git a/apps/msg/cmd/main.go b/apps/msg/cmd/main.go index 3d32d21..4d8d1a5 100644 --- a/apps/msg/cmd/main.go +++ b/apps/msg/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "github.com/go-kratos/kratos/v2/log" @@ -8,6 +9,7 @@ import ( messagev1 "github.com/yusank/goim/api/message/v1" "github.com/yusank/goim/apps/msg/internal/app" "github.com/yusank/goim/apps/msg/internal/service" + "github.com/yusank/goim/pkg/graceful" "github.com/yusank/goim/pkg/mq" ) @@ -44,5 +46,8 @@ func main() { log.Info(err) } - application.Stop() + graceful.Register(application.Shutdown) + if err = graceful.Shutdown(context.TODO()); err != nil { + log.Infof("graceful shutdown error: %v", err) + } } diff --git a/apps/msg/internal/service/msg_service.go b/apps/msg/internal/service/msg_service.go index b5fd729..2b514f0 100644 --- a/apps/msg/internal/service/msg_service.go +++ b/apps/msg/internal/service/msg_service.go @@ -7,13 +7,12 @@ import ( "strings" "sync" - redisv8 "github.com/go-redis/redis/v8" - "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/selector" "github.com/go-kratos/kratos/v2/transport/grpc" + redisv8 "github.com/go-redis/redis/v8" ggrpc "google.golang.org/grpc" messagev1 "github.com/yusank/goim/api/message/v1" @@ -61,41 +60,39 @@ func (s *MqMessageService) Consume(ctx context.Context, msg ...*primitive.Messag func (s *MqMessageService) handleSingleMsg(ctx context.Context, msg *primitive.MessageExt) error { // PushMessageReq contains all MqMessage fields. - req := &messagev1.PushMessageReq{} + req := &messagev1.MqMessage{} if err := json.Unmarshal(msg.Body, req); err != nil { return err } + in := &messagev1.PushMessageReq{ + FromUser: req.GetFromUser(), + ToUser: req.GetToUser(), + PushMessageType: req.GetPushMessageType(), + ContentType: req.GetContentType(), + Content: req.GetContent(), + MsgSeq: msg.MsgId, + } + if req.GetPushMessageType() == messagev1.PushMessageType_Broadcast { - return s.broadcast(ctx, req) + return s.broadcast(ctx, in) } - var agentID string str, err := s.rdb.Get(ctx, data.GetUserOnlineAgentKey(req.GetToUser())).Result() if err != nil { if err == redisv8.Nil { log.Infof("user=%s not online, put to offline queue", req.GetToUser()) - return s.putToRedis(ctx, msg, req) + return s.putToRedis(ctx, msg, in) } return err } - agentID = str - cc, err := s.loadGrpcConn(ctx, agentID) + in.AgentId = str + cc, err := s.loadGrpcConn(ctx, in.AgentId) if err != nil { return err } - in := &messagev1.PushMessageReq{ - FromUser: req.GetFromUser(), - ToUser: req.GetToUser(), - PushMessageType: messagev1.PushMessageType_User, - ContentType: req.GetContentType(), - Content: req.GetContent(), - AgentId: agentID, - MsgSeq: msg.MsgId, - } - out, err := messagev1.NewPushMessagerClient(cc).PushMessage(ctx, in) if err != nil { log.Info("MSG send msg err=", err) diff --git a/apps/push/cmd/main.go b/apps/push/cmd/main.go index b5116ac..050100f 100644 --- a/apps/push/cmd/main.go +++ b/apps/push/cmd/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "github.com/gin-gonic/gin" @@ -10,6 +11,7 @@ import ( "github.com/yusank/goim/apps/push/internal/app" "github.com/yusank/goim/apps/push/internal/router" "github.com/yusank/goim/apps/push/internal/service" + "github.com/yusank/goim/pkg/graceful" ) var ( @@ -27,7 +29,11 @@ func main() { if err != nil { log.Fatal(err) } + + // register grpc messagev1.RegisterPushMessagerServer(application.GrpcSrv, service.GetPushMessager()) + + // register router g := gin.Default() router.RegisterRouter(g.Group("/push/service")) application.HTTPSrv.HandlePrefix("/", g) @@ -35,4 +41,9 @@ func main() { if err = application.Run(); err != nil { log.Fatal(err) } + + graceful.Register(application.Shutdown) + if err = graceful.Shutdown(context.TODO()); err != nil { + log.Infof("graceful shutdown error: %s", err) + } } diff --git a/apps/push/internal/service/push_message.go b/apps/push/internal/service/push_message.go index 398e871..b1716c4 100644 --- a/apps/push/internal/service/push_message.go +++ b/apps/push/internal/service/push_message.go @@ -11,6 +11,7 @@ import ( messagev1 "github.com/yusank/goim/api/message/v1" "github.com/yusank/goim/pkg/conn/pool" "github.com/yusank/goim/pkg/conn/wrapper" + "github.com/yusank/goim/pkg/graceful" "github.com/yusank/goim/pkg/worker" ) @@ -28,6 +29,7 @@ func GetPushMessager() *PushMessager { pmOnce.Do(func() { pm = new(PushMessager) pm.workerPool = worker.NewPool(100, 20) + graceful.Register(pm.workerPool.Shutdown) }) return pm diff --git a/docs/content/docs/advance/configuration.md b/docs/content/docs/advance/configuration.md index 02b68c2..bb372ff 100644 --- a/docs/content/docs/advance/configuration.md +++ b/docs/content/docs/advance/configuration.md @@ -1,3 +1,80 @@ +--- +weight: 1 +--- + # Configuration -> config \ No newline at end of file +配置为两份文件分别为 service config 和 registry config + +- service config 关注服务启停以及声明周期中需要的各类配置 +- registry config 关注服务注册相关配置 + +## server config definition + +```proto +// Service 为一个服务的全部配置 +message Service { + string name = 1; + string version = 2; + optional Server http = 3; + optional Server grpc = 4; + Log log = 5; + map<string, string> metadata = 6; + Redis redis = 7; + MQ mq = 8; +} + +message Server { + string scheme = 1; + string addr = 2; + int32 port = 3; +} + + +enum Level { + DEBUG = 0; + INFO = 1; + WARING = 2; + ERROR = 3; + FATAL = 4; +} + +message Log { + optional string log_path = 1; + repeated Level level = 2; +} + +message Redis { + string addr = 1; + string password = 2; + int32 max_conns = 3; + int32 min_idle_conns = 4; + google.protobuf.Duration dial_timeout = 5; + google.protobuf.Duration idle_timeout = 6; +} + +message MQ { + repeated string addr = 1; + int32 max_retry = 2; +} +``` + +## registry config definition + +```proto +message RegistryInfo { + repeated string addr = 1; + string scheme = 2; + google.protobuf.Duration dial_timeout_sec = 3; + google.protobuf.Duration dial_keep_alive_time_sec = 4; + google.protobuf.Duration dial_keep_alive_timeout_sec = 5; +} + +message Registry { + string name = 1; + oneof reg { + RegistryInfo consul = 2; + RegistryInfo etcd = 3; + } +} +``` diff --git a/docs/content/docs/example/example.md b/docs/content/docs/example/example.md deleted file mode 100644 index bd7b116..0000000 --- a/docs/content/docs/example/example.md +++ /dev/null @@ -1,3 +0,0 @@ -# Example - -> hello world. diff --git a/docs/content/docs/example/terminal_cli/gui.png b/docs/content/docs/example/terminal_cli/gui.png new file mode 100644 index 0000000..6fcdc23 Binary files /dev/null and b/docs/content/docs/example/terminal_cli/gui.png differ diff --git a/docs/content/docs/example/terminal_cli/index.md b/docs/content/docs/example/terminal_cli/index.md new file mode 100644 index 0000000..1061803 --- /dev/null +++ b/docs/content/docs/example/terminal_cli/index.md @@ -0,0 +1,22 @@ +--- +title: "Terminal CLI" +--- + +## How to run it + +服务提供一个简单的终端 GUI 可以测试消息的发送和接受,代码在 `tests` 目录下。 + +在 `goim/test` 目录下执行如下命令: + +```shell +# 支持参数 +# ADDR ?= 127.0.0.1:18071 +# UID ?= user1 +# TOUID ?= user2 + +make run-gui UID=user3 TOUID=user2 +``` + +界面如下: + +![gui](./gui.png) diff --git a/docs/content/docs/quick_start/prepare.md b/docs/content/docs/quick_start/prepare.md index b737574..e834406 100644 --- a/docs/content/docs/quick_start/prepare.md +++ b/docs/content/docs/quick_start/prepare.md @@ -2,10 +2,43 @@ weight: 2 title: "Prepare" --- -# Prepare -## install mq +## requirement -## install consul +### environment -## install redis \ No newline at end of file +- apache/rocketmq 4.6.0+ +- consul 1.11.4+ +- redis 2.0+ + +> 关于部署 rocketmq:docker 部署 rocketmq 过程中遇到过一些问题,如果你有疑问可以参考这篇文章 [Docker 部署 RocketMQ](https://yusank.space/posts/rocketmq-deploy/) + +### config + +msg service 为例,`apps/msg/config/config.yaml`: + + name: goim.msg.service + version: v0.0.0 + grpc: + scheme: grpc + port: 18063 + log: + level: + - INFO + - DEBUG + metadata: + grpcSrv: yes + redis: + addr: 127.0.0.1:6379 + mq: + addr: + - 127.0.0.1:9876 + +`apps/msg/config/registry.yaml` : + + consul: + addr: + - 127.0.0.1:8500 + scheme: http + +根据自己的环境去修改各个组件的地址和端口。 diff --git a/docs/content/docs/quick_start/quick_start.md b/docs/content/docs/quick_start/quick_start.md index de55708..89f5324 100644 --- a/docs/content/docs/quick_start/quick_start.md +++ b/docs/content/docs/quick_start/quick_start.md @@ -4,14 +4,43 @@ weight: 1 # Quick Start -## build +## run ```shell -$ make build all +# run msg service +$ make run Srv=msg +# run gateway service +$ make run Srv=gateway +# run push service +$ make run Srv=push ``` -## run +## other make command ```shell -$ make run Srv=xxx +make help + +Usage: + make <target> + +Development + vet Run go vet against code. + lint Run go lint against code. + test Run test against code. + +Generate + protoc Run protoc command to generate pb code. + +Build + build build provided server + build-all build all apps + +Docker + docker-build build docker image + +Run + run run provided server + +General + help Display this help. ``` diff --git a/pkg/app/application.go b/pkg/app/application.go index 127392c..6f27e5f 100644 --- a/pkg/app/application.go +++ b/pkg/app/application.go @@ -1,8 +1,8 @@ package app import ( + "context" "fmt" - "log" "go.uber.org/atomic" @@ -14,6 +14,7 @@ import ( redisv8 "github.com/go-redis/redis/v8" "github.com/yusank/goim/pkg/db/redis" + "github.com/yusank/goim/pkg/errors" "github.com/yusank/goim/pkg/mq" "github.com/yusank/goim/pkg/registry" ) @@ -141,18 +142,53 @@ func (a *Application) Run() error { return a.Core.Run() } -func (a *Application) Stop() { - if a.Producer != nil { - if err := a.Producer.Shutdown(); err != nil { - log.Println("stop producer err=", err) +func (a *Application) Shutdown(ctx context.Context) error { + var ( + es = make(errors.ErrorSet, 0) + checkCtxAndExecute = func(f func() error) { + select { + case <-ctx.Done(): + es = append(es, ctx.Err()) + return + default: + } + + if err := f(); err != nil { + es = append(es, err) + } } + ) + + if a.Producer != nil { + checkCtxAndExecute(func() error { + if err := a.Producer.Shutdown(); err != nil { + return fmt.Errorf("shutdown producer error: %w", err) + } + + return nil + }) } for _, consumer := range a.Consumer { - if err := consumer.Shutdown(); err != nil { - log.Println("stop consumer err=", err) - } + checkCtxAndExecute(func() error { + if err := consumer.Shutdown(); err != nil { + return fmt.Errorf("shutdown consumer error: %w", err) + } + + return nil + }) + } + + if a.Redis != nil { + checkCtxAndExecute(func() error { + if err := a.Redis.Close(); err != nil { + return fmt.Errorf("close redis error: %w", err) + } + return nil + }) } + + return es.Err() } func (a *Application) AddConsumer(c mq.Consumer) { diff --git a/pkg/errors/error_set.go b/pkg/errors/error_set.go new file mode 100644 index 0000000..3aad184 --- /dev/null +++ b/pkg/errors/error_set.go @@ -0,0 +1,21 @@ +package errors + +type ErrorSet []error + +// Err returns the error set as an error if error set length is greater then 0. +// Otherwise, it returns nil. +func (e ErrorSet) Err() error { + if len(e) > 0 { + return e + } + + return nil +} + +func (e ErrorSet) Error() string { + var s string + for _, err := range e { + s += err.Error() + "\n" + } + return s +} diff --git a/pkg/graceful/graceful.go b/pkg/graceful/graceful.go new file mode 100644 index 0000000..28cc942 --- /dev/null +++ b/pkg/graceful/graceful.go @@ -0,0 +1,71 @@ +package graceful + +import ( + "context" + "sync" + "time" + + "github.com/go-kratos/kratos/v2/log" + "github.com/yusank/goim/pkg/errors" +) + +var ( + // DefaultTimeout is the default timeout for graceful shutdown. + DefaultTimeout = 30 * time.Second + + // need a function set to store all the graceful shutdown functions + // so that we can call them all at once when the server is shutdown. + gracefulShutdownFuncs []gracefulShutdownFunc +) + +// gracefulShutdownFunc is a function that is called when the server is shutdown. +// gracefulShutdownFunc need pass context to limit the time of function execution. +type gracefulShutdownFunc func(ctx context.Context) error + +// Shutdown gracefully shuts down the server. +// +// This function will block until the shutdown is complete. +// It is recommended to pass a context with timeout to limit the time of server shutdown. +func Shutdown(ctx context.Context) error { + log.Info("graceful shutdown...") + var cancel context.CancelFunc + if ctx == nil { + ctx = context.Background() + ctx, cancel = context.WithTimeout(ctx, DefaultTimeout) + defer cancel() + } + + var ( + done = make(chan struct{}) + errs = make(errors.ErrorSet, 0) + wg sync.WaitGroup + ) + + for _, f := range gracefulShutdownFuncs { + wg.Add(1) + go func(f gracefulShutdownFunc) { + defer wg.Done() + if err := f(ctx); err != nil { + errs = append(errs, err) + } + }(f) + } + + go func() { + wg.Wait() + done <- struct{}{} + }() + + select { + case <-ctx.Done(): // timeout + return ctx.Err() + case <-done: // shutdown complete + return errs.Err() + } + +} + +// Register registers a function to be called when the server is shutdown. +func Register(f gracefulShutdownFunc) { + gracefulShutdownFuncs = append(gracefulShutdownFuncs, f) +} diff --git a/pkg/worker/pool.go b/pkg/worker/pool.go index 3ccc3a9..e67b17f 100644 --- a/pkg/worker/pool.go +++ b/pkg/worker/pool.go @@ -2,7 +2,6 @@ package worker import ( "context" - "log" "sync" "time" @@ -77,15 +76,22 @@ func (p *Pool) Submit(ctx context.Context, tf TaskFunc, concurrence int) TaskRes return TaskStatusQueueFull } -func (p *Pool) Stop() { +func (p *Pool) Shutdown(ctx context.Context) error { p.stopFlag.Store(true) // stop queue daemon close(p.taskQueue) // stop all workers for _, ws := range p.workerSets { - ws.stopAll() - ws.wait() + select { + case <-ctx.Done(): + return ctx.Err() + default: + ws.stopAll() + ws.wait() + } } + + return nil } // tryRunTask try to put task into workerSet and run it.Return false if capacity not enough. @@ -188,7 +194,7 @@ func (p *Pool) consumeQueue() { // check if there has any worker place left // TODO: check if there is any workerSet is idle and remove it // TODO: try to run enqueued tasks even if there is no enough worker to run. - log.Printf("current running worker num: %d", p.curRunningWorkerNum()) + // log.Printf("current running worker num: %d", p.curRunningWorkerNum()) } } diff --git a/pkg/worker/pool_test.go b/pkg/worker/pool_test.go index 4c81d2b..b1defff 100644 --- a/pkg/worker/pool_test.go +++ b/pkg/worker/pool_test.go @@ -46,7 +46,7 @@ func TestPool_SubmitOrEnqueue(t *testing.T) { t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusQueueFull) return } - p.Stop() + _ = p.Shutdown(context.TODO()) if got := p.Submit(context.Background(), tf, 1); got.Status() != TaskStatusPoolClosed { t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusPoolClosed) return diff --git a/tests/client-bench/client.go b/tests/client-bench/client.go index 588cc6a..7f85cf0 100644 --- a/tests/client-bench/client.go +++ b/tests/client-bench/client.go @@ -93,7 +93,10 @@ func (c *client) readMsgFromConn() { log.Println("read msg err:", err) return } - log.Printf("Client=%s|data:%s\n", c.uid, string(data)) + str := string(data) + str = strings.Replace(str, "\n", "", -1) + str = strings.Replace(str, "\r", "", -1) + log.Printf("Client=%s|data:%s\n", c.uid, str) } } diff --git a/tests/client-gui/client.go b/tests/client-gui/client.go index 6b0bd16..67eed0f 100644 --- a/tests/client-gui/client.go +++ b/tests/client-gui/client.go @@ -151,7 +151,10 @@ func readMsgFromConn(conn *websocket.Conn) (chan []byte, chan error) { errChan <- err return } - logger.Println("data:", string(data)) + str := string(data) + str = strings.Replace(str, "\n", "", -1) + str = strings.Replace(str, "\r", "", -1) + logger.Println("data:", str) dataChan <- data } }() diff --git a/tests/client-gui/layout.go b/tests/client-gui/layout.go index 6b6c02e..80b61f0 100644 --- a/tests/client-gui/layout.go +++ b/tests/client-gui/layout.go @@ -63,7 +63,7 @@ func resetInput(g *gocui.Gui, v *gocui.View) error { r := bytes.NewReader(b) size := r.Size() - req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/gateway/service/v1/send_msg", serverAddr), r) + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/gateway/service/v1/msg", serverAddr), r) if err != nil { logger.Println(err) return err