AMQP: Advaced Message Queuing Protocol 高级消息队列协议,RabbitMQ完全满足该协议。
消息中间件(Message-oriented middleware, MOM)是一种软件或者硬件基础设施,通过它可以在分布式系统中发送和接受消息。RabbitMQ通过高级路由和消息分发工功能巧妙地实现了这一角色,即使需要满足广域网环境下实现可靠性所应达到的容错条件,分布式系统也可以很容易与其他系统进行互连。
消息中间件使用于需要可靠的数据传送的分布式环境。采用消息中间件的系统中,不同的对象之间通过传递消息来激活对方的事件,以完成相应的操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。消息中间件能在不同平台之间通信,它常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发(这也是比远程过程调用更进步的原因)。
本文的所有操作,均采用Python的pika库来完成RabbitMQ的操作。尽管还有其他的第三方库,单是因为都遵循AMQP协议,所以提供的调用方法参数等大致差不多,个别会设置一些独有的参数。
¶基本概念
AMQ三个抽象组件:
交换器Exchange
:接收发送到RabbitMQ的消息并决定把它们投递到何处(由Exchange将消息路由到一个或多个队列中(或者丢弃))。
队列Queue
:负责存储接收到的消息,同时也可能包含处理消息的配置信息。消息可以只存储在内存中,也可以存储在硬盘中,然后以先进先出的顺序进行投递。
绑定:使用绑定来定义队列和交换器之间的关系。使用绑定或者绑定键(BindingKey)对于某些交换器类型,绑定同时告知交换器如何对消息进行过滤从而决定能够投递到队列的消息。当一条消息到交换器时,应用程序使用路由键(RoutingKey)属性来进行匹配。
RabbitMQ运转流程:
生产者发送消息的过程:
1.生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
2.生产者声明一个交换器(Exchange),并设置相关属性,比如交换机类型、是否持久化等。
3.生产者声明一个队列并设置相关属性(通过创建的通道开启),比如是否排他、是否持久化、是否自动删除等。
4.生产者通过绑定键将交换器和队列绑定起来。
5.生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
6.响应的交换器根据接收到的路由键查找相匹配的队列。
7.如果找到,则将从生产者发送过来的消息存入相应的队列中。
8.如果没有找到,则根据生成者配置的属性选择丢弃还是回退给生产者。
9.关闭信道。
10.关闭连接。
消费者接收消息的过程:
1.消费者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
2.消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
3.等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
4.消费者确认(ack)接收到的消息。
5.RabbitMQ从队列中删除相应的已经被接收的消息。
6.关闭信道。
7.关闭连接。
¶连接和通道
连接和通道:
无论是生产者和消费者都需要通过TCP连接和RabbitMQ Broker建立连接,一个连接就是一个TCP连接。建立连接之后,用户就可以在连接的基础上建立一个AMQP信道,每个信道会被赋值一个唯一的ID(在各自的连接中不同,不同的连接重新算),信道是建立在TCP上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
这样设计的理由:
TCP的创建和销毁是非常消耗资源的,遇到使用高峰,就会遇到性能瓶颈,采用NIO的做法,实现TCP连接复用,不仅可以减少性能开销,同时还便于管理。
每个线程都把持一个信道,所以信道复用了Conneciton的TCP连接。
RabbitMQ可以保证每个信道的私密性。当每个信道的流量不是很大的时候,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP的连接资源。但是当信道本身的流量很大的时候,多个信道复用同一个Connection就会产生性能瓶颈,进而整体的流量就会被限制了,此时就需要开辟多个Connection,将这些信道均摊道这些Connection中。
信道不是线程安全的,如果有多个线程,应该保证每个线程独享一个信道。
¶pika的使用
¶连接Connection以及信道Channel的创建。
1 | import pika |
可以创建多个通道,生产者发送消息、创建交换器、交换器和通道的绑定都是通过通道对象来实现的,但是通道只是负责通信,这些操作放到好几个channel对象来做也是没有问题的。(因为channel是非线程安全的,所以多线程中应该保持每个线程独占一个channel。)
¶交换器exchange和队列queue
1 | channel.exchange_declare(exchange='hello', exchange_type='fanout', durable=True) |
创建交换器以及队列都是通过信道对象完成,创建的的时候交换器和队列没有直接关系,可以使用多个通道完成(不是说只能通过同一个通道创建的交换器和队列才有关系,信道只是传递操作指令的通道)。
信道可以声明已经存在的交换器或者队列,如果声明的参数完全匹配现存的交换器或者队列,RabbitMQ就什么都不做,并成功返回,如果参数不一致,则报错
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'")
也就是说不能出现同名的交换器和队列,交换器和队列都是唯一的。
¶交换器声明方法的常用属性:
channel.exchange_declare(exchange, exchange_type, passive, durable, auto_delete, internal, arguments)
exchange
: 交换器名称。exchange_type
: 交换器类型(fanout, direct, topic, headers),默认使用的是direct,为RoutingKey和BindingKey完全匹配。passive
: 默认为False。durable
: 默认为False,用于设置是否持久化,持久化可以将交换器存盘,这样在服务器重启的时候不会丢失相关信息。auto_delete
: 默认为False, 用于设置是否自动删除,自动删除的前提是在交换器至少与一个队列或者交换器绑定后,之后所有的与这个交换器绑定的队列或者交换器都与此解绑,此时这个交换器会自动删除。internal
: 默认为False,用于设置是否是内置的,如果是内置的交换器,客户端无法直接发送消息到这个交换器中,只能通过其他交换器路由到该交换器(设置方式和绑定queue类似,通过channel.exchange_bind()设置)。arguments
: 其他的一些结构化的参数,如alternate-exchange(用于设置备份交换机)。
与之对应还有一个交换器的删除方法:exchange_delete(self, exchange=None, if_unused=False)
,is_unused为True,则表示只有在交换器未被使用的情况下才会被删除。
¶队列声明方法的常用属性:
channel.queue_declare(queue, passive, durable, exclusive, auto_delete, arguments)
queue
: 队列名称。passive
: 默认值为False。Only check to see if the queue exists and raise ChannelClosed if it doesn’t.durable
: 默认为False,用于设置是否持久化,如果为True,则会存盘,在RabbitMQ服务重启的时候,队列就不会消失。(信息是存储按照先后顺序存储在队列中的,只要队列不消失,数据就不会消失,和交换器没有关系(交换器只是负责数据的路由的)。交换的的持久化是在硬盘中存储交换器的相关信息,否则一旦重启服务,没有持久化的交换器就会消失。)exclusive
: 默认为Flase,用于设置使用当前连接Connection创建的队列是排他队列(该队列只能在当前连接内使用,所以当前的连接内的channle都是能够使用的,然后在连接关闭的时候,该排他队列会自动删除(无论是否设置了durable参数进行持久化,都会被删除),此外,因为排他队列只能当前连接使用,所以此时其他连接中也是不能建立同名的排他队列的(本来是能够获的该队列的,但是由于是排他所以无法使用,要报错。))auto_delete
: 默认为False,设置为True之后,当队列创建之后,所有与该队列连接的消费者都断开的时候,即没有消费者连接该队列的时候,该队列会自动删除。arguments
: 设置队列的一些其他参数,如 x-message-ttl、 x-expires、x-max-length、 x-max-length-bytes、x-dead-letter-exchange、 x-dead-letter-routing-key、 x-max-priority等。
与之对应队列删除的方法:channel.queue_delete(queue, if_unused=False, if_empty=False)
,is_unused为True,则表示只有在队列未被使用的情况下才会被删除。is_empty表示只有当队列为空的时候才能被删除。
channel.queue_purge(queue)
用于清空队列,不会删除队列。
¶队列和交换器的绑定
channel.queue_bind(queue, exchange, routing_key=None, arguments=None)