The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

SmallRye响应式消息与AMQP 1.0入门

本指南演示了你的Quarkus应用程序如何使用SmallRye Reactive Messaging来与AMQP 1.0进行交互。

如果你想使用 RabbitMQ,你应该使用 SmallRye响应式消息RabbitMQ 扩展 。另外,如果你想使用带有 AMQP 1.0的RabbitMQ ,你需要在 RabbitMQ 代理中启用 AMQP 1.0 插件;请查看 连接到 RabbitMQ 文档。

先决条件

完成这个指南,你需要:

  • 大概15分钟

  • 编辑器

  • JDK 17+ installed with JAVA_HOME configured appropriately

  • Apache Maven 3.9.6

  • Docker and Docker Compose or Podman, and Docker Compose

  • 如果你愿意的话,还可以选择使用Quarkus CLI

  • 如果你想构建原生可执行程序,可以选择安装Mandrel或者GraalVM,并正确配置(或者使用Docker在容器中进行构建)

架构

在本指南中,我们将开发两个与AMQP代理进行通信的应用程序。我们将使用 Artemis ,但你可以使用任何AMQP 1.0代理。第一个应用程序向AMQP队列发送一个 quote request ,并消费 quote queue 中的消息。第二个应用程序接收 quote request , 并发送一个 quote back。

Architecture

第一个应用程序 producer ,将让用户通过一个HTTP端点请求一些报价。对于每一个报价请求,都会生成一个随机标识符,并返回给用户,以便将报价请求放在 pending 上。同时,生成的请求ID通过 quote-requests 队列被发送。

Producer App UI

反过来,第二个应用程序 processor ,将读取 quote-requests 队列,将一个随机价格放入报价,并将其发送给一个名为 quotes 的队列。

最后, producer 将读取报价,并使用服务器发送事件将其发送给浏览器。因此,用户将实时看到报价从 pending 更新为收到的价格。

解决方案

我们建议你按照下面章节中的说明,一步一步地创建应用程序。但是,你可以直接转到已完成的示例。

克隆 Git 仓库。 git clone https://github.com/quarkusio/quarkus-quickstarts.git ,或者下载一个 存档

The solution is located in the amqp-quickstart directory.

创建Maven项目

首先,我们需要创建两个项目: producerprocessor

要创建 producer 项目,请在终端中运行:

CLI
quarkus create app org.acme:amqp-quickstart-producer \
    --extension='resteasy-reactive-jackson,smallrye-reactive-messaging-amqp' \
    --no-code

创建Grade项目,请添加 --gradle 或者 --gradle-kotlin-dsl 参数。

For more information about how to install and use the Quarkus CLI, see the Quarkus CLI guide.

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.8.3:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=amqp-quickstart-producer \
    -Dextensions='resteasy-reactive-jackson,smallrye-reactive-messaging-amqp' \
    -DnoCode

创建Grade项目,请添加 -DbuildTool=gradle 或者 -DbuildTool=gradle-kotlin-dsl 参数。

For Windows users:

  • If using cmd, (don’t use backward slash \ and put everything on the same line)

  • If using Powershell, wrap -D parameters in double quotes e.g. "-DprojectArtifactId=amqp-quickstart-producer"

这个命令会创建项目结构,并选择我们将要使用的两个Quarkus扩展:

  1. RESTEasy Reactive和其Jackson支持来处理JSON有效载荷

  2. 响应式消息AMQP连接器

要创建 processor 项目,请在同一目录下运行:

CLI
quarkus create app org.acme:amqp-quickstart-processor \
    --extension='smallrye-reactive-messaging-amqp' \
    --no-code

创建Grade项目,请添加 --gradle 或者 --gradle-kotlin-dsl 参数。

For more information about how to install and use the Quarkus CLI, see the Quarkus CLI guide.

Maven
mvn io.quarkus.platform:quarkus-maven-plugin:3.8.3:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=amqp-quickstart-processor \
    -Dextensions='smallrye-reactive-messaging-amqp' \
    -DnoCode

创建Grade项目,请添加 -DbuildTool=gradle 或者 -DbuildTool=gradle-kotlin-dsl 参数。

For Windows users:

  • If using cmd, (don’t use backward slash \ and put everything on the same line)

  • If using Powershell, wrap -D parameters in double quotes e.g. "-DprojectArtifactId=amqp-quickstart-processor"

此时,你应该有如下的结构:

.
├── amqp-quickstart-processor
│  ├── README.md
│  ├── mvnw
│  ├── mvnw.cmd
│  ├── pom.xml
│  └── src
│     └── main
│        ├── docker
│        ├── java
│        └── resources
│           └── application.properties
└── amqp-quickstart-producer
   ├── README.md
   ├── mvnw
   ├── mvnw.cmd
   ├── pom.xml
   └── src
      └── main
         ├── docker
         ├── java
         └── resources
            └── application.properties

在你喜欢的IDE中打开这两个项目。

Quote对象

Quote 类将用在 producerprocessor 项目中。为了简单起见,我们将重复这个类。在两个项目中,创建 src/main/java/org/acme/amqp/model/Quote.java 文件,其内容如下:

package org.acme.amqp.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Quote {

    public String id;
    public int price;

    /**
    * Default constructor required for Jackson serializer
    */
    public Quote() { }

    public Quote(String id, int price) {
        this.id = id;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Quote{" +
                "id='" + id + '\'' +
                ", price=" + price +
                '}';
    }
}

Quote 对象的JSON表示将被用在发送给AMQP队列的消息中,也被用在发送给浏览器客户端的服务器发送事件中。

Quarkus有内置的功能来处理JSON AMQP消息。

@RegisterForReflection

@RegisterForReflection 注释指示Quarkus在构建本地可执行文件时包含类(包括字段和方法)。这在以后我们在容器中作为本地可执行文件运行应用程序时将会很有用。如果没有这个注释,本地编译会在死代码清除阶段删除这些字段和方法。

发送报价请求

producer 项目中找到生成的 src/main/java/org/acme/amqp/producer/QuotesResource.java 文件,并将其内容更新为:

package org.acme.amqp.producer;

import java.util.UUID;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import org.acme.amqp.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.mutiny.Multi;

@Path("/quotes")
public class QuotesResource {

    @Channel("quote-requests") Emitter<String> quoteRequestEmitter; (1)

    /**
     * Endpoint to generate a new quote request id and send it to "quote-requests" AMQP queue using the emitter.
     */
    @POST
    @Path("/request")
    @Produces(MediaType.TEXT_PLAIN)
    public String createRequest() {
        UUID uuid = UUID.randomUUID();
        quoteRequestEmitter.send(uuid.toString()); (2)
        return uuid.toString();
    }
}
1 注入一个响应式消息 Emitter ,来向 quote-requests 通道发送消息。
2 在post请求中,生成一个随机的UUID,并使用emitter将其发送给AMQP队列。

quote-requests 通道将被作为一个AMQP队列来管理,因为这是classpath上唯一的连接器。如果没有另行说明,如本例中所示,Quarkus将使用通道名称作为AMQP队列名称。因此,在本例中,应用程序将消息发送给 quote-requests 队列。

当你有多个连接器时,你需要在应用程序配置中指出你想要使用哪个连接器。

处理报价请求

现在让我们来消费报价请求,并给出一个价格。在 processor 项目中,找到 src/main/java/org/acme/amqp/processor/QuoteProcessor.java 文件,并添加以下内容:

package org.acme.amqp.processor;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.acme.amqp.model.Quote;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.annotations.Blocking;

/**
 * A bean consuming data from the "request" AMQP queue and giving out a random quote.
 * The result is pushed to the "quotes" AMQP queue.
 */
@ApplicationScoped
public class QuoteProcessor {

    private Random random = new Random();

    @Incoming("requests")       (1)
    @Outgoing("quotes")         (2)
    @Blocking                   (3)
    public Quote process(String quoteRequest) throws InterruptedException {
        // simulate some hard-working task
        Thread.sleep(200);
        return new Quote(quoteRequest, random.nextInt(100));
    }
}
1 表示该方法消费 requests 通道中的项
2 表示方法返回的对象被发送给 quotes 通道
3 表示该处理是 blocking ,不能在调用者线程上运行。

process 方法被 quote-requests 队列中的每一个AMQP消息调用,并将 Quote 对象发送给 quotes 队列。

因为我们想把 quotes-requests 队列中的消息消费到 requests 通道中,因此我们需要配置这个关联。请打开 src/main/resources/application.properties 文件并添加:

mp.messaging.incoming.requests.address=quote-requests

The configuration properties are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

在我们的情形中,我们想配置 address 属性来表示队列的名称。

接收报价

回到我们的 producer 项目。让我们修改 QuotesResource 来消费报价,将其绑定到一个HTTP端点,来向客户端发送事件:

import io.smallrye.mutiny.Multi;
//...

@Channel("quotes") Multi<Quote> quotes;     (1)

/**
 * Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
 */
@GET
@Produces(MediaType.SERVER_SENT_EVENTS) (2)
public Multi<Quote> stream() {
    return quotes; (3)
}
1 使用 @Channel 修饰符注入 quotes 通道
2 表示内容是使用 Server Sent Events 发送的
3 返回流 (Reactive Stream) 。

HTML页面

最后,HTML页面使用SSE读取转换后的价格。

producer 项目中创建 src/main/resources/META-INF/resources/quotes.html 文件,内容如下:

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Quotes</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
    <div class="card">
        <div class="card-body">
            <h2 class="card-title">Quotes</h2>
            <button class="btn btn-info" id="request-quote">Request Quote</button>
            <div class="quotes"></div>
        </div>
    </div>
</div>
</body>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script>
    $("#request-quote").click((event) => {
        fetch("/quotes/request", {method: "POST"})
        .then(res => res.text())
        .then(qid => {
            var row = $(`<h4 class='col-md-12' id='${qid}'>Quote # <i>${qid}</i> | <strong>Pending</strong></h4>`);
            $(".quotes").append(row);
        });
    });
    var source = new EventSource("/quotes");
    source.onmessage = (event) => {
      var json = JSON.parse(event.data);
      $(`#${json.id}`).html(function(index, html) {
        return html.replace("Pending", `\$\xA0${json.price}`);
      });
    };
</script>
</html>

这里没有什么特别之处。对于每一个收到的报价,它都会更新页面。

让其运行

你只需使用以下命令运行这两个应用程序:

> mvn -f amqp-quickstart-producer quarkus:dev

并且,在另一台终端中:

> mvn -f amqp-quickstart-processor quarkus:dev

Quarkus会自动启动一个AMQP代理,配置应用程序,并在不同的应用程序之间共享代理实例。要了解更多的细节,请参阅 AMQP的开发服务

在你的浏览器中打开 http://localhost:8080/quotes.html ,点击按钮来请求一些报价。

在JVM或本地模式下运行

当不在开发或测试模式下运行时,你需要启动你的AMQP代理。你可以按照 Apache ActiveMQ Artemis网站中的说明,或者使用以下内容创建一个 docker-compose.yaml 文件:

version: '2'

services:

  artemis:
    image: quay.io/artemiscloud/activemq-artemis-broker:1.0.25
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    environment:
      AMQ_USER: quarkus
      AMQ_PASSWORD: quarkus
    networks:
      - amqp-quickstart-network

  producer:
    image: quarkus-quickstarts/amqp-quickstart-producer:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: amqp-quickstart-producer
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      AMQP_HOST: artemis
      AMQP_PORT: 5672
    ports:
      - "8080:8080"
    networks:
      - amqp-quickstart-network

  processor:
    image: quarkus-quickstarts/amqp-quickstart-processor:1.0-${QUARKUS_MODE:-jvm}
    build:
      context: amqp-quickstart-processor
      dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm}
    environment:
      AMQP_HOST: artemis
      AMQP_PORT: 5672
    networks:
      - amqp-quickstart-network

networks:
  amqp-quickstart-network:
    name: amqp-quickstart

注意AMQP代理的位置是如何配置的。 amqp.hostamqp.port ( AMQP_HOSTAMQP_PORT 环境变量) 属性配置位置。

首先,确保你停止了应用程序,并在JVM模式下使用以下命令构建这两个应用程序:

> mvn -f amqp-quickstart-producer clean package
> mvn -f amqp-quickstart-processor clean package

一旦打包完成,请运行 docker compose up --build 。UI在 http://localhost:8080/quotes.html

要以本地方式运行你的应用程序,我们首先需要构建本地可执行文件:

> mvn -f amqp-quickstart-producer package -Dnative  -Dquarkus.native.container-build=true
> mvn -f amqp-quickstart-processor package -Dnative -Dquarkus.native.container-build=true

-Dquarkus.native.container-build=true 指示Quarkus构建可以在容器中运行的64位Linux本地可执行文件。然后,使用以下命令运行系统:

> export QUARKUS_MODE=native
> docker compose up --build

与之前一样,UI也是在 http://localhost:8080/quotes.html

更进一步

本指南展示了如何使用Quarkus与AMQP 1.0进行交互。它使用 SmallRye响应式消息来构建数据流应用程序。

如果你学过Kafka快速入门,你会意识到这是相同的代码。唯一的区别是连接器配置和JSON映射。

Related content