MQ:RabbitMQ

同步和异步通讯

同步通讯:

需要实时响应,时效性强

耦合度高

每次增加功能都要修改两边的代码

性能下降

需要等待服务提供者的响应,如果调用链过长则每次响应时间需要等待所有调用完成

资源浪费

调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

级联失败

如果一个服务的提供者出现了问题,所有调用方都会出问题,出现雪崩问题

异步通讯:

在发布方和接收方之间存在中间人(Broker)

发布方只需将消息发布到中间人

接受方只需要从中间人订阅消息

实现解耦

吞吐量提升

无需等待订阅者处理完成,响应快速

故障隔离

服务不存在直接调用,不存在级联失败问题

资源占用问题

调用之间不会阻塞,不会产生无效资源占用问题

解耦

每个服务之间灵活插拔,实现解耦

流量削峰

所有发布事件由Broker直接接受,接受者按照自己的速度从Broker处理事件,实现缓冲

MQ

MessageQueue消息队列

即上述过程中的Broker

几种常见的MQ对比
请添加图片描述

MQ的基本结构

请添加图片描述

publisher:发布者

consumer:消费者

exchange:交换机,负责消息路由

queue:队列,存储消息,消息的缓冲区

队列绑定交换机

virtualHost:虚拟主机

channel:表示通道,操作MQ的工具,连接消息发布者和交换机,连接消息接受者和队列

RabbitMQ整体工作流程

发布者发布消息给交换机

交换机将消息路由到与其绑定的队列

消费者监听与其对应的队列获取消息

RabbitMQ消息模型

生产者->(交换机)->队列->消费者
基本消息队列BasicQueue

请添加图片描述

工作消息队列WorkQueue

请添加图片描述

多个消费者并发消费队列,消费者之间是竞争关系

发布订阅(Publish,Subscribe)

根据交换机类型不同分为三种

广播

请添加图片描述

消费者各自拥有

生产者将消息发送到交换机,具体发给哪个队列,生产者无法决定,由交换机决定.

交换机把消息发送给绑定过的所有队列,队列的所有消费者都能拿到消息.实现一条消息被多个消费者消费

生产者->所有消费者

路由

请添加图片描述

需求不同的消息被不同队列消费,就需要用到Direct类型的Exchange.在Direct模型下,需要指定RoutingKey(路由key).在消息发送方向交换机发送消息时,必须指定消息的路由key

交换机在接受到生产者消息后,将消息递交给routingkey完全匹配的队列

主题

请添加图片描述

topic类型相当于可以使用通配符匹配routingkey的路由类型

通配符规则

#:匹配一个或者多个词

*:匹配恰好一个词

SpringAMQP

Spring官方基于RabbitMQ提供的一套消息收发的模版工具:SpringAMQP

提供了三个功能:

自动声明队列,交换机以及绑定关系

基于注解的监听器模式,异步接收消息

封装RabbitTemplate工具用于发送消息

引入依赖
 <!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实现通信

新建队列->publisher发送->consumer接收

基于rabbitTemplate API进行发送

简单队列通信

发送:

    @GetMapping("/simple")
    public void publishMessage(){
        String queueName = "simplequeue";
        String message = "simplequeuemessage";
        rabbitTemplate.convertAndSend(queueName,message);
        System.out.println("simple_success");
    }

接收:

@Component
public class MqListener {
    @RabbitListener(queues = "simplequeue")
    public void simpleListener(String msg){
        System.out.printf("%s 简单队列收到消息:%s", Convert.toStr(LocalDateTime.now()),msg);
    }
}

工作队列通信

多个消费者监听一个队列,不同消费者因为自身的能力不同对消息处理的时间也不同

如果不进行额外设置的话,会将队列中的消息平均分配给所有消费者

造成处理能力浪费的情况

所以我们可以通过配置

    listener:
      simple:
        prefetch: 1#每个消费者每次只能取一条

来限制每个消费者预取的数量,实现能者多劳的工作场景

发送:

    @GetMapping("/work")
    public void PublishWorkMessage(){
        String queueName = this.WORK;
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend(queueName,System.currentTimeMillis());
        }
    }

接收:

    @RabbitListener(queues = "workqueue")
    public void workListener1(String msg){
        System.out.printf("%s 工作队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

    @RabbitListener(queues = "workqueue")
    public void workListener2(String msg){
        System.out.printf("%s 工作队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

交换机

上述两种工作方式都是不包含交换机,消息直接发送到队列的通信方式

我们可以通过引入交换机来实现消息的路由,决定具体要发送到哪个队列

消息通信流程 发布者->交换机->交换机决定的队列->消费者

交换机类型

包含以下四种:

FanOut:广播,将消息传递给所有绑定交换机的队列
Direct:基于RoutingKey发送消息给对应的队列
Topic:通配符订阅,基于通配符RoutingKey发送消息给对应的队列
Headers:头匹配,基于MQ的消息头匹配

FanOut

创建多个队列->创建交换机进行绑定->发布者发布->消费者接收

发布:

    @GetMapping("/fanout")
    public void publishFanoutMessage(){
        String exchangeName = this.FANOUT;
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend(exchangeName,"",System.currentTimeMillis() + "from:fanout");
        }
    }

接收:

    @RabbitListener(queues = "cfjg_queue1")
    public void queue1Listener(String msg){
        System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }
    @RabbitListener(queues = "cfjg_queue2")
    public void queue2Listener(String msg){
        System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

Direct

创建多个队列->创建交换机进行路由key绑定->发布者发布->消费者接收

发布:

    @GetMapping("/direct")
    public void publishDirectMessage(){
        String exchangeName = this.DIRECT;
        for (int i = 0; i < 10; i++) {
            if(i%2 == 0){
                rabbitTemplate.convertAndSend(exchangeName,this.Queue1,"directTo:queue1--" + i);
            }else if(i%2 !=  0){
                rabbitTemplate.convertAndSend(exchangeName,this.Queue2,"directTo:queue2--" + i);
            }
            if(i%5 == 0){
                rabbitTemplate.convertAndSend(exchangeName,this.ALL,"directTo:all--" + i);
            }
        }
    }

接收:

    @RabbitListener(queues = "cfjg_queue1")
    public void queue1Listener(String msg){
        System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }
    @RabbitListener(queues = "cfjg_queue2")
    public void queue2Listener(String msg){
        System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

Topic

创建多个队列->创建交换机进行通配符路由key绑定->发布者发布->消费者接收

路由key由多个由.分割的单词组成

绑定时使用#或*通配符进行绑定

#:代表任意多个单词

*.代表任意一个单词

eg:cfjg.test

可以由cfjg.#或者*.test进行匹配

发布:

    @GetMapping("/topic")
    public void publishTopicMessage(){
        for (int i = 0; i < 100; i++) {
            String tmp = i % 2 == 0 ? "two" : "one";
            String routingKey = "cfjg." + tmp;
            System.out.println(routingKey);
            rabbitTemplate.convertAndSend(this.TOPIC,routingKey,"topicTo:queue--" + i);
        }
    }

接收:

    @RabbitListener(queues = "cfjg_queue1")
    public void queue1Listener(String msg){
        System.out.printf("%s 队列1收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }
    @RabbitListener(queues = "cfjg_queue2")
    public void queue2Listener(String msg){
        System.out.printf("%s 队列2收到消息:%s\n", Convert.toStr(LocalDateTime.now()),msg);
    }

声明队列和交换机

SpringAMQP提供了一个Queue类用来创建队列

public class Queue extends AbstractDeclarable implements Cloneable {}

提供了一个Exchange接口用来表示不同类型的交换机

请添加图片描述

SpringAMQP提供了ExchangeBuilder和BindingBuilder来简化创建和绑定队列和交换机的过程

我们可以在消费者中编写一个配置类来对队列和交换机进行声明

@Configuration
public class MqConfig {
    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("cfjg.fanout");
    }
	
    //声明队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("test_queue");
    }
	
    //声明绑定
    @Bean
    public Binding bingingQueue1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
}
direct和topic模式需要每个key都进行一次绑定(同控制台操作)

基于注解声明

Spring提供了基于注解方式进行声明的途径

通过注解可以声明

绑定
队列
交换机
路由key
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = "cfjg_queue2"),
                    exchange = @Exchange(name = "cfjg.fanout" , type = ExchangeTypes.FANOUT),
                    key = {"red","blue"}
            )
    )
    public void fanoutQueue(String msg){
        System.out.println(msg);
    }

消息转换器

在MQ的消息传输中,会先将对象序列化为字节,接收消息时将字节反序列化为Java对象

但是默认的JDK序列化存在以下问题

数据体积过大,

存在安全漏洞,

可读性差

可以使用JSON进行序列化和反序列化

引入JackSon依赖来进行JSON序列化

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

通过在配置类中注册转换器Bean来实现消息发送时的自动序列化

	@Bean
    public MessageConverter messageConverter(){
        return new jackson2JsonMessageConverter();
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/775698.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【后端面试题】【中间件】【NoSQL】MongoDB查询优化2(优化排序、mongos优化)

优化排序 在MongoDB里面&#xff0c;如果能够利用索引来排序的话&#xff0c;直接按照索引顺序加载数据就可以了。如果不能利用索引来排序的话&#xff0c;就必须在加载了数据之后&#xff0c;再次进行排序&#xff0c;也就是进行内存排序。 可想而知&#xff0c;如果内存排序…

【RT-thread studio 下使用STM32F103-学习sem-信号量-初步使用-线程之间控制-基础样例】

【RT-thread studio 下使用STM32F103-学习sem-信号量-初步使用-线程之间控制-基础样例】 1、前言2、环境3、事项了解&#xff08;1&#xff09;了解sem概念-了解官网消息&#xff08;2&#xff09;根据自己理解&#xff0c;设计几个使用方式&#xff08;3&#xff09;不建议运行…

DataWhale-吃瓜教程学习笔记 (七)

学习视频**&#xff1a;第6章-支持向量机_哔哩哔哩_bilibili 西瓜书对应章节&#xff1a; 第六章 支持向量机 - 算法原理 几何角度 对于线性可分数据集&#xff0c;找距离正负样本距离都最远的超平面&#xff0c;解是唯一的&#xff0c;泛化性能较好 - 超平面 - 几何间隔 例…

堆叠的作用

一、为什么要堆叠 传统的园区网络采用设备和链路冗余来保证高可靠性&#xff0c;但其链路利用率低、网络维护成本高&#xff0c;堆叠技术将多台交换机虚拟成一台交换机&#xff0c;达到简化网络部署和降低网络维护工作量的目的。 二、堆叠优势 1、提高可靠性 堆叠系统多台成…

ServiceImpl中的参数封装为Map到Mapper.java中查询

ServiceImpl中的参数封装为Map到Mapper.java中查询&#xff0c;可以直接从map中获取到key对应的value

【Python机器学习】处理文本数据——多个单词的词袋(n元分词)

使用词袋表示的主要缺点之一就是完全舍弃了单词顺序。因此“its bad&#xff0c;not good at all”和“its good&#xff0c;not bad at all”这两个字符串的词袋表示完全相同&#xff0c;尽管它们的含义相反。幸运的是&#xff0c;使用词袋表示时有一种获取上下文的方法&#…

LeetCode热题100刷题3:3. 无重复字符的最长子串、438. 找到字符串中所有字母异位词、560. 和为 K 的子数组

3. 无重复字符的最长子串 滑动窗口、双指针 class Solution { public:int lengthOfLongestSubstring(string s) {//滑动窗口试一下//英文字母、数字、符号、空格,ascii 一共包含128个字符vector<int> pos(128,-1);int ans 0;for(int i0,j0 ; i<s.size();i) {//s[i]…

全端面试题15(canvas)

在前端开发领域&#xff0c;<canvas> 元素和相关的 API 是面试中经常被提及的主题。下面是一些常见的关于 HTML5 Canvas 的面试问题及解答示例&#xff1a; 1. 什么是 <canvas> 元素&#xff1f; <canvas> 是 HTML5 引入的一个用于图形渲染的标签。它本身并…

能否免费使用Adobe XD?

Adobe XD不是免费的。Adobe 目前XD采用订阅模式&#xff0c;提供订阅模式 7 每天试用期结束后需要付费购买&#xff0c;具体价格根据不同的订阅计划确定&#xff0c;包括每月购买&#xff0c;包括 9.99 美元或每月 99.99 美元&#xff0c;或者选择购买Adobe CreativeCloud整体订…

【qt】如何通过域名获得IP地址?

域名是什么呢?像www.baidu.com的baidu.com就是域名. 域名相当于是网站的门牌号. 域名可以通过 DNS 解析将其转换为对应的 IP 地址. 用我们获取IP地址的方式就可以,但是现在没有可以用另一种方法. 槽函数的实现: void MainWindow::lookupHost(const QHostInfo &hostInf…

Python学习笔记29:进阶篇(十八)常见标准库使用之质量控制中的数据清洗

前言 本文是根据python官方教程中标准库模块的介绍&#xff0c;自己查询资料并整理&#xff0c;编写代码示例做出的学习笔记。 根据模块知识&#xff0c;一次讲解单个或者多个模块的内容。 教程链接&#xff1a;https://docs.python.org/zh-cn/3/tutorial/index.html 质量控制…

RedHat / CentOS安装FTP服务

本章教程,记录在RedHat / CentOS中安装FTP的具体步骤。FTP默认端口:21 1、安装 epel 源 yum install -y epel-release2、安装 pure-ftpd yum -y install pure-ftpd3、修改默认配置 # 默认配置位于 /etc/pure-ftpd/pure-ftpd.conf,在配置文件中找到下面几个参数进行修改:#…

并发、多线程和HTTP连接之间有什么关系?

一、并发的概念 并发是系统同时处理多个任务或事件的能力。在计算中&#xff0c;这意味着系统能够在同一时间段内处理多个任务&#xff0c;而不是严格按照顺序一个接一个地执行它们。并发提高了系统的效率和资源利用率&#xff0c;从而更好地满足用户的需求。在现代应用程序中&…

C++ windows下使用openvino部署yoloV8

目录 准备版本&#xff1a; 准备事项: 选择配置界面&#xff1a; 下载界面&#xff1a; ​编辑 添加VS配置&#xff1a; 准备代码&#xff1a; yolov8.h yolov8.cpp detect.cpp 如何找到并放置DLL&#xff1a; 准备版本&#xff1a; opencv 4.6.0 openvino 2024.0…

深度解读:Etched Sohu与Groq LPU芯片的区别

本文简单讲解一下Etched Sohu与Groq LPU两种芯片的区别。 设计理念的差异 首先&#xff0c;这两款产品在设计理念上完全是两条不同的路线。Etched Sohu芯片的设计理念是围绕Transformer模型进行优化。Transformer模型近年来在NLP任务中表现出色&#xff0c;Etched公司因此为其…

SpringSecurity中文文档(Servlet Password Storage)

存储机制&#xff08;Storage Mechanisms&#xff09; 每种支持的读取用户名和密码的机制都可以使用任何支持的存储机制&#xff1a; Simple Storage with In-Memory AuthenticationRelational Databases with JDBC AuthenticationCustom data stores with UserDetailsServic…

4个免费文章生成器,为你免费一键生成原创文章

在当今的创作领域&#xff0c;创作者们常常陷入各种困境。灵感的缺失、内容创新的压力&#xff0c;每一项都如同沉重的枷锁&#xff0c;束缚着他们的创作步伐。但随着免费文章生成器的出现&#xff0c;宛如一场及时雨&#xff0c;为创作者们带来了新的希望和转机。免费文章生成…

【ABB】原点设定

【ABB】原点设定 操作流程演示 操作流程 操作轴回原点编辑电机校准偏移更新转速计数器 1.首先得了解机器手的轴&#xff0c;这里以6轴作参考。 注意先回456轴&#xff0c;后回123轴。 2.然后需要了解机器人关节运动模式&#xff0c;即选择如下两个模式。 3.注意机器人各轴移动…

19C 单机文件系统安装文档

准备工作 1)查看系统版本、内核参数 more /etc/redhat-release more /etc/redflag-releaseuname -a2)查看当前系统是否配置了HugePages。在下面的查询中&#xff0c;HugePages的几个相关值都为0&#xff0c;表明当前未配值HugePages&#xff0c;其次可以看到该版本的大页大小为…

Linux服务器性能参数指标

【摘要】一个基于 Linux 操作系统的服务器运行的同时&#xff0c;会表征出各种各样参数信息&#xff0c;这些蛛丝马迹往往会帮助快速定位跟踪问题。 这里只是一些简单的工具查看系统的相关参数&#xff0c;当然很多工具也是通过分析加工 /proc、/sys 下的数据来工作的&#xff…