Using the event bus
Quarkus allows different beans to interact using asynchronous events, thus promoting loose coupling. The messages are sent to virtual addresses. Three delivery mechanisms are available:
- point-to-point - send the message, one consumer receives it. If several consumers listen to the address, a round-robin is applied;
- publish/subscribe - publish a message; all the consumers listening to the address receive it;
- request/reply - send the message and expect a response. The receiver can respond to the message asynchronously.
All these delivery mechanisms are non-blocking and provide one of the fundamental building blocks for reactive applications.
The asynchronous message passing feature allows replying to messages, which is not supported by Reactive Messaging. However, it is limited to single-event behavior (no stream) and to local messages.
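To make the three delivery semantics concrete, here is a toy, JDK-only model. It is only a sketch: ToyEventBus and its methods are hypothetical names, it runs synchronously on the calling thread, and it is not how the Vert.x event bus is implemented. It only illustrates who receives a message in each mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy, synchronous model of event-bus delivery semantics (hypothetical, not the Vert.x API). */
public class ToyEventBus {

    // Consumers registered per virtual address.
    private final Map<String, List<Function<String, String>>> consumers = new HashMap<>();
    // Number of single-consumer deliveries per address, used to pick the next consumer.
    private final Map<String, Integer> deliveries = new HashMap<>();

    /** Registers a consumer on a virtual address. */
    public void consumer(String address, Function<String, String> handler) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    /** Request/reply: one consumer, chosen in round-robin order, handles the message; its return value is the reply. */
    public String request(String address, String body) {
        List<Function<String, String>> handlers = consumers.getOrDefault(address, List.of());
        if (handlers.isEmpty()) {
            // Toy behavior only: fail fast when nobody listens.
            throw new IllegalStateException("No consumer registered on " + address);
        }
        int turn = deliveries.merge(address, 1, Integer::sum) - 1;
        return handlers.get(turn % handlers.size()).apply(body);
    }

    /** Point-to-point: same consumer selection as request, but the reply is ignored. */
    public void send(String address, String body) {
        request(address, body);
    }

    /** Publish/subscribe: every consumer on the address receives the message; returns the delivery count. */
    public int publish(String address, String body) {
        List<Function<String, String>> handlers = consumers.getOrDefault(address, List.of());
        handlers.forEach(handler -> handler.apply(body));
        return handlers.size();
    }
}
```

With two consumers registered on the same address, successive request calls alternate between them, while a single publish call reaches both.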
Installing
This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature.
If you are creating a new project, set the extensions parameter as follows:
For Windows users:
- If using cmd, don't use the backslash (\) line continuation and put everything on the same line
- If using PowerShell, wrap -D parameters in double quotes, e.g. "-DprojectArtifactId=vertx-quickstart"
If you already have a Quarkus project, add the vertx extension to it with the add-extension command:
quarkus extension add vertx
./mvnw quarkus:add-extension -Dextensions='vertx'
./gradlew addExtension --extensions='vertx'
Otherwise, you can manually add this to the dependencies section of your build file:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
Consuming events
To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent (1)
    public String consume(String name) { (2)
        return name.toUpperCase();
    }
}
1 | If not set, the address is the fully qualified name of the bean; in this snippet, it is org.acme.vertx.GreetingService. |
2 | The method parameter is the message body. If the method returns something, it is the message response. |
By default, the code consuming the event must be non-blocking, as it’s called on the Vert.x event loop.
If your processing is blocking, use the
Alternatively, you can annotate your method with
When using |
Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // Return a CompletionStage completed when the processing is finished.
        // You can also fail the CompletionStage explicitly.
        return CompletableFuture.completedFuture(name.toUpperCase());
    }

    @ConsumeEvent
    public Uni<String> process(String name) {
        // Return a Uni completed when the processing is finished.
        // You can also fail the Uni explicitly.
        return Uni.createFrom().item(() -> name.toUpperCase());
    }
}
Mutiny
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive, reactive programming library.
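Coming back to the CompletionStage variant, the two outcomes (completing the stage and failing it explicitly) can be sketched with the JDK alone. The class name CompletionStageOutcomes and the blank-name rule are assumptions made for illustration; in a real consumer, the body of process would sit inside a method annotated with @ConsumeEvent.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical helper showing a completed and an explicitly failed CompletionStage. */
public class CompletionStageOutcomes {

    /** Returns a stage completed with the upper-cased name, or a failed stage when the name is blank. */
    public static CompletionStage<String> process(String name) {
        if (name == null || name.isBlank()) {
            // Fail the stage explicitly instead of throwing from the method.
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("name must not be blank"));
        }
        // Complete the stage with the processing result.
        return CompletableFuture.completedFuture(name.toUpperCase());
    }
}
```

Both branches return immediately here; a stage that completes later (for example from another thread) is returned the same way.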
Configuring the address
The @ConsumeEvent annotation can be configured to set the address:
@ConsumeEvent("greeting") (1)
public String consume(String name) {
    return name.toUpperCase();
}
1 | Receive the messages sent to the greeting address |
Replying
The return value of a method annotated with @ConsumeEvent is used as the response to the incoming message.
For instance, in the following snippet, the returned String is the response.
@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}
You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous replies:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
If you use the Context Propagation extension, you can inject a
Alternatively, you can use the default Quarkus worker pool using:
Implementing fire-and-forget interactions
You don’t have to reply to received messages.
Typically, for a fire and forget interaction, the messages are consumed and the sender does not need to know about it.
To implement this, your consumer method just returns void:
@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}
Dealing with messages
As mentioned above, this mechanism is based on the Vert.x event bus, so you can also use Message directly:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}
Handling failures
If a method annotated with @ConsumeEvent throws an exception, then:
- if a reply handler is set, the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message;
- if no reply handler is set, the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().
Sending messages
Ok, we have seen how to receive messages, let’s now switch to the other side: the sender. Sending and publishing messages use the Vert.x event bus:
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus; (1)

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name) (2)
                .onItem().transform(Message::body);
    }
}
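1 | Inject the event bus |
2 | Send a message to the address greeting. The message payload is name |
The EventBus object provides methods to:
1. send a message to a specific address - one single consumer receives the message.
2. publish a message to a specific address - all consumers receive the message.
3. send a message and expect a reply asynchronously
4. send a message and expect a reply in a blocking manner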
// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
.onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();
Putting things together - bridging HTTP and messages
Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we send a message. This message is consumed by another bean, and the response is sent using the reply mechanism.
First create a new project using:
For Windows users:
- If using cmd, don't use the backslash (\) line continuation and put everything on the same line
- If using PowerShell, wrap -D parameters in double quotes, e.g. "-DprojectArtifactId=vertx-http-quickstart"
You can already start the application in dev mode using:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
Then, create a new Jakarta REST resource with the following content:
package org.acme.vertx;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

@Path("/async")
public class EventResource {

    @Inject
    EventBus bus;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name) (1)
                .onItem().transform(Message::body); (2)
    }
}
1 | Send the name to the greeting address and request a response |
2 | When we get the response, extract the body and send it to the user |
If you call this endpoint, you will wait and get a timeout. Indeed, no one is listening.
So, we need a consumer listening on the greeting address. Create a GreetingService bean with the following content:
package org.acme.vertx;

import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class GreetingService {

    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }
}
This bean receives the name, and returns the greeting message.
Now, open your browser to http://localhost:8080/async/Quarkus, and you should see:
Hello Quarkus
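To better understand, let’s detail how the HTTP request/response has been handled:
1. The request is received by the greeting method
2. A message containing the name is sent to the event bus
3. Another bean receives this message and computes the response
4. This response is sent back using the reply mechanism
5. Once the sender receives the reply, the content is written to the HTTP response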
This application can be packaged using:
quarkus build
./mvnw install
./gradlew build
You can also compile it as a native executable with:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Using codecs
The Vert.x event bus uses codecs to serialize and deserialize objects. Quarkus provides a default codec for local delivery, so you can exchange objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
            .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
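The MyName type used in this snippet is not shown in the guide. A minimal sketch, assuming it is a plain immutable class that exposes getName(), could look like this (the constructor and field are assumptions made for illustration):

```java
/** Hypothetical payload type exchanged over the event bus in the snippet above. */
public class MyName {

    private final String name;

    public MyName(String name) {
        this.name = name;
    }

    /** Returns the wrapped name, as used by the consumer to build the greeting. */
    public String getName() {
        return name;
    }
}
```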
If you want to use a specific codec, you need to explicitly set it on both ends:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    // Send a MyName payload so that it matches the codec and the consumer's parameter type
    return bus.<String>request("greeting", new MyName(name),
            new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
            .onItem().transform(Message::body);
}

@ConsumeEvent(value = "greeting", codec = MyNameCodec.class) (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
1 | Set the name of the codec used to send the message |
2 | Set the codec used to receive the message |