diff --git a/.github/workflows/gh-pages.yml b/.github/workflows/gh-pages.yml deleted file mode 100644 index 64c650c..0000000 --- a/.github/workflows/gh-pages.yml +++ /dev/null @@ -1,32 +0,0 @@ -name: github pages - -on: - push: - pull_request: - branches: - - main - -jobs: - deploy: - runs-on: ubuntu-20.04 - steps: - - uses: actions/checkout@v2 - with: - submodules: true # Fetch Hugo themes (true OR recursive) - fetch-depth: 0 # Fetch all history for .GitInfo and .Lastmod - - - name: Setup Hugo - uses: peaceiris/actions-hugo@v2 - with: - hugo-version: 'latest' - extended: true - - - name: Build - run: hugo --gc --minify --buildFuture --source ./docs - - - name: Deploy - uses: peaceiris/actions-gh-pages@v3 - if: github.ref == 'refs/heads/main' - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - publish_dir: ./docs/public diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 47029a0..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "docs/themes/hugo-book"] - path = docs/themes/hugo-book - url = https://github.com/alex-shpak/hugo-book diff --git a/README.md b/README.md index d9b2def..e7ce2d9 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,13 @@ # goim -> Instant Messaging Server written by golang. +![GoIM](https://go-goim.github.io/images/logo.png) -[![goliangci-lint](https://github.com/yusank/goim/actions/workflows/golangci-lint.yml/badge.svg)](https://github.com/yusank/goim/actions/workflows/golangci-lint.yml) -[![github pages](https://github.com/yusank/goim/actions/workflows/gh-pages.yml/badge.svg)](https://github.com/yusank/goim/actions/workflows/gh-pages.yml) -[![Semgrep](https://github.com/yusank/goim/actions/workflows/semgrep.yml/badge.svg)](https://github.com/yusank/goim/actions/workflows/semgrep.yml) +Instant Messaging Server written by golang. -> 选型参考:[https://zhuanlan.zhihu.com/p/31377253](https://zhuanlan.zhihu.com/p/31377253) - -预期能力: +[![golangci-lint](https://github.com/go-goim/goim/actions/workflows/golangci-lint.yml/badge.svg)](https://github.com/go-goim/goim/actions/workflows/golangci-lint.yml) +[![CodeQL](https://github.com/go-goim/goim/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/go-goim/goim/actions/workflows/codeql-analysis.yml) +[![Semgrep](https://github.com/go-goim/goim/actions/workflows/semgrep.yml/badge.svg)](https://github.com/go-goim/goim/actions/workflows/semgrep.yml) -![design](./static/images/goim.png) - -目前消息流转: +> 选型参考:[https://zhuanlan.zhihu.com/p/31377253](https://zhuanlan.zhihu.com/p/31377253) -![msg](./static/images/send_rec_msg.png) \ No newline at end of file +详细文档请在官网查看:[https://go-goim.github.io](https://go-goim.github.io/) \ No newline at end of file diff --git a/docs/about/index.md b/docs/about/index.md deleted file mode 100644 index a9407bb..0000000 --- a/docs/about/index.md +++ /dev/null @@ -1,3 +0,0 @@ -# about - -GoIM is instant messaging system written by Go. diff --git a/docs/archetypes/default.md b/docs/archetypes/default.md deleted file mode 100644 index 00e77bd..0000000 --- a/docs/archetypes/default.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -title: "{{ replace .Name "-" " " | title }}" -date: {{ .Date }} -draft: true ---- - diff --git a/docs/config.toml b/docs/config.toml deleted file mode 100644 index d204b35..0000000 --- a/docs/config.toml +++ /dev/null @@ -1,85 +0,0 @@ -baseURL = 'http://yusank.github.io/goim/' -title = 'GoIM' -theme = "hugo-book" -# (Optional) Set this to true if you use capital letters in file names -disablePathToLower = true -# (Optional) Set this to true to enable 'Last Modified by' date and git author -# information on 'doc' type pages. -enableGitInfo = true - -# Needed for mermaid/katex shortcodes -[markup] -[markup.goldmark.renderer] - unsafe = true - -[markup.tableOfContents] - startLevel = 1 - endLevel = 4 - - -# (Optional) Theme is intended for documentation use, therefore it doesn't render taxonomy. -# You can remove related files with config below - -[languages] -[languages.en] - languageName = '中文' - contentDir = 'content' - weight = 1 - -[params] - # (Optional, default light) Sets color theme: light, dark or auto. - # Theme 'auto' switches between dark and light modes based on browser/os preferences - BookTheme = 'auto' - - # (Optional, default true) Controls table of contents visibility on right side of pages. - # Start and end levels can be controlled with markup.tableOfContents setting. - # You can also specify this parameter per page in front matter. - BookToC = true - -# BookMenuBundle = '/menu' - - # (Optional, default docs) Specify section of content to render as menu - # You can also set value to "*" to render all sections to menu - BookSection = 'docs' - - # Set source repository location. - # Used for 'Last Modified' and 'Edit this page' links. - BookRepo = 'https://github.com/yusank/goim' - - # Specifies commit portion of the link to the page's last modified commit hash for 'doc' page - # type. - # Required if 'BookRepo' param is set. - # Value used to construct a URL consisting of BookRepo/BookCommitPath/ - # Github uses 'commit', Bitbucket uses 'commits' - # BookCommitPath = 'commit' - - # Enable 'Edit this page' links for 'doc' page type. - # Disabled by default. Uncomment to enable. Requires 'BookRepo' param. - # Path must point to the site directory. - BookEditPath = 'edit/main/docs' - - # (Optional, default January 2, 2006) Configure the date format used on the pages - # - In git information - # - In blog posts - BookDateFormat = 'Jan 2, 2006' - - # (Optional, default true) Enables search function with flexsearch, - # Index is built on fly, therefore it might slowdown your website. - # Configuration for indexing can be adjusted in i18n folder per language. - BookSearch = true - - # (Optional, default true) Enables comments template on pages - # By default partials/docs/comments.html includes Disqus template - # See https://gohugo.io/content-management/comments/#configure-disqus - # Can be overwritten by same param in page frontmatter - BookComments = true - - # /!\ This is an experimental feature, might be removed or changed at any time - # (Optional, experimental, default false) Enables portable links and link checks in markdown pages. - # Portable links meant to work with text editors and let you write markdown without {{< relref >}} shortcode - # Theme will print warning if page referenced in markdown does not exists. - BookPortableLinks = true - - # /!\ This is an experimental feature, might be removed or changed at any time - # (Optional, experimental, default false) Enables service worker that caches visited pages and resources for offline use. - BookServiceWorker = true diff --git a/docs/content/_index.md b/docs/content/_index.md deleted file mode 100644 index a74731e..0000000 --- a/docs/content/_index.md +++ /dev/null @@ -1,75 +0,0 @@ ---- -title: Introduction -type: docs ---- - -> Instant Messaging system written by Go. - -## How to run - -```shell -# run push server -make run Srv=push -# run gateway server -make run Srv=gateway -# run msg server -make run Srv=msg -``` - -## Design of GoIM - -### 整体能力规划 - -![design](https://raw.githubusercontent.com/yusank/goim/main/static/images/goim.png) - -#### 客户端如何查找和连接长连接服务 - -客户端如何连接长连接服务,目前我有两个方案各有优缺点,但是还没确定。 - -#### 反向代理方案 - -客户端统一入口在 gateway 上,gateway 支持反向代理能力,客户端发起长连接请求时,代理到后端的服务(这里准备使用一致性哈希来确定转发到哪台机器上) - -优点: - -- 入口统一,且可以在 gateway 上完成鉴权等操作 -- 后端服务无需暴露 ip,且可任意扩缩容比较安全 - -缺点: - -- gateway 需要承受长连接带来的压力,需要更多的 gateway 来承受大量在线用户的情况 - -#### httpdns 方案 - -客户端先通过暴露的域名,去访问 httpdns 服务获取真正后端服务的 ip,然后通过 ip 直接进行长连接 - -优点: - -- 客户端与长连接服务器直连,减少代理层的压力 - -缺点: - -- 要求暴露后端服务 ip,安全性降低且比较浪费 ip 资源 - -##### 纯 httpdns - -![ws](https://raw.githubusercontent.com/yusank/goim/main/static/images/conn_ws_dns.png) - -##### 结合 gateway - -![gateway](https://raw.githubusercontent.com/yusank/goim/main/static/images/conn_ws_gateway.png) - -#### 结论 - -最终决定,使用基于 gateway 作为第一入口,再返回长链接服务的方案. -原因如下: - -1. 可以在 gateway 这一层做初步的校验和分配长链接服务的策略(比如按最小连接数,id 哈希等) -2. 反向代理会使系统更复杂且上层反向代理会有比较大的压力,项目初期不想搞太复杂 -3. 对于客户端来说 gateway 就是一切了,之后要加的用户体系都是通过 gateway 暴露出来,入口可以比较收拢. - -### 消息的流转 - -IM 数据将在 HBASE 上存储,关系型数据存在 MySQL - -![msg](https://raw.githubusercontent.com/yusank/goim/main/static/images/send_rec_msg.png) diff --git a/docs/content/docs/advance/_index.md b/docs/content/docs/advance/_index.md deleted file mode 100644 index ea0e46f..0000000 --- a/docs/content/docs/advance/_index.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -weight: 2 -bookFlatSection: true -title: "高级特性" ---- \ No newline at end of file diff --git a/docs/content/docs/advance/configuration.md b/docs/content/docs/advance/configuration.md deleted file mode 100644 index 99ffa40..0000000 --- a/docs/content/docs/advance/configuration.md +++ /dev/null @@ -1,80 +0,0 @@ ---- -weight: 1 ---- - -# Configuration - -配置为两份文件分别为 service config 和 registry config - -- service config 关注服务启停以及声明周期中需要的各类配置 -- registry config 关注服务注册相关配置 - -## server config definition - -```proto -// Service 为一个服务的全部配置 -message Service { - string name = 1; - string version = 2; - optional Server http = 3; - optional Server grpc = 4; - Log log = 5; - map metadata = 6; - Redis redis = 7; - MQ mq = 8; -} - -message Server { - string scheme = 1; - string addr = 2; - int32 port = 3; -} - - -enum Level { - DEBUG = 0; - INFO = 1; - WARING = 2; - ERROR = 3; - FATAL = 4; -} - -message Log { - optional string log_path = 1; - repeated Level level = 2; -} - -message Redis { - string addr = 1; - string password = 2; - int32 max_conns = 3; - int32 min_idle_conns = 4; - google.protobuf.Duration dial_timeout = 5; - google.protobuf.Duration idle_timeout = 6; -} - -message MQ { - repeated string addr = 1; - int32 max_retry = 2; -} -``` - -## registry config definition - -```proto -message RegistryInfo { - repeated string addr = 1; - string scheme = 2; - google.protobuf.Duration dial_timeout_sec = 3; - google.protobuf.Duration dial_keep_alive_time_sec = 4; - google.protobuf.Duration dial_keep_alive_timeout_sec = 5; -} - -message Registry { - string name = 1; - oneof reg { - RegistryInfo consul = 2; - RegistryInfo etcd = 3; - } -} -``` diff --git a/docs/content/docs/design/_index.md b/docs/content/docs/design/_index.md deleted file mode 100644 index 641573b..0000000 --- a/docs/content/docs/design/_index.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -weight: 2 -bookFlatSection: true -title: "设计" ---- \ No newline at end of file diff --git a/docs/content/docs/design/design.v1.md b/docs/content/docs/design/design.v1.md deleted file mode 100644 index 850a347..0000000 --- a/docs/content/docs/design/design.v1.md +++ /dev/null @@ -1,77 +0,0 @@ ---- -weight: 1 ---- - -# Design of version1 - -> 关于下个大版本的设计稿 - -## 准备工作 - -- 提升现有的系统稳定性 -- 确保系统日志已接入,可回溯问题原因 -- 抽象长链接部分,确保后期可替换可测试 -- 完善测试流程和测试用例 - -## 目标 - -- 支持用户关系,设计用户信息与用户关系 -- 完善消息协议 -- 支持消息持久化 -- 支持 Prometheus 和 open tracing -- **支持写扩散、读扩散** - -## 非目标 - -- admin 能力可以添加但是不重要 - -## 核心流程 - -1. 提升稳定性 -2. 设计用户相关表设计 -3. 设计消息持久化数据结构 -4. 添加用户管理相关能力,增删改查用户关系等 -5. 完善消息协议,补充必要信息 -6. 支持写扩散,读扩散并进行压测 - -### 提升稳定性 - -- 添加更完善的测试场景和测试用例,一次全量测试用例跑完后,能确保功能可用的状态。 -- 提供一键部署能力,最好是通过 docker-compose 的方式部署各个组件和 Service - -### 用户体系 - -- 用户登录状态 -- 用户鉴权 -- 用户关系的变更 - -### 消息持久化 - -- 通过单独一个组件对所有消息进行持久化到 HBASE -- mq 中的消息信息需要更详细 -- 提供查询历史消息的能力 - -### 用户管理 - -> optional - -- 提供用户管理能力,方便测试 debug - -### 消息协议的完善 - -- 根据进度补充信息 - -### 读写扩散 - -- 列出详细的对比、使用场景 -- 根据需求实现读写扩散 - -### 监控 - -> optional - -- 添加 Prometheus, open tracing,方便监控和追踪问题 - -## 其他 - -- 应把 connection 抽象出来,并支持 tcp 方式的长链接 diff --git a/docs/content/docs/example/_index.md b/docs/content/docs/example/_index.md deleted file mode 100644 index 7e925ba..0000000 --- a/docs/content/docs/example/_index.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -weight: 3 -bookFlatSection: true -title: "使用示例" ---- \ No newline at end of file diff --git a/docs/content/docs/example/terminal_cli/gui.png b/docs/content/docs/example/terminal_cli/gui.png deleted file mode 100644 index 6fcdc23..0000000 Binary files a/docs/content/docs/example/terminal_cli/gui.png and /dev/null differ diff --git a/docs/content/docs/example/terminal_cli/index.md b/docs/content/docs/example/terminal_cli/index.md deleted file mode 100644 index 1061803..0000000 --- a/docs/content/docs/example/terminal_cli/index.md +++ /dev/null @@ -1,22 +0,0 @@ ---- -title: "Terminal CLI" ---- - -## How to run it - -服务提供一个简单的终端 GUI 可以测试消息的发送和接受,代码在 `tests` 目录下。 - -在 `goim/test` 目录下执行如下命令: - -```shell -# 支持参数 -# ADDR ?= 127.0.0.1:18071 -# UID ?= user1 -# TOUID ?= user2 - -make run-gui UID=user3 TOUID=user2 -``` - -界面如下: - -![gui](./gui.png) diff --git a/docs/content/docs/quick_start/_index.md b/docs/content/docs/quick_start/_index.md deleted file mode 100644 index 1253786..0000000 --- a/docs/content/docs/quick_start/_index.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -weight: 1 -bookFlatSection: true -title: "快速开始" ---- \ No newline at end of file diff --git a/docs/content/docs/quick_start/prepare.md b/docs/content/docs/quick_start/prepare.md deleted file mode 100644 index e834406..0000000 --- a/docs/content/docs/quick_start/prepare.md +++ /dev/null @@ -1,44 +0,0 @@ ---- -weight: 2 -title: "Prepare" ---- - -## requirement - -### environment - -- apache/rocketmq 4.6.0+ -- consul 1.11.4+ -- redis 2.0+ - -> 关于部署 rocketmq:docker 部署 rocketmq 过程中遇到过一些问题,如果你有疑问可以参考这篇文章 [Docker 部署 RocketMQ](https://yusank.space/posts/rocketmq-deploy/) - -### config - -msg service 为例,`apps/msg/config/config.yaml`: - - name: goim.msg.service - version: v0.0.0 - grpc: - scheme: grpc - port: 18063 - log: - level: - - INFO - - DEBUG - metadata: - grpcSrv: yes - redis: - addr: 127.0.0.1:6379 - mq: - addr: - - 127.0.0.1:9876 - -`apps/msg/config/registry.yaml` : - - consul: - addr: - - 127.0.0.1:8500 - scheme: http - -根据自己的环境去修改各个组件的地址和端口。 diff --git a/docs/content/docs/quick_start/quick_start.md b/docs/content/docs/quick_start/quick_start.md deleted file mode 100644 index 89f5324..0000000 --- a/docs/content/docs/quick_start/quick_start.md +++ /dev/null @@ -1,46 +0,0 @@ ---- -weight: 1 ---- - -# Quick Start - -## run - -```shell -# run msg service -$ make run Srv=msg -# run gateway service -$ make run Srv=gateway -# run push service -$ make run Srv=push -``` - -## other make command - -```shell -make help - -Usage: - make - -Development - vet Run go vet against code. - lint Run go lint against code. - test Run test against code. - -Generate - protoc Run protoc command to generate pb code. - -Build - build build provided server - build-all build all apps - -Docker - docker-build build docker image - -Run - run run provided server - -General - help Display this help. -``` diff --git a/docs/content/posts/_index.md b/docs/content/posts/_index.md deleted file mode 100644 index abfb8c4..0000000 --- a/docs/content/posts/_index.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -menu: - after: - name: blog - weight: 5 -title: Blog ---- diff --git a/docs/content/posts/bugs.md b/docs/content/posts/bugs.md deleted file mode 100644 index 7c11cb6..0000000 --- a/docs/content/posts/bugs.md +++ /dev/null @@ -1,13 +0,0 @@ ---- -weight: 1 -bookFlatSection: true -title: "解决bug" -date: 2022-03-31 -tags: - - "bugs" -categories: - - "Development" - - "bugs" ---- - -> 记录开过过程中遇到的问题以及解决过程。 diff --git a/docs/content/posts/worker_pool.md b/docs/content/posts/worker_pool.md deleted file mode 100644 index 7b29619..0000000 --- a/docs/content/posts/worker_pool.md +++ /dev/null @@ -1,532 +0,0 @@ ---- -weight: 2 -bookFlatSection: true -title: "实现异步并发 worker 队列" -date: 2022-04-01 -tags: - - "go" - - "golang" - - "worker" - - "pool" -categories: - - "Development" - - "golang" ---- - -> 记录实现一个异步并发 worker 队列的过程。 - -在开发 broadcast 功能的时候,碰到一个比较棘手的问题,需要并发执行多个 worker 来讲 broadcast 消息推送到所有在线用户,同时我希望能控制并发数量。 - - - -## 前言 - -以往遇到类似的问题我都会借助 `sync.WaitGroup` 加 `channel` 的方式去做,实现方式也比较简单。大致思路如下: - -```go -type LimitedWaitGroup struct { - wg *sync.WaitGroup - ch chan int -} - -func NewLimitedWaitGroup(size int) *LimitedWaitGroup { - return &LimitedWaitGroup{ - wg : new(sync.WaitGroup), - ch : make(chan int, size) - } -} - -func (w *LimitedWaitGroup) Add(f func()) { - // wait if channel is full - w.ch <- 1 - w.wg.Add(1) - go func() { - defer w.done() - f() - }() -} - -func (w *LimitedWaitGroup) done() { - <-w.ch - w.wg.Done() -} - -func (w *LimitedWaitGroup) Wait() { - w.wg.Wait() -} -``` - -这样能解决我大部分的简单需求,但是现在我想要的能力用这个简单的 `LimitedWaitGroup` 无法完全满足,所以重新设计了一个 `worker pool` 的概念来满足我现在以及以后类似的需求。 - -## 设计 - -### 需求整理 - -首先将目前想到的需求以及其优先级列出来: - -**高优先级:** - -1. worker pool 支持设置 size,防止 worker 无限增多 -2. 任务并发执行且能指定并发数 -3. 当 worker 达到上线时,新的任务在一定范围内支持排队等待(即 `limited queue`) -4. 支持捕获任务错误 -5. 排队中的任务应该按顺序调度执行 - -**低优先级:** - -1. 任务支持实时状态更新 -2. 任务可以外部等待完成(类似 `waitGroup.Done()` ) -3. 当空闲 worker 小于指定并发数时,支持占用空闲 worker 部分运行(如当前剩余 3 个 worker 可用,但是新的任务需要 5 个并发,则尝试先占用这 3 个worker,并在运行过程中继续监听 pool 空闲出来的 worker 并尝试去占用) - -{{< hint info >}} -**小结** -列出完需求及其优先级后,经过考虑决定,高优先级除了`第五条`, 低优先级除了`第三条`, 其他需求都在目前版本里实现。 - -原因如下: - -- 首先说低优先级第三条,这块的部分调度执行 worker,目前没有想好比较优雅的实现方式,所以暂时没有实现(但是下个版本会实现) -- 高优先级的第五条也是跟调度有点关系,如果队列里靠前的任务需要大量的 worker,那很容易造成阻塞,后面的 task 一直没办法执行,即便需要很少的 worker。所以等部分调度执行开发完再把任务按需执行打开。 - -{{< /hint >}} - -### Task Definition - -`task` 表示一次任务,包含了任务执行的方法,并发数,所属的 `workerSet`以及执行状态等。 - -```go -type TaskFunc func() error - -type task struct { - tf TaskFunc // task function - concurrence int // concurrence of task - ws *workerSet // assign value after task distribute to worker. - status TaskStatus // store task status. -} - -// TaskStatus is the status of task. -type TaskStatus int -``` - -### Worker Definition - -`worker` 作为最小调度单元,仅包含 `workerSet` 和 `error` . - -```go -type worker struct { - ws *workerSet - err error -} -``` - -### TaskResult Definition - -`TaskResult` 是一个对外暴露的 `interface`, 用于外部调用者获取和管理任务执行状态信息。 - -```go -// TaskResult is a manager of submitted task. -type TaskResult interface { - // get error if task failed. - Err() error - // wait for task done. - Wait() - // get task status. - Status() TaskStatus - // kill task. - Kill() -} -``` - -{{< hint warning >}} -**`task` 和 `TaskStatus` 分别实现 `TaskResult` 的接口,从而外部统一拿到 `TaskResult`** - -> 之所以 `TaskStatus` 也需要实现 `TaskResult` 是因为部分情况下,不需要创建 `task` 直接返回错误状态即可。如: -> 提交的任务的并发数过高(超过 pool 的 size),当前 queue 已满不能再处理任何其他任务了,这种情况直接返回对应的状态码。 -{{< /hint >}} - -### WorkerSet Definition - -`workerSet` 为一组 `worker`的集合,作用是调度 `worker` 并维护起所属 `task` 的整个生命过程. - -```go -// workerSet represent a group of task handle workers -type workerSet struct { - task *task - runningWorker atomic.Int32 - workers []*worker - ctx context.Context - cancel context.CancelFunc - wg *sync.WaitGroup -} -``` - -### WorkerPool Definition - -`Pool` 是一个可指定 size 的 worker pool. 可并发运行多个 task 并且支持额外的任务排队能力。 - -```go -// Pool is a buffered worker pool -type Pool struct { - // TODO: taskQueue should be a linked list, so that we can get the task from the head of the list and put it back to the head. - // If we use a channel as taskQueue, we can't get the task from the head of the list and put it back to the head. - // But make sure that before change it to linked list, we should have the ability run the task in min(taskQueue length, concurrence) goroutines. - taskQueue chan *task - enqueuedTaskCount atomic.Int32 // count of unhandled tasks - bufferSize int // size of taskQueue buffer, means can count of bufferSize task can wait to be handled - maxWorker int // count of how many worker run in concurrence - workerSets []*workerSet - lock *sync.Mutex - stopFlag atomic.Bool -} -``` - -## 实现 - -上面已经确定需要的能力和基础的数据结构了,下面一个个去实现各个模块的能力。 - -### Worker Implement - -`worker` 能力相对纯粹,看看 worker 是如何工作的: - -```go -func (w *worker) run() { - defer w.ws.done() - - var ec = make(chan error, 1) - defer close(ec) - go func() { - ec <- w.ws.task.tf() - }() - - select { - case e := <-ec: - w.err = e - case <-w.ws.ctx.Done(): - } -} -``` - -### WorkerSet Implement - -`workerSet` 调度 worker,记录 worker 运行状态等。 - -{{< details "点击展开" "...">}} - -```go - -func newWorkerSet(ctx context.Context, t *task) *workerSet { - // 初始化参数 - // ... 省略代码 - return ws -} - -func (ws *workerSet) run() { - ws.task.updateStatus(TaskStatusRunning) - for _, w := range ws.workers { - ws.addOne() - go w.run() - } -} - -func (ws *workerSet) stopAll() { - ws.cancel() - ws.task.updateStatus(TaskStatusKilled) -} - -// err returns the first error that occurred in the workerSet. -func (ws *workerSet) err() error { - // ...省略代码 - return nil -} - -func (ws *workerSet) getRunningWorker() int { - return int(ws.runningWorker.Load()) -} - -// done called when worker stop. -func (ws *workerSet) done() { - ws.addRunningWorker(-1) - ws.wg.Done() - if ws.getRunningWorker() == 0 { - ws.task.updateStatus(TaskStatusDone) - } -} - -// addOne called when worker start running. -func (ws *workerSet) addOne() { - ws.addRunningWorker(1) - ws.wg.Add(1) -} - -func (ws *workerSet) wait() { - ws.wg.Wait() -} -``` - -{{< /details >}} - -### Task Implement - -`task` 主要是记录 task 的状态,并通过 workerSet 控制其下的 worker. - -{{< details "点击展开" "...">}} - -```go -func newTask(tf TaskFunc, concurrence int) *task { - return &task{ - tf: tf, - concurrence: concurrence, - } -} - -// Err returns the first error that occurred in the workerSet. -func (t *task) Err() error { - // check t.ws if nil return nil. - if t.ws == nil { - return nil - } - - return t.ws.err() -} - -// Wait for task done. -// Please make sure task is done or running before call this function. -func (t *task) Wait() { - // check t.ws if nil. - if t.ws == nil { - return - } - - t.ws.wait() -} - -// Status returns task status. -func (t *task) Status() TaskStatus { - return t.status -} - -// Kill task. -func (t *task) Kill() { - // check t.ws if nil. - if t.ws == nil { - return - } - - t.ws.stopAll() -} - -func (t *task) assignWorkerSet(ws *workerSet) { - t.ws = ws -} - -func (t *task) updateStatus(status TaskStatus) { - t.status = status -} -``` - -{{< /details >}} - -### TaskStatus Implement - -`TaskStatus` 虽然实现了 `TaskResult` 接口,但是不能控制任何 task,其有效的方法只有 `Status()` 和 `Err()` - -```go -func (t TaskStatus) Error() string { - return t.String() -} - -func (t TaskStatus) Err() error { - switch t { - case TaskStatusError, TaskStatusQueueFull, TaskStatusTooManyWorker, TaskStatusPoolClosed, TaskStatusKilled: - return t - } - - return nil -} - -func (t TaskStatus) Wait() { - // do nothing. -} - -func (t TaskStatus) Status() TaskStatus { - return t -} - -func (t TaskStatus) Kill() { - // do nothing. -} -``` - -### Pool Implement - -`Pool` 是总的入口,任务会提交到 `Pool`, 并由 `Pool` 创建 task 并调度到 `workerSet` 上,同时定时清理已完成的 `workerSet`, -确保空闲 `worker` 能被合理使用。 - -{{< details "点击展开" "...">}} - -```go -func NewPool(workerSize, queueSize int) *Pool { - // ... 初始化各个参数 - - // check p.enqueue to find out why make this channel size with p.bufferSize+1. - // - p.taskQueue = make(chan *task, p.bufferSize+1) - // 启动单独 goroutine 维护队列 - go p.consumeQueue() - return p -} - -func (p *Pool) Submit(ctx context.Context, tf TaskFunc, concurrence int) TaskResult { - if p.stopFlag.Load() { - return TaskStatusPoolClosed - } - - if concurrence > p.maxWorker { - return TaskStatusTooManyWorker - } - - // check if there has any worker place left - p.lock.Lock() - defer p.lock.Unlock() - - t := newTask(tf, concurrence) - if p.tryRunTask(ctx, t) { - return t - } - - if p.enqueueTask(t, true) { - return t - } - - return TaskStatusQueueFull -} - -func (p *Pool) Stop() { - // 关闭队列和正在运行的 workerSet -} - -// tryRunTask try to put task into workerSet and run it.Return false if capacity not enough. -// Make sure get p.Lock before call this func -func (p *Pool) tryRunTask(ctx context.Context, t *task) bool { - if p.curRunningWorkerNum()+t.concurrence <= p.maxWorker { - ws := newWorkerSet(ctx, t) - p.workerSets = append(p.workerSets, ws) - // run 为异步方法 - ws.run() - return true - } - - return false -} - -// curRunningWorkerNum get current running worker num -// make sure lock mutex before call this func -func (p *Pool) curRunningWorkerNum() int { - // ...省略代码 - return cnt -} - -// enqueueTask put task to queue. -// p.enqueuedTaskCount increase 1 if is new task -func (p *Pool) enqueueTask(t *task, isNewTask bool) bool { - // ... 省略代码 - return true -} - -func (p *Pool) consumeQueue() { - var ticker = time.NewTicker(time.Second) - for { - select { - case t, ok := <-p.taskQueue: - if !ok { - // channel closed - return - } - if p.tryRunTask(context.Background(), t) { - p.enqueuedTaskCount.Sub(1) - goto unlock - } - // if enqueueTask return false, means channel is closed. - if !p.enqueueTask(t, false) { - // channel is closed - goto unlock - } - - unlock: - p.lock.Unlock() - case <-ticker.C: - log.Printf("current running worker num: %d", p.curRunningWorkerNum()) - } - } - - // never reach here -} -``` - -{{< /details >}} - -## 使用 - -到这里相关开发基本结束了,有一些 `TODO` 项后面后补充完善,下面通过 test case 来看一下如何使用这个 worker pool: - - -```go - -func TestPool_SubmitOrEnqueue(t *testing.T) { - p := NewPool(5, 1) - var ( - cnt int - concurrence = 5 - ) - - tf := func() error { - time.Sleep(time.Second) - log.Println("hello world") - cnt++ - return nil - } - - got := p.Submit(context.Background(), tf, concurrence) - if got.Status() != TaskStatusRunning { - t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusRunning) - return - } - got.Wait() - if cnt != concurrence { - t.Errorf("cnt = %v, want %v", cnt, concurrence) - } - if got := p.Submit(context.Background(), tf, concurrence); got.Status() != TaskStatusRunning { - t.Errorf("SubmitOrEnqueue() = %v, want %v", got, TaskStatusRunning) - return - } - if got := p.Submit(context.Background(), tf, concurrence); got.Status() != TaskStatusEnqueue { - t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusEnqueue) - return - } - if got := p.Submit(context.Background(), tf, 6); got.Status() != TaskStatusTooManyWorker { - t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusTooManyWorker) - return - } - if got := p.Submit(context.Background(), tf, concurrence); got.Status() != TaskStatusQueueFull { - t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusQueueFull) - return - } - p.Stop() - if got := p.Submit(context.Background(), tf, 1); got.Status() != TaskStatusPoolClosed { - t.Errorf("SubmitOrEnqueue() = %v, want %v", got.Status(), TaskStatusPoolClosed) - return - } -} -``` - -## 总结 - -到这里这篇文章内容全部结束了,下面做一个简单的总结: - -- 介绍背景和需求 -- 根据需求定义了一组概念:`task`, `worker`, `workerSet`, `pool` -- 各个结构之前的关系以及如何实现 -- 最终给出使用的 test case. - -## 链接 🔗 - -**如果想仔细阅读源码,并持续关注这块功能的后续更新优化,请点击这里跳转到 [GitHub](https://github.com/yusank/goim/tree/main/pkg/worker).** diff --git a/docs/resources/_gen/assets/scss/goim/book.scss_50fc8c04e12a2f59027287995557ceff.content b/docs/resources/_gen/assets/scss/goim/book.scss_50fc8c04e12a2f59027287995557ceff.content deleted file mode 100644 index ed65056..0000000 --- a/docs/resources/_gen/assets/scss/goim/book.scss_50fc8c04e12a2f59027287995557ceff.content +++ /dev/null @@ -1 +0,0 @@ -@charset "UTF-8";:root{--gray-100:#f8f9fa;--gray-200:#e9ecef;--gray-500:#adb5bd;--color-link:#0055bb;--color-visited-link:#8440f1;--body-background:white;--body-font-color:black;--icon-filter:none;--hint-color-info:#6bf;--hint-color-warning:#fd6;--hint-color-danger:#f66}@media(prefers-color-scheme:dark){:root{--gray-100:rgba(255, 255, 255, 0.1);--gray-200:rgba(255, 255, 255, 0.2);--gray-500:rgba(255, 255, 255, 0.5);--color-link:#84b2ff;--color-visited-link:#b88dff;--body-background:#343a40;--body-font-color:#e9ecef;--icon-filter:brightness(0) invert(1);--hint-color-info:#6bf;--hint-color-warning:#fd6;--hint-color-danger:#f66}}/*!normalize.css v8.0.1 | MIT License | github.com/necolas/normalize.css*/html{line-height:1.15;-webkit-text-size-adjust:100%}body{margin:0}main{display:block}h1{font-size:2em;margin:.67em 0}hr{box-sizing:content-box;height:0;overflow:visible}pre{font-family:monospace,monospace;font-size:1em}a{background-color:transparent}abbr[title]{border-bottom:none;text-decoration:underline;text-decoration:underline dotted}b,strong{font-weight:bolder}code,kbd,samp{font-family:monospace,monospace;font-size:1em}small{font-size:80%}sub,sup{font-size:75%;line-height:0;position:relative;vertical-align:baseline}sub{bottom:-.25em}sup{top:-.5em}img{border-style:none}button,input,optgroup,select,textarea{font-family:inherit;font-size:100%;line-height:1.15;margin:0}button,input{overflow:visible}button,select{text-transform:none}button,[type=button],[type=reset],[type=submit]{-webkit-appearance:button}button::-moz-focus-inner,[type=button]::-moz-focus-inner,[type=reset]::-moz-focus-inner,[type=submit]::-moz-focus-inner{border-style:none;padding:0}button:-moz-focusring,[type=button]:-moz-focusring,[type=reset]:-moz-focusring,[type=submit]:-moz-focusring{outline:1px dotted ButtonText}fieldset{padding:.35em .75em .625em}legend{box-sizing:border-box;color:inherit;display:table;max-width:100%;padding:0;white-space:normal}progress{vertical-align:baseline}textarea{overflow:auto}[type=checkbox],[type=radio]{box-sizing:border-box;padding:0}[type=number]::-webkit-inner-spin-button,[type=number]::-webkit-outer-spin-button{height:auto}[type=search]{-webkit-appearance:textfield;outline-offset:-2px}[type=search]::-webkit-search-decoration{-webkit-appearance:none}::-webkit-file-upload-button{-webkit-appearance:button;font:inherit}details{display:block}summary{display:list-item}template{display:none}[hidden]{display:none}.flex{display:flex}.flex-auto{flex:auto}.flex-even{flex:1 1}.flex-wrap{flex-wrap:wrap}.justify-start{justify-content:flex-start}.justify-end{justify-content:flex-end}.justify-center{justify-content:center}.justify-between{justify-content:space-between}.align-center{align-items:center}.mx-auto{margin:0 auto}.text-center{text-align:center}.text-left{text-align:left}.text-right{text-align:right}.hidden{display:none}input.toggle{height:0;width:0;overflow:hidden;opacity:0;position:absolute}.clearfix::after{content:"";display:table;clear:both}html{font-size:16px;scroll-behavior:smooth;touch-action:manipulation}body{min-width:20rem;color:var(--body-font-color);background:var(--body-background);letter-spacing:.33px;font-weight:400;text-rendering:optimizeLegibility;-webkit-font-smoothing:antialiased;-moz-osx-font-smoothing:grayscale;box-sizing:border-box}body *{box-sizing:inherit}h1,h2,h3,h4,h5{font-weight:400}a{text-decoration:none;color:var(--color-link)}img{vertical-align:baseline}:focus{outline-style:auto;outline-color:currentColor;outline-color:-webkit-focus-ring-color}aside nav ul{padding:0;margin:0;list-style:none}aside nav ul li{margin:1em 0;position:relative}aside nav ul a{display:block}aside nav ul a:hover{opacity:.5}aside nav ul ul{padding-inline-start:1rem}ul.pagination{display:flex;justify-content:center;list-style-type:none}ul.pagination .page-item a{padding:1rem}.container{max-width:80rem;margin:0 auto}.book-icon{filter:var(--icon-filter)}.book-brand{margin-top:0;margin-bottom:1rem}.book-brand img{height:1.5em;width:1.5em;margin-inline-end:.5rem}.book-menu{flex:0 0 16rem;font-size:.875rem}.book-menu .book-menu-content{width:16rem;padding:1rem;background:var(--body-background);position:fixed;top:0;bottom:0;overflow-x:hidden;overflow-y:auto}.book-menu a,.book-menu label{color:inherit;cursor:pointer;word-wrap:break-word}.book-menu a.active{color:var(--color-link)}.book-menu input.toggle+label+ul{display:none}.book-menu input.toggle:checked+label+ul{display:block}.book-menu input.toggle+label::after{content:"▸"}.book-menu input.toggle:checked+label::after{content:"▾"}body[dir=rtl] .book-menu input.toggle+label::after{content:"◂"}body[dir=rtl] .book-menu input.toggle:checked+label::after{content:"▾"}.book-section-flat{margin:2rem 0}.book-section-flat>a,.book-section-flat>span,.book-section-flat>label{font-weight:bolder}.book-section-flat>ul{padding-inline-start:0}.book-page{min-width:20rem;flex-grow:1;padding:1rem}.book-post{margin-bottom:3rem}.book-header{display:none;margin-bottom:1rem}.book-header label{line-height:0}.book-header img.book-icon{height:1.5em;width:1.5em}.book-search{position:relative;margin:1rem 0;border-bottom:1px solid transparent}.book-search input{width:100%;padding:.5rem;border:0;border-radius:.25rem;background:var(--gray-100);color:var(--body-font-color)}.book-search input:required+.book-search-spinner{display:block}.book-search .book-search-spinner{position:absolute;top:0;margin:.5rem;margin-inline-start:calc(100% - 1.5rem);width:1rem;height:1rem;border:1px solid transparent;border-top-color:var(--body-font-color);border-radius:50%;animation:spin 1s ease infinite}@keyframes spin{100%{transform:rotate(360deg)}}.book-search small{opacity:.5}.book-toc{flex:0 0 16rem;font-size:.75rem}.book-toc .book-toc-content{width:16rem;padding:1rem;position:fixed;top:0;bottom:0;overflow-x:hidden;overflow-y:auto}.book-toc img{height:1em;width:1em}.book-toc nav>ul>li:first-child{margin-top:0}.book-footer{padding-top:1rem;font-size:.875rem}.book-footer img{height:1em;width:1em;margin-inline-end:.5rem}.book-comments{margin-top:1rem}.book-languages{margin-block-end:2rem}.book-languages .book-icon{height:1em;width:1em;margin-inline-end:.5em}.book-languages ul{padding-inline-start:1.5em}.book-menu-content,.book-toc-content,.book-page,.book-header aside,.markdown{transition:.2s ease-in-out;transition-property:transform,margin,opacity,visibility;will-change:transform,margin,opacity}@media screen and (max-width:56rem){#menu-control,#toc-control{display:inline}.book-menu{visibility:hidden;margin-inline-start:-16rem;font-size:16px;z-index:1}.book-toc{display:none}.book-header{display:block}#menu-control:focus~main label[for=menu-control]{outline-style:auto;outline-color:currentColor;outline-color:-webkit-focus-ring-color}#menu-control:checked~main .book-menu{visibility:initial}#menu-control:checked~main .book-menu .book-menu-content{transform:translateX(16rem);box-shadow:0 0 .5rem rgba(0,0,0,.1)}#menu-control:checked~main .book-page{opacity:.25}#menu-control:checked~main .book-menu-overlay{display:block;position:absolute;top:0;bottom:0;left:0;right:0}#toc-control:focus~main label[for=toc-control]{outline-style:auto;outline-color:currentColor;outline-color:-webkit-focus-ring-color}#toc-control:checked~main .book-header aside{display:block}body[dir=rtl] #menu-control:checked~main .book-menu .book-menu-content{transform:translateX(-16rem)}}@media screen and (min-width:80rem){.book-page,.book-menu .book-menu-content,.book-toc .book-toc-content{padding:2rem 1rem}}@font-face{font-family:roboto;font-style:normal;font-weight:400;font-display:swap;src:local(""),url(fonts/roboto-v27-latin-regular.woff2)format("woff2"),url(fonts/roboto-v27-latin-regular.woff)format("woff")}@font-face{font-family:roboto;font-style:normal;font-weight:700;font-display:swap;src:local(""),url(fonts/roboto-v27-latin-700.woff2)format("woff2"),url(fonts/roboto-v27-latin-700.woff)format("woff")}@font-face{font-family:roboto mono;font-style:normal;font-weight:400;font-display:swap;src:local(""),url(fonts/roboto-mono-v13-latin-regular.woff2)format("woff2"),url(fonts/roboto-mono-v13-latin-regular.woff)format("woff")}body{font-family:roboto,sans-serif}code{font-family:roboto mono,monospace}@media print{.book-menu,.book-footer,.book-toc{display:none}.book-header,.book-header aside{display:block}main{display:block!important}}.markdown{line-height:1.6}.markdown>:first-child{margin-top:0}.markdown h1,.markdown h2,.markdown h3,.markdown h4,.markdown h5,.markdown h6{font-weight:400;line-height:1;margin-top:1.5em;margin-bottom:1rem}.markdown h1 a.anchor,.markdown h2 a.anchor,.markdown h3 a.anchor,.markdown h4 a.anchor,.markdown h5 a.anchor,.markdown h6 a.anchor{opacity:0;font-size:.75em;vertical-align:middle;text-decoration:none}.markdown h1:hover a.anchor,.markdown h1 a.anchor:focus,.markdown h2:hover a.anchor,.markdown h2 a.anchor:focus,.markdown h3:hover a.anchor,.markdown h3 a.anchor:focus,.markdown h4:hover a.anchor,.markdown h4 a.anchor:focus,.markdown h5:hover a.anchor,.markdown h5 a.anchor:focus,.markdown h6:hover a.anchor,.markdown h6 a.anchor:focus{opacity:initial}.markdown h4,.markdown h5,.markdown h6{font-weight:bolder}.markdown h5{font-size:.875em}.markdown h6{font-size:.75em}.markdown b,.markdown optgroup,.markdown strong{font-weight:bolder}.markdown a{text-decoration:none}.markdown a:hover{text-decoration:underline}.markdown a:visited{color:var(--color-visited-link)}.markdown img{max-width:100%;height:auto}.markdown code{padding:0 .25rem;background:var(--gray-200);border-radius:.25rem;font-size:.875em}.markdown pre{padding:1rem;background:var(--gray-100);border-radius:.25rem;overflow-x:auto}.markdown pre code{padding:0;background:0 0}.markdown p{word-wrap:break-word}.markdown blockquote{margin:1rem 0;padding:.5rem 1rem .5rem .75rem;border-inline-start:.25rem solid var(--gray-200);border-radius:.25rem}.markdown blockquote :first-child{margin-top:0}.markdown blockquote :last-child{margin-bottom:0}.markdown table{overflow:auto;display:block;border-spacing:0;border-collapse:collapse;margin-top:1rem;margin-bottom:1rem}.markdown table tr th,.markdown table tr td{padding:.5rem 1rem;border:1px solid var(--gray-200)}.markdown table tr:nth-child(2n){background:var(--gray-100)}.markdown hr{height:1px;border:none;background:var(--gray-200)}.markdown ul,.markdown ol{padding-inline-start:2rem}.markdown dl dt{font-weight:bolder;margin-top:1rem}.markdown dl dd{margin-inline-start:0;margin-bottom:1rem}.markdown .highlight table tr td:nth-child(1) pre{margin:0;padding-inline-end:0}.markdown .highlight table tr td:nth-child(2) pre{margin:0;padding-inline-start:0}.markdown details{padding:1rem;border:1px solid var(--gray-200);border-radius:.25rem}.markdown details summary{line-height:1;padding:1rem;margin:-1rem;cursor:pointer}.markdown details[open] summary{margin-bottom:0}.markdown figure{margin:1rem 0}.markdown figure figcaption p{margin-top:0}.markdown-inner>:first-child{margin-top:0}.markdown-inner>:last-child{margin-bottom:0}.markdown .book-expand{margin-top:1rem;margin-bottom:1rem;border:1px solid var(--gray-200);border-radius:.25rem;overflow:hidden}.markdown .book-expand .book-expand-head{background:var(--gray-100);padding:.5rem 1rem;cursor:pointer}.markdown .book-expand .book-expand-content{display:none;padding:1rem}.markdown .book-expand input[type=checkbox]:checked+.book-expand-content{display:block}.markdown .book-tabs{margin-top:1rem;margin-bottom:1rem;border:1px solid var(--gray-200);border-radius:.25rem;overflow:hidden;display:flex;flex-wrap:wrap}.markdown .book-tabs label{display:inline-block;padding:.5rem 1rem;border-bottom:1px transparent;cursor:pointer}.markdown .book-tabs .book-tabs-content{order:999;width:100%;border-top:1px solid var(--gray-100);padding:1rem;display:none}.markdown .book-tabs input[type=radio]:checked+label{border-bottom:1px solid var(--color-link)}.markdown .book-tabs input[type=radio]:checked+label+.book-tabs-content{display:block}.markdown .book-tabs input[type=radio]:focus+label{outline-style:auto;outline-color:currentColor;outline-color:-webkit-focus-ring-color}.markdown .book-columns{margin-left:-1rem;margin-right:-1rem}.markdown .book-columns>div{margin:1rem 0;min-width:10rem;padding:0 1rem}.markdown a.book-btn{display:inline-block;font-size:.875rem;color:var(--color-link);line-height:2rem;padding:0 1rem;border:1px solid var(--color-link);border-radius:.25rem;cursor:pointer}.markdown a.book-btn:hover{text-decoration:none}.markdown .book-hint.info{border-color:#6bf;background-color:rgba(102,187,255,.1)}.markdown .book-hint.warning{border-color:#fd6;background-color:rgba(255,221,102,.1)}.markdown .book-hint.danger{border-color:#f66;background-color:rgba(255,102,102,.1)} \ No newline at end of file diff --git a/docs/resources/_gen/assets/scss/goim/book.scss_50fc8c04e12a2f59027287995557ceff.json b/docs/resources/_gen/assets/scss/goim/book.scss_50fc8c04e12a2f59027287995557ceff.json deleted file mode 100644 index 383eb23..0000000 --- a/docs/resources/_gen/assets/scss/goim/book.scss_50fc8c04e12a2f59027287995557ceff.json +++ /dev/null @@ -1 +0,0 @@ -{"Target":"book.min.97cfda4f5e3c9fa49a2bf8d401f4ddc0eec576c99cdcf6afbec19173200c37db.css","MediaType":"text/css","Data":{"Integrity":"sha256-l8/aT148n6SaK/jUAfTdwO7Fdsmc3PavvsGRcyAMN9s="}} \ No newline at end of file diff --git a/docs/themes/hugo-book b/docs/themes/hugo-book deleted file mode 160000 index 98d19b8..0000000 --- a/docs/themes/hugo-book +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 98d19b8e95019534622fd4c5eae707423730df2c