发布者/订阅者 模型如下:
他与前面两个小案例最大的区别就是,他的消息不是阅完即焚的。他允许将同一条消息发送给多个消费者。而实现此操作的原因是加入了我们的交换机(exchange)。
在发布者和订阅者的模型中,各个组件的功能如下:
注意:交换机他只负责消息的转发,并不存储消息,如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!!
OK,这么解释肯定是不够的,下面我们就来说一下第一种交换机类型——Fanout(广播)在Java中的具体使用方式。
步骤一、在消费者服务中,利用代码声明队列、交换机,并将两者进行绑定。
SpringAMQP提供的**交换机(Exchange)、队列(Queue)、绑定(Binding)**的API如下:
要将我们的队列绑定到交换机,我们需要编写我们的配置类如下:
packagecom.demo.mq.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/** * 声明FanoutExchange(广播交换机) */@BeanpublicFanoutExchangefanoutExchange(){//交换机的名称returnnewFanoutExchange("exchange.fanout");}/** * 声明第一个队列 */@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/** * 声明第二个队列 */@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/** * 绑定 队列1 到 交换机 */@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/** * 绑定 队列2 到 交换机 */@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
写完配置类,我们重启一下我们的消费者服务类,然后我们到RabbitMQ上看一下我们的交换机和队列。
可以看到,确实多了一个交换机叫 exchange.fanout。
我们再看一下队列,可以看到,我们两个队列也都注册成功了。
点击我们刚才新增的交换机,打开它的Bindings,可以看到这个交换机他告诉我们,他的消息是会转发到 fanout.queue1 和 fanout.queue2中:
ok,我们接着往下写:
**步骤二、在消费者服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2。 **.
监听的方法,现在应该已经写得滚瓜烂熟了吧,这里就直接贴代码了。
1、编写的类记得加 @Component 将这个监听的类注册到 Spring容器中。
2、监听哪个queue,那么就写对应的方法,并在方法上方添加@RabbitListener注解,用queues属性标明要监听的queue即可。(如果有多个,那么用 @RabbitListener(queues = {“queueName1”, “queueName2”})表示即可。
@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("监听到 fanout.queue1 的消息为:【"+ msg+"】");}@RabbitListener(queues="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("监听到 fanout.queue2 的消息为:【"+ msg+"】");}}
步骤三、在发布者服务中,编写测试方法,向交换机 exchange.fanout 发送消息。
@TestpublicvoidtestFanoutExchange(){//交换机名称String exchangeName="exchange.fanout";//消息String msg="Hello,av8d!";//发送消息 rabbitTemplate.convertAndSend(exchangeName,"", msg);}
这里的rabbitTemplate.convertAndSend接受三个参数,分别是
publicvoidconvertAndSend(String exchange,String routingKey,Object object)
写完测试方法,我们跑一下我们的测试方法,然后看一下我们消费者的控制台如下:
可以看到,只发布了一条消息,但是通过交换机发布给两个Queue后,我们消费者的两个方法都监听到了我们同一条消息。