04 March 2020

一.消息队列(Message Queue)是一种中间件(Middleware),为什么要在系统中引入消息队列呢?引入消息队列有什么好处?

消息队列的好处主要有以下几点:

  • 解偶
  • 异步
  • 削峰/限流

以上三点不想再做过多解释了,开发生产过程中遇到过很多类似情况了。

二. 任何事物都有两面性,MQ也一样,那么使用MQ会带来哪些问题呢?

  • 系统可用性降低,引入外部依赖越多,越容易挂掉,MQ一旦挂掉,可能导致服务不可用
  • 系统复杂性提高,可能会遇到信息重复(幂等性)、信息丢失、顺序错乱,队列堆积等问题
  • 系统一致性问题,如果有多个消费者(A、B、C),可能发生处理不一致情况(A、B写入成功了,C失败了)

三. 目前有哪些成熟的MQ产品,各自有什么优缺点呢?如何进行技术选型?

目前主要有ActiveMQ、RabbitMQ、RocketMQ、Kafka等消息队列产品,另外Redis也可以用作简单的消息队列。

  • ActiveMQ是老牌MQ,比较成熟,功能完备,单机吞吐量一般(万级),消息可靠性一般(有较低概率丢失数据),基于主从架构实现高可用, 社区已经不活跃了,更新频次低,目前已经不推荐使用
  • RabbitMQ是目前主流MQ,功能完备,单机吞吐量一般(万级),延时低,时效性性能极好(微秒级,其他MQ是毫秒级),消息可靠性好(一般不丢消息), 基于主从架构实现高可用,使用erlang语言开发(很难读源码,不容易定制和掌控),提供非常完善的管理界面,社区活跃,更新频次高,非常适合中小型公司业务。
  • RocketMQ是阿里开源的MQ,功能比较完备,使用java开发,单机吞吐量高(十万级),topic可以达到几百上千(吞吐量下降幅度不大),消息可靠性好(可做到0丢失), 分布式架构实现高可用,扩展性好,社区较活跃,可以支持复杂业务场景,适合有基础架构研发实力的中大型公司
  • Kafka功能较为简单,单机吞吐量高(十万级),社区活跃度高,topic多时吞吐量会下降的比较明显,消息可靠性好(可做到0丢失), 分布式架构实现高可用,非常适合大数据领域的实时计算、日志采集等,

四. MQ是如何实现高可用的?

对于RabbitMQ,有两种集群模式,普通集群模式(默认)和镜像集群模式

  • 普通集群模式:所有实例有相同的queue元数据(即队列的结构),但消息实体只存在于其中一个实例中,consumer消费时,如果访问的实例没有消息实体,则实例节点间会进行数据传输, 从有消息实体的实例取出消息实体并经过当前访问的实例发送给consumer,这种模式可以提高消息吞吐量,缺点是会在集群内部产生大量数据传输,可用性也没有保障。
  • 镜像集群模式:多个实例都保存queue的完整镜像(包含全部数据),每次写消息到queue时,都会进行多个实例间的消息同步,consumer可以从任何一个节点消费数据, 即使某个实例宕机也不影响。缺点是网络带宽开销大,并且这不是分布式的,不具备扩展性。

可以通过后台新增一个镜像集群模式的策略,可以同步到指定数量的节点或所有节点,创建queue时应用这个策略即可。

Kafka的高可用架构

Kafaka由多个broker组成,一个broker进程就是一个节点,创建一个topic,这个topic可以划分成多个partition,partition可存在于不同的broker上, 每个partition只放一部分数据 -- 以此实现分布式架构。

Kafka通过副本(replica)机制实现高可用。每个topic+partition都有多个副本,副本间会选举出一个作为leader,其他副本作为follower, 只有leader可以对外提供读写,leader会把数据同步到follower上。如果leader宕机,Kafka会立刻感知,并从follower中选举出新的leader。

五. 为什么会出现重复消费?

以Kafka为例,每条消息都有一个offset,代表这个消息的顺序序号,consumer消费数据时,是按照这个顺序去消费的,consumer消费完一条数据时, 会提交offset(基于zookeeper),假如此时consumer被重启,consumer会告知kafka继续从上次消费到的地方后面的数据开始传送过来。

消费者提交offset不是实时的,是定时提交的,如果准备提交offset但还没有提交时,consumer被重启了,会造成kafka不知道当前消费到哪条数据了, 所以在consumer恢复以后,kafka会重复向消费者发送数据。

如何保证幂等性?

要保证幂等性,可以在消费者拿到一条数据后,先往redis中存一条,每次拿到新数据后,先判断redis里面有没有这条数据(通过一个唯一id),有则抛弃,没有才能继续后续处理。

如果要写入数据库,可以基于唯一索引保证数据不重复插入。

六. 为什么会丢失数据?以及如何防止数据丢失?

对于RabbitMQ,有三个环节会造成数据丢失:

  • 生产者在网络传输过程中可能丢数据,或者消息到了mq但内部出错没保存下来。
    解决方案:
    1. 使用事物功能。
      开启事物 channel.txSelect,回滚事物 channel.txRoolback,提交事物 channel.txCommit。
      缺点:同步阻塞,降低吞吐量。
    2. confirm模式
      通过回调接口实现,异步非阻塞
  • RabbitMQ收到消息之后,消费者没来得及消费,RabbitMQ有可能挂掉了,RabbitMQ内存中的数据会丢失。
    解决方案:开启持久化。分两步:
    1. 创建queue时将其设置为持久化的,这一步可以保证持久化元数据,但不会持久化消息实体。
    2. 发送消息时将消息的deliveryMode设置为2,将消息实体持久化
  • 消费者消费到一条消息, 但是还没来得及处理就挂掉了,RabbitMQ以为已经处理完了,会造成数据丢失。
    造成这种问题是因为开启了autoAck机制,消费到数据会自动通知RabbitMQ,但如果消息处理中还没处理完时系统宕机了, RabbitMQ却以为处理完了。
    解决方案:关闭autoAck

对于Kafka,消费者弄丢数据的情况和RabbitMQ类似,消费者会自动提交offset,只要关闭自动提交,就可以避免数据丢失。

Kafka自己弄丢数据的情况,常见的场景是某个broker(leader)突然宕机,数据还没来得及同步到follower,重新选举的leader就会丢失一部分数据。
解决方案:

  1. topic设置replication.factor参数必须大于1,要求每个partition至少有2个副本。
  2. kafka服务端设置min.insync.replicas参数必须大于1,要求一个leader至少感知到有一个follower跟自己保持联系。
  3. producer端设置 acks=all,要求每条数据必须写入所有replica之后,才认为写入成功了,如果失败则重试。
  4. producer端设置 retries=很大的数字,相当于无限次重试

七. 为什么发生顺序错乱情况?如何保证从MQ拿到的数据顺序执行?

对于RabbitMQ,一个queue,多个消费者,必然出现顺序不一致
解决方案:
拆分成多个queue,需要保证顺序性的消息都发到同一个queue里,只由一个consumer消费。
或者一个queue,对应一个consumer,consumer内部用内存队列排队,分发给底层不同worker处理。

对于kafka,生产者在写入时可以指定一个key(例如订单id),同一个key的数据一定会被分发到同一个partition中,同一个partition中的数据一定是有序的。 一个partition只能有一个消费者,所以消费者从partition中取出的数据一定是有序的。

但是如果一个消费者内部使用多线程处理,就会导致消息错乱,解决办法是每一个线程增加一个内存队列,partition中的消息先按订单id分发到队列中再处理。

八. 如何处理消息积压?

解决办法是临时紧急扩容,具体步骤为:

  1. 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
  2. 临时建立好原先10倍20倍的queue数量。
  3. 写一个临时的分发数据的consumer,消费积压的数据,消费之后不做耗时处理,直接均匀轮训写入临时建立的10倍/20倍数量的queue里
  4. 临时征用10倍机器部署consumer,每一个consumer消费一个queue,以10倍速度消费数据。
  5. 挤压数据消费完成后,恢复原先架构。

如果MQ设置了过期时间,并且因为超时未处理导致数据丢失怎么办?首先,通常不应设置MQ过期时间,万一发生了这种情况,只有写一个临时程序, 等系统过了高峰期,把丢掉的数据一点点找回来。

如果MQ积压导致磁盘满了,也只能临时写一个消费者,把消息写到别的队列,或者直接丢弃,先解决线上问题,再想办法补偿。


PS:

阿里云的仓库镜像: /Users/yourname/.m2/settings.xml

            <mirror>
                <id>alimaven
                <name>aliyun maven
                <url>http://maven.aliyun.com/nexus/content/groups/public/
                <mirrorOf>central
            </mirror>