The English version of quarkus.io is the official project site. Translated sites are community supported on a best-effort basis.

Vert.x参考指南

Vert.x 是一个用于构建响应式应用的工具包。正如 Quarkus响应式架构 中所描述的,Quarkus在下面使用Vert.x。

本指南是《 从Quarkus应用程序中使用Eclipse Vert.x API 》指南的补充。它提供了关于Quarkus使用的Vert.x实例的使用和配置的更多高级细节。

访问Vert.x实例

要访问管理的Vert.x实例,请在你的项目中添加 quarkus-vertx 扩展。注意,这个依赖可能已经安装了(作为一个过渡性依赖)。

通过这个扩展,你可以使用字段或构造函数注入检索Vert.x的托管实例:

@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;

// Constructor injection
MyBean(Vertx vertx) {
    // ...
}

}

你也可以这样注入:

  • io.vertx.core.Vertx 暴露了 bare Vert.x API的实例

  • io.vertx.mutiny.core.Vertx 暴露 Mutiny API的实例

我们推荐使用Mutiny变体,因为它可以与Quarkus提供的其他响应式API集成。

Mutiny

如果你不熟悉Mutiny,请查看 Mutiny - 一个直观的响应式编程库

关于Vert.x Mutiny变体的文档可在 https://smallrye.io/smallrye-mutiny-vertx-bindings

配置Vert.x实例

你可以从 application.properties 文件中配置Vert.x实例。下表列出了支持的属性:

Configuration property fixed at build time - All other configuration properties are overridable at runtime

Configuration property

类型

默认

Enables or disables the Vert.x cache.

Environment variable: QUARKUS_VERTX_CACHING

boolean

true

Enables or disabled the Vert.x classpath resource resolver.

Environment variable: QUARKUS_VERTX_CLASSPATH_RESOLVING

boolean

true

The number of event loops. By default, it matches the number of CPUs detected on the system.

Environment variable: QUARKUS_VERTX_EVENT_LOOPS_POOL_SIZE

int

The maximum amount of time the event loop can be blocked.

Environment variable: QUARKUS_VERTX_MAX_EVENT_LOOP_EXECUTE_TIME

Duration

2S

The amount of time before a warning is displayed if the event loop is blocked.

Environment variable: QUARKUS_VERTX_WARNING_EXCEPTION_TIME

Duration

2S

The size of the worker thread pool.

Environment variable: QUARKUS_VERTX_WORKER_POOL_SIZE

int

20

The maximum amount of time the worker thread can be blocked.

Environment variable: QUARKUS_VERTX_MAX_WORKER_EXECUTE_TIME

Duration

60S

The size of the internal thread pool (used for the file system).

Environment variable: QUARKUS_VERTX_INTERNAL_BLOCKING_POOL_SIZE

int

20

The queue size. For most applications this should be unbounded

Environment variable: QUARKUS_VERTX_QUEUE_SIZE

int

The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of 0.0f implies that threads beyond the core size should be created as aggressively as threads within it; a value of 1.0f implies that threads beyond the core size should never be created.

Environment variable: QUARKUS_VERTX_GROWTH_RESISTANCE

float

0f

The amount of time a thread will stay alive with no work.

Environment variable: QUARKUS_VERTX_KEEP_ALIVE_TIME

Duration

30S

Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory.createExecutor is called, initialise with the number of defined threads at startup

Environment variable: QUARKUS_VERTX_PREFILL

boolean

false

Enables the async DNS resolver.

Environment variable: QUARKUS_VERTX_USE_ASYNC_DNS

boolean

false

PEM Key/cert config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM

boolean

false

Comma-separated list of the path to the key files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_KEYS

list of string

Comma-separated list of the path to the certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PEM_CERTS

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PATH

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_JKS_PASSWORD

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PATH

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_KEY_CERTIFICATE_PFX_PASSWORD

string

PEM Trust config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM

boolean

false

Comma-separated list of the trust certificate files (Pem format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PEM_CERTS

list of string

JKS config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS

boolean

false

Path of the key file (JKS format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PATH

string

Password of the key file.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_JKS_PASSWORD

string

PFX config is disabled by default.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX

boolean

false

Path to the key file (PFX format).

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PATH

string

Password of the key.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_CERTIFICATE_PFX_PASSWORD

string

The accept backlog.

Environment variable: QUARKUS_VERTX_EVENTBUS_ACCEPT_BACKLOG

int

The client authentication.

Environment variable: QUARKUS_VERTX_EVENTBUS_CLIENT_AUTH

string

NONE

The connect timeout.

Environment variable: QUARKUS_VERTX_EVENTBUS_CONNECT_TIMEOUT

Duration

60S

The idle timeout in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_IDLE_TIMEOUT

Duration

The receive buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECEIVE_BUFFER_SIZE

int

The number of reconnection attempts.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_ATTEMPTS

int

0

The reconnection interval in milliseconds.

Environment variable: QUARKUS_VERTX_EVENTBUS_RECONNECT_INTERVAL

Duration

1S

Whether to reuse the address.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_ADDRESS

boolean

true

Whether to reuse the port.

Environment variable: QUARKUS_VERTX_EVENTBUS_REUSE_PORT

boolean

false

The send buffer size.

Environment variable: QUARKUS_VERTX_EVENTBUS_SEND_BUFFER_SIZE

int

The so linger.

Environment variable: QUARKUS_VERTX_EVENTBUS_SOLINGER

int

Enables or Disabled SSL.

Environment variable: QUARKUS_VERTX_EVENTBUS_SSL

boolean

false

Whether to keep the TCP connection opened (keep-alive).

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_KEEP_ALIVE

boolean

false

Configure the TCP no delay.

Environment variable: QUARKUS_VERTX_EVENTBUS_TCP_NO_DELAY

boolean

true

Configure the traffic class.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRAFFIC_CLASS

int

Enables or disables the trust all parameter.

Environment variable: QUARKUS_VERTX_EVENTBUS_TRUST_ALL

boolean

false

The host name.

Environment variable: QUARKUS_VERTX_CLUSTER_HOST

string

localhost

The port.

Environment variable: QUARKUS_VERTX_CLUSTER_PORT

int

The public host name.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_HOST

string

The public port.

Environment variable: QUARKUS_VERTX_CLUSTER_PUBLIC_PORT

int

Enables or disables the clustering.

Environment variable: QUARKUS_VERTX_CLUSTER_CLUSTERED

boolean

false

The ping interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_INTERVAL

Duration

20S

The ping reply interval.

Environment variable: QUARKUS_VERTX_CLUSTER_PING_REPLY_INTERVAL

Duration

20S

The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MAX_TIME_TO_LIVE

int

2147483647

The minimum amount of time in seconds that a successfully resolved address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_MIN_TIME_TO_LIVE

int

0

The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached.

Environment variable: QUARKUS_VERTX_RESOLVER_CACHE_NEGATIVE_TIME_TO_LIVE

int

0

The maximum number of queries to be sent during a resolution.

Environment variable: QUARKUS_VERTX_RESOLVER_MAX_QUERIES

int

4

The duration after which a DNS query is considered to be failed.

Environment variable: QUARKUS_VERTX_RESOLVER_QUERY_TIMEOUT

Duration

5S

Enable or disable native transport

Environment variable: QUARKUS_VERTX_PREFER_NATIVE_TRANSPORT

boolean

false

About the Duration format

持续时间的格式使用标准的 java.time.Duration 格式您可以在 Duration#parse() javadoc 中了解更多信息。

您还可以提供以数字开头的持续时间值。 在这种情况下,如果该值仅包含一个数字,则转换器将该值视为秒。 否则,PT 会隐式添加到值的前面,以获得标准的 java.time.Duration 格式。

使用Vert.x客户端

除了Vert.x核心,你可以使用大多数Vert.x生态系统库。一些Quarkus扩展已经包装了Vert.x库。

可用的API

下表列出了Vert.x生态系统中最常用的库。要访问这些API,请在你的项目中添加指定的扩展或依赖性。请参考相关文档,了解如何使用它们。

API

扩展或依赖性

文件

AMQP客户端

io.quarkus:quarkus-smallrye-reactive-messaging-amqp (扩展)

https://cn.quarkus.io/guides/amqp

熔断器

io.smallrye.reactive:smallrye-mutiny-vertx-circuit-breaker (外部依赖性)

https://vertx.io/docs/vertx-circuit-breaker/java/

Consul客户端

io.smallrye.reactive:smallrye-mutiny-vertx-consul-client (外部依赖性)

https://vertx.io/docs/vertx-consul-client/java/

DB2客户端

io.quarkus:quarkus-reactive-db2-client (扩展)

https://cn.quarkus.io/guides/reactive-sql-clients

Kafka客户端

io.quarkus:quarkus-smallrye-reactive-messaging-kafka (扩展)

https://cn.quarkus.io/guides/kafka

邮件客户端

io.quarkus:quarkus-mailer (扩展)

https://cn.quarkus.io/guides/mailer

MQTT客户端

io.quarkus:quarkus-smallrye-reactive-messaging-mqtt (扩展)

No guide yet

MS SQL客户端

io.quarkus:quarkus-reactive-mssql-client (扩展)

https://cn.quarkus.io/guides/reactive-sql-clients

MySQL客户端

io.quarkus:quarkus-reactive-mysql-client (扩展)

https://cn.quarkus.io/guides/reactive-sql-clients

Oracle客户端

io.quarkus:quarkus-reactive-oracle-client (扩展)

https://cn.quarkus.io/guides/reactive-sql-clients

PostgreSQL 客户端

io.quarkus:quarkus-reactive-pg-client (扩展)

https://cn.quarkus.io/guides/reactive-sql-clients

RabbitMQ客户端

io.smallrye.reactive:smallrye-mutiny-vertx-rabbitmq-client (外部依赖性)

https://vertx.io/docs/vertx-rabbitmq-client/java

Redis客户端

io.quarkus:quarkus-redis-client (扩展)

https://cn.quarkus.io/guides/redis

Web客户端

io.smallrye.reactive:smallrye-mutiny-vertx-web-client (外部依赖性)

https://vertx.io/docs/vertx-web-client/java/

要了解更多关于Vert.x Mutiny API的用法,请参考 https://smallrye.io/smallrye-mutiny-vertx-bindings

使用示例

本节给出一个在RESTEasy Reactive应用程序中使用Vert.x WebClient 的例子。如上表所示,在你的项目中添加以下依赖:

pom.xml
<dependency>
    <groupId>io.smallrye.reactive</groupId>
    <artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
build.gradle
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")

现在,在你的代码中,你可以创建一个 WebClient 的实例:

package org.acme.vertx;


import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.smallrye.mutiny.Uni;

import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

@Path("/fruit-data")
public class ResourceUsingWebClient {

    private final WebClient client;

    @Inject
    VertxResource(Vertx vertx) {
        this.client = WebClient.create(vertx);
    }

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Path("/{name}")
    public Uni<JsonObject> getFruitData(String name) {
        return client.getAbs("https://.../api/fruit/" + name)
                .send()
                .onItem().transform(resp -> {
                    if (resp.statusCode() == 200) {
                        return resp.bodyAsJsonObject();
                    } else {
                        return new JsonObject()
                                .put("code", resp.statusCode())
                                .put("message", resp.bodyAsString());
                    }
                });
    }

}

该资源创建了一个 WebClient ,并在请求时使用该客户端来调用远程HTTP API。根据结果,响应被转发,或者创建一个包含错误的 JSON 对象。 WebClient 是异步的(和非阻塞的),端点返回一个 Uni

该应用程序也可以作为一个本地可执行文件运行。但是,首先,我们需要指示Quarkus启用 ssl (如果远程API使用HTTPS)。打开 src/main/resources/application.properties ,并添加:

quarkus.ssl.native=true

然后,用以下方法创建本地可执行文件:

CLI
quarkus build --native
Maven
./mvnw install -Dnative
Gradle
./gradlew build -Dquarkus.package.type=native

使用Vert.x JSON

Vert.x的API经常依赖JSON。Vert.x提供了两个方便的类来操作JSON文档: io.vertx.core.json.JsonObjectio.vertx.core.json.JsonArray

JsonObject 可用于将一个对象映射到其JSON表示中,并从JSON文档中建立一个对象。

// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);

// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);

注意,这些功能使用由 quarkus-jackson 扩展管理的映射器。请参考 Jackson的配置 来自定义映射。

JSON Object和JSON Array都支持作为Quarkus HTTP端点的请求和响应体(使用经典的RESTEasy和RESTEasy Reactive)。考虑一下这些端点:

package org.acme.vertx;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {

    @GET
    @Path("{name}/object")
    public JsonObject jsonObject(String name) {
        return new JsonObject().put("Hello", name);
    }

    @GET
    @Path("{name}/array")
    public JsonArray jsonArray(String name) {
        return new JsonArray().add("Hello").add(name);
    }
}
http://localhost:8080/hello/Quarkus/object 返回:
{"Hello":"Quarkus"}
http://localhost:8080/hello/Quarkus/array 返回:
["Hello","Quarkus"]

当JSON内容是一个请求体或被包裹在一个 Uni , Multi , CompletionStagePublisher 中时,这同样适用。

使用verticles

Verticles 是_Vert.x提供的 "一个简单的、可扩展的、类似于演员的部署和并发模型"。这个模型并不声称是一个严格的actor-model实现,但它有相似之处,特别是关于并发、扩展和部署。为了使用这个模型,你编写和 部署 顶点,通过在事件总线上发送消息进行通信。

你可以在Quarkus中部署 verticles 。它支持:

  • bare verticle - 延伸的Java类 io.vertx.core.AbstractVerticle

  • Mutiny verticle - 扩展的Java类 io.smallrye.mutiny.vertx.core.AbstractVerticle

部署verticles

要部署verticles,使用 deployVerticle 方法。

@Inject Vertx vertx;

// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });

如果你使用Vert.x的Mutiny-variant,请注意 deployVerticle 方法返回一个 Uni ,你需要触发一个订阅来进行实际部署。

接下来会有一个例子解释如何在应用程序的初始化过程中部署verticles:

使用@ApplicationScoped Beans作为Verticle

一般来说,Vert.x的verticles不是CDI beans。所以不能使用注入。然而,在Quarkus中,你可以把verticle部署为Bean。注意,在这种情况下,CDI(Quarkus中的Arc)负责创建实例。

以下代码段提供了一个示例:

package io.quarkus.vertx.verticles;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {

    @ConfigProperty(name = "address") String address;

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer(address)
                .handler(m -> m.replyAndForget("hello"))
                .completionHandler();
    }
}

你不需要注入 vertx 实例;相反,你可以利用 AbstractVerticle 的受保护字段。

然后,用以下方法部署verticle实例。

package io.quarkus.vertx.verticles;

import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;

@ApplicationScoped
public class VerticleDeployer {

    public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
         vertx.deployVerticle(verticle).await().indefinitely();
    }
}

如果你想部署每个暴露的 AbstractVerticle ,你可以使用:

public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
    for (AbstractVerticle verticle : verticles) {
        vertx.deployVerticle(verticle).await().indefinitely();
    }
}

使用多个verticles的实例

当使用 @ApplicationScoped ,你将为你的verticle获得一个实例。拥有多个verticles的实例可以帮助他们分担负载。它们中的每一个都将与不同的I/O线程相关联(Vert.x事件循环)。

要部署 Verticle 的多个实例,请使用 @Dependent 范围而不是 @ApplicationScoped

package org.acme.verticle;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;

import javax.enterprise.context.Dependent;
import javax.inject.Inject;

@Dependent
public class MyVerticle extends AbstractVerticle {

    @Override
    public Uni<Void> asyncStart() {
        return vertx.eventBus().consumer("address")
                .handler(m -> m.reply("Hello from " + this))
                .completionHandler();
    }
}

然后,按以下方式部署你的verticle:

package org.acme.verticle;

import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@ApplicationScoped
public class MyApp {

    void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
        vertx
                .deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
                .await().indefinitely();
    }
}

init 方法接收一个 Instance<MyVerticle> 。然后,你将提供者传递给 deployVerticle 方法。该提供者只是在调用 get() 方法。由于 @Dependent 范围,它每次调用都会返回一个新的实例。最后,你向 DeploymentOptions 传递所需的实例数量,比如在前面的例子中是两个。它将调用提供者两次,这将为你的verticle创建两个实例。

使用事件总线

Vert.x有一个内置的 事件总线 ,你可以从你的Quarkus应用程序中使用。因此,你的应用程序组件(CDI Bean、resources…​…​)可以使用异步事件进行交互,从而促进松散耦合。

通过事件总线,你可以向 virtual addresses 发送 messages。事件总线提供三种类型的传递机制:

  • point-to-point - 发送消息,一个消费者接收。如果有几个消费者监听该地址,则采用轮流的方式;

  • publish/subscribe - 发布一个消息;所有监听该地址的消费者都在接收该消息。

  • request/reply - 发送消息并期望得到响应。接收者可以以异步的方式对消息作出回应。

所有这些交付机制都是无阻塞的,并提供了构建响应式应用的基本砖块之一。

Consuming事件

虽然你可以使用Vert.x的API来注册消费者,但Quarkus带有声明性支持。要消费事件,请使用 io.quarkus.vertx.ConsumeEvent 注解。

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
1 如果没有设置,地址是Bean的完全限定名称;例如,在这个片段中,它是 org.acme.vertx.GreetingService
2 方法参数是消息主体。如果该方法返回 something ,那就是消息的响应。

配置地址

可以配置 @ConsumeEvent 注解设置地址:

@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 接收发送到 greeting 地址的信息

异步处理

前面的例子使用了同步处理。异步处理也可以通过返回一个 io.smallrye.mutiny.Uni 或一个 java.util.concurrent.CompletionStage

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // return an Uni completed when the processing is finished.
        // You can also fail the Uni explicitly
    }
}
Mutiny

前面的例子使用了Mutiny响应式类型。如果你不熟悉Mutiny,请查看 Mutiny - 一个直观的响应式编程库

阻塞处理

默认情况下,消费该事件的代码必须是 非阻塞的 ,因为它是在一个I/O线程上调用的。如果你的处理是阻塞的,请使用 @io.smallrye.common.annotation.Blocking 注解。

@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
    // Something blocking
}

或者,你可以使用 @ConsumeEvent 注解中的 blocking 属性。

@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
    // Something blocking
}

当使用 @Blocking ,它忽略了 blocking 属性的值 @ConsumeEvent

回复信息

@ConsumeEvent 注解的方法的 返回值 被用来响应传入的消息。例如,在下面的代码片段中,返回的 是 String 类型。

@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}

你也可以返回一个 Uni<T> 或一个 CompletionStage<T> 来处理异步回复:

@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}

如果你使用Context Propagation扩展,你可以注入一个 executor

@Inject Executor executor;

实现即发即弃(fire and forget)交互

你不需要回复收到的消息。通常情况下,对于 fire 和 forget 交互来说,消息被消耗掉了,发送者不需要知道这件事。为了实现这种模式,你的消费者方法返回 void 类型 。

@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}

处理信息

与之前直接使用 有效载荷 的例子不同,你也可以直接使用 Message

@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}

处理故障

如果一个用 @ConsumeEvent 注解的方法抛出一个异常,那么:

  • 如果设置了一个回复处理程序,那么失败就会通过一个带有代码 ConsumeEvent#FAILURE_CODE 和异常消息的 io.vertx.core.eventbus.ReplyException 传播回发送者,

  • 如果没有设置回复处理程序,那么异常会被重新抛出(如果需要的话,会被包裹在一个 RuntimeException ),并且可以由默认的异常处理程序来处理,即 io.vertx.core.Vertx#exceptionHandler()

发送信息

发送和发布消息使用Vert.x事件总线。

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;                                            (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
1 注入事件总线
2 发送消息到地址 greeting 。消息的有效载荷是 name

EventBus 对象提供了以下方法:

  1. send 一个消息到一个特定的地址 - 单个消费者收到该消息。

  2. publish 向一个特定的地址发送消息—​所有的消费者都会收到这些消息。

  3. request 留言并期望得到回复

// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);

使用编解码器

Vert.x事件总线 使用编解码器来 序列化反序列化 对象。Quarkus为本地交付提供了一个默认的编解码器。所以你可以按以下方式交换对象:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}

如果你想使用一个特定的编解码器,你需要在两端明确地设置它:

@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", name,
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 设置用于发送消息的编解码器的名称
2 设置用于接收信息的编解码器

结合HTTP和事件总线

让我们重新访问一个响应的HTTP端点,并使用异步消息传递将调用委托给一个单独的bean。它使用了request/reply的调度机制。我们不是在JAX-RS端点内实现业务逻辑,而是发送一个消息。另一个Bean使用这个消息,并使用 回复 机制发送响应。

在你的HTTP端点类中,注入事件总线,并使用 request 方法向事件总线发送一个消息,并期待一个响应:

package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/bus")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
1 name 发送到 greeting 地址,并要求作出回应
2 当我们得到响应时,提取正文并将其发送给用户
HTTP方法返回一个 Uni 。如果你使用的是RESTEasy Reactive, Uni 支持是内置的。如果你使用的是 经典的 RESTEasy,你需要在你的项目中添加 quarkus resteasy-mutiny 扩展。

我们需要一个消费者监听 greeting 地址。这个消费者可以在同一个类中,也可以是另一个Bean,比如:

package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }

}

这个Bean接收名字并返回响应信息。

有了这个, /bus/quarkus 上的每个HTTP请求都会向事件总线发送一个消息,等待回复,当这个回复到来时,就会写入HTTP响应:

Hello Quarkus

为了更好地理解,让我们详细介绍一下HTTP request/response 是如何被处理的:

  1. 该请求由 greeting 方法接收

  2. 含有该 name 的消息被发送到事件总线

  3. 另一个Bean收到这个消息,并计算出响应

  4. 使用回复机制发回此响应

  5. 一旦发送方收到回复,会将内容写入 HTTP 响应

Bidirectional communication with browsers using SockJS

Vert.x提供的SockJS桥允许浏览器应用程序和Quarkus应用程序使用事件总线进行通信。它连接了双方。所以,双方都可以发送在另一方收到的消息。它支持三种传递机制。

SockJS负责协商Quarkus应用程序和浏览器之间的通信渠道。如果支持WebSockets,它就使用它们;否则,它就退化为SSE、long polling等。

因此,使用SockJS,你需要配置桥梁,特别是将用于通信的地址:

package org.acme.vertx;

import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;

@ApplicationScoped
public class SockJsExample {

    @Inject
    Vertx vertx;

    public void init(@Observes Router router) {
        SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
        sockJSHandler.bridge(new SockJSBridgeOptions()
                .addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
        router.route("/eventbus/*").handler(sockJSHandler);
    }

}

This code configures the SockJS bridge to send all the messages targeting the ticks address to the connected browsers. More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.

浏览器必须使用 vertx-eventbus JavaScript库来消费该信:

<!doctype html>
<html>
<head>
    <meta charset="utf-8"/>
    <title>SockJS example - Quarkus</title>
    <script src="https://code.jquery.com/jquery-3.3.1.min.js"
            integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
    <script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>

<h1>SockJS Examples</h1>

<p><strong>Last Tick:</strong> <span id="tick"></span></p>

</body>
<script>
    var eb = new EventBus('/eventbus');

    eb.onopen = function () {

        eb.registerHandler('ticks', function (error, message) {
            $("#tick").html(message.body);
        });
    }

</script>
</html>

本地运输

GraalVM生产的二进制文件中不支持本地传输。

Vert.x is capable of using Netty’s native transports, which offers performance improvements on specific platforms.To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to have both to keep your application platform-agnostic. Netty is smart enough to use the correct one, that includes none at all on unsupported platforms:

pom.xml
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-epoll</artifactId>
  <classifier>linux-x86_64</classifier>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-transport-native-kqueue</artifactId>
  <classifier>osx-x86_64</classifier>
</dependency>
build.gradle
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")

implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")

你还必须明确地配置Vert.x来使用本地传输。在 application.properties 中添加:

quarkus.vertx.prefer-native-transport=true

或者在 application.yml

quarkus:
  vertx:
    prefer-native-transport: true

如果一切顺利的话,quarkus 会输出以下日志:

[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true

本地Linux传输

在Linux上,你可以启用以下socket选项:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true
  • TCP_QUICKACK

quarkus.http.tcp-quick-ack=true
  • TCP_CORK

quarkus.http.tcp-cork=true
  • TCP_FASTOPEN

quarkus.http.tcp-fast-open=true

Native macOS Transport

On macOS Sierra and above you can enable the following socket options:

  • SO_REUSEPORT

quarkus.http.so-reuse-port=true

监听Unix Domain Socket

Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the quarkus service is established from the same host. This can happen if access to the service goes through a proxy which is often the case if you’re setting up a service mesh with a proxy like Envoy.

这仅适用于支持 [native-transport] 的平台。

启用适当的 [native-transport] 并设置以下环境属性:

quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true

quarkus.vertx.prefer-native-transport=true

就其本身而言,这不会禁用默认情况下将在 0.0.0.0:8080 上打开的 tcp socket(套接字)。可以明确禁用它:

quarkus.http.host-enabled=false

这些属性可以通过 Java 的 -D 命令行参数或在 application.properties 上设置。

Do not forget to add the native transport dependency. See 本地运输 for details.
Make sure your application has the right permissions to write to the socket.

只读部署环境

在具有只读文件系统的环境中,您可能会收到以下形式的错误:

java.lang.IllegalStateException: Failed to create cache dir

假设 /tmp/ 是可写的,可以通过将 vertx.cacheDirBase 属性设置为指向 /tmp/ 中的目录来修复此问题,例如在OpenShift中,通过创建一个值为 -Dvertx.cacheDirBase=/tmp/vertx ,名为 JAVA_OPTS 的环境变量。

Customizing the Vert.x configuration

The configuration of the managed Vert.x instance can be provided using the application.properties file, but also using special beans. CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer interface can be used to customize the Vert.x configuration. For example, the following customizer change the tmp base directory:

@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {

    @Override
    public void accept(VertxOptions options) {
        options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
    }
}

The customizer beans received the VertxOptions (coming from the application configuration), and can modify them.