消息中间件MQ

参考博客

幂等性

采用同样的输入多次调用处理函数,得到同样的结果

幂等例子

1
update stat_table set count= 10 where id =1

反例

1
update stat_table set count= count +1 where id= 1;

重复消费

造成原因

主要是因为消息接收者成功处理完消息后,消息中间件不能及时更新投递状态造成的重复消息投递

解决方法

  • mvcc

在更新时,带上数据的版本号,消费者更新时需比较版本号,如果不符合则不能够更新,防止重复消费,每一个 version 只有一次执行成功的机会,一旦失败了生产者必须重新获取数据的最新版本号再次发起更新。

1
2
3
4
// 接口
public boolean addCount(Long id, Long version);
// 更新sql
update blogTable set count= count+1,version=version+1 where id=321 and version=123
  • 去重表

利用一张日志表来记录已经处理成功的消息的 id,如果新到的消息 id 已经在日志表中,那么就不再处理这条消息

保证消息的可靠性传输

ActiveMQ

  • Producer (生产者) 应该采用事务消息或持久化消息

  • Consumer(消费者) 需要发出ACK指令,指明改消息被消费,Broker收到ACK指令后,才会认为消息被正确的处理了,以下为四种确认机制

    • AUTO_ACKNOWLEDGE=1:自动确认
    • CLIENT_ACKNOWLEDGE=2:客户端手动确认
    • DUPS_OK_ACKNOWLEDGE=3:自动批量确认
    • SESSION_TRANSACTED=0:事务提交并确认

高级特性:Message Groups 解决了负载均衡和一组消息顺序消费

在一个消息被分发到 Consumer 之前,Broker 首先检查消息 JMSXGroupID 属性。如果存在,那么 Broker 会检查是否有某个 Consumer 拥有这个 Message Group。如果没有,那么 Broker 会选择一个 Consumer,并将它关联到这个 Message Group。此后,这个 Consumer 会接收这个 Message Group 的所有消息,直到 Consumer 被关闭。

同一个 queue 中,拥有相同 JMSXGroupID 的消息将发往同一个消费者,解决顺序问题;不同分组的消息又能被其他消费者并行消费,解决负载均衡的问题

RabbitMQ**

  • 防止Producer (生产者)搞丢了数据: 使用 Confirm 机制,和事务机制相比,优点是异步回调,如果成功接收消息,RabbitMQ会给你回传ACK消息,如果接受失败,会回调你nack接口,每次写的消息都会分配一个唯一id

  • 防止RabbitMQ搞丢了数据:数据必须开启RabbitMQ持久化,持久化步骤如下

    • 创建queue和交换器的时候将其设置为持久化,这样能保证RabbitMQ中元数据被持久化

    • 发送消息时将deliveryMode设置为2

这样的话配合前面生产者的Confirm 机制,只有消息被持久化到磁盘之后,才会通知生产者 ACK

  • 防止消费端丢失数据:如果你消费的时候,刚消费到,但还没有处理进程就挂了,比如重启,此时,如果是自动ACK那么,RabbitMQ会认为你已经消费了,那么数据也就丢了。为了防止此类情况产生,需要关闭RabbitMQ 的自动ACK,通过API调用手动ACK即可,数据处理完之后ACK即可
赏个🍗吧
0%