RabbitMQ整合SpringBoot

RabbitMQ六种工作模式

RabbitMQ流程

生产者发送消息,包括Exchange交换机名称,和routingkey,然后匹配到正确的消息队列,消费者在从那里取出消息

默认的视图界面端口为15672,账号密码为guest

整合SpringBoot

  • pom
1
2
3
4
5
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产端

  • properties
1
2
3
4
5
spring.rabbitmq.addresses=localhost:5672
spring.activemq.user=guest
spring.activemq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
1
2
3
4
5
6
7
8
@Data
// 这里要注意的是要传入MQ的数据信息必须要实现Serializable接口
public class message implements Serializable {
private static final long serialVersionUID = -1652432744718867431L;
private String id;
private String name;
private String messageId;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class producer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendOrder(message message) {
// 消息唯一ID
CorrelationData correlationData = new CorrelationData();
correlationData.setId(message.getMessageId());
// convertAndSend方法参数为交换机名称,routingkey,传输的信息,消息唯一ID
rabbitTemplate.convertAndSend("message-Exchange", "message.abcd", message, correlationData);
}
}
  • 分别新建一个交换机和消息队列

  • 绑定交换机和消息队列

Routing key中如果是xxx.*则匹配的只有一层,比如xxx.123,但如果是xxx.#则匹配所有层,比如xxx.123.123

1
2
3
4
5
6
7
8
9
10
11
@Autowired
producer producer;
// 测试MQ
@Test
public void sendMessage() {
message message = new message();
message.setId("10086");
message.setName("zyj");
message.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID());
producer.sendOrder(message);
}

消费端

  • properties
1
2
3
4
5
6
7
8
9
10
11
12
13
## rabbitmq 基本配置
spring.rabbitmq.addresses=localhost:5672
spring.activemq.user=guest
spring.activemq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

## rabbitmq 消费端配置
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.prefetch=1
## ack手动签收消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 和生产端一样需要一个pojo,如上的message类
  • 消费端代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 消费端代码
@Component
public class consumer {

// 标示方法
@RabbitHandler
// 开启监听,如果没有该绑定的信息,则自动生成
// durable是否持久化
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "message-queues",durable = "true"),
exchange = @Exchange(value = "message-Exchange",durable = "true",type = "topic"),
key = "message.abcd"
)
)
public void messageReceive(@Payload message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
System.out.println(message.getMessageId());
System.out.println(message.getName());
Long DELIVERY_TAG = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 因为前面acknowledge-mode设置为manual,需要手动签收,所以这里需要ack
// 是否支持批量签发
channel.basicAck(DELIVERY_TAG, false);
}
}

使用ack的目的

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了

订单系统mq流程图

赏个🍗吧
0%