响应式消息AMQP 1.0连接器参考文档
本指南是 AMQP 1.0入门的配套指南。它更详细地解释了响应式消息AMQP连接器的配置和使用。
本文档并没有涵盖连接器的所有细节。请参考 SmallRye 响应式消息网站 ,来了解更多细节。 |
The AMQP connector allows Quarkus applications to send and receive messages using the AMQP 1.0 protocol. More details about the protocol can be found in the AMQP 1.0 specification. It’s important to note that AMQP 1.0 and AMQP 0.9.1 (implemented by RabbitMQ) are incompatible. Check Using RabbitMQ to get more details.
AMQP连接器扩展
To use the connector, you need to add the quarkus-messaging-amqp
extension.
你可以用以下方法将扩展添加到你的项目中:
quarkus extension add quarkus-messaging-amqp
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-amqp'
./gradlew addExtension --extensions='quarkus-messaging-amqp'
或者只需在你的项目中添加以下依赖项:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
implementation("io.quarkus:quarkus-messaging-amqp")
一旦添加到你的项目中,你就可以通过配置 connector
属性将 channels 映射到AMQP地址:
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp
# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
连接器自动连接
如果在你的classpath上有一个连接器,你可以省略 可以用以下方法禁用这种自动连接功能:
|
配置AMQP代理访问
AMQP连接器连接到AMQP 1.0代理,如Apache ActiveMQ或Artemis。要配置代理的位置和凭证,请在 application.properties
中添加以下属性:
amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)
mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 | 配置代理/路由器的主机名。你可以按通道进行配置(使用 host 属性),或使用 amqp-host 进行全局配置 |
2 | 配置代理/路由器的端口。你可以按通道(使用 port 属性)进行配置,或使用 amqp-port 进行全局配置。默认是 5672 。 |
3 | 如果需要,请配置代理/路由器的用户名。你可以按通道(使用 username 属性)进行配置,或使用 amqp-username 进行全局配置。 |
4 | 如果需要,请配置代理/路由器的密码。你可以按通道(使用 password 属性)进行配置,或使用 amqp-password 进行全局配置。 |
5 | 指示将由AMQP连接器管理的价格通道。 |
在开发模式下,当运行测试时,AMQP的开发服务 会自动启动一个AMQP代理。
接收AMQP消息
让我们想象一下你的应用程序接收 Message<Double>
。你可以直接消费该有效载荷:
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class AmqpPriceConsumer {
@Incoming("prices")
public void consume(double price) {
// process your price.
}
}
或者,你可以检索Message<Double>。
package inbound;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class AmqpPriceMessageConsumer {
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> price) {
// process your price.
// Acknowledge the incoming message, marking the AMQP message as `accepted`.
return price.ack();
}
}
入站元数据
来自AMQP的消息在元数据中包含一个 IncomingAmqpMetadata
的实例。
Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
String address = meta.getAddress();
String subject = meta.getSubject();
boolean durable = meta.isDurable();
// Use io.vertx.core.json.JsonObject
JsonObject properties = meta.getProperties();
// ...
});
反序列化
连接器将传入的AMQP消息转换为响应式消息 Message<T>
实例。 T
取决于收到的AMQP消息的 body 。
The AMQP Type System defines the supported types.
AMQP主体类型 | <T> |
---|---|
AMQP Value containing a AMQP Primitive Type |
对应的Java类型 |
使用 |
|
AMQP 序列 |
|
AMQP数据(有二进制内容),并且 |
|
带有不同 |
|
如果你用这个AMQP连接器(出站连接器)发送对象,它会被编码为JSON,并以二进制形式发送。 content-type
被设置为 application/json
。因此,你可以按以下方式重建对象。
import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {
List<Price> prices = new CopyOnWriteArrayList<>();
@Incoming("from-amqp") (1)
public void consume(JsonObject p) { (2)
Price price = p.mapTo(Price.class); (3)
prices.add(price);
}
public List<Price> list() {
return prices;
}
}
1 | Price 实例被连接器自动编码为JSON。 |
2 | 你可以使用 JsonObject 接收它 |
3 | 然后,你可以使用 mapTo 方法重新构建该实例 |
mapTo 方法使用Quarkus Jackson 映射器。请查看 本指南 以了解更多关于映射器的配置。
|
消息确认
When a Reactive Messaging Message associated with an AMQP Message is acknowledged, it informs the broker that the message has been accepted.
失败管理
如果一个由AMQP消息产生的消息是 nacked ,就会应用一个失败策略。AMQP连接器支持六种策略:
-
fail
- 使应用失败;不再处理AMQP消息(默认)。AMQP消息被标记为拒绝。 -
accept
- this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation. -
release
- this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation. -
reject
- this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation. -
modified-failed
- this strategy marks the AMQP message as modified and indicates that it failed (with thedelivery-failed
attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation -
modified-failed-undeliverable-here
- this strategy marks the AMQP message as modified and indicates that it failed (with thedelivery-failed
attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation
发送AMQP消息
序列化
当发送一个 Message<T>
时,连接器将消息转换为AMQP消息。有效载荷被转换为AMQP消息 body 。
T |
AMQP消息主体 |
---|---|
原始类型或 |
带有有效载荷的AMQP值 |
|
使用相应的AMQP类型的AMQP值 |
使用二进制内容的AMQP数据。 |
|
|
使用二进制内容的AMQP数据。没有设置 |
任何其他类 |
有效载荷被转换为JSON(使用Json Mapper)。结果被包装成使用 二进制 内容的AMQP数据。 |
如果消息的有效载荷不能被序列化为JSON,那么该消息是 nacked 。
出站元数据
当发送 Messages
时,你可以添加一个 OutgoingAmqpMetadata
的实例来影响消息是如何被发送到AMQP的。例如,你可以配置主题、属性。
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
.withDurable(true)
.withSubject("my-subject")
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
动态地址名称
有时,需要动态选择消息的目的地。在这种情况下,你不应该在你的应用程序配置文件中配置地址,而应该使用出站元数据来设置地址。
例如,你可以根据传入的消息发送至一个动态地址。
String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
.withAddress(addressName)
.withDurable(true)
.build();
// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
为了能够按消息设置地址,连接器使用了一个 anonymous sender 。 |
配置AMQP地址
你可以使用 address
属性配置AMQP地址:
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue
mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue
如果没有设置 address
属性,连接器将使用通道名称。
要使用一个现有的队列,你需要配置 address
, container-id
及可选的 link-name
属性。例如,如果你有一个Apache Artemis代理,使用以下方式配置:
<queues>
<queue name="people">
<address>people</address>
<durable>true</durable>
<user>artemis</user>
</queue>
</queues>
你需要以下配置:
mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people
如果队列名称不是通道名称,你可能需要配置 link-name
属性:
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people
要使用 MULTICAST
队列,你需要提供 FQQN (完全限定队列名称),而不仅仅是队列的名称:
mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo
mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people
关于AMQP地址模型的更多详情,请参阅 Artemis文档 。
执行模型和阻塞处理
Reactive Messaging会在一个I/O线程中调用您的方法。关于这个话题的更多细节,请看 Quarkus Reactive Architecture documentation 。但是您可能需要经常将Reactive Messaging 与阻塞式处理相结合使用,比如与数据库通信。为此,您需要使用 @Blocking
注解来表该明处理是 阻塞的 ,并且不在调用者线程中运行。
例如,下面的代码说明了如何使用带有Panache的Hibernate来将传入的有效载荷存储到数据库:
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;
@ApplicationScoped
public class PriceStorage {
@Incoming("prices")
@Transactional
public void store(int priceInUsd) {
Price price = new Price();
price.value = priceInUsd;
price.persist();
}
}
有2个
它们效果相同。因此,您可以随意使用。第一个提供了更精细的配置,比如worker pool以及是否保留顺序。第二种,同其他的Quarkus Reactive功能类似,使用默认的worker pool并且保留了顺序。 |
@RunOnVirtualThread
For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation. |
@Transactional
如果你的方法被注释为 |
定制底层AMQP客户端
连接器在下面使用Vert.x AMQP客户端。关于这个客户端的更多详情,请参阅 Vert.x网站 。
你可以通过产生一个 AmqpClientOptions
的实例来定制底层的客户端配置,如下所示:
@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
// You can use the produced options to configure the TLS connection
PemKeyCertOptions keycert = new PemKeyCertOptions()
.addCertPath("./tls/tls.crt")
.addKeyPath("./tls/tls.key");
PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
return new AmqpClientOptions()
.setSsl(true)
.setPemKeyCertOptions(keycert)
.setPemTrustOptions(trust)
.addEnabledSaslMechanism("EXTERNAL")
.setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
.setConnectTimeout(30000)
.setReconnectInterval(5000)
.setContainerId("my-container");
}
这个实例被检索,并用于配置连接器使用的客户端。你需要使用 client-options-name
属性指定客户端的名称。
mp.messaging.incoming.prices.client-options-name=my-named-options
If you experience frequent disconnections from the broker, the AmqpClientOptions
can also be used to set a heartbeat if you need to keep the AMQP connection permanently.
Some brokers might terminate the AMQP connection after a certain idle timeout.
You can provide a heartbeat value which will be used by the Vert.x proton client to advertise the idle timeout when opening transport to a remote peer.
@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
// set a heartbeat of 30s (in milliseconds)
return new AmqpClientOptions()
.setHeartbeat(30000);
}
TLS Configuration
AMQP 1.0 Messaging extension integrates with the Quarkus TLS registry to configure the Vert.x AMQP client.
To configure the TLS for an AMQP 1.0 channel, you need to provide a named TLS configuration in the application.properties
:
quarkus.tls.your-tls-config.trust-store.pem.certs=ca.crt,ca2.pem
# ...
mp.messaging.incoming.prices.tls-configuration-name=your-tls-config
健康报告
如果你将AMQP连接器与 quarkus-smallrye-health
扩展一起使用,它将有助于就绪性和活跃度探测。AMQP连接器报告连接器所管理的每个通道的就绪性和活跃度。目前,AMQP连接器对就绪性和活跃度检查使用同样的逻辑。
要禁用健康报告,请将通道的 health-enabled
属性设为false。在入站端(接收来自AMQP的消息),检查会验证接收者是否连接到代理。在出站端(向AMQP发送记录),检查会验证发送者是否连接到代理。
请注意,消息处理失败会不认可消息,其然后由 failure-strategy
进行处理。报告失败并影响检查的结果是 failure-strategy
的责任。 fail
失败策略报告失败,因此检查将报告故障。
使用RabbitMQ
此连接器用于 AMQP 1.0. RabbitMQ 实现 AMQP 0.9.1。RabbitMQ 默认不提供 AMQP 1.0,但有一个插件。要将此连接器与RabbitMQ一起使用,请启用并配置 AMQP 1.0 插件。
尽管存在插件,但少数 AMQP 1.0 特性并不能与 RabbitMQ 一起工作。因此,我们建议采用以下配置。
要接收来自 RabbitMQ 的消息:
-
将durable设为false
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false
要向 RabbitMQ 发送消息:
-
设置目的地地址(不支持匿名发送者)。
-
将
use-anonymous-sender
设为false
mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false
因此,在使用 RabbitMQ 时,不可能动态改变目的地(使用消息元数据)。
接收云事件
AMQP连接器配置参考
Quarkus的特定配置
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property |
类型 |
默认 |
---|---|---|
类型 |
默认 |
|
If Dev Services for AMQP has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For AMQP, Dev Services starts a broker unless Environment variable: Show more |
boolean |
|
Optional fixed port the dev service will listen to. If not defined, the port will be chosen randomly. Environment variable: Show more |
int |
|
The image to use. Note that only ActiveMQ Artemis images are supported. Specifically, the image repository must end with Check the activemq-artemis-broker on Quay page to find the available versions. Environment variable: Show more |
string |
|
The value of the Environment variable: Show more |
string |
|
Indicates if the AMQP broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for AMQP starts a new container. The discovery uses the Container sharing is only used in dev mode. Environment variable: Show more |
boolean |
|
The value of the This property is used when you need multiple shared AMQP brokers. Environment variable: Show more |
string |
|
Environment variables that are passed to the container. Environment variable: Show more |
Map<String,String> |
传入通道配置
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
address |
AMQP地址。如果没有设置,则使用通道名称 类型: string |
false |
|
auto-acknowledgement |
收到的AMQP消息是否必须在收到时被确认 类型: boolean |
false |
|
broadcast |
收到的AMQP消息是否必须派发给多个 subscribers 类型: boolean |
false |
|
capabilities |
A comma-separated list of capabilities proposed by the sender or receiver client. Type: string |
false |
|
client-options-name (amqp-client-options-name) |
用于定制AMQP客户端配置的AMQP客户端选项bean的名称 类型: string |
false |
|
cloud-events |
启用(默认)或禁用云事件支持。如果在 incoming 通道上启用了,则连接器会分析传入记录并尝试创建云事件元数据。如果在 outgoing 通道上启用了,如果消息包括云事件元数据,则连接器会将传出消息作为云事件发送。 类型: boolean |
false |
|
connect-timeout (amqp-connect-timeout) |
连接超时以毫秒为单位 类型: int |
false |
|
container-id |
AMQP容器的id 类型: string |
false |
|
durable |
AMQP订阅是否是durable 类型: boolean |
false |
|
failure-strategy |
当从AMQP消息中产生的消息为不被认可时,指定要应用的失败策略。可接受的值是 类型: string |
false |
|
health-timeout |
等待的最大秒数,以确定用于就绪检查的与代理的连接是否仍在建立。在该阈值之后,检查被认为是失败的。 类型: int |
false |
|
host (amqp-host) |
代理的主机名 类型: string |
false |
|
link-name |
链接的名称。如果没有设置,则使用通道名称。 类型: string |
false |
|
password (amqp-password) |
用于对代理进行身份验证的密码 类型: string |
false |
|
port (amqp-port) |
代理端口 类型: int |
false |
|
reconnect-attempts (amqp-reconnect-attempts) |
尝试重新连接的次数 类型: int |
false |
|
reconnect-interval (amqp-reconnect-interval) |
两次尝试重新连接之间的间隔(秒) 类型: int |
false |
|
sni-server-name (amqp-sni-server-name) |
如果设置了,明确地覆盖用于TLS SNI服务器名称的主机名 类型: string |
false |
|
selector |
Sets a message selector. This attribute is used to define an Type: string |
false |
|
tracing-enabled |
是否启用(默认)或禁用了追踪 类型: boolean |
false |
|
use-ssl (amqp-use-ssl) |
AMQP连接是否使用了SSL/TLS 类型: boolean |
false |
|
username (amqp-username) |
用于对代理进行身份验证的用户名 类型: string |
false |
|
virtual-host (amqp-virtual-host) |
如果设置了,配置用于连接AMQP开放框架和TLS SNI服务器名称的主机名值(如果正在使用TLS) 类型: string |
false |
传出通道配置
Attribute (alias) | Description | Mandatory | Default |
---|---|---|---|
address |
AMQP地址。如果没有设置,则使用通道名称 类型: string |
false |
|
capabilities |
A comma-separated list of capabilities proposed by the sender or receiver client. Type: string |
false |
|
client-options-name (amqp-client-options-name) |
用于定制AMQP客户端配置的AMQP客户端选项bean的名称 类型: string |
false |
|
cloud-events |
启用(默认)或禁用云事件支持。如果在 incoming 通道上启用了,则连接器会分析传入记录并尝试创建云事件元数据。如果在 outgoing 通道上启用了,如果消息包括云事件元数据,则连接器会将传出消息作为云事件发送。 类型: boolean |
false |
|
cloud-events-data-content-type (cloud-events-default-data-content-type) |
配置传出的云事件的默认 类型: string |
false |
|
cloud-events-data-schema (cloud-events-default-data-schema) |
配置传出的云事件的默认 类型: string |
false |
|
cloud-events-insert-timestamp (cloud-events-default-timestamp) |
连接器是否应将 类型: boolean |
false |
|
cloud-events-mode |
云事件模式( 类型: string |
false |
|
cloud-events-source (cloud-events-default-source) |
配置传出的云事件的默认 类型: _string |
false |
|
cloud-events-subject (cloud-events-default-subject) |
配置传出的云事件的默认 类型: string |
false |
|
cloud-events-type (cloud-events-default-type) |
配置传出的云事件的默认 类型: string |
false |
|
connect-timeout (amqp-connect-timeout) |
连接超时以毫秒为单位 类型: int |
false |
|
container-id |
AMQP容器的id 类型: string |
false |
|
credit-retrieval-period |
两次尝试检索代理授予的信用额度之间的时间段(以毫秒为单位)。当发送者用完信用额度的时候使用这个时间。 类型: int |
false |
|
durable |
发送的AMQP消息是否被标记为durable 类型: boolean |
false |
|
health-timeout |
等待的最大秒数,以确定用于就绪检查的与代理的连接是否仍在建立。在该阈值之后,检查被认为是失败的。 类型: int |
false |
|
host (amqp-host) |
代理的主机名 类型: string |
false |
|
link-name |
链接的名称。如果没有设置,则使用通道名称。 类型: string |
false |
|
merge |
连接器是否应允许多个上游 类型: boolean |
false |
|
password (amqp-password) |
用于对代理进行身份验证的密码 类型: string |
false |
|
port (amqp-port) |
代理端口 类型: int |
false |
|
reconnect-attempts (amqp-reconnect-attempts) |
尝试重新连接的次数 类型: int |
false |
|
reconnect-interval (amqp-reconnect-interval) |
两次尝试重新连接之间的间隔(秒) 类型: int |
false |
|
sni-server-name (amqp-sni-server-name) |
如果设置了,明确地覆盖用于TLS SNI服务器名称的主机名 类型: string |
false |
|
tracing-enabled |
是否启用(默认)或禁用了追踪 类型: boolean |
false |
|
ttl |
发送的AMQP消息的生存时间。0表示禁用TTL 类型: long |
false |
|
use-anonymous-sender |
连接器是否应使用匿名发送者。如果代理支持,则默认值为 类型: boolean |
false |
|
use-ssl (amqp-use-ssl) |
AMQP连接是否使用了SSL/TLS 类型: boolean |
false |
|
username (amqp-username) |
用于对代理进行身份验证的用户名 类型: string |
false |
|
virtual-host (amqp-virtual-host) |
如果设置了,配置用于连接AMQP开放框架和TLS SNI服务器名称的主机名值(如果正在使用TLS) 类型: string |
false |
Conditionally configure channels
You can configure the channels using a specific profile. Thus, the channels are only configured (and added to the application) when the specified profile is enabled.
To achieve this, you need:
-
Prefix the
mp.messaging.[incoming|outgoing].$channel
entries with%my-profile
such as%my-profile.mp.messaging.[incoming|outgoing].$channel.key=value
-
Use the
@IfBuildProfile("my-profile")
on the CDI beans containing@Incoming(channel)
and@Outgoing(channel)
annotations that need only to be enabled when the profile is enabled.
Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.
Note that this approach can also be used to change the channel configuration based on a profile.