The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

响应式消息AMQP 1.0连接器参考文档

本指南是 AMQP 1.0入门的配套指南。它更详细地解释了响应式消息AMQP连接器的配置和使用。

本文档并没有涵盖连接器的所有细节。请参考 SmallRye 响应式消息网站 ,来了解更多细节。

The AMQP connector allows Quarkus applications to send and receive messages using the AMQP 1.0 protocol. More details about the protocol can be found in the AMQP 1.0 specification. It’s important to note that AMQP 1.0 and AMQP 0.9.1 (implemented by RabbitMQ) are incompatible. Check Using RabbitMQ to get more details.

AMQP连接器扩展

To use the connector, you need to add the quarkus-messaging-amqp extension.

你可以用以下方法将扩展添加到你的项目中:

CLI
quarkus extension add quarkus-messaging-amqp
Maven
./mvnw quarkus:add-extension -Dextensions='quarkus-messaging-amqp'
Gradle
./gradlew addExtension --extensions='quarkus-messaging-amqp'

或者只需在你的项目中添加以下依赖项:

pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-amqp</artifactId>
</dependency>
build.gradle
implementation("io.quarkus:quarkus-messaging-amqp")

一旦添加到你的项目中,你就可以通过配置 connector 属性将 channels 映射到AMQP地址:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-amqp

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-amqp
连接器自动连接

如果在你的classpath上有一个连接器,你可以省略 connector 属性配置。Quarkus会自动将 orphan 通道与classpath上找到的(唯一的)连接器联系起来。 Orphans 通道是没有下游消费者的传出通道或没有上游生产者的传入通道。

可以用以下方法禁用这种自动连接功能:

quarkus.messaging.auto-connector-attachment=false

配置AMQP代理访问

AMQP连接器连接到AMQP 1.0代理,如Apache ActiveMQ或Artemis。要配置代理的位置和凭证,请在 application.properties 中添加以下属性:

amqp-host=amqp (1)
amqp-port=5672 (2)
amqp-username=my-username (3)
amqp-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-amqp (5)
1 配置代理/路由器的主机名。你可以按通道进行配置(使用 host 属性),或使用 amqp-host 进行全局配置
2 配置代理/路由器的端口。你可以按通道(使用 port 属性)进行配置,或使用 amqp-port 进行全局配置。默认是 5672
3 如果需要,请配置代理/路由器的用户名。你可以按通道(使用 username 属性)进行配置,或使用 amqp-username 进行全局配置。
4 如果需要,请配置代理/路由器的密码。你可以按通道(使用 password 属性)进行配置,或使用 amqp-password 进行全局配置。
5 指示将由AMQP连接器管理的价格通道。

在开发模式下,当运行测试时,AMQP的开发服务 会自动启动一个AMQP代理。

接收AMQP消息

让我们想象一下你的应用程序接收 Message<Double> 。你可以直接消费该有效载荷:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class AmqpPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

或者,你可以检索Message<Double>。

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class AmqpPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the AMQP message as `accepted`.
        return price.ack();
    }

}

入站元数据

来自AMQP的消息在元数据中包含一个 IncomingAmqpMetadata 的实例。

Optional<IncomingAmqpMetadata> metadata = incoming.getMetadata(IncomingAmqpMetadata.class);
metadata.ifPresent(meta -> {
    String address = meta.getAddress();
    String subject = meta.getSubject();
    boolean durable = meta.isDurable();
    // Use io.vertx.core.json.JsonObject
    JsonObject properties = meta.getProperties();
    // ...
});

反序列化

连接器将传入的AMQP消息转换为响应式消息 Message<T> 实例。 T 取决于收到的AMQP消息的 body

The AMQP Type System defines the supported types.

AMQP主体类型 <T>

AMQP Value containing a AMQP Primitive Type

对应的Java类型

使用 Binary 类型的AMQP值

byte[]

AMQP 序列

List

AMQP数据(有二进制内容),并且 content-type 被设为 application/json

JsonObject

带有不同 content-type 的AMQP数据

byte[]

如果你用这个AMQP连接器(出站连接器)发送对象,它会被编码为JSON,并以二进制形式发送。 content-type 被设置为 application/json 。因此,你可以按以下方式重建对象。

import io.vertx.core.json.JsonObject;
//
@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-amqp") (1)
    public void consume(JsonObject p) {   (2)
        Price price = p.mapTo(Price.class);  (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
1 Price 实例被连接器自动编码为JSON。
2 你可以使用 JsonObject 接收它
3 然后,你可以使用 mapTo 方法重新构建该实例
mapTo 方法使用Quarkus Jackson 映射器。请查看 本指南 以了解更多关于映射器的配置。

消息确认

When a Reactive Messaging Message associated with an AMQP Message is acknowledged, it informs the broker that the message has been accepted.

失败管理

如果一个由AMQP消息产生的消息是 nacked ,就会应用一个失败策略。AMQP连接器支持六种策略:

  • fail - 使应用失败;不再处理AMQP消息(默认)。AMQP消息被标记为拒绝。

  • accept - this strategy marks the AMQP message as accepted. The processing continues ignoring the failure. Refer to the accepted delivery state documentation.

  • release - this strategy marks the AMQP message as released. The processing continues with the next message. The broker can redeliver the message. Refer to the released delivery state documentation.

  • reject - this strategy marks the AMQP message as rejected. The processing continues with the next message. Refer to the rejected delivery state documentation.

  • modified-failed - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). The processing continues with the next message, but the broker may attempt to redeliver the message. Refer to the modified delivery state documentation

  • modified-failed-undeliverable-here - this strategy marks the AMQP message as modified and indicates that it failed (with the delivery-failed attribute). It also indicates that the application cannot process the message, meaning that the broker will not attempt to redeliver the message to this node. The processing continues with the next message. Refer to the modified delivery state documentation

发送AMQP消息

序列化

当发送一个 Message<T> 时,连接器将消息转换为AMQP消息。有效载荷被转换为AMQP消息 body

T AMQP消息主体

原始类型或 String

带有有效载荷的AMQP值

InstantUUID

使用相应的AMQP类型的AMQP值

JsonObjectJsonArray

使用二进制内容的AMQP数据。 content-type 被设置为 application/json

io.vertx.mutiny.core.buffer.Buffer

使用二进制内容的AMQP数据。没有设置 content-type

任何其他类

有效载荷被转换为JSON(使用Json Mapper)。结果被包装成使用 二进制 内容的AMQP数据。 content-type 被设置为 application/json

如果消息的有效载荷不能被序列化为JSON,那么该消息是 nacked

出站元数据

当发送 Messages 时,你可以添加一个 OutgoingAmqpMetadata 的实例来影响消息是如何被发送到AMQP的。例如,你可以配置主题、属性。

 OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withDurable(true)
    .withSubject("my-subject")
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

动态地址名称

有时,需要动态选择消息的目的地。在这种情况下,你不应该在你的应用程序配置文件中配置地址,而应该使用出站元数据来设置地址。

例如,你可以根据传入的消息发送至一个动态地址。

String addressName = selectAddressFromIncommingMessage(incoming);
OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
    .withAddress(addressName)
    .withDurable(true)
    .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);
为了能够按消息设置地址,连接器使用了一个 anonymous sender

消息确认

默认情况下,当代理确认消息时,响应式消息 Message 也会被确认。当使用路由器时,这种确认可能不会被启用。在这种情况下,配置 auto-acknowledgement 属性,以便在消息被发送到路由器后立即确认。

如果一个AMQP消息被代理拒绝/释放/修改(或不能成功发送),那么该消息是不被认可的。

背压和信用

背压力是由AMQP credits 来处理的。出站连接器只请求允许的信用额度。当信用额度达到0时,它就会等待(以非阻塞方式),直到代理授予AMQP发送者更多的信用额度。

配置AMQP地址

你可以使用 address 属性配置AMQP地址:

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=my-queue

mp.messaging.outgoing.orders.connector=smallrye-amqp
mp.messaging.outgoing.orders.address=my-order-queue

如果没有设置 address 属性,连接器将使用通道名称。

要使用一个现有的队列,你需要配置 address , container-id 及可选的 link-name 属性。例如,如果你有一个Apache Artemis代理,使用以下方式配置:

<queues>
    <queue name="people">
        <address>people</address>
        <durable>true</durable>
        <user>artemis</user>
    </queue>
</queues>

你需要以下配置:

mp.messaging.outgoing.people.connector=smallrye-amqp
mp.messaging.outgoing.people.durable=true
mp.messaging.outgoing.people.address=people
mp.messaging.outgoing.people.container-id=people

如果队列名称不是通道名称,你可能需要配置 link-name 属性:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=people
mp.messaging.outgoing.people-out.container-id=people
mp.messaging.outgoing.people-out.link-name=people

要使用 MULTICAST 队列,你需要提供 FQQN (完全限定队列名称),而不仅仅是队列的名称:

mp.messaging.outgoing.people-out.connector=smallrye-amqp
mp.messaging.outgoing.people-out.durable=true
mp.messaging.outgoing.people-out.address=foo
mp.messaging.outgoing.people-out.container-id=foo

mp.messaging.incoming.people-out.connector=smallrye-amqp
mp.messaging.incoming.people-out.durable=true
mp.messaging.incoming.people-out.address=foo::bar # Note the syntax: address-name::queue-name
mp.messaging.incoming.people-out.container-id=bar
mp.messaging.incoming.people-out.link-name=people

关于AMQP地址模型的更多详情,请参阅 Artemis文档

执行模型和阻塞处理

Reactive Messaging会在一个I/O线程中调用您的方法。关于这个话题的更多细节,请看 Quarkus Reactive Architecture documentation 。但是您可能需要经常将Reactive Messaging 与阻塞式处理相结合使用,比如与数据库通信。为此,您需要使用 @Blocking 注解来表该明处理是 阻塞的 ,并且不在调用者线程中运行。

例如,下面的代码说明了如何使用带有Panache的Hibernate来将传入的有效载荷存储到数据库:

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

有2个 @Blocking 注释:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

它们效果相同。因此,您可以随意使用。第一个提供了更精细的配置,比如worker pool以及是否保留顺序。第二种,同其他的Quarkus Reactive功能类似,使用默认的worker pool并且保留了顺序。

@RunOnVirtualThread

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

@Transactional

如果你的方法被注释为 @Transactional ,它将被自动视为 blocking ,即使该方法没有被注释为 @Blocking

定制底层AMQP客户端

连接器在下面使用Vert.x AMQP客户端。关于这个客户端的更多详情,请参阅 Vert.x网站

你可以通过产生一个 AmqpClientOptions 的实例来定制底层的客户端配置,如下所示:

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // You can use the produced options to configure the TLS connection
  PemKeyCertOptions keycert = new PemKeyCertOptions()
    .addCertPath("./tls/tls.crt")
    .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tlc/ca.crt");
  return new AmqpClientOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .addEnabledSaslMechanism("EXTERNAL")
        .setHostnameVerificationAlgorithm("") // Disables the hostname verification. Defaults is "HTTPS"
        .setConnectTimeout(30000)
        .setReconnectInterval(5000)
        .setContainerId("my-container");
}

这个实例被检索,并用于配置连接器使用的客户端。你需要使用 client-options-name 属性指定客户端的名称。

mp.messaging.incoming.prices.client-options-name=my-named-options

If you experience frequent disconnections from the broker, the AmqpClientOptions can also be used to set a heartbeat if you need to keep the AMQP connection permanently. Some brokers might terminate the AMQP connection after a certain idle timeout. You can provide a heartbeat value which will be used by the Vert.x proton client to advertise the idle timeout when opening transport to a remote peer.

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // set a heartbeat of 30s (in milliseconds)
  return new AmqpClientOptions()
        .setHeartbeat(30000);
}

TLS Configuration

AMQP 1.0 Messaging extension integrates with the Quarkus TLS registry to configure the Vert.x AMQP client.

To configure the TLS for an AMQP 1.0 channel, you need to provide a named TLS configuration in the application.properties:

quarkus.tls.your-tls-config.trust-store.pem.certs=ca.crt,ca2.pem
# ...
mp.messaging.incoming.prices.tls-configuration-name=your-tls-config

健康报告

如果你将AMQP连接器与 quarkus-smallrye-health 扩展一起使用,它将有助于就绪性和活跃度探测。AMQP连接器报告连接器所管理的每个通道的就绪性和活跃度。目前,AMQP连接器对就绪性和活跃度检查使用同样的逻辑。

要禁用健康报告,请将通道的 health-enabled 属性设为false。在入站端(接收来自AMQP的消息),检查会验证接收者是否连接到代理。在出站端(向AMQP发送记录),检查会验证发送者是否连接到代理。

请注意,消息处理失败会不认可消息,其然后由 failure-strategy 进行处理。报告失败并影响检查的结果是 failure-strategy 的责任。 fail 失败策略报告失败,因此检查将报告故障。

使用RabbitMQ

此连接器用于 AMQP 1.0. RabbitMQ 实现 AMQP 0.9.1。RabbitMQ 默认不提供 AMQP 1.0,但有一个插件。要将此连接器与RabbitMQ一起使用,请启用并配置 AMQP 1.0 插件。

尽管存在插件,但少数 AMQP 1.0 特性并不能与 RabbitMQ 一起工作。因此,我们建议采用以下配置。

要接收来自 RabbitMQ 的消息:

  • 将durable设为false

mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.durable=false

要向 RabbitMQ 发送消息:

  • 设置目的地地址(不支持匿名发送者)。

  • use-anonymous-sender 设为false

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false

因此,在使用 RabbitMQ 时,不可能动态改变目的地(使用消息元数据)。

接收云事件

AMQP连接器支持 云事件 。当连接器检测到一个 structuredbinary 云事件时,它会向 Message 的元数据中添加一个 IncomingCloudEventMetadata<T>IncomingCloudEventMetadata 包含对强制和可选云事件属性的访问器。

如果连接器不能提取云事件元数据,它将发送没有元数据的消息。

关于接收云事件的更多信息,请参阅SmallRye响应式消息文档中的 接收云事件

发送云事件

AMQP连接器支持 云事件 。连接器会将出站记录作为云事件发送,如果:

  • 消息元数据包含一个 io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata 实例。

  • 通道配置定义了 cloud-events-typecloud-events-source 属性。

关于发送云事件的更多信息,请参阅SmallRye响应式消息文档中的 发送云事件

AMQP连接器配置参考

Quarkus的特定配置

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

类型

默认

Dev Services

类型

默认

If Dev Services for AMQP has been explicitly enabled or disabled. Dev Services are generally enabled by default, unless there is an existing configuration present. For AMQP, Dev Services starts a broker unless amqp-host or amqp-port are set or if all the Reactive Messaging AMQP channel are configured with host or port.

Environment variable: QUARKUS_AMQP_DEVSERVICES_ENABLED

Show more

boolean

Optional fixed port the dev service will listen to.

If not defined, the port will be chosen randomly.

Environment variable: QUARKUS_AMQP_DEVSERVICES_PORT

Show more

int

The image to use. Note that only ActiveMQ Artemis images are supported. Specifically, the image repository must end with artemiscloud/activemq-artemis-broker.

Check the activemq-artemis-broker on Quay page to find the available versions.

Environment variable: QUARKUS_AMQP_DEVSERVICES_IMAGE_NAME

Show more

string

quay.io/artemiscloud/activemq-artemis-broker:1.0.25

The value of the AMQ_EXTRA_ARGS environment variable to pass to the container. For ActiveMQ Artemis Broker ⇐ 1.0.21, set this property to --no-autotune --mapped --no-fsync --relax-jolokia --http-host 0.0.0.0

Environment variable: QUARKUS_AMQP_DEVSERVICES_EXTRA_ARGS

Show more

string

--no-autotune --mapped --no-fsync --relax-jolokia

Indicates if the AMQP broker managed by Quarkus Dev Services is shared. When shared, Quarkus looks for running containers using label-based service discovery. If a matching container is found, it is used, and so a second one is not started. Otherwise, Dev Services for AMQP starts a new container.

The discovery uses the quarkus-dev-service-amqp label. The value is configured using the service-name property.

Container sharing is only used in dev mode.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SHARED

Show more

boolean

true

The value of the quarkus-dev-service-aqmp label attached to the started container. This property is used when shared is set to true. In this case, before starting a container, Dev Services for AMQP looks for a container with the quarkus-dev-service-amqp label set to the configured value. If found, it will use this container instead of starting a new one. Otherwise, it starts a new container with the quarkus-dev-service-amqp label set to the specified value.

This property is used when you need multiple shared AMQP brokers.

Environment variable: QUARKUS_AMQP_DEVSERVICES_SERVICE_NAME

Show more

string

amqp

Environment variables that are passed to the container.

Environment variable: QUARKUS_AMQP_DEVSERVICES_CONTAINER_ENV__ENVIRONMENT_VARIABLE_NAME_

Show more

Map<String,String>

传入通道配置

Attribute (alias) Description Mandatory Default

address

AMQP地址。如果没有设置,则使用通道名称

类型: string

false

auto-acknowledgement

收到的AMQP消息是否必须在收到时被确认 类型: boolean

false

false

broadcast

收到的AMQP消息是否必须派发给多个 subscribers

类型: boolean

false

false

capabilities

A comma-separated list of capabilities proposed by the sender or receiver client.

Type: string

false

client-options-name

(amqp-client-options-name)

用于定制AMQP客户端配置的AMQP客户端选项bean的名称

类型: string

false

cloud-events

启用(默认)或禁用云事件支持。如果在 incoming 通道上启用了,则连接器会分析传入记录并尝试创建云事件元数据。如果在 outgoing 通道上启用了,如果消息包括云事件元数据,则连接器会将传出消息作为云事件发送。

类型: boolean

false

true

connect-timeout

(amqp-connect-timeout)

连接超时以毫秒为单位

类型: int

false

1000

container-id

AMQP容器的id

类型: string

false

durable

AMQP订阅是否是durable

类型: boolean

false

false

failure-strategy

当从AMQP消息中产生的消息为不被认可时,指定要应用的失败策略。可接受的值是 fail (默认), acceptreleaserejectmodified-failedmodified-failed-undeliverable-here

类型: string

false

fail

health-timeout

等待的最大秒数,以确定用于就绪检查的与代理的连接是否仍在建立。在该阈值之后,检查被认为是失败的。

类型: int

false

3

host

(amqp-host)

代理的主机名

类型: string

false

localhost

link-name

链接的名称。如果没有设置,则使用通道名称。

类型: string

false

password

(amqp-password)

用于对代理进行身份验证的密码

类型: string

false

port

(amqp-port)

代理端口

类型: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

尝试重新连接的次数

类型: int

false

100

reconnect-interval

(amqp-reconnect-interval)

两次尝试重新连接之间的间隔(秒)

类型: int

false

10

sni-server-name

(amqp-sni-server-name)

如果设置了,明确地覆盖用于TLS SNI服务器名称的主机名

类型: string

false

selector

Sets a message selector. This attribute is used to define an apache.org:selector-filter:string filter on the source terminus, using SQL-based syntax to request the server filters which messages are delivered to the receiver (if supported by the server in question). Precise functionality supported and syntax needed can vary depending on the server.

Type: string

false

tracing-enabled

是否启用(默认)或禁用了追踪

类型: boolean

false

true

use-ssl

(amqp-use-ssl)

AMQP连接是否使用了SSL/TLS

类型: boolean

false

false

username

(amqp-username)

用于对代理进行身份验证的用户名

类型: string

false

virtual-host

(amqp-virtual-host)

如果设置了,配置用于连接AMQP开放框架和TLS SNI服务器名称的主机名值(如果正在使用TLS)

类型: string

false

传出通道配置

Attribute (alias) Description Mandatory Default

address

AMQP地址。如果没有设置,则使用通道名称

类型: string

false

capabilities

A comma-separated list of capabilities proposed by the sender or receiver client.

Type: string

false

client-options-name

(amqp-client-options-name)

用于定制AMQP客户端配置的AMQP客户端选项bean的名称

类型: string

false

cloud-events

启用(默认)或禁用云事件支持。如果在 incoming 通道上启用了,则连接器会分析传入记录并尝试创建云事件元数据。如果在 outgoing 通道上启用了,如果消息包括云事件元数据,则连接器会将传出消息作为云事件发送。

类型: boolean

false

true

cloud-events-data-content-type

(cloud-events-default-data-content-type)

配置传出的云事件的默认 datacontenttype 属性。要求将 cloud-events 设为 true 。如果消息本身没有配置 datacontenttype 属性,则使用此值

类型: string

false

cloud-events-data-schema

(cloud-events-default-data-schema)

配置传出的云事件的默认 dataschema 属性。要求将 cloud-events 设为 true 。如果消息本身没有配置 dataschema 属性,则使用此值

类型: string

false

cloud-events-insert-timestamp

(cloud-events-default-timestamp)

连接器是否应将 time 属性自动插入到传出的云事件中。要求将 cloud-events 设为 true 。如果消息本身没有配置 time 属性,则使用此值

类型: boolean

false

true

cloud-events-mode

云事件模式( structuredbinary (默认))。表示如何在传出的记录中写入云事件

类型: string

false

binary

cloud-events-source

(cloud-events-default-source)

配置传出的云事件的默认 source 属性。要求将 cloud-events 设为 true 。如果消息本身没有配置 source 属性,则使用此值

类型: _string

false

cloud-events-subject

(cloud-events-default-subject)

配置传出的云事件的默认 subject 属性。要求将 cloud-events 设为 true 。如果消息本身没有配置 subject 属性,则使用此值

类型: string

false

cloud-events-type

(cloud-events-default-type)

配置传出的云事件的默认 type 属性。要求将 cloud-events 设为 true 。如果消息本身没有配置 type 属性,则使用此值

类型: string

false

connect-timeout

(amqp-connect-timeout)

连接超时以毫秒为单位

类型: int

false

1000

container-id

AMQP容器的id

类型: string

false

credit-retrieval-period

两次尝试检索代理授予的信用额度之间的时间段(以毫秒为单位)。当发送者用完信用额度的时候使用这个时间。

类型: int

false

2000

durable

发送的AMQP消息是否被标记为durable

类型: boolean

false

false

health-timeout

等待的最大秒数,以确定用于就绪检查的与代理的连接是否仍在建立。在该阈值之后,检查被认为是失败的。

类型: int

false

3

host

(amqp-host)

代理的主机名

类型: string

false

localhost

link-name

链接的名称。如果没有设置,则使用通道名称。

类型: string

false

merge

连接器是否应允许多个上游

类型: boolean

false

false

password

(amqp-password)

用于对代理进行身份验证的密码

类型: string

false

port

(amqp-port)

代理端口

类型: int

false

5672

reconnect-attempts

(amqp-reconnect-attempts)

尝试重新连接的次数

类型: int

false

100

reconnect-interval

(amqp-reconnect-interval)

两次尝试重新连接之间的间隔(秒)

类型: int

false

10

sni-server-name

(amqp-sni-server-name)

如果设置了,明确地覆盖用于TLS SNI服务器名称的主机名

类型: string

false

tracing-enabled

是否启用(默认)或禁用了追踪

类型: boolean

false

true

ttl

发送的AMQP消息的生存时间。0表示禁用TTL

类型: long

false

0

use-anonymous-sender

连接器是否应使用匿名发送者。如果代理支持,则默认值为 true ,否则为 false 。如果不支持,则无法动态更改目的地地址。

类型: boolean

false

use-ssl

(amqp-use-ssl)

AMQP连接是否使用了SSL/TLS

类型: boolean

false

false

username

(amqp-username)

用于对代理进行身份验证的用户名

类型: string

false

virtual-host

(amqp-virtual-host)

如果设置了,配置用于连接AMQP开放框架和TLS SNI服务器名称的主机名值(如果正在使用TLS)

类型: string

false

Conditionally configure channels

You can configure the channels using a specific profile. Thus, the channels are only configured (and added to the application) when the specified profile is enabled.

To achieve this, you need:

  1. Prefix the mp.messaging.[incoming|outgoing].$channel entries with %my-profile such as %my-profile.mp.messaging.[incoming|outgoing].$channel.key=value

  2. Use the @IfBuildProfile("my-profile") on the CDI beans containing @Incoming(channel) and @Outgoing(channel) annotations that need only to be enabled when the profile is enabled.

Note that reactive messaging verifies that the graph is complete. So, when using such a conditional configuration, ensure the application works with and without the profile enabled.

Note that this approach can also be used to change the channel configuration based on a profile.

Related content