Reactive Messaging RabbitMQ Connector Reference Documentation

This guide is the companion to Getting Started with RabbitMQ. It explains in more detail the configuration and usage of the RabbitMQ connector for reactive messaging.

This documentation does not cover all the details of the connector. Refer to the SmallRye Reactive Messaging website for further details.

The RabbitMQ connector allows Quarkus applications to send and receive messages using the AMQP 0.9.1 protocol. More details about the protocol can be found in the AMQP 0.9.1 specification.

The RabbitMQ connector supports AMQP 0-9-1, which is very different from the AMQP 1.0 protocol used by the AMQP 1.0 connector. You can use the AMQP 1.0 connector with RabbitMQ as described in the AMQP 1.0 connector reference, albeit with reduced functionality.

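This technology is considered preview.

In preview, backward compatibility and presence in the ecosystem is not guaranteed. Specific improvements might require changing configuration or APIs, and plans to become stable are under way. Feedback is welcome on our mailing list or as issues in our GitHub issue tracker.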

For a full list of possible statuses, check our FAQ entry.

RabbitMQ connector extension

To use the connector, you need to add the quarkus-messaging-rabbitmq extension.

You can add the extension to your project using:

> ./mvnw quarkus:add-extensions -Dextensions="quarkus-messaging-rabbitmq"

Or just add the following dependency to your project:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-rabbitmq</artifactId>
</dependency>

Once added to your project, you can map channels to RabbitMQ exchanges or queues by configuring the connector attribute:

# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-rabbitmq

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-rabbitmq

Outgoing channels are mapped to RabbitMQ exchanges and incoming channels are mapped to RabbitMQ queues, as required by the broker.

Configuring the RabbitMQ Broker access

The RabbitMQ connector connects to RabbitMQ brokers. To configure the location and credentials of the broker, add the following properties in the application.properties:

rabbitmq-host=amqp (1)
rabbitmq-port=5672 (2)
rabbitmq-username=my-username (3)
rabbitmq-password=my-password (4)

mp.messaging.incoming.prices.connector=smallrye-rabbitmq (5)
1 Configures the broker host name. You can do it per channel (using the host attribute) or globally using rabbitmq-host
2 Configures the broker port. You can do it per channel (using the port attribute) or globally using rabbitmq-port. The default is 5672.
3 Configures the broker username if required. You can do it per channel (using the username attribute) or globally using rabbitmq-username.
4 Configures the broker password if required. You can do it per channel (using the password attribute) or globally using rabbitmq-password.
5 Instructs the prices channel to be managed by the RabbitMQ connector
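
The per-channel versus global lookup described in the callouts can be sketched in plain Java. This is only an illustration, not the connector's code: the class `BrokerAttributeLookup` and its `resolve` method are hypothetical, and the precedence shown (per-channel attribute first, then the global alias, then the default) is an assumption about how the two forms combine.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: per-channel attribute, else global alias, else default. */
final class BrokerAttributeLookup {

    private BrokerAttributeLookup() {
    }

    /**
     * Resolves one broker attribute for an incoming channel.
     *
     * @param config    flat view of application.properties
     * @param channel   channel name, for example "prices"
     * @param attribute per-channel attribute, for example "host"
     * @param alias     global alias, for example "rabbitmq-host"
     * @param fallback  value used when neither key is present
     */
    static String resolve(Map<String, String> config, String channel,
                          String attribute, String alias, String fallback) {
        String channelKey = "mp.messaging.incoming." + channel + "." + attribute;
        // Assumption: the channel-level key wins over the global alias.
        return Optional.ofNullable(config.get(channelKey))
                .or(() -> Optional.ofNullable(config.get(alias)))
                .orElse(fallback);
    }
}
```

With only `rabbitmq-host=amqp` configured, resolving `host` for the `prices` channel yields `amqp`; adding `mp.messaging.incoming.prices.host` would override it for that channel alone under the stated assumption.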

In dev mode and when running tests, Dev Services for RabbitMQ automatically starts a RabbitMQ broker.

Receiving RabbitMQ messages

Let’s imagine your application receives Message<Double>. You can consume the payload directly:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class RabbitMQPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Message<Double>:

package inbound;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class RabbitMQPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message, marking the RabbitMQ message as `accepted`.
        return price.ack();
    }

}

Inbound Metadata

Messages coming from RabbitMQ contain an instance of IncomingRabbitMQMetadata in the metadata.

Optional<IncomingRabbitMQMetadata> metadata = incoming.getMetadata(IncomingRabbitMQMetadata.class);
metadata.ifPresent(meta -> {
    final Optional<String> contentEncoding = meta.getContentEncoding();
    final Optional<String> contentType = meta.getContentType();
    final Optional<String> correlationId = meta.getCorrelationId();
    final Optional<ZonedDateTime> creationTime = meta.getCreationTime(ZoneId.systemDefault());
    final Optional<Integer> priority = meta.getPriority();
    final Optional<String> replyTo = meta.getReplyTo();
    final Optional<String> userId = meta.getUserId();

    // Access a single String-valued header
    final Optional<String> stringHeader = meta.getHeader("my-header", String.class);

    // Access all headers
    final Map<String,Object> headers = meta.getHeaders();
    // ...
});

Deserialization

The connector converts incoming RabbitMQ Messages into Reactive Messaging Message<T> instances. The payload type T depends on the content_type and content_encoding properties of the received RabbitMQ message envelope.

| content_encoding | content_type | T |
|---|---|---|
| Value present | n/a | byte[] |
| No value | text/plain | String |
| No value | application/json | a JSON element, which can be a JsonArray, JsonObject, String, etc., depending on whether the buffer contains an array, object, string, etc. |
| No value | Anything else | byte[] |
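
The selection rule in the table above can be written down as a small decision function. The sketch below is plain Java and does not use the connector: `PayloadTypeRule`, `PayloadKind` and `kindFor` are hypothetical names, a null or empty `content_encoding` is assumed to mean "no value", and content types are compared by exact match only (no handling of parameters such as a charset).

```java
/** Hypothetical model of the payload-type table; not the connector's implementation. */
final class PayloadTypeRule {

    private PayloadTypeRule() {
    }

    /** The three payload shapes listed in the table. */
    enum PayloadKind { BYTES, STRING, JSON }

    static PayloadKind kindFor(String contentEncoding, String contentType) {
        // Any content_encoding value means the body is handed over as raw bytes.
        if (contentEncoding != null && !contentEncoding.isEmpty()) {
            return PayloadKind.BYTES;
        }
        if ("text/plain".equals(contentType)) {
            return PayloadKind.STRING;
        }
        if ("application/json".equals(contentType)) {
            return PayloadKind.JSON;
        }
        // Anything else, including a missing content_type, stays binary.
        return PayloadKind.BYTES;
    }
}
```

Note that the encoding check comes first: a message with `content_encoding` set is treated as binary even when its `content_type` is `application/json`.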

If you send objects with this RabbitMQ connector (outbound connector), they are encoded as JSON and sent with content_type set to application/json. You can receive this payload using (Vert.x) JSON Objects, and then map it to the object class you want:

@ApplicationScoped
public static class Generator {

    @Outgoing("to-rabbitmq")
    public Multi<Price> prices() { (1)
        AtomicInteger count = new AtomicInteger();
        return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
                .map(l -> new Price().setPrice(count.incrementAndGet()))
                .onOverflow().drop();
    }

}

@ApplicationScoped
public static class Consumer {

    List<Price> prices = new CopyOnWriteArrayList<>();

    @Incoming("from-rabbitmq")
    public void consume(JsonObject p) { (2)
        Price price = p.mapTo(Price.class); (3)
        prices.add(price);
    }

    public List<Price> list() {
        return prices;
    }
}
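1 The Price instances are automatically encoded to JSON by the connector.
2 You can receive them using a JsonObject.
3 Then, you can reconstruct the instance using the mapTo method.
The mapTo method uses the Quarkus Jackson mapper. Check this guide to learn more about the mapper configuration.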

Acknowledgement

When a Reactive Messaging Message associated with a RabbitMQ Message is acknowledged, it informs the broker that the message has been accepted.

Whether you need to explicitly acknowledge the message depends on the auto-acknowledgement setting for the channel; if that is set to true then your message will be automatically acknowledged on receipt.

Failure Management

If a message produced from a RabbitMQ message is nacked, a failure strategy is applied. The RabbitMQ connector supports three strategies, controlled by the failure-strategy channel setting:

  • fail - fail the application; no more RabbitMQ messages will be processed. The RabbitMQ message is marked as rejected.

  • accept - this strategy marks the RabbitMQ message as accepted. The processing continues ignoring the failure.

  • reject - this strategy marks the RabbitMQ message as rejected (default). The processing continues with the next message.
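
The three strategies differ along two axes: how the RabbitMQ message is marked, and whether processing continues. The following plain-Java sketch models just that; it is not the connector's implementation, and the names `FailureStrategySketch`, `BrokerOutcome`, `Strategy` and `fromConfig` are hypothetical.

```java
import java.util.Locale;

/** Hypothetical model of the failure-strategy setting. */
final class FailureStrategySketch {

    private FailureStrategySketch() {
    }

    /** How the RabbitMQ message ends up being marked. */
    enum BrokerOutcome { ACCEPTED, REJECTED }

    enum Strategy {
        FAIL(BrokerOutcome.REJECTED, false),   // application fails, nothing more is processed
        ACCEPT(BrokerOutcome.ACCEPTED, true),  // failure is ignored
        REJECT(BrokerOutcome.REJECTED, true);  // default: move on to the next message

        final BrokerOutcome outcome;
        final boolean continuesProcessing;

        Strategy(BrokerOutcome outcome, boolean continuesProcessing) {
            this.outcome = outcome;
            this.continuesProcessing = continuesProcessing;
        }

        /** Parses the failure-strategy value; a missing value means reject. */
        static Strategy fromConfig(String value) {
            if (value == null || value.isBlank()) {
                return REJECT;
            }
            return Strategy.valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }
}
```

In this model `fail` and `reject` mark the message the same way; the difference is only whether the application keeps consuming afterwards.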

Sending RabbitMQ messages

Serialization

When sending a Message<T>, the connector converts the message into a RabbitMQ Message. The payload is converted to the RabbitMQ Message body.

| T | RabbitMQ Message Body |
|---|---|
| primitive types or UUID/String | String value with content_type set to text/plain |
| JsonObject/JsonArray | Serialized String payload with content_type set to application/json |
| io.vertx.mutiny.core.buffer.Buffer | Binary content, with content_type set to application/octet-stream |
| byte[] | Binary content, with content_type set to application/octet-stream |
| Any other class | The payload is converted to JSON (using a Json Mapper) then serialized with content_type set to application/json |

If the message payload cannot be serialized to JSON, the message is nacked.
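
As a rough illustration of the outbound mapping, the sketch below picks a content_type from the Java type of a payload. It uses only the JDK, so the Vert.x types (JsonObject, JsonArray, Buffer) are deliberately left out: in this simplified model JSON types would fall into the final branch, and Buffer is not modelled at all. `OutgoingContentType` and `contentTypeFor` are hypothetical names, not part of the connector.

```java
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, JDK-only model of the outbound content_type mapping. */
final class OutgoingContentType {

    // Boxed primitives plus String and UUID are sent as text.
    private static final Set<Class<?>> TEXT_TYPES = Set.of(
            String.class, UUID.class, Boolean.class, Character.class,
            Byte.class, Short.class, Integer.class, Long.class,
            Float.class, Double.class);

    private OutgoingContentType() {
    }

    static String contentTypeFor(Object payload) {
        Objects.requireNonNull(payload, "payload");
        if (TEXT_TYPES.contains(payload.getClass())) {
            return "text/plain";
        }
        if (payload instanceof byte[]) {
            return "application/octet-stream";
        }
        // Any other class is converted to JSON before being sent.
        return "application/json";
    }
}
```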

Outbound Metadata

When sending Messages, you can add an instance of OutgoingRabbitMQMetadata to influence how the message is handled by RabbitMQ. For example, you can configure the routing key, timestamp and headers:

final OutgoingRabbitMQMetadata metadata = new OutgoingRabbitMQMetadata.Builder()
        .withHeader("my-header", "xyzzy")
        .withRoutingKey("urgent")
        .withTimestamp(ZonedDateTime.now())
        .build();

// Add `metadata` to the metadata of the outgoing message.
return Message.of("Hello", Metadata.of(metadata));

Acknowledgement

By default, the Reactive Messaging Message is acknowledged when the broker acknowledges the message.

Configuring the RabbitMQ Exchange/Queue

You can configure the RabbitMQ exchange or queue associated with a channel using properties on the channel configuration. Incoming channels are mapped to RabbitMQ queues and outgoing channels are mapped to RabbitMQ exchanges. For example:

mp.messaging.incoming.prices.connector=smallrye-rabbitmq
mp.messaging.incoming.prices.queue.name=my-queue

mp.messaging.outgoing.orders.connector=smallrye-rabbitmq
mp.messaging.outgoing.orders.exchange.name=my-order-queue

If the exchange.name or queue.name attribute is not set, the connector uses the channel name.
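
The fallback to the channel name can be pictured with a short plain-Java sketch. It is not how the connector reads its configuration; `ChannelNames`, `queueFor` and `exchangeFor` are hypothetical helpers operating on a flat map of properties.

```java
import java.util.Map;

/** Hypothetical helper showing the "use the channel name when unset" rule. */
final class ChannelNames {

    private ChannelNames() {
    }

    /** Queue used by an incoming channel: queue.name if set, else the channel name. */
    static String queueFor(Map<String, String> config, String channel) {
        return config.getOrDefault("mp.messaging.incoming." + channel + ".queue.name", channel);
    }

    /** Exchange used by an outgoing channel: exchange.name if set, else the channel name. */
    static String exchangeFor(Map<String, String> config, String channel) {
        return config.getOrDefault("mp.messaging.outgoing." + channel + ".exchange.name", channel);
    }
}
```

With the configuration shown earlier, the `prices` channel reads from `my-queue`, while a channel with no `queue.name` set reads from a queue named after the channel.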

To use an existing exchange or queue, configure its name and set the exchange’s or queue’s declare property to false. For example, if you have a RabbitMQ broker configured with a people exchange and queue, you need the following configuration:

mp.messaging.incoming.people.connector=smallrye-rabbitmq
mp.messaging.incoming.people.queue.name=people
mp.messaging.incoming.people.queue.declare=false

mp.messaging.outgoing.people.connector=smallrye-rabbitmq
mp.messaging.outgoing.people.exchange.name=people
mp.messaging.outgoing.people.exchange.declare=false

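Execution model and Blocking processing

Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing, such as database interactions. For this, use the @Blocking annotation to indicate that the processing is blocking and must not run on the caller thread.

For example, the following code illustrates how you can store incoming payloads in a database using Hibernate with Panache: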

import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }

}

There are 2 @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

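They have the same effect, so you can use either one. The first provides more fine-grained tuning, such as the worker pool to use and whether the order is preserved. The second, which is also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.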

@RunOnVirtualThread

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

Customizing the underlying RabbitMQ client

The connector uses the Vert.x RabbitMQ client underneath. More details about this client can be found on the Vert.x website.

You can customize the underlying client configuration by producing an instance of RabbitMQOptions as follows:

@Produces
@Identifier("my-named-options")
public RabbitMQOptions getNamedOptions() {
  PemKeyCertOptions keycert = new PemKeyCertOptions()
        .addCertPath("./tls/tls.crt")
        .addKeyPath("./tls/tls.key");
  PemTrustOptions trust = new PemTrustOptions().addCertPath("./tls/ca.crt");
  // You can use the produced options to configure the TLS connection
  return new RabbitMQOptions()
        .setSsl(true)
        .setPemKeyCertOptions(keycert)
        .setPemTrustOptions(trust)
        .setUser("user1")
        .setPassword("password1")
        .setHost("localhost")
        .setPort(5672)
        .setVirtualHost("vhost1")
        .setConnectionTimeout(6000) // in milliseconds
        .setRequestedHeartbeat(60) // in seconds
        .setHandshakeTimeout(6000) // in milliseconds
        .setRequestedChannelMax(5)
        .setNetworkRecoveryInterval(500) // in milliseconds
        .setAutomaticRecoveryEnabled(true);
}

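This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the client options using the client-options-name attribute: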

mp.messaging.incoming.prices.client-options-name=my-named-options

Health reporting

If you use the RabbitMQ connector with the quarkus-smallrye-health extension, it contributes to the readiness and liveness probes. The RabbitMQ connector reports the readiness and liveness of each channel managed by the connector.

To disable health reporting, set the health-enabled attribute for the channel to false.

On the inbound side (receiving messages from RabbitMQ), the check verifies that the receiver is connected to the broker.

On the outbound side (sending records to RabbitMQ), the check verifies that the sender is not disconnected from the broker; the sender may still be in an initialised state (connection not yet attempted), but this is regarded as live/ready.
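
The asymmetry between the two checks can be made explicit with a small model. The sketch below is plain Java and purely illustrative: the `ConnectionState` enum and the two predicates are hypothetical and only restate the two rules above, they are not the connector's health check code.

```java
/** Hypothetical model of the per-channel health rules. */
final class ChannelHealthSketch {

    private ChannelHealthSketch() {
    }

    /** Simplified connection states assumed for this sketch. */
    enum ConnectionState { INITIALISED, CONNECTED, DISCONNECTED }

    /** Inbound: the receiver must actually be connected to the broker. */
    static boolean inboundHealthy(ConnectionState state) {
        return state == ConnectionState.CONNECTED;
    }

    /** Outbound: anything except an explicit disconnect counts as live/ready. */
    static boolean outboundHealthy(ConnectionState state) {
        return state != ConnectionState.DISCONNECTED;
    }
}
```

In this model a sender that has not yet attempted a connection is still reported as healthy, whereas a receiver in the same state is not.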

Note that a message processing failure nacks the message, which is then handled by the failure-strategy. It’s the responsibility of the failure-strategy to report the failure and influence the outcome of the checks. The fail failure strategy reports the failure, and so the check will report the fault.

Dynamic Credentials

Quarkus and the RabbitMQ connector support Vault’s RabbitMQ secrets engine for generating short-lived dynamic credentials. This allows Vault to create and retire RabbitMQ credentials on a regular basis.

First we need to enable Vault’s rabbitmq secret engine, configure it with RabbitMQ’s connection and authentication information, and create a Vault role my-role (replace 10.0.0.3 with the actual host that is running the RabbitMQ container):

vault secrets enable rabbitmq

vault write rabbitmq/config/connection \
    connection_uri=http://10.0.0.3:15672 \
    username=guest \
    password=guest

vault write rabbitmq/roles/my-role \
    vhosts='{"/":{"write": ".*", "read": ".*"}}'

For this use case, user guest configured above needs to be a RabbitMQ admin user with the capability to create credentials.

Then we need to give a read capability to the Quarkus application on path rabbitmq/creds/my-role.

cat <<EOF | vault policy write vault-rabbitmq-policy -
path "secret/data/myapps/vault-rabbitmq-test/*" {
  capabilities = ["read"]
}
path "rabbitmq/creds/my-role" {
  capabilities = [ "read" ]
}
EOF

Now that Vault knows how to create users in RabbitMQ, we need to configure Quarkus to use a credentials-provider for RabbitMQ.

First we tell Quarkus to request dynamic credentials using a credentials-provider named rabbitmq.

quarkus.rabbitmq.credentials-provider = rabbitmq

Next we configure the rabbitmq credentials provider. The credentials-role option must be set to the name of the role we created in Vault, in our case my-role. The credentials-mount option must be set to rabbitmq.

quarkus.vault.credentials-provider.rabbitmq.credentials-role=my-role
quarkus.vault.credentials-provider.rabbitmq.credentials-mount=rabbitmq
The credentials-mount is used directly as the mount of the secret engine in Vault. Here we are using the default mount path of rabbitmq. If the RabbitMQ secret engine was mounted at a custom path, the credentials-mount option must be set to that path instead.
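
To see how the two options relate to the Vault policy shown earlier, the sketch below composes the credentials path from the mount and the role. It is a plain-Java illustration only: `VaultCredentialsPath` and `credsPath` are hypothetical, and the `<mount>/creds/<role>` layout is inferred from the `rabbitmq/creds/my-role` path used in the policy.

```java
/** Hypothetical helper: builds the Vault path the application needs to read. */
final class VaultCredentialsPath {

    private VaultCredentialsPath() {
    }

    /**
     * @param mount value of credentials-mount, for example "rabbitmq"
     * @param role  value of credentials-role, for example "my-role"
     */
    static String credsPath(String mount, String role) {
        // Tolerate leading or trailing slashes on a custom mount path.
        String cleanMount = mount.replaceAll("^/+|/+$", "");
        return cleanMount + "/creds/" + role;
    }
}
```

If the secret engine were mounted at a custom path, the policy would need to grant read access on the path built from that mount instead.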

RabbitMQ Connector Configuration Reference

Incoming channel configuration

| Attribute (alias) | Description | Mandatory | Default |
|---|---|---|---|
| username (rabbitmq-username) | The username used to authenticate to the broker. Type: string | false | |
| password (rabbitmq-password) | The password used to authenticate to the broker. Type: string | false | |
| host (rabbitmq-host) | The broker hostname. Type: string | false | localhost |
| port (rabbitmq-port) | The broker port. Type: int | false | 5672 |
| ssl (rabbitmq-ssl) | Whether the connection should use SSL. Type: boolean | false | false |
| trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification. Type: boolean | false | false |
| trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store. Type: string | false | |
| trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store. Type: string | false | |
| credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client. Type: string | false | |
| connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout. Type: int | false | 60000 |
| handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms). Type: int | false | 10000 |
| automatic-recovery-enabled | Whether automatic connection recovery is enabled. Type: boolean | false | false |
| automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled. Type: boolean | false | true |
| reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts. Type: int | false | 100 |
| reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts. Type: int | false | 10 |
| network-recovery-interval | How long (ms) automatic recovery waits before attempting to reconnect. Type: int | false | 5000 |
| user | The AMQP username to use when connecting to the broker. Type: string | false | guest |
| include-properties | Whether to include properties when a broker message is passed on the event bus. Type: boolean | false | false |
| requested-channel-max | The initially requested maximum channel number. Type: int | false | 2047 |
| requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none. Type: int | false | 60 |
| use-nio | Whether usage of NIO Sockets is enabled. Type: boolean | false | false |
| virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker. Type: string | false | / |
| exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. Type: string | false | |
| exchange.durable | Whether the exchange is durable. Type: boolean | false | true |
| exchange.auto-delete | Whether the exchange should be deleted after use. Type: boolean | false | false |
| exchange.type | The exchange type: direct, fanout, headers or topic (default). Type: string | false | topic |
| exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently. Type: boolean | false | true |
| tracing.enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | true |
| tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true. Type: string | false | `` |
| queue.name | The queue from which messages are consumed. Type: string | true | |
| queue.durable | Whether the queue is durable. Type: boolean | false | true |
| queue.exclusive | Whether the queue is for exclusive use. Type: boolean | false | false |
| queue.auto-delete | Whether the queue should be deleted after use. Type: boolean | false | false |
| queue.declare | Whether to declare the queue and binding; set to false if these are expected to be set up independently. Type: boolean | false | true |
| queue.ttl | If specified, the time (ms) for which a message can remain in the queue undelivered before it is dead. Type: long | false | |
| queue.single-active-consumer | If set to true, only one consumer can actively consume messages. Type: boolean | false | false |
| queue.x-queue-type | If the queue is declared automatically, the type of queue to declare: quorum, classic or stream. Type: string | false | classic |
| queue.x-queue-mode | If the queue is declared automatically, the mode of the queue: lazy or default. Type: string | false | default |
| max-incoming-internal-queue-size | The maximum size of the incoming internal queue. Type: int | false | |
| connection-count | The number of RabbitMQ connections to create for consuming from this queue. This might be necessary to consume from a sharded queue with a single client. Type: int | false | 1 |
| auto-bind-dlq | Whether to automatically declare the DLQ and bind it to the binder DLX. Type: boolean | false | false |
| dead-letter-queue-name | The name of the DLQ; if not supplied will default to the queue name with '.dlq' appended. Type: string | false | |
| dead-letter-exchange | A DLX to assign to the queue. Relevant only if auto-bind-dlq is true. Type: string | false | DLX |
| dead-letter-exchange-type | The type of the DLX to assign to the queue. Relevant only if auto-bind-dlq is true. Type: string | false | direct |
| dead-letter-routing-key | A dead letter routing key to assign to the queue; if not supplied will default to the queue name. Type: string | false | |
| dlx.declare | Whether to declare the dead letter exchange binding. Relevant only if auto-bind-dlq is true; set to false if these are expected to be set up independently. Type: boolean | false | false |
| dead-letter-queue-type | If the DLQ is declared automatically, the type of DLQ to declare: quorum, classic or stream. Type: string | false | classic |
| dead-letter-queue-mode | If the DLQ is declared automatically, the mode of the DLQ: lazy or default. Type: string | false | default |
| failure-strategy | The failure strategy to apply when a RabbitMQ message is nacked. Accepted values are fail, accept, reject (default). Type: string | false | reject |
| broadcast | Whether the received RabbitMQ messages must be dispatched to multiple subscribers. Type: boolean | false | false |
| auto-acknowledgement | Whether the received RabbitMQ messages must be acknowledged when received; if true then delivery constitutes acknowledgement. Type: boolean | false | false |
| keep-most-recent | Whether to discard old messages instead of recent ones. Type: boolean | false | false |
| routing-keys | A comma-separated list of routing keys to bind the queue to the exchange. Type: string | false | # |
| content-type-override | Override the content_type attribute of the incoming message, should be a valid MIME type. Type: string | false | |
| max-outstanding-messages | The maximum number of outstanding/unacknowledged messages being processed by the connector at a time; must be a positive number. Type: int | false | |

Outgoing channel configuration

| Attribute (alias) | Description | Mandatory | Default |
|---|---|---|---|
| automatic-recovery-enabled | Whether automatic connection recovery is enabled. Type: boolean | false | false |
| automatic-recovery-on-initial-connection | Whether automatic recovery on initial connections is enabled. Type: boolean | false | true |
| connection-timeout | The TCP connection timeout (ms); 0 is interpreted as no timeout. Type: int | false | 60000 |
| default-routing-key | The default routing key to use when sending messages to the exchange. Type: string | false | `` |
| default-ttl | If specified, the time (ms) sent messages can remain in queues undelivered before they are dead. Type: long | false | |
| exchange.auto-delete | Whether the exchange should be deleted after use. Type: boolean | false | false |
| exchange.declare | Whether to declare the exchange; set to false if the exchange is expected to be set up independently. Type: boolean | false | true |
| exchange.durable | Whether the exchange is durable. Type: boolean | false | true |
| exchange.name | The exchange that messages are published to or consumed from. If not set, the channel name is used. If set to "", the default exchange is used. Type: string | false | |
| exchange.type | The exchange type: direct, fanout, headers or topic (default). Type: string | false | topic |
| handshake-timeout | The AMQP 0-9-1 protocol handshake timeout (ms). Type: int | false | 10000 |
| host (rabbitmq-host) | The broker hostname. Type: string | false | localhost |
| include-properties | Whether to include properties when a broker message is passed on the event bus. Type: boolean | false | false |
| max-inflight-messages | The maximum number of messages to be written to RabbitMQ concurrently; must be a positive number. Type: long | false | 1024 |
| max-outgoing-internal-queue-size | The maximum size of the outgoing internal queue. Type: int | false | |
| network-recovery-interval | How long (ms) automatic recovery waits before attempting to reconnect. Type: int | false | 5000 |
| password (rabbitmq-password) | The password used to authenticate to the broker. Type: string | false | |
| port (rabbitmq-port) | The broker port. Type: int | false | 5672 |
| reconnect-attempts (rabbitmq-reconnect-attempts) | The number of reconnection attempts. Type: int | false | 100 |
| reconnect-interval (rabbitmq-reconnect-interval) | The interval (in seconds) between two reconnection attempts. Type: int | false | 10 |
| requested-channel-max | The initially requested maximum channel number. Type: int | false | 2047 |
| requested-heartbeat | The initially requested heartbeat interval (seconds), zero for none. Type: int | false | 60 |
| ssl (rabbitmq-ssl) | Whether the connection should use SSL. Type: boolean | false | false |
| tracing.attribute-headers | A comma-separated list of headers that should be recorded as span attributes. Relevant only if tracing.enabled=true. Type: string | false | `` |
| tracing.enabled | Whether tracing is enabled (default) or disabled. Type: boolean | false | true |
| trust-all (rabbitmq-trust-all) | Whether to skip trust certificate verification. Type: boolean | false | false |
| trust-store-password (rabbitmq-trust-store-password) | The password of the JKS trust store. Type: string | false | |
| trust-store-path (rabbitmq-trust-store-path) | The path to a JKS trust store. Type: string | false | |
| credentials-provider-name (rabbitmq-credentials-provider-name) | The name of the RabbitMQ Credentials Provider bean used to provide dynamic credentials to the RabbitMQ client. Type: string | false | |
| use-nio | Whether usage of NIO Sockets is enabled. Type: boolean | false | false |
| user | The AMQP username to use when connecting to the broker. Type: string | false | guest |
| username (rabbitmq-username) | The username used to authenticate to the broker. Type: string | false | |
| virtual-host (rabbitmq-virtual-host) | The virtual host to use when connecting to the broker. Type: string | false | / |
