RabbitMQ基础教程之基本使用篇 最近因为工作原因使用到RabbitMQ,之前也接触过其他的mq消息中间件,从实际使用感觉来看,却不太一样,正好趁着周末,可以好好看一下RabbitMQ的相关知识点;希望可以通过一些学习,可以搞清楚以下几点
基础环境搭建 
可以怎么使用 
实现原理是怎样的 
实际工程中的使用(比如结合SpringBoot可以怎么玩) 
 
相关博文,欢迎查看:
I. 前提准备 在开始之前,先得搭建基本的环境,因为个人主要是mac进行的开发,所有写了一篇mac上如何安装rabbitmq的教程,可以通过 《mac下安装和测试rabbitmq》  查看
1. Centos安装过程 下面简单说一下Linux系统下,可以如何安装
Centos 系统:
1 2 3 4 5 6 7 rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm yum install erlang wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm 
启动和查看的命令
1 2 3 4 service rabbitmq-server start service rabbitmq-server status 
2. 注意 
安装完毕之后,可以开启控制台,主要就是 rabbitmq-plugins enable rabbitmq_management, 默认的端口号为15672 
默认分配的用户/密码为: guest/guest, 只允许本地访问;如果跨应用读写数据时,请添加账号和设置对应的权限(推荐参考上面mac安装的博文,里面有介绍) 
 
II. 基本使用篇 直接使用amqp-client客户端做基本的数据读写,先不考虑Spring容器的场景,我们可以怎样进行塞数据,然后又怎样可以从里面获取数据;
在实际使用之前,有必要了解一下RabbitMQ的几个基本概念,即什么是Queue,Exchange,Binding,关于这些基本概念,可以参考博文:
1. 基本使用姿势 首先是建立连接,一般需要设置服务器的IP,端口号,用户名密码之类的,公共代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 public  class  RabbitUtil      public  static  ConnectionFactory getConnectionFactory ()                    ConnectionFactory factory = new  ConnectionFactory();         factory.setHost("127.0.0.1" );         factory.setPort(5672 );         factory.setUsername("admin" );         factory.setPassword("admin" );         factory.setVirtualHost("/" );         return  factory;     } } 
a. 生产者 要使用,基本的就需要一个消息投递和一个消息消费两方,线看消息生产者的一般写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public  class  MsgProducer      public  static  void  publishMsg (String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message)              throws  IOException, TimeoutException  {        ConnectionFactory factory = RabbitUtil.getConnectionFactory();                  Connection connection = factory.newConnection();                  Channel channel = connection.createChannel();                  channel.exchangeDeclare(exchange, exchangeType, true , false , null );                  channel.basicPublish(exchange, toutingKey, null , message.getBytes());         channel.close();         connection.close();     } } 
针对上面的代码,结合RabbitMQ的基本概念进行分析
不管是干啥,第一步都是获取连接,也就是上面的Connection 
从《RabbitMq基础教程之基本概念》 直到,生产者消费者都是借助Channel与Exchange或者Queue打交道,接下来就是通过Connection创建数据流通信道Channel 
Channel准备完毕之后,生产者就可以向其中投递数据 
投递完毕之后,回收现场资源 
 
疑问: 
在声明Exchange时,是否就需要选择消息绑定策略? 
不声明时,默认是什么策略? 
 
b. 消费者 结合上面的代码和分析,大胆的预测下消费者的流程
获取连接Connection 
创建Channel 
将Channel与Queue进行绑定 
创建一个Consumer,从Queue中获取数据 
消息消费之后,ack 
 
下面给出一个mq推数据的消费过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public  class  MsgConsumer      public  static  void  consumerMsg (String exchange, String queue, String routingKey)              throws  IOException, TimeoutException  {        ConnectionFactory factory = RabbitUtil.getConnectionFactory();                  Connection connection = factory.newConnection();                  final  Channel channel = connection.createChannel();                  channel.queueDeclare(queue, true , false , false , null );                  channel.queueBind(queue, exchange, routingKey);         System.out.println("[*] Waiting for message. To exist press CTRL+C" );         Consumer consumer = new  DefaultConsumer(channel) {             @Override              public  void  handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,                      byte [] body)  throws  IOException                 String message = new  String(body, "UTF-8" );                 try  {                     System.out.println(" [x] Received '"  + message);                 } finally  {                     System.out.println(" [x] Done" );                     channel.basicAck(envelope.getDeliveryTag(), false );                 }             }         };                  channel.basicConsume(queue, false , consumer);     } } 
2. Direct方式 a. Producer 直接在前面的基础上进行测试,我们定义一个新的exchange名为direct.exchange,并且制定ExchangeType为直接路由方式 (先不管这种写法的合理性)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public  class  DirectProducer      private  static  final  String EXCHANGE_NAME = "direct.exchange" ;     public  void  publishMsg (String routingKey, String msg)           try  {             MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, routingKey, msg);         } catch  (Exception e) {             e.printStackTrace();         }     }     public  static  void  main (String[] args)           DirectProducer directProducer = new  DirectProducer();         String[] routingKey = new  String[]{"aaa" , "bbb" };         String msg = "hello >>> " ;         for  (int  i = 0 ; i < 30 ; i++) {             directProducer.publishMsg(routingKey[i % 2 ], msg + i);         }         System.out.println("----over-------" );     } } 
上面的代码执行一遍之后,看控制台会发现新增了一个Exchange
b. consumer 同样的我们写一下对应的消费者,一个用来消费aaa,一个消费bbb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public  class  DirectConsumer      private  static  final  String exchangeName = "direct.exchange" ;     public  void  msgConsumer (String queueName, String routingKey)           try  {             MsgConsumer.consumerMsg(exchangeName, queueName, routingKey);         } catch  (IOException e) {             e.printStackTrace();         } catch  (TimeoutException e) {             e.printStackTrace();         }     }     public  static  void  main (String[] args)  throws  InterruptedException          DirectConsumer consumer = new  DirectConsumer();         String[] routingKey = new  String[]{"aaa" , "bbb" };         String[] queueNames = new  String[]{"qa" , "qb" };         for  (int  i = 0 ; i < 2 ; i++) {             consumer.msgConsumer(queueNames[i], routingKey[i]);         }         Thread.sleep(1000  * 60  * 10 );     } } 
执行上面的代码之后,就会多两个Queue,且增加了Exchange到Queue的绑定
当上面两个代码配合起来使用时,就可以看到对于消费者而言,qa一直消费的是偶数,qb一直消费的是奇数,一次输出如下:
1 2 3 4 5 6 7 8 [qa] Waiting for  message. To exist press CTRL+C [qb] Waiting for  message. To exist press CTRL+C  [qa] Received 'hello >>> 0   [qb] Received ' hello >>> 1 [qa] Received 'hello >>> 2   [qb] Received ' hello >>> 3 [qa] Received 'hello >>> 4  ... 
3. Fanout方式 有了上面的case之后,这个的实现和测试就比较简单了
a. Producer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  class  FanoutProducer      private  static  final  String EXCHANGE_NAME = "fanout.exchange" ;     public  void  publishMsg (String routingKey, String msg)           try  {             MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);         } catch  (Exception e) {             e.printStackTrace();         }     }     public  static  void  main (String[] args)           FanoutProducer directProducer = new  FanoutProducer();         String[] routingKey = new  String[]{"aaa" , "bbb" };         String msg = "hello >>> " ;         for  (int  i = 0 ; i < 30 ; i++) {             directProducer.publishMsg(routingKey[i % 2 ], msg + i);         }         System.out.println("----over-------" );     } } 
b. consumer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  class  FanoutProducer      private  static  final  String EXCHANGE_NAME = "fanout.exchange" ;     public  void  publishMsg (String routingKey, String msg)           try  {             MsgProducer.publishMsg(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, routingKey, msg);         } catch  (Exception e) {             e.printStackTrace();         }     }     public  static  void  main (String[] args)           FanoutProducer directProducer = new  FanoutProducer();         String[] routingKey = new  String[]{"aaa" , "bbb" };         String msg = "hello >>> " ;         for  (int  i = 0 ; i < 30 ; i++) {             directProducer.publishMsg(routingKey[i % 2 ], msg + i);         }         System.out.println("----over-------" );     } } 
这个的输出就比较有意思了,fa,fb两个队列都可以接收到发布的消息,而且单独的执行一次上面的投递数据之后,发现fa/fb两个队列的数据都是30条
然后消费的结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 [qa] Waiting for  message. To exist press CTRL+C [qb] Waiting for  message. To exist press CTRL+C  [qa] Received 'hello >>> 0   [qb] Received ' hello >>> 0 [qa] Received 'hello >>> 1   [qb] Received ' hello >>> 1 [qb] Received 'hello >>> 2   [qa] Received ' hello >>> 2 [qa] Received 'hello >>> 3   [qb] Received ' hello >>> 3 [qb] Received 'hello >>> 4   [qa] Received ' hello >>> 4 ... 
4. Topic方式 代码和上面差不多,就不重复拷贝了,接下来卡另外几个问题
III. 基础进阶 在上面的基础使用中,会有几个疑问如下:
Exchange声明的问题(是否必须声明,如果不声明会怎样) 
Exchange声明的几个参数(durable, autoDelete)有啥区别 
当没有队列和Exchange绑定时,直接往队列中塞数据,好像不会有数据增加(即先塞数据,然后创建queue,建立绑定,从控制台上看这个queue里面也不会有数据) 
消息消费的两种姿势(一个主动去拿数据,一个是rabbit推数据)对比 
ack/nack怎么用,nack之后消息可以怎么处理 
 
以上内容,留待下一篇进行讲解
IV. 其他 1. 相关博文 
一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
3. 声明 尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激
4. 扫描关注