Using the event bus
Quarkus allows different beans to interact using asynchronous events, thus promoting loose coupling. Messages are sent to virtual addresses. Three delivery mechanisms are available:
- point-to-point - send a message, and one consumer receives it. If several consumers listen to the address, a round-robin is applied;
- publish/subscribe - publish a message, and all the consumers listening to the address receive it;
- request/reply - send a message and expect a response. The receiver can respond to the message asynchronously.

All these delivery mechanisms are non-blocking and provide one of the fundamental building blocks for reactive applications.
| The asynchronous message passing feature allows replying to messages which is not supported by Reactive Messaging. However, it is limited to single-event behavior (no stream) and to local messages. | 
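To make the difference between point-to-point and publish/subscribe concrete, here is a toy in-memory model written with plain JDK types. It is only a sketch: `ToyBus` and its methods are hypothetical names, and it does not reflect how the Vert.x event bus is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of address-based delivery (hypothetical class, NOT the Vert.x event bus).
class ToyBus {
    private final Map<String, List<Consumer<String>>> consumers = new HashMap<>();
    private final Map<String, Integer> sendCount = new HashMap<>();

    // Registers a consumer on a virtual address.
    void register(String address, Consumer<String> consumer) {
        consumers.computeIfAbsent(address, a -> new ArrayList<>()).add(consumer);
    }

    // Point-to-point: exactly one consumer receives the message, chosen in turn.
    void send(String address, String message) {
        List<Consumer<String>> list = consumers.getOrDefault(address, List.of());
        if (list.isEmpty()) {
            return; // nobody is listening on this address
        }
        int index = sendCount.merge(address, 1, Integer::sum) - 1;
        list.get(index % list.size()).accept(message);
    }

    // Publish/subscribe: every consumer registered on the address receives the message.
    void publish(String address, String message) {
        consumers.getOrDefault(address, List.of()).forEach(c -> c.accept(message));
    }
}
```

With two consumers registered on the same address, three calls to `send` deliver the first and third messages to the first consumer and the second message to the other one, while a single `publish` reaches both.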
Installing
This mechanism uses the Vert.x EventBus, so you need to enable the vertx extension to use this feature.
If you are creating a new project, set the extensions parameter as follows:
For Windows users:
- If using cmd, don't use the backslash (\) and put everything on the same line.
- If using PowerShell, wrap -D parameters in double quotes, e.g. "-DprojectArtifactId=vertx-quickstart".
If you already have a project, you can add the vertx extension to it with the add-extension command:
quarkus extension add vertx
./mvnw quarkus:add-extension -Dextensions='vertx'
./gradlew addExtension --extensions='vertx'
Otherwise, you can manually add this to the dependencies section of your build file:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-vertx</artifactId>
</dependency>
implementation("io.quarkus:quarkus-vertx")
Consuming events
To consume events, use the io.quarkus.vertx.ConsumeEvent annotation:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
    @ConsumeEvent                           (1)
    public String consume(String name) {    (2)
        return name.toUpperCase();
    }
}
| 1 | If not set, the address is the fully qualified name of the bean, for instance, in this snippet it’s org.acme.vertx.GreetingService. | 
| 2 | The method parameter is the message body. If the method returns something it’s the message response. | 
| 
 By default, the code consuming the event must be non-blocking, as it’s called on the Vert.x event loop.
If your processing is blocking, use the  
Alternatively, you can annotate your method with  
When using   | 
Asynchronous processing is also possible by returning either an io.smallrye.mutiny.Uni or a java.util.concurrent.CompletionStage:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import io.smallrye.mutiny.Uni;
@ApplicationScoped
public class GreetingService {
    @ConsumeEvent
    public CompletionStage<String> consume(String name) {
        // Return a CompletionStage that completes when the processing is finished.
        // You can also fail the CompletionStage explicitly.
        return CompletableFuture.completedFuture(name.toUpperCase());
    }
    @ConsumeEvent
    public Uni<String> process(String name) {
        // Return a Uni that completes when the processing is finished.
        // You can also fail the Uni explicitly.
        return Uni.createFrom().item(() -> name.toUpperCase());
    }
}
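The snippet above mentions failing the result explicitly. A minimal sketch of both outcomes for a `CompletionStage`, using only JDK types, could look as follows. `AsyncGreeting` and the blank-name rule are assumptions made for illustration; inside a bean, the same body would sit in the method annotated with `@ConsumeEvent`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper showing an explicit success and an explicit failure.
class AsyncGreeting {
    static CompletionStage<String> consume(String name) {
        if (name == null || name.isBlank()) {
            // Explicit failure: the stage completes exceptionally.
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("name must not be blank"));
        }
        // Success: the stage completes once the supplier has run on another thread.
        return CompletableFuture.supplyAsync(() -> name.toUpperCase());
    }
}
```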
| 
 Mutiny 
The previous example uses Mutiny reactive types. If you are not familiar with Mutiny, check Mutiny - an intuitive, reactive programming library.  | 
Configuring the address
The @ConsumeEvent annotation can be configured to set the address:
@ConsumeEvent("greeting")               (1)
public String consume(String name) {
    return name.toUpperCase();
}
| 1 | Receive the messages sent to the greeting address | 
Replying
The return value of a method annotated with @ConsumeEvent is used as the response to the incoming message.
For instance, in the following snippet, the returned String is the response.
@ConsumeEvent("greeting")
public String consume(String name) {
    return name.toUpperCase();
}
You can also return a Uni<T> or a CompletionStage<T> to handle asynchronous replies:
@ConsumeEvent("greeting")
public Uni<String> consume2(String name) {
    return Uni.createFrom().item(() -> name.toUpperCase()).emitOn(executor);
}
| 
 If you use the Context Propagation extension, you can inject a  
Alternatively, you can use the default Quarkus worker pool using: 
 | 
Implementing fire and forget interactions
You don’t have to reply to received messages.
Typically, for a fire and forget interaction, the messages are consumed and the sender does not need to know about it.
To implement this, your consumer method just returns void:
@ConsumeEvent("greeting")
public void consume(String event) {
    // Do something with the event
}
Dealing with messages
As mentioned above, this mechanism is based on the Vert.x event bus, so you can also use Message directly:
@ConsumeEvent("greeting")
public void consume(Message<String> msg) {
    System.out.println(msg.address());
    System.out.println(msg.body());
}
Handling failures
If a method annotated with @ConsumeEvent throws an exception, then:
- if a reply handler is set, the failure is propagated back to the sender via an io.vertx.core.eventbus.ReplyException with code ConsumeEvent#FAILURE_CODE and the exception message;
- if no reply handler is set, the exception is rethrown (and wrapped in a RuntimeException if necessary) and can be handled by the default exception handler, i.e. io.vertx.core.Vertx#exceptionHandler().
Sending messages
Now that we have seen how to receive messages, let’s switch to the other side: the sender. Sending and publishing messages use the Vert.x event bus:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
    @Inject
    EventBus bus;                                       (1)
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)        (2)
                .onItem().transform(Message::body);
    }
}
| 1 | Inject the event bus | 
| 2 | Send a message to the address greeting. The message payload is name | 
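The EventBus object provides methods to:
- send a message to a specific address - one single consumer receives the message.
- publish a message to a specific address - all consumers receive the message.
- send a message and expect a reply asynchronously
- send a message and expect a reply in a blocking manner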
// Case 1
bus.<String>requestAndForget("greeting", name);
// Case 2
bus.publish("greeting", name);
// Case 3
Uni<String> response = bus.<String>request("address", "hello, how are you?")
        .onItem().transform(Message::body);
// Case 4
String response = bus.<String>requestAndAwait("greeting", name).body();
Putting things together - bridging HTTP and messages
Let’s revisit a greeting HTTP endpoint and use asynchronous message passing to delegate the call to a separate bean. It uses the request/reply dispatching mechanism. Instead of implementing the business logic inside the Jakarta REST endpoint, we send a message. This message is consumed by another bean, and the response is sent using the reply mechanism.
First create a new project using:
For Windows users:
- If using cmd, don't use the backslash (\) and put everything on the same line.
- If using PowerShell, wrap -D parameters in double quotes, e.g. "-DprojectArtifactId=vertx-http-quickstart".
You can already start the application in dev mode using:
quarkus dev
./mvnw quarkus:dev
./gradlew --console=plain quarkusDev
Then, create a new Jakarta REST resource with the following content:
package org.acme.vertx;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
@Path("/async")
public class EventResource {
    @Inject
    EventBus bus;
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("{name}")
    public Uni<String> greeting(String name) {
        return bus.<String>request("greeting", name)            (1)
                .onItem().transform(Message::body);            (2)
    }
}
| 1 | Send the name to the greeting address and request a response | 
| 2 | When we get the response, extract the body and send it to the user | 
If you call this endpoint now, the request fails, because no consumer is listening on the address yet.
So, we need a consumer listening on the greeting address. Create a GreetingService bean with the following content:
package org.acme.vertx;
import io.quarkus.vertx.ConsumeEvent;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class GreetingService {
    @ConsumeEvent("greeting")
    public String greeting(String name) {
        return "Hello " + name;
    }
}
This bean receives the name, and returns the greeting message.
Now, open your browser to http://localhost:8080/async/Quarkus, and you should see:
Hello Quarkus
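To better understand, let’s detail how the HTTP request/response has been handled:
- The request is received by the greeting method
- A message containing the name is sent to the event bus
- Another bean receives this message and computes the response
- This response is sent back using the reply mechanism
- Once the reply is received by the sender, the content is written to the HTTP response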
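The flow above can be modelled with plain JDK types to show the shape of a request/reply exchange. This is a toy sketch under stated assumptions: `ToyRequestReply` is a hypothetical class, it is not the Vert.x event bus, and failing the request when no consumer is registered is a choice made for this model only.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Toy model of request/reply (hypothetical class, NOT the Vert.x event bus).
class ToyRequestReply {
    private final Map<String, Function<String, String>> consumers = new ConcurrentHashMap<>();

    // Registers the consumer for an address (plays the role of the consuming bean).
    void consume(String address, Function<String, String> consumer) {
        consumers.put(address, consumer);
    }

    // Sends the body to the address; the consumer's return value becomes the reply.
    CompletableFuture<String> request(String address, String body) {
        Function<String, String> consumer = consumers.get(address);
        if (consumer == null) {
            // Toy behaviour: fail the request when nobody listens on the address.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("no consumer for " + address));
        }
        // The reply is computed asynchronously, so the caller is not blocked.
        return CompletableFuture.supplyAsync(() -> consumer.apply(body));
    }
}
```

Registering `name -> "Hello " + name` on the `greeting` address and requesting with `Quarkus` completes the future with `Hello Quarkus`, which mirrors what the endpoint writes to the HTTP response.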
 
This application can be packaged using:
quarkus build
./mvnw install
./gradlew build
You can also compile it as a native executable with:
quarkus build --native
./mvnw install -Dnative
./gradlew build -Dquarkus.native.enabled=true
Using codecs
The Vert.x event bus uses codecs to serialize and deserialize objects. Quarkus provides a default codec for local delivery, so you can exchange objects as follows:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name))
        .onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting")
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello " + name.getName());
}
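The `MyName` type used above is not shown in this guide. A minimal sketch, assuming it is a plain class that wraps the name and exposes `getName()`, might be the following. The immutable design is a choice for this sketch: with local delivery the consumer may receive the same instance the sender passed in, so a type without setters avoids accidental sharing of mutable state.

```java
// Hypothetical payload type; the guide references MyName without defining it.
class MyName {
    private final String name;

    MyName(String name) {
        this.name = name;
    }

    // Accessor used by the consumer in the snippet above.
    public String getName() {
        return name;
    }
}
```

In a real project this class would typically be declared public in its own file, next to the resource and the consuming bean.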
If you want to use a specific codec, you need to explicitly set it on both ends:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("{name}")
public Uni<String> greeting(String name) {
    return bus.<String>request("greeting", new MyName(name),
        new DeliveryOptions().setCodecName(MyNameCodec.class.getName())) (1)
        .onItem().transform(Message::body);
}
@ConsumeEvent(value = "greeting", codec = MyNameCodec.class)            (2)
Uni<String> greeting(MyName name) {
    return Uni.createFrom().item(() -> "Hello "+name.getName());
}
| 1 | Set the name of the codec to use to send the message | 
| 2 | Set the codec to use to receive the message |