Osheep

时光不回头,当下最重要。

3个小时快速入门RocketMQ

最近RocketMQ刚刚上生产环境,闲暇之时在这里做一些分享,主要目的是让初学者能快速上手RocketMQ。

RocketMQ是什么

Github上关于RocketMQ的介绍:
RcoketMQ是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。
具有以下特性:

  1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  3. 支持拉(pull)和推(push)两种消息模式
  4. 单一队列百万消息的堆积能力
  5. 支持多种消息协议,如JMS、MQTT等
  6. 分布式高可用的部署架构,满足至少一次消息传递语义
  7. 提供docker镜像用于隔离测试和云集群部署
  8. 提供配置、指标和监控等功能丰富的Dashboard

对于这些特性描述,大家简单过一眼就即可,深入学习之后自然就明白了。

专业术语

Producer

消息生产者,生产者的作用就是将消息发送到MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到MQ。

Producer Group

生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。

Consumer

消息消费者,简单来说,消费MQ上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

Consumer Group

消费者组,和生产者类似,消费同一类消息的多个consumer实例组成一个消费者组。

Topic

Topic是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单Topic存放订单相关的消息,一个是库存Topic存储库存相关的消息。

Message

Message是消息的载体。一个Message必须指定topic,相当于寄信的地址。Message还有一个可选的tag设置,以便消费端可以基于tag进行过滤消息。也可以添加额外的键值对,例如你需要一个业务key来查找broker上的消息,方便在开发过程中诊断问题。

Tag

标签可以被认为是对Topic进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

Broker

Broker是RocketMQ系统的主要角色,其实就是前面一直说的MQ。Broker接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

Name Server

Name Server为producer和consumer提供路由信息。

RocketMQ架构

《3个小时快速入门RocketMQ》

RocketMQ架构

由这张图可以看到有四个集群,分别是NameServer集群、Broker集群、Producer集群和Consumer集群。

  1. NameServer: 提供轻量级的服务发现和路由。 每个NameServer记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。
  2. Broker: 通过提供轻量级的Topic和Queue机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。
  3. Producer:生产者,产生消息的实例,拥有相同Producer Group的Producer组成一个集群。
  4. Consumer:消费者,接收消息进行消费的实例,拥有相同Consumer Group的Consumer组成一个集群。

简单说明一下图中箭头含义,从Broker开始,Broker Master1和Broker Slave1是主从结构,它们之间会进行数据同步,即Date Sync。同时每个Broker与NameServer集群中的所有节
点建立长连接,定时注册Topic信息到所有NameServer中。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Broker Master建立长连接,且定时向Broker发送心跳。Producer只能将消息发送到Broker master,但是Consumer则不一样,它同时和提供Topic服务的Master和Slave建立长连接,既可以从Broker Master订阅消息,也可以从Broker Slave订阅消息。

RocketMQ集群部署模式

  1. 单master模式
    也就是只有一个master节点,称不上是集群,一旦这个master节点宕机,那么整个服务就不可用,适合个人学习使用。
  2. 多master模式
    多个master节点组成集群,单个master节点宕机或者重启对应用没有影响。
    优点:所有模式中性能最高
    缺点:单个master节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
    注意:使用同步刷盘可以保证消息不丢失,同时Topic相对应的queue应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该topic的应用造成影响。
  3. 多master多slave异步复制模式
    在多master模式的基础上,每个master节点都有至少一个对应的slave。master节点可读可写,但是slave只能读不能写,类似于mysql的主备模式。
    优点: 在master宕机时,消费者可以从slave读取消息,消息的实时性不会受影响,性能几乎和多master一样
    缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
  4. 多master多slave同步双写模式
    同多master多slave异步复制模式类似,区别在于master和slave之间的数据同步方式。
    优点:同步双写的同步模式能保证数据不丢失
    缺点:发送单个消息RT会略长,性能相比异步复制低10%左右。
    刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
    同步方式:同步双写和异步复制(指的一组master和slave之间数据的同步)
    注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。

RocketMQ单主部署

鉴于是快速入门,我选择的是第一种单master的部署模式。
先说明一下我的安装环境:

  1. Centos 7.2
  2. jdk 1.8
  3. Maven 3.2.x
  4. Git

这里git可用可不用,主要是用来直接下载github上的源码。也可以选择自己到github上下载,然后上传到服务器上。以git操作为示例。

  1. clone源码并用maven编译

    > git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
    > cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
    > cd target/alibaba-rocketmq-broker/alibaba-rocketmq

    此处可能遇到的问题
    一、执行git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ时出现以下提示:

    fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error

    解决办法:一般是由于网络原因造成的,执行以下命令

    > ping github.com

    确定可以ping通之后,再重新执行git clone命令。
    二、执行mvn -Dmaven.test.skip=true clean package install assembly:assembly -U编译时,可能出现下载相关jar很慢的情况。
    这也是由于默认maven中央仓库在国外的原因,可以根据需要在/home/maven/conf/setting.xml中的<mirrors></mirrors>添加以下内容后重新编译:

    <mirror>
     <id>aliyun</id>
     <mirrorOf>central</mirrorOf>
     <name>aliyun maven</name>
     <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </mirror>
  2. 启动Name Server

    > nohup sh /opt/RocketMQ/bin/mqnamesrv &
    //执行jps查看进程
    > jps
    25913 NamesrvStartup
    //查看日志确保服务已正常启动
    > tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success...
  3. 启动broker
    > nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &
    //执行jps查看进程
    > jps
    25954 BrokerStartup
    //查看日志确保服务已正常启动
    > tail -f ~/logs/rocketmqlogs/broker.log 
    The broker[broker-a, 10.1.54.121:10911] boot success...
  4. 发送和接收消息
    发送/接收消息之前,我们需要告诉客户端NameServer地址。RocketMQ提供了多种方式来实现这一目标。为简单起见,我们使用环境变量NAMESRV_ADDR
    > export NAMESRV_ADDR=localhost:9876
    > sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
    SendResult [sendStatus=SEND_OK, msgId= ...
    > sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
    ConsumeMessageThread_%d Receive New Messages: [MessageExt...
  5. 关闭服务
    > sh /opt/RocketMQ/bin/mqshutdown broker
    The mqbroker(36695) is running...
    Send shutdown request to mqbroker(36695) OK
    > sh /opt/RocketMQ/bin/mqshutdown namesrv
    The mqnamesrv(36664) is running...
    Send shutdown request to mqnamesrv(36664) OK

生产者、消费者Demo

  1. 生产者

    public class Producer {
     public static void main(String[] args) throws MQClientException, InterruptedException {
    
         //声明并初始化一个producer
         //需要一个producer group名字作为构造方法的参数,这里为producer1
         DefaultMQProducer producer = new DefaultMQProducer("producer1");
    
         //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
         //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
         producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
    
         //调用start()方法启动一个producer实例
         producer.start();
    
         //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
         for (int i = 0; i < 10; i++) {
             try {
                 Message msg = new Message("TopicTest",// topic
                         "TagA",// tag
                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                 );
    
                 //调用producer的send()方法发送消息
                 //这里调用的是同步的方式,所以会有返回结果
                 SendResult sendResult = producer.send(msg);
    
                 //打印返回结果,可以看到消息发送的状态以及一些相关信息
                 System.out.println(sendResult);
             } catch (Exception e) {
                 e.printStackTrace();
                 Thread.sleep(1000);
             }
         }
    
         //发送完消息之后,调用shutdown()方法关闭producer
         producer.shutdown();
     }
    }
  2. 消费者

    public class Consumer {
    
     public static void main(String[] args) throws InterruptedException, MQClientException {
    
         //声明并初始化一个consumer
         //需要一个consumer group名字作为构造方法的参数,这里为consumer1
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
    
         //同样也要设置NameServer地址
         consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
    
         //这里设置的是一个consumer的消费策略
         //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
         //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
         //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
         //设置consumer所订阅的Topic和Tag,*代表全部的Tag
         consumer.subscribe("TopicTest", "*");
    
         //设置一个Listener,主要进行消息的逻辑处理
         consumer.registerMessageListener(new MessageListenerConcurrently() {
    
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                             ConsumeConcurrentlyContext context) {
    
                 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
    
                 //返回消费状态
                 //CONSUME_SUCCESS 消费成功
                 //RECONSUME_LATER 消费失败,需要稍后重新消费
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
    
         //调用start()方法启动consumer
         consumer.start();
    
         System.out.println("Consumer Started.");
     }
    }

本篇到此完结,第一次写文章,表达不清楚的地方还请各位小伙伴海涵。如有问题,可以多多交流。如果后面有时间的话,我也会分享更多的干货,包括producer、consumer的API详解、如何与spring boot整合、生产环境部署的一些注意事项,以及学习RocketMQ这一路踩过的坑等等。
最后,如果觉得写的还过得去,点个喜欢或者小小打赏一下都是一种肯定,谢谢大家的支持。

点赞