基于Spring深入源码学习RabbitMQ
这篇文章将继续学习RabbitMq的一些东西,为了更好的学习,阅读和理解相关源码是非常有必要的。本篇文章将简单讲解下与RabbitMQ交互所使用的几个类和方法。
基本注解
@RabbitListener(queues = queueName)
public @interface RabbitListener { // 本监听器的唯一id String id() default ""; // 指定容器工厂 默认的是SimpleRabbitListenerContainerFactory String containerFactory() default ""; // 绑定的队列对象 通过名字绑定 String[] queues() default {}; // 绑定的队列对象 直接通过实例绑定 Queue[] queuesToDeclare() default {}; // 是否独占 boolean exclusive() default false; // 优先级 String priority() default ""; // 绑定rabbitadmin rabbitadmin是用来管理消息队列的,比如创建删除队列什么的 一般不需要单独配置 String admin() default ""; // 绑定的Binding 通过bindings来决定绑定的消息队列 和queuesToDeclare queues 互斥 QueueBinding[] bindings() default {}; // 针对不同的监听器分组 后续可以分组操作这些监听器 String group() default ""; // 值为字符串形式的true和false 如果是true 监听器抛出的异常(异常实体类RemoteInvocationResult,可用RemoteInvocationAwareMessageConverterAdapter重写)直接发送给服务端, // 如果是false ,则异常抛给监听器容器, String returnExceptions() default ""; // 设置一个异常处理器(RabbitListenerErrorHandler) 如果监听器抛出异常则调用 String errorHandler() default ""; // 设置并发特性 如果值是个整数,则用户设置监听器的实例个数,如果是【max,min】格式,则动态设定实例的个数 String concurrency() default ""; // 字符串类型的true和false 用于覆盖默认的容器工厂设置 String autoStartup() default ""; }
RabbitListener标记一个方法作为Rabbit消息监听器的处理方法,该注解由RabbitListenerAnnotationBeanPostProcessor类解析并处理。
在方法级别使用该注解,会为注解方法创建一个监听器容器。如果是在类上使用该注解,则需要使用@RabbitHandler注解定义具体的处理方法,RabbitHandler注解可存在多个,具体找哪个方法需要根据MessageConverter转换后的对象类型决定
RabbitListener标记一个方法作为Rabbit消息监听器的目标,该注解由RabbitListenerAnnotationBeanPostProcessor类发现并处理。
在方法级别使用该注解,会为这个方法创建一个侦听器容器。如果是在类上使用该注解,则需要使用@RabbitHandler注解定义具体的处理方法,RabbitHandler注解可存在多个,具体找哪个方法需要根据MessageConverter转换后的对象类型决定
@Component // 绑定的队列名 @RabbitListener(queues = "test") public class HelloReceiver { // 如果消息经messageconverter后是User 则走本方法 @RabbitHandler public void process(@Payload User user, @Headers Map<String, Object> headers) { System.out.println("Receiver : " + user.getId()); } // 如果消息经messageconverter后是Post 则走本方法 @RabbitHandler public void process(@Payload Post post, @Headers Map<String, Object> headers) { System.out.println("Receiver : " + post.getId()); } }
RabbitListenerContainerFactory
Spring的消息监听器容器工厂,默认实现是SimpleRabbitListenerContainerFactory,用于构建SimpleMessageListenerContainer,定义了创建特定容器工厂和默认容器工厂两个方法。注意java8之后接口能直接定义方法体,使用default关键字即可。讲道理这应该是个语法糖。。
@FunctionalInterface public interface RabbitListenerContainerFactory<C extends MessageListenerContainer> { C createListenerContainer(@Nullable RabbitListenerEndpoint endpoint); default C createListenerContainer() { return createListenerContainer(null); } }
创建自定义工厂需要一个RabbitListenerEndpoint参数,RabbitListenerEndpoint定义了监听器端点的配置,默认实现类SimpleRabbitListenerEndpoint,
它内部存放了MessageListener实例,消息监听器容器,监听器配置,以及包含了相关读写的方法。简单的讲就是一个model。。
RabbitListenerContainerFactory的默认实现类是SimpleRabbitListenerContainerFactory,
SimpleMessageListenerContainer
SimpleMessageListenerContainer作为监听器的容器,内置持有了Consumers,Consumers指的是监听器的实例。
容器本身包含了run restart 维护Consumers 关闭监听等功能,监听器的部分属性比如Concurrency也是在容器中配置。
Queue
Queue类包含在org.springframework.amqp.core下,用于描述消息队列的model类。
public class Queue extends AbstractDeclarable { /** * Argument key for the master locator. * @since 2.1 */ public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator"; private final String name; private final boolean durable; private final boolean exclusive; private final boolean autoDelete; private final Map<String, Object> arguments; private volatile String actualName; }
Exchange
Exchange接口包含了name type isDurable isAutoDelete等属性定义,它有五个实现类,除了4种exchange类型外,还包含了CustomExchange类。
本质都是model,没声明特殊的操作。
RabbitTemplate
RabbitTemplate实现了AmqpTemplate,而AmqpTemplate定义了AMQP协议的基本操作,包含了接收消息发送消息的N个方法重载。
RabbitTemplate定义了发送消息的超时时间,消息编码(默认utf8),默认exchange(值为空字符串),内部持有Channel(就是连接通道,连接对象)
里面的内容太多了,具体的看官方doc
常用方法:
convertAndSend(final Object object)
convertAndSend(String routingKey, final Object object)
convertAndSend(String exchange, String routingKey, final Object object)
根据 exchange和routingKey发送object到指定的队列 ,exchange和routingKey不写则就是默认的空字符串。
convertSendAndReceive(final Object object)
convertSendAndReceive(String routingKey, final Object object)
convertSendAndReceive(String exchange, String routingKey, final Object object)
参数和convertAndSend一致,多了Object类型请求响应对象。因为要接收消费者的返回内容,所以是同步的
convertSendAndReceiveAsType(final Object object,ParameterizedTypeReference
参数和上面的一致,多了一个T指明返回类型,其实就是省略了反序列化的步骤,也是同步的。