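RabbitMQ Basics Tutorial: Using RabbitMQ with Spring
Related posts, recommended reading:
- RabbitMq Basics Tutorial: Installation and Testing
- RabbitMq Basics Tutorial: Basic Concepts
- RabbitMQ Basics Tutorial: Basic Usage
- RabbitMQ Basics Tutorial: Advanced Usage
In real-world applications, RabbitMQ is more often used together with Spring. Most posts online about the Spring integration use XML configuration; this post instead shows what the integration looks like with JavaConfig.
I. Basic usage of RabbitMQ in Spring
1. Preparation
Before starting, add the required dependencies. The integration relies mainly on spring-rabbit, which in turn depends on several Spring modules; the Spring modules below are all pinned to version 5.0.4.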
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.3.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.0.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.0.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.0.4.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
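Flow analysis
The implementation has two parts: a publishing service and a consuming service. Following the flow from the earlier basic-usage post, even with Spring we cannot avoid these steps:
- Establish a connection
- Declare the Exchange and the Queue
- Bind the Queue to the Exchange
- Publish messages
- Consume messages (ack/nack)
2. Basic case
First, use Spring to build the simplest possible implementation.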
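    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

    public class SimpleProducer {
        public static void main(String[] args) throws InterruptedException {
            // Establish the connection
            CachingConnectionFactory factory = new CachingConnectionFactory("127.0.0.1", 5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");

            RabbitAdmin admin = new RabbitAdmin(factory);

            // Declare a durable, non-exclusive queue that is not auto-deleted
            Queue queue = new Queue("hello", true, false, false, null);
            admin.declareQueue(queue);

            // Declare the exchange
            TopicExchange exchange = new TopicExchange("topic.exchange");
            admin.declareExchange(exchange);

            // Bind the queue to the exchange
            admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));

            // Register the consumer; the adapter invokes handleMessage on the listener object
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
            Object listener = new Object() {
                public void handleMessage(String foo) {
                    System.out.println(" [x] Received '" + foo + "'");
                }
            };
            MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
            container.setMessageListener(adapter);
            container.setQueues(queue);
            container.start();

            // Publish a message, then wait briefly so the listener can receive it
            RabbitTemplate template = new RabbitTemplate(factory);
            template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
            Thread.sleep(1000);

            container.stop();
        }
    }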
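3. Logic analysis
The code above covers both publishing and consuming. In terms of implementation, the logic is essentially the same as in the earlier basic-usage post. The steps are:
- Establish the connection: new CachingConnectionFactory("127.0.0.1", 5672)
- Declare the Queue: new Queue("hello", true, false, false, null)
- Declare the Exchange: new TopicExchange("topic.exchange")
- Bind the Queue to the Exchange: admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
- Publish a message: template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
- Consume messages: set a MessageListenerAdapter on the container
A few classes deserve extra attention:
- RabbitTemplate: Spring's template for sending messages; it can publish messages directly
- SimpleMessageListenerContainer: the container in which the message listener is registered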
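The basic case passes a plain Object with a handleMessage(String) method to MessageListenerAdapter, which may look surprising. By default the adapter looks up a method named handleMessage on the delegate and invokes it reflectively with the converted message payload. The following is a much simplified sketch of that idea in plain Java, not Spring's actual code; the class ListenerAdapterSketch, its dispatch method, and the PrintingListener delegate are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;

/** Simplified illustration of reflective listener dispatch; NOT Spring's implementation. */
final class ListenerAdapterSketch {

    /** The method name MessageListenerAdapter looks for by default. */
    static final String DEFAULT_LISTENER_METHOD = "handleMessage";

    /** Hypothetical delegate, standing in for the anonymous listener object. */
    static final class PrintingListener {
        String lastReceived;

        public void handleMessage(String foo) {
            lastReceived = foo;
            System.out.println(" [x] Received '" + foo + "'");
        }
    }

    /** Finds a public one-argument handleMessage method that accepts the payload and invokes it. */
    static void dispatch(Object delegate, Object payload) {
        for (Method method : delegate.getClass().getMethods()) {
            boolean nameMatches = method.getName().equals(DEFAULT_LISTENER_METHOD);
            boolean argMatches = method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload);
            if (nameMatches && argMatches) {
                try {
                    method.setAccessible(true); // the delegate class itself may not be public
                    method.invoke(delegate, payload);
                    return;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Listener invocation failed", e);
                }
            }
        }
        throw new IllegalStateException(
                "No " + DEFAULT_LISTENER_METHOD + " method accepts " + payload);
    }

    public static void main(String[] args) {
        dispatch(new PrintingListener(), "Hello, world!");
    }
}
```

The listener therefore needs no Spring interface at all; only the method name and a compatible parameter type matter in this sketch.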
II. Using RabbitMQ with Spring JavaConfig
1. Shared configuration
The main point is to extract the shared ConnectionFactory and RabbitAdmin into a common configuration class.
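    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;

    @Configuration
    @ComponentScan("com.git.hui.rabbit.spring")
    public class SpringConfig {

        private Environment environment;

        @Autowired
        public void setEnvironment(Environment environment) {
            this.environment = environment;
            System.out.println("then env: " + environment);
        }

        // Shared connection factory used by both the producer and the consumers
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");
            return factory;
        }

        // Shared admin used to declare exchanges, queues, and bindings
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    }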
2. Publishing messages
The publishing component is fairly simple: it uses AmqpTemplate directly.
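    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    public class AmqpProducer {

        private AmqpTemplate amqpTemplate;

        // Build the template from the shared ConnectionFactory
        @Autowired
        public void amqpTemplate(ConnectionFactory connectionFactory) {
            amqpTemplate = new RabbitTemplate(connectionFactory);
        }

        // Convert the object to a message and send it to the exchange with the given routing key
        public void publishMsg(String exchange, String routingKey, Object msg) {
            amqpTemplate.convertAndSend(exchange, routingKey, msg);
        }
    }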
3. Consuming messages with a DirectExchange
A consumer is implemented separately for each Exchange type, as follows.
DirectExchange mode
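    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class DirectConsumerConfig {
        @Autowired
        private ConnectionFactory connectionFactory;

        @Autowired
        private RabbitAdmin rabbitAdmin;

        @Bean
        public DirectExchange directExchange() {
            DirectExchange directExchange = new DirectExchange("direct.exchange");
            directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
            return directExchange;
        }

        @Bean
        public Queue directQueue() {
            Queue queue = new Queue("aaa");
            queue.setAdminsThatShouldDeclare(rabbitAdmin);
            return queue;
        }

        // Bind queue "aaa" to the exchange with the binding key "test1"
        @Bean
        public Binding directQueueBinding() {
            Binding binding = BindingBuilder.bind(directQueue()).to(directExchange()).with("test1");
            binding.setAdminsThatShouldDeclare(rabbitAdmin);
            return binding;
        }

        @Bean
        public ChannelAwareMessageListener directConsumer() {
            return new BasicConsumer("direct");
        }

        // The container listens on the queue and hands each message to the listener
        @Bean(name = "directMessageListenerContainer")
        public MessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setRabbitAdmin(rabbitAdmin);
            container.setQueues(directQueue());
            container.setPrefetchCount(20);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setMessageListener(directConsumer());
            return container;
        }
    }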
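As the implementation above shows, each consumer essentially defines its own Queue, Exchange, Binding, and MessageListenerContainer (which listens for messages), while the message-handling logic is extracted into a shared class: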
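    import java.nio.charset.StandardCharsets;

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

    @Slf4j
    public class BasicConsumer implements ChannelAwareMessageListener {
        private String name;

        public BasicConsumer(String name) {
            this.name = name;
        }

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            try {
                // Decode the body as UTF-8 and print it together with the delivery tag
                byte[] bytes = message.getBody();
                String data = new String(bytes, StandardCharsets.UTF_8);
                System.out.println(name + " data: " + data + " tagId: " + message.getMessageProperties().getDeliveryTag());
            } catch (Exception e) {
                // Pass the exception as the last argument so the stack trace is logged
                log.error("local cache rabbit mq localQueue error!", e);
            }
        }
    }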
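One consequence of combining AcknowledgeMode.AUTO with a listener that catches every exception is worth spelling out. In AUTO mode the container acknowledges a message when the listener returns normally and rejects it when the listener throws, so a listener that swallows its exceptions always results in an ack. The sketch below models only that decision in plain Java; AutoAckSketch, Outcome, deliver, and the two handlers are hypothetical names for illustration, not Spring AMQP APIs.

```java
import java.util.function.Consumer;

/** Models the ack/nack decision of AUTO acknowledge mode; NOT Spring AMQP code. */
final class AutoAckSketch {

    enum Outcome { ACK, NACK }

    /** Acks when the listener returns normally, nacks when it throws. */
    static Outcome deliver(Consumer<String> listener, String body) {
        try {
            listener.accept(body);
            return Outcome.ACK;
        } catch (RuntimeException e) {
            return Outcome.NACK;
        }
    }

    /** Hypothetical handler whose processing always fails. */
    static void failingHandler(String body) {
        throw new IllegalStateException("cannot process " + body);
    }

    /** Same failure, but caught and swallowed inside the listener, as BasicConsumer does. */
    static void swallowingHandler(String body) {
        try {
            failingHandler(body);
        } catch (RuntimeException e) {
            System.out.println("logged: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver(AutoAckSketch::swallowingHandler, "m1")); // prints ACK
        System.out.println(deliver(AutoAckSketch::failingHandler, "m2"));    // prints NACK
    }
}
```

If a failed message should be rejected rather than silently acknowledged, the listener would need to let the exception propagate instead of only logging it.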
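4. Test
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = SpringConfig.class)
    public class SprintUnit {
        @Autowired
        private AmqpProducer amqpProducer;

        @Test
        public void testDirectConsumer() throws InterruptedException {
            // Cycle through three routing keys; only "test1" matches the binding of queue "aaa"
            String[] routingKey = new String[]{"hello.world", "world", "test1"};
            for (int i = 0; i < 10; i++) {
                amqpProducer
                        .publishMsg("direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
            }
            System.out.println("-------over---------");

            // Keep the Spring context alive so the listener container can consume
            Thread.sleep(1000 * 60 * 10);
        }
    }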
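Although this test class mainly publishes messages to MQ, once the Spring container has started, the actual work of receiving and consuming messages is handed over to the Spring container through the MessageListenerContainer defined earlier. Running the test above produces: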
    direct data: >>> hello test1>>> 2 tagId: 1
    direct data: >>> hello test1>>> 5 tagId: 2
    direct data: >>> hello test1>>> 8 tagId: 3
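Only three of the ten messages show up because a direct exchange routes a message to a queue only when the routing key equals the binding key exactly, and queue aaa is bound with test1. The following plain-Java sketch reproduces that selection without a broker; DirectRoutingSketch and deliveredIndexes are hypothetical names, and the code is an illustration of the matching rule rather than RabbitMQ's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrates direct-exchange matching (exact key equality); NOT broker code. */
final class DirectRoutingSketch {

    /** Returns the indexes of the messages whose routing key equals the binding key. */
    static List<Integer> deliveredIndexes(String bindingKey, String[] routingKeys, int messageCount) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            String routingKey = routingKeys[i % routingKeys.length];
            if (bindingKey.equals(routingKey)) {
                delivered.add(i);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        String[] routingKeys = {"hello.world", "world", "test1"};
        // Same key cycle and message count as testDirectConsumer
        System.out.println(deliveredIndexes("test1", routingKeys, 10)); // prints [2, 5, 8]
    }
}
```

The indexes 2, 5, and 8 match the message numbers in the output above.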
5. Topic & Fanout strategies
With the first consumer written, the other two look very similar.
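    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class TopicConsumerConfig {
        @Autowired
        private ConnectionFactory connectionFactory;

        @Autowired
        private RabbitAdmin rabbitAdmin;

        @Bean
        public TopicExchange topicExchange() {
            TopicExchange topicExchange = new TopicExchange("topic.exchange");
            topicExchange.setAdminsThatShouldDeclare(rabbitAdmin);
            return topicExchange;
        }

        @Bean
        public Queue topicQueue() {
            Queue queue = new Queue("bbb");
            queue.setAdminsThatShouldDeclare(rabbitAdmin);
            return queue;
        }

        // Bind queue "bbb" with the pattern "*.queue" ('*' stands for exactly one word)
        @Bean
        public Binding topicQueueBinding() {
            Binding binding = BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.queue");
            binding.setAdminsThatShouldDeclare(rabbitAdmin);
            return binding;
        }

        @Bean
        public ChannelAwareMessageListener topicConsumer() {
            return new BasicConsumer("topic");
        }

        @Bean(name = "topicMessageListenerContainer")
        public MessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setRabbitAdmin(rabbitAdmin);
            container.setQueues(topicQueue());
            container.setPrefetchCount(20);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setMessageListener(topicConsumer());
            return container;
        }
    }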
The corresponding test case:
    @Test
    public void testTopicConsumer() throws InterruptedException {
        // "d.queue" and "a.queue" match the pattern "*.queue"; "cqueue" does not
        String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue"};
        for (int i = 0; i < 20; i++) {
            amqpProducer.publishMsg("topic.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
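To see which of these routing keys reach queue bbb, it helps to recall how a topic exchange matches keys: the routing key is split into dot-separated words, `*` in the binding pattern stands for exactly one word, and `#` stands for zero or more words. The sketch below is a small recursive matcher written for illustration; TopicPatternSketch and its methods are hypothetical names and this is not how the broker implements matching internally.

```java
/** Illustrates topic-exchange pattern semantics ('*' = one word, '#' = zero or more); NOT broker code. */
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern consumed: key must be consumed too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int next = k; next <= key.length; next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && match(pattern, p + 1, key, k + 1);
    }

    /** Counts how many of the published messages would match the binding pattern. */
    static int countDelivered(String pattern, String[] routingKeys, int messageCount) {
        int delivered = 0;
        for (int i = 0; i < messageCount; i++) {
            if (matches(pattern, routingKeys[i % routingKeys.length])) {
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        String[] routingKeys = {"d.queue", "a.queue", "cqueue"};
        for (String key : routingKeys) {
            System.out.println(key + " -> " + matches("*.queue", key));
        }
        System.out.println(countDelivered("*.queue", routingKeys, 20)); // prints 14
    }
}
```

Under these semantics, 14 of the 20 messages in testTopicConsumer match the binding of queue bbb; the six sent with cqueue do not, because cqueue is a single word with no dot.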
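Fanout (broadcast) mode
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class FanoutConsumerConfig {

        @Autowired
        private ConnectionFactory connectionFactory;

        @Autowired
        private RabbitAdmin rabbitAdmin;

        @Bean
        public FanoutExchange fanoutExchange() {
            FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
            fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
            return fanoutExchange;
        }

        @Bean
        public Queue fanoutQueue() {
            Queue queue = new Queue("ccc");
            queue.setAdminsThatShouldDeclare(rabbitAdmin);
            return queue;
        }

        // A fanout binding takes no routing key
        @Bean
        public Binding fanoutQueueBinding() {
            Binding binding = BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
            binding.setAdminsThatShouldDeclare(rabbitAdmin);
            return binding;
        }

        @Bean
        public ChannelAwareMessageListener fanoutConsumer() {
            return new BasicConsumer("fanout");
        }

        @Bean(name = "FanoutMessageListenerContainer")
        public MessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setRabbitAdmin(rabbitAdmin);
            container.setQueues(fanoutQueue());
            container.setPrefetchCount(20);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setMessageListener(fanoutConsumer());
            return container;
        }
    }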
The corresponding test case:
    @Test
    public void testFanoutConsumer() throws InterruptedException {
        // A fanout exchange ignores the routing key, so every message reaches queue "ccc"
        String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
        for (int i = 0; i < 20; i++) {
            amqpProducer
                    .publishMsg("fanout.exchange", routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
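A fanout exchange copies every message to every queue bound to it and ignores the routing key, which is why the test can cycle through six unrelated keys. The following in-memory sketch illustrates that behaviour in plain Java; FanoutSketch is a hypothetical class, and the second queue name ccc-copy is an assumption added only to make the broadcast visible (the original configuration binds just ccc).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory illustration of fanout routing; NOT broker code. */
final class FanoutSketch {

    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding has no routing key. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is deliberately ignored. */
    void publish(String routingKey, String body) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(body);
        }
    }

    int size(String queueName) {
        return boundQueues.get(queueName).size();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("ccc");
        exchange.bind("ccc-copy"); // hypothetical second queue, not in the original config

        String[] routingKeys = {"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
        for (int i = 0; i < 20; i++) {
            exchange.publish(routingKeys[i % 6], ">>> hello " + routingKeys[i % 6] + ">>> " + i);
        }
        System.out.println(exchange.size("ccc"));      // prints 20
        System.out.println(exchange.size("ccc-copy")); // prints 20
    }
}
```

Under this model all 20 messages published by testFanoutConsumer would arrive in queue ccc, regardless of which key each one carried.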
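III. Other
Project address
The personal blog of 一灰灰 collects all the posts written while studying and working; you are welcome to drop by.
Statement
Believing everything in books is worse than having no books at all. The content above is only one person's view; since my ability is limited, omissions and mistakes are hard to avoid. If you find a bug or have a better suggestion, criticism and corrections are welcome and much appreciated.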