The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

将 HTTP 与响应式消息结合使用

本指南演示您的 Quarkus 应用程序如何利用 SmallRye Reactive Messaging 来使用和生成 HTTP 消息。

先决条件

完成这个指南,你需要:

  • 少于15分钟

  • 一个编辑器

  • JDK 11及以上版本,并已正确配置 JAVA_HOME

  • Apache Maven 3.8.1+

  • 如果你想以本地模式运行,请确保已经安装GraalVM、Docker或Podman。

架构

在本指南中我们将实现一项服务,名为 CostConverter ,它将以多种货币成本的方式消费 HTTP 消息,并将每个成本转换为其以欧元为单位的价值。

为了让用户轻松试用该服务,我们将实现一个 HTTP 资源来汇总成本 ( CostCollector ),以及一个简单的网页来添加新的成本并查看总和。

解决方案

我们建议您按照下一节的说明逐步创建应用程序。然而,您可以直接转到已完成的示例。

克隆 Git 仓库: git clone https://github.com/quarkusio/quarkus-quickstarts.git ,或下载一个 存档

该解决方案位于 reactive-messaging-http-quickstart 目录 中。

创建Maven项目

首先,我们需要一个新的项目。使用以下命令创建一个新的项目:

mvn io.quarkus.platform:quarkus-maven-plugin:2.11.2.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=reactive-messaging-http-quickstart \
    -Dextensions="reactive-messaging-http" \
    -DnoExamples
cd reactive-messaging-http-quickstart

该命令生成一个Maven项目,导入Reactive Messaging和HTTP 连接扩展。

转换器

创建 src/main/java/org/acme/reactivehttp/CostConverter.java 文件,内容如下:

package org.acme.reactivehttp;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;

/**
 * A bean consuming costs in multiple currencies and producing prices in EUR from them
 */
@ApplicationScoped
public class CostConverter {

    private static final Logger log = Logger.getLogger(CostConverter.class);

    private static final Map<String, Double> conversionRatios = new HashMap<>();

    static {
        conversionRatios.put("CHF", 0.93);
        conversionRatios.put("USD", 0.84);
        conversionRatios.put("PLN", 0.22);
        conversionRatios.put("EUR", 1.);
    }

    @Incoming("incoming-costs") (1)
    @Outgoing("outgoing-costs") (2)
    double convert(Cost cost) { (3)
        Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
        if (conversionRatio == null) {
            return 0.;
        }
        return conversionRatio * cost.getValue();
    }
}
1 incoming-costs 流中消费消息。
2 发送返回值到 outgoing-costs 流中。
3 消费一个具有 Cost 类型的有效载荷的事件,并生成一个 double 。在消费任意一个对象的情况下,reactive-messaging-http 扩展将尝试把请求体反序列化为一个JSON对象。

让我们定义 Cost 类:

package org.acme.reactivehttp;

public class Cost {
    private double value;
    private String currency;

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getCurrency() {
        return currency;
    }

    public void setCurrency(String currency) {
        this.currency = currency;
    }
}

下一步,我们将在 application.properties 文件中为这两个流创建配置。

配置HTTP连接器

我们需要配置HTTP连接器。这是在 application.properties 文件中完成的。 键的结构如下:

mp.messaging.[outgoing|incoming].{channel-name}.{property}=value

channel-name 片段必须与 @Incoming@Outgoing 注解中设定的值相匹配:

  • incoming-costs → 接收成本的来源

  • outgoing-costs → 接收转换成本的接收器

mp.messaging.outgoing.outgoing-costs.connector=quarkus-http

# 这里我们使用一个指向端点的URL
# 你可以使用 例如一个环境变量来改变它
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector

# 我们需要使用不同的端口进行测试:
%test.mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.test-port}/cost-collector

# POST是默认的方式。 PUT方式也被支持
mp.messaging.outgoing.outgoing-costs.method=POST


mp.messaging.incoming.incoming-costs.connector=quarkus-http

# 传入成本通道将通过 `/costs` 路径上的端点提供
mp.messaging.incoming.incoming-costs.path=/costs

# POST是默认的方式。 PUT方式也被支持
mp.messaging.incoming.incoming-costs.method=POST

成本收集器(The CostCollector)

为了说明转换消息和传递消息是可行的,让我们添加一个端点来接收传出成本并将它们相加。 这是一个常见的 JAX-RS 端点。

package org.acme.reactivehttp;

import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {

    private double sum = 0;

    @POST
    public void consumeCost(String valueAsString) {
        sum += Double.parseDouble(valueAsString);
    }

    @GET
    public double getSum() {
        return sum;
    }

}

HTML页面

为了方便地与应用程序交互,让我们创建一个简单的网页。

该页面将提供一个表格用以添加成本,以及展示当前成本总和的信息。 该页面通过请求 /cost-collector 来定期更新当前成本的总和。

创建 src/main/resources/META-INF/resources/index.html 文件, 包含以下内容:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Costs</title>

    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
    <link rel="stylesheet" type="text/css"
          href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">

    <h2>Add a cost</h2>
    <div>
        <div>
            <label for="value">Value</label>
            <input type="text" id="value">
        </div>
        <div>
            <label for="currency">Currency</label>
            <select id="currency">
                <option value="CHF">Swiss franc</option>
                <option value="USD">United States dollar</option>
                <option value="CHF">Polish złoty</option>
            </select>
        </div>
        <input type="button" onclick="add()" value="Add">
    </div>


    <h2>Last cost</h2>
    <div class="row">
        <p class="col-md-12">The total cost is <strong><span id="content">N/A</span>&nbsp;&euro;</strong>.</p>
    </div>
</div>
</body>
<script>
    add = function() {
        var value = document.getElementById('value').value;
        var currency = document.getElementById('currency').value;

        var cost = {
            value: document.getElementById('value').value,
            currency: document.getElementById('currency').value
        };

        fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
    }

    updateCost = function() {
        fetch('cost-collector').then(response => response.text()).then(sum =>
            document.getElementById('content').textContent=sum
        );
    }

    window.setInterval(updateCost, 500);
</script>
</html>

让它运行起来

使用以下命令运行应用程序:

./mvnw quarkus:dev

在您的浏览器中打开 http://localhost:8080/index.html

以本机可执行文件运行

您可以使用以下命令构建本机可执行文件:

./mvnw package -Pnative

进一步探索

HTTP 连接器选项

所有 quarkus-http 连接器选项:

# OUTGOING

# 目标 URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213

# 消息负载序列化器,可选的, `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer` 的实现
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer

# 尝试向远程端点发送请求的次数。不得小于零
# 默认为0
mp.messaging.outgoing.<channelName>.maxRetries=3

# 配置当使用回退且maxRetries > 0时使用的随机因子。 默认为0.5
mp.messaging.outgoing.<channelName>.jitter=0.3

# 配置尝试发送请求之间的回退延迟。
# 当发生多个故障时,随机因子(抖动)被使用来增加延迟。
mp.messaging.outgoing.<channelName>.delay=1s

#HTTP 请求方式 ( `POST` 或 `PUT` ),默认为 `POST`
mp.messaging.outgoing.<channelName>.method=PUT

#INCOMING
# HTTP 请求方式 ( `POST` 或 `PUT` ,默认为 `POST`
mp.messaging.incoming.<channelName>.method=POST

# 端点的路径
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint

# 如果消费者无法跟上,HTTP 端点会缓冲消息。 此设置指定缓冲区的大小。
# 默认为 8
mp.messaging.incoming.<channelName>.buffer-size=13

响应式消息

此扩展利用 SmallRye 响应式消息来构建数据流应用程序。

如果您想进一步了解文档 SmallRye Reactive Messaging, 在Quarkus中使用的实现。