Vert.x参考指南
Vert.x 是一个用于构建响应式应用的工具包。正如 Quarkus响应式架构 中所描述的,Quarkus在下面使用Vert.x。
本指南是《 从Quarkus应用程序中使用Eclipse Vert.x API 》指南的补充。它提供了关于Quarkus使用的Vert.x实例的使用和配置的更多高级细节。
访问Vert.x实例
要访问管理的Vert.x实例,请在你的项目中添加 quarkus-vertx
扩展。注意,这个依赖可能已经安装了(作为一个过渡性依赖)。
通过这个扩展,你可以使用字段或构造函数注入检索Vert.x的托管实例:
@ApplicationScoped
public class MyBean {
// Field injection
@Inject Vertx vertx;
// Constructor injection
MyBean(Vertx vertx) {
// ...
}
}
你也可以这样注入:
-
io.vertx.core.Vertx
暴露了 bare Vert.x API的实例 -
io.vertx.mutiny.core.Vertx
暴露 Mutiny API的实例
我们推荐使用Mutiny变体,因为它可以与Quarkus提供的其他响应式API集成。
Mutiny
如果你不熟悉Mutiny,请查看 Mutiny - 一个直观的响应式编程库 。 |
关于Vert.x Mutiny变体的文档可在 https://smallrye.io/smallrye-mutiny-vertx-bindings
配置Vert.x实例
你可以从 application.properties
文件中配置Vert.x实例。下表列出了支持的属性:
Configuration property fixed at build time - All other configuration properties are overridable at runtime
类型 |
默认 |
|
---|---|---|
Enables or disables the Vert.x cache. Environment variable: |
boolean |
|
Enables or disabled the Vert.x classpath resource resolver. Environment variable: |
boolean |
|
The number of event loops. By default, it matches the number of CPUs detected on the system. Environment variable: |
int |
|
The maximum amount of time the event loop can be blocked. Environment variable: |
|
|
The amount of time before a warning is displayed if the event loop is blocked. Environment variable: |
|
|
The size of the worker thread pool. Environment variable: |
int |
|
The maximum amount of time the worker thread can be blocked. Environment variable: |
|
|
The size of the internal thread pool (used for the file system). Environment variable: |
int |
|
The queue size. For most applications this should be unbounded Environment variable: |
int |
|
The executor growth resistance. A resistance factor applied after the core pool is full; values applied here will cause that fraction of submissions to create new threads when no idle thread is available. A value of Environment variable: |
float |
|
The amount of time a thread will stay alive with no work. Environment variable: |
|
|
Prefill thread pool when creating a new Executor. When io.vertx.core.spi.ExecutorServiceFactory.createExecutor is called, initialise with the number of defined threads at startup Environment variable: |
boolean |
|
Enables the async DNS resolver. Environment variable: |
boolean |
|
PEM Key/cert config is disabled by default. Environment variable: |
boolean |
|
Comma-separated list of the path to the key files (Pem format). Environment variable: |
list of string |
|
Comma-separated list of the path to the certificate files (Pem format). Environment variable: |
list of string |
|
JKS config is disabled by default. Environment variable: |
boolean |
|
Path of the key file (JKS format). Environment variable: |
string |
|
Password of the key file. Environment variable: |
string |
|
PFX config is disabled by default. Environment variable: |
boolean |
|
Path to the key file (PFX format). Environment variable: |
string |
|
Password of the key. Environment variable: |
string |
|
PEM Trust config is disabled by default. Environment variable: |
boolean |
|
Comma-separated list of the trust certificate files (Pem format). Environment variable: |
list of string |
|
JKS config is disabled by default. Environment variable: |
boolean |
|
Path of the key file (JKS format). Environment variable: |
string |
|
Password of the key file. Environment variable: |
string |
|
PFX config is disabled by default. Environment variable: |
boolean |
|
Path to the key file (PFX format). Environment variable: |
string |
|
Password of the key. Environment variable: |
string |
|
The accept backlog. Environment variable: |
int |
|
The client authentication. Environment variable: |
string |
|
The connect timeout. Environment variable: |
|
|
The idle timeout in milliseconds. Environment variable: |
||
The receive buffer size. Environment variable: |
int |
|
The number of reconnection attempts. Environment variable: |
int |
|
The reconnection interval in milliseconds. Environment variable: |
|
|
Whether to reuse the address. Environment variable: |
boolean |
|
Whether to reuse the port. Environment variable: |
boolean |
|
The send buffer size. Environment variable: |
int |
|
The so linger. Environment variable: |
int |
|
Enables or Disabled SSL. Environment variable: |
boolean |
|
Whether to keep the TCP connection opened (keep-alive). Environment variable: |
boolean |
|
Configure the TCP no delay. Environment variable: |
boolean |
|
Configure the traffic class. Environment variable: |
int |
|
Enables or disables the trust all parameter. Environment variable: |
boolean |
|
The host name. Environment variable: |
string |
|
The port. Environment variable: |
int |
|
The public host name. Environment variable: |
string |
|
The public port. Environment variable: |
int |
|
Enables or disables the clustering. Environment variable: |
boolean |
|
The ping interval. Environment variable: |
|
|
The ping reply interval. Environment variable: |
|
|
The maximum amount of time in seconds that a successfully resolved address will be cached. If not set explicitly, resolved addresses may be cached forever. Environment variable: |
int |
|
The minimum amount of time in seconds that a successfully resolved address will be cached. Environment variable: |
int |
|
The amount of time in seconds that an unsuccessful attempt to resolve an address will be cached. Environment variable: |
int |
|
The maximum number of queries to be sent during a resolution. Environment variable: |
int |
|
The duration after which a DNS query is considered to be failed. Environment variable: |
|
|
Enable or disable native transport Environment variable: |
boolean |
|
About the Duration format
持续时间的格式使用标准的 您还可以提供以数字开头的持续时间值。 在这种情况下,如果该值仅包含一个数字,则转换器将该值视为秒。 否则, |
使用Vert.x客户端
除了Vert.x核心,你可以使用大多数Vert.x生态系统库。一些Quarkus扩展已经包装了Vert.x库。
可用的API
下表列出了Vert.x生态系统中最常用的库。要访问这些API,请在你的项目中添加指定的扩展或依赖性。请参考相关文档,了解如何使用它们。
API |
扩展或依赖性 |
文件 |
AMQP客户端 |
|
|
熔断器 |
|
|
Consul客户端 |
|
|
DB2客户端 |
|
|
Kafka客户端 |
|
|
邮件客户端 |
|
|
MQTT客户端 |
|
No guide yet |
MS SQL客户端 |
|
|
MySQL客户端 |
|
|
Oracle客户端 |
|
|
PostgreSQL 客户端 |
|
|
RabbitMQ客户端 |
|
|
Redis客户端 |
|
|
Web客户端 |
|
要了解更多关于Vert.x Mutiny API的用法,请参考 https://smallrye.io/smallrye-mutiny-vertx-bindings
使用示例
本节给出一个在RESTEasy Reactive应用程序中使用Vert.x WebClient
的例子。如上表所示,在你的项目中添加以下依赖:
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-web-client</artifactId>
</dependency>
implementation("io.smallrye.reactive:smallrye-mutiny-vertx-web-client")
现在,在你的代码中,你可以创建一个 WebClient
的实例:
package org.acme.vertx;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.ext.web.client.WebClient;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;
@Path("/fruit-data")
public class ResourceUsingWebClient {
private final WebClient client;
@Inject
VertxResource(Vertx vertx) {
this.client = WebClient.create(vertx);
}
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/{name}")
public Uni<JsonObject> getFruitData(String name) {
return client.getAbs("https://.../api/fruit/" + name)
.send()
.onItem().transform(resp -> {
if (resp.statusCode() == 200) {
return resp.bodyAsJsonObject();
} else {
return new JsonObject()
.put("code", resp.statusCode())
.put("message", resp.bodyAsString());
}
});
}
}
该资源创建了一个 WebClient
,并在请求时使用该客户端来调用远程HTTP API。根据结果,响应被转发,或者创建一个包含错误的 JSON 对象。 WebClient
是异步的(和非阻塞的),端点返回一个 Uni
。
该应用程序也可以作为一个本地可执行文件运行。但是,首先,我们需要指示Quarkus启用 ssl (如果远程API使用HTTPS)。打开 src/main/resources/application.properties
,并添加:
quarkus.ssl.native=true
然后,用以下方法创建本地可执行文件:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.package.type=native
使用Vert.x JSON
Vert.x的API经常依赖JSON。Vert.x提供了两个方便的类来操作JSON文档: io.vertx.core.json.JsonObject
和 io.vertx.core.json.JsonArray
。
JsonObject
可用于将一个对象映射到其JSON表示中,并从JSON文档中建立一个对象。
// Map an object into JSON
Person person = ...;
JsonObject json = JsonObject.mapFrom(person);
// Build an object from JSON
json = new JsonObject();
person = json.mapTo(Person.class);
注意,这些功能使用由 quarkus-jackson
扩展管理的映射器。请参考 Jackson的配置 来自定义映射。
JSON Object和JSON Array都支持作为Quarkus HTTP端点的请求和响应体(使用经典的RESTEasy和RESTEasy Reactive)。考虑一下这些端点:
package org.acme.vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
@Produces(MediaType.APPLICATION_JSON)
public class VertxJsonResource {
@GET
@Path("{name}/object")
public JsonObject jsonObject(String name) {
return new JsonObject().put("Hello", name);
}
@GET
@Path("{name}/array")
public JsonArray jsonArray(String name) {
return new JsonArray().add("Hello").add(name);
}
}
http://localhost:8080/hello/Quarkus/object 返回:
{"Hello":"Quarkus"}
http://localhost:8080/hello/Quarkus/array 返回:
["Hello","Quarkus"]
当JSON内容是一个请求体或被包裹在一个 Uni
, Multi
, CompletionStage
或 Publisher
中时,这同样适用。
使用verticles
Verticles 是_Vert.x提供的 "一个简单的、可扩展的、类似于演员的部署和并发模型"。这个模型并不声称是一个严格的actor-model实现,但它有相似之处,特别是关于并发、扩展和部署。为了使用这个模型,你编写和 部署 顶点,通过在事件总线上发送消息进行通信。
你可以在Quarkus中部署 verticles 。它支持:
-
bare verticle - 延伸的Java类
io.vertx.core.AbstractVerticle
-
Mutiny verticle - 扩展的Java类
io.smallrye.mutiny.vertx.core.AbstractVerticle
部署verticles
要部署verticles,使用 deployVerticle
方法。
@Inject Vertx vertx;
// ...
vertx.deployVerticle(MyVerticle.class.getName(), ar -> { });
vertx.deployVerticle(new MyVerticle(), ar -> { });
如果你使用Vert.x的Mutiny-variant,请注意 deployVerticle
方法返回一个 Uni
,你需要触发一个订阅来进行实际部署。
接下来会有一个例子解释如何在应用程序的初始化过程中部署verticles: |
使用@ApplicationScoped Beans作为Verticle
一般来说,Vert.x的verticles不是CDI beans。所以不能使用注入。然而,在Quarkus中,你可以把verticle部署为Bean。注意,在这种情况下,CDI(Quarkus中的Arc)负责创建实例。
以下代码段提供了一个示例:
package io.quarkus.vertx.verticles;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class MyBeanVerticle extends AbstractVerticle {
@ConfigProperty(name = "address") String address;
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer(address)
.handler(m -> m.replyAndForget("hello"))
.completionHandler();
}
}
你不需要注入 vertx
实例;相反,你可以利用 AbstractVerticle
的受保护字段。
然后,用以下方法部署verticle实例。
package io.quarkus.vertx.verticles;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@ApplicationScoped
public class VerticleDeployer {
public void init(@Observes StartupEvent e, Vertx vertx, MyBeanVerticle verticle) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
如果你想部署每个暴露的 AbstractVerticle
,你可以使用:
public void init(@Observes StartupEvent e, Vertx vertx, Instance<AbstractVerticle> verticles) {
for (AbstractVerticle verticle : verticles) {
vertx.deployVerticle(verticle).await().indefinitely();
}
}
使用多个verticles的实例
当使用 @ApplicationScoped
,你将为你的verticle获得一个实例。拥有多个verticles的实例可以帮助他们分担负载。它们中的每一个都将与不同的I/O线程相关联(Vert.x事件循环)。
要部署 Verticle 的多个实例,请使用 @Dependent
范围而不是 @ApplicationScoped
:
package org.acme.verticle;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.core.AbstractVerticle;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
@Dependent
public class MyVerticle extends AbstractVerticle {
@Override
public Uni<Void> asyncStart() {
return vertx.eventBus().consumer("address")
.handler(m -> m.reply("Hello from " + this))
.completionHandler();
}
}
然后,按以下方式部署你的verticle:
package org.acme.verticle;
import io.quarkus.runtime.StartupEvent;
import io.vertx.core.DeploymentOptions;
import io.vertx.mutiny.core.Vertx;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
@ApplicationScoped
public class MyApp {
void init(@Observes StartupEvent ev, Vertx vertx, Instance<MyVerticle> verticles) {
vertx
.deployVerticle(verticles::get, new DeploymentOptions().setInstances(2))
.await().indefinitely();
}
}
init
方法接收一个 Instance<MyVerticle>
。然后,你将提供者传递给 deployVerticle
方法。该提供者只是在调用 get()
方法。由于 @Dependent
范围,它每次调用都会返回一个新的实例。最后,你向 DeploymentOptions
传递所需的实例数量,比如在前面的例子中是两个。它将调用提供者两次,这将为你的verticle创建两个实例。
使用事件总线
Vert.x有一个内置的 事件总线 ,你可以从你的Quarkus应用程序中使用。因此,你的应用程序组件(CDI Bean、resources……)可以使用异步事件进行交互,从而促进松散耦合。
通过事件总线,你可以向 virtual addresses 发送 messages。事件总线提供三种类型的传递机制:
-
point-to-point - 发送消息,一个消费者接收。如果有几个消费者监听该地址,则采用轮流的方式;
-
publish/subscribe - 发布一个消息;所有监听该地址的消费者都在接收该消息。
-
request/reply - 发送消息并期望得到响应。接收者可以以异步的方式对消息作出回应。
所有这些交付机制都是无阻塞的,并提供了构建响应式应用的基本砖块之一。
Consuming事件
虽然你可以使用Vert.x的API来注册消费者,但Quarkus带有声明性支持。要消费事件,请使用 io.quarkus.vertx.ConsumeEvent
注解。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent (1)
public String consume(String name) { (2)
return name.toUpperCase();
}
}
1 | 如果没有设置,地址是Bean的完全限定名称;例如,在这个片段中,它是 org.acme.vertx.GreetingService 。 |
2 | 方法参数是消息主体。如果该方法返回 something ,那就是消息的响应。 |
配置地址
可以配置 @ConsumeEvent
注解设置地址:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
return name.toUpperCase();
}
1 | 接收发送到 greeting 地址的信息 |
异步处理
前面的例子使用了同步处理。异步处理也可以通过返回一个 io.smallrye.mutiny.Uni
或一个 java.util.concurrent.CompletionStage
。
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent
public CompletionStage<String> consume(String name) {
// return a CompletionStage completed when the processing is finished.
// You can also fail the CompletionStage explicitly
}
@ConsumeEvent
public Uni<String> process(String name) {
// return an Uni completed when the processing is finished.
// You can also fail the Uni explicitly
}
}
Mutiny
前面的例子使用了Mutiny响应式类型。如果你不熟悉Mutiny,请查看 Mutiny - 一个直观的响应式编程库 。 |
阻塞处理
默认情况下,消费该事件的代码必须是 非阻塞的 ,因为它是在一个I/O线程上调用的。如果你的处理是阻塞的,请使用 @io.smallrye.common.annotation.Blocking
注解。
@ConsumeEvent(value = "blocking-consumer")
@Blocking
void consumeBlocking(String message) {
// Something blocking
}
或者,你可以使用 @ConsumeEvent
注解中的 blocking
属性。
@ConsumeEvent(value = "blocking-consumer", blocking = true)
void consumeBlocking(String message) {
// Something blocking
}
当使用 @Blocking
,它忽略了 blocking
属性的值 @ConsumeEvent
。
回复信息
用 @ConsumeEvent
注解的方法的 返回值 被用来响应传入的消息。例如,在下面的代码片段中,返回的 是 String
类型。
@ConsumeEvent("greeting")
public String consume(String name) {
return name.toUpperCase();
}
你也可以返回一个 Uni<T>
或一个 CompletionStage<T>
来处理异步回复:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
如果你使用Context Propagation扩展,你可以注入一个
|
实现即发即弃(fire and forget)交互
你不需要回复收到的消息。通常情况下,对于 fire 和 forget 交互来说,消息被消耗掉了,发送者不需要知道这件事。为了实现这种模式,你的消费者方法返回 void
类型 。
@ConsumeEvent("greeting")
public void consume(String event) {
// Do something with the event
}
处理信息
与之前直接使用 有效载荷 的例子不同,你也可以直接使用 Message
。
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
System.out.println(msg.address());
System.out.println(msg.body());
}
处理故障
如果一个用 @ConsumeEvent
注解的方法抛出一个异常,那么:
-
如果设置了一个回复处理程序,那么失败就会通过一个带有代码
ConsumeEvent#FAILURE_CODE
和异常消息的io.vertx.core.eventbus.ReplyException
传播回发送者, -
如果没有设置回复处理程序,那么异常会被重新抛出(如果需要的话,会被包裹在一个
RuntimeException
),并且可以由默认的异常处理程序来处理,即io.vertx.core.Vertx#exceptionHandler()
。
发送信息
发送和发布消息使用Vert.x事件总线。
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
@Inject
EventBus bus; (1)
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (2)
.onItem().transform(Message::body);
}
}
1 | 注入事件总线 |
2 | 发送消息到地址 greeting 。消息的有效载荷是 name |
EventBus
对象提供了以下方法:
-
send
一个消息到一个特定的地址 - 单个消费者收到该消息。 -
publish
向一个特定的地址发送消息—所有的消费者都会收到这些消息。 -
request
留言并期望得到回复
// Case 1
bus.sendAndForget("greeting", name)
// Case 2
bus.publish("greeting", name)
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
使用编解码器
Vert.x事件总线 使用编解码器来 序列化 和 反序列化 对象。Quarkus为本地交付提供了一个默认的编解码器。所以你可以按以下方式交换对象:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", new MyName(name))
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello " + name.getName());
}
如果你想使用一个特定的编解码器,你需要在两端明确地设置它:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name,
new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
.onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
return Uni.createFrom().item(() -> "Hello "+name.getName());
}
1 | 设置用于发送消息的编解码器的名称 |
2 | 设置用于接收信息的编解码器 |
结合HTTP和事件总线
让我们重新访问一个响应的HTTP端点,并使用异步消息传递将调用委托给一个单独的bean。它使用了request/reply的调度机制。我们不是在JAX-RS端点内实现业务逻辑,而是发送一个消息。另一个Bean使用这个消息,并使用 回复 机制发送响应。
在你的HTTP端点类中,注入事件总线,并使用 request
方法向事件总线发送一个消息,并期待一个响应:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/bus")
public class EventResource {
@Inject
EventBus bus;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
return bus.<String>request("greeting", name) (1)
.onItem().transform(Message::body); (2)
}
}
1 | 将 name 发送到 greeting 地址,并要求作出回应 |
2 | 当我们得到响应时,提取正文并将其发送给用户 |
HTTP方法返回一个 Uni 。如果你使用的是RESTEasy Reactive, Uni 支持是内置的。如果你使用的是 经典的 RESTEasy,你需要在你的项目中添加 quarkus resteasy-mutiny 扩展。
|
我们需要一个消费者监听 greeting
地址。这个消费者可以在同一个类中,也可以是另一个Bean,比如:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
@ConsumeEvent("greeting")
public String greeting(String name) {
return "Hello " + name;
}
}
这个Bean接收名字并返回响应信息。
有了这个, /bus/quarkus
上的每个HTTP请求都会向事件总线发送一个消息,等待回复,当这个回复到来时,就会写入HTTP响应:
Hello Quarkus
为了更好地理解,让我们详细介绍一下HTTP request/response 是如何被处理的:
-
该请求由
greeting
方法接收 -
含有该 name 的消息被发送到事件总线
-
另一个Bean收到这个消息,并计算出响应
-
使用回复机制发回此响应
-
一旦发送方收到回复,会将内容写入 HTTP 响应
Bidirectional communication with browsers using SockJS
Vert.x提供的SockJS桥允许浏览器应用程序和Quarkus应用程序使用事件总线进行通信。它连接了双方。所以,双方都可以发送在另一方收到的消息。它支持三种传递机制。
SockJS负责协商Quarkus应用程序和浏览器之间的通信渠道。如果支持WebSockets,它就使用它们;否则,它就退化为SSE、long polling等。
因此,使用SockJS,你需要配置桥梁,特别是将用于通信的地址:
package org.acme.vertx;
import io.vertx.core.Vertx;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.sockjs.SockJSBridgeOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import java.util.concurrent.atomic.AtomicInteger;
@ApplicationScoped
public class SockJsExample {
@Inject
Vertx vertx;
public void init(@Observes Router router) {
SockJSHandler sockJSHandler = SockJSHandler.create(vertx);
sockJSHandler.bridge(new SockJSBridgeOptions()
.addOutboundPermitted(new PermittedOptions().setAddress("ticks")));
router.route("/eventbus/*").handler(sockJSHandler);
}
}
This code configures the SockJS bridge to send all the messages targeting the ticks
address to the connected browsers. More detailed explanations about the configuration can be found on the Vert.x SockJS Bridge documentation.
浏览器必须使用 vertx-eventbus
JavaScript库来消费该信:
<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<title>SockJS example - Quarkus</title>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"
integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script>
<script type="application/javascript" src="https://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vertx3-eventbus-client@3.8.5/vertx-eventbus.min.js"></script>
</head>
<body>
<h1>SockJS Examples</h1>
<p><strong>Last Tick:</strong> <span id="tick"></span></p>
</body>
<script>
var eb = new EventBus('/eventbus');
eb.onopen = function () {
eb.registerHandler('ticks', function (error, message) {
$("#tick").html(message.body);
});
}
</script>
</html>
本地运输
GraalVM生产的二进制文件中不支持本地传输。 |
Vert.x is capable of using Netty’s native transports, which offers performance improvements on specific platforms.To enable them, you must include the appropriate dependency for your platform. It’s usually a good idea to have both to keep your application platform-agnostic. Netty is smart enough to use the correct one, that includes none at all on unsupported platforms:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
implementation("io.netty:netty-transport-native-epoll::linux-x86_64")
implementation("io.netty:netty-transport-native-kqueue::osx-x86_64")
你还必须明确地配置Vert.x来使用本地传输。在 application.properties
中添加:
quarkus.vertx.prefer-native-transport=true
或者在 application.yml
:
quarkus:
vertx:
prefer-native-transport: true
如果一切顺利的话,quarkus 会输出以下日志:
[io.qua.ver.cor.run.VertxCoreRecorder] (main) Vertx has Native Transport Enabled: true
监听Unix Domain Socket
Listening on a Unix domain socket allows us to dispense with the overhead of TCP if the connection to the quarkus service is established from the same host. This can happen if access to the service goes through a proxy which is often the case if you’re setting up a service mesh with a proxy like Envoy.
这仅适用于支持 [native-transport] 的平台。 |
启用适当的 [native-transport] 并设置以下环境属性:
quarkus.http.domain-socket=/var/run/io.quarkus.app.socket quarkus.http.domain-socket-enabled=true quarkus.vertx.prefer-native-transport=true
就其本身而言,这不会禁用默认情况下将在 0.0.0.0:8080 上打开的 tcp socket(套接字)。可以明确禁用它:
quarkus.http.host-enabled=false
这些属性可以通过 Java 的 -D
命令行参数或在 application.properties
上设置。
Do not forget to add the native transport dependency. See 本地运输 for details. |
Make sure your application has the right permissions to write to the socket. |
只读部署环境
在具有只读文件系统的环境中,您可能会收到以下形式的错误:
java.lang.IllegalStateException: Failed to create cache dir
假设 /tmp/
是可写的,可以通过将 vertx.cacheDirBase
属性设置为指向 /tmp/
中的目录来修复此问题,例如在OpenShift中,通过创建一个值为 -Dvertx.cacheDirBase=/tmp/vertx
,名为 JAVA_OPTS
的环境变量。
Customizing the Vert.x configuration
The configuration of the managed Vert.x instance can be provided using the application.properties
file, but also using special beans. CDI beans exposing the io.quarkus.vertx.VertxOptionsCustomizer
interface can be used to customize the Vert.x configuration. For example, the following customizer change the tmp
base directory:
@ApplicationScoped
public class MyCustomizer implements VertxOptionsCustomizer {
@Override
public void accept(VertxOptions options) {
options.setFileSystemOptions(new FileSystemOptions().setFileCacheDir("target"));
}
}
The customizer beans received the VertxOptions
(coming from the application configuration), and can modify them.