LOGO OA教程 ERP教程 模切知識(shí)交流 PMS教程 CRM教程 開(kāi)發(fā)文檔 其他文檔  
 
網(wǎng)站管理員

RabbitMQ基礎(chǔ)入門

freeflydom
2025年8月4日 10:42 本文熱度 576

RabbitMQ介紹

RabbitMQ是基于Erlang語(yǔ)言開(kāi)發(fā)的開(kāi)源消息通信中間件,官網(wǎng)地址:
Messaging that just works — RabbitMQ
接下來(lái),我們就學(xué)習(xí)它的基本概念和基礎(chǔ)用法。

安裝

在安裝命令中有兩個(gè)映射的端口:

  • 15672:RabbitMQ提供的管理控制臺(tái)的端口
  • 5672:RabbitMQ的消息發(fā)送處理接口

安裝完成后,訪問(wèn) http://127.0.0.1:15672即可看到管理控制臺(tái)。首次訪問(wèn)需要登錄,默認(rèn)的用戶名和密碼在配置文件中已經(jīng)指定了。
登錄后即可看到管理控制臺(tái)總覽頁(yè)面:

RabbitMQ對(duì)應(yīng)的架構(gòu)如圖:

其中包含幾個(gè)概念:

  • publisher:生產(chǎn)者,也就是發(fā)送消息的一方
  • consumer:消費(fèi)者,也就是消費(fèi)消息的一方
  • queue:隊(duì)列,存儲(chǔ)消息。生產(chǎn)者投遞的消息會(huì)暫存在消息隊(duì)列中,等待消費(fèi)者處理
  • exchange:交換機(jī),負(fù)責(zé)消息路由。生產(chǎn)者發(fā)送的消息由交換機(jī)決定投遞到哪個(gè)隊(duì)列。
  • virtual host:虛擬主機(jī),起到數(shù)據(jù)隔離的作用。每個(gè)虛擬主機(jī)相互獨(dú)立,有各自的exchange、queue

上述這些東西都可以在RabbitMQ的管理控制臺(tái)來(lái)管理,下一節(jié)我們就一起來(lái)學(xué)習(xí)控制臺(tái)的使用。

收發(fā)消息

交換機(jī)

打開(kāi)Exchanges選項(xiàng)卡,可以看到已經(jīng)存在很多交換機(jī):

點(diǎn)擊任意交換機(jī),即可進(jìn)入交換機(jī)詳情頁(yè)面。仍然會(huì)利用控制臺(tái)中的publish message 發(fā)送一條消息:


這里是由控制臺(tái)模擬了生產(chǎn)者發(fā)送的消息。由于沒(méi)有消費(fèi)者存在,最終消息丟失了,這樣說(shuō)明交換機(jī)沒(méi)有存儲(chǔ)消息的能力。

隊(duì)列

打開(kāi)Queues選項(xiàng)卡,新建一個(gè)隊(duì)列:

命名為hello.queue1

再以相同的方式,創(chuàng)建一個(gè)隊(duì)列,密碼為hello.queue2,最終隊(duì)列列表如下:

此時(shí),再次向amq.fanout交換機(jī)發(fā)送一條消息。會(huì)發(fā)現(xiàn)消息依然沒(méi)有到達(dá)隊(duì)列!!
怎么回事呢?
發(fā)送到交換機(jī)的消息,只會(huì)路由到與其綁定的隊(duì)列,因此僅僅創(chuàng)建隊(duì)列是不夠的,還需要將其與交換機(jī)綁定。

綁定關(guān)系

點(diǎn)擊Exchanges選項(xiàng)卡,點(diǎn)擊amq.fanout交換機(jī),進(jìn)入交換機(jī)詳情頁(yè),然后點(diǎn)擊Bindings菜單,在表單中填寫要綁定的隊(duì)列名稱:

相同的方式,將hello.queue2也綁定到改交換機(jī)。
最終,綁定結(jié)果如下:

發(fā)送消息

再次回到exchange頁(yè)面,找到剛剛綁定的amq.fanout,點(diǎn)擊進(jìn)入詳情頁(yè),再次發(fā)送一條消息:

回到Queues頁(yè)面,可以發(fā)現(xiàn)hello.queue中已經(jīng)有一條消息了:

點(diǎn)擊隊(duì)列名稱,進(jìn)入詳情頁(yè),查看隊(duì)列詳情,這次我們點(diǎn)擊get message:

可以看到消息到達(dá)隊(duì)列了:

這個(gè)時(shí)候如果有消費(fèi)者監(jiān)聽(tīng)了MQ的hello.queue1hello.queue2隊(duì)列,自然就能接收到消息了。

數(shù)據(jù)隔離

用戶管理

點(diǎn)擊Admin選項(xiàng)卡,首先會(huì)看到RabbitMQ控制臺(tái)的用戶管理界面:

這里的用戶都是RabbitMQ的管理或運(yùn)維人員。目前只有安裝RabbitMQ時(shí)添加的itheima這個(gè)用戶。仔細(xì)觀察用戶表格中的字段,如下:

  • Nameitheima,也就是用戶名
  • Tagsadministrator,說(shuō)明itheima用戶是超級(jí)管理員,擁有所有權(quán)限
  • Can access virtual host: /,可以訪問(wèn)的virtual host,這里的/是默認(rèn)的virtual host

對(duì)于小型企業(yè)而言,出于成本考慮,我們通常只會(huì)搭建一套MQ集群,公司內(nèi)的多個(gè)不同項(xiàng)目同時(shí)使用。這個(gè)時(shí)候?yàn)榱吮苊饣ハ喔蓴_, 我們會(huì)利用virtual host的隔離特性,將不同項(xiàng)目隔離。一般會(huì)做兩件事情:

  • 給每個(gè)項(xiàng)目創(chuàng)建獨(dú)立的運(yùn)維賬號(hào),將管理權(quán)限分離。
  • 給每個(gè)項(xiàng)目創(chuàng)建不同的virtual host,將每個(gè)項(xiàng)目的數(shù)據(jù)隔離。

virtual host

先退出登錄:

切換到剛剛創(chuàng)建的 用戶登錄,然后點(diǎn)擊Virtual Hosts菜單,進(jìn)入virtual host管理頁(yè):

可以看到目前只有一個(gè)默認(rèn)的virtual host,名字為 /。
我們可以給項(xiàng)目創(chuàng)建一個(gè)單獨(dú)的virtual host,而不是使用默認(rèn)的/。

創(chuàng)建完成后如圖:

由于是登錄hmall賬戶后創(chuàng)建的virtual host,因此回到users菜單,你會(huì)發(fā)現(xiàn)當(dāng)前用戶已經(jīng)具備了對(duì)/hmall這個(gè)virtual host的訪問(wèn)權(quán)限了:

此時(shí),點(diǎn)擊頁(yè)面右上角的virtual host下拉菜單,切換virtual host為 /hmall

然后再次查看queues選項(xiàng)卡,會(huì)發(fā)現(xiàn)之前的隊(duì)列已經(jīng)看不到了:

這就是基于virtual host的隔離效果。

SpringAMQP

將來(lái)我們開(kāi)發(fā)業(yè)務(wù)功能的時(shí)候,肯定不會(huì)在控制臺(tái)收發(fā)消息,而是應(yīng)該基于編程的方式。由于RabbitMQ采用了AMQP協(xié)議,因此它具備跨語(yǔ)言的特性。任何語(yǔ)言只要遵循AMQP協(xié)議收發(fā)消息,都可以與RabbitMQ交互。并且RabbitMQ官方也提供了各種不同語(yǔ)言的客戶端。
但是,RabbitMQ官方提供的Java客戶端編碼相對(duì)復(fù)雜,一般生產(chǎn)環(huán)境下我們更多會(huì)結(jié)合Spring來(lái)使用。而Spring的官方剛好基于RabbitMQ提供了這樣一套消息收發(fā)的模板工具:SpringAMQP。并且還基于SpringBoot對(duì)其實(shí)現(xiàn)了自動(dòng)裝配,使用起來(lái)非常方便。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三個(gè)功能:

  • 自動(dòng)聲明隊(duì)列、交換機(jī)及其綁定關(guān)系
  • 基于注解的監(jiān)聽(tīng)器模式,異步接收消息
  • 封裝了RabbitTemplate工具,用于發(fā)送消息

這一章我們就一起學(xué)習(xí)一下,如何利用SpringAMQP實(shí)現(xiàn)對(duì)RabbitMQ的消息收發(fā)。

配置依賴

配置SpringAMQP相關(guān)的依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依賴,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--單元測(cè)試-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

快速入門

在之前的案例中,我們都是經(jīng)過(guò)交換機(jī)發(fā)送消息到隊(duì)列,不過(guò)有時(shí)候?yàn)榱藴y(cè)試方便,我們也可以直接向隊(duì)列發(fā)送消息,跳過(guò)交換機(jī)。如圖:

也就是:

  • publisher直接發(fā)送消息到隊(duì)列
  • 消費(fèi)者監(jiān)聽(tīng)并處理隊(duì)列中的消息

注意:這種模式一般測(cè)試使用,很少在生產(chǎn)中使用。

為了方便測(cè)試,我們現(xiàn)在控制臺(tái)新建一個(gè)隊(duì)列:simple.queue

添加成功:

接下來(lái),我們就可以利用Java代碼收發(fā)消息了。

消息發(fā)送

首先配置MQ地址,在publisher服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 127.0.0.1 # 你的虛擬機(jī)IP
    port: 5672 # 端口
    virtual-host: /hmall # 虛擬主機(jī)
    username: hmall # 用戶名
    password: 123 # 密碼

然后在publisher服務(wù)中編寫測(cè)試類SpringAmqpTest,并利用RabbitTemplate實(shí)現(xiàn)消息發(fā)送:

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        // 隊(duì)列名稱
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 發(fā)送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

打開(kāi)控制臺(tái),可以看到消息已經(jīng)發(fā)送到隊(duì)列中:

接下來(lái),我們?cè)賮?lái)實(shí)現(xiàn)消息接收。

消息接收

首先配置MQ地址,在consumer服務(wù)的application.yml中添加配置:

spring:
  rabbitmq:
    host: 127.0.0.1 # 你的虛擬機(jī)IP
    port: 5672 # 端口
    virtual-host: /hmall # 虛擬主機(jī)
    username: hmall # 用戶名
    password: 123 # 密碼

然后在consumer服務(wù)的com.itheima.consumer.listener包中新建一個(gè)類SpringRabbitListener,代碼如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
	// 利用RabbitListener來(lái)聲明要監(jiān)聽(tīng)的隊(duì)列信息
    // 將來(lái)一旦監(jiān)聽(tīng)的隊(duì)列中有了消息,就會(huì)推送給當(dāng)前服務(wù),調(diào)用當(dāng)前方法,處理消息。
    // 可以看到方法體中接收的就是消息體的內(nèi)容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消費(fèi)者接收到消息:【" + msg + "】");
    }
}

測(cè)試

啟動(dòng)consumer服務(wù),然后在publisher服務(wù)中運(yùn)行測(cè)試代碼,發(fā)送MQ消息。最終consumer收到消息:

WorkQueues模型

Work queues,任務(wù)模型。簡(jiǎn)單來(lái)說(shuō)就是讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。

當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來(lái)越多,無(wú)法及時(shí)處理。
此時(shí)就可以使用work 模型,多個(gè)消費(fèi)者共同處理消息處理,消息處理的速度就能大大提高了。

接下來(lái),我們就來(lái)模擬這樣的場(chǎng)景。
首先,我們?cè)诳刂婆_(tái)創(chuàng)建一個(gè)新的隊(duì)列,命名為work.queue

消息發(fā)送

這次我們循環(huán)發(fā)送,模擬大量消息堆積現(xiàn)象。
在publisher服務(wù)中的SpringAmqpTest類中添加一個(gè)測(cè)試方法:

/**
     * workQueue
     * 向隊(duì)列中不停發(fā)送消息,模擬消息堆積。
     */
@Test
public void testWorkQueue() throws InterruptedException {
    // 隊(duì)列名稱
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 發(fā)送消息,每20毫秒發(fā)送一次,相當(dāng)于每秒發(fā)送50條消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

消息接收

要模擬多個(gè)消費(fèi)者綁定同一個(gè)隊(duì)列,我們?cè)赾onsumer服務(wù)的SpringRabbitListener中添加2個(gè)新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消費(fèi)者1接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消費(fèi)者2........接收到消息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

注意到這兩消費(fèi)者,都設(shè)置了Thead.sleep,模擬任務(wù)耗時(shí):

  • 消費(fèi)者1 sleep了20毫秒,相當(dāng)于每秒鐘處理50個(gè)消息
  • 消費(fèi)者2 sleep了200毫秒,相當(dāng)于每秒處理5個(gè)消息

測(cè)試

啟動(dòng)ConsumerApplication后,在執(zhí)行publisher服務(wù)中剛剛編寫的發(fā)送測(cè)試方法testWorkQueue。
最終結(jié)果如下:

消費(fèi)者1接收到消息:【hello, message_0】21:06:00.869555300
消費(fèi)者2........接收到消息:【hello, message_1】21:06:00.884518
消費(fèi)者1接收到消息:【hello, message_2】21:06:00.907454400
消費(fèi)者1接收到消息:【hello, message_4】21:06:00.953332100
消費(fèi)者1接收到消息:【hello, message_6】21:06:00.997867300
消費(fèi)者1接收到消息:【hello, message_8】21:06:01.042178700
消費(fèi)者2........接收到消息:【hello, message_3】21:06:01.086478800
消費(fèi)者1接收到消息:【hello, message_10】21:06:01.087476600
消費(fèi)者1接收到消息:【hello, message_12】21:06:01.132578300
消費(fèi)者1接收到消息:【hello, message_14】21:06:01.175851200
消費(fèi)者1接收到消息:【hello, message_16】21:06:01.218533400
消費(fèi)者1接收到消息:【hello, message_18】21:06:01.261322900
消費(fèi)者2........接收到消息:【hello, message_5】21:06:01.287003700
消費(fèi)者1接收到消息:【hello, message_20】21:06:01.304412400
消費(fèi)者1接收到消息:【hello, message_22】21:06:01.349950100
消費(fèi)者1接收到消息:【hello, message_24】21:06:01.394533900
消費(fèi)者1接收到消息:【hello, message_26】21:06:01.439876500
消費(fèi)者1接收到消息:【hello, message_28】21:06:01.482937800
消費(fèi)者2........接收到消息:【hello, message_7】21:06:01.488977100
消費(fèi)者1接收到消息:【hello, message_30】21:06:01.526409300
消費(fèi)者1接收到消息:【hello, message_32】21:06:01.572148
消費(fèi)者1接收到消息:【hello, message_34】21:06:01.618264800
消費(fèi)者1接收到消息:【hello, message_36】21:06:01.660780600
消費(fèi)者2........接收到消息:【hello, message_9】21:06:01.689189300
消費(fèi)者1接收到消息:【hello, message_38】21:06:01.705261
消費(fèi)者1接收到消息:【hello, message_40】21:06:01.746927300
消費(fèi)者1接收到消息:【hello, message_42】21:06:01.789835
消費(fèi)者1接收到消息:【hello, message_44】21:06:01.834393100
消費(fèi)者1接收到消息:【hello, message_46】21:06:01.875312100
消費(fèi)者2........接收到消息:【hello, message_11】21:06:01.889969500
消費(fèi)者1接收到消息:【hello, message_48】21:06:01.920702500
消費(fèi)者2........接收到消息:【hello, message_13】21:06:02.090725900
消費(fèi)者2........接收到消息:【hello, message_15】21:06:02.293060600
消費(fèi)者2........接收到消息:【hello, message_17】21:06:02.493748
消費(fèi)者2........接收到消息:【hello, message_19】21:06:02.696635100
消費(fèi)者2........接收到消息:【hello, message_21】21:06:02.896809700
消費(fèi)者2........接收到消息:【hello, message_23】21:06:03.099533400
消費(fèi)者2........接收到消息:【hello, message_25】21:06:03.301446400
消費(fèi)者2........接收到消息:【hello, message_27】21:06:03.504999100
消費(fèi)者2........接收到消息:【hello, message_29】21:06:03.705702500
消費(fèi)者2........接收到消息:【hello, message_31】21:06:03.906601200
消費(fèi)者2........接收到消息:【hello, message_33】21:06:04.108118500
消費(fèi)者2........接收到消息:【hello, message_35】21:06:04.308945400
消費(fèi)者2........接收到消息:【hello, message_37】21:06:04.511547700
消費(fèi)者2........接收到消息:【hello, message_39】21:06:04.714038400
消費(fèi)者2........接收到消息:【hello, message_41】21:06:04.916192700
消費(fèi)者2........接收到消息:【hello, message_43】21:06:05.116286400
消費(fèi)者2........接收到消息:【hello, message_45】21:06:05.318055100
消費(fèi)者2........接收到消息:【hello, message_47】21:06:05.520656400
消費(fèi)者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到消費(fèi)者1和消費(fèi)者2竟然每人消費(fèi)了25條消息:

  • 消費(fèi)者1很快完成了自己的25條消息
  • 消費(fèi)者2卻在緩慢的處理自己的25條消息。

也就是說(shuō)消息是平均分配給每個(gè)消費(fèi)者,并沒(méi)有考慮到消費(fèi)者的處理能力。導(dǎo)致1個(gè)消費(fèi)者空閑,另一個(gè)消費(fèi)者忙的不可開(kāi)交。沒(méi)有充分利用每一個(gè)消費(fèi)者的能力,最終消息處理的耗時(shí)遠(yuǎn)遠(yuǎn)超過(guò)了1秒。這樣顯然是有問(wèn)題的。

能者多勞

在spring中有一個(gè)簡(jiǎn)單的配置,可以解決這個(gè)問(wèn)題。我們修改consumer服務(wù)的application.yml文件,添加配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個(gè)消息

再次測(cè)試,發(fā)現(xiàn)結(jié)果如下:

消費(fèi)者1接收到消息:【hello, message_0】21:12:51.659664200
消費(fèi)者2........接收到消息:【hello, message_1】21:12:51.680610
消費(fèi)者1接收到消息:【hello, message_2】21:12:51.703625
消費(fèi)者1接收到消息:【hello, message_3】21:12:51.724330100
消費(fèi)者1接收到消息:【hello, message_4】21:12:51.746651100
消費(fèi)者1接收到消息:【hello, message_5】21:12:51.768401400
消費(fèi)者1接收到消息:【hello, message_6】21:12:51.790511400
消費(fèi)者1接收到消息:【hello, message_7】21:12:51.812559800
消費(fèi)者1接收到消息:【hello, message_8】21:12:51.834500600
消費(fèi)者1接收到消息:【hello, message_9】21:12:51.857438800
消費(fèi)者1接收到消息:【hello, message_10】21:12:51.880379600
消費(fèi)者2........接收到消息:【hello, message_11】21:12:51.899327100
消費(fèi)者1接收到消息:【hello, message_12】21:12:51.922828400
消費(fèi)者1接收到消息:【hello, message_13】21:12:51.945617400
消費(fèi)者1接收到消息:【hello, message_14】21:12:51.968942500
消費(fèi)者1接收到消息:【hello, message_15】21:12:51.992215400
消費(fèi)者1接收到消息:【hello, message_16】21:12:52.013325600
消費(fèi)者1接收到消息:【hello, message_17】21:12:52.035687100
消費(fèi)者1接收到消息:【hello, message_18】21:12:52.058188
消費(fèi)者1接收到消息:【hello, message_19】21:12:52.081208400
消費(fèi)者2........接收到消息:【hello, message_20】21:12:52.103406200
消費(fèi)者1接收到消息:【hello, message_21】21:12:52.123827300
消費(fèi)者1接收到消息:【hello, message_22】21:12:52.146165100
消費(fèi)者1接收到消息:【hello, message_23】21:12:52.168828300
消費(fèi)者1接收到消息:【hello, message_24】21:12:52.191769500
消費(fèi)者1接收到消息:【hello, message_25】21:12:52.214839100
消費(fèi)者1接收到消息:【hello, message_26】21:12:52.238998700
消費(fèi)者1接收到消息:【hello, message_27】21:12:52.259772600
消費(fèi)者1接收到消息:【hello, message_28】21:12:52.284131800
消費(fèi)者2........接收到消息:【hello, message_29】21:12:52.306190600
消費(fèi)者1接收到消息:【hello, message_30】21:12:52.325315800
消費(fèi)者1接收到消息:【hello, message_31】21:12:52.347012500
消費(fèi)者1接收到消息:【hello, message_32】21:12:52.368508600
消費(fèi)者1接收到消息:【hello, message_33】21:12:52.391785100
消費(fèi)者1接收到消息:【hello, message_34】21:12:52.416383800
消費(fèi)者1接收到消息:【hello, message_35】21:12:52.439019
消費(fèi)者1接收到消息:【hello, message_36】21:12:52.461733900
消費(fèi)者1接收到消息:【hello, message_37】21:12:52.485990
消費(fèi)者1接收到消息:【hello, message_38】21:12:52.509219900
消費(fèi)者2........接收到消息:【hello, message_39】21:12:52.523683400
消費(fèi)者1接收到消息:【hello, message_40】21:12:52.547412100
消費(fèi)者1接收到消息:【hello, message_41】21:12:52.571191800
消費(fèi)者1接收到消息:【hello, message_42】21:12:52.593024600
消費(fèi)者1接收到消息:【hello, message_43】21:12:52.616731800
消費(fèi)者1接收到消息:【hello, message_44】21:12:52.640317
消費(fèi)者1接收到消息:【hello, message_45】21:12:52.663111100
消費(fèi)者1接收到消息:【hello, message_46】21:12:52.686727
消費(fèi)者1接收到消息:【hello, message_47】21:12:52.709266500
消費(fèi)者2........接收到消息:【hello, message_48】21:12:52.725884900
消費(fèi)者1接收到消息:【hello, message_49】21:12:52.746299900

可以發(fā)現(xiàn),由于消費(fèi)者1處理速度較快,所以處理了更多的消息;消費(fèi)者2處理速度較慢,只處理了6條消息。而最終總的執(zhí)行耗時(shí)也在1秒左右,大大提升。
正所謂能者多勞,這樣充分利用了每一個(gè)消費(fèi)者的處理能力,可以有效避免消息積壓?jiǎn)栴}。

總結(jié)

Work模型的使用:

  • 多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會(huì)被一個(gè)消費(fèi)者處理
  • 通過(guò)設(shè)置prefetch來(lái)控制消費(fèi)者預(yù)取的消息數(shù)量

交換機(jī)類型

在之前的兩個(gè)測(cè)試案例中,都沒(méi)有交換機(jī),生產(chǎn)者直接發(fā)送消息到隊(duì)列。而一旦引入交換機(jī),消息發(fā)送的模式會(huì)有很大變化:

可以看到,在訂閱模型中,多了一個(gè)exchange角色,而且過(guò)程略有變化:

  • Publisher:生產(chǎn)者,不再發(fā)送消息到隊(duì)列中,而是發(fā)給交換機(jī)
  • Exchange:交換機(jī),一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
  • Queue:消息隊(duì)列也與以前一樣,接收消息、緩存消息。不過(guò)隊(duì)列一定要與交換機(jī)綁定。
  • Consumer:消費(fèi)者,與以前一樣,訂閱隊(duì)列,沒(méi)有變化

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

交換機(jī)的類型有四種:

  • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列。我們最早在控制臺(tái)使用的正是Fanout交換機(jī)
  • Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊(duì)列
  • Topic:通配符訂閱,與Direct類似,只不過(guò)RoutingKey可以使用通配符
  • Headers:頭匹配,基于MQ的消息頭匹配,用的較少。

這里主要講解前面的三種交換機(jī)模式。

Fanout交換機(jī)

Fanout,英文翻譯是扇出,我覺(jué)得在MQ中叫廣播更合適。

在廣播模式下,消息發(fā)送流程是這樣的:

  • 1)  可以有多個(gè)隊(duì)列
  • 2)  每個(gè)隊(duì)列都要綁定到Exchange(交換機(jī))
  • 3)  生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機(jī)
  • 4)  交換機(jī)把消息發(fā)送給綁定過(guò)的所有隊(duì)列
  • 5)  訂閱隊(duì)列的消費(fèi)者都能拿到消息

我們的計(jì)劃是這樣的:

  • 創(chuàng)建一個(gè)名為hmall.fanout的交換機(jī),類型是Fanout
  • 創(chuàng)建兩個(gè)隊(duì)列fanout.queue1fanout.queue2,綁定到交換機(jī)hmall.fanout
聲明隊(duì)列和交換機(jī)

在控制臺(tái)創(chuàng)建隊(duì)列fanout.queue1:

在創(chuàng)建一個(gè)隊(duì)列fanout.queue2

然后再創(chuàng)建一個(gè)交換機(jī):

然后綁定兩個(gè)隊(duì)列到交換機(jī):

消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測(cè)試方法:

@Test
public void testFanoutExchange() {
    // 交換機(jī)名稱
    String exchangeName = "hmall.fanout";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收

在consumer服務(wù)的SpringRabbitListener中添加兩個(gè)方法,作為消費(fèi)者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消費(fèi)者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消費(fèi)者2接收到Fanout消息:【" + msg + "】");
}
總結(jié)

交換機(jī)的作用是什么?

  • 接收publisher發(fā)送的消息
  • 將消息按照規(guī)則路由到與之綁定的隊(duì)列
  • 不能緩存消息,路由失敗,消息丟失
  • FanoutExchange的會(huì)將消息路由到每個(gè)綁定的隊(duì)列

Direct交換機(jī)

在Fanout模式中,一條消息,會(huì)被所有訂閱的隊(duì)列都消費(fèi)。但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息

案例需求如圖

  1. 聲明一個(gè)名為hmall.direct的交換機(jī)
  2. 聲明隊(duì)列direct.queue1,綁定hmall.direct,bindingKeybludred
  3. 聲明隊(duì)列direct.queue2,綁定hmall.direct,bindingKeyyellowred
  4. consumer服務(wù)中,編寫兩個(gè)消費(fèi)者方法,分別監(jiān)聽(tīng)direct.queue1和direct.queue2
  5. 在publisher中編寫測(cè)試方法,向hmall.direct發(fā)送消息
聲明隊(duì)列和交換機(jī)

首先在控制臺(tái)聲明兩個(gè)隊(duì)列direct.queue1direct.queue2,這里不再展示過(guò)程:

然后聲明一個(gè)direct類型的交換機(jī),命名為hmall.direct:

然后使用redblue作為key,綁定direct.queue1hmall.direct

同理,使用redyellow作為key,綁定direct.queue2hmall.direct,步驟略,最終結(jié)果:

消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
    System.out.println("消費(fèi)者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
    System.out.println("消費(fèi)者2接收到direct.queue2的消息:【" + msg + "】");
}
消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測(cè)試方法:

@Test
public void testSendDirectExchange() {
    // 交換機(jī)名稱
    String exchangeName = "hmall.direct";
    // 消息
    String message = "紅色警報(bào)!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

由于使用的red這個(gè)key,所以兩個(gè)消費(fèi)者都收到了消息:

我們?cè)偾袚Q為blue這個(gè)key:

@Test
public void testSendDirectExchange() {
    // 交換機(jī)名稱
    String exchangeName = "hmall.direct";
    // 消息
    String message = "最新報(bào)道,哥斯拉是居民自治巨型氣球,虛驚一場(chǎng)!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

你會(huì)發(fā)現(xiàn),只有消費(fèi)者1收到了消息:

總結(jié)

描述下Direct交換機(jī)與Fanout交換機(jī)的差異?

  • Fanout交換機(jī)將消息路由給每一個(gè)與之綁定的隊(duì)列
  • Direct交換機(jī)根據(jù)RoutingKey判斷路由給哪個(gè)隊(duì)列
  • 如果多個(gè)隊(duì)列具有相同的RoutingKey,則與Fanout功能類似

Topic交換機(jī)

說(shuō)明

Topic類型的ExchangeDirect相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。
只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定BindingKey 的時(shí)候使用通配符!

BindingKey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以.分割,例如: item.insert

通配符規(guī)則:

  • #:匹配一個(gè)或多個(gè)詞
  • *:匹配不多不少恰好1個(gè)詞

舉例:

  • item.#:能夠匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

圖示:

假如此時(shí)publisher發(fā)送的消息使用的RoutingKey共有四種:

  • china.news代表有中國(guó)的新聞消息;
  • china.weather 代表中國(guó)的天氣消息;
  • japan.news 則代表日本新聞
  • japan.weather 代表日本的天氣消息;

解釋:

  • topic.queue1:綁定的是china.# ,凡是以 china.開(kāi)頭的routing key 都會(huì)被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:綁定的是#.news ,凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配。包括:
    • china.news
    • japan.news

接下來(lái),我們就按照上圖所示,來(lái)演示一下Topic交換機(jī)的用法。
首先,在控制臺(tái)按照?qǐng)D示例子創(chuàng)建隊(duì)列、交換機(jī),并利用通配符綁定隊(duì)列和交換機(jī)。此處步驟略。最終結(jié)果如下:

消息發(fā)送

在publisher服務(wù)的SpringAmqpTest類中添加測(cè)試方法:

/**
 * topicExchange
 */
@Test
public void testSendTopicExchange() {
    // 交換機(jī)名稱
    String exchangeName = "hmall.topic";
    // 消息
    String message = "喜報(bào)!孫悟空大戰(zhàn)哥斯拉,勝!";
    // 發(fā)送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息接收

在consumer服務(wù)的SpringRabbitListener中添加方法:

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
    System.out.println("消費(fèi)者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
    System.out.println("消費(fèi)者2接收到topic.queue2的消息:【" + msg + "】");
}
總結(jié)

描述下Direct交換機(jī)與Topic交換機(jī)的差異?

  • Topic交換機(jī)接收的消息RoutingKey必須是多個(gè)單詞,以 **.** 分割
  • Topic交換機(jī)與隊(duì)列綁定時(shí)的bindingKey可以指定通配符
  • #:代表0個(gè)或多個(gè)詞
  • *:代表1個(gè)詞

聲明隊(duì)列和交換機(jī)

在之前我們都是基于RabbitMQ控制臺(tái)來(lái)創(chuàng)建隊(duì)列、交換機(jī)。但是在實(shí)際開(kāi)發(fā)時(shí),隊(duì)列和交換機(jī)是程序員定義的,將來(lái)項(xiàng)目上線,又要交給運(yùn)維去創(chuàng)建。那么程序員就需要把程序中運(yùn)行的所有隊(duì)列和交換機(jī)都寫下來(lái),交給運(yùn)維。在這個(gè)過(guò)程中是很容易出現(xiàn)錯(cuò)誤的。
因此推薦的做法是由程序啟動(dòng)時(shí)檢查隊(duì)列和交換機(jī)是否存在,如果不存在自動(dòng)創(chuàng)建。

基本API

SpringAMQP提供了一個(gè)Queue類,用來(lái)創(chuàng)建隊(duì)列:

SpringAMQP還提供了一個(gè)Exchange接口,來(lái)表示所有不同類型的交換機(jī):

我們可以自己創(chuàng)建隊(duì)列和交換機(jī),不過(guò)SpringAMQP還提供了ExchangeBuilder來(lái)簡(jiǎn)化這個(gè)過(guò)程:

而在綁定隊(duì)列和交換機(jī)時(shí),則需要使用BindingBuilder來(lái)創(chuàng)建Binding對(duì)象:

fanout示例

在consumer中創(chuàng)建一個(gè)類,聲明隊(duì)列和交換機(jī):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
    /**
     * 聲明交換機(jī)
     * @return Fanout類型交換機(jī)
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }
    /**
     * 第1個(gè)隊(duì)列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    /**
     * 第2個(gè)隊(duì)列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

direct示例

direct模式由于要綁定多個(gè)KEY,會(huì)非常麻煩,每一個(gè)Key都要編寫一個(gè)binding:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
    /**
     * 聲明交換機(jī)
     * @return Direct類型交換機(jī)
     */
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("hmall.direct").build();
    }
    /**
     * 第1個(gè)隊(duì)列
     */
    @Bean
    public Queue directQueue1(){
        return new Queue("direct.queue1");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
    }
    /**
     * 第2個(gè)隊(duì)列
     */
    @Bean
    public Queue directQueue2(){
        return new Queue("direct.queue2");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
    }
    /**
     * 綁定隊(duì)列和交換機(jī)
     */
    @Bean
    public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
        return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
    }
}

基于注解聲明

基于@Bean的方式聲明隊(duì)列和交換機(jī)比較麻煩,Spring還提供了基于注解方式來(lái)聲明。

例如,我們同樣聲明Direct模式的交換機(jī)和隊(duì)列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消費(fèi)者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消費(fèi)者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是簡(jiǎn)單多了。
再試試Topic模式:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
))
public void listenTopicQueue1(String msg){
    System.out.println("消費(fèi)者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
))
public void listenTopicQueue2(String msg){
    System.out.println("消費(fèi)者2接收到topic.queue2的消息:【" + msg + "】");
}

消息轉(zhuǎn)換器

Spring的消息發(fā)送代碼接收的消息體是一個(gè)Object:

而在數(shù)據(jù)傳輸時(shí),它會(huì)把你發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時(shí)候,還會(huì)把字節(jié)反序列化為Java對(duì)象。
只不過(guò),默認(rèn)情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問(wèn)題:

  • 數(shù)據(jù)體積過(guò)大
  • 有安全漏洞
  • 可讀性差

我們來(lái)測(cè)試一下。

測(cè)試默認(rèn)轉(zhuǎn)換器

1)創(chuàng)建測(cè)試隊(duì)列
首先,我們?cè)赾onsumer服務(wù)中聲明一個(gè)新的配置類:

利用@Bean的方式創(chuàng)建一個(gè)隊(duì)列,具體代碼:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageConfig {
    @Bean
    public Queue objectQueue() {
        return new Queue("object.queue");
    }
}

注意,這里我們先不要給這個(gè)隊(duì)列添加消費(fèi)者,我們要查看消息體的格式。

重啟consumer服務(wù)以后,該隊(duì)列就會(huì)被自動(dòng)創(chuàng)建出來(lái)了:

2)發(fā)送消息
我們?cè)趐ublisher模塊的SpringAmqpTest中新增一個(gè)消息發(fā)送的代碼,發(fā)送一個(gè)Map對(duì)象:

@Test
public void testSendMap() throws InterruptedException {
    // 準(zhǔn)備消息
    Map<String,Object> msg = new HashMap<>();
    msg.put("name", "柳巖");
    msg.put("age", 21);
    // 發(fā)送消息
    rabbitTemplate.convertAndSend("object.queue", msg);
}

發(fā)送消息后查看控制臺(tái):

可以看到消息格式非常不友好。

配置JSON轉(zhuǎn)換器

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來(lái)做序列化和反序列化。

publisherconsumer兩個(gè)服務(wù)中都引入依賴:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果項(xiàng)目中引入了spring-boot-starter-web依賴,則無(wú)需再次引入Jackson依賴。

配置消息轉(zhuǎn)換器,在publisherconsumer兩個(gè)服務(wù)的啟動(dòng)類中添加一個(gè)Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定義消息轉(zhuǎn)換器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自動(dòng)創(chuàng)建消息id,用于識(shí)別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息轉(zhuǎn)換器中添加的messageId可以便于我們將來(lái)做冪等性判斷。

此時(shí),我們到MQ控制臺(tái)刪除object.queue中的舊的消息。然后再次執(zhí)行剛才的消息發(fā)送的代碼,到MQ的控制臺(tái)查看消息結(jié)構(gòu):

消費(fèi)者接收Object

我們?cè)赾onsumer服務(wù)中定義一個(gè)新的消費(fèi)者,publisher是用Map發(fā)送,那么消費(fèi)者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
    System.out.println("消費(fèi)者接收到object.queue消息:【" + msg + "】");
}

常用的 RabbitMQ 插件

RabbitMQ 支持許多插件,這些插件可以擴(kuò)展 RabbitMQ 的功能和特性。以下是一些常用的 RabbitMQ 插件:

  • Management Plugin: RabbitMQ 管理插件提供了一個(gè) Web 管理界面,用于監(jiān)控和管理 RabbitMQ 服務(wù)器??梢圆榭搓?duì)列、交換機(jī)、連接、通道等的狀態(tài),并進(jìn)行配置和操作。
  • Shovel Plugin: Shovel 插件用于將消息從一個(gè) RabbitMQ 服務(wù)器傳遞到另一個(gè) RabbitMQ 服務(wù)器,實(shí)現(xiàn)消息復(fù)制和跨集群通信。它可以用于實(shí)現(xiàn)數(shù)據(jù)復(fù)制、故障恢復(fù)、數(shù)據(jù)中心間同步等。
  • Federation Plugin: Federation 插件允許不同 RabbitMQ 集群之間建立聯(lián)合,實(shí)現(xiàn)消息的跨集群傳遞。這對(duì)于構(gòu)建分布式系統(tǒng)、將消息從一個(gè)地理位置傳遞到另一個(gè)地理位置非常有用。
  • STOMP Plugin: STOMP插件允許使用 STOMP 協(xié)議與 RabbitMQ 進(jìn)行通信。這對(duì)于使用非 AMQP 協(xié)議的客戶端與 RabbitMQ 交互非常有用,例如使用 WebSocket 的 Web 應(yīng)用程序。
  • Prometheus Plugin: Prometheus 插件用于將 RabbitMQ 的性能指標(biāo)導(dǎo)出到 Prometheus 監(jiān)控系統(tǒng),以便進(jìn)行性能監(jiān)控和警報(bào)。
  • Delayed Message Plugin: 延遲消息插件允許發(fā)布延遲交付的消息,使你能夠在稍后的時(shí)間點(diǎn)將消息傳遞給消費(fèi)者。這對(duì)于實(shí)現(xiàn)定時(shí)任務(wù)、延遲重試等場(chǎng)景非常有用。

轉(zhuǎn)自https://www.cnblogs.com/seven97-top/p/18860298


該文章在 2025/8/4 10:42:33 編輯過(guò)
關(guān)鍵字查詢
相關(guān)文章
正在查詢...
點(diǎn)晴ERP是一款針對(duì)中小制造業(yè)的專業(yè)生產(chǎn)管理軟件系統(tǒng),系統(tǒng)成熟度和易用性得到了國(guó)內(nèi)大量中小企業(yè)的青睞。
點(diǎn)晴PMS碼頭管理系統(tǒng)主要針對(duì)港口碼頭集裝箱與散貨日常運(yùn)作、調(diào)度、堆場(chǎng)、車隊(duì)、財(cái)務(wù)費(fèi)用、相關(guān)報(bào)表等業(yè)務(wù)管理,結(jié)合碼頭的業(yè)務(wù)特點(diǎn),圍繞調(diào)度、堆場(chǎng)作業(yè)而開(kāi)發(fā)的。集技術(shù)的先進(jìn)性、管理的有效性于一體,是物流碼頭及其他港口類企業(yè)的高效ERP管理信息系統(tǒng)。
點(diǎn)晴WMS倉(cāng)儲(chǔ)管理系統(tǒng)提供了貨物產(chǎn)品管理,銷售管理,采購(gòu)管理,倉(cāng)儲(chǔ)管理,倉(cāng)庫(kù)管理,保質(zhì)期管理,貨位管理,庫(kù)位管理,生產(chǎn)管理,WMS管理系統(tǒng),標(biāo)簽打印,條形碼,二維碼管理,批號(hào)管理軟件。
點(diǎn)晴免費(fèi)OA是一款軟件和通用服務(wù)都免費(fèi),不限功能、不限時(shí)間、不限用戶的免費(fèi)OA協(xié)同辦公管理系統(tǒng)。
Copyright 2010-2025 ClickSun All Rights Reserved

黄频国产免费高清视频,久久不卡精品中文字幕一区,激情五月天AV电影在线观看,欧美国产韩国日本一区二区
日本一区二区三区精品视频 | 在线日本精品a免费播放 | 五月天婷五月天综合网在线 | 中文字幕国产AV | 亚洲欧美在线看h片666 | 中文字幕一区二区三区乱码 |