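Vert.x Reference Guide
Vert.x is a toolkit for building reactive applications. As described in the Quarkus Reactive Architecture, Quarkus uses Vert.x underneath.
This guide is the companion to the Using Eclipse Vert.x API from a Quarkus Application guide. It provides more advanced details about the usage and the configuration of the Vert.x instance used by Quarkus.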
Access the Vert.x instance
To access the managed Vert.x instance, add the quarkus-vertx extension to your project.
This dependency might already be available in your project (as a transitive dependency).
With this extension, you can retrieve the managed instance of Vert.x using either field or constructor injection:
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
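You can inject either:
- io.vertx.core.Vertx, the instance exposing the bare Vert.x API
- io.vertx.mutiny.core.Vertx, the instance exposing the Mutiny API
We recommend using the Mutiny variant, as it integrates with the other reactive APIs provided by Quarkus.
Mutiny
If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library. |
The documentation about the Vert.x Mutiny variant is available at https://smallrye.io/smallrye-mutiny-vertx-bindings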
Configure the Vert.x instance
You can configure the Vert.x instance from the application.properties file. The following table lists the supported properties:
Configuration property fixed at build time - All other configuration properties are overridable at runtime
Configuration property | Type | Default
---|---|---
Enables or disables the Vert.x cache. | boolean |
Configure the file cache directory. When not set, the cache is stored in the system temporary directory (read from the [...]). Note that this property is ignored if the [...] | string |
Enables or disables the Vert.x classpath resource resolver. | boolean |
The number of event loops. By default, it matches the number of CPUs detected on the system. | int |
The maximum amount of time the event loop can be blocked. | |
The amount of time before a warning is displayed if the event loop is blocked. | |
The maximum amount of time the worker thread can be blocked. | |
The size of the internal thread pool (used for the file system). | int |
The queue size. For most applications this should be unbounded. | int |
The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of [...] | float |
The amount of time a thread will stay alive with no work. | |
Prefill thread pool when creating a new Executor. When [...] | boolean |
Enables the async DNS resolver. | boolean |
PEM Key/cert config is disabled by default. | boolean |
Comma-separated list of the path to the key files (Pem format). | list of string |
Comma-separated list of the path to the certificate files (Pem format). | list of string |
JKS config is disabled by default. | boolean |
Path of the key file (JKS format). | string |
Password of the key file. | string |
PFX config is disabled by default. | boolean |
Path to the key file (PFX format). | string |
Password of the key. | string |
PEM Trust config is disabled by default. | boolean |
Comma-separated list of the trust certificate files (Pem format). | list of string |
JKS config is disabled by default. | boolean |
Path of the key file (JKS format). | string |
Password of the key file. | string |
PFX config is disabled by default. | boolean |
Path to the key file (PFX format). | string |
Password of the key. | string |
The accept backlog. | int |
The client authentication. | string |
The connect timeout. | |
The idle timeout in milliseconds. | |
The receive buffer size. | int |
The number of reconnection attempts. | int |
The reconnection interval in milliseconds. | |
Whether to reuse the address. | boolean |
Whether to reuse the port. | boolean |
The send buffer size. | int |
The so linger. | int |
Enables or disables SSL. | boolean |
Whether to keep the TCP connection opened (keep-alive). | boolean |
Configure the TCP no delay. | boolean |
Configure the traffic class. | int |
Enables or disables the trust all parameter. | boolean |
The host name. | string |
[...] | int |
The public host name. | string |
The public port. | int |
Enables or disables the clustering. | boolean |
The ping interval. | |
The ping reply interval. | |
The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. | int |
The minimum amount of time in seconds that a successfully resolved address will be cached. | int |
The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. | int |
The maximum number of queries to be sent during a resolution. | int |
The duration after which a DNS query is considered to be failed. | |
Set the path of an alternate hosts configuration file to use instead of the one provided by the OS. The default value is [...] | string |
Set the hosts configuration refresh period in milliseconds. The resolver caches the hosts configuration (configured using [...] | int |
Set the list of DNS server addresses; an address is the IP of the DNS server, followed by an optional colon and a port, e.g. [...] | list of string |
Set to true to enable the automatic inclusion in DNS queries of an optional record that hints the remote DNS server about how much data the resolver can read per response. | boolean |
Set the DNS queries Recursion Desired flag value. | boolean |
Set the lists of DNS search domains. When the search domain list is null, the effective search domain list will be populated using the system DNS search domains. | list of string |
Set the ndots value used when resolving using search domains, the default value is [...] | int |
Set to [...] | boolean |
Set to [...] | boolean |
Enable or disable native transport. | boolean |
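About the Duration format
To write duration values, use the standard java.time.Duration format. You can also use a simplified format, starting with a number:
- If the value is only a number, it represents time in seconds.
- If the value is a number followed by ms, it represents time in milliseconds.
In other cases, the simplified format is translated to the java.time.Duration format for parsing:
- If the value is a number followed by h, m, or s, it is prefixed with PT.
- If the value is a number followed by d, it is prefixed with P.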
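To make the simplified duration format concrete, here is a small plain-Java sketch of the translation rules as commonly described for Quarkus: a bare number is read as seconds, a number followed by ms as milliseconds, a number followed by h, m, or s is prefixed with PT, and a number followed by d is prefixed with P. The class name SimplifiedDuration and its parse method are hypothetical names chosen for illustration; this is not the converter that Quarkus ships, only an approximation of its documented behavior.

```java
import java.time.Duration;

public class SimplifiedDuration {

    // Hypothetical helper: translates the simplified format into a java.time.Duration.
    static Duration parse(String value) {
        String v = value.trim();
        if (v.matches("\\d+")) {
            // Only a number: interpreted as seconds.
            return Duration.ofSeconds(Long.parseLong(v));
        }
        if (v.matches("\\d+ms")) {
            // Number followed by "ms": interpreted as milliseconds.
            return Duration.ofMillis(Long.parseLong(v.substring(0, v.length() - 2)));
        }
        if (v.matches("\\d+[hms]")) {
            // Number followed by h, m, or s: prefixed with PT.
            return Duration.parse("PT" + v);
        }
        if (v.matches("\\d+d")) {
            // Number followed by d: prefixed with P.
            return Duration.parse("P" + v);
        }
        // Anything else is assumed to already be in the standard format.
        return Duration.parse(v);
    }

    public static void main(String[] args) {
        for (String sample : new String[] {"10", "250ms", "2h", "1d", "PT1M30S"}) {
            System.out.println(sample + " -> " + parse(sample));
        }
    }
}
```

Under these assumptions, a value such as 2h and the standard form PT2H yield the same Duration, so either spelling could be used for a duration-typed property.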
See Customize the Vert.x configuration to configure the Vert.x instance using a programmatic approach.
Use Vert.x clients
In addition to Vert.x core, you can use most Vert.x ecosystem libraries. Some Quarkus extensions already wrap Vert.x libraries.
Available APIs
The following table lists the most used libraries from the Vert.x ecosystem. To access these APIs, add the indicated extension or dependency to your project. Check the associated documentation to learn how to use them.
API | Extension or dependency | Documentation
---|---|---
AMQP client | |
Circuit breaker | |
Consul client | |
DB2 client | |
Kafka client | |
Mail client | |
MQTT client | | No guide yet
MS SQL client | |
MySQL client | |
Oracle client | |
PostgreSQL client | |
RabbitMQ client | |
Redis client | |
Web client | |
To learn more about the usage of the Vert.x Mutiny API, refer to https://smallrye.io/smallrye-mutiny-vertx-bindings
Use the Vert.x Web Client
This section gives an example using the Vert.x WebClient
in the context of a Quarkus REST (formerly RESTEasy Reactive) application.
As indicated in the table above, add the following dependency to your project:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
Now, in your code, you can create an instance of WebClient:
package org.acme.vertx;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
ResourceUsingWebClient(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
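This resource creates a WebClient and, upon request, uses this client to invoke a remote HTTP API. Depending on the result, the response is forwarded as received, or a JSON object wrapping the error is created. The WebClient is asynchronous (and non-blocking), so the endpoint returns a Uni.
The application can also run as a native executable. But first, we need to instruct Quarkus to enable SSL (if the remote API uses HTTPS). Open src/main/resources/application.properties and add: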
quarkus.ssl.native=true
Then, create the native executable with one of the following commands:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Use Vert.x JSON
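Vert.x APIs often rely on JSON. Vert.x provides two convenient classes to manipulate JSON documents: io.vertx.core.json.JsonObject and io.vertx.core.json.JsonArray.
JsonObject can be used to map an object into its JSON representation and to build an object from a JSON document: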
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
Note that these features use the mapper managed by the quarkus-jackson extension. Refer to the Jackson configuration to customize the mapping.
JSON Object and JSON Array are both supported as Quarkus HTTP endpoint request and response bodies (using classic RESTEasy and Quarkus REST). Consider these endpoints:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
{"Hello":"Quarkus"}
["Hello","Quarkus"]
This works equally well when the JSON content is a request body or is wrapped in a Uni, Multi, CompletionStage or Publisher.
Use Verticles
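Verticles is "a simple, scalable, actor-like deployment and concurrency model" provided by Vert.x. This model does not claim to be a strict actor-model implementation, but it shares similarities, especially concerning concurrency, scaling, and deployment. To use this model, you write and deploy verticles, which communicate with each other by sending messages on the event bus.
You can deploy verticles in Quarkus. It supports:
- bare verticle - Java classes extending io.vertx.core.AbstractVerticle
- Mutiny verticle - Java classes extending io.smallrye.mutiny.vertx.core.AbstractVerticle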
Deploy Verticles
To deploy verticles, use the deployVerticle method:
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
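If you use the Mutiny variant of Vert.x, be aware that the deployVerticle method returns a Uni, and you need to trigger a subscription to make the actual deployment.
An example explaining how to deploy verticles during the initialization of the application follows. |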
Use @ApplicationScoped beans as Verticle
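In general, Vert.x verticles are not CDI beans, so they cannot use injection. However, in Quarkus, you can deploy verticles as beans. Note that in this case, CDI (Arc in Quarkus) is responsible for creating the instance.
The following snippet provides an example: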
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
You don't have to inject the vertx instance; instead, leverage the protected field of AbstractVerticle.
Then, deploy the verticle instance with:
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
If you want to deploy every exposed AbstractVerticle, you can use:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
Create multiple verticles instances
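When using @ApplicationScoped, you get a single instance of your verticle. Having multiple instances of a verticle helps to share the load among them. Each of them is associated with a different I/O thread (Vert.x event loop).
To deploy multiple instances of your verticle, use the @Dependent scope instead of @ApplicationScoped: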
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
Then, deploy your verticle as follows:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
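The init method receives an Instance<MyVerticle>. Then, you pass a supplier to the deployVerticle method. The supplier just calls the get() method. Thanks to the @Dependent scope, it returns a new instance on every call. Finally, you pass the desired number of instances to the DeploymentOptions, such as two in the previous example. It calls the supplier twice, which creates two instances of your verticle.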
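The supplier-per-instance idea can be illustrated without Vert.x or CDI. The plain-Java sketch below uses a hypothetical deployAll helper (it is not a Vert.x API) that calls a Supplier once per requested instance. Worker::new plays the role of a @Dependent bean lookup, returning a fresh object on every call, while a supplier that always hands back the same object behaves like an @ApplicationScoped bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierDeploymentSketch {

    // Stand-in for a verticle class; each construction yields a new object.
    static class Worker { }

    // Hypothetical helper mimicking "call the supplier once per requested instance".
    static <T> List<T> deployAll(Supplier<T> supplier, int instances) {
        List<T> deployed = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            deployed.add(supplier.get());
        }
        return deployed;
    }

    public static void main(String[] args) {
        // Like @Dependent: every call to the supplier creates a new instance.
        List<Worker> fresh = deployAll(Worker::new, 2);
        System.out.println("distinct instances: " + (fresh.get(0) != fresh.get(1)));

        // Like @ApplicationScoped: the supplier keeps returning the same instance.
        Worker shared = new Worker();
        List<Worker> same = deployAll(() -> shared, 2);
        System.out.println("same instance reused: " + (same.get(0) == same.get(1)));
    }
}
```

This is why the scope matters: with a single shared instance, asking for two instances would not give you two independent verticles.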
Use the Event Bus
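Vert.x comes with a built-in event bus that you can use from your Quarkus application. So, your application components (CDI beans, resources...) can interact using asynchronous events, thus promoting loose coupling.
With the event bus, you send messages to virtual addresses. The event bus offers three types of delivery mechanisms:
- point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;
- publish/subscribe - publish a message; all the consumers listening to the address receive the message;
- request/reply - send the message and expect a response. The receiver can respond to the message in an asynchronous fashion.
All these delivery mechanisms are non-blocking and provide one of the fundamental bricks for building reactive applications.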
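The three delivery mechanisms can be modeled in a few lines of plain Java. The MiniBus class below is a deliberately simplified, synchronous toy written for this guide; its names (MiniBus, consumer, send, publish, request) are illustrative assumptions and it is not the Vert.x EventBus, which is asynchronous and non-blocking. It only shows how the routing differs: round-robin to one consumer, fan-out to all consumers, or one consumer whose return value becomes the reply.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class MiniBus {

    private final Map<String, List<Function<String, String>>> consumers = new HashMap<>();
    private final Map<String, Integer> cursors = new HashMap<>();

    // Registers a consumer on a virtual address.
    void consumer(String address, Function<String, String> handler) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // Picks the next consumer of the address in round-robin order.
    private Function<String, String> next(String address) {
        List<Function<String, String>> handlers = consumers.get(address);
        if (handlers == null || handlers.isEmpty()) {
            throw new IllegalStateException("No consumer on " + address);
        }
        int index = cursors.merge(address, 1, Integer::sum) - 1;
        return handlers.get(index % handlers.size());
    }

    // point-to-point: exactly one consumer receives the message, the reply is ignored.
    void send(String address, String body) {
        next(address).apply(body);
    }

    // publish/subscribe: every consumer on the address receives the message.
    void publish(String address, String body) {
        for (Function<String, String> handler : consumers.getOrDefault(address, List.of())) {
            handler.apply(body);
        }
    }

    // request/reply: one consumer receives the message and its result is the reply.
    String request(String address, String body) {
        return next(address).apply(body);
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();
        bus.consumer("greeting", body -> "A got " + body);
        bus.consumer("greeting", body -> "B got " + body);
        System.out.println(bus.request("greeting", "one"));
        System.out.println(bus.request("greeting", "two"));
        bus.publish("greeting", "three");
    }
}
```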
Consume events
While you can use the Vert.x API to register consumers, Quarkus comes with declarative support. To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
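1 | If not set, the address is the fully qualified name of the bean; for instance, in this snippet, it's org.acme.vertx.GreetingService. |
2 | The method parameter is the message body. If the method returns something, it's the message response. |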
Configure the address
The @ConsumeEvent annotation can be configured to set the address:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | Receive the messages sent to the greeting address |
The address value can be a property expression.
In this case, the configured value is used instead: @ConsumeEvent("${my.consumer.address}")
.
Additionally, the property expression can specify a default value: @ConsumeEvent("${my.consumer.address:defaultAddress}")
.
@ConsumeEvent("${my.consumer.address}") (1)
public String consume(String name) {
return name.toLowerCase();
}
1 | Receive the messages sent to the address configured with the my.consumer.address key. |
If no config property with the specified key exists and no default value is set then the application startup fails. |
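The resolution rules for address expressions can be sketched in plain Java. The AddressExpression class and its resolve method below are hypothetical and cover only the two shapes mentioned in this section, ${key} and ${key:default}. The expression handling Quarkus actually uses is richer, so treat this as a mental model rather than the real implementation.

```java
import java.util.Map;
import java.util.NoSuchElementException;

public class AddressExpression {

    // Hypothetical resolver: plain values are returned unchanged,
    // "${key}" and "${key:default}" are looked up in the given configuration.
    static String resolve(String value, Map<String, String> config) {
        if (!value.startsWith("${") || !value.endsWith("}")) {
            return value;
        }
        String expression = value.substring(2, value.length() - 1);
        int colon = expression.indexOf(':');
        String key = colon < 0 ? expression : expression.substring(0, colon);
        String fallback = colon < 0 ? null : expression.substring(colon + 1);

        String configured = config.get(key);
        if (configured != null) {
            return configured;
        }
        if (fallback != null) {
            return fallback;
        }
        // Mirrors the documented behavior: no value and no default is an error.
        throw new NoSuchElementException("No value configured for " + key);
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of("my.consumer.address", "greeting");
        System.out.println(resolve("${my.consumer.address}", config));
        System.out.println(resolve("${other.address:defaultAddress}", config));
        System.out.println(resolve("plain-address", config));
    }
}
```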
Process events asynchronously
The previous examples use synchronous processing. Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public Uni<String> process(String name) {
// Return a Uni that completes when the processing is finished.
// You can also fail the Uni explicitly.
return Uni.createFrom().item(() -> name.toUpperCase());
}
}
Mutiny
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive reactive programming library. |
Blocking processing of events
By default, the code consuming the event must be non-blocking, as it is called on an I/O thread. If your processing is blocking, use the @io.smallrye.common.annotation.Blocking annotation:
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
Alternatively, you can use the blocking attribute from the @ConsumeEvent annotation:
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
When @Blocking is used, the value of the blocking attribute of @ConsumeEvent is ignored.
Reply to events
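The return value of a method annotated with @ConsumeEvent is used to respond to the incoming message. For instance, in the following snippet, the returned String is the response.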
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous replies:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
If you use the Context Propagation extension, you can inject the executor used in the previous snippet. |
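Since a consumer may also return a CompletionStage, the asynchronous reply can be sketched with the JDK alone. In the example below, the class AsyncReplySketch and the way the executor is supplied are assumptions made for illustration; in a Quarkus bean the method would carry @ConsumeEvent("greeting") and the executor would typically be injected rather than passed to a constructor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncReplySketch {

    private final ExecutorService executor;

    AsyncReplySketch(ExecutorService executor) {
        this.executor = executor;
    }

    // In a Quarkus bean this method would be annotated with @ConsumeEvent("greeting").
    // The reply is computed on the executor and delivered when the stage completes.
    public CompletionStage<String> consume(String name) {
        return CompletableFuture.supplyAsync(() -> name.toUpperCase(), executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // join() is used here only to show the result in a standalone program.
            String reply = new AsyncReplySketch(pool).consume("quarkus").toCompletableFuture().join();
            System.out.println(reply); // prints QUARKUS
        } finally {
            pool.shutdown();
        }
    }
}
```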
Implement fire-and-forget interactions
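You don't have to reply to received messages. Typically, for a fire-and-forget interaction, the messages are consumed, and the sender does not need to know about it. To implement this pattern, your consumer method returns void.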
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
Consume messages (instead of events)
Unlike the previous examples, which use the payload directly, you can also consume the Message itself:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
Handle failures
If a method annotated with @ConsumeEvent throws an exception, then:
- if a reply handler is set, then the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message,
- if no reply handler is set, then the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().
Send messages
Sending and publishing messages use the Vert.x event bus:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
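1 | Inject the event bus |
2 | Send a message to the address greeting. The message payload is name |
The EventBus object provides methods to:
- send a message to a specific address - a single consumer receives the message.
- publish a message to a specific address - all consumers receive the message.
- request, that is, send a message and expect a reply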
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
Process events on virtual threads
Methods annotated with @ConsumeEvent
can also be annotated with @RunOnVirtualThread
.
In this case, the method is invoked on a virtual thread.
Each event is invoked on a different virtual thread.
To use this feature, make sure:
-
Your Java runtime supports virtual threads.
-
Your method uses a blocking signature.
The second point means only methods returning an object or void
can use @RunOnVirtualThread
.
Methods returning a Uni
or a CompletionStage
cannot run on virtual threads.
Read the virtual thread guide for more details.
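The two conditions above can be expressed as simple checks. The following plain-Java sketch is illustrative only: VirtualThreadEligibility, runtimeSupportsVirtualThreads and hasBlockingSignature are hypothetical names, not Quarkus APIs, and Quarkus performs its own validation. The runtime check looks up Thread.ofVirtual reflectively so that the code compiles on older JDKs; note that on JDKs where virtual threads are only a preview feature, the method may be present without being usable.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class VirtualThreadEligibility {

    // True when the running JVM exposes Thread.ofVirtual().
    static boolean runtimeSupportsVirtualThreads() {
        try {
            Thread.class.getMethod("ofVirtual");
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // "Blocking signature": the method returns a plain object or void,
    // not a CompletionStage and not a Mutiny Uni (matched by class name here).
    static boolean hasBlockingSignature(Class<?> owner, String methodName) {
        for (Method method : owner.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                Class<?> type = method.getReturnType();
                boolean async = CompletionStage.class.isAssignableFrom(type)
                        || type.getName().equals("io.smallrye.mutiny.Uni");
                return !async;
            }
        }
        throw new IllegalArgumentException("No method named " + methodName);
    }

    // Sample consumer methods used only to exercise the check.
    static class SampleConsumers {
        String sync(String name) { return name.toUpperCase(); }
        void fireAndForget(String event) { }
        CompletionStage<String> async(String name) { return CompletableFuture.completedFuture(name); }
    }

    public static void main(String[] args) {
        System.out.println("virtual threads available: " + runtimeSupportsVirtualThreads());
        for (String name : new String[] {"sync", "fireAndForget", "async"}) {
            System.out.println(name + " eligible: " + hasBlockingSignature(SampleConsumers.class, name));
        }
    }
}
```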
Use codecs
The Vert.x Event Bus (https://vertx.io/docs/vertx-core/java/event_bus) uses codecs (https://vertx.io/docs/vertx-core/java/message_codecs) to serialize and deserialize message objects.
Quarkus provides a default codec for local delivery.
This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with @ConsumeEvent
where ConsumeEvent#local() == true
(which is the default).
So that you can exchange the message objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
If you want to use a specific codec, you need to set it explicitly on both ends:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | Set the name of the codec to use to send the message |
2 | Set the codec to use to receive the message |
Combine HTTP and the Event Bus
Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separated bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we are sending a message. Another bean consumes this message, and the response is sent using the reply mechanism.
In your HTTP endpoint class, inject the event bus and use the request method to send a message to the event bus and expect a response:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | Send the name to the greeting address and request a response |
2 | When we get the response, extract the body and send it to the user |
The HTTP method returns a Uni.
If you are using Quarkus REST, Uni support is built-in.
If you are using classic RESTEasy, you need to add the quarkus-resteasy-mutiny extension to your project.
|
We need a consumer listening on the greeting address. This consumer can be in the same class or in another bean, such as:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
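This bean receives the name and returns the greeting message.
With this in place, every HTTP request on /bus/quarkus sends a message to the event bus, waits for a reply, and when the reply arrives, writes the HTTP response: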
Hello Quarkus
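To better understand, let's detail how the HTTP request/response is handled:
- The request is received by the greeting method
- A message containing the name is sent to the event bus
- Another bean receives this message and computes the response
- This response is sent back using the reply mechanism
- Once the reply is received by the sender, the content is written to the HTTP response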
Bidirectional communication with browsers by using SockJS
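The SockJS bridge provided by Vert.x allows browser applications and Quarkus applications to communicate using the event bus. It connects both sides, so each side can send messages that are received on the other side. It supports the three delivery mechanisms.
SockJS negotiates the communication channel between the Quarkus application and the browser. If WebSockets are supported, it uses them; otherwise, it degrades to SSE, long polling, etc.
So, to use SockJS, you need to configure the bridge, especially the addresses that are going to be used to communicate: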
package org.acme;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
Router bridge = sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").subRouter(bridge);
AtomicInteger counter = new AtomicInteger();
vertx.setPeriodic(1000,
ignored -> vertx.eventBus().publish("ticks", counter.getAndIncrement()));
}
}
This code configures the SockJS bridge to send all the messages targeting the ticks
address to the connected browsers.
More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.
The browser must use the vertx-eventbus JavaScript library to consume the messages:
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
Use native transports
Native transports are not supported in native executables. |
To use io_uring , refer to the Use io_uring section.
|
Vert.x is capable of using Netty's native transports, which offer performance improvements on specific platforms. To enable them, you must include the appropriate dependency for your platform. It's usually a good idea to include both to keep your application platform-agnostic. Netty is smart enough to use the correct one, which includes using none at all on unsupported platforms:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
You also have to explicitly configure Vert.x to use the native transport. In application.properties, add:
quarkus.vertx.prefer-native-transport=true
Or in application.yml:
quarkus:
vertx:
prefer-native-transport: true
If all is well, Quarkus logs:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
Use a Vert.x context-aware scheduler
Some Mutiny operators need to schedule work on an executor thread pool.
A good example is .onItem().delayIt().by(Duration.ofMillis(10))
as it needs such an executor to delay emissions.
The default executor is returned by io.smallrye.mutiny.infrastructure.Infrastructure
and it is already configured and managed by Quarkus.
That being said, there are cases where you need to make sure that an operation is run on a Vert.x (duplicated) context and not just on any random thread.
The io.smallrye.mutiny.vertx.core.ContextAwareScheduler
interface offers an API to obtain context-aware schedulers.
Such a scheduler is configured with:
- a delegate ScheduledExecutorService of your choice (hint: you can reuse Infrastructure.getDefaultWorkerPool()), and
- a context fetching strategy among:
  - an explicit Context, or
  - calling Vertx::getOrCreateContext() either on the current thread or later when the scheduling request happens, or
  - calling Vertx::currentContext(), which fails if the current thread is not a Vert.x thread.
Here is a sample where ContextAwareScheduler
is used:
class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
vertx.getOrCreateContext().put("foo", "bar");
var delegate = Infrastructure.getDefaultWorkerPool();
var scheduler = ContextAwareScheduler.delegatingTo(delegate)
.withCurrentContext();
return Uni.createFrom().voidItem()
.onItem().delayIt().onExecutor(scheduler).by(Duration.ofMillis(10))
.onItem().invoke(() -> {
// Prints "bar"
var ctx = vertx.getOrCreateContext();
System.out.println(ctx.get("foo"));
});
}
}
In this example a scheduler is created by capturing the context of the Vert.x event-loop that calls asyncStart()
.
The delayIt
operator uses that scheduler, and we can check that the context that we get in invoke
is a Vert.x duplicated context where the data for key "foo"
has been propagated.
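The general idea of a context-aware scheduler, capturing a context when the scheduler is created and making it visible again when the delayed task runs, can be modeled with the JDK alone. The sketch below is a simplified analogy, not the ContextAwareScheduler implementation: the ContextCapturingScheduler class, its ThreadLocal-based CURRENT context and the capturingCurrentContext factory are all assumptions made for illustration, and a real Vert.x context carries far more than a map.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ContextCapturingScheduler {

    // Stand-in for "the context of the current thread".
    static final ThreadLocal<Map<String, String>> CURRENT = new ThreadLocal<>();

    private final ScheduledExecutorService delegate;
    private final Map<String, String> captured;

    private ContextCapturingScheduler(ScheduledExecutorService delegate, Map<String, String> captured) {
        this.delegate = delegate;
        this.captured = captured;
    }

    // Captures the caller's context at creation time.
    static ContextCapturingScheduler capturingCurrentContext(ScheduledExecutorService delegate) {
        return new ContextCapturingScheduler(delegate, CURRENT.get());
    }

    // Runs the task on the delegate after the delay, with the captured context installed.
    <T> ScheduledFuture<T> schedule(Supplier<T> task, long delayMillis) {
        return delegate.schedule(() -> {
            Map<String, String> previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                return task.get();
            } finally {
                CURRENT.set(previous);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Mirrors the "foo" -> "bar" check of the sample above, without Vert.x.
    static String demo() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            CURRENT.set(Map.of("foo", "bar"));
            ContextCapturingScheduler scheduler = capturingCurrentContext(pool);
            // The task runs on a pool thread, yet still sees the captured context.
            return scheduler.schedule(() -> CURRENT.get().get("foo"), 10).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints bar
    }
}
```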
Use a Unix domain socket
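Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the Quarkus service is established from the same host. This can happen if access to the service goes through a proxy, which is often the case if you are setting up a service mesh with a proxy like Envoy.
This only works on platforms that support native transports. |
Enable the appropriate native transport and set the following environment properties:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket
quarkus.http.domain-socket-enabled=true
quarkus.vertx.prefer-native-transport=true
By itself, this does not disable the TCP socket, which by default opens on 0.0.0.0:8080. It can be explicitly disabled: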
quarkus.http.host-enabled=false
These properties can be set through Java's -D command line parameter or in application.properties.
Do not forget to add the native transport dependency. See Use native transports for details. |
Make sure your application has the right permissions to write to the socket. |
Use io_uring
io_uring is not supported in native executables.
|
io_uring support is experimental
|
io_uring
is a Linux kernel interface that allows you to send and receive data asynchronously.
It provides unified semantics for both file and network I/O.
It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.
It has the potential to provide modest performance benefits to network I/O on its own and greater benefits for mixed file and network I/O application workloads.
To learn more about io_uring
, we recommend the following links:
-
Why you should use io_uring for network I/O: The main benefit of io_uring for network I/O is a modern asynchronous API that is straightforward to use and provides unified semantics for file and network I/O. A potential performance benefit of io_uring for network I/O is reducing the number of syscalls. This could provide the biggest benefit for high volumes of small operations where the overhead of system calls can be significant.
-
The Backend Revolution and Why io_uring Is So Important: The io_uring API uses two ring buffers for communication between application and kernel (hence the API name) and designed in a way that enables natural batching of requests and responses. Besides, it provides a way to submit multiple requests in one system call, which can reduce overhead.
-
What exactly is io_uring?: io_uring is a Linux kernel interface to efficiently allow you to send and receive data asynchronously. It was originally designed to target block devices and files but has since gained the ability to work with things like network sockets.
To use io_uring
, you need to add two dependencies to your project and enable native transport.
First add the following dependencies to your project:
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>0.0.21.Final</version> <!-- Update this version (https://github.com/netty/netty-incubator-transport-io_uring/tags) -->
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-io_uring-incubator</artifactId>
</dependency>
// Update the io_uring version by picking the latest from https://github.com/netty/netty-incubator-transport-io_uring/tags
implementation("io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final")
implementation("io.vertx:vertx-io_uring-incubator")
Then, in the application.properties
, add:
quarkus.vertx.prefer-native-transport=true
Can I use io_uring on my Linux machine?
To check if you can use
If it prints something like above, you can use |
Troubleshooting
|
Domain sockets are not yet supported with io_uring. |
The Vert.x asynchronous file system API does not use io_uring yet. |
Deploy on read-only environments
In environments with read-only file systems, you may receive errors of the form:
java.lang.IllegalStateException: Failed to create cache dir
Assuming /tmp/
is writable this can be fixed by setting the vertx.cacheDirBase
property to point to a directory in /tmp/
for instance in Kubernetes by creating an environment variable JAVA_OPTS
with the value -Dvertx.cacheDirBase=/tmp/vertx
, or setting the quarkus.vertx.cache-directory
property in application.properties
:
quarkus.vertx.cache-directory=/tmp/vertx
Customize the Vert.x configuration
The configuration of the managed Vert.x instance can be provided using the application.properties
file, but also using special beans.
CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer
interface can be used to customize the Vert.x configuration.
For example, the following customizer changes the tmp base directory:
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
The customizer beans receive the VertxOptions (coming from the application configuration), and can modify them.