当前位置:首页 > Java技术 > 分布式专题|因为不知道Rabbit如何实现延时队列,我最终还是没能进入大厂

分布式专题|因为不知道Rabbit如何实现延时队列,我最终还是没能进入大厂

2022年11月09日 23:24:32Java技术29


学过rabbitmq的同学应该都知道,rabbitmq是没有延时队列功能的,为什么面试官还会问这个奇葩的问题呢?
因为面试官问你这个问题,是在考你知识整合的逻辑能力.

我在这里可以肯定地说:rabbitmq是没有实现延时队列的功能,但是我们可以曲线救国,使用死信队列+TTL同样可以实现延时队列的功能。

还有一种实现方式是通过延迟队列插件实现,我后面也会介绍。

延时队列使用场景

用的最多的地方就是订单支付超时取消订单

在说如何实现之前,我们先来介绍下什么是死信队列和TTL:

关键点讲解

死信队列

在rabbitmq中,死信队列其实应该称为死信交换机,那么这个死信到底是什么意思呢?
死信队列和普通队列没有区别的,但是用户不会主动把消息发送到这个队列或交换机中的,只有当以下几种情况发生了,消息才会从原来的队列中转发到死信队列:

  • 原队列的消息长度超过预定的限制
  • 消费者拒收消息,basicNack/basicReject,并且没有把消息重新放回队列中
  • 原队列消息设置了过期时间,如果在过期之前,还没有被消费者消费,那么也会被转到死信队列中;

死信队列相关的设置参数是绑定在队列中设置的:x-dead-letter-exchange
x-dead-letter-routing-key
分布式专题|因为不知道Rabbit如何实现延时队列,我最终还是没能进入大厂 _ JavaClub全栈架构师技术笔记

TTL

TTL全称是Time To Live,翻译为过期时间,当消息已经存活到ttl设置的时间后还没有被消费,则会清除该消息,rabbit可以对队列设置过期消息,也可以对具体的消息设置过期消息,这里提一个小小的面试题:

问:rabbit是如何处理设置了过期时间的消息的?

答:rabbit实现的是一个懒策略去清理过期时间,目的是为了保证消息队列的高吞吐量;这个懒策略是通过在消息到达了队列的顶部之后,broker会检查队列是否设置了过期时间,如果设置了则检查过期时间是否已经到了,如果到了则剔除消息,不推送此消息,切记不要回答,broker会遍历每个消息,检查过期时间,切记切记!!!

前面已经介绍了两个重要的技术点,现在该进入本文的主题了,rabbitmq到底是如何实现延时队列的呢?

使用TTL+DLX

实现思路

想必大家在经过我上面对TTL和死信队列的讲解后,大家有可能心里已经知道该如何实现了,不过就算你知道如何实现了,我还是要讲的,哈哈

因为TTL是可以对消息设置过期时间,而进入死信队列的条件中有这么一条:原队列消息设置了过期时间,如果在过期之前,还没有被消费者消费,那么也会被转到死信队列中,那么我们可以结合这两者这么去做,处理正常业务的监听器去监听这个死信队列,然后给正常队列设置下这个死信队列的参数,那么消息流转会变成这样:

  • 我发送了一个设置过期时间为10000毫秒的消息到broker中
  • broker把消息放到了队列中
  • 过了10000毫秒后,消息还没有被消费掉
  • broker就会把消息转发到死信交换机中,再由死信交换机把消息推送到死信队列中
  • 我刚开始已经设置了一个监听器去监听了死信队列,那么我收到这个消息的时候肯定是在10000毫秒以后了;

代码编写

  • 生产者队列与交换机绑定和队列声明
@Configuration
public class RabbitMQConfig {
     
    public static final String QUEUE_TEST_DLX = "queue_test_dlx";
    public static final String QUEUE_TEST_NORMAL = "queue_test_normal";
    public static final String EXCHANGE_TEST_DLX = "exchange_test_dlx";
//    声明一个默认不进行消费的队列 绑定死信队列交换机和死信队列的key
    @Bean("queueTestNormal")
    public Queue queueTestNormal() {
     
        return QueueBuilder.durable(QUEUE_TEST_NORMAL).deadLetterExchange(EXCHANGE_TEST_DLX).deadLetterRoutingKey("testdlx").build();
    }
    //    声明死信队列
    @Bean("queueTestDLX")
    public Queue queueTestDLX() {
     
        return QueueBuilder.durable(QUEUE_TEST_DLX).build();
    }
    //  声明死信交换机
    @Bean("exchangeTestDLX")
    public Exchange exchangeTestDLX() {
     
        return ExchangeBuilder.directExchange(EXCHANGE_TEST_DLX).durable(true).build();
    }
    //    死信队列与死信交换机绑定
    @Bean
    public Binding itemQueueExchange7(@Qualifier("queueTestDLX") Queue queue,
                                      @Qualifier("exchangeTestDLX") Exchange exchange) {
     
        return BindingBuilder.bind(queue).to(exchange).with("testdlx").noargs();
    }
    }
  • 生产者使用简单发送消息给普通队列,并设置过期时间为10s
    @Test
    public void testDLX() {
     
        rabbitTemplate.convertAndSend(null, "queue_test_normal","我是10秒之后才到的", new MessagePostProcessor() {
     
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
     
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setExpiration(10000+"");
                return message;
            }
        });
        System.out.println("我发送消息的时间为:"+(System.currentTimeMillis()));
        System.out.println("开始倒计时:10");
        int i = 10;
        while (true){
     
            try {
     
                Thread.sleep(1000);
            } catch (InterruptedException e) {
     
                e.printStackTrace();
            }
            if(i>0){
     
                System.out.println("倒计时:"+(--i));
            }

        }
    }
  • 消费者监听器编写
    @RabbitListener(queues = "queue_test_dlx")
    public void onMessage5(Message message, Channel channel) throws Exception {
     
        System.out.println("我收到消息的时间为:"+(System.currentTimeMillis()));
        System.out.println("收到消息来自队列queue_test_dlx:" + new String(message.getBody()));
    }

总结

到目前为止。延时队列已经实现完成,我们现在来总结下这种方式实现延时队列的唯一缺点:

不及时:因为只有消息到达了队列顶部,broker才会去检查消息是否过期,进行推送,加入设置了过期时间的消息前面有一个设置了更长时间过期时间的消息,这样会导致过期时间小的消息一直没有被处理掉,一直在队列中等待;

因为这个原因,rabbitmq引入了一个延时队列插件,这个插件的实现思路和前面的实现方式不同,当给一个消息设置了延迟时间后,它并不会立即把消息推送到队列,而是等消息过了设置的延时时间后才放到队列中,我们现在介绍下延时队列插件是如何实现的:

使用延时队列插件

安装延时队列插件

#下载插件 https://www.cnblogs.com/geekdc/p/13549613.html

docker cp /Users/yangle/docker/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/plugins

#进入容器

docker exec -it rabbitmq /bin/bash

#启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

#查看

rabbitmq-plugins list

#重新启动容器

docker restart rabbitmq

代码编写

  • 交换机与队列绑定配置文件
@Configuration
public class RabbitMQConfig {
     
    public static final String QUEUE_TEST_DELAY_PLUGIN = "queue_test_delay_plugin";
    public static final String EXCHANGE_TEST_DELAY_PLUGIN = "exchange_test_delay_plugin";
     //    声明一个队列
    @Bean("queueDelayPlugin")
    public Queue queueDelayPlugin() {
     
        return QueueBuilder.durable(QUEUE_TEST_DELAY_PLUGIN).build();
    }
    @Bean
    CustomExchange delayExchange(){
     
        Map<String, Object> args = new HashMap<>();
        // 设置为路由模式
        args.put("x-delayed-type", "direct");
        // type必须设置为x-delayed-message
        return new CustomExchange(EXCHANGE_TEST_DELAY_PLUGIN, "x-delayed-message", true,false, args);
    }

    //    插件交换机与队列绑定
    @Bean
    public Binding itemQueueExchange8(@Qualifier("queueDelayPlugin") Queue queue,
                                      @Qualifier("delayExchange") Exchange exchange) {
     
        return BindingBuilder.bind(queue).to(exchange).with("testDelayPlugin").noargs();
    }}
  • 发送消息
    @Test
    public void testDelayPlugin() {
     
        rabbitTemplate.convertAndSend("exchange_test_delay_plugin", "testDelayPlugin", "测试延时插件发送消息", new MessagePostProcessor() {
     
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
     
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        });
    }
  • 监听器
    @RabbitListener(queues = "queue_test_delay_plugin")
    public void onMessage6(Message message, Channel channel) throws Exception {
     
        System.out.println("我收到消息的时间为:"+(System.currentTimeMillis()));
        System.out.println("收到消息来自队列queue_test_delay_plugin:" + new String(message.getBody()));
    }

总结

虽然说插件实现延迟队列的方式简单,但是它也有他的局限性:

  • 会降低性能,所以如果没有该需求,则不要使用。
  • 该插件不适合大数据量的延时消息,比如百万或一亿。
  • 延时时长:0<=n<=(2^32)-1 ,单位毫秒。

微信搜一搜【乐哉开讲】关注帅气的我,回复【干货领取】,将会有大量面试资料和架构师必看书籍等你挑选,包括java基础、java并发、微服务、中间件等更多资料等你来取哦。

作者:乐哥聊编程
来源链接:https://lglbc.blog.csdn.net/article/details/109701943

版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。

2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。


本文链接:https://www.javaclub.cn/java/69068.html

分享给朋友:

“分布式专题|因为不知道Rabbit如何实现延时队列,我最终还是没能进入大厂” 的相关文章

yum安装步骤(网络下载安装)

yum安装步骤(网络下载安装)

yum是Linux系统的安装必备神器,简直不要太方便。但是新系统一般是不自带yum工具的,所以需要手动安装一下。环境:centos7新建一个目录用来保存yum安装包 mkdir install  进入文件夹并输入命令wget http://yum.bas...

Dubbo原理浅析

Dubbo原理浅析

一、Dubbo是什么 百度百科(没有什么比度娘知道的更多):Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接...

SpringBoot整合Dubbo与zookeeper纯注解版

SpringBoot整合Dubbo与zookeeper纯注解版

一、Dubbo和zk的作用 上回讲到,Dubbo作为一款优秀的RPC框架,封装了dubbo-provider(提供者)和dubbo-consumer(消费者),而provider和consumer之间需要通过注册中心来作为可发现的服务目录。而zookeeper(此处简称zk)提供了服务接口注...

linux系统(CentOS7)下安装tomcat8及加载缓慢处理

linux系统(CentOS7)下安装tomcat8及加载缓慢处理

一、下载tomcat 地址奉上 二、linux下安装 1.使用SecureCRT终端连接服务器 2.使用sftp Session上传tomcat压缩包到服务器上 3.使用tar -zxvf 命令解压tomcat 4.进入bin目录启动tomcat(./sta...

SpringBoot整合hibernate纯注解版

SpringBoot整合hibernate纯注解版

一、hibernate是什么 hibernate是一款优秀的ORM(Object Relational Mapping ,对象关系映射)框架,是一种面向对象编程的框架,它对JDBC进行了封装,是一个全自动的ORM框架,可以自动生成SQL语句,也可以自定义HQL进行执行脚本。 优点:hib...

常用设计模式系列(五)—原型模式

常用设计模式系列(五)—原型模式

一、前言 各位朋友大家好,经过几天的努力,我已经讲解了常用设计模式其中的四种,分别是简单工厂、工厂方法、抽象工厂、建造者模式,正在想着下一遍写哪个模式的时候,有个朋友先说他想先看看原型模式,说想看看原型模式是如何运行的,然后我给这个朋友讲了个故事:“从前有个老铁得了个病,听说还挺严重的,后来...

java基础知识讲解(一)数据类型和运算符

java基础知识讲解(一)数据类型和运算符

Java是一种强类型语言,每个变量都必须声明其数据类型。 Java的数据类型可分为两大类:基本数据类型(primitive data type)和引用数据类型(reference data type)。 Java中定义了3类8种基本数据类型 数值型- b...

SpringBoot整合MybatisPlus基本的增删改查,保姆级教程

SpringBoot整合MybatisPlus基本的增删改查,保姆级教程

概述MybatisPlus是国产的第三方插件, 它封装了许多常用的CURDapi,免去了我们写mapper.xml的重复劳动,这里介绍了基本的整合SpringBoot和基础用法。引入依赖在项目中pom文件引入mybatisplus和mysql驱动依赖,如下图   &nb...

一文搞懂“网络协议”

一文搞懂“网络协议”

目录 一.各类协议介绍 1.计算机协议介绍 2.TCP/IP 协议群 3.TCP 协议传输特点 4.服务端口 5.数据包与处理流程 二.HTTP 协议 1.HTTP 协议介绍 2.HTTP 协议的请求与响应 5.1.7...

Spring Cloud面试问题

Spring Cloud面试问题

问:什么是Spring Cloud?     答: Spring Cloud Stream App Starters是基于Spring Boot的Spring Integration应用程序,提供与外部系统的集成。Spring Cloud Task。...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。