将 HTTP 与响应式消息结合使用
本指南演示您的 Quarkus 应用程序如何利用 SmallRye Reactive Messaging 来使用和生成 HTTP 消息。
先决条件
完成这个指南,你需要:
-
少于15分钟
-
一个编辑器
-
JDK 11及以上版本,并已正确配置
JAVA_HOME
-
Apache Maven 3.8.1+
-
如果你想以本地模式运行,请确保已经安装GraalVM、Docker或Podman。
架构
在本指南中我们将实现一项服务,名为 CostConverter
,它将以多种货币成本的方式消费 HTTP 消息,并将每个成本转换为其以欧元为单位的价值。
为了让用户轻松试用该服务,我们将实现一个 HTTP 资源来汇总成本 ( CostCollector
),以及一个简单的网页来添加新的成本并查看总和。
解决方案
我们建议您按照下一节的说明逐步创建应用程序。然而,您可以直接转到已完成的示例。
克隆 Git 仓库: git clone https://github.com/quarkusio/quarkus-quickstarts.git
,或下载一个 存档 。
该解决方案位于 reactive-messaging-http-quickstart
目录 中。
创建Maven项目
首先,我们需要一个新的项目。使用以下命令创建一个新的项目:
mvn io.quarkus.platform:quarkus-maven-plugin:2.11.2.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=reactive-messaging-http-quickstart \
-Dextensions="reactive-messaging-http" \
-DnoExamples
cd reactive-messaging-http-quickstart
该命令生成一个Maven项目,导入Reactive Messaging和HTTP 连接扩展。
转换器
创建 src/main/java/org/acme/reactivehttp/CostConverter.java
文件,内容如下:
package org.acme.reactivehttp;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import javax.enterprise.context.ApplicationScoped;
import java.util.HashMap;
import java.util.Map;
/**
* A bean consuming costs in multiple currencies and producing prices in EUR from them
*/
@ApplicationScoped
public class CostConverter {
private static final Logger log = Logger.getLogger(CostConverter.class);
private static final Map<String, Double> conversionRatios = new HashMap<>();
static {
conversionRatios.put("CHF", 0.93);
conversionRatios.put("USD", 0.84);
conversionRatios.put("PLN", 0.22);
conversionRatios.put("EUR", 1.);
}
@Incoming("incoming-costs") (1)
@Outgoing("outgoing-costs") (2)
double convert(Cost cost) { (3)
Double conversionRatio = conversionRatios.get(cost.getCurrency().toUpperCase());
if (conversionRatio == null) {
return 0.;
}
return conversionRatio * cost.getValue();
}
}
1 | 从 incoming-costs 流中消费消息。 |
2 | 发送返回值到 outgoing-costs 流中。 |
3 | 消费一个具有 Cost 类型的有效载荷的事件,并生成一个 double 。在消费任意一个对象的情况下,reactive-messaging-http 扩展将尝试把请求体反序列化为一个JSON对象。 |
让我们定义 Cost
类:
package org.acme.reactivehttp;
public class Cost {
private double value;
private String currency;
public double getValue() {
return value;
}
public void setValue(double value) {
this.value = value;
}
public String getCurrency() {
return currency;
}
public void setCurrency(String currency) {
this.currency = currency;
}
}
下一步,我们将在 application.properties
文件中为这两个流创建配置。
配置HTTP连接器
我们需要配置HTTP连接器。这是在 application.properties
文件中完成的。 键的结构如下:
mp.messaging.[outgoing|incoming].{channel-name}.{property}=value
channel-name
片段必须与 @Incoming
和 @Outgoing
注解中设定的值相匹配:
-
incoming-costs
→ 接收成本的来源 -
outgoing-costs
→ 接收转换成本的接收器
mp.messaging.outgoing.outgoing-costs.connector=quarkus-http
# 这里我们使用一个指向端点的URL
# 你可以使用 例如一个环境变量来改变它
mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.port}/cost-collector
# 我们需要使用不同的端口进行测试:
%test.mp.messaging.outgoing.outgoing-costs.url=http://localhost:${quarkus.http.test-port}/cost-collector
# POST是默认的方式。 PUT方式也被支持
mp.messaging.outgoing.outgoing-costs.method=POST
mp.messaging.incoming.incoming-costs.connector=quarkus-http
# 传入成本通道将通过 `/costs` 路径上的端点提供
mp.messaging.incoming.incoming-costs.path=/costs
# POST是默认的方式。 PUT方式也被支持
mp.messaging.incoming.incoming-costs.method=POST
成本收集器(The CostCollector)
为了说明转换消息和传递消息是可行的,让我们添加一个端点来接收传出成本并将它们相加。 这是一个常见的 JAX-RS 端点。
package org.acme.reactivehttp;
import javax.enterprise.context.ApplicationScoped;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/cost-collector")
@ApplicationScoped
public class CostCollector {
private double sum = 0;
@POST
public void consumeCost(String valueAsString) {
sum += Double.parseDouble(valueAsString);
}
@GET
public double getSum() {
return sum;
}
}
HTML页面
为了方便地与应用程序交互,让我们创建一个简单的网页。
该页面将提供一个表格用以添加成本,以及展示当前成本总和的信息。 该页面通过请求 /cost-collector
来定期更新当前成本的总和。
创建 src/main/resources/META-INF/resources/index.html
文件, 包含以下内容:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Costs</title>
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css"
href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">
</head>
<body>
<div class="container">
<h2>Add a cost</h2>
<div>
<div>
<label for="value">Value</label>
<input type="text" id="value">
</div>
<div>
<label for="currency">Currency</label>
<select id="currency">
<option value="CHF">Swiss franc</option>
<option value="USD">United States dollar</option>
<option value="CHF">Polish złoty</option>
</select>
</div>
<input type="button" onclick="add()" value="Add">
</div>
<h2>Last cost</h2>
<div class="row">
<p class="col-md-12">The total cost is <strong><span id="content">N/A</span> €</strong>.</p>
</div>
</div>
</body>
<script>
add = function() {
var value = document.getElementById('value').value;
var currency = document.getElementById('currency').value;
var cost = {
value: document.getElementById('value').value,
currency: document.getElementById('currency').value
};
fetch('costs', { method: 'POST', body: JSON.stringify(cost) });
}
updateCost = function() {
fetch('cost-collector').then(response => response.text()).then(sum =>
document.getElementById('content').textContent=sum
);
}
window.setInterval(updateCost, 500);
</script>
</html>
进一步探索
HTTP 连接器选项
所有 quarkus-http
连接器选项:
# OUTGOING
# 目标 URL
mp.messaging.outgoing.<channelName>.url=http://localhost:8213
# 消息负载序列化器,可选的, `io.quarkus.reactivemessaging.http.runtime.serializers.Serializer` 的实现
mp.messaging.outgoing.<channelName>.serializer=com.example.MySerializer
# 尝试向远程端点发送请求的次数。不得小于零
# 默认为0
mp.messaging.outgoing.<channelName>.maxRetries=3
# 配置当使用回退且maxRetries > 0时使用的随机因子。 默认为0.5
mp.messaging.outgoing.<channelName>.jitter=0.3
# 配置尝试发送请求之间的回退延迟。
# 当发生多个故障时,随机因子(抖动)被使用来增加延迟。
mp.messaging.outgoing.<channelName>.delay=1s
#HTTP 请求方式 ( `POST` 或 `PUT` ),默认为 `POST`
mp.messaging.outgoing.<channelName>.method=PUT
#INCOMING
# HTTP 请求方式 ( `POST` 或 `PUT` ,默认为 `POST`
mp.messaging.incoming.<channelName>.method=POST
# 端点的路径
mp.messaging.incoming.<channelName>.path=/my-reactive-ws-endpoint
# 如果消费者无法跟上,HTTP 端点会缓冲消息。 此设置指定缓冲区的大小。
# 默认为 8
mp.messaging.incoming.<channelName>.buffer-size=13