Spring Cloud整合RabbitMQ
0x1.导入maven依赖
现在你需要导入一个starter包就好了,不用理会什么版本。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
0x2.添加RabbitMQ配置
这里只需要配置下RabbitMQ服务端的连接参数就好了,其他的直接用默认配置。
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
0x3.消息发布者的配置
注册消息队列
@Bean public Queue Queue() { return new Queue("TestQueue"); } /** * 绑定Queue和RabbitMQ服务端 否则Queue不会自动创建 */ @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(queueName); return container; } /** * 创建自定义MessageConverter 替换掉默认的SimpleMessageConverter */ @Bean public RabbitListenerContainerFactory> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }
向服务器注册一个新的消息队列,用于发布消息。如果不预先注册而直接使用的话,服务端会报错。
发布一个消息
public class TestQueueSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("TestQueue", context); } }
AmqpTemplate是收发消息接口定义,rabbitTemplate是springboot提供的默认实现。
例子中调用方法的原型
void convertAndSend(String routingKey, Object message) throws AmqpException;
第一个参数: routingKey, MQ里有个Exchange的概念,用于接收消息并且转发到指定的队列,针对不同消息的识别有四种模式,默认的是Direct,消息中带个routingKey来区分。
因为我没配置单独的exchange所以使用默认的。这个参数决定了消息会存放在服务端的哪个队列里。
第二个参数: message,第二个参数消息体,Object类型的。RabbitMQ有一个MessageConvert接口定义了消息的序列化,默认实现是SimpleMessageConverter,性能不够好,建议使用Jackson2JsonMessageConverter。
0x4.消息收费者的配置
创建消息的监听器
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(@Payload User user, @Headers Map<String,Object> headers,String hello) { System.out.println("Receiver : " + hello); } }
- 1.使用 @RabbitListener 注解标记方法,当监听到队列中有消息时则会进行接收并处理
- 2.当使用在class上时,则需要在接收方法上使用@RabbitHandler注解进一步明确。
- 3.可以使用@RabbitHandler标记多个方法,具体调用哪个得看MessageConvert转化后的参数,实现了同一个接收器的重载。
- 4.Payload 和 Headers 注解可以获取消息的body和headers
配置消息转换器:
默认的SimpleMessageConverter只能将消息转化成成byte[],这样的操作很不友好。我们需要重新定义一个。
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }
0x5.Exchange
现在有个问题,一个消息如何放置到指定的队列里面,客户端怎么描述自己需要的消息?这么多消息怎么分类。
OK,现在我们看一张图,标注了消息从生产者最终到消费者走的路。感觉看不清的点击放大了看。
在MQ中,消息会到达指定的exchange,exchange根据RoutingKey转发消息到指定的queue,消费者会从queue读取消息。所谓的Binding值的是exchange,RoutingKey,queue之间的具体的组合连接关系。
Exchange有四种类型:
第一种是Direct Exchange,RoutingKey是具体的值,通过是否相同来判断,也是默认的类型
第二种Topic Exchange,RoutingKey是按规则匹配的。
第三种Fanout Exchange,忽略RoutingKey的值,连向exchange的queue都会存入指定的消息。
最后一种Headers Exchange,是按请求的headers匹配的。
配置queue,RoutingKey,Binding
声明queue
@Bean(name= "message") public Queue queueMessage() { return new Queue("topic.message"); }
声明exchange
@Bean public TopicExchange exchange() { return new TopicExchange("exchange"); }
声明RoutingKey
String routingKey = "topic.message"
创建Binding
@Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(routingKey); }