Rabbit 介绍
RabbitMQ 是基于AMQP协议开发的一套高性能消息中间件。 AMQP(Advanced Message Queuing Protocol)是一种网络协议,用于在应用程序之间传递消息。它定义了消息的格式、交换方式和队列管理等内容,使得不同应用程序之间可以方便地进行消息传递和通信。 AMQP协议的内容包括:
- 消息格式:定义了消息的结构,包括消息头、消息体和属性等信息。
- 交换方式:AMQP使用交换机(Exchange)来接收消息,并根据规则将消息路由到对应的队列。
- 队列管理:定义了队列的创建、删除、绑定等操作,以及消息的生产和消费。
- 连接和会话管理:定义了客户端与消息代理之间的连接建立和维护的机制。
- 安全机制:包括认证、授权和加密等内容,确保消息传递的安全性。 总的来说,AMQP协议提供了一种标准化的消息传递机制,使得不同的消息中间件和应用程序可以遵循同一套规范进行通信,从而实现更加灵活和可靠的消息传递
Rabbit 集群基本组件概念
Producer
生产者,消息的产生客户端,可以由N个producer组成,用于生产并发送消息到 Broker ,直接连接到指定的 Virtual host
Consumer
消费者,消息的消费客户端,可以由N个consumer组成,用于消费 Queque 下的消息,直接连接到指定的 Virtual host
Connection
端和Brocker 的TCP连接,N个节点有N个connection,一一对应。Connection 采用一个连接多路复用的方式(NIO),一个connection 可以包含多个 channel ,一个线程维护一个channel
Channel
网络通道,进行读写消息的基本单元。所有和 Broker 的交互操作都是通过 channel 进行的。一个channel 对应一个线程来进行管理,当高并发读写的情况下,可以增加读写性能。 在 Spring Cloud 中,当消费者连接到 RabbitMQ 时,通常会创建一个 connection,并且可以配置创建多个 channel 进行通信。默认情况下,Spring Cloud Stream 会为每个消息通道创建一个 channel,但是你也可以通过配置来控制 channel 的数量。 在 Spring Cloud Stream 中,可以通过如下配置设置channel 数量。
spring.cloud.stream.rabbit.bindings.myChannel.producer.required-groups=2
spring:
cloud:
stream:
rabbit:
bindings:
myChannel:
producer:
required-groups: 2
Spring Cloud Stream 会根据这个配置来创建相应数量的 channel,并将它们用于消息的发送和接收。这样可以提高并发性能,同时也可以更好地管理消息通道之间的通信。
Broker
服务端,收受来自 consumer 和 producer 的连接,是保存消息和分发路由消息的主体。每个Broker 都会存储消息的副本以便节点异常可以快速切换。
Virtual Host
虚拟主机,服务端可以做到用户级别隔离,每个虚拟主机里面都有自己的Exchange 和 binding 策略,客户端连接的是哪个Virtual Host 就使用哪个。Broker 可以由多个 Virtual Host 组成,但是端和Virtual Host 的关系是连接的时候就已经确定,所以要注意自己使用的连接配置。
Exchange
交换机,消息到达 Broker后,由交换机进行消息的路由(Routing key)转发,交换机类型分为 topic、direct、 fanout、headers 、x-delay(延迟消息)
Routing key
路由规则,Exchange 会根据路由规则进行消息的投递,Routing key 是Exchange 和 Queque之间的桥梁。但是也不一定是必须的,例如采用 fanout 模式,fanout模式会投递消息到所有队列。
Binding
exchange 和 queque 的一种绑定关系,通过绑定关系,可以找到对应的队列,用于分发消息到指定队列中。存储在 exchange 查询表中。
Queque
队列,接收交换机路由过来的消息队列,消费确认后会删除队列中的消息。
Message
消息,在端和broker之间流转的消息体,由消息 properties,body 等属性组成,properties包含头信息等属性。
持久化
在 RabbitMQ 中,消息持久化是通过以下两种方式来实现的:
- 队列持久化:在声明队列时,可以设置 durable = true 来使队列持久化。持久化的队列会在 RabbitMQ 服务器重启后仍然存在,不会丢失。即使服务器重启,持久化的队列也会被恢复。
- 消息持久化:在发送消息时,可以设置消息的 deliveryMode = 2 来使消息持久化。持久化的消息会被写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。 如果队列中的消息已经被消费者消费掉并且删除了,但消息本身是持久化的,那么即使消息被删除,仍然可以通过 RabbitMQ 的日志文件来恢复消息。RabbitMQ 会将持久化的消息写入磁盘中的日志文件,即使消息被消费后删除,仍然可以通过日志文件来找回消息。
Rabbit 消息路由方式
Direct
精准匹配,和 Routing key 配合使用,exchange 先进行Routing key和Queue 进行绑定,绑定完成后,生产者发送带有 Routing key 的消息到交换机,交换机根据之前绑定的key进行对应的路由。 暂时无法在飞书文档外展示此内容
Fanout
广播模式,无需 Routing key ,该模式下会把消息投递到所有和 exchange 绑定的队列中。 暂时无法在飞书文档外展示此内容
Topic
可以理解为direct的升级版,topic 类型的 Exchange 必须要配置一个Routing key,使用符号 . 来进行分割,也可以使用符号 # 来匹配任意字符,符号* 可以精确代替一个字符。当消息的 Routing key 匹配到了规则,则将消息投递到匹配的队列中
- *:与Routing key进行匹配时,可以精确的代替一个字符
- #:与Routing key进行匹配时,可以代替0个或多个字符
暂时无法在飞书文档外展示此内容
Header
根据header参数进行匹配,exchange不需要Routing key,而是根据 headers 中的参数进行匹配。 X-match :
- all:必须参数全部匹配
- any:任何参数匹配即可
Message audioMessage = MessageBuilder.withPayload(audioMsgDTO).setHeader("headerA",headerValue).build();
暂时无法展示此内容
x-delayed-message
延迟消息,这个是一个特殊的消息类型,使用场景比较丰富,但是 Rabbit 对这种类型的支持不是很好。目前我们代码中是使用插件的形式来使用的,也就是设置 header 参数,添加 “x-delay”, 并设置TTL时间参数。如下 Message audioMessage = MessageBuilder.withPayload(audioMsgDTO).setHeader(“x-delay”,delayInterval).build(); 除此外,还有一种死信 + TTL 来设置死信的模式,下面是延迟队列的原理和消息流转过程:
- 创建延迟队列:首先创建一个普通的队列,并设置该队列的 x-message-ttl 属性为消息的过期时间,同时设置队列的 x-dead-letter-exchange 属性为一个交换机,这个交换机会接收队列中过期的消息。
- 消息发送:当生产者发送消息到延迟队列时,消息会被发送到这个普通队列中。消息的过期时间会根据队列的 x-message-ttl 属性来设置。
- 消息过期:当消息的过期时间到达时,消息会变成死信(Dead Letter),并被发送到设置的交换机中。
-
特殊处理:在交换机接收到死信消息后,可以将这些消息路由到另一个队列,消费者可以从这个队列中获取延迟的消息进行处理。 通过上述流程,就可以实现延迟队列的效果。消息在发送到交换机后,会经过特殊处理来达到延迟效果。
延迟队列的实现方式虽然简单有效,但也存在一些风险和注意事项:
- 消息堆积:如果消费者处理消息的速度比消息产生的速度慢,可能会导致消息在队列中堆积,影响系统性能。
- 消息丢失:在某些情况下,可能会出现消息丢失的情况,比如网络故障、节点故障等情况。
- 性能影响:使用延迟队列会增加系统的复杂性和维护成本,可能会影响系统的性能和稳定性。
喜欢文章请关注我
程序领域
点击关注+转发,私信发送【面试】或者【资料】可以收获更多资源