Skip to content

Commit

Permalink
feat: add rabbitMQ basic content
Browse files Browse the repository at this point in the history
  • Loading branch information
mx52jing committed Jul 28, 2024
1 parent 6d58c17 commit 569b1ca
Show file tree
Hide file tree
Showing 2 changed files with 351 additions and 0 deletions.
8 changes: 8 additions & 0 deletions docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,14 @@ export default {
{ text: '操作动态配置文件', link: '/knowledge-deposition/Nacos/操作动态配置文件'},
]
},
{
text: 'RabbitMQ',
collapsible: false,
collapsed: false,
items: [
{ text: '基本概念和安装启动', link: '/knowledge-deposition/RabbitMQ/基本概念和安装启动'},
]
},
],
'/algorithm/': [
{
Expand Down
343 changes: 343 additions & 0 deletions docs/knowledge-deposition/RabbitMQ/基本概念和安装启动.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,343 @@
---
layout: doc
---

# 安装启动和基本使用

## 简介

- [官方网站](https://www.rabbitmq.com/)

> `RabbitMQ`是一款可靠且成熟的消息传递和流媒体代理,可以轻松部署在云环境、本地环境以及本地计算机上

## 核心概念

> `RabbitMQ`是一个`消息代理`(broker):它接受并转发消息
> 可以把它想象成是一个`邮局`:我们把要寄出的信件放入邮箱时,可以确信邮递员最终会将信件送到收件人手中
> 在这个类比中,`RabbitMQ`就像`一个邮箱``一个邮局``一名邮递员`
> 它有以下几个重要的概念:
1. `message`:RabbitMQ`接收``存储``转发`=>`二进制数据块`—-也就是`message`
2. `Producer`:生产者是消息的发送方,它将消息发送到`RabbitMQ`的交换器(`Exchange`)中
3. `Exchange`
4. `Queue`
5. `Consumer`


## 安装和启动

- [RabbitMQ各个版本的时间排布](https://www.rabbitmq.com/release-information)

- [Docker社区镜像](https://hub.docker.com/_/rabbitmq/)

### Docker启动

> 直接使用`docker run`来启动一个`image`
:::tip
- `3.13-management`版本自带了`web管理界面`
:::

```shell
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
```

> 或者使用`docker-compose`
```shell
version: "2.8"
services:
RabbitMQ:
image: rabbitmq:3.13-management
container_name: RabbitMQ
restart: always
ports:
- "5672:5672"
- "15672:15672"
```

## 使用`homebrew`安装启动教程

- Mac[安装教程](https://www.rabbitmq.com/docs/install-homebrew)

## 简单示例

> 原理如图所示:
![](https://raw.githubusercontent.com/mx52jing/image-hosting/main/images/RabbitMQ/rabbitMQ-single-single.png)

- 这里使用[Go语言](https://www.rabbitmq.com/tutorials/tutorial-one-go)演示


:::tip
- RabbitMQ支持多种协议,我们使用`AMQP 0-9-1`,这是一种用于消息传递的开放通用协议

- 有许多不同语言的RabbitMQ客户端,我们使用`Go amqp`客户端
:::

> 初始化项目并下载依赖
```shell
go mod init <your-module-name>
go get github.com/rabbitmq/amqp091-go
```

写一个错误处理的通用函数

```Go
// FailOnError a helper function to check the return value for each amqp call
func FailOnError(err error, msg string) {
if err != nil {
log.Panicf("%s】: %s", msg, err)
}
}
```

### 生产者(Producer)

1. 连接RabbitMQ服务

```Go
// 1. connect to the RabbitMQ
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
```

2. 开启一个`channel`

```Go
// 2. open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
```

:::warning
- 这里的`ch`不是go原生的channel,而是一个`AMQP channel`

```Go
func (c *Connection) Channel() (*Channel, error)
```
:::

3. 声明一个队列,并发送消息

```Go
// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"first_queue", // queue name
false, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)
shared.FailOnError(err, "declare queue error")

withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
// declare a message to be sent
messageBody := "hello, I am the first message, you have received me"
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp091.Publishing{
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
shared.FailOnError(err, "Publish error")
log.Printf(" [x] Sent %s\n", messageBody)
```

:::warning
- 这里的`queue`只有不存在时才会被创建,但是`Producer``Consumer`的代码中都要声明,因为不知道`Producer``Consumer`哪个会先启动
:::

### 消费者/接收者(Consumer)

> 我们启动一个Consumer,然后让其一直监听是否有可接收的匹配消息
1.`Producer`一样,都需要`创建链接``channel``声明队列`

```Go
// 1、create connection
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "create connection error")
defer conn.Close()

// open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "open a channel error")
defer ch.Close()

// 3、declare queue
queue, err := ch.QueueDeclare(
"first_queue",
false,
false,
false,
false,
nil,
)
shared.FailOnError(err, "declare queue error")
```

2. 消费消息

```Go
// 4、consume message
message, err := ch.Consume(
queue.Name,
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
shared.FailOnError(err, "consume queue message error")
waitCh := make(chan struct{})
go func() {
for msg := range message {
log.Printf("Received a message: %s \n", msg.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-waitCh
```

:::tip
- 为了能持续接受到消息,这里使用`channel`做锁
:::


## 完整代码

:::tip
- 这个例子中`Producer``Consumer`通过`命名队列`来确定发送和接受的消息
:::

> `send.go`
```Go
package main

import (
"context"
"github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"log"
"time"
)

func connectAndSendMessage() {
// 1. connect to the RabbitMQ
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

// 2. open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()

// 3. declare a queue,it will only be created if it doesn't exist already
queue, err := ch.QueueDeclare(
"first_queue", // queue name
false, // durable
false, //delete when unused
false, //exclusive
false, // no-wait
nil, // arguments
)
shared.FailOnError(err, "declare queue error")

withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
// declare a message to be sent
messageBody := "hello, I am the first message, you have received me"
err = ch.PublishWithContext(
withTimeoutCtx,
"", //exchange name
queue.Name, // routing key
false, // mandatory
false, //immediate
amqp091.Publishing{
Body: []byte(messageBody),
ContentType: "text/plain",
},
)
shared.FailOnError(err, "Publish error")
log.Printf(" [x] Sent %s\n", messageBody)
}

func main() {
connectAndSendMessage()
}
```

> `receive.go`
```Go
package main

import (
"github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"log"
)

func createReceiverAndReceiveMessage() {
// 1、create connection
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
shared.FailOnError(err, "create connection error")
defer conn.Close()

// open a channel
ch, err := conn.Channel()
shared.FailOnError(err, "open a channel error")
defer ch.Close()

// 3、declare queue
queue, err := ch.QueueDeclare(
"first_queue",
false,
false,
false,
false,
nil,
)
shared.FailOnError(err, "declare queue error")
// 4、consume message
message, err := ch.Consume(
queue.Name,
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
shared.FailOnError(err, "consume queue message error")
waitCh := make(chan struct{})
go func() {
for msg := range message {
log.Printf("Received a message: %s \n", msg.Body)
}
}()
log.Printf("[*] Waiting for messages. To exit press CTRL+C")
<-waitCh
}

func main() {
createReceiverAndReceiveMessage()
}
```

0 comments on commit 569b1ca

Please sign in to comment.