服务异步通讯

MQ(消息队列)

同步通讯和异步通讯

学习MQ之前,先学习同步通讯和异步通讯

同步通讯

同步通讯指的是通讯双方需要在时间上保持一致,也就是一个操作必须等待另一个操作完成后才能执行下一个操作。

比如一个请求需要等待服务器返回结果之后才能继续处理下一个请求。

同步通讯方式的特点:

  1. 简单明了
  2. 适用于处理量不大、并发量不高的场景

缺点:

  1. 阻塞等待:在同步通讯过程中,请求方需要等待响应方返回数据,这个等待过程可能会阻塞请求方的线程,从而导致程序无法继续执行其他任务。
  2. 慢速处理:由于同步通讯需要等待响应方返回数据后再进行下一步处理,所以它的处理速度相对慢,特别是在高并发量和大数据量的情况下。
  3. 处理逻辑复杂:同步通讯需要明确的时间规划和处理顺序,这往往需要编写更复杂的代码逻辑,增加了程序的开发难度和维护成本。
  4. 可靠性低:由于同步通讯需要双方在时间上保持一致,所以如果其中一个方出现了问题,可能会导致整个通讯过程失败,进而影响到整个系统的正常运行。

同步调用存在问题:

  1. 耦合度高:每次加入新的需求,都要修改原来的代码
  2. 性能下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
  3. 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  4. 级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

异步通讯

异步通讯则是指通讯双方不需要时间上保持一致,也就是一个操作可以在另一个操作执行的过程中继续执行。

异步通讯方式特点:

  1. 高效性
  2. 灵活性
  3. 适用于处理大数据量、高并发量的场景

缺点:

  1. 多线程开销:由于异步通讯需要使用多线程来处理并发请求,所以会占用更多的系统资源和内存空间,进而增加系统开销和负担。
  2. 处理逻辑复杂:由于异步通讯需要使用回调函数等机制来处理回应消息,所以编写程序时需要设计更复杂的逻辑和代码结构。
  3. 可读性低:异步通讯中的回调函数容易形成层层嵌套的调用关系,这会影响程序的可读性和可维护性。
  4. 调试难度大:由于异步通讯中的执行顺序不是固定的,同时又涉及到多线程、事件驱动等复杂的技术,所以对程序进行调试比同步通讯更为困难。

异步通讯的典型例子包括消息队列、事件驱动等。

在团队合作中的异步通信发生在没有实时对话或交互的情况下,比如通过邮件、留言等方式进行沟通;而同步通信则是在实时对话或交互的情况下进行,比如在线会议、视频通话等。需要根据具体的情况选择使用异步或同步通讯方式,以达到最佳的工作效果。

image-20240415141551972

统一发送,但是服务执行的时间并不要同时完成

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

image-20240415141728572异步调用优势

  1. 服务解耦
  2. 性能提升,吞吐量提高
  3. 服务没有强依赖,不担心级联失败问题
  4. 流量削峰

image-20240415141817012

四种MQ:主要是使用RabbiMQ 、RocketMQ、Kafka

image-20240415141842883

image-20240415141855359

RabbitMQ

介绍与安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:

Messaging that just works — RabbitMQ 接下来,我们就学习它的基本概念和基础用法。

将提供的mq.tar文件拉近服务器

执行代码

1
docker load -i mq.tar

我们同样基于Docker来安装RabbitMQ,使用下面的命令即可:

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management

如果拉取镜像困难的话,可以使用课前资料给大家准备的镜像,利用docker load命令加载:

image-20240415142034937

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。登录后即可看到管理控制台总览页面:

image-20240415142105800安装完成后,我们访问

http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面:

RabbitMQ对应的架构如图:

image-20240415142159538

其中包含几个概念:

  • virtual-host:虚拟主机,起到数据管理的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

快速入门

需求:在RabbitMQ的控制台完成下列操作:

  • 新建队列hello.queue1和hello.queue2
  • 向默认的amp.fanout交换机发送一条消息
  • 查看是否达到hello.queue1和hello.queue2

交换机

我们打开Exchanges选项卡,可以看到已经存在很多交换机:我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息:

image-20240415142337166

这里是由控制台模拟了生产者发送的消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。

image-20240415142508692

image-20240415142537622

队列

我们打开Queues选项卡,新建一个队列:

命名为hello.queue1:

再以相同的方式,创建一个队列,密码为hello.queue2,最终队列列表如下:

此时,我们再次向amq.fanout交换机发送一条消息。会发现消息依然没有到达队列!!

怎么回事呢?

发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。

image-20240415142737157

image-20240415142753261

image-20240415142810006

绑定关系

点击Exchanges选项卡,点击amq.fanout交换机,进入交换机详情页,然后点击Bindings菜单,在表单中填写要绑定的队列名称:

相同的方式,将hello.queue2也绑定到改交换机。

最终,绑定结果如下:

image-20240415142854852

image-20240415142910259

发送消息

再次回到exchange页面,找到刚刚绑定的amq.fanout,点击进入详情页,再次发送一条消息:

回到Queues页面,可以发现hello.queue中已经有一条消息了:

点击队列名称,进入详情页,查看队列详情,这次我们点击get message:

可以看到消息到达队列了:

这个时候如果有消费者监听了MQ的hello.queue1或hello.queue2队列,自然就能接收到消息了。

image-20240415142942583

image-20240415142957475

image-20240415143016803

image-20240415143034707

数据隔离

需求:在RabbitMQ的控制台完成下列操作:

  • 新建一个用户hamall
  • 为hmall用户创建一个virtual host
  • 测试不同virtual host之间的数据隔离现象

用户管理

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的itheima这个用户。仔细观察用户表格中的字段,如下:

image-20240415143156267

  • Name

:itheima,也就是用户名

  • Tags

:administrator,说明itheima用户是超级管理员,拥有所有权限

  • Can access virtual host

: /,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的

virtual host,将每个项目的数据隔离。

比如,我们给黑马商城创建一个新的用户,命名为hmall:你会发现此时hmall用户没有任何virtual host的访问权限:别急,接下来我们就来授权。

image-20240415143237400

image-20240415143522340

virtual host

我们先退出登录:切换到刚刚创建的hmall用户登录,然后点击Virtual Hosts菜单,进入virtual host管理页:可以看到目前只有一个默认的virtual host,名字为 /。 我们可以给黑马商城项目创建一个单独的virtual host,而不是使用默认的/。创建完成后如图:由于我们是登录hmall账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/hmall这个virtual host的访问权限了:

image-20240415143603599

image-20240415143623670

image-20240415143637627

image-20240415143650641

image-20240415143705398

此时,点击页面右上角的virtual host下拉菜单,切换virtual host为 /hmall:然后再次查看queues选项卡,会发现之前的队列已经看不到了:这就是基于virtual host的隔离效果。

image-20240415143850767

image-20240415143907825

java客户端-快速入门

AMQP(Advanced Message Queuing Protocol,用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求)

Spring AMQP(是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp)是基础抽象,spring-rabbit是底层的默认实现。

需求如下:

  • 利用控制台创建队列simple.queue
  • 在publisher服务中,利用SpringAMQP直接向simple.queue分手发送消息
  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

1.创建队列simple.queue

image-20240415145026046

2.在父工程中中引入spring-amqp依赖,这样publish和consumer服务都可以使用:

1
2
3
4
5
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.在每一个微服务中引入 MQ服务端信息,这样微服务才能连接到RabbitMQ

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 112.124.30.198
port: 5672
virtual-host: /wcw
username: hmall
password: 123

4.SpringAMQP提供了RabbitTemplate工具类,方便我们发送代码如下:

1
2
3
4
5
6
7
8
9
@Autowored
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
//队列名称
String quwueName = "simplate.queue";
String message = "hello,spring amqp";
rabbitTemplate.converAndSend(queueName,message);
}

5.接收消息

SpringAMQP提供声明式的消息监听,我们只需要通过注注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前的方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Self4j
@Component
public class SpringRabbitListener{
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收消息:【"+msg+"】");
if(true){
throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}
}

image-20240415145208689

SpringAMQP如何收发消息?

1.引入spring-boot-starter-amqp依赖

2.配置rabbitmq服务端消息

3.利用RabbitTemplate发送消息

4.利用@RabbitListener注解声明要监听的队列,监听消息

Work模型

模拟workQueue,实现一个队列绑定多个消费者

​ 1.在RabbitMQ的控制台创建一个队列,名为work.queue

​ 2.在publishe服务中定义测试方法,在1秒内产生50条消息,发送到work.queue

​ 3.在consumer服务中定义两个消息监听者,都监听work.queue队列

​ 4.消费者1每秒处理50条消息,消费者2每秒处理5条消息

1.在RabbitMQ的控制台创建一个队列,名为work.queue

image-20240415145352131

2.在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue

1
2
3
4
5
6
7
8
9
10
@Test
void tesWorkQueue() throws InterruptedException{
String queueName = "work.queue";
for(int i = 0; i < 50; i++ ){
String msg = "hello,work,message_"+i;
rabbitTemplate.convertAndSend(queueName,msg);
Thread.sleep(20);
}

}

3.在consumer服务中定义两个消息监听者,都监听work.queue队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@Component
public class MqListener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
System.out.println("spring 消费者接收消息:【"+msg+"】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg){
System.out.println("消费者1 收到WorkQueue消息:【"+msg+"】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg){
System.err.println("消费者2 收到WorkQueue消息:【"+msg+"】");
}
}

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1#每次只能获取一条消息,处理完成才能获取下一个消息

image-20240415145457739

work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一个消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条处理下一条,实现能者多劳

交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送给队列,交换机的类型有以下三钟:

  • Fanout:广播
  • Direct:定向
  • Topic:话题

Fanout交换机

Fanout Exchange 会将接收到每一个跟其绑定的queue,所以也叫广播模式

image-20240415145605575

实现思路:

​ 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

​ 在RabbitMQ控制台中,声明交换机hmall。fanout,将两个队列与其绑定

​ 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

​ 在publisher中编写测试方法,向hmall.fanout发送消息

1.在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2

image-20240415145656482

2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定

image-20240415145719047

image-20240415145729249

3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException{
System.out.println("消费者1 收到fanoutQueue1消息:【"+msg+"】");
Thread.sleep(20);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException{
System.out.println("消费者2 收到fanoutQueue2消息:【"+msg+"】");
Thread.sleep(100);
}

4.在publisher中编写测试方法,向hmall.fanout发送消息

1
2
3
4
5
6
@Test
void testSendFanout(){
String exChangeName = "hmall.fanout";
String msg = "hello,everyone";
rabbitTemplate.convertAndSend(exChangeName,null,msg);
}

image-20240415145857235

交换机的作用是什么?

  • 接接收publisher发送的消息
  • 将消息按规则路由到与之绑定的队列
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

Direct Exchange会将接收到消息根据规则路由到指定的Queue,因此成为定向路由

  • 每个Queue都与Exchange设置一个BindKey
  • 发布者发送消息时,指定消息的RoutingKry
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

image-20240415145958511

需求:

  • 在RabbitMQ控制台中,声明对列direct.queue1和direct.queue2

  • 在RabbitMQ控制台中,声明交换机hmall.dirext,将两个队列与其绑定

  • 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  • 在publicsher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息

1.在RabbitMQ控制台中,声明对列direct.queue1和direct.queue2

image-20240415150056867

2.在RabbitMQ控制台中,声明交换机hmall,dirext,将两个队列与其绑定

image-20240415150117746

3.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) throws InterruptedException{
System.out.println("消费者1 收到fanoutQueue1消息:【"+msg+"】");
Thread.sleep(20);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) throws InterruptedException{
System.out.println("消费者2 收到fanoutQueue2消息:【"+msg+"】");
Thread.sleep(100);
}

4.在publicsher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息

1
2
3
4
5
6
7
8
9
10
11
12
@Test
void testSendDirect1(){
String exChangeName = "hmall.direct";
String msg = "hello,everyone";
rabbitTemplate.convertAndSend(exChangeName,"red",msg);
}
@Test
void testSendDirect2(){
String exChangeName = "hmall.direct";
String msg = "hello,everyone";
rabbitTemplate.convertAndSend(exChangeName,"blue",msg);
}

先测试键为red(路由绑定队列共有的键)的测试方法,测试结果如图所示:

image-20240415150231742

在测试键为blue的键(这是directQueue1独有的键),因此如下只有收到directQueue1的消息

image-20240415150310111

Topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并以.分割

  • #:代指0个或多个单词
  • *:代指一个单词

image-20240415150346468

需求如下:

1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定

3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

4.publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2

image-20240415150408303

2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定

image-20240415150444087

3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

1
2
3
4
5
6
7
8
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) throws InterruptedException{
System.out.println("消费者1 收到topicQueue1消息:【"+msg+"】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) throws InterruptedException{
System.out.println("消费者2 收到topicQueue2消息:【"+msg+"】");
}

4.publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

1
2
3
4
5
6
@Test
void testSendTopic1(){
String exChangeName = "hmall.topic";
String msg = "我来通知你了!!!";
rabbitTemplate.convertAndSend(exChangeName,"japan.news",msg);
}

image-20240415150534327

1
2
3
4
5
6
@Test
void testSendTopic2(){
String exChangeName = "hmall.topic";
String msg = "今天天气很不错,我的心情特别好!!!";
rabbitTemplate.convertAndSend(exChangeName,"chain.weather",msg);
}

image-20240415150626183

描述下Direct交换机和Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey可以是多个单词,以.分割
  • Topic交换机与队列绑定时的bingdingKey可以指定通配符
  • #:代表0个或多个词
  • *:代表1个词

基于注解@RabbitListener声明队列和交换机

记得停止项目运行在删除上面得交换机和队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//@RabbitListener(queues = "direct.queue1")
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1",durable = "true"),
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException{
System.out.println("消费者1 收到directQueue1消息:【"+msg+"】");
Thread.sleep(20);
}
//@RabbitListener(queues = "direct.queue2")
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2",durable = "true"),
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg) throws InterruptedException{
System.out.println("消费者2 收到directQueue2消息:【"+msg+"】");
Thread.sleep(100);
}

image-20240415150838262

image-20240415150854136

声明队列、交换机、绑定关系得Bean是什么?

  • Queue
  • FanoutExchange、DirectExchange、TopicExchange
  • Binding

基于@RabbitListener注解声明队列和交换机有哪些常见注解?

  • @Queue
  • Exchange
  • @QueueBinding

消息转换器

需求:测试利用SpringAMQP发送对象型的消息

  1. 声明一个队列,名为object.queue
  2. 编写单元测试,向一个队列直接发送一个消息,消息类型为Map
  3. 在控制台查看消息,直接你能发现的问题
1
2
3
4
//准备消息
Map<String,Obeject> msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age",21);

1.声明一个队列,名为object.queue

image-20240415150954892

2.编写单元测试,向一个队列直接发送一个消息,消息类型为Map

1
2
3
4
5
6
7
@Test
void testSendObject(){
Map<String,Object> msg = new HashMap<>(2);
msg.put("name","jack");
msg.put("age",21);
rabbitTemplate.convertAndSend("object.queue",msg);
}

运行测试代码我们发现,队列存的是一串我们不认识的字符串(字节码)

image-20240415151046809

3.引入Jackson依赖

1
2
3
4
5
6
<!--Jackson-->
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>

4.在启动类上添加下面的bean

1
2
3
4
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}

再次启动一下测试类

然后在rabittmq管理端查看队列,如图:

image-20240415151155347

5.编写一个接收消息的方法

image-20240415151222747

image-20240415151229269

第一个被原生的序列化的字节序列当消费者收到时直接报错,第二条被json序列化的数据接收正常

image-20240415151308584