0x1.导入maven依赖

现在你需要导入一个starter包就好了,不用理会什么版本。

<dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

0x2.添加RabbitMQ配置

这里只需要配置下RabbitMQ服务端的连接参数就好了,其他的直接用默认配置。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

0x3.消息发布者的配置

注册消息队列

@Bean
public Queue Queue() {
 return new Queue("TestQueue");
}
 /**
 * 绑定Queue和RabbitMQ服务端 否则Queue不会自动创建
 */
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames(queueName);
  return container;
 }
  /**
  * 创建自定义MessageConverter 替换掉默认的SimpleMessageConverter
  */
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory);
  factory.setMessageConverter(new Jackson2JsonMessageConverter());
  return factory;
 }

向服务器注册一个新的消息队列,用于发布消息。如果不预先注册而直接使用的话,服务端会报错。



发布一个消息

public class TestQueueSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("TestQueue", context);
    }
}

AmqpTemplate是收发消息接口定义,rabbitTemplate是springboot提供的默认实现。
例子中调用方法的原型

void convertAndSend(String routingKey, Object message) throws AmqpException;

第一个参数: routingKey, MQ里有个Exchange的概念,用于接收消息并且转发到指定的队列,针对不同消息的识别有四种模式,默认的是Direct,消息中带个routingKey来区分。
因为我没配置单独的exchange所以使用默认的。这个参数决定了消息会存放在服务端的哪个队列里。

第二个参数: message,第二个参数消息体,Object类型的。RabbitMQ有一个MessageConvert接口定义了消息的序列化,默认实现是SimpleMessageConverter,性能不够好,建议使用Jackson2JsonMessageConverter。

0x4.消息收费者的配置

创建消息的监听器

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
    @RabbitHandler
    public void process(@Payload User user, @Headers Map<String,Object> headers,String hello) {
        System.out.println("Receiver  : " + hello);
    }
}
  • 1.使用 @RabbitListener 注解标记方法,当监听到队列中有消息时则会进行接收并处理
  • 2.当使用在class上时,则需要在接收方法上使用@RabbitHandler注解进一步明确。
  • 3.可以使用@RabbitHandler标记多个方法,具体调用哪个得看MessageConvert转化后的参数,实现了同一个接收器的重载。
  • 4.Payload 和 Headers 注解可以获取消息的body和headers

配置消息转换器:
默认的SimpleMessageConverter只能将消息转化成成byte[],这样的操作很不友好。我们需要重新定义一个。

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
   SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
   factory.setConnectionFactory(connectionFactory);
   factory.setMessageConverter(new Jackson2JsonMessageConverter());
   return factory;
}

0x5.Exchange

现在有个问题,一个消息如何放置到指定的队列里面,客户端怎么描述自己需要的消息?这么多消息怎么分类。
OK,现在我们看一张图,标注了消息从生产者最终到消费者走的路。感觉看不清的点击放大了看。

在MQ中,消息会到达指定的exchange,exchange根据RoutingKey转发消息到指定的queue,消费者会从queue读取消息。所谓的Binding值的是exchange,RoutingKey,queue之间的具体的组合连接关系。

Exchange有四种类型:
第一种是Direct Exchange,RoutingKey是具体的值,通过是否相同来判断,也是默认的类型
第二种Topic Exchange,RoutingKey是按规则匹配的。
第三种Fanout Exchange,忽略RoutingKey的值,连向exchange的queue都会存入指定的消息。
最后一种Headers Exchange,是按请求的headers匹配的。

配置queue,RoutingKey,Binding
声明queue

@Bean(name= "message")
public Queue queueMessage() {
     return new Queue("topic.message");
}

声明exchange

@Bean
public TopicExchange exchange() {
    return new TopicExchange("exchange");
}

声明RoutingKey

String routingKey = "topic.message"

创建Binding

@Bean
Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
     return BindingBuilder.bind(queueMessage).to(exchange).with(routingKey);
}
届ける言葉を今は育ててる
最后更新于 2019-01-24