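Vert.x Reference Guide

Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.

This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and configuration of the Vert.x instance used by Quarkus.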

Access the Vert.x instance

To access the managed Vert.x instance, add the quarkus-vertx extension to your project. This dependency might already be available in your project (as a transitive dependency).

With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:

@ApplicationScoped
public class MyBean {

    // Field injection
    @Inject Vertx vertx;

    // Constructor injection
    MyBean(Vertx vertx) {
        // ...
    }

}

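You can inject either:

  • the io.vertx.core.Vertx instance, which exposes the bare Vert.x API

  • the io.vertx.mutiny.core.Vertx instance, which exposes the Mutiny API

We recommend using the Mutiny variant, as it integrates with the other reactive APIs provided by Quarkus.

Mutiny

If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

Documentation about the Vert.x Mutiny variants is available at https://smallrye.io/smallrye-mutiny-vertx-bindings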

Configure the Vert.x instance

You can configure the Vert.x instance from the application.properties file. The following table lists the supported properties:

Configuration property fixed at build time - All other configuration properties are overridable at runtime

| Configuration property | Environment variable | Type | Default |
|---|---|---|---|
| Enables or disables the Vert.x cache. | QUARKUS_VERTX_CACHING | boolean | true |
| Enables or disables the Vert.x classpath resource resolver. | QUARKUS_VERTX_CLASSPATH_RESOLVING | boolean | true |
| The number of event loops. By default, it matches the number of CPUs detected on the system. | QUARKUS_VERTX_EVENT_LOOPS_POOL_SIZE | int | |
| The maximum amount of time the event loop can be blocked. | QUARKUS_VERTX_MAX_EVENT_LOOP_EXECUTE_TIME | Duration | 2S |
| The amount of time before a warning is displayed if the event loop is blocked. | QUARKUS_VERTX_WARNING_EXCEPTION_TIME | Duration | 2S |
| The maximum amount of time the worker thread can be blocked. | QUARKUS_VERTX_MAX_WORKER_EXECUTE_TIME | Duration | 60S |
| The size of the internal thread pool (used for the file system). | QUARKUS_VERTX_INTERNAL_BLOCKING_POOL_SIZE | int | 20 |
| The queue size. For most applications this should be unbounded. | QUARKUS_VERTX_QUEUE_SIZE | int | |
| The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of 0.0f implies that threads beyond the core size should be created as aggressively as threads within it; a value of 1.0f implies that threads beyond the core size should never be created. | QUARKUS_VERTX_GROWTH_RESISTANCE | float | 0 |
| The amount of time a thread will stay alive with no work. | QUARKUS_VERTX_KEEP_ALIVE_TIME | Duration | 30S |
| Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory#createExecutor is called, initialise with the number of defined threads at startup. | QUARKUS_VERTX_PREFILL | boolean | false |
| Enables the async DNS resolver. | QUARKUS_VERTX_USE_ASYNC_DNS | boolean | false |
| PEM Key/cert config is disabled by default. | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM | boolean | false |
| Comma-separated list of the path to the key files (Pem format). | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_KEYS | list of string | |
| Comma-separated list of the path to the certificate files (Pem format). | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_CERTS | list of string | |
| JKS config is disabled by default. | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS | boolean | false |
| Path of the key file (JKS format). | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PATH | string | |
| Password of the key file. | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PASSWORD | string | |
| PFX config is disabled by default. | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX | boolean | false |
| Path to the key file (PFX format). | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PATH | string | |
| Password of the key. | QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PASSWORD | string | |
| PEM Trust config is disabled by default. | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM | boolean | false |
| Comma-separated list of the trust certificate files (Pem format). | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM_CERTS | list of string | |
| JKS config is disabled by default. | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS | boolean | false |
| Path of the key file (JKS format). | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PATH | string | |
| Password of the key file. | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PASSWORD | string | |
| PFX config is disabled by default. | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX | boolean | false |
| Path to the key file (PFX format). | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PATH | string | |
| Password of the key. | QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PASSWORD | string | |
| The accept backlog. | QUARKUS_VERTX_EVENTBUS_ACCEPT_BACKLOG | int | |
| The client authentication. | QUARKUS_VERTX_EVENTBUS_CLIENT_AUTH | string | NONE |
| The connect timeout. | QUARKUS_VERTX_EVENTBUS_CONNECT_TIMEOUT | Duration | 60S |
| The idle timeout in milliseconds. | QUARKUS_VERTX_EVENTBUS_IDLE_TIMEOUT | Duration | |
| The receive buffer size. | QUARKUS_VERTX_EVENTBUS_RECEIVE_BUFFER_SIZE | int | |
| The number of reconnection attempts. | QUARKUS_VERTX_EVENTBUS_RECONNECT_ATTEMPTS | int | 0 |
| The reconnection interval in milliseconds. | QUARKUS_VERTX_EVENTBUS_RECONNECT_INTERVAL | Duration | 1S |
| Whether to reuse the address. | QUARKUS_VERTX_EVENTBUS_REUSE_ADDRESS | boolean | true |
| Whether to reuse the port. | QUARKUS_VERTX_EVENTBUS_REUSE_PORT | boolean | false |
| The send buffer size. | QUARKUS_VERTX_EVENTBUS_SEND_BUFFER_SIZE | int | |
| The so linger. | QUARKUS_VERTX_EVENTBUS_SO_LINGER | int | |
| Enables or disables SSL. | QUARKUS_VERTX_EVENTBUS_SSL | boolean | false |
| Whether to keep the TCP connection opened (keep-alive). | QUARKUS_VERTX_EVENTBUS_TCP_KEEP_ALIVE | boolean | false |
| Configure the TCP no delay. | QUARKUS_VERTX_EVENTBUS_TCP_NO_DELAY | boolean | true |
| Configure the traffic class. | QUARKUS_VERTX_EVENTBUS_TRAFFIC_CLASS | int | |
| Enables or disables the trust all parameter. | QUARKUS_VERTX_EVENTBUS_TRUST_ALL | boolean | false |
| The host name. | QUARKUS_VERTX_CLUSTER_HOST | string | localhost |
| The port. | QUARKUS_VERTX_CLUSTER_PORT | int | |
| The public host name. | QUARKUS_VERTX_CLUSTER_PUBLIC_HOST | string | |
| The public port. | QUARKUS_VERTX_CLUSTER_PUBLIC_PORT | int | |
| Enables or disables the clustering. | QUARKUS_VERTX_CLUSTER_CLUSTERED | boolean | false |
| The ping interval. | QUARKUS_VERTX_CLUSTER_PING_INTERVAL | Duration | 20S |
| The ping reply interval. | QUARKUS_VERTX_CLUSTER_PING_REPLY_INTERVAL | Duration | 20S |
| The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. | QUARKUS_VERTX_RESOLVER_CACHE_MAX_TIME_TO_LIVE | int | 2147483647 |
| The minimum amount of time in seconds that a successfully resolved address will be cached. | QUARKUS_VERTX_RESOLVER_CACHE_MIN_TIME_TO_LIVE | int | 0 |
| The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. | QUARKUS_VERTX_RESOLVER_CACHE_NEGATIVE_TIME_TO_LIVE | int | 0 |
| The maximum number of queries to be sent during a resolution. | QUARKUS_VERTX_RESOLVER_MAX_QUERIES | int | 4 |
| The duration after which a DNS query is considered to be failed. | QUARKUS_VERTX_RESOLVER_QUERY_TIMEOUT | Duration | 5S |
| Enable or disable native transport. | QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT | boolean | false |
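
The environment variable names listed above appear to follow the usual MicroProfile Config mapping from property names: every character that is neither alphanumeric nor an underscore becomes an underscore, and the result is upper-cased. For example, QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT corresponds to quarkus.vertx.prefer-native-transport. The sketch below illustrates that rule only; the class and method names are hypothetical, and the rule is an assumption drawn from the naming pattern, not a copy of the Quarkus implementation.

```java
import java.util.Locale;

// Hypothetical helper, for illustration only (not part of Quarkus).
class EnvVarNames {

    // Replace every character that is not a letter, digit, or underscore with '_',
    // then upper-case the result.
    static String toEnvVar(String propertyName) {
        return propertyName.replaceAll("[^A-Za-z0-9_]", "_").toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Property name taken from the native transport section of this guide.
        System.out.println(toEnvVar("quarkus.vertx.prefer-native-transport"));
    }
}
```

This can help when you know an environment variable from the table and need to guess the matching application.properties key, or the other way around.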

About the Duration format

To write duration values, use the standard java.time.Duration format. See the Duration#parse() Java API documentation for more information.

You can also use a simplified format, starting with a number:

  • If the value is only a number, it represents time in seconds.

  • If the value is a number followed by ms, it represents time in milliseconds.

In other cases, the simplified format is translated to the java.time.Duration format for parsing:

  • If the value is a number followed by h, m, or s, it is prefixed with PT.

  • If the value is a number followed by d, it is prefixed with P.
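
The rules above can be sketched in plain Java as follows. This is a minimal illustration, not the converter Quarkus actually uses: the SimplifiedDuration class and its parse method are hypothetical names, and the sketch assumes that unit suffixes are matched case-insensitively (so that a value such as 2S is accepted).

```java
import java.time.Duration;

// Hypothetical helper mirroring the rules listed above; not the Quarkus converter.
class SimplifiedDuration {

    static Duration parse(String value) {
        String v = value.trim();
        if (v.matches("\\d+")) {
            // Only a number: time in seconds.
            return Duration.ofSeconds(Long.parseLong(v));
        }
        if (v.matches("(?i)\\d+ms")) {
            // Number followed by ms: time in milliseconds.
            return Duration.ofMillis(Long.parseLong(v.substring(0, v.length() - 2)));
        }
        if (v.matches("(?i)\\d+[hms]")) {
            // Number followed by h, m, or s: prefix with PT.
            return Duration.parse("PT" + v);
        }
        if (v.matches("(?i)\\d+d")) {
            // Number followed by d: prefix with P.
            return Duration.parse("P" + v);
        }
        // Anything else is expected to be in the standard java.time.Duration format.
        return Duration.parse(v);
    }

    public static void main(String[] args) {
        for (String sample : new String[] {"30", "250ms", "2h", "1d", "PT1M30S"}) {
            System.out.println(sample + " -> " + parse(sample));
        }
    }
}
```

The order of the checks matters: the ms suffix is tested before the single-letter units, so 250ms is not mistaken for a value ending in s.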

See Customize the Vert.x configuration to configure the Vert.x instance using a programmatic approach.

Use Vert.x clients

In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extensions already wrap Vert.x libraries.

Available APIs

The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Check the associated documentation to learn how to use them.

| API | Extension or dependency | Documentation |
|---|---|---|
| AMQP Client | io.quarkus:quarkus-messaging-amqp (extension) | Getting Started to Quarkus Messaging with AMQP |
| Circuit Breaker | io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (external dependency) | https://vertx.io/docs/vertx-circuit-breaker/java/ |
| Consul Client | io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (external dependency) | https://vertx.io/docs/vertx-consul-client/java/ |
| DB2 Client | io.quarkus:quarkus-reactive-db2-client (extension) | Reactive SQL Clients |
| Kafka Client | io.quarkus:quarkus-messaging-kafka (extension) | Apache Kafka Reference Guide |
| Mail Client | io.quarkus:quarkus-mailer (extension) | Sending emails using SMTP |
| MQTT Client | io.quarkus:quarkus-messaging-mqtt (extension) | No guide yet |
| MS SQL Client | io.quarkus:quarkus-reactive-mssql-client (extension) | Reactive SQL Clients |
| MySQL Client | io.quarkus:quarkus-reactive-mysql-client (extension) | Reactive SQL Clients |
| Oracle Client | io.quarkus:quarkus-reactive-oracle-client (extension) | Reactive SQL Clients |
| PostgreSQL Client | io.quarkus:quarkus-reactive-pg-client (extension) | Reactive SQL Clients |
| RabbitMQ Client | io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (external dependency) | https://vertx.io/docs/vertx-rabbitmq-client/java |
| Redis Client | io.quarkus:quarkus-redis-client (extension) | Using the Redis Client |
| Web Client | io.smallrye.reactive:smallrye-mutiny-vertx-web-client (external dependency) | https://vertx.io/docs/vertx-web-client/java/ |

To learn more about the usage of the Vert.x Mutiny API, refer to https://smallrye.io/smallrye-mutiny-vertx-bindings

Use the Vert.x Web Client

This section gives an example using the Vert.x WebClient in the context of a Quarkus REST (formerly RESTEasy Reactive) application. As indicated in the table above, add the following dependency to your project:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

Now, in your code, you can create an instance of WebClient:

package org.acme.vertx;


import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    ResourceUsingWebClient(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

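This resource creates a WebClient and, upon request, uses this client to invoke a remote HTTP API. Depending on the result, the response is forwarded as it is received, or a JSON object wrapping the error is created. The WebClient is asynchronous (and non-blocking), so the endpoint returns a Uni.

The application can also run as a native executable. But first, we need to instruct Quarkus to enable SSL (if the remote API uses HTTPS). Open src/main/resources/application.properties and add: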

quarkus.ssl.native=true

Then, create the native executable with:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

Use Vert.x JSON

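Vert.x APIs often rely on JSON. Vert.x provides two convenient classes to manipulate JSON documents: io.vertx.core.json.JsonObject and io.vertx.core.json.JsonArray.

JsonObject can be used to map an object into its JSON representation and to build an object from a JSON document: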

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

Note that these features use the mapper managed by the quarkus-jackson extension. Refer to Jackson configuration to customize the mapping.

JSON Object and JSON Array are both supported as Quarkus HTTP endpoint requests and response bodies (using classic RESTEasy and Quarkus REST). Consider these endpoints:

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}

A GET request to /hello/Quarkus/object returns:

{"Hello":"Quarkus"}

A GET request to /hello/Quarkus/array returns:

["Hello","Quarkus"]

This works equally well when the JSON content is a request body or is wrapped in a Uni, Multi, CompletionStage, or Publisher.

Use Verticles

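Verticles is "a simple, scalable, actor-like deployment and concurrency model" provided by Vert.x. This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, which communicate with each other by sending messages on the event bus.

You can deploy verticles in Quarkus. It supports:

  • bare verticle - Java classes extending io.vertx.core.AbstractVerticle

  • Mutiny verticle - Java classes extending io.smallrye.mutiny.vertx.core.AbstractVerticle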

Deploy Verticles

To deploy verticles, use the deployVerticle method:

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

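If you use the Mutiny variant of Vert.x, be aware that the deployVerticle method returns a Uni, and you need to trigger a subscription to perform the actual deployment.

An example explaining how to deploy verticles during the initialization of the application follows.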

Use @ApplicationScoped beans as Verticle

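In general, Vert.x verticles are not CDI beans, so they cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.

The following snippet provides an example: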

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

You don't have to inject the vertx instance; instead, use the protected field of AbstractVerticle.

Then, deploy the verticle instance with:

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

If you want to deploy every exposed AbstractVerticle, you can use:

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

Create multiple verticles instances

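When using @ApplicationScoped, you get a single instance of your verticle. Having multiple instances of a verticle can help share the load among them. Each instance is associated with a different I/O thread (Vert.x event loop).

To deploy multiple instances of your verticle, use the @Dependent scope instead of @ApplicationScoped: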

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

Then, deploy your verticle as follows:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

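The init method receives an Instance<MyVerticle>. Then, you pass a supplier to the deployVerticle method. The supplier simply calls the get() method. Thanks to the @Dependent scope, it returns a new instance on every call. Finally, you pass the desired number of instances to the DeploymentOptions, such as two in the previous example. The supplier is called twice, which creates two instances of your verticle.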
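
The role of the supplier can be illustrated without any framework. In the sketch below, deploy is a hypothetical stand-in for deployVerticle with a supplier and an instance count, and MyVerticle is an empty placeholder class; the only point is that a supplier creating a new object per call (as a @Dependent bean lookup does) yields distinct instances, whereas a supplier returning a shared object (as an @ApplicationScoped lookup would) yields the same one every time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Framework-free illustration; none of these names belong to Vert.x or Quarkus.
class SupplierDeployment {

    // Placeholder for a verticle class.
    static class MyVerticle {
    }

    // Hypothetical stand-in for a deployment call: invokes the supplier once per instance.
    static <T> List<T> deploy(Supplier<T> supplier, int instances) {
        List<T> deployed = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            deployed.add(supplier.get());
        }
        return deployed;
    }

    public static void main(String[] args) {
        // Behaves like a @Dependent lookup: a new object on every call.
        List<MyVerticle> perCall = deploy(MyVerticle::new, 2);
        System.out.println("distinct instances: " + (perCall.get(0) != perCall.get(1)));

        // Behaves like an @ApplicationScoped lookup: the same object on every call.
        MyVerticle shared = new MyVerticle();
        List<MyVerticle> sameEachTime = deploy(() -> shared, 2);
        System.out.println("same instance: " + (sameEachTime.get(0) == sameEachTime.get(1)));
    }
}
```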

Use the Event Bus

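Vert.x comes with a built-in event bus that you can use from your Quarkus application. Your application components (CDI beans, resources, and so on) can therefore interact using asynchronous events, which promotes loose coupling.

With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:

  • point-to-point - send the message, and one consumer receives it. If several consumers listen to the address, a round-robin is applied;

  • publish/subscribe - publish a message; all the consumers listening to the address receive it;

  • request/reply - send the message and expect a response. The receiver can respond to the message asynchronously.

All these delivery mechanisms are non-blocking and provide one of the fundamental building blocks for reactive applications.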
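
To make the three delivery mechanisms concrete, here is a deliberately simplified, framework-free model in plain Java. It is not the Vert.x implementation: ToyEventBus and its methods are hypothetical names, messages are plain strings, and send and publish invoke handlers synchronously on the caller's thread. The sketch only shows who receives a message under each mechanism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Toy model of the delivery semantics; not the Vert.x event bus.
class ToyEventBus {

    private final Map<String, List<Function<String, String>>> consumers = new HashMap<>();
    private final Map<String, Integer> deliveries = new HashMap<>();

    // Register a consumer; its return value is used as the reply for request/reply.
    void consumer(String address, Function<String, String> handler) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // point-to-point: exactly one consumer receives the message, chosen round-robin.
    void send(String address, String body) {
        pick(address).apply(body);
    }

    // publish/subscribe: every consumer registered on the address receives the message.
    void publish(String address, String body) {
        consumers.getOrDefault(address, List.of()).forEach(handler -> handler.apply(body));
    }

    // request/reply: one consumer receives the message and its reply completes the future.
    CompletableFuture<String> request(String address, String body) {
        Function<String, String> handler = pick(address);
        return CompletableFuture.supplyAsync(() -> handler.apply(body));
    }

    private Function<String, String> pick(String address) {
        List<Function<String, String>> handlers = consumers.get(address);
        if (handlers == null || handlers.isEmpty()) {
            throw new IllegalStateException("No consumer registered for " + address);
        }
        // Count deliveries per address and rotate through the registered consumers.
        int index = deliveries.merge(address, 1, Integer::sum) - 1;
        return handlers.get(index % handlers.size());
    }

    public static void main(String[] args) {
        ToyEventBus bus = new ToyEventBus();
        bus.consumer("greeting", name -> {
            System.out.println("consumer 1 received " + name);
            return "Hello " + name;
        });
        bus.consumer("greeting", name -> {
            System.out.println("consumer 2 received " + name);
            return "Hi " + name;
        });

        bus.send("greeting", "a");     // delivered to a single consumer
        bus.send("greeting", "b");     // delivered to the next consumer in the rotation
        bus.publish("greeting", "c");  // delivered to both consumers
        System.out.println("reply: " + bus.request("greeting", "d").join());
    }
}
```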

Consume events

While you can use the Vert.x API to register consumers, Quarkus comes with declarative support. To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
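1 If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it is org.acme.vertx.GreetingService.
2 The method parameter is the message body. If the method returns something, it is the message response.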

Configure the address

The @ConsumeEvent annotation can be configured to set the address:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 Receive the messages sent to the greeting address

The address value can be a property expression. In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}"). Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}").

Config Property Example
@ConsumeEvent("${my.consumer.address}")   (1)
public String consume(String name) {
    return name.toLowerCase();
}
1 Receive the messages sent to the address configured with the my.consumer.address key.
If no config property with the specified key exists and no default value is set then the application startup fails.
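
The resolution behavior described above (literal address, configured value, default value, or startup failure) can be modeled with a few lines of plain Java. This is a simplified sketch under stated assumptions: AddressExpression and resolve are hypothetical names, the configuration is a plain Map, and only a single ${key} or ${key:default} expression spanning the whole value is handled. It does not reproduce the actual Quarkus configuration machinery.

```java
import java.util.Map;
import java.util.NoSuchElementException;

// Simplified model of address resolution; not the Quarkus implementation.
class AddressExpression {

    static String resolve(String value, Map<String, String> config) {
        if (!value.startsWith("${") || !value.endsWith("}")) {
            // Not a property expression: the value is used as the address directly.
            return value;
        }
        String expression = value.substring(2, value.length() - 1);
        int colon = expression.indexOf(':');
        String key = colon < 0 ? expression : expression.substring(0, colon);

        String configured = config.get(key);
        if (configured != null) {
            // A configured value takes precedence over the default.
            return configured;
        }
        if (colon >= 0) {
            // Fall back to the default value given after the colon.
            return expression.substring(colon + 1);
        }
        // No configured value and no default: fail, as the application startup would.
        throw new NoSuchElementException("No value for " + key);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("my.consumer.address", "orders");
        System.out.println(resolve("greeting", config));
        System.out.println(resolve("${my.consumer.address}", config));
        System.out.println(resolve("${other.address:defaultAddress}", config));
    }
}
```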

Process events asynchronously

The previous examples use synchronous processing. Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public Uni<String> process(String name) {
        // Return a Uni that completes when the processing is finished.
        // You can also fail the Uni explicitly.
        return Uni.createFrom().item(() -> name.toUpperCase());
    }
}
Mutiny

The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library.

Blocking processing of events

By default, the code consuming the event must be non-blocking, as it is called on an I/O thread. If your processing is blocking, use the @io.smallrye.common.annotation.Blocking annotation:

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

Alternatively, you can use the blocking attribute of the @ConsumeEvent annotation:

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

When @Blocking is used, the value of the blocking attribute of @ConsumeEvent is ignored.

Reply to events

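The return value of a method annotated with @ConsumeEvent is used to respond to the incoming message. For instance, in the following snippet, the returned String is the response.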

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous replies:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

You can inject an executor if you use the Context Propagation extension:

@Inject Executor executor;

Implement fire-and-forget interactions

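You don't have to reply to received messages. Typically, in a fire-and-forget interaction, the messages are consumed, and the sender does not need to know about it. To implement this pattern, your consumer method returns void.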

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

Consume messages (instead of events)

Unlike the previous examples, which use the payload directly, you can also use the Message directly:

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

Handle failures

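If a method annotated with @ConsumeEvent throws an exception, then:

  • if a reply handler is set, then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,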

  • if no reply handler is set, then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().

Send messages

Sending and publishing messages use the Vert.x event bus:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
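1 Inject the event bus
2 Send a message to the greeting address. The message payload is name

The EventBus object provides methods to:

  1. send a message to a specific address - one single consumer receives the message.

  2. publish a message to a specific address - all consumers receive the message.

  3. request a message and expect a reply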

// Case 1
bus.sendAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

Process events on virtual threads

Methods annotated with @ConsumeEvent can also be annotated with @RunOnVirtualThread. In this case, the method is invoked on a virtual thread. Each event is invoked on a different virtual thread.

To use this feature, make sure:

  1. Your Java runtime supports virtual threads.

  2. Your method uses a blocking signature.

The second point means only methods returning an object or void can use @RunOnVirtualThread. Methods returning a Uni or a CompletionStage cannot run on virtual threads.

Read the virtual thread guide for more details.

Use codecs

The Vert.x Event Bus (https://vertx.io/docs/vertx-core/java/event_bus) uses codecs (https://vertx.io/docs/vertx-core/java/message_codecs) to serialize and deserialize message objects. Quarkus provides a default codec for local delivery. This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent where ConsumeEvent#local() == true (which is the default).

This lets you exchange message objects as follows:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

If you want to use a specific codec, you need to set it explicitly on both ends:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 Set the name of the codec to use to send the message
2 Set the codec to use to receive the message

Combine HTTP and the Event Bus

Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we send a message. Another bean consumes this message, and the response is sent using the reply mechanism.

In your HTTP endpoint class, inject the event bus and use the request method to send a message to the event bus and expect a response:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 Send the name to the greeting address and request a response
2 When we get the response, extract the body and send it to the user
The HTTP method returns a Uni. If you are using Quarkus REST, Uni support is built-in. If you are using classic RESTEasy, you need to add the quarkus-resteasy-mutiny extension to your project.

We need a consumer listening on the greeting address. This consumer can be in the same class or in another bean, such as:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

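This bean receives the name and returns the greeting message.

With this in place, every HTTP request on /bus/quarkus sends a message to the event bus, waits for a reply, and, when the reply arrives, writes the HTTP response: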

Hello Quarkus

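To better understand, let's detail how the HTTP request/response is handled:

  1. The request is received by the greeting method

  2. A message containing the name is sent to the event bus

  3. Another bean receives this message and computes the response

  4. This response is sent back using the reply mechanism

  5. Once the sender receives the reply, the content is written to the HTTP response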

Bidirectional communication with browsers by using SockJS

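The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides, so each side can send messages that are received on the other side. It supports the three delivery mechanisms.

SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, and so on.

So, to use SockJS, you need to configure the bridge, especially the addresses that are going to be used to communicate: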

package org.acme;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").subRouter(bridge);

        AtomicInteger counter = new AtomicInteger();
        vertx.setPeriodic(1000,
                ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
    }

}

This code configures the SockJS bridge to send all the messages targeting the ticks address to the connected browsers. More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.

The browser must use the vertx-eventbus JavaScript library to consume the messages:

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

Use native transports

Native transports are not supported in native executables.
To use io_uring, refer to the Use io_uring section.

Vert.x is capable of using Netty’s native transports, which offer performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to include both to keep your application platform-agnostic. Netty is smart enough to use the correct one, which means none at all on unsupported platforms:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

You also have to explicitly configure Vert.x to use the native transport. In application.properties add:

quarkus.vertx.prefer-native-transport=true

Or in application.yml:

quarkus:
  vertx:
    prefer-native-transport: true

If all is well, Quarkus logs:

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

Native Linux transport

On Linux you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS transport

On macOS Sierra and above you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

Use a Vert.x context-aware scheduler

Some Mutiny operators need to schedule work on an executor thread pool. A good example is .onItem().delayIt().by(Duration.ofMillis(10)), as it needs such an executor to delay emissions.

The default executor is returned by io.smallrye.mutiny.infrastructure.Infrastructure and it is already configured and managed by Quarkus.

That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.

The io.smallrye.mutiny.vertx.core.ContextAwareScheduler interface offers an API to obtain context-aware schedulers. Such a scheduler is configured with:

  1. a delegate ScheduledExecutorService of your choice (hint: you can reuse Infrastructure.getDefaultWorkerPool()), and

  2. a context fetching strategy among:

    • an explicit Context, or

    • calling Vertx::getOrCreateContext() either on the current thread or later when the scheduling request happens, or

    • calling Vertx::currentContext(), which fails if the current thread is not a Vert.x thread.

Here is a sample where ContextAwareScheduler is used:

class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        vertx.getOrCreateContext().put("foo", "bar");

        var delegate = Infrastructure.getDefaultWorkerPool();
        var scheduler = ContextAwareScheduler.delegatingTo(delegate)
            .withCurrentContext();

        return Uni.createFrom().voidItem()
                .onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
                .onItem().invoke(() -> {
                    // Prints "bar"
                    var ctx = vertx.getOrCreateContext();
                    System.out.println(ctx.get("foo"));
                });
    }
}

In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart(). The delayIt operator uses that scheduler, and we can check that the context that we get in invoke is a Vert.x duplicated context where the data for key "foo" has been propagated.
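The idea of capturing a context when the scheduler is created and restoring it when the task runs can be sketched in plain Java. This is only a conceptual analogy: a ThreadLocal stands in for the Vert.x context, all names are hypothetical, and it is not how ContextAwareScheduler is implemented:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ContextCaptureSketch {

    // Stand-in for a Vert.x context: a value bound to the current thread
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Wraps a task so that it runs with the context captured when wrap() is called. */
    public static Runnable wrap(Runnable task) {
        String captured = CONTEXT.get(); // captured on the calling thread
        return () -> {
            String previous = CONTEXT.get();
            CONTEXT.set(captured);       // restored on the executor thread
            try {
                task.run();
            } finally {
                CONTEXT.set(previous);
            }
        };
    }

    /** Sets a context value, runs a delayed task on a pool thread, returns what the task saw. */
    public static String runDelayed(String value) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CONTEXT.set(value);
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            Runnable task = wrap(() -> seen.set(CONTEXT.get()));
            pool.schedule(task, 10, TimeUnit.MILLISECONDS).get();
            return seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDelayed("bar")); // prints "bar"
    }
}
```

Without the wrap call, the task would run on a pool thread that has no value bound, which is the plain-Java counterpart of running on "any random thread".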

Use a Unix domain socket

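Listening on a Unix domain socket lets you avoid the overhead of TCP when the connection to the Quarkus service is established from the same host. This can happen when the service is accessed through a proxy, which is often the case if you set up a service mesh with a proxy such as Envoy.

This only works on platforms that support native transports (see Use native transports).

Enable the appropriate native transport and set the following environment properties: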

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

By itself, this does not disable the TCP socket, which by default opens on 0.0.0.0:8080. You can disable it explicitly:

quarkus.http.host-enabled=false

These properties can be set through Java’s -D command-line parameter or in application.properties.

Do not forget to add the native transport dependency. See Use native transports for details.
Make sure your application has the right permissions to write to the socket.
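Binding a Unix domain socket creates a file at the configured path, so the process needs write access to the parent directory. A minimal pre-flight check, assuming a hypothetical helper class that is not part of Quarkus, could look like this:

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class DomainSocketPathCheck {

    /**
     * Hypothetical helper: returns true if the parent directory of the
     * socket path exists and is writable by the current process.
     */
    public static boolean canCreateSocketAt(Path socketPath) {
        Path parent = socketPath.toAbsolutePath().getParent();
        return parent != null && Files.isDirectory(parent) && Files.isWritable(parent);
    }

    public static void main(String[] args) {
        // Path taken from the quarkus.http.domain-socket example above
        Path socket = Path.of("/var/run/io.quarkus.app.socket");
        System.out.println(socket + " can be created: " + canCreateSocketAt(socket));
    }
}
```

This only checks file-system permissions as seen by the JVM. Other restrictions, such as container security policies, are outside the scope of the sketch.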

Use io_uring

io_uring is not supported in native executables.
io_uring support is experimental

io_uring is a Linux kernel interface that allows you to send and receive data asynchronously. It provides unified semantics for both file and network I/O. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets. It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.

To learn more about io_uring, we recommend the following links:

  • Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.

  • The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and is designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.

  • What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.

To use io_uring, you need to add two dependencies to your project and enable native transport. First add the following dependencies to your project:

pom.xml
<dependency>
    <groupId>io.netty.incubator</groupId>
    <artifactId>netty-incubator-transport-native-io_uring</artifactId>
    <version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
    <classifier>linux-x86_64</classifier>
</dependency>
<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
build.gradle
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final:linux-x86_64")
implementation("io.vertx:vertx-io_uring-incubator")

Then, in the application.properties, add:

quarkus.vertx.prefer-native-transport=true

Can I use io_uring on my Linux machine?

To check if you can use io_uring on your Linux machine, execute the following command:

> grep io_uring_setup /proc/kallsyms
0000000000000000 t __pfx_io_uring_setup
0000000000000000 t io_uring_setup
0000000000000000 T __pfx___x64_sys_io_uring_setup
0000000000000000 T __x64_sys_io_uring_setup
0000000000000000 T __pfx___ia32_sys_io_uring_setup
0000000000000000 T __ia32_sys_io_uring_setup
0000000000000000 d event_exit__io_uring_setup
0000000000000000 d event_enter__io_uring_setup
0000000000000000 d __syscall_meta__io_uring_setup
0000000000000000 d args__io_uring_setup
0000000000000000 d types__io_uring_setup
0000000000000000 d __event_exit__io_uring_setup
0000000000000000 d __event_enter__io_uring_setup
0000000000000000 d __p_syscall_meta__io_uring_setup

If the command prints output similar to the above, you can use io_uring.
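The same check can be done from Java, for example in a startup diagnostic. The following is a minimal sketch with hypothetical names. It mirrors the grep command by looking for io_uring_setup in /proc/kallsyms and says nothing about whether the Netty io_uring transport actually loads:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class IoUringCheck {

    /** Returns true if any symbol line mentions io_uring_setup, like the grep command. */
    public static boolean hasIoUringSetup(Stream<String> lines) {
        return lines.anyMatch(line -> line.contains("io_uring_setup"));
    }

    public static void main(String[] args) throws IOException {
        Path kallsyms = Path.of("/proc/kallsyms");
        if (!Files.isReadable(kallsyms)) {
            System.out.println("Cannot read " + kallsyms + " (not a Linux machine?)");
            return;
        }
        // Stream the file line by line instead of loading it into memory
        try (Stream<String> lines = Files.lines(kallsyms)) {
            System.out.println("io_uring_setup found: " + hasIoUringSetup(lines));
        }
    }
}
```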

Troubleshooting

io_uring support is still experimental. If you see odd behavior, check the Netty io_uring FAQ. The "netty io_uring was slower than epoll" issue also describes a few configuration mistakes.

Domain sockets are not yet supported with io_uring.
The Vert.x asynchronous file system API does not use io_uring yet.

Deploy on read-only environments

In an environment with a read-only file system, you may receive errors of the form:

java.lang.IllegalStateException: Failed to create cache dir

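Assuming /tmp/ is writable, you can fix this by setting the vertx.cacheDirBase property to point to a directory in /tmp/. For instance, in OpenShift, create an environment variable named JAVA_OPTS with the value -Dvertx.cacheDirBase=/tmp/vertx.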
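When it is not obvious which directory is writable in the target environment, a small diagnostic can help pick one. The following is a minimal sketch with hypothetical class and method names. It only builds the -Dvertx.cacheDirBase option shown in this section and does not configure Vert.x itself:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;

public class CacheDirBaseSketch {

    /** Returns the first candidate that is an existing, writable directory. */
    public static Optional<Path> firstWritable(List<Path> candidates) {
        return candidates.stream()
                .filter(Files::isDirectory)
                .filter(Files::isWritable)
                .findFirst();
    }

    /** Builds the JVM option for a given cache base directory. */
    public static String javaOpt(Path base) {
        return "-Dvertx.cacheDirBase=" + base;
    }

    public static void main(String[] args) {
        // Candidate directories are assumptions; adjust them to your environment
        List<Path> candidates = List.of(
                Path.of("/tmp"),
                Path.of(System.getProperty("java.io.tmpdir")));
        firstWritable(candidates)
                .map(dir -> javaOpt(dir.resolve("vertx")))
                .ifPresentOrElse(
                        System.out::println,
                        () -> System.out.println("No writable candidate directory found"));
    }
}
```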

Customize the Vert.x configuration

The configuration of the managed Vert.x instance can be provided using the application.properties file, but also using special beans. CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer interface can be used to customize the Vert.x configuration. For example, the following customizer changes the file cache directory:

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {

    @Override
    public void accept(VertxOptions options) {
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}

The customizer beans receive the VertxOptions (coming from the application configuration) and can modify them.